af22ecc0b8c837f9b4148fa8825eeaf400cc4e29
[osm/N2VC.git] / n2vc / k8s_helm_base_conn.py
1 ##
2 # Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3 # This file is part of OSM
4 # All Rights Reserved.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
15 # implied.
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
18 #
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact with: nfvlabs@tid.es
21 ##
22 import abc
23 import asyncio
24 import random
25 import time
26 import shlex
27 import shutil
28 import stat
29 import subprocess
30 import os
31 import yaml
32 from uuid import uuid4
33
34 from n2vc.exceptions import K8sException
35 from n2vc.k8s_conn import K8sConnector
36
37
38 class K8sHelmBaseConnector(K8sConnector):
39
40 """
41 ####################################################################################
42 ################################### P U B L I C ####################################
43 ####################################################################################
44 """
45 service_account = "osm"
46 _STABLE_REPO_URL = "https://charts.helm.sh/stable"
47
def __init__(
    self,
    fs: object,
    db: object,
    kubectl_command: str = "/usr/bin/kubectl",
    helm_command: str = "/usr/bin/helm",
    log: object = None,
    on_update_db=None,
    vca_config: dict = None,
):
    """
    Set up the helm-version independent part of the connector.

    :param fs: file system for kubernetes and helm configuration
    :param db: database object to write current operation status
    :param kubectl_command: path to kubectl executable
    :param helm_command: path to helm executable
    :param log: logger
    :param on_update_db: callback called when k8s connector updates database
    :param vca_config: optional configuration dict; a non-empty
        "stablerepourl" entry overrides the default stable repo URL
    :raises K8sException: if the kubectl or helm executable does not exist
    """
    K8sConnector.__init__(self, db=db, log=log, on_update_db=on_update_db)
    self.log.info("Initializing K8S Helm connector")

    # seed the generator used when generating release names
    random.seed(time.time())

    self.fs = fs

    # both executables are mandatory: fail early if missing
    self.kubectl_command = kubectl_command
    self._check_file_exists(filename=kubectl_command, exception_if_not_exists=True)

    self._helm_command = helm_command
    self._check_file_exists(filename=helm_command, exception_if_not_exists=True)

    configured_url = vca_config.get("stablerepourl") if vca_config else None
    self._stable_repo_url = configured_url or self._STABLE_REPO_URL
92
93 @staticmethod
94 def _get_namespace_cluster_id(cluster_uuid: str) -> (str, str):
95 """
96 Parses cluster_uuid stored at database that can be either 'namespace:cluster_id' or only
97 cluster_id for backward compatibility
98 """
99 namespace, _, cluster_id = cluster_uuid.rpartition(':')
100 return namespace, cluster_id
101
async def init_env(
    self, k8s_creds: str, namespace: str = "kube-system", reuse_cluster_uuid=None
) -> (str, bool):
    """
    It prepares a given K8s cluster environment to run Charts

    :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid
        '.kube/config'
    :param namespace: optional namespace to be used for helm. By default,
        'kube-system' will be used
    :param reuse_cluster_uuid: existing cluster uuid for reuse; a namespace
        encoded in it ('namespace:cluster_id') takes precedence over `namespace`
    :return: uuid of the K8s cluster ('namespace:cluster_id') and True if
        connector has installed some software in the cluster
        (on error, an exception will be raised)
    """
    if reuse_cluster_uuid:
        namespace_, cluster_id = self._get_namespace_cluster_id(reuse_cluster_uuid)
        namespace = namespace_ or namespace
    else:
        cluster_id = str(uuid4())
    cluster_uuid = "{}:{}".format(namespace, cluster_id)

    self.log.debug(
        "Initializing K8S Cluster {}. namespace: {}".format(cluster_id, namespace)
    )

    paths, env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )

    # The kubeconfig holds cluster credentials, so create it owner-only from
    # the start. Passing the mode as open()'s third positional argument does
    # not work: that argument is the buffer size, not a file mode.
    mode = stat.S_IRUSR | stat.S_IWUSR
    fd = os.open(paths["kube_config"], os.O_WRONLY | os.O_CREAT | os.O_TRUNC, mode)
    with os.fdopen(fd, "w") as f:
        f.write(k8s_creds)
    # os.open() applies `mode` only when it creates the file; enforce it for
    # a pre-existing file too.
    os.chmod(paths["kube_config"], mode)

    # Code with initialization specific of helm version
    n2vc_installed_sw = await self._cluster_init(cluster_id, namespace, paths, env)

    # sync fs with local data
    self.fs.reverse_sync(from_path=cluster_id)

    self.log.info("Cluster {} initialized".format(cluster_id))

    return cluster_uuid, n2vc_installed_sw
144
async def repo_add(
    self, cluster_uuid: str, name: str, url: str, repo_type: str = "chart"
):
    """
    Register a helm repository in the cluster's helm configuration.

    A ``helm repo update`` is attempted first (its failure is ignored), then
    ``helm repo add <name> <url>`` is run. The cluster directory is synced
    from the file system before and back to it afterwards.

    :param cluster_uuid: cluster_id or 'namespace:cluster_id'
    :param name: repository name
    :param url: repository URL
    :param repo_type: only used in the log message
    :raises K8sException: if ``helm repo add`` fails
    """
    cluster_id = self._get_namespace_cluster_id(cluster_uuid)[1]
    self.log.debug(
        "Cluster {}, adding {} repository {}. URL: {}".format(
            cluster_id, repo_type, name, url
        )
    )

    self.fs.sync(from_path=cluster_id)
    _paths, helm_env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )

    update_cmd = "{} repo update".format(self._helm_command)
    self.log.debug("updating repo: {}".format(update_cmd))
    await self._local_async_exec(
        command=update_cmd, raise_exception_on_error=False, env=helm_env
    )

    add_cmd = "{} repo add {} {}".format(self._helm_command, name, url)
    self.log.debug("adding repo: {}".format(add_cmd))
    await self._local_async_exec(
        command=add_cmd, raise_exception_on_error=True, env=helm_env
    )

    self.fs.reverse_sync(from_path=cluster_id)
