Feature 10239: Distributed VCA
[osm/N2VC.git] / n2vc / k8s_helm_base_conn.py
1 ##
2 # Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3 # This file is part of OSM
4 # All Rights Reserved.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
15 # implied.
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
18 #
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact with: nfvlabs@tid.es
21 ##
22 import abc
23 import asyncio
24 import random
25 import time
26 import shlex
27 import shutil
28 import stat
29 import subprocess
30 import os
31 import yaml
32 from uuid import uuid4
33
34 from n2vc.exceptions import K8sException
35 from n2vc.k8s_conn import K8sConnector
36
37
38 class K8sHelmBaseConnector(K8sConnector):
39
40 """
41 ####################################################################################
42 ################################### P U B L I C ####################################
43 ####################################################################################
44 """
45 service_account = "osm"
46 _STABLE_REPO_URL = "https://charts.helm.sh/stable"
47
def __init__(
    self,
    fs: object,
    db: object,
    kubectl_command: str = "/usr/bin/kubectl",
    helm_command: str = "/usr/bin/helm",
    log: object = None,
    on_update_db=None,
    vca_config: dict = None,
):
    """
    Set up the state shared by the helm connectors.

    :param fs: file system for kubernetes and helm configuration
    :param db: database object to write current operation status
    :param kubectl_command: path to kubectl executable
    :param helm_command: path to helm executable
    :param log: logger
    :param on_update_db: callback called when k8s connector updates database
    :param vca_config: optional configuration; only its "stablerepourl" entry is
        read here, the class default is used when it is missing or empty
    :raises K8sException: if the kubectl or the helm executable does not exist
    """
    # the parent class stores db, log and the update callback
    K8sConnector.__init__(self, db=db, log=log, on_update_db=on_update_db)
    self.log.info("Initializing K8S Helm connector")

    # random numbers are used for release name generation
    random.seed(time.time())

    self.fs = fs

    # both executables are mandatory: a missing one raises right away
    self.kubectl_command = kubectl_command
    self._check_file_exists(filename=kubectl_command, exception_if_not_exists=True)

    self._helm_command = helm_command
    self._check_file_exists(filename=helm_command, exception_if_not_exists=True)

    # stable repo url: value from the configuration or the class default
    configured_url = vca_config.get("stablerepourl") if vca_config else None
    self._stable_repo_url = configured_url or self._STABLE_REPO_URL
92
93 @staticmethod
94 def _get_namespace_cluster_id(cluster_uuid: str) -> (str, str):
95 """
96 Parses cluster_uuid stored at database that can be either 'namespace:cluster_id' or only
97 cluster_id for backward compatibility
98 """
99 namespace, _, cluster_id = cluster_uuid.rpartition(':')
100 return namespace, cluster_id
101
async def init_env(
    self, k8s_creds: str, namespace: str = "kube-system", reuse_cluster_uuid=None, **kwargs,
) -> (str, bool):
    """
    It prepares a given K8s cluster environment to run Charts

    :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid
        '.kube/config'
    :param namespace: optional namespace to be used for helm. By default,
        'kube-system' will be used
    :param reuse_cluster_uuid: existing cluster uuid for reuse; when it carries a
        namespace ('namespace:cluster_id') that namespace takes precedence
    :param kwargs: Additional parameters (None yet)
    :return: uuid of the K8s cluster ('namespace:cluster_id') and the value
        returned by the helm version specific cluster initialization (True if
        the connector has installed some software in the cluster)
        (on error, an exception will be raised)
    """
    if reuse_cluster_uuid:
        namespace_, cluster_id = self._get_namespace_cluster_id(reuse_cluster_uuid)
        namespace = namespace_ or namespace
    else:
        cluster_id = str(uuid4())
    cluster_uuid = "{}:{}".format(namespace, cluster_id)

    self.log.debug("Initializing K8S Cluster {}. namespace: {}".format(cluster_id, namespace))

    paths, env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )

    # The kube config holds the cluster credentials, so it is created readable and
    # writable by the owner only. The permission bits must go to os.open():
    # the third positional argument of the builtin open() is the buffering size,
    # not the file mode.
    mode = stat.S_IRUSR | stat.S_IWUSR
    fd = os.open(paths["kube_config"], os.O_WRONLY | os.O_CREAT | os.O_TRUNC, mode)
    with os.fdopen(fd, "w") as f:
        f.write(k8s_creds)
    # os.open() applies the mode only when it creates the file; enforce it as well
    # for a file that already existed
    os.chmod(paths["kube_config"], mode)

    # Code with initialization specific of helm version
    n2vc_installed_sw = await self._cluster_init(cluster_id, namespace, paths, env)

    # sync fs with local data
    self.fs.reverse_sync(from_path=cluster_id)

    self.log.info("Cluster {} initialized".format(cluster_id))

    return cluster_uuid, n2vc_installed_sw
145
async def repo_add(
    self, cluster_uuid: str, name: str, url: str, repo_type: str = "chart"
):
    """
    Register a helm repository in the local helm configuration of a cluster.

    A 'helm repo update' is attempted first (its failure is tolerated) and then
    the repository is added.

    :param cluster_uuid: the 'cluster' or 'namespace:cluster'
    :param name: name given to the repository
    :param url: url of the repository
    :param repo_type: only used in the debug trace
    """
    cluster_id = self._get_namespace_cluster_id(cluster_uuid)[1]
    self.log.debug("Cluster {}, adding {} repository {}. URL: {}".format(
        cluster_id, repo_type, name, url))

    # bring the local directory up to date before touching it
    self.fs.sync(from_path=cluster_id)

    _paths, helm_env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )

    # helm repo update
    update_cmd = "{} repo update".format(self._helm_command)
    self.log.debug("updating repo: {}".format(update_cmd))
    await self._local_async_exec(
        command=update_cmd, raise_exception_on_error=False, env=helm_env
    )

    # helm repo add name url
    add_cmd = "{} repo add {} {}".format(self._helm_command, name, url)
    self.log.debug("adding repo: {}".format(add_cmd))
    await self._local_async_exec(
        command=add_cmd, raise_exception_on_error=True, env=helm_env
    )

    # publish the local changes
    self.fs.reverse_sync(from_path=cluster_id)
