b5bf640409bf49b992573f864b3e33040503331d
[osm/N2VC.git] / n2vc / k8s_helm_base_conn.py
1 ##
2 # Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3 # This file is part of OSM
4 # All Rights Reserved.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
15 # implied.
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
18 #
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact with: nfvlabs@tid.es
21 ##
22 import abc
23 import asyncio
24 import random
25 import time
26 import shlex
27 import shutil
28 import stat
29 import subprocess
30 import os
31 import yaml
32 from uuid import uuid4
33
34 from n2vc.exceptions import K8sException
35 from n2vc.k8s_conn import K8sConnector
36
37
38 class K8sHelmBaseConnector(K8sConnector):
39
40 """
41 ####################################################################################
42 ################################### P U B L I C ####################################
43 ####################################################################################
44 """
45 service_account = "osm"
46 _STABLE_REPO_URL = "https://charts.helm.sh/stable"
47
def __init__(
    self,
    fs: object,
    db: object,
    kubectl_command: str = "/usr/bin/kubectl",
    helm_command: str = "/usr/bin/helm",
    log: object = None,
    on_update_db=None,
    vca_config: dict = None,
):
    """
    Set up the state shared by the helm connectors.

    :param fs: file system for kubernetes and helm configuration
    :param db: database object to write current operation status
    :param kubectl_command: path to kubectl executable
    :param helm_command: path to helm executable
    :param log: logger
    :param on_update_db: callback called when k8s connector updates database
    :param vca_config: optional configuration dictionary; only its
        "stablerepourl" entry is read here
    :raises K8sException: if the kubectl or helm executable does not exist
    """
    # parent class
    K8sConnector.__init__(self, db=db, log=log, on_update_db=on_update_db)
    self.log.info("Initializing K8S Helm connector")

    # random numbers for release name generation
    random.seed(time.time())

    self.fs = fs

    # both executables are mandatory: a missing one raises right away
    self.kubectl_command = kubectl_command
    self._check_file_exists(filename=kubectl_command, exception_if_not_exists=True)
    self._helm_command = helm_command
    self._check_file_exists(filename=helm_command, exception_if_not_exists=True)

    # stable repo url: configured value when present and non-empty, class default otherwise
    configured_url = vca_config.get("stablerepourl") if vca_config else None
    self._stable_repo_url = configured_url or self._STABLE_REPO_URL
92
93 @staticmethod
94 def _get_namespace_cluster_id(cluster_uuid: str) -> (str, str):
95 """
96 Parses cluster_uuid stored at database that can be either 'namespace:cluster_id' or only
97 cluster_id for backward compatibility
98 """
99 namespace, _, cluster_id = cluster_uuid.rpartition(':')
100 return namespace, cluster_id
101
async def init_env(
    self, k8s_creds: str, namespace: str = "kube-system", reuse_cluster_uuid=None, **kwargs,
) -> (str, bool):
    """
    It prepares a given K8s cluster environment to run Charts

    :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid
        '.kube/config'
    :param namespace: optional namespace to be used for helm. By default,
        'kube-system' will be used
    :param reuse_cluster_uuid: existing cluster uuid for reuse; when it carries a
        namespace ('namespace:cluster_id') that namespace takes precedence
    :param kwargs: Additional parameters (None yet)
    :return: uuid of the K8s cluster ('namespace:cluster_id') and the value
        returned by the helm-version-specific cluster initialization (True if
        connector has installed some software in the cluster)
        (on error, an exception will be raised)
    """

    if reuse_cluster_uuid:
        namespace_, cluster_id = self._get_namespace_cluster_id(reuse_cluster_uuid)
        namespace = namespace_ or namespace
    else:
        cluster_id = str(uuid4())
    cluster_uuid = "{}:{}".format(namespace, cluster_id)

    self.log.debug("Initializing K8S Cluster {}. namespace: {}".format(cluster_id, namespace))

    paths, env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )

    # The kubeconfig holds cluster credentials, so it is created owner read/write
    # only. The permission bits must go through os.open(): the third positional
    # argument of the builtin open() is the buffering size, not a file mode.
    mode = stat.S_IRUSR | stat.S_IWUSR
    fd = os.open(paths["kube_config"], os.O_WRONLY | os.O_CREAT | os.O_TRUNC, mode)
    with os.fdopen(fd, "w") as f:
        f.write(k8s_creds)
    # os.open() applies the mode only when it creates the file; enforce it as
    # well when an existing kubeconfig has been overwritten.
    os.chmod(paths["kube_config"], 0o600)

    # Code with initialization specific of helm version
    n2vc_installed_sw = await self._cluster_init(cluster_id, namespace, paths, env)

    # sync fs with local data
    self.fs.reverse_sync(from_path=cluster_id)

    self.log.info("Cluster {} initialized".format(cluster_id))

    return cluster_uuid, n2vc_installed_sw
145
async def repo_add(
    self, cluster_uuid: str, name: str, url: str, repo_type: str = "chart"
):
    """
    Register a helm repository in the helm environment of a cluster.

    The cluster directory is synchronized from the file system before the helm
    commands are executed and synchronized back afterwards.

    :param cluster_uuid: 'namespace:cluster_id' or only the cluster_id
    :param name: name under which the repository is added
    :param url: URL of the repository
    :param repo_type: only used in the debug log message
    """
    _namespace, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
    self.log.debug(
        "Cluster {}, adding {} repository {}. URL: {}".format(
            cluster_id, repo_type, name, url
        )
    )

    # work on an up-to-date local copy of the cluster directory
    self.fs.sync(from_path=cluster_id)

    _paths, helm_env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )

    # refresh the known repositories first; errors are not requested to be raised
    update_cmd = "{} repo update".format(self._helm_command)
    self.log.debug("updating repo: {}".format(update_cmd))
    await self._local_async_exec(
        command=update_cmd, raise_exception_on_error=False, env=helm_env
    )

    # the add itself is executed with raise_exception_on_error=True
    add_cmd = "{} repo add {} {}".format(self._helm_command, name, url)
    self.log.debug("adding repo: {}".format(add_cmd))
    await self._local_async_exec(
        command=add_cmd, raise_exception_on_error=True, env=helm_env
    )

    # publish the local changes
    self.fs.reverse_sync(from_path=cluster_id)