176
async def repo_list(self, cluster_uuid: str) -> list:
    """
    Get the list of registered repositories

    :param cluster_uuid: cluster_id or 'namespace:cluster_id'
    :return: list of dicts parsed from ``helm repo list --output yaml`` with
        lower-cased keys; an empty list if the command fails or prints nothing
    """
    cluster_id = self._get_namespace_cluster_id(cluster_uuid)[1]
    self.log.debug("list repositories for cluster {}".format(cluster_id))

    self.fs.sync(from_path=cluster_id)
    _paths, helm_env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )

    list_cmd = "{} repo list --output yaml".format(self._helm_command)
    # errors are not raised: with no repos registered we just want []
    output, return_code = await self._local_async_exec(
        command=list_cmd, raise_exception_on_error=False, env=helm_env
    )

    self.fs.reverse_sync(from_path=cluster_id)

    if return_code != 0 or not output:
        return []
    # helm2 and helm3 capitalise keys differently: normalise to lower case
    return self._lower_keys_list(yaml.safe_load(output))
216
async def repo_remove(self, cluster_uuid: str, name: str):
    """
    Remove a helm repository from the cluster's helm configuration.

    :param cluster_uuid: cluster_id or 'namespace:cluster_id'
    :param name: repository name
    :raises K8sException: if ``helm repo remove`` fails
    """
    cluster_id = self._get_namespace_cluster_id(cluster_uuid)[1]
    self.log.debug("remove {} repositories for cluster {}".format(name, cluster_id))

    self.fs.sync(from_path=cluster_id)
    _paths, helm_env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )

    remove_cmd = "{} repo remove {}".format(self._helm_command, name)
    await self._local_async_exec(
        command=remove_cmd, raise_exception_on_error=True, env=helm_env
    )

    self.fs.reverse_sync(from_path=cluster_id)
237
async def reset(
    self, cluster_uuid: str, force: bool = False, uninstall_sw: bool = False
) -> bool:
    """
    Remove the connector's local environment for a cluster.

    :param cluster_uuid: cluster_id or 'namespace:cluster_id'
    :param force: when uninstalling software and releases exist, uninstall
        them (errors are only logged). Without it, releases are kept and the
        cluster software is not uninstalled.
    :param uninstall_sw: uninstall the helm-version specific cluster software
        (see _uninstall_sw)
    :return: True
    """
    namespace, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
    self.log.debug("Resetting K8s environment. cluster uuid: {} uninstall={}"
                   .format(cluster_id, uninstall_sw))

    # sync local dir
    self.fs.sync(from_path=cluster_id)

    # uninstall releases if needed.
    if uninstall_sw:
        releases = await self.instances_list(cluster_uuid=cluster_uuid)
        if releases:
            if force:
                for r in releases:
                    # reset per release so the error message never shows a
                    # stale (or unbound) name from a previous iteration
                    kdu_instance = None
                    try:
                        kdu_instance = r.get("name")
                        chart = r.get("chart")
                        self.log.debug(
                            "Uninstalling {} -> {}".format(chart, kdu_instance)
                        )
                        await self.uninstall(
                            cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
                        )
                    except Exception as e:
                        # will not raise exception as it was found
                        # that in some cases of previously installed helm releases it
                        # raised an error
                        self.log.warning(
                            "Error uninstalling release {}: {}".format(kdu_instance, e)
                        )
            else:
                msg = (
                    "Cluster uuid: {} has releases and not force. Leaving K8s helm environment"
                ).format(cluster_id)
                self.log.warning(msg)
                uninstall_sw = False  # Allow to remove k8s cluster without removing Tiller

    if uninstall_sw:
        await self._uninstall_sw(cluster_id, namespace)

    # delete cluster directory
    self.log.debug("Removing directory {}".format(cluster_id))
    self.fs.file_delete(cluster_id, ignore_non_exist=True)
    # Remove also the local directory if it still exists
    direct = self.fs.path + "/" + cluster_id
    shutil.rmtree(direct, ignore_errors=True)

    return True