177
async def repo_list(self, cluster_uuid: str) -> list:
    """
    Get the list of registered repositories

    :param cluster_uuid: the 'cluster' or 'namespace:cluster'
    :return: list with one dictionary per registered repository, as printed by
        'helm repo list --output yaml' with all keys in lower case; an empty
        list if the command fails or prints nothing
    """
    cluster_id = self._get_namespace_cluster_id(cluster_uuid)[1]
    self.log.debug("list repositories for cluster {}".format(cluster_id))

    self.fs.sync(from_path=cluster_id)

    _paths, helm_env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )

    list_cmd = "{} repo list --output yaml".format(self._helm_command)

    # errors are not raised: when there are no repos an empty list is wanted
    cmd_output, return_code = await self._local_async_exec(
        command=list_cmd, raise_exception_on_error=False, env=helm_env
    )

    self.fs.reverse_sync(from_path=cluster_id)

    if return_code != 0 or not cmd_output:
        return []

    parsed = yaml.load(cmd_output, Loader=yaml.SafeLoader)
    # unify format between helm2 and helm3 setting all keys lowercase
    return self._lower_keys_list(parsed)
217
async def repo_remove(self, cluster_uuid: str, name: str):
    """
    Remove a repository from the local helm configuration of a cluster.

    :param cluster_uuid: the 'cluster' or 'namespace:cluster'
    :param name: name of the repository to remove
    """
    cluster_id = self._get_namespace_cluster_id(cluster_uuid)[1]
    self.log.debug("remove {} repositories for cluster {}".format(name, cluster_id))

    self.fs.sync(from_path=cluster_id)

    _paths, helm_env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )

    remove_cmd = "{} repo remove {}".format(self._helm_command, name)
    await self._local_async_exec(
        command=remove_cmd, raise_exception_on_error=True, env=helm_env
    )

    self.fs.reverse_sync(from_path=cluster_id)
238
async def reset(
    self, cluster_uuid: str, force: bool = False, uninstall_sw: bool = False, **kwargs
) -> bool:
    """Reset a cluster

    Resets the Kubernetes cluster by removing the helm deployment that represents it.

    :param cluster_uuid: The UUID of the cluster to reset
    :param force: Boolean to force the reset: existing releases are uninstalled
        instead of blocking the software uninstallation
    :param uninstall_sw: Boolean to also uninstall the releases and the helm
        software of the cluster. It is ignored when the cluster still has
        releases and force is not set
    :param kwargs: Additional parameters (None yet)
    :return: Returns True if successful or raises an exception.
    """
    namespace, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
    self.log.debug("Resetting K8s environment. cluster uuid: {} uninstall={}"
                   .format(cluster_id, uninstall_sw))

    # sync local dir
    self.fs.sync(from_path=cluster_id)

    # uninstall releases if needed.
    if uninstall_sw:
        releases = await self.instances_list(cluster_uuid=cluster_uuid)
        if len(releases) > 0:
            if force:
                for r in releases:
                    # reset per release so the warning below never reports a
                    # stale or undefined name
                    kdu_instance = None
                    try:
                        kdu_instance = r.get("name")
                        chart = r.get("chart")
                        self.log.debug(
                            "Uninstalling {} -> {}".format(chart, kdu_instance)
                        )
                        await self.uninstall(
                            cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
                        )
                    except Exception as e:
                        # will not raise exception as it was found
                        # that in some cases of previously installed helm releases it
                        # raised an error
                        self.log.warning(
                            "Error uninstalling release {}: {}".format(kdu_instance, e)
                        )
            else:
                msg = (
                    "Cluster uuid: {} has releases and not force. Leaving K8s helm environment"
                ).format(cluster_id)
                self.log.warning(msg)
                uninstall_sw = False  # Allow to remove k8s cluster without removing Tiller

    if uninstall_sw:
        await self._uninstall_sw(cluster_id, namespace)

    # delete cluster directory
    self.log.debug("Removing directory {}".format(cluster_id))
    self.fs.file_delete(cluster_id, ignore_non_exist=True)
    # Remove also the local directory if it still exists
    direct = self.fs.path + "/" + cluster_id
    shutil.rmtree(direct, ignore_errors=True)

    return True
299
async def _install_impl(
    self,
    cluster_id: str,
    kdu_model: str,
    paths: dict,
    env: dict,
    kdu_instance: str,
    atomic: bool = True,
    timeout: float = 300,
    params: dict = None,
    db_dict: dict = None,
    kdu_name: str = None,
    namespace: str = None,
):
    """
    Run the helm install of a chart and store the resulting status.

    :param cluster_id: cluster identifier (without namespace prefix)
    :param kdu_model: chart reference; when it contains exactly one ':' the part
        after it is used as the chart version
    :param paths: not used by this method
    :param env: environment passed to the helm command
    :param kdu_instance: name of the release to create
    :param atomic: passed to the install command builder; when True the status is
        also stored by a background task while helm is running
    :param timeout: passed to the install command builder
    :param params: values handed to _params_to_file_option
    :param db_dict: passed to _store_status
    :param kdu_name: not used by this method
    :param namespace: namespace for the release
    :raises K8sException: if the helm command ends with a non-zero return code
    """
    # params to str
    params_str, file_to_delete = self._params_to_file_option(
        cluster_id=cluster_id, params=params
    )

    try:
        # version
        version = None
        if ":" in kdu_model:
            parts = kdu_model.split(sep=":")
            if len(parts) == 2:
                version = str(parts[1])
                kdu_model = parts[0]

        command = self._get_install_command(
            kdu_model, kdu_instance, namespace, params_str, version, atomic, timeout
        )

        self.log.debug("installing: {}".format(command))

        if atomic:
            # exec helm in a task
            exec_task = asyncio.ensure_future(
                self._local_async_exec(
                    command=command, raise_exception_on_error=False, env=env
                )
            )

            # write status in another task
            status_task = asyncio.ensure_future(
                self._store_status(
                    cluster_id=cluster_id,
                    kdu_instance=kdu_instance,
                    namespace=namespace,
                    db_dict=db_dict,
                    operation="install",
                    run_once=False,
                )
            )

            try:
                # wait for execution task
                await asyncio.wait([exec_task])
            finally:
                # the status task must never outlive the wait, even if the
                # wait itself is interrupted
                status_task.cancel()

            output, rc = exec_task.result()
        else:
            output, rc = await self._local_async_exec(
                command=command, raise_exception_on_error=False, env=env
            )
    finally:
        # remove temporal values yaml file, also when something above raised
        if file_to_delete:
            os.remove(file_to_delete)

    # write final status
    await self._store_status(
        cluster_id=cluster_id,
        kdu_instance=kdu_instance,
        namespace=namespace,
        db_dict=db_dict,
        operation="install",
        run_once=True,
        check_every=0,
    )

    if rc != 0:
        msg = "Error executing command: {}\nOutput: {}".format(command, output)
        self.log.error(msg)
        raise K8sException(msg)