177
async def repo_list(self, cluster_uuid: str) -> list:
    """
    Get the list of registered repositories

    :param cluster_uuid: 'namespace:cluster_id' or only the cluster_id
    :return: list of registered repositories as dictionaries whose keys have
        been lower-cased; empty list when the helm command fails or prints nothing
    """
    _namespace, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
    self.log.debug("list repositories for cluster {}".format(cluster_id))

    # sync local dir
    self.fs.sync(from_path=cluster_id)

    _paths, helm_env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )

    list_cmd = "{} repo list --output yaml".format(self._helm_command)

    # errors are not raised: if there are no repos just an empty list is wanted
    stdout, return_code = await self._local_async_exec(
        command=list_cmd, raise_exception_on_error=False, env=helm_env
    )

    # sync fs
    self.fs.reverse_sync(from_path=cluster_id)

    if return_code != 0 or not stdout:
        return []

    parsed = yaml.load(stdout, Loader=yaml.SafeLoader)
    # unify format between helm2 and helm3 setting all keys lowercase
    return self._lower_keys_list(parsed)
217
async def repo_remove(self, cluster_uuid: str, name: str):
    """
    Remove a helm repository from the helm environment of a cluster.

    :param cluster_uuid: 'namespace:cluster_id' or only the cluster_id
    :param name: name of the repository to remove
    """
    _namespace, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
    self.log.debug("remove {} repositories for cluster {}".format(name, cluster_id))

    # sync local dir
    self.fs.sync(from_path=cluster_id)

    # init env, paths
    _paths, helm_env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )

    remove_cmd = "{} repo remove {}".format(self._helm_command, name)
    await self._local_async_exec(
        command=remove_cmd, raise_exception_on_error=True, env=helm_env
    )

    # sync fs
    self.fs.reverse_sync(from_path=cluster_id)
238
async def reset(
    self, cluster_uuid: str, force: bool = False, uninstall_sw: bool = False, **kwargs
) -> bool:
    """Reset a cluster

    Resets the Kubernetes cluster by removing the helm deployment that represents it.

    :param cluster_uuid: The UUID of the cluster to reset
    :param force: Boolean to force the reset: existing releases are uninstalled
        instead of aborting the software uninstallation
    :param uninstall_sw: Boolean to request uninstalling the releases and the
        cluster software (helm-version dependent, see _uninstall_sw). It is
        ignored when releases exist and force is False
    :param kwargs: Additional parameters (None yet)
    :return: Returns True if successful or raises an exception.
    """
    namespace, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
    self.log.debug("Resetting K8s environment. cluster uuid: {} uninstall={}"
                   .format(cluster_id, uninstall_sw))

    # sync local dir
    self.fs.sync(from_path=cluster_id)

    # uninstall releases if needed.
    if uninstall_sw:
        releases = await self.instances_list(cluster_uuid=cluster_uuid)
        if len(releases) > 0:
            if force:
                for r in releases:
                    # defined up front so the error message below can always use it
                    kdu_instance = None
                    try:
                        kdu_instance = r.get("name")
                        chart = r.get("chart")
                        self.log.debug(
                            "Uninstalling {} -> {}".format(chart, kdu_instance)
                        )
                        await self.uninstall(
                            cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
                        )
                    except Exception as e:
                        # will not raise exception as it was found
                        # that in some cases of previously installed helm releases it
                        # raised an error
                        self.log.warning(
                            "Error uninstalling release {}: {}".format(kdu_instance, e)
                        )
            else:
                msg = (
                    "Cluster uuid: {} has releases and not force. Leaving K8s helm environment"
                ).format(cluster_id)
                self.log.warning(msg)
                uninstall_sw = False  # Allow to remove k8s cluster without removing Tiller

    if uninstall_sw:
        await self._uninstall_sw(cluster_id, namespace)

    # delete cluster directory
    self.log.debug("Removing directory {}".format(cluster_id))
    self.fs.file_delete(cluster_id, ignore_non_exist=True)
    # Remove also the local directory if it still exists
    direct = self.fs.path + "/" + cluster_id
    shutil.rmtree(direct, ignore_errors=True)

    return True
299
async def _install_impl(
    self,
    cluster_id: str,
    kdu_model: str,
    paths: dict,
    env: dict,
    kdu_instance: str,
    atomic: bool = True,
    timeout: float = 300,
    params: dict = None,
    db_dict: dict = None,
    kdu_name: str = None,
    namespace: str = None,
):
    """
    Run the helm install command for a KDU and store its status.

    :param cluster_id: cluster identifier (without namespace)
    :param kdu_model: chart reference; 'name:version' is split into chart name
        and version when it contains exactly one ':'
    :param paths: not used by this method
    :param env: environment passed to the helm command execution
    :param kdu_instance: release name
    :param atomic: when True the command runs in a task while another task
        keeps storing the status until the command finishes
    :param timeout: passed through to the install command builder
    :param params: values handed to _params_to_file_option
    :param db_dict: passed through to _store_status
    :param kdu_name: not used by this method
    :param namespace: namespace for the release
    :raises K8sException: if the helm command returns a non-zero code
    """
    # params to str
    params_str, file_to_delete = self._params_to_file_option(
        cluster_id=cluster_id, params=params
    )

    # version
    # NOTE(review): a reference with a single ':' that is not 'name:version'
    # (e.g. 'https://host/chart.tgz') would be split here too - confirm callers
    version = None
    if ":" in kdu_model:
        parts = kdu_model.split(sep=":")
        if len(parts) == 2:
            version = str(parts[1])
            kdu_model = parts[0]

    command = self._get_install_command(kdu_model, kdu_instance, namespace,
                                        params_str, version, atomic, timeout)

    self.log.debug("installing: {}".format(command))

    if atomic:
        # exec helm in a task
        exec_task = asyncio.ensure_future(
            coro_or_future=self._local_async_exec(
                command=command, raise_exception_on_error=False, env=env
            )
        )

        # write status in another task
        status_task = asyncio.ensure_future(
            coro_or_future=self._store_status(
                cluster_id=cluster_id,
                kdu_instance=kdu_instance,
                namespace=namespace,
                db_dict=db_dict,
                operation="install",
                run_once=False,
            )
        )

        # wait for execution task
        await asyncio.wait([exec_task])

        # cancel status task
        # NOTE(review): if this coroutine is cancelled while waiting above, the
        # status task is not cancelled here - confirm whether that matters
        status_task.cancel()

        output, rc = exec_task.result()

    else:

        output, rc = await self._local_async_exec(
            command=command, raise_exception_on_error=False, env=env
        )

    # remove temporal values yaml file
    if file_to_delete:
        os.remove(file_to_delete)

    # write final status (done before checking rc, so it is stored on failure too)
    await self._store_status(
        cluster_id=cluster_id,
        kdu_instance=kdu_instance,
        namespace=namespace,
        db_dict=db_dict,
        operation="install",
        run_once=True,
        check_every=0,
    )

    if rc != 0:
        msg = "Error executing command: {}\nOutput: {}".format(command, output)
        self.log.error(msg)
        raise K8sException(msg)