289
290 async def _install_impl(
291 self,
292 cluster_id: str,
293 kdu_model: str,
294 paths: dict,
295 env: dict,
296 atomic: bool = True,
297 timeout: float = 300,
298 params: dict = None,
299 db_dict: dict = None,
300 kdu_name: str = None,
301 namespace: str = None,
302 ):
303 # params to str
304 params_str, file_to_delete = self._params_to_file_option(
305 cluster_id=cluster_id, params=params
306 )
307
308 # version
309 version = None
310 if ":" in kdu_model:
311 parts = kdu_model.split(sep=":")
312 if len(parts) == 2:
313 version = str(parts[1])
314 kdu_model = parts[0]
315
316 # generate a name for the release. Then, check if already exists
317 kdu_instance = None
318 while kdu_instance is None:
319 kdu_instance = self._generate_release_name(kdu_model)
320 try:
321 result = await self._status_kdu(
322 cluster_id=cluster_id,
323 kdu_instance=kdu_instance,
324 namespace=namespace,
325 show_error_log=False,
326 )
327 if result is not None:
328 # instance already exists: generate a new one
329 kdu_instance = None
330 except K8sException:
331 pass
332
333 command = self._get_install_command(kdu_model, kdu_instance, namespace,
334 params_str, version, atomic, timeout)
335
336 self.log.debug("installing: {}".format(command))
337
338 if atomic:
339 # exec helm in a task
340 exec_task = asyncio.ensure_future(
341 coro_or_future=self._local_async_exec(
342 command=command, raise_exception_on_error=False, env=env
343 )
344 )
345
346 # write status in another task
347 status_task = asyncio.ensure_future(
348 coro_or_future=self._store_status(
349 cluster_id=cluster_id,
350 kdu_instance=kdu_instance,
351 namespace=namespace,
352 db_dict=db_dict,
353 operation="install",
354 run_once=False,
355 )
356 )
357
358 # wait for execution task
359 await asyncio.wait([exec_task])
360
361 # cancel status task
362 status_task.cancel()
363
364 output, rc = exec_task.result()
365
366 else:
367
368 output, rc = await self._local_async_exec(
369 command=command, raise_exception_on_error=False, env=env
370 )
371
372 # remove temporal values yaml file
373 if file_to_delete:
374 os.remove(file_to_delete)
375
376 # write final status
377 await self._store_status(
378 cluster_id=cluster_id,
379 kdu_instance=kdu_instance,
380 namespace=namespace,
381 db_dict=db_dict,
382 operation="install",
383 run_once=True,
384 check_every=0,
385 )
386
387 if rc != 0:
388 msg = "Error executing command: {}\nOutput: {}".format(command, output)
389 self.log.error(msg)
390 raise K8sException(msg)
391
392 return kdu_instance
393
async def upgrade(
    self,
    cluster_uuid: str,
    kdu_instance: str,
    kdu_model: str = None,
    atomic: bool = True,
    timeout: float = 300,
    params: dict = None,
    db_dict: dict = None,
):
    """
    Upgrade an existing KDU instance.

    :param cluster_uuid: cluster_id or 'namespace:cluster_id'
    :param kdu_instance: release name of the instance
    :param kdu_model: chart reference, optionally suffixed with ':<version>';
        None is passed to the command builder unchanged
    :param atomic: passed to the upgrade command builder; when True, status
        is also stored concurrently (run_once=False) while helm runs
    :param timeout: passed to the upgrade command builder
    :param params: chart values, turned into a values-file option
    :param db_dict: where operation status is stored
    :return: the instance revision after the upgrade, or 0 if it is no
        longer listed
    :raises K8sException: if the instance does not exist or the upgrade
        command fails
    """
    _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
    self.log.debug("upgrading {} in cluster {}".format(kdu_model, cluster_id))

    # sync local dir
    self.fs.sync(from_path=cluster_id)

    # look for instance to obtain namespace
    instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
    if not instance_info:
        raise K8sException("kdu_instance {} not found".format(kdu_instance))
    namespace = instance_info["namespace"]

    # init env, paths
    paths, env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )

    # params to str
    params_str, file_to_delete = self._params_to_file_option(
        cluster_id=cluster_id, params=params
    )

    try:
        # version; kdu_model defaults to None, which must not be searched
        version = None
        if kdu_model and ":" in kdu_model:
            parts = kdu_model.split(sep=":")
            if len(parts) == 2:
                version = str(parts[1])
                kdu_model = parts[0]

        command = self._get_upgrade_command(kdu_model, kdu_instance, namespace,
                                            params_str, version, atomic, timeout)

        self.log.debug("upgrading: {}".format(command))

        if atomic:
            # exec helm in a task
            exec_task = asyncio.ensure_future(
                coro_or_future=self._local_async_exec(
                    command=command, raise_exception_on_error=False, env=env
                )
            )
            # write status in another task
            status_task = asyncio.ensure_future(
                coro_or_future=self._store_status(
                    cluster_id=cluster_id,
                    kdu_instance=kdu_instance,
                    namespace=namespace,
                    db_dict=db_dict,
                    operation="upgrade",
                    run_once=False,
                )
            )

            try:
                # wait for execution task
                await asyncio.wait([exec_task])
            finally:
                # stop the status task even if waiting is interrupted
                status_task.cancel()
            output, rc = exec_task.result()

        else:
            output, rc = await self._local_async_exec(
                command=command, raise_exception_on_error=False, env=env
            )
    finally:
        # remove temporal values yaml file, also when the upgrade raised
        if file_to_delete:
            os.remove(file_to_delete)

    # write final status
    await self._store_status(
        cluster_id=cluster_id,
        kdu_instance=kdu_instance,
        namespace=namespace,
        db_dict=db_dict,
        operation="upgrade",
        run_once=True,
        check_every=0,
    )

    if rc != 0:
        msg = "Error executing command: {}\nOutput: {}".format(command, output)
        self.log.error(msg)
        raise K8sException(msg)

    # sync fs
    self.fs.reverse_sync(from_path=cluster_id)

    # return new revision number
    instance = await self.get_instance_info(
        cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
    )
    if instance:
        revision = int(instance.get("revision"))
        self.log.debug("New revision: {}".format(revision))
        return revision
    else:
        return 0
504
async def rollback(
    self, cluster_uuid: str, kdu_instance: str, revision=0, db_dict: dict = None
):
    """
    Roll back a KDU instance to a previous revision.

    Status is stored concurrently (run_once=False) while helm runs and once
    more after it finishes.

    :param cluster_uuid: cluster_id or 'namespace:cluster_id'
    :param kdu_instance: release name of the instance
    :param revision: revision to roll back to
    :param db_dict: where operation status is stored
    :return: the instance revision after the rollback, or 0 if the instance
        is no longer listed
    :raises K8sException: if the instance does not exist or the rollback
        command fails
    """
    cluster_id = self._get_namespace_cluster_id(cluster_uuid)[1]
    self.log.debug(
        "rollback kdu_instance {} to revision {} from cluster {}".format(
            kdu_instance, revision, cluster_id
        )
    )

    self.fs.sync(from_path=cluster_id)

    # the release namespace comes from its current info
    info = await self.get_instance_info(cluster_uuid, kdu_instance)
    if not info:
        raise K8sException("kdu_instance {} not found".format(kdu_instance))
    release_ns = info["namespace"]

    _paths, helm_env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )

    command = self._get_rollback_command(kdu_instance, release_ns, revision)
    self.log.debug("rolling_back: {}".format(command))

    helm_run = asyncio.ensure_future(
        self._local_async_exec(
            command=command, raise_exception_on_error=False, env=helm_env
        )
    )
    status_watch = asyncio.ensure_future(
        self._store_status(
            cluster_id=cluster_id,
            kdu_instance=kdu_instance,
            namespace=release_ns,
            db_dict=db_dict,
            operation="rollback",
            run_once=False,
        )
    )
    await asyncio.wait([helm_run])
    status_watch.cancel()
    output, rc = helm_run.result()

    await self._store_status(
        cluster_id=cluster_id,
        kdu_instance=kdu_instance,
        namespace=release_ns,
        db_dict=db_dict,
        operation="rollback",
        run_once=True,
        check_every=0,
    )

    if rc != 0:
        msg = "Error executing command: {}\nOutput: {}".format(command, output)
        self.log.error(msg)
        raise K8sException(msg)

    self.fs.reverse_sync(from_path=cluster_id)

    # report the revision the release is now at
    refreshed = await self.get_instance_info(
        cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
    )
    if not refreshed:
        return 0
    new_revision = int(refreshed.get("revision"))
    self.log.debug("New revision: {}".format(new_revision))
    return new_revision