385
async def upgrade(
    self,
    cluster_uuid: str,
    kdu_instance: str,
    kdu_model: str = None,
    atomic: bool = True,
    timeout: float = 300,
    params: dict = None,
    db_dict: dict = None,
):
    """
    Upgrade an existing KDU instance (helm release) and return its new revision.

    :param cluster_uuid: UUID of a K8s cluster known by OSM, or namespace:cluster_id
    :param kdu_instance: name of the release to upgrade
    :param kdu_model: chart reference; when it contains exactly one ':' the part
        after it is used as the chart version. Although it defaults to None, a
        value is required
    :param atomic: passed to the upgrade command builder; when True the status is
        also stored by a background task while helm is running
    :param timeout: passed to the upgrade command builder
    :param params: values handed to _params_to_file_option
    :param db_dict: passed to _store_status
    :return: revision reported for the release after the upgrade, or 0 if the
        release cannot be found afterwards
    :raises K8sException: if kdu_model is missing, the release is not found or
        the helm command ends with a non-zero return code
    """
    # Fail early and clearly: without a model the version parsing below cannot
    # work, and failing there would leave a temporary values file behind
    if kdu_model is None:
        raise K8sException(
            "kdu_model is required to upgrade kdu_instance {}".format(kdu_instance)
        )

    _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
    self.log.debug("upgrading {} in cluster {}".format(kdu_model, cluster_id))

    # sync local dir
    self.fs.sync(from_path=cluster_id)

    # look for instance to obtain namespace
    instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
    if not instance_info:
        raise K8sException("kdu_instance {} not found".format(kdu_instance))

    # init env, paths
    _, env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )

    # params to str
    params_str, file_to_delete = self._params_to_file_option(
        cluster_id=cluster_id, params=params
    )

    try:
        # version
        version = None
        if ":" in kdu_model:
            parts = kdu_model.split(sep=":")
            if len(parts) == 2:
                version = str(parts[1])
                kdu_model = parts[0]

        command = self._get_upgrade_command(
            kdu_model, kdu_instance, instance_info["namespace"],
            params_str, version, atomic, timeout
        )

        self.log.debug("upgrading: {}".format(command))

        if atomic:
            # exec helm in a task
            exec_task = asyncio.ensure_future(
                self._local_async_exec(
                    command=command, raise_exception_on_error=False, env=env
                )
            )
            # write status in another task
            status_task = asyncio.ensure_future(
                self._store_status(
                    cluster_id=cluster_id,
                    kdu_instance=kdu_instance,
                    namespace=instance_info["namespace"],
                    db_dict=db_dict,
                    operation="upgrade",
                    run_once=False,
                )
            )

            try:
                # wait for execution task
                await asyncio.wait([exec_task])
            finally:
                # the status task must never outlive the wait, even if the
                # wait itself is interrupted
                status_task.cancel()

            output, rc = exec_task.result()
        else:
            output, rc = await self._local_async_exec(
                command=command, raise_exception_on_error=False, env=env
            )
    finally:
        # remove temporal values yaml file, also when something above raised
        if file_to_delete:
            os.remove(file_to_delete)

    # write final status
    await self._store_status(
        cluster_id=cluster_id,
        kdu_instance=kdu_instance,
        namespace=instance_info["namespace"],
        db_dict=db_dict,
        operation="upgrade",
        run_once=True,
        check_every=0,
    )

    if rc != 0:
        msg = "Error executing command: {}\nOutput: {}".format(command, output)
        self.log.error(msg)
        raise K8sException(msg)

    # sync fs
    self.fs.reverse_sync(from_path=cluster_id)

    # return new revision number
    instance = await self.get_instance_info(
        cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
    )
    if instance:
        revision = int(instance.get("revision"))
        self.log.debug("New revision: {}".format(revision))
        return revision
    else:
        return 0
496
async def rollback(
    self, cluster_uuid: str, kdu_instance: str, revision=0, db_dict: dict = None
):
    """
    Roll a KDU instance (helm release) back and return its new revision.

    :param cluster_uuid: UUID of a K8s cluster known by OSM, or namespace:cluster_id
    :param kdu_instance: name of the release to roll back
    :param revision: revision passed to the rollback command builder
    :param db_dict: passed to _store_status
    :return: revision reported for the release after the rollback, or 0 if the
        release cannot be found afterwards
    :raises K8sException: if the release is not found or the helm command ends
        with a non-zero return code
    """
    cluster_id = self._get_namespace_cluster_id(cluster_uuid)[1]
    self.log.debug(
        "rollback kdu_instance {} to revision {} from cluster {}".format(
            kdu_instance, revision, cluster_id
        )
    )

    self.fs.sync(from_path=cluster_id)

    # the release is looked up to learn the namespace it lives in
    release = await self.get_instance_info(cluster_uuid, kdu_instance)
    if not release:
        raise K8sException("kdu_instance {} not found".format(kdu_instance))

    _paths, helm_env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )

    release_namespace = release["namespace"]
    rollback_cmd = self._get_rollback_command(kdu_instance, release_namespace, revision)
    self.log.debug("rolling_back: {}".format(rollback_cmd))

    # helm runs in one task while another one keeps storing the status
    helm_task = asyncio.ensure_future(
        self._local_async_exec(
            command=rollback_cmd, raise_exception_on_error=False, env=helm_env
        )
    )
    progress_task = asyncio.ensure_future(
        self._store_status(
            cluster_id=cluster_id,
            kdu_instance=kdu_instance,
            namespace=release_namespace,
            db_dict=db_dict,
            operation="rollback",
            run_once=False,
        )
    )

    await asyncio.wait([helm_task])
    progress_task.cancel()
    cmd_output, return_code = helm_task.result()

    # one last status write once helm has finished
    await self._store_status(
        cluster_id=cluster_id,
        kdu_instance=kdu_instance,
        namespace=release_namespace,
        db_dict=db_dict,
        operation="rollback",
        run_once=True,
        check_every=0,
    )

    if return_code != 0:
        error_text = "Error executing command: {}\nOutput: {}".format(
            rollback_cmd, cmd_output
        )
        self.log.error(error_text)
        raise K8sException(error_text)

    self.fs.reverse_sync(from_path=cluster_id)

    # report the revision the release has now
    refreshed = await self.get_instance_info(
        cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
    )
    if not refreshed:
        return 0
    new_revision = int(refreshed.get("revision"))
    self.log.debug("New revision: {}".format(new_revision))
    return new_revision
