Add generate_kdu_instance_name method in K8sConn
[osm/N2VC.git] / n2vc / k8s_helm_base_conn.py
1 ##
2 # Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3 # This file is part of OSM
4 # All Rights Reserved.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
15 # implied.
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
18 #
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact with: nfvlabs@tid.es
21 ##
22 import abc
23 import asyncio
24 import random
25 import time
26 import shlex
27 import shutil
28 import stat
29 import subprocess
30 import os
31 import yaml
32 from uuid import uuid4
33
34 from n2vc.exceptions import K8sException
35 from n2vc.k8s_conn import K8sConnector
36
37
38 class K8sHelmBaseConnector(K8sConnector):
39
40 """
41 ####################################################################################
42 ################################### P U B L I C ####################################
43 ####################################################################################
44 """
45 service_account = "osm"
46 _STABLE_REPO_URL = "https://charts.helm.sh/stable"
47
def __init__(
    self,
    fs: object,
    db: object,
    kubectl_command: str = "/usr/bin/kubectl",
    helm_command: str = "/usr/bin/helm",
    log: object = None,
    on_update_db=None,
    vca_config: dict = None,
):
    """
    Build the state shared by the helm based K8s connectors.

    :param fs: file system for kubernetes and helm configuration
    :param db: database object to write current operation status
    :param kubectl_command: path to kubectl executable
    :param helm_command: path to helm executable
    :param log: logger
    :param on_update_db: callback called when k8s connector updates database
    :param vca_config: optional configuration dictionary; a non-empty
        "stablerepourl" entry replaces the default stable repository URL
    :raises K8sException: (via _check_file_exists) when the kubectl or the helm
        executable does not exist
    """

    # the generic connector receives db, logger and update callback
    K8sConnector.__init__(self, db=db, log=log, on_update_db=on_update_db)

    self.log.info("Initializing K8S Helm connector")

    # random numbers for release name generation
    random.seed(time.time())

    # the file system
    self.fs = fs

    # kubectl is mandatory: a missing executable raises
    self.kubectl_command = kubectl_command
    self._check_file_exists(filename=kubectl_command, exception_if_not_exists=True)

    # helm is mandatory: a missing executable raises
    self._helm_command = helm_command
    self._check_file_exists(filename=helm_command, exception_if_not_exists=True)

    # stable repo url: configured value when present and non-empty, default otherwise
    configured_url = vca_config.get("stablerepourl") if vca_config else None
    if configured_url:
        self._stable_repo_url = vca_config.get("stablerepourl")
    else:
        self._stable_repo_url = self._STABLE_REPO_URL
92
93 @staticmethod
94 def _get_namespace_cluster_id(cluster_uuid: str) -> (str, str):
95 """
96 Parses cluster_uuid stored at database that can be either 'namespace:cluster_id' or only
97 cluster_id for backward compatibility
98 """
99 namespace, _, cluster_id = cluster_uuid.rpartition(':')
100 return namespace, cluster_id
101
async def init_env(
    self, k8s_creds: str, namespace: str = "kube-system", reuse_cluster_uuid=None
) -> (str, bool):
    """
    It prepares a given K8s cluster environment to run Charts

    :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid
        '.kube/config'
    :param namespace: optional namespace to be used for helm. By default,
        'kube-system' will be used
    :param reuse_cluster_uuid: existing cluster uuid for reuse; when it carries a
        namespace ('namespace:cluster_id') that namespace takes precedence
    :return: uuid of the K8s cluster ('namespace:cluster_id') and the value
        returned by _cluster_init (True if connector has installed some software
        in the cluster)
        (on error, an exception will be raised)
    """

    if reuse_cluster_uuid:
        namespace_, cluster_id = self._get_namespace_cluster_id(reuse_cluster_uuid)
        namespace = namespace_ or namespace
    else:
        cluster_id = str(uuid4())
    cluster_uuid = "{}:{}".format(namespace, cluster_id)

    self.log.debug(
        "Initializing K8S Cluster {}. namespace: {}".format(cluster_id, namespace)
    )

    paths, env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )

    # The third positional argument of open() is the buffering size, not the
    # permission bits, so the file is created through os.open() to make sure
    # the credentials are never readable by group/others, not even briefly.
    kube_config = paths["kube_config"]
    fd = os.open(
        kube_config,
        os.O_WRONLY | os.O_CREAT | os.O_TRUNC,
        stat.S_IRUSR | stat.S_IWUSR,
    )
    with os.fdopen(fd, "w") as f:
        f.write(k8s_creds)
    # a pre-existing file keeps its old mode on open: enforce it explicitly
    os.chmod(kube_config, 0o600)

    # Code with initialization specific of helm version
    n2vc_installed_sw = await self._cluster_init(cluster_id, namespace, paths, env)

    # sync fs with local data
    self.fs.reverse_sync(from_path=cluster_id)

    self.log.info("Cluster {} initialized".format(cluster_id))

    return cluster_uuid, n2vc_installed_sw
144
async def repo_add(
    self, cluster_uuid: str, name: str, url: str, repo_type: str = "chart"
):
    """
    Register a helm repository in the local helm environment of a cluster.

    A 'repo update' is attempted first (its failure is ignored) and then the
    repository is added; a failure adding it raises an exception.

    :param cluster_uuid: the 'cluster' or 'namespace:cluster'
    :param name: name given to the repository
    :param url: URL of the repository
    :param repo_type: only used in the log message
    """
    cluster_id = self._get_namespace_cluster_id(cluster_uuid)[1]
    self.log.debug(
        "Cluster {}, adding {} repository {}. URL: {}".format(
            cluster_id, repo_type, name, url
        )
    )

    # bring the cluster directory to the local file system
    self.fs.sync(from_path=cluster_id)

    # helm environment of the cluster
    _paths, env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )

    # helm repo update (best effort)
    update_cmd = "{} repo update".format(self._helm_command)
    self.log.debug("updating repo: {}".format(update_cmd))
    await self._local_async_exec(
        command=update_cmd, raise_exception_on_error=False, env=env
    )

    # helm repo add name url
    add_cmd = "{} repo add {} {}".format(self._helm_command, name, url)
    self.log.debug("adding repo: {}".format(add_cmd))
    await self._local_async_exec(
        command=add_cmd, raise_exception_on_error=True, env=env
    )

    # push local changes back
    self.fs.reverse_sync(from_path=cluster_id)