385
async def upgrade(
    self,
    cluster_uuid: str,
    kdu_instance: str,
    kdu_model: str = None,
    atomic: bool = True,
    timeout: float = 300,
    params: dict = None,
    db_dict: dict = None,
):
    """
    Upgrade an existing KDU instance and return its new revision number.

    :param cluster_uuid: 'namespace:cluster_id' or only the cluster_id
    :param kdu_instance: release name; it must exist in the cluster
    :param kdu_model: chart reference; 'name:version' is split into chart name
        and version when it contains exactly one ':'
    :param atomic: when True the command runs in a task while another task
        keeps storing the status until the command finishes
    :param timeout: passed through to the upgrade command builder
    :param params: values handed to _params_to_file_option
    :param db_dict: passed through to _store_status
    :return: revision reported for the instance after the upgrade, or 0 if the
        instance is no longer found
    :raises K8sException: if the instance does not exist or the helm command
        returns a non-zero code
    """
    _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
    self.log.debug("upgrading {} in cluster {}".format(kdu_model, cluster_id))

    # sync local dir
    self.fs.sync(from_path=cluster_id)

    # look for instance to obtain namespace
    instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
    if not instance_info:
        raise K8sException("kdu_instance {} not found".format(kdu_instance))

    # init env, paths
    paths, env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )

    # params to str
    params_str, file_to_delete = self._params_to_file_option(
        cluster_id=cluster_id, params=params
    )

    # version
    # NOTE(review): kdu_model defaults to None but the membership test below
    # raises TypeError for None - callers apparently must always pass a model
    version = None
    if ":" in kdu_model:
        parts = kdu_model.split(sep=":")
        if len(parts) == 2:
            version = str(parts[1])
            kdu_model = parts[0]

    command = self._get_upgrade_command(kdu_model, kdu_instance, instance_info["namespace"],
                                        params_str, version, atomic, timeout)

    self.log.debug("upgrading: {}".format(command))

    if atomic:

        # exec helm in a task
        exec_task = asyncio.ensure_future(
            coro_or_future=self._local_async_exec(
                command=command, raise_exception_on_error=False, env=env
            )
        )
        # write status in another task
        status_task = asyncio.ensure_future(
            coro_or_future=self._store_status(
                cluster_id=cluster_id,
                kdu_instance=kdu_instance,
                namespace=instance_info["namespace"],
                db_dict=db_dict,
                operation="upgrade",
                run_once=False,
            )
        )

        # wait for execution task
        await asyncio.wait([exec_task])

        # cancel status task
        status_task.cancel()
        output, rc = exec_task.result()

    else:

        output, rc = await self._local_async_exec(
            command=command, raise_exception_on_error=False, env=env
        )

    # remove temporal values yaml file
    if file_to_delete:
        os.remove(file_to_delete)

    # write final status (done before checking rc, so it is stored on failure too)
    await self._store_status(
        cluster_id=cluster_id,
        kdu_instance=kdu_instance,
        namespace=instance_info["namespace"],
        db_dict=db_dict,
        operation="upgrade",
        run_once=True,
        check_every=0,
    )

    if rc != 0:
        msg = "Error executing command: {}\nOutput: {}".format(command, output)
        self.log.error(msg)
        raise K8sException(msg)

    # sync fs
    self.fs.reverse_sync(from_path=cluster_id)

    # return new revision number
    instance = await self.get_instance_info(
        cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
    )
    if instance:
        revision = int(instance.get("revision"))
        self.log.debug("New revision: {}".format(revision))
        return revision
    else:
        return 0
496
async def scale(
    self,
    kdu_instance: str,
    scale: int,
    resource_name: str,
    total_timeout: float = 1800,
    **kwargs,
):
    """
    Scaling is not implemented by this connector.

    :raises NotImplementedError: always
    """
    raise NotImplementedError("Method not implemented")
506
async def get_scale_count(
    self,
    resource_name: str,
    kdu_instance: str,
    **kwargs,
):
    """
    Obtaining the scale count is not implemented by this connector.

    :raises NotImplementedError: always
    """
    raise NotImplementedError("Method not implemented")
514
async def rollback(
    self, cluster_uuid: str, kdu_instance: str, revision=0, db_dict: dict = None
):
    """
    Roll back a KDU instance to a given revision and return the resulting revision.

    :param cluster_uuid: 'namespace:cluster_id' or only the cluster_id
    :param kdu_instance: release name; it must exist in the cluster
    :param revision: revision passed to the rollback command builder
    :param db_dict: passed through to _store_status
    :return: revision reported for the instance after the rollback, or 0 if the
        instance is no longer found
    :raises K8sException: if the instance does not exist or the helm command
        returns a non-zero code
    """

    _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
    self.log.debug(
        "rollback kdu_instance {} to revision {} from cluster {}".format(
            kdu_instance, revision, cluster_id
        )
    )

    # sync local dir
    self.fs.sync(from_path=cluster_id)

    # look for instance to obtain namespace
    instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
    if not instance_info:
        raise K8sException("kdu_instance {} not found".format(kdu_instance))

    # init env, paths
    paths, env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )

    command = self._get_rollback_command(kdu_instance, instance_info["namespace"],
                                         revision)

    self.log.debug("rolling_back: {}".format(command))

    # exec helm in a task
    exec_task = asyncio.ensure_future(
        coro_or_future=self._local_async_exec(
            command=command, raise_exception_on_error=False, env=env
        )
    )
    # write status in another task
    status_task = asyncio.ensure_future(
        coro_or_future=self._store_status(
            cluster_id=cluster_id,
            kdu_instance=kdu_instance,
            namespace=instance_info["namespace"],
            db_dict=db_dict,
            operation="rollback",
            run_once=False,
        )
    )

    # wait for execution task
    await asyncio.wait([exec_task])

    # cancel status task
    status_task.cancel()

    output, rc = exec_task.result()

    # write final status (done before checking rc, so it is stored on failure too)
    await self._store_status(
        cluster_id=cluster_id,
        kdu_instance=kdu_instance,
        namespace=instance_info["namespace"],
        db_dict=db_dict,
        operation="rollback",
        run_once=True,
        check_every=0,
    )

    if rc != 0:
        msg = "Error executing command: {}\nOutput: {}".format(command, output)
        self.log.error(msg)
        raise K8sException(msg)

    # sync fs
    self.fs.reverse_sync(from_path=cluster_id)

    # return new revision number
    instance = await self.get_instance_info(
        cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
    )
    if instance:
        revision = int(instance.get("revision"))
        self.log.debug("New revision: {}".format(revision))
        return revision
    else:
        return 0