589
async def uninstall(self, cluster_uuid: str, kdu_instance: str):
    """
    Removes an existing KDU instance. It would implicitly use the `delete` or 'uninstall' call
    (this call should happen after all _terminate-config-primitive_ of the VNF
    are invoked).

    :param cluster_uuid: UUID of a K8s cluster known by OSM, or namespace:cluster_id
    :param kdu_instance: unique name for the KDU instance to be deleted
    :return: the command output split into rows of whitespace-separated
        cells (as produced by _output_to_table)
    :raises K8sException: if the instance is not found or the uninstall
        command fails
    """

    _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
    self.log.debug(
        "uninstall kdu_instance {} from cluster {}".format(
            kdu_instance, cluster_id
        )
    )

    # sync local dir
    self.fs.sync(from_path=cluster_id)

    # look for instance to obtain namespace
    instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
    if not instance_info:
        raise K8sException("kdu_instance {} not found".format(kdu_instance))

    # init env, paths
    paths, env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )

    command = self._get_uninstall_command(kdu_instance, instance_info["namespace"])
    output, _rc = await self._local_async_exec(
        command=command, raise_exception_on_error=True, env=env
    )

    # sync fs
    self.fs.reverse_sync(from_path=cluster_id)

    return self._output_to_table(output)
630
async def instances_list(self, cluster_uuid: str) -> list:
    """
    Return the releases deployed in a cluster.

    :param cluster_uuid: the 'cluster' or 'namespace:cluster'
    :return: whatever the helm-version specific _instances_list returns
    """
    cluster_id = self._get_namespace_cluster_id(cluster_uuid)[1]
    self.log.debug("list releases for cluster {}".format(cluster_id))

    self.fs.sync(from_path=cluster_id)
    releases = await self._instances_list(cluster_id)
    self.fs.reverse_sync(from_path=cluster_id)

    return releases
652
async def get_instance_info(self, cluster_uuid: str, kdu_instance: str):
    """
    Return the release entry whose "name" equals ``kdu_instance``, or None.
    """
    releases = await self.instances_list(cluster_uuid=cluster_uuid)
    found = next((rel for rel in releases if rel.get("name") == kdu_instance), None)
    if found is None:
        self.log.debug("Instance {} not found".format(kdu_instance))
    return found
660
async def exec_primitive(
    self,
    cluster_uuid: str = None,
    kdu_instance: str = None,
    primitive_name: str = None,
    timeout: float = 300,
    params: dict = None,
    db_dict: dict = None,
) -> str:
    """
    Exec primitive (Juju action) - not supported for Helm KDUs.

    The signature mirrors the generic connector interface; every call fails.

    :param cluster_uuid: The UUID of the cluster or namespace:cluster
    :param kdu_instance: The unique name of the KDU instance
    :param primitive_name: Name of action that would be executed
    :param timeout: Timeout for action execution
    :param params: Dictionary of all the parameters needed for the action
    :param db_dict: Dictionary for any additional data
    :raises K8sException: always
    """
    message = (
        "KDUs deployed with Helm don't support actions "
        "different from rollback, upgrade and status"
    )
    raise K8sException(message)
685
async def get_services(self,
                       cluster_uuid: str,
                       kdu_instance: str,
                       namespace: str) -> list:
    """
    Returns a list of services defined for the specified kdu instance.

    :param cluster_uuid: UUID of a K8s cluster known by OSM
    :param kdu_instance: unique name for the KDU instance
    :param namespace: K8s namespace used by the KDU instance
    :return: If successful, it will return a list of services, Each service
        can have the following data:
        - `name` of the service
        - `type` type of service in the k8 cluster
        - `ports` List of ports offered by the service, for each port includes at least
        name, port, protocol
        - `cluster_ip` Internal ip to be used inside k8s cluster
        - `external_ip` List of external ips (in case they are available)
    """
    cluster_id = self._get_namespace_cluster_id(cluster_uuid)[1]
    self.log.debug(
        "get_services: cluster_uuid: {}, kdu_instance: {}".format(
            cluster_uuid, kdu_instance
        )
    )

    self.fs.sync(from_path=cluster_id)

    # first the service names of the release, then each service in turn
    names = await self._get_services(cluster_id, kdu_instance, namespace)
    services = [
        await self._get_service(cluster_id, svc_name, namespace)
        for svc_name in names
    ]

    self.fs.reverse_sync(from_path=cluster_id)

    return services
728
async def get_service(self,
                      cluster_uuid: str,
                      service_name: str,
                      namespace: str) -> object:
    """
    Return the description of one service, as built by _get_service.

    :param cluster_uuid: cluster_id or 'namespace:cluster_id'
    :param service_name: name of the service
    :param namespace: namespace of the service
    """
    self.log.debug(
        "get service, service_name: {}, namespace: {}, cluster_uuid: {}".format(
            service_name, namespace, cluster_uuid)
    )

    cluster_id = self._get_namespace_cluster_id(cluster_uuid)[1]

    self.fs.sync(from_path=cluster_id)
    description = await self._get_service(cluster_id, service_name, namespace)
    self.fs.reverse_sync(from_path=cluster_id)

    return description
750
async def status_kdu(self, cluster_uuid: str, kdu_instance: str) -> str:
    """
    Return the status text of a KDU instance.

    :param cluster_uuid: cluster_id or 'namespace:cluster_id'
    :param kdu_instance: release name of the instance
    :raises K8sException: if the instance is not deployed in the cluster
    """
    self.log.debug(
        "status_kdu: cluster_uuid: {}, kdu_instance: {}".format(
            cluster_uuid, kdu_instance
        )
    )

    cluster_id = self._get_namespace_cluster_id(cluster_uuid)[1]

    self.fs.sync(from_path=cluster_id)

    # the instance entry is needed to know its namespace
    releases = await self._instances_list(cluster_id=cluster_id)
    match = next((rel for rel in releases if rel.get("name") == kdu_instance), None)
    if match is None:
        raise K8sException("Instance name: {} not found in cluster: {}".format(
            kdu_instance, cluster_id))

    status = await self._status_kdu(
        cluster_id=cluster_id,
        kdu_instance=kdu_instance,
        namespace=match["namespace"],
        show_error_log=True,
        return_text=True,
    )

    self.fs.reverse_sync(from_path=cluster_id)

    return status