176
async def repo_list(self, cluster_uuid: str) -> list:
    """
    Get the list of registered repositories

    :param cluster_uuid: the 'cluster' or 'namespace:cluster'
    :return: the parsed YAML output of 'helm repo list' with the keys of every
        entry in lower case, or an empty list when the command fails or prints
        nothing
    """
    cluster_id = self._get_namespace_cluster_id(cluster_uuid)[1]
    self.log.debug("list repositories for cluster {}".format(cluster_id))

    # bring the cluster directory to the local file system
    self.fs.sync(from_path=cluster_id)

    # helm environment of the cluster
    _paths, env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )

    list_cmd = "{} repo list --output yaml".format(self._helm_command)

    # No exception on error: when there are no repos an empty list is wanted
    output, _rc = await self._local_async_exec(
        command=list_cmd, raise_exception_on_error=False, env=env
    )

    # push local changes back
    self.fs.reverse_sync(from_path=cluster_id)

    if _rc != 0 or not output:
        return []

    repos = yaml.load(output, Loader=yaml.SafeLoader)
    # unify format between helm2 and helm3 setting all keys lowercase
    return self._lower_keys_list(repos)
216
async def repo_remove(self, cluster_uuid: str, name: str):
    """
    Remove a repository from the local helm environment of a cluster.

    :param cluster_uuid: the 'cluster' or 'namespace:cluster'
    :param name: name of the repository to remove
    (a failure of the helm command raises an exception)
    """
    cluster_id = self._get_namespace_cluster_id(cluster_uuid)[1]
    self.log.debug("remove {} repositories for cluster {}".format(name, cluster_id))

    # bring the cluster directory to the local file system
    self.fs.sync(from_path=cluster_id)

    # helm environment of the cluster
    _paths, env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )

    remove_cmd = "{} repo remove {}".format(self._helm_command, name)
    await self._local_async_exec(
        command=remove_cmd, raise_exception_on_error=True, env=env
    )

    # push local changes back
    self.fs.reverse_sync(from_path=cluster_id)
237
async def reset(
    self, cluster_uuid: str, force: bool = False, uninstall_sw: bool = False
) -> bool:
    """
    Remove the local environment of a cluster and, optionally, what was
    deployed in it.

    :param cluster_uuid: the 'cluster' or 'namespace:cluster'
    :param force: when releases exist, uninstall them (errors are only logged);
        without force the releases are kept and the cluster software is not
        uninstalled either
    :param uninstall_sw: uninstall the releases (subject to 'force') and then
        call _uninstall_sw for the cluster
    :return: True
    """

    namespace, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
    self.log.debug(
        "Resetting K8s environment. cluster uuid: {} uninstall={}".format(
            cluster_id, uninstall_sw
        )
    )

    # sync local dir
    self.fs.sync(from_path=cluster_id)

    # uninstall releases if needed.
    if uninstall_sw:
        releases = await self.instances_list(cluster_uuid=cluster_uuid)
        if len(releases) > 0:
            if force:
                for r in releases:
                    # bound before the try so the error log below can always use it
                    kdu_instance = None
                    try:
                        kdu_instance = r.get("name")
                        chart = r.get("chart")
                        self.log.debug(
                            "Uninstalling {} -> {}".format(chart, kdu_instance)
                        )
                        await self.uninstall(
                            cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
                        )
                    except Exception as e:
                        # will not raise exception as it was found
                        # that in some cases of previously installed helm releases it
                        # raised an error
                        self.log.warning(
                            "Error uninstalling release {}: {}".format(kdu_instance, e)
                        )
            else:
                msg = (
                    "Cluster uuid: {} has releases and not force. Leaving K8s helm environment"
                ).format(cluster_id)
                self.log.warning(msg)
                uninstall_sw = False  # Allow to remove k8s cluster without removing Tiller

    if uninstall_sw:
        await self._uninstall_sw(cluster_id, namespace)

    # delete cluster directory
    self.log.debug("Removing directory {}".format(cluster_id))
    self.fs.file_delete(cluster_id, ignore_non_exist=True)
    # Also remove the local directory if it still exists
    direct = self.fs.path + "/" + cluster_id
    shutil.rmtree(direct, ignore_errors=True)

    return True