581
async def uninstall(self, cluster_uuid: str, kdu_instance: str, **kwargs):
    """
    Removes an existing KDU instance using the helm version specific
    uninstall command (this call should happen after all
    _terminate-config-primitive_ of the VNF are invoked).

    :param cluster_uuid: UUID of a K8s cluster known by OSM, or namespace:cluster_id
    :param kdu_instance: unique name for the KDU instance to be deleted
    :param kwargs: Additional parameters (None yet)
    :return: output of the uninstall command converted by _output_to_table
    :raises K8sException: if the KDU instance is not found
    """
    cluster_id = self._get_namespace_cluster_id(cluster_uuid)[1]
    self.log.debug(
        "uninstall kdu_instance {} from cluster {}".format(kdu_instance, cluster_id)
    )

    self.fs.sync(from_path=cluster_id)

    # the release is looked up to learn the namespace it lives in
    release = await self.get_instance_info(cluster_uuid, kdu_instance)
    if not release:
        raise K8sException("kdu_instance {} not found".format(kdu_instance))

    _paths, helm_env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )

    uninstall_cmd = self._get_uninstall_command(kdu_instance, release["namespace"])
    cmd_output, _return_code = await self._local_async_exec(
        command=uninstall_cmd, raise_exception_on_error=True, env=helm_env
    )

    self.fs.reverse_sync(from_path=cluster_id)

    return self._output_to_table(cmd_output)
623
async def instances_list(self, cluster_uuid: str) -> list:
    """
    Return the releases deployed in a cluster.

    The file system is synchronized before and after delegating to the helm
    version specific _instances_list.

    :param cluster_uuid: the 'cluster' or 'namespace:cluster'
    :return: whatever _instances_list returns for the cluster
    """
    cluster_id = self._get_namespace_cluster_id(cluster_uuid)[1]
    self.log.debug("list releases for cluster {}".format(cluster_id))

    self.fs.sync(from_path=cluster_id)
    releases = await self._instances_list(cluster_id)
    self.fs.reverse_sync(from_path=cluster_id)

    return releases
645
async def get_instance_info(self, cluster_uuid: str, kdu_instance: str):
    """
    Look up a release by name among the releases of a cluster.

    :param cluster_uuid: the 'cluster' or 'namespace:cluster'
    :param kdu_instance: release name to look for
    :return: the first entry of instances_list whose "name" equals kdu_instance,
        or None when there is none
    """
    releases = await self.instances_list(cluster_uuid=cluster_uuid)
    found = next(
        (release for release in releases if release.get("name") == kdu_instance),
        None,
    )
    if found is None:
        self.log.debug("Instance {} not found".format(kdu_instance))
    return found
653
async def exec_primitive(
    self,
    cluster_uuid: str = None,
    kdu_instance: str = None,
    primitive_name: str = None,
    timeout: float = 300,
    params: dict = None,
    db_dict: dict = None,
    **kwargs,
) -> str:
    """Exec primitive: not available for helm based KDUs

    None of the arguments is used; the call always fails.

    :param cluster_uuid: The UUID of the cluster or namespace:cluster
    :param kdu_instance: The unique name of the KDU instance
    :param primitive_name: Name of action that will be executed
    :param timeout: Timeout for action execution
    :param params: Dictionary of all the parameters needed for the action
    :param db_dict: Dictionary for any additional data
    :param kwargs: Additional parameters (None yet)

    :raises K8sException: always
    """
    unsupported_msg = (
        "KDUs deployed with Helm don't support actions "
        "different from rollback, upgrade and status"
    )
    raise K8sException(unsupported_msg)
680
async def get_services(self,
                       cluster_uuid: str,
                       kdu_instance: str,
                       namespace: str) -> list:
    """
    Returns a list of services defined for the specified kdu instance.

    The service names come from the helm version specific _get_services and
    each one is then resolved with _get_service, in the same order.

    :param cluster_uuid: UUID of a K8s cluster known by OSM
    :param kdu_instance: unique name for the KDU instance
    :param namespace: K8s namespace used by the KDU instance
    :return: If successful, it will return a list of services, Each service
        can have the following data:
        - `name` of the service
        - `type` type of service in the k8 cluster
        - `ports` List of ports offered by the service, for each port includes at least
          name, port, protocol
        - `cluster_ip` Internal ip to be used inside k8s cluster
        - `external_ip` List of external ips (in case they are available)
    """
    cluster_id = self._get_namespace_cluster_id(cluster_uuid)[1]
    self.log.debug(
        "get_services: cluster_uuid: {}, kdu_instance: {}".format(
            cluster_uuid, kdu_instance
        )
    )

    self.fs.sync(from_path=cluster_id)

    # names first, then the details of every service one by one
    names = await self._get_services(cluster_id, kdu_instance, namespace)
    details = [
        await self._get_service(cluster_id, service_name, namespace)
        for service_name in names
    ]

    self.fs.reverse_sync(from_path=cluster_id)

    return details