786
async def values_kdu(self, kdu_model: str, repo_url: str = None) -> str:
    """
    Return the default values of a chart (helm inspect/show "values").

    :param kdu_model: chart reference
    :param repo_url: optional repository URL
    """
    self.log.debug(
        "inspect kdu_model values {} from (optional) repo: {}".format(
            kdu_model, repo_url
        )
    )
    values_text = await self._exec_inspect_comand(
        inspect_command="values", kdu_model=kdu_model, repo_url=repo_url
    )
    return values_text
798
async def help_kdu(self, kdu_model: str, repo_url: str = None) -> str:
    """
    Return the readme of a chart (helm inspect/show "readme").

    :param kdu_model: chart reference
    :param repo_url: optional repository URL
    """
    self.log.debug(
        "inspect kdu_model {} readme.md from repo: {}".format(kdu_model, repo_url)
    )
    readme_text = await self._exec_inspect_comand(
        inspect_command="readme", kdu_model=kdu_model, repo_url=repo_url
    )
    return readme_text
808
async def synchronize_repos(self, cluster_uuid: str):
    """
    Make the cluster's helm repositories match the ones stored in the
    database for it.

    Database repos that are missing locally, or whose URL changed, are
    (re)added. Local repos not in the database (except "stable") are removed
    on a best-effort basis: failures are only logged.

    :param cluster_uuid: cluster_id or 'namespace:cluster_id'
    :return: (deleted_repo_list, added_repo_dict). deleted_repo_list holds
        ids of repos removed because their URL changed plus names of local
        repos deleted; added_repo_dict maps repo id -> repo name
    :raises K8sException: if adding a repository fails
    :raises Exception: wrapping any other unexpected error
    """
    self.log.debug("synchronize repos for cluster helm-id: {}".format(cluster_uuid))
    try:
        db_repo_ids = self._get_helm_chart_repos_ids(cluster_uuid)
        db_repo_dict = self._get_db_repos_dict(db_repo_ids)

        local_repo_list = await self.repo_list(cluster_uuid)
        local_repo_dict = {repo["name"]: repo["url"] for repo in local_repo_list}

        deleted_repo_list = []
        added_repo_dict = {}

        # iterate over the list of repos in the database that should be
        # added if not present
        for db_repo in db_repo_dict.values():
            # read the id first so the error message below can always use it
            repo_id = db_repo.get("_id")
            try:
                # check if it is already present
                curr_repo_url = local_repo_dict.get(db_repo["name"])
                if curr_repo_url != db_repo["url"]:
                    if curr_repo_url:
                        self.log.debug("repo {} url changed, delete and add again".format(
                            db_repo["url"]))
                        await self.repo_remove(cluster_uuid, db_repo["name"])
                        deleted_repo_list.append(repo_id)

                    # add repo
                    self.log.debug("add repo {}".format(db_repo["name"]))
                    await self.repo_add(cluster_uuid, db_repo["name"], db_repo["url"])
                    added_repo_dict[repo_id] = db_repo["name"]
            except Exception as e:
                raise K8sException(
                    "Error adding repo id: {}, err_msg: {} ".format(
                        repo_id, repr(e)
                    )
                ) from e

        # Delete repos that are present but not in nbi_list
        for repo_name in local_repo_dict:
            if not db_repo_dict.get(repo_name) and repo_name != "stable":
                self.log.debug("delete repo {}".format(repo_name))
                try:
                    await self.repo_remove(cluster_uuid, repo_name)
                    deleted_repo_list.append(repo_name)
                except Exception as e:
                    # best effort: an undeletable repo must not abort the sync
                    self.log.warning(
                        "Error deleting repo, name: {}, err_msg: {}".format(
                            repo_name, str(e)
                        )
                    )

        return deleted_repo_list, added_repo_dict

    except K8sException:
        raise
    except Exception as e:
        # log and re-raise unexpected errors with a uniform message
        self.log.error("Error synchronizing repos: {}".format(e))
        raise Exception("Error synchronizing repos: {}".format(e)) from e