289
290 async def _install_impl(
291 self,
292 cluster_id: str,
293 kdu_model: str,
294 paths: dict,
295 env: dict,
296 kdu_instance: str,
297 atomic: bool = True,
298 timeout: float = 300,
299 params: dict = None,
300 db_dict: dict = None,
301 kdu_name: str = None,
302 namespace: str = None,
303 ):
304 # params to str
305 params_str, file_to_delete = self._params_to_file_option(
306 cluster_id=cluster_id, params=params
307 )
308
309 # version
310 version = None
311 if ":" in kdu_model:
312 parts = kdu_model.split(sep=":")
313 if len(parts) == 2:
314 version = str(parts[1])
315 kdu_model = parts[0]
316
317 command = self._get_install_command(kdu_model, kdu_instance, namespace,
318 params_str, version, atomic, timeout)
319
320 self.log.debug("installing: {}".format(command))
321
322 if atomic:
323 # exec helm in a task
324 exec_task = asyncio.ensure_future(
325 coro_or_future=self._local_async_exec(
326 command=command, raise_exception_on_error=False, env=env
327 )
328 )
329
330 # write status in another task
331 status_task = asyncio.ensure_future(
332 coro_or_future=self._store_status(
333 cluster_id=cluster_id,
334 kdu_instance=kdu_instance,
335 namespace=namespace,
336 db_dict=db_dict,
337 operation="install",
338 run_once=False,
339 )
340 )
341
342 # wait for execution task
343 await asyncio.wait([exec_task])
344
345 # cancel status task
346 status_task.cancel()
347
348 output, rc = exec_task.result()
349
350 else:
351
352 output, rc = await self._local_async_exec(
353 command=command, raise_exception_on_error=False, env=env
354 )
355
356 # remove temporal values yaml file
357 if file_to_delete:
358 os.remove(file_to_delete)
359
360 # write final status
361 await self._store_status(
362 cluster_id=cluster_id,
363 kdu_instance=kdu_instance,
364 namespace=namespace,
365 db_dict=db_dict,
366 operation="install",
367 run_once=True,
368 check_every=0,
369 )
370
371 if rc != 0:
372 msg = "Error executing command: {}\nOutput: {}".format(command, output)
373 self.log.error(msg)
374 raise K8sException(msg)
375
async def upgrade(
    self,
    cluster_uuid: str,
    kdu_instance: str,
    kdu_model: str = None,
    atomic: bool = True,
    timeout: float = 300,
    params: dict = None,
    db_dict: dict = None,
):
    """
    Upgrade an existing KDU instance and return its new revision.

    :param cluster_uuid: the 'cluster' or 'namespace:cluster'
    :param kdu_instance: name of the release to upgrade
    :param kdu_model: chart reference, optionally followed by ':version'.
        None is passed as is to _get_upgrade_command
    :param atomic: passed to the upgrade command; when True the status is
        also written in a parallel task while helm is running
    :param timeout: passed to the upgrade command
    :param params: values turned into a helm file option
    :param db_dict: passed to the status writer
    :return: revision of the instance after the upgrade, 0 if the instance is
        not found afterwards
    :raises K8sException: when the instance does not exist or the helm command
        returns a non-zero code
    """
    _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
    self.log.debug("upgrading {} in cluster {}".format(kdu_model, cluster_id))

    # sync local dir
    self.fs.sync(from_path=cluster_id)

    # look for instance to obtain namespace
    instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
    if not instance_info:
        raise K8sException("kdu_instance {} not found".format(kdu_instance))
    namespace = instance_info["namespace"]

    # init env, paths
    paths, env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )

    # params to str
    params_str, file_to_delete = self._params_to_file_option(
        cluster_id=cluster_id, params=params
    )

    try:
        # version (kdu_model defaults to None, which has no version to split)
        version = None
        if kdu_model and ":" in kdu_model:
            parts = kdu_model.split(sep=":")
            if len(parts) == 2:
                version = str(parts[1])
                kdu_model = parts[0]

        command = self._get_upgrade_command(
            kdu_model, kdu_instance, namespace, params_str, version, atomic, timeout
        )

        self.log.debug("upgrading: {}".format(command))

        if atomic:

            # exec helm in a task
            exec_task = asyncio.ensure_future(
                coro_or_future=self._local_async_exec(
                    command=command, raise_exception_on_error=False, env=env
                )
            )
            # write status in another task
            status_task = asyncio.ensure_future(
                coro_or_future=self._store_status(
                    cluster_id=cluster_id,
                    kdu_instance=kdu_instance,
                    namespace=namespace,
                    db_dict=db_dict,
                    operation="upgrade",
                    run_once=False,
                )
            )

            try:
                # wait for execution task
                await asyncio.wait([exec_task])
            finally:
                # the status writer must stop even when this wait is cancelled
                status_task.cancel()
            output, rc = exec_task.result()

        else:

            output, rc = await self._local_async_exec(
                command=command, raise_exception_on_error=False, env=env
            )
    finally:
        # remove temporal values yaml file, also when the execution raised
        if file_to_delete:
            os.remove(file_to_delete)

    # write final status
    await self._store_status(
        cluster_id=cluster_id,
        kdu_instance=kdu_instance,
        namespace=namespace,
        db_dict=db_dict,
        operation="upgrade",
        run_once=True,
        check_every=0,
    )

    if rc != 0:
        msg = "Error executing command: {}\nOutput: {}".format(command, output)
        self.log.error(msg)
        raise K8sException(msg)

    # sync fs
    self.fs.reverse_sync(from_path=cluster_id)

    # return new revision number
    instance = await self.get_instance_info(
        cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
    )
    if instance:
        revision = int(instance.get("revision"))
        self.log.debug("New revision: {}".format(revision))
        return revision
    else:
        return 0
486
async def rollback(
    self, cluster_uuid: str, kdu_instance: str, revision=0, db_dict: dict = None
):
    """
    Roll a KDU instance back to a revision and return the resulting revision.

    :param cluster_uuid: the 'cluster' or 'namespace:cluster'
    :param kdu_instance: name of the release to roll back
    :param revision: revision passed to the rollback command
    :param db_dict: passed to the status writer
    :return: revision of the instance after the rollback, 0 if the instance is
        not found afterwards
    :raises K8sException: when the instance does not exist or the helm command
        returns a non-zero code
    """

    cluster_id = self._get_namespace_cluster_id(cluster_uuid)[1]
    self.log.debug(
        "rollback kdu_instance {} to revision {} from cluster {}".format(
            kdu_instance, revision, cluster_id
        )
    )

    # bring the cluster directory to the local file system
    self.fs.sync(from_path=cluster_id)

    # the namespace is taken from the deployed instance
    instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
    if not instance_info:
        raise K8sException("kdu_instance {} not found".format(kdu_instance))

    # helm environment of the cluster
    _paths, env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )

    command = self._get_rollback_command(
        kdu_instance, instance_info["namespace"], revision
    )

    self.log.debug("rolling_back: {}".format(command))

    # helm runs in its own task ...
    helm_coro = self._local_async_exec(
        command=command, raise_exception_on_error=False, env=env
    )
    exec_task = asyncio.ensure_future(helm_coro)

    # ... while another task keeps writing the status
    status_coro = self._store_status(
        cluster_id=cluster_id,
        kdu_instance=kdu_instance,
        namespace=instance_info["namespace"],
        db_dict=db_dict,
        operation="rollback",
        run_once=False,
    )
    status_task = asyncio.ensure_future(status_coro)

    # only the helm execution is waited for; then the status writer is stopped
    await asyncio.wait([exec_task])
    status_task.cancel()

    output, rc = exec_task.result()

    # write final status
    await self._store_status(
        cluster_id=cluster_id,
        kdu_instance=kdu_instance,
        namespace=instance_info["namespace"],
        db_dict=db_dict,
        operation="rollback",
        run_once=True,
        check_every=0,
    )

    if rc != 0:
        msg = "Error executing command: {}\nOutput: {}".format(command, output)
        self.log.error(msg)
        raise K8sException(msg)

    # push local changes back
    self.fs.reverse_sync(from_path=cluster_id)

    # the revision is read again from the deployed instance
    instance = await self.get_instance_info(
        cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
    )
    if not instance:
        return 0

    revision = int(instance.get("revision"))
    self.log.debug("New revision: {}".format(revision))
    return revision