723
async def get_service(self,
                      cluster_uuid: str,
                      service_name: str,
                      namespace: str) -> object:
    """
    Return the data of one service of a cluster.

    The file system is synchronized before and after delegating to _get_service.

    :param cluster_uuid: UUID of a K8s cluster known by OSM, or namespace:cluster_id
    :param service_name: name of the service
    :param namespace: K8s namespace of the service
    :return: whatever _get_service returns for the service
    """
    trace = "get service, service_name: {}, namespace: {}, cluster_uuid: {}".format(
        service_name, namespace, cluster_uuid)
    self.log.debug(trace)

    cluster_id = self._get_namespace_cluster_id(cluster_uuid)[1]

    self.fs.sync(from_path=cluster_id)
    service_data = await self._get_service(cluster_id, service_name, namespace)
    self.fs.reverse_sync(from_path=cluster_id)

    return service_data
745
async def status_kdu(self, cluster_uuid: str, kdu_instance: str, **kwargs) -> str:
    """
    Retrieve the current state of a given KDU instance: its _composition_
    (i.e. K8s objects) and the _specific values_ of the configuration
    parameters applied to it. The call is based on the helm `status` call.

    :param cluster_uuid: UUID of a K8s cluster known by OSM
    :param kdu_instance: unique name for the KDU instance
    :param kwargs: Additional parameters (None yet)
    :return: the status, as text, returned by the helm version specific
        _status_kdu. If successful it covers:
        - K8s `namespace` in the cluster where the KDU lives
        - `state` of the KDU instance. It can be:
              - UNKNOWN
              - DEPLOYED
              - DELETED
              - SUPERSEDED
              - FAILED or
              - DELETING
        - List of `resources` (objects) that this release consists of, sorted by kind,
          and the status of those resources
        - Last `deployment_time`.
    :raises K8sException: if the instance does not exist in the cluster
    """
    self.log.debug(
        "status_kdu: cluster_uuid: {}, kdu_instance: {}".format(
            cluster_uuid, kdu_instance
        )
    )

    cluster_id = self._get_namespace_cluster_id(cluster_uuid)[1]

    self.fs.sync(from_path=cluster_id)

    # the release is needed to learn the namespace it lives in
    releases = await self._instances_list(cluster_id=cluster_id)
    release = next(
        (candidate for candidate in releases if candidate.get("name") == kdu_instance),
        None,
    )
    if release is None:
        raise K8sException("Instance name: {} not found in cluster: {}".format(
            kdu_instance, cluster_id))

    kdu_status = await self._status_kdu(
        cluster_id=cluster_id,
        kdu_instance=kdu_instance,
        namespace=release["namespace"],
        show_error_log=True,
        return_text=True,
    )

    self.fs.reverse_sync(from_path=cluster_id)

    return kdu_status
803
async def values_kdu(self, kdu_model: str, repo_url: str = None) -> str:
    """
    Inspect the values of a chart.

    :param kdu_model: chart reference
    :param repo_url: optional repository url
    :return: result of the "values" inspect command
    """
    trace = "inspect kdu_model values {} from (optional) repo: {}".format(
        kdu_model, repo_url
    )
    self.log.debug(trace)

    chart_values = await self._exec_inspect_comand(
        inspect_command="values", kdu_model=kdu_model, repo_url=repo_url
    )
    return chart_values
815
async def help_kdu(self, kdu_model: str, repo_url: str = None) -> str:
    """
    Inspect the readme of a chart.

    :param kdu_model: chart reference
    :param repo_url: optional repository url
    :return: result of the "readme" inspect command
    """
    trace = "inspect kdu_model {} readme.md from repo: {}".format(kdu_model, repo_url)
    self.log.debug(trace)

    chart_readme = await self._exec_inspect_comand(
        inspect_command="readme", kdu_model=kdu_model, repo_url=repo_url
    )
    return chart_readme
825
async def synchronize_repos(self, cluster_uuid: str):
    """
    Make the helm repositories of a cluster match the ones stored in the database.

    Database repos that are missing locally, or registered locally with another
    url, are added (after removing the outdated entry). Local repos that are not
    in the database are removed, except the one named "stable"; a failure
    removing one of them is only logged.

    :param cluster_uuid: the 'cluster' or 'namespace:cluster'
    :return: tuple (deleted_repo_list, added_repo_dict). The list holds the
        database _id of every repo removed to be added again and the name of
        every repo removed because it is not in the database. The dict maps
        the database _id of every added repo to its name
    :raises K8sException: if a database repo cannot be added
    :raises Exception: on any other error while synchronizing
    """
    self.log.debug("synchronize repos for cluster helm-id: {}".format(cluster_uuid))
    try:
        db_repo_ids = self._get_helm_chart_repos_ids(cluster_uuid)
        db_repo_dict = self._get_db_repos_dict(db_repo_ids)

        local_repo_list = await self.repo_list(cluster_uuid)
        local_repo_dict = {repo["name"]: repo["url"] for repo in local_repo_list}

        deleted_repo_list = []
        added_repo_dict = {}

        # iterate over the list of repos in the database that should be
        # added if not present
        for db_repo in db_repo_dict.values():
            # read before the try so the error message can always report it
            repo_id = db_repo.get("_id")
            try:
                # check if it is already present
                curr_repo_url = local_repo_dict.get(db_repo["name"])
                if curr_repo_url != db_repo["url"]:
                    if curr_repo_url:
                        self.log.debug("repo {} url changed, delete and add again".format(
                            db_repo["name"]))
                        await self.repo_remove(cluster_uuid, db_repo["name"])
                        deleted_repo_list.append(repo_id)

                    # add repo
                    self.log.debug("add repo {}".format(db_repo["name"]))
                    await self.repo_add(cluster_uuid, db_repo["name"], db_repo["url"])
                    added_repo_dict[repo_id] = db_repo["name"]
            except Exception as e:
                raise K8sException(
                    "Error adding repo id: {}, err_msg: {} ".format(
                        repo_id, repr(e)
                    )
                ) from e

        # Delete repos that are present but not in nbi_list
        for repo_name in local_repo_dict:
            if not db_repo_dict.get(repo_name) and repo_name != "stable":
                self.log.debug("delete repo {}".format(repo_name))
                try:
                    await self.repo_remove(cluster_uuid, repo_name)
                    deleted_repo_list.append(repo_name)
                except Exception as e:
                    # a repo that cannot be deleted must not abort the synchronization
                    self.log.warning(
                        "Error deleting repo, name: {}, err_msg: {}".format(
                            repo_name, str(e)
                        )
                    )

        return deleted_repo_list, added_repo_dict

    except K8sException:
        raise
    except Exception as e:
        # any other error is logged and reported to the caller as a generic exception
        self.log.error("Error synchronizing repos: {}".format(e))
        raise Exception("Error synchronizing repos: {}".format(e)) from e