869
870 def _get_db_repos_dict(self, repo_ids: list):
871 db_repos_dict = {}
872 for repo_id in repo_ids:
873 db_repo = self.db.get_one("k8srepos", {"_id": repo_id})
874 db_repos_dict[db_repo["name"]] = db_repo
875 return db_repos_dict
876
877 """
878 ####################################################################################
879 ################################### TO BE IMPLEMENTED SUBCLASSES ###################
880 ####################################################################################
881 """
882
883 @abc.abstractmethod
884 def _init_paths_env(self, cluster_name: str, create_if_not_exist: bool = True):
885 """
886 Creates and returns base cluster and kube dirs and returns them.
887 Also created helm3 dirs according to new directory specification, paths are
888 not returned but assigned to helm environment variables
889
890 :param cluster_name: cluster_name
891 :return: Dictionary with config_paths and dictionary with helm environment variables
892 """
893
894 @abc.abstractmethod
895 async def _cluster_init(self, cluster_id, namespace, paths, env):
896 """
897 Implements the helm version dependent cluster initialization
898 """
899
900 @abc.abstractmethod
901 async def _instances_list(self, cluster_id):
902 """
903 Implements the helm version dependent helm instances list
904 """
905
906 @abc.abstractmethod
907 async def _get_services(self, cluster_id, kdu_instance, namespace):
908 """
909 Implements the helm version dependent method to obtain services from a helm instance
910 """
911
912 @abc.abstractmethod
913 async def _status_kdu(self, cluster_id: str, kdu_instance: str, namespace: str = None,
914 show_error_log: bool = False, return_text: bool = False):
915 """
916 Implements the helm version dependent method to obtain status of a helm instance
917 """
918
919 @abc.abstractmethod
920 def _get_install_command(self, kdu_model, kdu_instance, namespace,
921 params_str, version, atomic, timeout) -> str:
922 """
923 Obtain command to be executed to delete the indicated instance
924 """
925
926 @abc.abstractmethod
927 def _get_upgrade_command(self, kdu_model, kdu_instance, namespace,
928 params_str, version, atomic, timeout) -> str:
929 """
930 Obtain command to be executed to upgrade the indicated instance
931 """
932
933 @abc.abstractmethod
934 def _get_rollback_command(self, kdu_instance, namespace, revision) -> str:
935 """
936 Obtain command to be executed to rollback the indicated instance
937 """
938
939 @abc.abstractmethod
940 def _get_uninstall_command(self, kdu_instance: str, namespace: str) -> str:
941 """
942 Obtain command to be executed to delete the indicated instance
943 """
944
945 @abc.abstractmethod
946 def _get_inspect_command(self, show_command: str, kdu_model: str, repo_str: str,
947 version: str):
948 """
949 Obtain command to be executed to obtain information about the kdu
950 """
951
952 @abc.abstractmethod
953 async def _uninstall_sw(self, cluster_id: str, namespace: str):
954 """
955 Method call to uninstall cluster software for helm. This method is dependent
956 of helm version
957 For Helm v2 it will be called when Tiller must be uninstalled
958 For Helm v3 it does nothing and does not need to be callled
959 """
960
961 @abc.abstractmethod
962 def _get_helm_chart_repos_ids(self, cluster_uuid) -> list:
963 """
964 Obtains the cluster repos identifiers
965 """
966
967 """
968 ####################################################################################
969 ################################### P R I V A T E ##################################
970 ####################################################################################
971 """
972
973 @staticmethod
974 def _check_file_exists(filename: str, exception_if_not_exists: bool = False):
975 if os.path.exists(filename):
976 return True
977 else:
978 msg = "File {} does not exist".format(filename)
979 if exception_if_not_exists:
980 raise K8sException(msg)
981
982 @staticmethod
983 def _remove_multiple_spaces(strobj):
984 strobj = strobj.strip()
985 while " " in strobj:
986 strobj = strobj.replace(" ", " ")
987 return strobj
988
989 @staticmethod
990 def _output_to_lines(output: str) -> list:
991 output_lines = list()
992 lines = output.splitlines(keepends=False)
993 for line in lines:
994 line = line.strip()
995 if len(line) > 0:
996 output_lines.append(line)
997 return output_lines
998
999 @staticmethod
1000 def _output_to_table(output: str) -> list:
1001 output_table = list()
1002 lines = output.splitlines(keepends=False)
1003 for line in lines:
1004 line = line.replace("\t", " ")
1005 line_list = list()
1006 output_table.append(line_list)
1007 cells = line.split(sep=" ")
1008 for cell in cells:
1009 cell = cell.strip()
1010 if len(cell) > 0:
1011 line_list.append(cell)
1012 return output_table
1013
1014 @staticmethod
1015 def _parse_services(output: str) -> list:
1016 lines = output.splitlines(keepends=False)
1017 services = []
1018 for line in lines:
1019 line = line.replace("\t", " ")
1020 cells = line.split(sep=" ")
1021 if len(cells) > 0 and cells[0].startswith("service/"):
1022 elems = cells[0].split(sep="/")
1023 if len(elems) > 1:
1024 services.append(elems[1])
1025 return services
1026
1027 @staticmethod
1028 def _get_deep(dictionary: dict, members: tuple):
1029 target = dictionary
1030 value = None
1031 try:
1032 for m in members:
1033 value = target.get(m)
1034 if not value:
1035 return None
1036 else:
1037 target = value
1038 except Exception:
1039 pass
1040 return value
1041
1042 # find key:value in several lines
1043 @staticmethod
1044 def _find_in_lines(p_lines: list, p_key: str) -> str:
1045 for line in p_lines:
1046 try:
1047 if line.startswith(p_key + ":"):
1048 parts = line.split(":")
1049 the_value = parts[1].strip()
1050 return the_value
1051 except Exception:
1052 # ignore it
1053 pass
1054 return None
1055
1056 @staticmethod
1057 def _lower_keys_list(input_list: list):
1058 """
1059 Transform the keys in a list of dictionaries to lower case and returns a new list
1060 of dictionaries
1061 """
1062 new_list = []
1063 for dictionary in input_list:
1064 new_dict = dict((k.lower(), v) for k, v in dictionary.items())
1065 new_list.append(new_dict)
1066 return new_list
1067
def _local_exec(self, command: str) -> (str, int):
    """
    Run a command synchronously through the shell and capture its stdout.

    NOTE(review): the command runs with ``shell=True``; it must only be built from
    trusted values.

    :param command: shell command line; repeated spaces are collapsed first
    :return: tuple (output, return_code): (stdout, 0) on success, ("", 1) when the
        command fails or cannot be run. Failures are logged, never raised.
    """
    command = self._remove_multiple_spaces(command)
    self.log.debug("Executing sync local command: {}".format(command))
    try:
        output = subprocess.check_output(
            command, shell=True, universal_newlines=True
        )
    except subprocess.CalledProcessError as e:
        # best effort: report the failure instead of silently discarding it
        self.log.debug(
            "Return code (FAIL): {}\nOutput:\n{}".format(e.returncode, e.output)
        )
        return "", 1
    except Exception as e:
        self.log.error("Exception executing command: {} -> {}".format(command, e))
        return "", 1

    self.log.debug(output)
    return output, 0
1083
async def _local_async_exec(
    self,
    command: str,
    raise_exception_on_error: bool = False,
    show_error_log: bool = True,
    encode_utf8: bool = False,
    env: dict = None
) -> (str, int):
    """
    Run a local command asynchronously (without a shell) and collect its output.

    The command string has repeated spaces collapsed, is tokenized with
    ``shlex.split`` and executed with ``asyncio.create_subprocess_exec``.

    :param command: command line to execute
    :param raise_exception_on_error: raise K8sException when the command exits with a
        non-zero code or cannot be executed, instead of returning
    :param show_error_log: log the output of a failing command
    :param encode_utf8: return ``str()`` of the utf-8 encoded output (a "b'...'"
        representation) with escaped "\\n" sequences turned back into newlines
    :param env: extra environment variables, merged over a copy of ``os.environ``
    :return: tuple (output, return_code). The output is stdout when the command
        succeeds and stderr when it fails; if that stream is empty, the other one is
        used. ("", -1) is returned when the command cannot be executed and
        raise_exception_on_error is False.
    :raises K8sException: see raise_exception_on_error
    """

    command = K8sHelmBaseConnector._remove_multiple_spaces(command)
    self.log.debug("Executing async local command: {}, env: {}".format(command, env))

    # split command
    command = shlex.split(command)

    environ = os.environ.copy()
    if env:
        environ.update(env)

    try:
        process = await asyncio.create_subprocess_exec(
            *command, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE,
            env=environ
        )

        # wait for command terminate
        stdout, stderr = await process.communicate()

        return_code = process.returncode

        out_text = stdout.decode("utf-8").strip() if stdout else ""
        err_text = stderr.decode("utf-8").strip() if stderr else ""
        # A successful command may still print warnings on stderr; they must not
        # replace the real result. On failure stderr usually explains the error.
        if return_code == 0:
            output = out_text or err_text
        else:
            output = err_text or out_text

        if return_code != 0 and show_error_log:
            self.log.debug(
                "Return code (FAIL): {}\nOutput:\n{}".format(return_code, output)
            )
        else:
            self.log.debug("Return code: {}".format(return_code))

        if raise_exception_on_error and return_code != 0:
            raise K8sException(output)

        if encode_utf8:
            output = output.encode("utf-8").strip()
            output = str(output).replace("\\n", "\n")

        return output, return_code

    except asyncio.CancelledError:
        raise
    except K8sException:
        raise
    except Exception as e:
        msg = "Exception executing command: {} -> {}".format(command, e)
        self.log.error(msg)
        if raise_exception_on_error:
            raise K8sException(e) from e
        else:
            return "", -1