599
async def uninstall(self, cluster_uuid: str, kdu_instance: str, **kwargs):
    """
    Removes an existing KDU instance using the helm-version-specific uninstall
    command (this call should happen after all _terminate-config-primitive_ of
    the VNF are invoked).

    :param cluster_uuid: UUID of a K8s cluster known by OSM, or namespace:cluster_id
    :param kdu_instance: unique name for the KDU instance to be deleted
    :param kwargs: Additional parameters (None yet)
    :return: the output of the uninstall command parsed by _output_to_table
    :raises K8sException: if the instance is not found in the cluster
    """
    _namespace, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
    self.log.debug(
        "uninstall kdu_instance {} from cluster {}".format(kdu_instance, cluster_id)
    )

    # sync local dir
    self.fs.sync(from_path=cluster_id)

    # the release namespace is taken from the deployed instance
    release = await self.get_instance_info(cluster_uuid, kdu_instance)
    if not release:
        raise K8sException("kdu_instance {} not found".format(kdu_instance))

    # init env, paths
    _paths, helm_env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )

    uninstall_cmd = self._get_uninstall_command(kdu_instance, release["namespace"])
    stdout, _return_code = await self._local_async_exec(
        command=uninstall_cmd, raise_exception_on_error=True, env=helm_env
    )

    # sync fs
    self.fs.reverse_sync(from_path=cluster_id)

    return self._output_to_table(stdout)
641
async def instances_list(self, cluster_uuid: str) -> list:
    """
    Returns a list of deployed releases in a cluster.

    The cluster directory is synchronized before and after delegating to the
    helm-version-specific _instances_list.

    :param cluster_uuid: the 'cluster' or 'namespace:cluster'
    :return: whatever _instances_list returns for the cluster
    """
    cluster_id = self._get_namespace_cluster_id(cluster_uuid)[1]
    self.log.debug("list releases for cluster {}".format(cluster_id))

    self.fs.sync(from_path=cluster_id)
    releases = await self._instances_list(cluster_id)
    self.fs.reverse_sync(from_path=cluster_id)

    return releases
663
async def get_instance_info(self, cluster_uuid: str, kdu_instance: str):
    """
    Look up a deployed release by name.

    :param cluster_uuid: the 'cluster' or 'namespace:cluster'
    :param kdu_instance: release name to look for
    :return: the first entry of instances_list whose "name" equals
        kdu_instance, or None when there is none
    """
    releases = await self.instances_list(cluster_uuid=cluster_uuid)
    found = next(
        (release for release in releases if release.get("name") == kdu_instance),
        None,
    )
    if found is None:
        self.log.debug("Instance {} not found".format(kdu_instance))
    return found
671
async def exec_primitive(
    self,
    cluster_uuid: str = None,
    kdu_instance: str = None,
    primitive_name: str = None,
    timeout: float = 300,
    params: dict = None,
    db_dict: dict = None,
    **kwargs,
) -> str:
    """Exec primitive (Juju action)

    Not supported for helm: this implementation always raises.

    :param cluster_uuid: The UUID of the cluster or namespace:cluster
    :param kdu_instance: The unique name of the KDU instance
    :param primitive_name: Name of action that will be executed
    :param timeout: Timeout for action execution
    :param params: Dictionary of all the parameters needed for the action
    :param db_dict: Dictionary for any additional data
    :param kwargs: Additional parameters (None yet)

    :raises K8sException: always
    """
    raise K8sException(
        "KDUs deployed with Helm don't support actions "
        "different from rollback, upgrade and status"
    )
698
async def get_services(
    self, cluster_uuid: str, kdu_instance: str, namespace: str
) -> list:
    """
    Returns a list of services defined for the specified kdu instance.

    :param cluster_uuid: UUID of a K8s cluster known by OSM
    :param kdu_instance: unique name for the KDU instance
    :param namespace: K8s namespace used by the KDU instance
    :return: If successful, it will return a list of services, Each service
        can have the following data:
        - `name` of the service
        - `type` type of service in the k8 cluster
        - `ports` List of ports offered by the service, for each port includes at least
        name, port, protocol
        - `cluster_ip` Internal ip to be used inside k8s cluster
        - `external_ip` List of external ips (in case they are available)
    """
    cluster_id = self._get_namespace_cluster_id(cluster_uuid)[1]
    self.log.debug(
        "get_services: cluster_uuid: {}, kdu_instance: {}".format(
            cluster_uuid, kdu_instance
        )
    )

    # sync local dir
    self.fs.sync(from_path=cluster_id)

    # names first, then the details of each service, one after the other
    names = await self._get_services(cluster_id, kdu_instance, namespace)
    details = [
        await self._get_service(cluster_id, service_name, namespace)
        for service_name in names
    ]

    # sync fs
    self.fs.reverse_sync(from_path=cluster_id)

    return details
741
async def get_service(
    self, cluster_uuid: str, service_name: str, namespace: str
) -> object:
    """
    Obtain the data of a single service of the cluster.

    :param cluster_uuid: 'namespace:cluster_id' or only the cluster_id
    :param service_name: name of the service
    :param namespace: K8s namespace of the service
    :return: whatever _get_service returns for that service
    """
    self.log.debug(
        "get service, service_name: {}, namespace: {}, cluster_uuid: {}".format(
            service_name, namespace, cluster_uuid
        )
    )

    cluster_id = self._get_namespace_cluster_id(cluster_uuid)[1]

    self.fs.sync(from_path=cluster_id)
    service_data = await self._get_service(cluster_id, service_name, namespace)
    self.fs.reverse_sync(from_path=cluster_id)

    return service_data