886
887 def _get_db_repos_dict(self, repo_ids: list):
888 db_repos_dict = {}
889 for repo_id in repo_ids:
890 db_repo = self.db.get_one("k8srepos", {"_id": repo_id})
891 db_repos_dict[db_repo["name"]] = db_repo
892 return db_repos_dict
893
894 """
895 ####################################################################################
896 ################################### TO BE IMPLEMENTED SUBCLASSES ###################
897 ####################################################################################
898 """
899
900 @abc.abstractmethod
901 def _init_paths_env(self, cluster_name: str, create_if_not_exist: bool = True):
902 """
903 Creates and returns base cluster and kube dirs and returns them.
904 Also created helm3 dirs according to new directory specification, paths are
905 not returned but assigned to helm environment variables
906
907 :param cluster_name: cluster_name
908 :return: Dictionary with config_paths and dictionary with helm environment variables
909 """
910
911 @abc.abstractmethod
912 async def _cluster_init(self, cluster_id, namespace, paths, env):
913 """
914 Implements the helm version dependent cluster initialization
915 """
916
917 @abc.abstractmethod
918 async def _instances_list(self, cluster_id):
919 """
920 Implements the helm version dependent helm instances list
921 """
922
923 @abc.abstractmethod
924 async def _get_services(self, cluster_id, kdu_instance, namespace):
925 """
926 Implements the helm version dependent method to obtain services from a helm instance
927 """
928
929 @abc.abstractmethod
930 async def _status_kdu(self, cluster_id: str, kdu_instance: str, namespace: str = None,
931 show_error_log: bool = False, return_text: bool = False):
932 """
933 Implements the helm version dependent method to obtain status of a helm instance
934 """
935
936 @abc.abstractmethod
937 def _get_install_command(self, kdu_model, kdu_instance, namespace,
938 params_str, version, atomic, timeout) -> str:
939 """
940 Obtain command to be executed to delete the indicated instance
941 """
942
943 @abc.abstractmethod
944 def _get_upgrade_command(self, kdu_model, kdu_instance, namespace,
945 params_str, version, atomic, timeout) -> str:
946 """
947 Obtain command to be executed to upgrade the indicated instance
948 """
949
950 @abc.abstractmethod
951 def _get_rollback_command(self, kdu_instance, namespace, revision) -> str:
952 """
953 Obtain command to be executed to rollback the indicated instance
954 """
955
956 @abc.abstractmethod
957 def _get_uninstall_command(self, kdu_instance: str, namespace: str) -> str:
958 """
959 Obtain command to be executed to delete the indicated instance
960 """
961
962 @abc.abstractmethod
963 def _get_inspect_command(self, show_command: str, kdu_model: str, repo_str: str,
964 version: str):
965 """
966 Obtain command to be executed to obtain information about the kdu
967 """
968
969 @abc.abstractmethod
970 async def _uninstall_sw(self, cluster_id: str, namespace: str):
971 """
972 Method call to uninstall cluster software for helm. This method is dependent
973 of helm version
974 For Helm v2 it will be called when Tiller must be uninstalled
975 For Helm v3 it does nothing and does not need to be callled
976 """
977
978 @abc.abstractmethod
979 def _get_helm_chart_repos_ids(self, cluster_uuid) -> list:
980 """
981 Obtains the cluster repos identifiers
982 """
983
984 """
985 ####################################################################################
986 ################################### P R I V A T E ##################################
987 ####################################################################################
988 """
989
990 @staticmethod
991 def _check_file_exists(filename: str, exception_if_not_exists: bool = False):
992 if os.path.exists(filename):
993 return True
994 else:
995 msg = "File {} does not exist".format(filename)
996 if exception_if_not_exists:
997 raise K8sException(msg)
998
999 @staticmethod
1000 def _remove_multiple_spaces(strobj):
1001 strobj = strobj.strip()
1002 while " " in strobj:
1003 strobj = strobj.replace(" ", " ")
1004 return strobj
1005
1006 @staticmethod
1007 def _output_to_lines(output: str) -> list:
1008 output_lines = list()
1009 lines = output.splitlines(keepends=False)
1010 for line in lines:
1011 line = line.strip()
1012 if len(line) > 0:
1013 output_lines.append(line)
1014 return output_lines
1015
1016 @staticmethod
1017 def _output_to_table(output: str) -> list:
1018 output_table = list()
1019 lines = output.splitlines(keepends=False)
1020 for line in lines:
1021 line = line.replace("\t", " ")
1022 line_list = list()
1023 output_table.append(line_list)
1024 cells = line.split(sep=" ")
1025 for cell in cells:
1026 cell = cell.strip()
1027 if len(cell) > 0:
1028 line_list.append(cell)
1029 return output_table
1030
1031 @staticmethod
1032 def _parse_services(output: str) -> list:
1033 lines = output.splitlines(keepends=False)
1034 services = []
1035 for line in lines:
1036 line = line.replace("\t", " ")
1037 cells = line.split(sep=" ")
1038 if len(cells) > 0 and cells[0].startswith("service/"):
1039 elems = cells[0].split(sep="/")
1040 if len(elems) > 1:
1041 services.append(elems[1])
1042 return services
1043
1044 @staticmethod
1045 def _get_deep(dictionary: dict, members: tuple):
1046 target = dictionary
1047 value = None
1048 try:
1049 for m in members:
1050 value = target.get(m)
1051 if not value:
1052 return None
1053 else:
1054 target = value
1055 except Exception:
1056 pass
1057 return value
1058
1059 # find key:value in several lines
1060 @staticmethod
1061 def _find_in_lines(p_lines: list, p_key: str) -> str:
1062 for line in p_lines:
1063 try:
1064 if line.startswith(p_key + ":"):
1065 parts = line.split(":")
1066 the_value = parts[1].strip()
1067 return the_value
1068 except Exception:
1069 # ignore it
1070 pass
1071 return None
1072
1073 @staticmethod
1074 def _lower_keys_list(input_list: list):
1075 """
1076 Transform the keys in a list of dictionaries to lower case and returns a new list
1077 of dictionaries
1078 """
1079 new_list = []
1080 for dictionary in input_list:
1081 new_dict = dict((k.lower(), v) for k, v in dictionary.items())
1082 new_list.append(new_dict)
1083 return new_list
1084
1085 def _local_exec(self, command: str) -> (str, int):
1086 command = self._remove_multiple_spaces(command)
1087 self.log.debug("Executing sync local command: {}".format(command))
1088 # raise exception if fails
1089 output = ""
1090 try:
1091 output = subprocess.check_output(
1092 command, shell=True, universal_newlines=True
1093 )
1094 return_code = 0
1095 self.log.debug(output)
1096 except Exception:
1097 return_code = 1
1098
1099 return output, return_code
1100
1101 async def _local_async_exec(
1102 self,
1103 command: str,
1104 raise_exception_on_error: bool = False,
1105 show_error_log: bool = True,
1106 encode_utf8: bool = False,
1107 env: dict = None
1108 ) -> (str, int):
1109
1110 command = K8sHelmBaseConnector._remove_multiple_spaces(command)
1111 self.log.debug("Executing async local command: {}, env: {}".format(command, env))
1112
1113 # split command
1114 command = shlex.split(command)
1115
1116 environ = os.environ.copy()
1117 if env:
1118 environ.update(env)
1119
1120 try:
1121 process = await asyncio.create_subprocess_exec(
1122 *command, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE,
1123 env=environ
1124 )
1125
1126 # wait for command terminate
1127 stdout, stderr = await process.communicate()
1128
1129 return_code = process.returncode
1130
1131 output = ""
1132 if stdout:
1133 output = stdout.decode("utf-8").strip()
1134 # output = stdout.decode()
1135 if stderr:
1136 output = stderr.decode("utf-8").strip()
1137 # output = stderr.decode()
1138
1139 if return_code != 0 and show_error_log:
1140 self.log.debug(
1141 "Return code (FAIL): {}\nOutput:\n{}".format(return_code, output)
1142 )
1143 else:
1144 self.log.debug("Return code: {}".format(return_code))
1145
1146 if raise_exception_on_error and return_code != 0:
1147 raise K8sException(output)
1148
1149 if encode_utf8:
1150 output = output.encode("utf-8").strip()
1151 output = str(output).replace("\\n", "\n")
1152
1153 return output, return_code
1154
1155 except asyncio.CancelledError:
1156 raise
1157 except K8sException:
1158 raise
1159 except Exception as e:
1160 msg = "Exception executing command: {} -> {}".format(command, e)
1161 self.log.error(msg)
1162 if raise_exception_on_error:
1163 raise K8sException(e) from e
1164 else:
1165 return "", -1
1166
1167 async def _local_async_exec_pipe(self,
1168 command1: str,
1169 command2: str,
1170 raise_exception_on_error: bool = True,
1171 show_error_log: bool = True,
1172 encode_utf8: bool = False,
1173 env: dict = None):
1174
1175 command1 = K8sHelmBaseConnector._remove_multiple_spaces(command1)
1176 command2 = K8sHelmBaseConnector._remove_multiple_spaces(command2)
1177 command = "{} | {}".format(command1, command2)
1178 self.log.debug("Executing async local command: {}, env: {}".format(command, env))
1179
1180 # split command
1181 command1 = shlex.split(command1)
1182 command2 = shlex.split(command2)
1183
1184 environ = os.environ.copy()
1185 if env:
1186 environ.update(env)
1187
1188 try:
1189 read, write = os.pipe()
1190 await asyncio.create_subprocess_exec(*command1, stdout=write, env=environ)
1191 os.close(write)
1192 process_2 = await asyncio.create_subprocess_exec(*command2, stdin=read,
1193 stdout=asyncio.subprocess.PIPE,
1194 env=environ)
1195 os.close(read)
1196 stdout, stderr = await process_2.communicate()
1197
1198 return_code = process_2.returncode
1199
1200 output = ""
1201 if stdout:
1202 output = stdout.decode("utf-8").strip()
1203 # output = stdout.decode()
1204 if stderr:
1205 output = stderr.decode("utf-8").strip()
1206 # output = stderr.decode()
1207
1208 if return_code != 0 and show_error_log:
1209 self.log.debug(
1210 "Return code (FAIL): {}\nOutput:\n{}".format(return_code, output)
1211 )
1212 else:
1213 self.log.debug("Return code: {}".format(return_code))
1214
1215 if raise_exception_on_error and return_code != 0:
1216 raise K8sException(output)
1217
1218 if encode_utf8:
1219 output = output.encode("utf-8").strip()
1220 output = str(output).replace("\\n", "\n")
1221
1222 return output, return_code
1223 except asyncio.CancelledError:
1224 raise
1225 except K8sException:
1226 raise
1227 except Exception as e:
1228 msg = "Exception executing command: {} -> {}".format(command, e)
1229 self.log.error(msg)
1230 if raise_exception_on_error:
1231 raise K8sException(e) from e
1232 else:
1233 return "", -1
1234
async def _get_service(self, cluster_id, service_name, namespace):
    """
    Obtains the data of the specified service in the k8cluster.

    :param cluster_id: id of a K8s cluster known by OSM
    :param service_name: name of the K8s service in the specified namespace
    :param namespace: K8s namespace used by the KDU instance
    :return: If successful, it will return a service with the following data:
    - `name` of the service
    - `type` type of service in the k8 cluster
    - `ports` List of ports offered by the service, for each port includes at least
    name, port, protocol
    - `cluster_ip` Internal ip to be used inside k8s cluster
    - `external_ip` List of external ips (in case they are available). Only
    present for services of type LoadBalancer; empty while no ingress ip has
    been assigned
    :raises K8sException: if the kubectl command fails
    """

    # init config, env
    paths, env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )

    command = "{} --kubeconfig={} --namespace={} get service {} -o=yaml".format(
        self.kubectl_command, paths["kube_config"], namespace, service_name
    )

    output, _rc = await self._local_async_exec(
        command=command, raise_exception_on_error=True, env=env
    )

    data = yaml.load(output, Loader=yaml.SafeLoader)

    service = {
        "name": service_name,
        "type": self._get_deep(data, ("spec", "type")),
        "ports": self._get_deep(data, ("spec", "ports")),
        "cluster_ip": self._get_deep(data, ("spec", "clusterIP"))
    }
    if service["type"] == "LoadBalancer":
        # _get_deep returns None while the load balancer has no ingress
        # assigned yet; treat that as "no external ips" instead of failing.
        ip_map_list = self._get_deep(data, ("status", "loadBalancer", "ingress")) or []
        # An ingress entry may carry a hostname instead of an ip; only
        # entries that do have an ip are reported.
        service["external_ip"] = [
            elem["ip"] for elem in ip_map_list if elem.get("ip")
        ]

    return service