571
async def uninstall(self, cluster_uuid: str, kdu_instance: str):
    """
    Removes an existing KDU instance. It would implicitly use the `delete` or 'uninstall' call
    (this call should happen after all _terminate-config-primitive_ of the VNF
    are invoked).

    :param cluster_uuid: UUID of a K8s cluster known by OSM, or namespace:cluster_id
    :param kdu_instance: unique name for the KDU instance to be deleted
    :return: the output of the helm command split into a table (list of rows,
        each one a list of cells)
    :raises K8sException: when the instance is not found
    """

    cluster_id = self._get_namespace_cluster_id(cluster_uuid)[1]
    self.log.debug(
        "uninstall kdu_instance {} from cluster {}".format(kdu_instance, cluster_id)
    )

    # bring the cluster directory to the local file system
    self.fs.sync(from_path=cluster_id)

    # the namespace is taken from the deployed instance
    instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
    if not instance_info:
        raise K8sException("kdu_instance {} not found".format(kdu_instance))

    # helm environment of the cluster
    _paths, env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )

    uninstall_cmd = self._get_uninstall_command(
        kdu_instance, instance_info["namespace"]
    )
    output, _rc = await self._local_async_exec(
        command=uninstall_cmd, raise_exception_on_error=True, env=env
    )

    # push local changes back
    self.fs.reverse_sync(from_path=cluster_id)

    return self._output_to_table(output)
612
async def instances_list(self, cluster_uuid: str) -> list:
    """
    Return the deployed releases of a cluster.

    The listing itself is delegated to the helm version dependent
    _instances_list; this method only surrounds it with the file system
    synchronization.

    :param cluster_uuid: the 'cluster' or 'namespace:cluster'
    :return: whatever _instances_list returns for the cluster
    """

    cluster_id = self._get_namespace_cluster_id(cluster_uuid)[1]
    self.log.debug("list releases for cluster {}".format(cluster_id))

    # bring the cluster directory to the local file system
    self.fs.sync(from_path=cluster_id)

    releases = await self._instances_list(cluster_id)

    # push local changes back
    self.fs.reverse_sync(from_path=cluster_id)

    return releases
634
async def get_instance_info(self, cluster_uuid: str, kdu_instance: str):
    """
    Look for a release by name among the releases of a cluster.

    :param cluster_uuid: the 'cluster' or 'namespace:cluster'
    :param kdu_instance: name of the release
    :return: the first entry of instances_list whose "name" is kdu_instance,
        None if there is no such entry
    """
    releases = await self.instances_list(cluster_uuid=cluster_uuid)
    found = next(
        (release for release in releases if release.get("name") == kdu_instance),
        None,
    )
    if found is None:
        self.log.debug("Instance {} not found".format(kdu_instance))
    return found
642
async def exec_primitive(
    self,
    cluster_uuid: str = None,
    kdu_instance: str = None,
    primitive_name: str = None,
    timeout: float = 300,
    params: dict = None,
    db_dict: dict = None,
) -> str:
    """Exec primitive (Juju action)

    Not available for helm: this method always raises.

    :param cluster_uuid: The UUID of the cluster or namespace:cluster
    :param kdu_instance: The unique name of the KDU instance
    :param primitive_name: Name of action that will be executed
    :param timeout: Timeout for action execution
    :param params: Dictionary of all the parameters needed for the action
    :db_dict: Dictionary for any additional data

    :raises K8sException: always
    """
    msg = (
        "KDUs deployed with Helm don't support actions "
        "different from rollback, upgrade and status"
    )
    raise K8sException(msg)
667
async def get_services(self,
                       cluster_uuid: str,
                       kdu_instance: str,
                       namespace: str) -> list:
    """
    Returns a list of services defined for the specified kdu instance.

    The names are obtained with _get_services and every one of them is then
    resolved, in order, with _get_service.

    :param cluster_uuid: UUID of a K8s cluster known by OSM
    :param kdu_instance: unique name for the KDU instance
    :param namespace: K8s namespace used by the KDU instance
    :return: If successful, it will return a list of services, Each service
        can have the following data:
        - `name` of the service
        - `type` type of service in the k8 cluster
        - `ports` List of ports offered by the service, for each port includes at least
          name, port, protocol
        - `cluster_ip` Internal ip to be used inside k8s cluster
        - `external_ip` List of external ips (in case they are available)
    """

    cluster_id = self._get_namespace_cluster_id(cluster_uuid)[1]
    self.log.debug(
        "get_services: cluster_uuid: {}, kdu_instance: {}".format(
            cluster_uuid, kdu_instance
        )
    )

    # bring the cluster directory to the local file system
    self.fs.sync(from_path=cluster_id)

    # names of the services of the kdu
    service_names = await self._get_services(cluster_id, kdu_instance, namespace)

    # details of every service, one request after the other
    service_list = [
        await self._get_service(cluster_id, service_name, namespace)
        for service_name in service_names
    ]

    # push local changes back
    self.fs.reverse_sync(from_path=cluster_id)

    return service_list
710
async def get_service(self,
                      cluster_uuid: str,
                      service_name: str,
                      namespace: str) -> object:
    """
    Obtain the data of one service of a cluster.

    :param cluster_uuid: the 'cluster' or 'namespace:cluster'
    :param service_name: name of the service
    :param namespace: K8s namespace of the service
    :return: whatever _get_service returns for the service
    """

    self.log.debug(
        "get service, service_name: {}, namespace: {}, cluster_uuid: {}".format(
            service_name, namespace, cluster_uuid
        )
    )

    cluster_id = self._get_namespace_cluster_id(cluster_uuid)[1]

    # bring the cluster directory to the local file system
    self.fs.sync(from_path=cluster_id)

    service_data = await self._get_service(cluster_id, service_name, namespace)

    # push local changes back
    self.fs.reverse_sync(from_path=cluster_id)

    return service_data
732
async def status_kdu(self, cluster_uuid: str, kdu_instance: str) -> str:
    """
    Obtain the status of a KDU instance.

    :param cluster_uuid: the 'cluster' or 'namespace:cluster'
    :param kdu_instance: name of the release
    :return: whatever _status_kdu returns when asked for text
    :raises K8sException: when the instance is not found in the cluster
    """

    self.log.debug(
        "status_kdu: cluster_uuid: {}, kdu_instance: {}".format(
            cluster_uuid, kdu_instance
        )
    )

    cluster_id = self._get_namespace_cluster_id(cluster_uuid)[1]

    # bring the cluster directory to the local file system
    self.fs.sync(from_path=cluster_id)

    # the instance is needed to obtain its namespace
    releases = await self._instances_list(cluster_id=cluster_id)
    instance = next(
        (release for release in releases if release.get("name") == kdu_instance),
        None,
    )
    if instance is None:
        # instance does not exist
        raise K8sException(
            "Instance name: {} not found in cluster: {}".format(
                kdu_instance, cluster_id
            )
        )

    status = await self._status_kdu(
        cluster_id=cluster_id,
        kdu_instance=kdu_instance,
        namespace=instance["namespace"],
        show_error_log=True,
        return_text=True,
    )

    # push local changes back
    self.fs.reverse_sync(from_path=cluster_id)

    return status