763
async def status_kdu(self, cluster_uuid: str, kdu_instance: str, **kwargs) -> str:
    """
    This call retrieves the current state of a given KDU instance. It allows
    to retrieve the _composition_ (i.e. K8s objects) and _specific values_ of
    the configuration parameters applied to a given instance. This call is
    based on the `status` call.

    :param cluster_uuid: UUID of a K8s cluster known by OSM
    :param kdu_instance: unique name for the KDU instance
    :param kwargs: Additional parameters (None yet)
    :return: If successful, it will return the following vector of arguments:
    - K8s `namespace` in the cluster where the KDU lives
    - `state` of the KDU instance. It can be:
          - UNKNOWN
          - DEPLOYED
          - DELETED
          - SUPERSEDED
          - FAILED or
          - DELETING
    - List of `resources` (objects) that this release consists of, sorted by kind,
      and the status of those resources
    - Last `deployment_time`.
    :raises K8sException: if the instance is not found in the cluster
    """
    self.log.debug(
        "status_kdu: cluster_uuid: {}, kdu_instance: {}".format(
            cluster_uuid, kdu_instance
        )
    )

    cluster_id = self._get_namespace_cluster_id(cluster_uuid)[1]

    # sync local dir
    self.fs.sync(from_path=cluster_id)

    # the deployed instance is needed to obtain its namespace
    releases = await self._instances_list(cluster_id=cluster_id)
    release = next(
        (item for item in releases if item.get("name") == kdu_instance), None
    )
    if release is None:
        # instance does not exist
        raise K8sException(
            "Instance name: {} not found in cluster: {}".format(kdu_instance, cluster_id)
        )

    status = await self._status_kdu(
        cluster_id=cluster_id,
        kdu_instance=kdu_instance,
        namespace=release["namespace"],
        show_error_log=True,
        return_text=True,
    )

    # sync fs
    self.fs.reverse_sync(from_path=cluster_id)

    return status
821
async def values_kdu(self, kdu_model: str, repo_url: str = None) -> str:
    """
    Run the "values" inspect command for a kdu model.

    :param kdu_model: chart reference
    :param repo_url: optional repository URL
    :return: result of _exec_inspect_comand for the "values" command
    """
    self.log.debug(
        "inspect kdu_model values {} from (optional) repo: {}".format(
            kdu_model, repo_url
        )
    )

    values = await self._exec_inspect_comand(
        inspect_command="values", kdu_model=kdu_model, repo_url=repo_url
    )
    return values
833
async def help_kdu(self, kdu_model: str, repo_url: str = None) -> str:
    """
    Run the "readme" inspect command for a kdu model.

    :param kdu_model: chart reference
    :param repo_url: optional repository URL
    :return: result of _exec_inspect_comand for the "readme" command
    """
    self.log.debug(
        "inspect kdu_model {} readme.md from repo: {}".format(kdu_model, repo_url)
    )

    readme = await self._exec_inspect_comand(
        inspect_command="readme", kdu_model=kdu_model, repo_url=repo_url
    )
    return readme
843
async def synchronize_repos(self, cluster_uuid: str):
    """
    Align the helm repositories of a cluster with the ones stored in the database.

    Database repositories that are missing locally, or registered with another
    URL, are (re)added. Local repositories that are not in the database, except
    "stable", are removed; a failure removing one of them is only logged.

    :param cluster_uuid: 'namespace:cluster_id' or only the cluster_id
    :return: tuple (deleted_repo_list, added_repo_dict). added_repo_dict maps
        repo id to repo name. deleted_repo_list holds repo ids for re-added
        repos and repo names for repos removed because they are not in the
        database.
    :raises K8sException: if a database repository cannot be added
    :raises Exception: on any other error
    """

    self.log.debug("synchronize repos for cluster helm-id: {}".format(cluster_uuid))
    try:
        db_repo_ids = self._get_helm_chart_repos_ids(cluster_uuid)
        db_repo_dict = self._get_db_repos_dict(db_repo_ids)

        local_repo_list = await self.repo_list(cluster_uuid)
        local_repo_dict = {repo["name"]: repo["url"] for repo in local_repo_list}

        deleted_repo_list = []
        added_repo_dict = {}

        # iterate over the list of repos in the database that should be
        # added if not present
        for db_repo in db_repo_dict.values():
            # resolved before the try so the error message below always
            # refers to the repo being processed
            repo_id = db_repo.get("_id")
            try:
                # check if it is already present
                curr_repo_url = local_repo_dict.get(db_repo["name"])
                if curr_repo_url != db_repo["url"]:
                    if curr_repo_url:
                        self.log.debug("repo {} url changed, delete and and again".format(
                            db_repo["url"]))
                        await self.repo_remove(cluster_uuid, db_repo["name"])
                        deleted_repo_list.append(repo_id)

                    # add repo
                    self.log.debug("add repo {}".format(db_repo["name"]))
                    await self.repo_add(cluster_uuid, db_repo["name"], db_repo["url"])
                    added_repo_dict[repo_id] = db_repo["name"]
            except Exception as e:
                raise K8sException(
                    "Error adding repo id: {}, err_msg: {} ".format(
                        repo_id, repr(e)
                    )
                ) from e

        # Delete repos that are present but not in nbi_list
        for repo_name in local_repo_dict:
            if not db_repo_dict.get(repo_name) and repo_name != "stable":
                self.log.debug("delete repo {}".format(repo_name))
                try:
                    await self.repo_remove(cluster_uuid, repo_name)
                    deleted_repo_list.append(repo_name)
                except Exception as e:
                    # best effort: a repo that cannot be removed is only reported
                    self.log.warning(
                        "Error deleting repo, name: {}, err_msg: {}".format(
                            repo_name, str(e)
                        )
                    )

        return deleted_repo_list, added_repo_dict

    except K8sException:
        raise
    except Exception as e:
        # Unexpected errors are logged and re-raised as a generic Exception
        self.log.error("Error synchronizing repos: {}".format(e))
        raise Exception("Error synchronizing repos: {}".format(e)) from e