1149
async def _local_async_exec_pipe(self,
                                 command1: str,
                                 command2: str,
                                 raise_exception_on_error: bool = True,
                                 show_error_log: bool = True,
                                 encode_utf8: bool = False,
                                 env: dict = None):
    """
    Run "command1 | command2" asynchronously, without a shell.

    Both command strings have repeated spaces collapsed and are tokenized with
    ``shlex.split``. The stdout of command1 is connected to the stdin of command2
    through an OS pipe; the stdout and stderr of command2 are captured. As in a shell
    pipeline without "pipefail", the returned code is the one of command2; a failure
    of command1 is only logged.

    :param command1: producer command line
    :param command2: consumer command line
    :param raise_exception_on_error: raise K8sException when command2 exits with a
        non-zero code or the commands cannot be executed
    :param show_error_log: log the output of a failing command
    :param encode_utf8: return ``str()`` of the utf-8 encoded output (a "b'...'"
        representation) with escaped "\\n" sequences turned back into newlines
    :param env: extra environment variables, merged over a copy of ``os.environ``
    :return: tuple (output, return_code): stdout of command2 on success, its stderr on
        failure (falling back to the other stream when that one is empty);
        ("", -1) if the commands cannot be run and raise_exception_on_error is False
    :raises K8sException: see raise_exception_on_error
    """

    command1 = K8sHelmBaseConnector._remove_multiple_spaces(command1)
    command2 = K8sHelmBaseConnector._remove_multiple_spaces(command2)
    command = "{} | {}".format(command1, command2)
    self.log.debug("Executing async local command: {}, env: {}".format(command, env))

    # split command
    command1 = shlex.split(command1)
    command2 = shlex.split(command2)

    environ = os.environ.copy()
    if env:
        environ.update(env)

    try:
        read, write = os.pipe()
        try:
            try:
                process_1 = await asyncio.create_subprocess_exec(
                    *command1, stdout=write, env=environ
                )
            finally:
                # the child holds its own copy of the write end; closing ours lets
                # command2 see EOF when command1 ends (and avoids leaking the fd
                # if the spawn fails)
                os.close(write)
            process_2 = await asyncio.create_subprocess_exec(
                *command2, stdin=read,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE,
                env=environ
            )
        finally:
            os.close(read)

        stdout, stderr = await process_2.communicate()
        return_code = process_2.returncode

        # reap command1 so it does not linger as a zombie process
        return_code_1 = await process_1.wait()
        if return_code_1 != 0:
            self.log.debug(
                "First command of the pipe failed with return code {}".format(
                    return_code_1
                )
            )

        out_text = stdout.decode("utf-8").strip() if stdout else ""
        err_text = stderr.decode("utf-8").strip() if stderr else ""
        # warnings printed on stderr by a successful command must not replace
        # its result; on failure stderr usually explains the error
        if return_code == 0:
            output = out_text or err_text
        else:
            output = err_text or out_text

        if return_code != 0 and show_error_log:
            self.log.debug(
                "Return code (FAIL): {}\nOutput:\n{}".format(return_code, output)
            )
        else:
            self.log.debug("Return code: {}".format(return_code))

        if raise_exception_on_error and return_code != 0:
            raise K8sException(output)

        if encode_utf8:
            output = output.encode("utf-8").strip()
            output = str(output).replace("\\n", "\n")

        return output, return_code
    except asyncio.CancelledError:
        raise
    except K8sException:
        raise
    except Exception as e:
        msg = "Exception executing command: {} -> {}".format(command, e)
        self.log.error(msg)
        if raise_exception_on_error:
            raise K8sException(e) from e
        else:
            return "", -1
1217
async def _get_service(self, cluster_id, service_name, namespace):
    """
    Obtains the data of the specified service in the k8cluster.

    :param cluster_id: id of a K8s cluster known by OSM
    :param service_name: name of the K8s service in the specified namespace
    :param namespace: K8s namespace used by the KDU instance
    :return: If successful, it will return a service with the following data:
        - `name` of the service
        - `type` type of service in the k8 cluster
        - `ports` List of ports offered by the service, for each port includes at least
        name, port, protocol
        - `cluster_ip` Internal ip to be used inside k8s cluster
        - `external_ip` List of external ips (only for LoadBalancer services; empty
        when no ingress entry with an "ip" is present)
    :raises K8sException: if the kubectl command fails
    """

    # init config, env
    paths, env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )

    command = "{} --kubeconfig={} --namespace={} get service {} -o=yaml".format(
        self.kubectl_command, paths["kube_config"], namespace, service_name
    )

    output, _rc = await self._local_async_exec(
        command=command, raise_exception_on_error=True, env=env
    )

    data = yaml.load(output, Loader=yaml.SafeLoader)

    service = {
        "name": service_name,
        "type": self._get_deep(data, ("spec", "type")),
        "ports": self._get_deep(data, ("spec", "ports")),
        "cluster_ip": self._get_deep(data, ("spec", "clusterIP"))
    }
    if service["type"] == "LoadBalancer":
        # "ingress" may be absent (e.g. no external address assigned yet) and an
        # entry may lack an "ip" key: neither case should crash the lookup
        ip_map_list = self._get_deep(data, ("status", "loadBalancer", "ingress")) or []
        service["external_ip"] = [
            elem["ip"]
            for elem in ip_map_list
            if isinstance(elem, dict) and elem.get("ip")
        ]

    return service