768
async def values_kdu(self, kdu_model: str, repo_url: str = None) -> str:
    """
    Run the "values" inspection of a chart.

    :param kdu_model: chart to inspect
    :param repo_url: optional repository of the chart
    :return: whatever _exec_inspect_comand returns for the "values" command
    """

    debug_msg = "inspect kdu_model values {} from (optional) repo: {}".format(
        kdu_model, repo_url
    )
    self.log.debug(debug_msg)

    values = await self._exec_inspect_comand(
        inspect_command="values", kdu_model=kdu_model, repo_url=repo_url
    )
    return values
780
async def help_kdu(self, kdu_model: str, repo_url: str = None) -> str:
    """
    Run the "readme" inspection of a chart.

    :param kdu_model: chart to inspect
    :param repo_url: optional repository of the chart
    :return: whatever _exec_inspect_comand returns for the "readme" command
    """

    debug_msg = "inspect kdu_model {} readme.md from repo: {}".format(
        kdu_model, repo_url
    )
    self.log.debug(debug_msg)

    readme = await self._exec_inspect_comand(
        inspect_command="readme", kdu_model=kdu_model, repo_url=repo_url
    )
    return readme
790
async def synchronize_repos(self, cluster_uuid: str):
    """
    Make the helm repositories of a cluster match the ones in the database.

    Repositories of the database that are missing locally, or registered with
    another URL, are (re)added. Local repositories unknown to the database are
    removed, except the one named "stable".

    :param cluster_uuid: the 'cluster' or 'namespace:cluster'
    :return: tuple (deleted_repo_list, added_repo_dict): the list holds the
        database id of every repository removed because its URL changed and the
        name of every local repository removed; the dictionary maps the
        database id of every added repository to its name
    :raises K8sException: when a repository of the database cannot be added
    :raises Exception: on any other error (a failure removing a local-only
        repository is only logged)
    """

    self.log.debug("synchronize repos for cluster helm-id: {}".format(cluster_uuid))
    try:
        db_repo_ids = self._get_helm_chart_repos_ids(cluster_uuid)
        db_repo_dict = self._get_db_repos_dict(db_repo_ids)

        local_repo_list = await self.repo_list(cluster_uuid)
        local_repo_dict = {repo["name"]: repo["url"] for repo in local_repo_list}

        deleted_repo_list = []
        added_repo_dict = {}

        # iterate over the list of repos in the database that should be
        # added if not present
        for db_repo in db_repo_dict.values():
            # obtained before the try so the error message can always use it
            repo_id = db_repo.get("_id")
            try:
                # check if it is already present
                curr_repo_url = local_repo_dict.get(db_repo["name"])
                if curr_repo_url != db_repo["url"]:
                    if curr_repo_url:
                        self.log.debug("repo {} url changed, delete and and again".format(
                            db_repo["url"]))
                        await self.repo_remove(cluster_uuid, db_repo["name"])
                        deleted_repo_list.append(repo_id)

                    # add repo
                    self.log.debug("add repo {}".format(db_repo["name"]))
                    await self.repo_add(cluster_uuid, db_repo["name"], db_repo["url"])
                    added_repo_dict[repo_id] = db_repo["name"]
            except Exception as e:
                raise K8sException(
                    "Error adding repo id: {}, err_msg: {} ".format(
                        repo_id, repr(e)
                    )
                ) from e

        # Delete repos that are present but not in nbi_list
        for repo_name in local_repo_dict:
            if not db_repo_dict.get(repo_name) and repo_name != "stable":
                self.log.debug("delete repo {}".format(repo_name))
                try:
                    await self.repo_remove(cluster_uuid, repo_name)
                    deleted_repo_list.append(repo_name)
                except Exception as e:
                    # best effort: a repo that cannot be removed is only reported
                    self.log.warning(
                        "Error deleting repo, name: {}, err_msg: {}".format(
                            repo_name, str(e)
                        )
                    )

        return deleted_repo_list, added_repo_dict

    except K8sException:
        raise
    except Exception as e:
        # any other error is logged and reported as a generic exception
        self.log.error("Error synchronizing repos: {}".format(e))
        raise Exception("Error synchronizing repos: {}".format(e))