904
905 def _get_db_repos_dict(self, repo_ids: list):
906 db_repos_dict = {}
907 for repo_id in repo_ids:
908 db_repo = self.db.get_one("k8srepos", {"_id": repo_id})
909 db_repos_dict[db_repo["name"]] = db_repo
910 return db_repos_dict
911
912 """
913 ####################################################################################
914 ################################### TO BE IMPLEMENTED SUBCLASSES ###################
915 ####################################################################################
916 """
917
@abc.abstractmethod
def _init_paths_env(self, cluster_name: str, create_if_not_exist: bool = True):
    """
    Creates base cluster and kube dirs and returns them.
    Also creates helm3 dirs according to the new directory specification; those
    paths are not returned but assigned to helm environment variables.

    Callers in this class unpack the result as (paths, env), read
    paths["kube_config"] and pass env to the helm command execution.

    :param cluster_name: cluster_name
    :param create_if_not_exist: callers in this class always pass True
    :return: Dictionary with config_paths and dictionary with helm environment variables
    """
928
@abc.abstractmethod
async def _cluster_init(self, cluster_id, namespace, paths, env):
    """
    Implements the helm version dependent cluster initialization

    The returned value is handed back by init_env as its second result (whether
    the connector has installed some software in the cluster).
    """
934
@abc.abstractmethod
async def _instances_list(self, cluster_id):
    """
    Implements the helm version dependent helm instances list

    Callers in this class iterate the result and read the "name" and
    "namespace" entries of each element.
    """
940
@abc.abstractmethod
async def _get_services(self, cluster_id, kdu_instance, namespace):
    """
    Implements the helm version dependent method to obtain services from a helm instance

    get_services iterates the result and passes each element to _get_service
    as the service name.
    """
946
@abc.abstractmethod
async def _status_kdu(self, cluster_id: str, kdu_instance: str, namespace: str = None,
                      show_error_log: bool = False, return_text: bool = False):
    """
    Implements the helm version dependent method to obtain status of a helm instance

    status_kdu calls it with show_error_log=True and return_text=True and
    returns the result unchanged.
    """
953
@abc.abstractmethod
def _get_install_command(self, kdu_model, kdu_instance, namespace,
                         params_str, version, atomic, timeout) -> str:
    """
    Obtain command to be executed to install the indicated instance
    """
960
@abc.abstractmethod
def _get_upgrade_command(self, kdu_model, kdu_instance, namespace,
                         params_str, version, atomic, timeout) -> str:
    """
    Obtain command to be executed to upgrade the indicated instance

    upgrade passes the namespace of the deployed instance and the chart name
    and version split from the kdu model.
    """
967
@abc.abstractmethod
def _get_rollback_command(self, kdu_instance, namespace, revision) -> str:
    """
    Obtain command to be executed to rollback the indicated instance

    rollback passes the namespace of the deployed instance.
    """
973
@abc.abstractmethod
def _get_uninstall_command(self, kdu_instance: str, namespace: str) -> str:
    """
    Obtain command to be executed to delete the indicated instance

    uninstall passes the namespace of the deployed instance.
    """
979
@abc.abstractmethod
def _get_inspect_command(self, show_command: str, kdu_model: str, repo_str: str,
                         version: str):
    """
    Obtain command to be executed to obtain information about the kdu
    """
986
@abc.abstractmethod
async def _uninstall_sw(self, cluster_id: str, namespace: str):
    """
    Method call to uninstall cluster software for helm. This method is dependent
    on the helm version:
    For Helm v2 it will be called when Tiller must be uninstalled
    For Helm v3 it does nothing and does not need to be called
    """
995
@abc.abstractmethod
def _get_helm_chart_repos_ids(self, cluster_uuid) -> list:
    """
    Obtains the cluster repos identifiers

    synchronize_repos passes the result to _get_db_repos_dict, which looks up
    each identifier as the "_id" of a "k8srepos" database entry.
    """