1278
1279 async def _exec_inspect_comand(
1280 self, inspect_command: str, kdu_model: str, repo_url: str = None
1281 ):
1282 """
1283 Obtains information about a kdu, no cluster (no env)
1284 """
1285
1286 repo_str = ""
1287 if repo_url:
1288 repo_str = " --repo {}".format(repo_url)
1289
1290 idx = kdu_model.find("/")
1291 if idx >= 0:
1292 idx += 1
1293 kdu_model = kdu_model[idx:]
1294
1295 version = ""
1296 if ":" in kdu_model:
1297 parts = kdu_model.split(sep=":")
1298 if len(parts) == 2:
1299 version = "--version {}".format(str(parts[1]))
1300 kdu_model = parts[0]
1301
1302 full_command = self._get_inspect_command(inspect_command, kdu_model, repo_str, version)
1303 output, _rc = await self._local_async_exec(
1304 command=full_command, encode_utf8=True
1305 )
1306
1307 return output
1308
async def _store_status(
    self,
    cluster_id: str,
    operation: str,
    kdu_instance: str,
    namespace: str = None,
    check_every: float = 10,
    db_dict: dict = None,
    run_once: bool = False,
):
    """
    Poll the status of a KDU instance and write it to the database.

    Each iteration sleeps check_every seconds, reads the status through
    _status_kdu and stores it through write_app_status_to_db. The loop ends
    when the write reports failure, when the task is cancelled, or after the
    first iteration if run_once is set. Any other exception is logged at
    debug level and polling goes on.

    :param cluster_id: cluster identifier passed to _status_kdu
    :param operation: operation name stored together with the status
    :param kdu_instance: KDU instance to query
    :param namespace: namespace passed to _status_kdu
    :param check_every: seconds to sleep before every status query
        (including the first one)
    :param db_dict: passed unchanged to write_app_status_to_db
    :param run_once: perform a single iteration instead of looping
    :return: None
    """
    while True:
        try:
            await asyncio.sleep(check_every)
            detailed_status = await self._status_kdu(
                cluster_id=cluster_id, kdu_instance=kdu_instance, namespace=namespace,
                return_text=False
            )
            # a missing "info" entry raises here and is handled by the
            # generic handler below
            status = detailed_status.get("info").get("description")
            self.log.debug('KDU {} STATUS: {}.'.format(kdu_instance, status))
            # write status to db
            result = await self.write_app_status_to_db(
                db_dict=db_dict,
                status=str(status),
                detailed_status=str(detailed_status),
                operation=operation,
            )
            if not result:
                self.log.info("Error writing in database. Task exiting...")
                return
        except asyncio.CancelledError:
            self.log.debug("Task cancelled")
            return
        except Exception as e:
            # best effort: log and retry on the next iteration
            self.log.debug("_store_status exception: {}".format(str(e)), exc_info=True)
            pass
        finally:
            # returning from finally ends the task after one pass and also
            # discards any exception still in flight when run_once is set
            if run_once:
                return