851
852 def _get_db_repos_dict(self, repo_ids: list):
853 db_repos_dict = {}
854 for repo_id in repo_ids:
855 db_repo = self.db.get_one("k8srepos", {"_id": repo_id})
856 db_repos_dict[db_repo["name"]] = db_repo
857 return db_repos_dict
858
859 """
860 ####################################################################################
861 ################################### TO BE IMPLEMENTED SUBCLASSES ###################
862 ####################################################################################
863 """
864
865 @abc.abstractmethod
866 def _init_paths_env(self, cluster_name: str, create_if_not_exist: bool = True):
867 """
868 Creates and returns base cluster and kube dirs and returns them.
869 Also created helm3 dirs according to new directory specification, paths are
870 not returned but assigned to helm environment variables
871
872 :param cluster_name: cluster_name
873 :return: Dictionary with config_paths and dictionary with helm environment variables
874 """
875
876 @abc.abstractmethod
877 async def _cluster_init(self, cluster_id, namespace, paths, env):
878 """
879 Implements the helm version dependent cluster initialization
880 """
881
882 @abc.abstractmethod
883 async def _instances_list(self, cluster_id):
884 """
885 Implements the helm version dependent helm instances list
886 """
887
888 @abc.abstractmethod
889 async def _get_services(self, cluster_id, kdu_instance, namespace):
890 """
891 Implements the helm version dependent method to obtain services from a helm instance
892 """
893
894 @abc.abstractmethod
895 async def _status_kdu(self, cluster_id: str, kdu_instance: str, namespace: str = None,
896 show_error_log: bool = False, return_text: bool = False):
897 """
898 Implements the helm version dependent method to obtain status of a helm instance
899 """
900
901 @abc.abstractmethod
902 def _get_install_command(self, kdu_model, kdu_instance, namespace,
903 params_str, version, atomic, timeout) -> str:
904 """
905 Obtain command to be executed to delete the indicated instance
906 """
907
908 @abc.abstractmethod
909 def _get_upgrade_command(self, kdu_model, kdu_instance, namespace,
910 params_str, version, atomic, timeout) -> str:
911 """
912 Obtain command to be executed to upgrade the indicated instance
913 """
914
915 @abc.abstractmethod
916 def _get_rollback_command(self, kdu_instance, namespace, revision) -> str:
917 """
918 Obtain command to be executed to rollback the indicated instance
919 """
920
921 @abc.abstractmethod
922 def _get_uninstall_command(self, kdu_instance: str, namespace: str) -> str:
923 """
924 Obtain command to be executed to delete the indicated instance
925 """
926
927 @abc.abstractmethod
928 def _get_inspect_command(self, show_command: str, kdu_model: str, repo_str: str,
929 version: str):
930 """
931 Obtain command to be executed to obtain information about the kdu
932 """
933
934 @abc.abstractmethod
935 async def _uninstall_sw(self, cluster_id: str, namespace: str):
936 """
937 Method call to uninstall cluster software for helm. This method is dependent
938 of helm version
939 For Helm v2 it will be called when Tiller must be uninstalled
940 For Helm v3 it does nothing and does not need to be callled
941 """
942
943 @abc.abstractmethod
944 def _get_helm_chart_repos_ids(self, cluster_uuid) -> list:
945 """
946 Obtains the cluster repos identifiers
947 """
948
949 """
950 ####################################################################################
951 ################################### P R I V A T E ##################################
952 ####################################################################################
953 """
954
955 @staticmethod
956 def _check_file_exists(filename: str, exception_if_not_exists: bool = False):
957 if os.path.exists(filename):
958 return True
959 else:
960 msg = "File {} does not exist".format(filename)
961 if exception_if_not_exists:
962 raise K8sException(msg)
963
964 @staticmethod
965 def _remove_multiple_spaces(strobj):
966 strobj = strobj.strip()
967 while " " in strobj:
968 strobj = strobj.replace(" ", " ")
969 return strobj
970
971 @staticmethod
972 def _output_to_lines(output: str) -> list:
973 output_lines = list()
974 lines = output.splitlines(keepends=False)
975 for line in lines:
976 line = line.strip()
977 if len(line) > 0:
978 output_lines.append(line)
979 return output_lines
980
981 @staticmethod
982 def _output_to_table(output: str) -> list:
983 output_table = list()
984 lines = output.splitlines(keepends=False)
985 for line in lines:
986 line = line.replace("\t", " ")
987 line_list = list()
988 output_table.append(line_list)
989 cells = line.split(sep=" ")
990 for cell in cells:
991 cell = cell.strip()
992 if len(cell) > 0:
993 line_list.append(cell)
994 return output_table
995
996 @staticmethod
997 def _parse_services(output: str) -> list:
998 lines = output.splitlines(keepends=False)
999 services = []
1000 for line in lines:
1001 line = line.replace("\t", " ")
1002 cells = line.split(sep=" ")
1003 if len(cells) > 0 and cells[0].startswith("service/"):
1004 elems = cells[0].split(sep="/")
1005 if len(elems) > 1:
1006 services.append(elems[1])
1007 return services
1008
1009 @staticmethod
1010 def _get_deep(dictionary: dict, members: tuple):
1011 target = dictionary
1012 value = None
1013 try:
1014 for m in members:
1015 value = target.get(m)
1016 if not value:
1017 return None
1018 else:
1019 target = value
1020 except Exception:
1021 pass
1022 return value
1023
1024 # find key:value in several lines
1025 @staticmethod
1026 def _find_in_lines(p_lines: list, p_key: str) -> str:
1027 for line in p_lines:
1028 try:
1029 if line.startswith(p_key + ":"):
1030 parts = line.split(":")
1031 the_value = parts[1].strip()
1032 return the_value
1033 except Exception:
1034 # ignore it
1035 pass
1036 return None
1037
1038 @staticmethod
1039 def _lower_keys_list(input_list: list):
1040 """
1041 Transform the keys in a list of dictionaries to lower case and returns a new list
1042 of dictionaries
1043 """
1044 new_list = []
1045 for dictionary in input_list:
1046 new_dict = dict((k.lower(), v) for k, v in dictionary.items())
1047 new_list.append(new_dict)
1048 return new_list
1049
def _local_exec(self, command: str) -> (str, int):
    """
    Run a command synchronously through the shell.

    This method never raises on command failure: errors are reported only
    through the return code.

    :param command: full command line; repeated spaces are collapsed first
    :return: tuple (output, return_code). On success output is the captured
        stdout and return_code is 0; on any failure output is "" and
        return_code is 1
    """
    command = self._remove_multiple_spaces(command)
    self.log.debug("Executing sync local command: {}".format(command))
    output = ""
    try:
        # NOTE(review): the command string is interpreted by a shell
        # (shell=True); callers must not pass untrusted text in it.
        output = subprocess.check_output(
            command, shell=True, universal_newlines=True
        )
        return_code = 0
        self.log.debug(output)
    except Exception as e:
        # Best effort by design, but leave a trace instead of failing silently
        self.log.debug(
            "Sync local command failed: {} -> {}".format(command, e)
        )
        return_code = 1

    return output, return_code