1001
1002 """
1003 ####################################################################################
1004 ################################### P R I V A T E ##################################
1005 ####################################################################################
1006 """
1007
1008 @staticmethod
1009 def _check_file_exists(filename: str, exception_if_not_exists: bool = False):
1010 if os.path.exists(filename):
1011 return True
1012 else:
1013 msg = "File {} does not exist".format(filename)
1014 if exception_if_not_exists:
1015 raise K8sException(msg)
1016
1017 @staticmethod
1018 def _remove_multiple_spaces(strobj):
1019 strobj = strobj.strip()
1020 while " " in strobj:
1021 strobj = strobj.replace(" ", " ")
1022 return strobj
1023
1024 @staticmethod
1025 def _output_to_lines(output: str) -> list:
1026 output_lines = list()
1027 lines = output.splitlines(keepends=False)
1028 for line in lines:
1029 line = line.strip()
1030 if len(line) > 0:
1031 output_lines.append(line)
1032 return output_lines
1033
1034 @staticmethod
1035 def _output_to_table(output: str) -> list:
1036 output_table = list()
1037 lines = output.splitlines(keepends=False)
1038 for line in lines:
1039 line = line.replace("\t", " ")
1040 line_list = list()
1041 output_table.append(line_list)
1042 cells = line.split(sep=" ")
1043 for cell in cells:
1044 cell = cell.strip()
1045 if len(cell) > 0:
1046 line_list.append(cell)
1047 return output_table
1048
1049 @staticmethod
1050 def _parse_services(output: str) -> list:
1051 lines = output.splitlines(keepends=False)
1052 services = []
1053 for line in lines:
1054 line = line.replace("\t", " ")
1055 cells = line.split(sep=" ")
1056 if len(cells) > 0 and cells[0].startswith("service/"):
1057 elems = cells[0].split(sep="/")
1058 if len(elems) > 1:
1059 services.append(elems[1])
1060 return services
1061
1062 @staticmethod
1063 def _get_deep(dictionary: dict, members: tuple):
1064 target = dictionary
1065 value = None
1066 try:
1067 for m in members:
1068 value = target.get(m)
1069 if not value:
1070 return None
1071 else:
1072 target = value
1073 except Exception:
1074 pass
1075 return value
1076
1077 # find key:value in several lines
1078 @staticmethod
1079 def _find_in_lines(p_lines: list, p_key: str) -> str:
1080 for line in p_lines:
1081 try:
1082 if line.startswith(p_key + ":"):
1083 parts = line.split(":")
1084 the_value = parts[1].strip()
1085 return the_value
1086 except Exception:
1087 # ignore it
1088 pass
1089 return None
1090
1091 @staticmethod
1092 def _lower_keys_list(input_list: list):
1093 """
1094 Transform the keys in a list of dictionaries to lower case and returns a new list
1095 of dictionaries
1096 """
1097 new_list = []
1098 for dictionary in input_list:
1099 new_dict = dict((k.lower(), v) for k, v in dictionary.items())
1100 new_list.append(new_dict)
1101 return new_list
1102
1103 def _local_exec(self, command: str) -> (str, int):
1104 command = self._remove_multiple_spaces(command)
1105 self.log.debug("Executing sync local command: {}".format(command))
1106 # raise exception if fails
1107 output = ""
1108 try:
1109 output = subprocess.check_output(
1110 command, shell=True, universal_newlines=True
1111 )
1112 return_code = 0
1113 self.log.debug(output)
1114 except Exception:
1115 return_code = 1
1116
1117 return output, return_code
1118
1119 async def _local_async_exec(
1120 self,
1121 command: str,
1122 raise_exception_on_error: bool = False,
1123 show_error_log: bool = True,
1124 encode_utf8: bool = False,
1125 env: dict = None
1126 ) -> (str, int):
1127
1128 command = K8sHelmBaseConnector._remove_multiple_spaces(command)
1129 self.log.debug("Executing async local command: {}, env: {}".format(command, env))
1130
1131 # split command
1132 command = shlex.split(command)
1133
1134 environ = os.environ.copy()
1135 if env:
1136 environ.update(env)
1137
1138 try:
1139 process = await asyncio.create_subprocess_exec(
1140 *command, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE,
1141 env=environ
1142 )
1143
1144 # wait for command terminate
1145 stdout, stderr = await process.communicate()
1146
1147 return_code = process.returncode
1148
1149 output = ""
1150 if stdout:
1151 output = stdout.decode("utf-8").strip()
1152 # output = stdout.decode()
1153 if stderr:
1154 output = stderr.decode("utf-8").strip()
1155 # output = stderr.decode()
1156
1157 if return_code != 0 and show_error_log:
1158 self.log.debug(
1159 "Return code (FAIL): {}\nOutput:\n{}".format(return_code, output)
1160 )
1161 else:
1162 self.log.debug("Return code: {}".format(return_code))
1163
1164 if raise_exception_on_error and return_code != 0:
1165 raise K8sException(output)
1166
1167 if encode_utf8:
1168 output = output.encode("utf-8").strip()
1169 output = str(output).replace("\\n", "\n")
1170
1171 return output, return_code
1172
1173 except asyncio.CancelledError:
1174 raise
1175 except K8sException:
1176 raise
1177 except Exception as e:
1178 msg = "Exception executing command: {} -> {}".format(command, e)
1179 self.log.error(msg)
1180 if raise_exception_on_error:
1181 raise K8sException(e) from e
1182 else:
1183 return "", -1
1184
1185 async def _local_async_exec_pipe(self,
1186 command1: str,
1187 command2: str,
1188 raise_exception_on_error: bool = True,
1189 show_error_log: bool = True,
1190 encode_utf8: bool = False,
1191 env: dict = None):
1192
1193 command1 = K8sHelmBaseConnector._remove_multiple_spaces(command1)
1194 command2 = K8sHelmBaseConnector._remove_multiple_spaces(command2)
1195 command = "{} | {}".format(command1, command2)
1196 self.log.debug("Executing async local command: {}, env: {}".format(command, env))
1197
1198 # split command
1199 command1 = shlex.split(command1)
1200 command2 = shlex.split(command2)
1201
1202 environ = os.environ.copy()
1203 if env:
1204 environ.update(env)
1205
1206 try:
1207 read, write = os.pipe()
1208 await asyncio.create_subprocess_exec(*command1, stdout=write, env=environ)
1209 os.close(write)
1210 process_2 = await asyncio.create_subprocess_exec(*command2, stdin=read,
1211 stdout=asyncio.subprocess.PIPE,
1212 env=environ)
1213 os.close(read)
1214 stdout, stderr = await process_2.communicate()
1215
1216 return_code = process_2.returncode
1217
1218 output = ""
1219 if stdout:
1220 output = stdout.decode("utf-8").strip()
1221 # output = stdout.decode()
1222 if stderr:
1223 output = stderr.decode("utf-8").strip()
1224 # output = stderr.decode()
1225
1226 if return_code != 0 and show_error_log:
1227 self.log.debug(
1228 "Return code (FAIL): {}\nOutput:\n{}".format(return_code, output)
1229 )
1230 else:
1231 self.log.debug("Return code: {}".format(return_code))
1232
1233 if raise_exception_on_error and return_code != 0:
1234 raise K8sException(output)
1235
1236 if encode_utf8:
1237 output = output.encode("utf-8").strip()
1238 output = str(output).replace("\\n", "\n")
1239
1240 return output, return_code
1241 except asyncio.CancelledError:
1242 raise
1243 except K8sException:
1244 raise
1245 except Exception as e:
1246 msg = "Exception executing command: {} -> {}".format(command, e)
1247 self.log.error(msg)
1248 if raise_exception_on_error:
1249 raise K8sException(e) from e
1250 else:
1251 return "", -1
1252
async def _get_service(self, cluster_id, service_name, namespace):
    """
    Obtains the data of the specified service in the k8cluster.

    :param cluster_id: id of a K8s cluster known by OSM
    :param service_name: name of the K8s service in the specified namespace
    :param namespace: K8s namespace used by the KDU instance
    :return: If successful, it will return a service with the following data:
    - `name` of the service
    - `type` type of service in the k8 cluster
    - `ports` List of ports offered by the service, for each port includes at least
    name, port, protocol
    - `cluster_ip` Internal ip to be used inside k8s cluster
    - `external_ip` List of external ips (in case they are available). Only
    present for services of type LoadBalancer; empty while no address has been
    assigned yet
    :raises K8sException: if the kubectl command fails
    """

    # init config, env
    paths, env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )

    command = "{} --kubeconfig={} --namespace={} get service {} -o=yaml".format(
        self.kubectl_command, paths["kube_config"], namespace, service_name
    )

    output, _rc = await self._local_async_exec(
        command=command, raise_exception_on_error=True, env=env
    )

    data = yaml.load(output, Loader=yaml.SafeLoader)

    service = {
        "name": service_name,
        "type": self._get_deep(data, ("spec", "type")),
        "ports": self._get_deep(data, ("spec", "ports")),
        "cluster_ip": self._get_deep(data, ("spec", "clusterIP"))
    }
    if service["type"] == "LoadBalancer":
        # _get_deep returns None while the load balancer has no ingress entry
        # yet; iterating over None used to abort the whole query.
        ingress_list = self._get_deep(data, ("status", "loadBalancer", "ingress")) or []
        # Entries without an "ip" key (e.g. hostname-only ones) are skipped
        # instead of raising KeyError.
        service["external_ip"] = [
            entry["ip"] for entry in ingress_list if "ip" in entry
        ]

    return service