1347
1348 # params for use in -f file
1349 # returns values file option and filename (in order to delete it at the end)
1350 def _params_to_file_option(self, cluster_id: str, params: dict) -> (str, str):
1351
1352 if params and len(params) > 0:
1353 self._init_paths_env(
1354 cluster_name=cluster_id, create_if_not_exist=True
1355 )
1356
1357 def get_random_number():
1358 r = random.randrange(start=1, stop=99999999)
1359 s = str(r)
1360 while len(s) < 10:
1361 s = "0" + s
1362 return s
1363
1364 params2 = dict()
1365 for key in params:
1366 value = params.get(key)
1367 if "!!yaml" in str(value):
1368 value = yaml.load(value[7:])
1369 params2[key] = value
1370
1371 values_file = get_random_number() + ".yaml"
1372 with open(values_file, "w") as stream:
1373 yaml.dump(params2, stream, indent=4, default_flow_style=False)
1374
1375 return "-f {}".format(values_file), values_file
1376
1377 return "", None
1378
1379 # params for use in --set option
1380 @staticmethod
1381 def _params_to_set_option(params: dict) -> str:
1382 params_str = ""
1383 if params and len(params) > 0:
1384 start = True
1385 for key in params:
1386 value = params.get(key, None)
1387 if value is not None:
1388 if start:
1389 params_str += "--set "
1390 start = False
1391 else:
1392 params_str += ","
1393 params_str += "{}={}".format(key, value)
1394 return params_str
1395
1396 @staticmethod
1397 def generate_kdu_instance_name(**kwargs):
1398 chart_name = kwargs["kdu_model"]
1399 # check embeded chart (file or dir)
1400 if chart_name.startswith("/"):
1401 # extract file or directory name
1402 chart_name = chart_name[chart_name.rfind("/") + 1:]
1403 # check URL
1404 elif "://" in chart_name:
1405 # extract last portion of URL
1406 chart_name = chart_name[chart_name.rfind("/") + 1:]
1407
1408 name = ""
1409 for c in chart_name:
1410 if c.isalpha() or c.isnumeric():
1411 name += c
1412 else:
1413 name += "-"
1414 if len(name) > 35:
1415 name = name[0:35]
1416
1417 # if does not start with alpha character, prefix 'a'
1418 if not name[0].isalpha():
1419 name = "a" + name
1420
1421 name += "-"
1422
1423 def get_random_number():
1424 r = random.randrange(start=1, stop=99999999)
1425 s = str(r)
1426 s = s.rjust(10, "0")
1427 return s
1428
1429 name = name + get_random_number()
1430 return name.lower()