1065
async def _local_async_exec(
    self,
    command: str,
    raise_exception_on_error: bool = False,
    show_error_log: bool = True,
    encode_utf8: bool = False,
    env: dict = None
) -> (str, int):
    """
    Run a command asynchronously (no shell) and collect its output.

    :param command: full command line; it is tokenized with shlex
    :param raise_exception_on_error: raise K8sException instead of returning
        when the command exits with a non-zero code or cannot be executed
    :param show_error_log: include the command output in the debug log when
        the return code is not 0
    :param encode_utf8: post-process the output as done for inspect commands
        (see note in the body)
    :param env: extra environment variables added on top of os.environ
    :return: tuple (output, return_code). On success output is stdout (or
        stderr when stdout is empty); on failure it is stderr (or stdout
        when stderr is empty). ("", -1) is returned when the command could
        not be executed and raise_exception_on_error is False
    :raises K8sException: only when raise_exception_on_error is True
    """

    command = K8sHelmBaseConnector._remove_multiple_spaces(command)
    self.log.debug("Executing async local command: {}, env: {}".format(command, env))

    # split command
    command = shlex.split(command)

    environ = os.environ.copy()
    if env:
        environ.update(env)

    try:
        process = await asyncio.create_subprocess_exec(
            *command, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE,
            env=environ
        )

        # wait for command terminate
        stdout, stderr = await process.communicate()

        return_code = process.returncode

        out_text = stdout.decode("utf-8").strip() if stdout else ""
        err_text = stderr.decode("utf-8").strip() if stderr else ""
        if return_code == 0:
            # A successful command may still print warnings on stderr; they
            # must not replace the real result that callers parse.
            output = out_text or err_text
        else:
            # On failure the error text is what matters
            output = err_text or out_text

        if return_code != 0 and show_error_log:
            self.log.debug(
                "Return code (FAIL): {}\nOutput:\n{}".format(return_code, output)
            )
        else:
            self.log.debug("Return code: {}".format(return_code))

        if raise_exception_on_error and return_code != 0:
            raise K8sException(output)

        if encode_utf8:
            # NOTE(review): str() of a bytes object yields its repr
            # ("b'...'" with escaped characters). Kept as is because callers
            # may rely on this exact format - confirm before changing.
            output = output.encode("utf-8").strip()
            output = str(output).replace("\\n", "\n")

        return output, return_code

    except asyncio.CancelledError:
        raise
    except K8sException:
        raise
    except Exception as e:
        msg = "Exception executing command: {} -> {}".format(command, e)
        self.log.error(msg)
        if raise_exception_on_error:
            raise K8sException(e) from e
        else:
            return "", -1
1131
async def _local_async_exec_pipe(self,
                                 command1: str,
                                 command2: str,
                                 raise_exception_on_error: bool = True,
                                 show_error_log: bool = True,
                                 encode_utf8: bool = False,
                                 env: dict = None):
    """
    Run "command1 | command2" asynchronously without a shell, connecting the
    stdout of the first process to the stdin of the second through an OS pipe.

    Only the stdout of the second process is captured; stderr of both
    processes is not redirected. The returned code is the one of the second
    process.

    :param command1: command line producing the data
    :param command2: command line consuming the data
    :param raise_exception_on_error: raise K8sException instead of returning
        when command2 exits with a non-zero code or execution fails
    :param show_error_log: include the output in the debug log when the
        return code is not 0
    :param encode_utf8: post-process the output as done for inspect commands
        (see note in the body)
    :param env: extra environment variables added on top of os.environ
    :return: tuple (output, return_code); ("", -1) when execution failed and
        raise_exception_on_error is False
    :raises K8sException: only when raise_exception_on_error is True
    """

    command1 = K8sHelmBaseConnector._remove_multiple_spaces(command1)
    command2 = K8sHelmBaseConnector._remove_multiple_spaces(command2)
    command = "{} | {}".format(command1, command2)
    self.log.debug("Executing async local command: {}, env: {}".format(command, env))

    # split command
    command1 = shlex.split(command1)
    command2 = shlex.split(command2)

    environ = os.environ.copy()
    if env:
        environ.update(env)

    try:
        read, write = os.pipe()
        try:
            try:
                process_1 = await asyncio.create_subprocess_exec(
                    *command1, stdout=write, env=environ
                )
            finally:
                # The parent must always release its copy of the write end:
                # otherwise the reader never sees EOF and the descriptor
                # leaks when the first command cannot be started.
                os.close(write)
            process_2 = await asyncio.create_subprocess_exec(
                *command2, stdin=read, stdout=asyncio.subprocess.PIPE, env=environ
            )
        finally:
            os.close(read)

        # stderr is not piped, so only stdout carries data here
        stdout, _ = await process_2.communicate()

        # Collect the first process too, so its failure is at least visible
        rc_1 = await process_1.wait()
        if rc_1 != 0:
            self.log.debug("First command return code (FAIL): {}".format(rc_1))

        return_code = process_2.returncode

        output = ""
        if stdout:
            output = stdout.decode("utf-8").strip()

        if return_code != 0 and show_error_log:
            self.log.debug(
                "Return code (FAIL): {}\nOutput:\n{}".format(return_code, output)
            )
        else:
            self.log.debug("Return code: {}".format(return_code))

        if raise_exception_on_error and return_code != 0:
            raise K8sException(output)

        if encode_utf8:
            # NOTE(review): str() of a bytes object yields its repr
            # ("b'...'"). Kept as is because callers may rely on it.
            output = output.encode("utf-8").strip()
            output = str(output).replace("\\n", "\n")

        return output, return_code
    except asyncio.CancelledError:
        raise
    except K8sException:
        raise
    except Exception as e:
        msg = "Exception executing command: {} -> {}".format(command, e)
        self.log.error(msg)
        if raise_exception_on_error:
            raise K8sException(e) from e
        else:
            return "", -1
1199
async def _get_service(self, cluster_id, service_name, namespace):
    """
    Obtains the data of the specified service in the k8cluster.

    :param cluster_id: id of a K8s cluster known by OSM
    :param service_name: name of the K8s service in the specified namespace
    :param namespace: K8s namespace used by the KDU instance
    :return: If successful, it will return a service with the following data:
    - `name` of the service
    - `type` type of service in the k8 cluster
    - `ports` List of ports offered by the service, for each port includes at least
    name, port, protocol
    - `cluster_ip` Internal ip to be used inside k8s cluster
    - `external_ip` List of external ips (only for LoadBalancer services; empty
    while no ingress address with an ip has been assigned)
    :raises K8sException: if the kubectl command fails
    """

    # init config, env
    paths, env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )

    command = "{} --kubeconfig={} --namespace={} get service {} -o=yaml".format(
        self.kubectl_command, paths["kube_config"], namespace, service_name
    )

    output, _rc = await self._local_async_exec(
        command=command, raise_exception_on_error=True, env=env
    )

    data = yaml.load(output, Loader=yaml.SafeLoader)

    service = {
        "name": service_name,
        "type": self._get_deep(data, ("spec", "type")),
        "ports": self._get_deep(data, ("spec", "ports")),
        "cluster_ip": self._get_deep(data, ("spec", "clusterIP"))
    }
    if service["type"] == "LoadBalancer":
        # The ingress list is absent while the load balancer is still being
        # provisioned, and an entry may carry only a hostname instead of an ip.
        ip_map_list = self._get_deep(data, ("status", "loadBalancer", "ingress")) or []
        service["external_ip"] = [
            elem["ip"] for elem in ip_map_list if elem.get("ip")
        ]

    return service