1296
1297 async def _exec_inspect_comand(
1298 self, inspect_command: str, kdu_model: str, repo_url: str = None
1299 ):
1300 """
1301 Obtains information about a kdu, no cluster (no env)
1302 """
1303
1304 repo_str = ""
1305 if repo_url:
1306 repo_str = " --repo {}".format(repo_url)
1307
1308 idx = kdu_model.find("/")
1309 if idx >= 0:
1310 idx += 1
1311 kdu_model = kdu_model[idx:]
1312
1313 version = ""
1314 if ":" in kdu_model:
1315 parts = kdu_model.split(sep=":")
1316 if len(parts) == 2:
1317 version = "--version {}".format(str(parts[1]))
1318 kdu_model = parts[0]
1319
1320 full_command = self._get_inspect_command(inspect_command, kdu_model, repo_str, version)
1321 output, _rc = await self._local_async_exec(
1322 command=full_command, encode_utf8=True
1323 )
1324
1325 return output
1326
async def _store_status(
    self,
    cluster_id: str,
    operation: str,
    kdu_instance: str,
    namespace: str = None,
    check_every: float = 10,
    db_dict: dict = None,
    run_once: bool = False,
):
    """
    Periodically read the status of a KDU instance and write it to the database

    Each iteration sleeps check_every seconds, asks _status_kdu for the detailed
    status and stores it through write_app_status_to_db. The loop ends when the
    database write reports a falsy result, when the task is cancelled, or after
    the first iteration if run_once is set.

    :param cluster_id: cluster identifier, forwarded to _status_kdu
    :param operation: operation name, forwarded to write_app_status_to_db
    :param kdu_instance: KDU instance whose status is read
    :param namespace: namespace, forwarded to _status_kdu
    :param check_every: seconds to sleep before every status read
    :param db_dict: database reference, forwarded to write_app_status_to_db
    :param run_once: perform a single iteration, whatever its outcome
    :return: None
    """
    while True:
        try:
            # the wait comes first, so even a run_once call is delayed
            await asyncio.sleep(check_every)
            detailed_status = await self._status_kdu(
                cluster_id=cluster_id, kdu_instance=kdu_instance, namespace=namespace,
                return_text=False
            )
            status = detailed_status.get("info").get("description")
            self.log.debug('KDU {} STATUS: {}.'.format(kdu_instance, status))
            # write status to db
            result = await self.write_app_status_to_db(
                db_dict=db_dict,
                status=str(status),
                detailed_status=str(detailed_status),
                operation=operation,
            )
            if not result:
                self.log.info("Error writing in database. Task exiting...")
                return
        except asyncio.CancelledError:
            # cancellation ends the task quietly; it is not re-raised
            self.log.debug("Task cancelled")
            return
        except Exception as e:
            # any other failure is only logged and the loop tries again
            self.log.debug("_store_status exception: {}".format(str(e)), exc_info=True)
            pass
        finally:
            # runs after every iteration, including failed ones, so run_once
            # stops the loop even when the status could not be read
            if run_once:
                return
1365
def _params_to_file_option(self, cluster_id: str, params: dict) -> (str, str):
    """
    Dump the params into a values file to be used with the -f option

    String values containing the "!!yaml" marker are parsed as YAML before
    being dumped, so that structured values can be passed as text.

    :param cluster_id: cluster identifier, used to make sure the cluster paths
        are initialised
    :param params: dictionary with the values
    :return: tuple (option, filename): the "-f <file>" option and the name of
        the file, so that the caller can delete it at the end. ("", None) when
        there are no params
    """

    if not params:
        return "", None

    self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )

    params2 = dict()
    for key, value in params.items():
        # Only strings can carry the marker; testing str(value) made the slice
        # below fail for dicts/lists whose text happened to contain it.
        if isinstance(value, str) and "!!yaml" in value:
            # Drop the first 7 characters (presumably the "!!yaml " prefix -
            # the marker is assumed to be at the start of the value).
            # SafeLoader: the text comes from user supplied params, and
            # yaml.load without a Loader is rejected by PyYAML 6.
            value = yaml.load(value[7:], Loader=yaml.SafeLoader)
        params2[key] = value

    # random number left-padded with zeros to 10 characters
    # NOTE(review): the file is created relative to the current working
    # directory, not under the cluster paths.
    values_file = str(random.randrange(1, 99999999)).rjust(10, "0") + ".yaml"
    with open(values_file, "w") as stream:
        yaml.dump(params2, stream, indent=4, default_flow_style=False)

    return "-f {}".format(values_file), values_file
1396
1397 # params for use in --set option
1398 @staticmethod
1399 def _params_to_set_option(params: dict) -> str:
1400 params_str = ""
1401 if params and len(params) > 0:
1402 start = True
1403 for key in params:
1404 value = params.get(key, None)
1405 if value is not None:
1406 if start:
1407 params_str += "--set "
1408 start = False
1409 else:
1410 params_str += ","
1411 params_str += "{}={}".format(key, value)
1412 return params_str
1413
1414 @staticmethod
1415 def generate_kdu_instance_name(**kwargs):
1416 chart_name = kwargs["kdu_model"]
1417 # check embeded chart (file or dir)
1418 if chart_name.startswith("/"):
1419 # extract file or directory name
1420 chart_name = chart_name[chart_name.rfind("/") + 1:]
1421 # check URL
1422 elif "://" in chart_name:
1423 # extract last portion of URL
1424 chart_name = chart_name[chart_name.rfind("/") + 1:]
1425
1426 name = ""
1427 for c in chart_name:
1428 if c.isalpha() or c.isnumeric():
1429 name += c
1430 else:
1431 name += "-"
1432 if len(name) > 35:
1433 name = name[0:35]
1434
1435 # if does not start with alpha character, prefix 'a'
1436 if not name[0].isalpha():
1437 name = "a" + name
1438
1439 name += "-"
1440
1441 def get_random_number():
1442 r = random.randrange(start=1, stop=99999999)
1443 s = str(r)
1444 s = s.rjust(10, "0")
1445 return s
1446
1447 name = name + get_random_number()
1448 return name.lower()