1261
async def _exec_inspect_comand(
    self, inspect_command: str, kdu_model: str, repo_url: str = None
):
    """
    Obtains information about a kdu, no cluster (no env)

    :param inspect_command: forwarded to ``_get_inspect_command``
    :param kdu_model: chart reference: "repo/chart[:version]", "chart[:version]",
        a local chart path (starting with "/" or ".") or a chart URL
    :param repo_url: optional repository url, added as "--repo <url>"
    :return: command output as returned by ``_local_async_exec`` with
        encode_utf8=True (a failing command is not raised)
    """

    repo_str = ""
    if repo_url:
        repo_str = " --repo {}".format(repo_url)

    version = ""
    # Local paths and URLs are passed untouched: stripping up to the first "/"
    # would turn "/path/chart" into a relative path, and a ":" in them belongs to
    # the URL scheme/port, not to a chart version.
    is_path_or_url = kdu_model.startswith(("/", ".")) or "://" in kdu_model
    if not is_path_or_url:
        # "repo/chart[:version]" -> "chart[:version]"
        idx = kdu_model.find("/")
        if idx >= 0:
            kdu_model = kdu_model[idx + 1:]

        # "chart:version" -> chart name + "--version <version>"
        if ":" in kdu_model:
            parts = kdu_model.split(sep=":")
            if len(parts) == 2:
                version = "--version {}".format(str(parts[1]))
                kdu_model = parts[0]

    full_command = self._get_inspect_command(inspect_command, kdu_model, repo_str, version)
    output, _rc = await self._local_async_exec(
        command=full_command, encode_utf8=True
    )

    return output
1291
async def _store_status(
    self,
    cluster_id: str,
    operation: str,
    kdu_instance: str,
    namespace: str = None,
    check_every: float = 10,
    db_dict: dict = None,
    run_once: bool = False,
):
    """
    Periodically read the status of a KDU and write it to the database.

    Each iteration first sleeps ``check_every`` seconds, then obtains the detailed
    status with ``_status_kdu`` and stores it with ``write_app_status_to_db``, using
    ``info.description`` as the summary status.

    The loop ends when the database write reports failure, when the task is
    cancelled, or after the first iteration if ``run_once`` is set. Any other error
    is logged and the loop continues.

    :param cluster_id: cluster where the KDU runs
    :param operation: operation name stored with the status
    :param kdu_instance: KDU instance whose status is read
    :param namespace: namespace of the KDU instance
    :param check_every: seconds to wait before each status read
    :param db_dict: database location passed to ``write_app_status_to_db``
    :param run_once: perform a single iteration only
    """
    while True:
        try:
            await asyncio.sleep(check_every)
            detailed_status = await self._status_kdu(
                cluster_id=cluster_id, kdu_instance=kdu_instance, namespace=namespace,
                return_text=False
            )
            status = detailed_status.get("info").get("description")
            self.log.debug('KDU {} STATUS: {}.'.format(kdu_instance, status))
            # write status to db
            result = await self.write_app_status_to_db(
                db_dict=db_dict,
                status=str(status),
                detailed_status=str(detailed_status),
                operation=operation,
            )
            if not result:
                self.log.info("Error writing in database. Task exiting...")
                return
        except asyncio.CancelledError:
            self.log.debug("Task cancelled")
            return
        except Exception as e:
            self.log.debug("_store_status exception: {}".format(str(e)), exc_info=True)
        # checked outside a "finally": a return there would also swallow
        # KeyboardInterrupt/SystemExit raised during the iteration
        if run_once:
            return
1330
1331 # params for use in -f file
1332 # returns values file option and filename (in order to delete it at the end)
1333 def _params_to_file_option(self, cluster_id: str, params: dict) -> (str, str):
1334
1335 if params and len(params) > 0:
1336 self._init_paths_env(
1337 cluster_name=cluster_id, create_if_not_exist=True
1338 )
1339
1340 def get_random_number():
1341 r = random.randrange(start=1, stop=99999999)
1342 s = str(r)
1343 while len(s) < 10:
1344 s = "0" + s
1345 return s
1346
1347 params2 = dict()
1348 for key in params:
1349 value = params.get(key)
1350 if "!!yaml" in str(value):
1351 value = yaml.load(value[7:])
1352 params2[key] = value
1353
1354 values_file = get_random_number() + ".yaml"
1355 with open(values_file, "w") as stream:
1356 yaml.dump(params2, stream, indent=4, default_flow_style=False)
1357
1358 return "-f {}".format(values_file), values_file
1359
1360 return "", None
1361
1362 # params for use in --set option
1363 @staticmethod
1364 def _params_to_set_option(params: dict) -> str:
1365 params_str = ""
1366 if params and len(params) > 0:
1367 start = True
1368 for key in params:
1369 value = params.get(key, None)
1370 if value is not None:
1371 if start:
1372 params_str += "--set "
1373 start = False
1374 else:
1375 params_str += ","
1376 params_str += "{}={}".format(key, value)
1377 return params_str
1378
1379 @staticmethod
1380 def _generate_release_name(chart_name: str):
1381 # check embeded chart (file or dir)
1382 if chart_name.startswith("/"):
1383 # extract file or directory name
1384 chart_name = chart_name[chart_name.rfind("/") + 1:]
1385 # check URL
1386 elif "://" in chart_name:
1387 # extract last portion of URL
1388 chart_name = chart_name[chart_name.rfind("/") + 1:]
1389
1390 name = ""
1391 for c in chart_name:
1392 if c.isalpha() or c.isnumeric():
1393 name += c
1394 else:
1395 name += "-"
1396 if len(name) > 35:
1397 name = name[0:35]
1398
1399 # if does not start with alpha character, prefix 'a'
1400 if not name[0].isalpha():
1401 name = "a" + name
1402
1403 name += "-"
1404
1405 def get_random_number():
1406 r = random.randrange(start=1, stop=99999999)
1407 s = str(r)
1408 s = s.rjust(10, "0")
1409 return s
1410
1411 name = name + get_random_number()
1412 return name.lower()