1243
async def _exec_inspect_comand(
    self, inspect_command: str, kdu_model: str, repo_url: str = None
):
    """
    Run an inspect command for a kdu model; no cluster (no env) is involved.

    :param inspect_command: inspect sub-command, handed to _get_inspect_command
    :param kdu_model: chart reference. Everything up to and including the
        first "/" is discarded, and a single ":<version>" suffix is turned
        into a "--version" option
    :param repo_url: optional repository url, passed as a "--repo" option
    :return: output of the inspect command
    """

    repo_str = " --repo {}".format(repo_url) if repo_url else ""

    # keep only what follows the first "/" (if there is one)
    _prefix, slash, remainder = kdu_model.partition("/")
    if slash:
        kdu_model = remainder

    # "<chart>:<version>" -> chart plus a --version option
    version = ""
    pieces = kdu_model.split(sep=":")
    if len(pieces) == 2:
        kdu_model, chart_version = pieces
        version = "--version {}".format(str(chart_version))

    full_command = self._get_inspect_command(
        inspect_command, kdu_model, repo_str, version
    )
    inspect_output, _rc = await self._local_async_exec(
        command=full_command, encode_utf8=True
    )

    return inspect_output
1273
async def _store_status(
    self,
    cluster_id: str,
    operation: str,
    kdu_instance: str,
    namespace: str = None,
    check_every: float = 10,
    db_dict: dict = None,
    run_once: bool = False,
):
    """
    Repeatedly read the status of a KDU instance and write it to the database.

    Every iteration first sleeps ``check_every`` seconds, then queries
    ``_status_kdu`` and hands the result to ``write_app_status_to_db``.
    The loop ends when the write reports a falsy result, when the task is
    cancelled, or after the first iteration if ``run_once`` is set. Any other
    exception is logged at debug level and the loop goes on.

    :param cluster_id: cluster identifier passed to _status_kdu
    :param operation: operation name stored together with the status
    :param kdu_instance: KDU instance whose status is read
    :param namespace: namespace passed to _status_kdu
    :param check_every: seconds to sleep before each status read
    :param db_dict: database reference passed to write_app_status_to_db
    :param run_once: stop after a single iteration, whatever its outcome
    """
    while True:
        try:
            await asyncio.sleep(check_every)
            kdu_status = await self._status_kdu(
                cluster_id=cluster_id,
                kdu_instance=kdu_instance,
                namespace=namespace,
                return_text=False,
            )
            info = kdu_status.get("info")
            description = info.get("description")
            self.log.debug('KDU {} STATUS: {}.'.format(kdu_instance, description))
            # write status to db
            written = await self.write_app_status_to_db(
                db_dict=db_dict,
                status=str(description),
                detailed_status=str(kdu_status),
                operation=operation,
            )
            if not written:
                self.log.info("Error writing in database. Task exiting...")
                return
        except asyncio.CancelledError:
            self.log.debug("Task cancelled")
            return
        except Exception as exc:
            self.log.debug("_store_status exception: {}".format(str(exc)), exc_info=True)
        finally:
            # single-shot mode: leave after the first pass, even on error
            if run_once:
                return
1312
1313 # params for use in -f file
1314 # returns values file option and filename (in order to delete it at the end)
def _params_to_file_option(self, cluster_id: str, params: dict) -> (str, str):
    """
    Dump the given params to a YAML values file and build the matching
    "-f <file>" command line option.

    String values containing the "!!yaml" marker are parsed as YAML (after
    dropping their first 7 characters, i.e. the "!!yaml " prefix) so that
    structured values can be supplied as text. The safe loader is used
    because params come from the user.

    :param cluster_id: cluster whose paths/environment are initialized
    :param params: values to write; None or empty means no file is created
    :return: tuple (option, filename). ("", None) when there are no params;
        otherwise the caller is responsible for deleting the file
    """

    if not params:
        return "", None

    self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )

    params2 = dict()
    for key, value in params.items():
        # only text can carry the marker; other types are stored untouched
        if isinstance(value, str) and "!!yaml" in value:
            value = yaml.load(value[7:], Loader=yaml.SafeLoader)
        params2[key] = value

    # file name: random number left-padded with zeros to 10 digits
    random_number = str(random.randrange(start=1, stop=99999999)).rjust(10, "0")
    values_file = random_number + ".yaml"
    with open(values_file, "w") as stream:
        yaml.dump(params2, stream, indent=4, default_flow_style=False)

    return "-f {}".format(values_file), values_file
1343
1344 # params for use in --set option
1345 @staticmethod
1346 def _params_to_set_option(params: dict) -> str:
1347 params_str = ""
1348 if params and len(params) > 0:
1349 start = True
1350 for key in params:
1351 value = params.get(key, None)
1352 if value is not None:
1353 if start:
1354 params_str += "--set "
1355 start = False
1356 else:
1357 params_str += ","
1358 params_str += "{}={}".format(key, value)
1359 return params_str
1360
1361 @staticmethod
1362 def generate_kdu_instance_name(**kwargs):
1363 chart_name = kwargs["kdu_model"]
1364 # check embeded chart (file or dir)
1365 if chart_name.startswith("/"):
1366 # extract file or directory name
1367 chart_name = chart_name[chart_name.rfind("/") + 1:]
1368 # check URL
1369 elif "://" in chart_name:
1370 # extract last portion of URL
1371 chart_name = chart_name[chart_name.rfind("/") + 1:]
1372
1373 name = ""
1374 for c in chart_name:
1375 if c.isalpha() or c.isnumeric():
1376 name += c
1377 else:
1378 name += "-"
1379 if len(name) > 35:
1380 name = name[0:35]
1381
1382 # if does not start with alpha character, prefix 'a'
1383 if not name[0].isalpha():
1384 name = "a" + name
1385
1386 name += "-"
1387
1388 def get_random_number():
1389 r = random.randrange(start=1, stop=99999999)
1390 s = str(r)
1391 s = s.rjust(10, "0")
1392 return s
1393
1394 name = name + get_random_number()
1395 return name.lower()