Fix bug 1542 to allow juju to add Azure AKS
[osm/N2VC.git] / n2vc / k8s_helm_base_conn.py
1 ##
2 # Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3 # This file is part of OSM
4 # All Rights Reserved.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
15 # implied.
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
18 #
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact with: nfvlabs@tid.es
21 ##
22 import abc
23 import asyncio
24 import random
25 import time
26 import shlex
27 import shutil
28 import stat
29 import subprocess
30 import os
31 import yaml
32 from uuid import uuid4
33
34 from n2vc.exceptions import K8sException
35 from n2vc.k8s_conn import K8sConnector
36
37
38 class K8sHelmBaseConnector(K8sConnector):
39
40 """
41 ####################################################################################
42 ################################### P U B L I C ####################################
43 ####################################################################################
44 """
45
# Service account name; not referenced in the visible part of this file
# (presumably used by version-specific subclasses -- TODO confirm).
service_account = "osm"
# Default URL of the helm "stable" chart repository, used by __init__ when
# vca_config does not provide a "stablerepourl" entry.
_STABLE_REPO_URL = "https://charts.helm.sh/stable"
48
def __init__(
    self,
    fs: object,
    db: object,
    kubectl_command: str = "/usr/bin/kubectl",
    helm_command: str = "/usr/bin/helm",
    log: object = None,
    on_update_db=None,
    vca_config: dict = None,
):
    """
    Build the connector and check that the required executables exist.

    :param fs: file system for kubernetes and helm configuration
    :param db: database object to write current operation status
    :param kubectl_command: path to kubectl executable
    :param helm_command: path to helm executable
    :param log: logger
    :param on_update_db: callback called when k8s connector updates database
    :param vca_config: optional configuration dictionary; a non-empty
        "stablerepourl" entry overrides the default stable repository URL
    :raises K8sException: if kubectl or helm executables are not found
    """

    # initialise the parent connector (db, logger and db-update callback)
    K8sConnector.__init__(self, db=db, log=log, on_update_db=on_update_db)

    self.log.info("Initializing K8S Helm connector")

    # random numbers for release name generation
    random.seed(time.time())

    # the file system
    self.fs = fs

    # both executables are mandatory: fail early when one of them is missing
    self.kubectl_command = kubectl_command
    self._check_file_exists(filename=kubectl_command, exception_if_not_exists=True)

    self._helm_command = helm_command
    self._check_file_exists(filename=helm_command, exception_if_not_exists=True)

    # stable repo url: the configured one wins over the class default
    configured_url = vca_config.get("stablerepourl") if vca_config else None
    self._stable_repo_url = configured_url or self._STABLE_REPO_URL
93
94 @staticmethod
95 def _get_namespace_cluster_id(cluster_uuid: str) -> (str, str):
96 """
97 Parses cluster_uuid stored at database that can be either 'namespace:cluster_id' or only
98 cluster_id for backward compatibility
99 """
100 namespace, _, cluster_id = cluster_uuid.rpartition(":")
101 return namespace, cluster_id
102
async def init_env(
    self,
    k8s_creds: str,
    namespace: str = "kube-system",
    reuse_cluster_uuid=None,
    **kwargs,
) -> (str, bool):
    """
    It prepares a given K8s cluster environment to run Charts

    :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid
        '.kube/config'
    :param namespace: optional namespace to be used for helm. By default,
        'kube-system' will be used
    :param reuse_cluster_uuid: existing cluster uuid for reuse. When it embeds a
        namespace ('namespace:cluster_id'), that namespace takes precedence
    :param kwargs: Additional parameters (None yet)
    :return: uuid of the K8s cluster ('namespace:cluster_id') and True if
        connector has installed some software in the cluster
        (on error, an exception will be raised)
    """

    if reuse_cluster_uuid:
        namespace_, cluster_id = self._get_namespace_cluster_id(reuse_cluster_uuid)
        namespace = namespace_ or namespace
    else:
        cluster_id = str(uuid4())
    cluster_uuid = "{}:{}".format(namespace, cluster_id)

    self.log.debug(
        "Initializing K8S Cluster {}. namespace: {}".format(cluster_id, namespace)
    )

    paths, env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )

    # The kube config holds the cluster credentials, so the file must never be
    # readable by other users. The third positional argument of the built-in
    # open() is the buffer size, not the permission bits; os.open() is needed to
    # create the file already restricted to the owner.
    mode = stat.S_IRUSR | stat.S_IWUSR
    kube_config_fd = os.open(
        paths["kube_config"], os.O_WRONLY | os.O_CREAT | os.O_TRUNC, mode
    )
    with os.fdopen(kube_config_fd, "w") as f:
        f.write(k8s_creds)
    # the creation mode is ignored when the file already existed
    os.chmod(paths["kube_config"], mode)

    # Code with initialization specific of helm version
    n2vc_installed_sw = await self._cluster_init(cluster_id, namespace, paths, env)

    # sync fs with local data
    self.fs.reverse_sync(from_path=cluster_id)

    self.log.info("Cluster {} initialized".format(cluster_id))

    return cluster_uuid, n2vc_installed_sw
152
async def repo_add(
    self, cluster_uuid: str, name: str, url: str, repo_type: str = "chart"
):
    """
    Add a helm repository to the cluster environment.

    A 'helm repo update' is attempted first (its errors are ignored) and then
    the repository is added; a failure adding it raises an exception.

    :param cluster_uuid: the 'cluster' or 'namespace:cluster'
    :param name: name to register the repository with
    :param url: URL of the repository
    :param repo_type: only used in the log message
    """
    _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
    self.log.debug(
        "Cluster {}, adding {} repository {}. URL: {}".format(
            cluster_id, repo_type, name, url
        )
    )

    # sync local dir
    self.fs.sync(from_path=cluster_id)

    # init_env
    paths, env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )

    # helm repo update
    command = "{} repo update".format(self._helm_command)
    self.log.debug("updating repo: {}".format(command))
    await self._local_async_exec(
        command=command, raise_exception_on_error=False, env=env
    )

    # helm repo add name url
    # name and url come from outside the connector: quote them so that they are
    # always passed to helm as exactly one argument each
    command = "{} repo add {} {}".format(
        self._helm_command, shlex.quote(name), shlex.quote(url)
    )
    self.log.debug("adding repo: {}".format(command))
    await self._local_async_exec(
        command=command, raise_exception_on_error=True, env=env
    )

    # sync fs
    self.fs.reverse_sync(from_path=cluster_id)
187
async def repo_list(self, cluster_uuid: str) -> list:
    """
    Get the list of registered repositories

    :param cluster_uuid: the 'cluster' or 'namespace:cluster'
    :return: list of registered repositories, one dictionary per repository
        with its keys in lower case; empty list when helm fails or reports
        nothing
    """

    _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
    self.log.debug("list repositories for cluster {}".format(cluster_id))

    # sync local dir
    self.fs.sync(from_path=cluster_id)

    # config filename
    paths, env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )

    list_command = "{} repo list --output yaml".format(self._helm_command)

    # errors are not raised: when there are no repos an empty list is wanted
    output, _rc = await self._local_async_exec(
        command=list_command, raise_exception_on_error=False, env=env
    )

    # sync fs
    self.fs.reverse_sync(from_path=cluster_id)

    # command failed or printed nothing: no repositories
    if _rc != 0 or not output:
        return []

    parsed_repos = yaml.load(output, Loader=yaml.SafeLoader)
    # unify format between helm2 and helm3 setting all keys lowercase
    return self._lower_keys_list(parsed_repos)
225
async def repo_remove(self, cluster_uuid: str, name: str):
    """
    Remove a helm repository from the cluster environment.

    :param cluster_uuid: the 'cluster' or 'namespace:cluster'
    :param name: name of the repository to remove
    (a failure of the helm command raises an exception)
    """

    _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
    self.log.debug("remove {} repositories for cluster {}".format(name, cluster_id))

    # sync local dir
    self.fs.sync(from_path=cluster_id)

    # init env, paths
    paths, env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )

    # the name comes from outside the connector: quote it so that it is always
    # passed to helm as exactly one argument
    command = "{} repo remove {}".format(self._helm_command, shlex.quote(name))
    await self._local_async_exec(
        command=command, raise_exception_on_error=True, env=env
    )

    # sync fs
    self.fs.reverse_sync(from_path=cluster_id)
246
async def reset(
    self,
    cluster_uuid: str,
    force: bool = False,
    uninstall_sw: bool = False,
    **kwargs,
) -> bool:
    """Reset a cluster

    Resets the Kubernetes cluster by removing the helm deployment that represents it.

    :param cluster_uuid: The UUID of the cluster to reset
    :param force: Boolean to force the reset: existing releases are uninstalled
        (only evaluated when uninstall_sw is True)
    :param uninstall_sw: Boolean to request uninstalling the releases and the
        helm software from the cluster. It is ignored when releases exist and
        force is False
    :param kwargs: Additional parameters (None yet)
    :return: Returns True if successful or raises an exception.
    """
    namespace, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
    self.log.debug(
        "Resetting K8s environment. cluster uuid: {} uninstall={}".format(
            cluster_id, uninstall_sw
        )
    )

    # sync local dir
    self.fs.sync(from_path=cluster_id)

    # uninstall releases if needed.
    if uninstall_sw:
        releases = await self.instances_list(cluster_uuid=cluster_uuid)
        if releases:
            if force:
                for r in releases:
                    # defined before the try so the error message below can
                    # always be built
                    kdu_instance = None
                    try:
                        kdu_instance = r.get("name")
                        chart = r.get("chart")
                        self.log.debug(
                            "Uninstalling {} -> {}".format(chart, kdu_instance)
                        )
                        await self.uninstall(
                            cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
                        )
                    except Exception as e:
                        # will not raise exception as it was found
                        # that in some cases of previously installed helm releases it
                        # raised an error
                        self.log.warning(
                            "Error uninstalling release {}: {}".format(
                                kdu_instance, e
                            )
                        )
            else:
                msg = (
                    "Cluster uuid: {} has releases and not force. Leaving K8s helm environment"
                ).format(cluster_id)
                self.log.warning(msg)
                uninstall_sw = (
                    False  # Allow to remove k8s cluster without removing Tiller
                )

    if uninstall_sw:
        await self._uninstall_sw(cluster_id, namespace)

    # delete cluster directory
    self.log.debug("Removing directory {}".format(cluster_id))
    self.fs.file_delete(cluster_id, ignore_non_exist=True)
    # Remove also the local directory if it still exists
    direct = self.fs.path + "/" + cluster_id
    shutil.rmtree(direct, ignore_errors=True)

    return True
318
319 async def _install_impl(
320 self,
321 cluster_id: str,
322 kdu_model: str,
323 paths: dict,
324 env: dict,
325 kdu_instance: str,
326 atomic: bool = True,
327 timeout: float = 300,
328 params: dict = None,
329 db_dict: dict = None,
330 kdu_name: str = None,
331 namespace: str = None,
332 ):
333 # params to str
334 params_str, file_to_delete = self._params_to_file_option(
335 cluster_id=cluster_id, params=params
336 )
337
338 # version
339 version = None
340 if ":" in kdu_model:
341 parts = kdu_model.split(sep=":")
342 if len(parts) == 2:
343 version = str(parts[1])
344 kdu_model = parts[0]
345
346 command = self._get_install_command(
347 kdu_model, kdu_instance, namespace, params_str, version, atomic, timeout
348 )
349
350 self.log.debug("installing: {}".format(command))
351
352 if atomic:
353 # exec helm in a task
354 exec_task = asyncio.ensure_future(
355 coro_or_future=self._local_async_exec(
356 command=command, raise_exception_on_error=False, env=env
357 )
358 )
359
360 # write status in another task
361 status_task = asyncio.ensure_future(
362 coro_or_future=self._store_status(
363 cluster_id=cluster_id,
364 kdu_instance=kdu_instance,
365 namespace=namespace,
366 db_dict=db_dict,
367 operation="install",
368 run_once=False,
369 )
370 )
371
372 # wait for execution task
373 await asyncio.wait([exec_task])
374
375 # cancel status task
376 status_task.cancel()
377
378 output, rc = exec_task.result()
379
380 else:
381
382 output, rc = await self._local_async_exec(
383 command=command, raise_exception_on_error=False, env=env
384 )
385
386 # remove temporal values yaml file
387 if file_to_delete:
388 os.remove(file_to_delete)
389
390 # write final status
391 await self._store_status(
392 cluster_id=cluster_id,
393 kdu_instance=kdu_instance,
394 namespace=namespace,
395 db_dict=db_dict,
396 operation="install",
397 run_once=True,
398 check_every=0,
399 )
400
401 if rc != 0:
402 msg = "Error executing command: {}\nOutput: {}".format(command, output)
403 self.log.error(msg)
404 raise K8sException(msg)
405
async def upgrade(
    self,
    cluster_uuid: str,
    kdu_instance: str,
    kdu_model: str = None,
    atomic: bool = True,
    timeout: float = 300,
    params: dict = None,
    db_dict: dict = None,
):
    """
    Upgrade an existing KDU instance (helm release).

    :param cluster_uuid: the 'cluster' or 'namespace:cluster'
    :param kdu_instance: name of the release to upgrade
    :param kdu_model: chart reference, optionally as 'chart:version'. Although
        it has a default for interface compatibility, it is required
    :param atomic: when True the status is also written by a parallel task
        while helm is running
    :param timeout: passed to the upgrade command builder
    :param params: values written to a temporal file for the upgrade command
    :param db_dict: passed to the status writer
    :return: revision of the release after the upgrade, 0 if the release is
        not found afterwards
    :raises K8sException: when kdu_model is missing, the instance does not
        exist or the helm command returns a non-zero code
    """
    _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
    self.log.debug("upgrading {} in cluster {}".format(kdu_model, cluster_id))

    # the chart is mandatory for the upgrade; without this check the version
    # parsing below fails with a TypeError on the default None
    if not kdu_model:
        raise K8sException(
            "kdu_model is required to upgrade kdu_instance {}".format(kdu_instance)
        )

    # sync local dir
    self.fs.sync(from_path=cluster_id)

    # look for instance to obtain namespace
    instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
    if not instance_info:
        raise K8sException("kdu_instance {} not found".format(kdu_instance))

    # init env, paths
    paths, env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )

    # params to str
    params_str, file_to_delete = self._params_to_file_option(
        cluster_id=cluster_id, params=params
    )

    # version: only a model with exactly one ':' is read as 'chart:version'
    version = None
    if ":" in kdu_model:
        parts = kdu_model.split(sep=":")
        if len(parts) == 2:
            version = str(parts[1])
            kdu_model = parts[0]

    command = self._get_upgrade_command(
        kdu_model,
        kdu_instance,
        instance_info["namespace"],
        params_str,
        version,
        atomic,
        timeout,
    )

    self.log.debug("upgrading: {}".format(command))

    if atomic:

        # exec helm in a task
        exec_task = asyncio.ensure_future(
            coro_or_future=self._local_async_exec(
                command=command, raise_exception_on_error=False, env=env
            )
        )
        # write status in another task
        status_task = asyncio.ensure_future(
            coro_or_future=self._store_status(
                cluster_id=cluster_id,
                kdu_instance=kdu_instance,
                namespace=instance_info["namespace"],
                db_dict=db_dict,
                operation="upgrade",
                run_once=False,
            )
        )

        # wait for execution task
        await asyncio.wait([exec_task])

        # cancel status task
        status_task.cancel()
        output, rc = exec_task.result()

    else:

        output, rc = await self._local_async_exec(
            command=command, raise_exception_on_error=False, env=env
        )

    # remove temporal values yaml file
    if file_to_delete:
        os.remove(file_to_delete)

    # write final status
    await self._store_status(
        cluster_id=cluster_id,
        kdu_instance=kdu_instance,
        namespace=instance_info["namespace"],
        db_dict=db_dict,
        operation="upgrade",
        run_once=True,
        check_every=0,
    )

    if rc != 0:
        msg = "Error executing command: {}\nOutput: {}".format(command, output)
        self.log.error(msg)
        raise K8sException(msg)

    # sync fs
    self.fs.reverse_sync(from_path=cluster_id)

    # return new revision number
    instance = await self.get_instance_info(
        cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
    )
    if instance:
        revision = int(instance.get("revision"))
        self.log.debug("New revision: {}".format(revision))
        return revision
    else:
        return 0
523
async def scale(
    self,
    kdu_instance: str,
    scale: int,
    resource_name: str,
    total_timeout: float = 1800,
    **kwargs,
):
    """
    Scaling is not available in this connector.

    :raises NotImplementedError: always
    """
    not_supported = "Method not implemented"
    raise NotImplementedError(not_supported)
533
async def get_scale_count(
    self,
    resource_name: str,
    kdu_instance: str,
    **kwargs,
):
    """
    Obtaining the scale count is not available in this connector.

    :raises NotImplementedError: always
    """
    not_supported = "Method not implemented"
    raise NotImplementedError(not_supported)
541
async def rollback(
    self, cluster_uuid: str, kdu_instance: str, revision=0, db_dict: dict = None
):
    """
    Roll a KDU instance (helm release) back to a given revision.

    :param cluster_uuid: the 'cluster' or 'namespace:cluster'
    :param kdu_instance: name of the release to roll back
    :param revision: revision passed to the rollback command builder
    :param db_dict: passed to the status writer
    :return: revision of the release after the rollback, 0 if the release is
        not found afterwards
    :raises K8sException: when the instance does not exist or the helm command
        returns a non-zero code
    """

    _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
    self.log.debug(
        "rollback kdu_instance {} to revision {} from cluster {}".format(
            kdu_instance, revision, cluster_id
        )
    )

    # sync local dir
    self.fs.sync(from_path=cluster_id)

    # the namespace of the release is taken from the deployed instance
    instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
    if not instance_info:
        raise K8sException("kdu_instance {} not found".format(kdu_instance))

    # init env, paths
    paths, env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )

    rollback_command = self._get_rollback_command(
        kdu_instance, instance_info["namespace"], revision
    )

    self.log.debug("rolling_back: {}".format(rollback_command))

    # helm runs in one task while another one keeps writing the status
    helm_task = asyncio.ensure_future(
        self._local_async_exec(
            command=rollback_command, raise_exception_on_error=False, env=env
        )
    )
    progress_task = asyncio.ensure_future(
        self._store_status(
            cluster_id=cluster_id,
            kdu_instance=kdu_instance,
            namespace=instance_info["namespace"],
            db_dict=db_dict,
            operation="rollback",
            run_once=False,
        )
    )

    # wait for helm and then stop the periodic status writer
    await asyncio.wait([helm_task])
    progress_task.cancel()

    output, rc = helm_task.result()

    # write final status
    await self._store_status(
        cluster_id=cluster_id,
        kdu_instance=kdu_instance,
        namespace=instance_info["namespace"],
        db_dict=db_dict,
        operation="rollback",
        run_once=True,
        check_every=0,
    )

    if rc != 0:
        msg = "Error executing command: {}\nOutput: {}".format(
            rollback_command, output
        )
        self.log.error(msg)
        raise K8sException(msg)

    # sync fs
    self.fs.reverse_sync(from_path=cluster_id)

    # return new revision number
    instance = await self.get_instance_info(
        cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
    )
    if not instance:
        return 0

    revision = int(instance.get("revision"))
    self.log.debug("New revision: {}".format(revision))
    return revision
627
async def uninstall(self, cluster_uuid: str, kdu_instance: str, **kwargs):
    """
    Removes an existing KDU instance. It would implicitly use the `delete` or 'uninstall' call
    (this call should happen after all _terminate-config-primitive_ of the VNF
    are invoked).

    :param cluster_uuid: UUID of a K8s cluster known by OSM, or namespace:cluster_id
    :param kdu_instance: unique name for the KDU instance to be deleted
    :param kwargs: Additional parameters (None yet)
    :return: output of the helm command converted to a table (list of lists
        of cells)
    :raises K8sException: when the instance is not found
    """

    _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
    self.log.debug(
        "uninstall kdu_instance {} from cluster {}".format(kdu_instance, cluster_id)
    )

    # sync local dir
    self.fs.sync(from_path=cluster_id)

    # the namespace of the release is taken from the deployed instance
    release_info = await self.get_instance_info(cluster_uuid, kdu_instance)
    if not release_info:
        raise K8sException("kdu_instance {} not found".format(kdu_instance))

    # init env, paths
    paths, env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )

    release_namespace = release_info["namespace"]
    uninstall_command = self._get_uninstall_command(kdu_instance, release_namespace)
    helm_output, _rc = await self._local_async_exec(
        command=uninstall_command, raise_exception_on_error=True, env=env
    )

    # sync fs
    self.fs.reverse_sync(from_path=cluster_id)

    return self._output_to_table(helm_output)
667
async def instances_list(self, cluster_uuid: str) -> list:
    """
    returns a list of deployed releases in a cluster

    :param cluster_uuid: the 'cluster' or 'namespace:cluster'
    :return: the result of the helm version dependent _instances_list
    """

    cluster_id = self._get_namespace_cluster_id(cluster_uuid)[1]
    self.log.debug("list releases for cluster {}".format(cluster_id))

    # sync local dir
    self.fs.sync(from_path=cluster_id)

    # the listing itself is helm version dependent
    releases = await self._instances_list(cluster_id)

    # sync fs
    self.fs.reverse_sync(from_path=cluster_id)

    return releases
689
async def get_instance_info(self, cluster_uuid: str, kdu_instance: str):
    """
    Look for a deployed release by name.

    :param cluster_uuid: the 'cluster' or 'namespace:cluster'
    :param kdu_instance: name of the release
    :return: the first element of instances_list whose "name" is kdu_instance,
        or None when there is none
    """
    releases = await self.instances_list(cluster_uuid=cluster_uuid)
    matching = (rel for rel in releases if rel.get("name") == kdu_instance)
    found = next(matching, None)
    if found is None:
        self.log.debug("Instance {} not found".format(kdu_instance))
    return found
697
async def exec_primitive(
    self,
    cluster_uuid: str = None,
    kdu_instance: str = None,
    primitive_name: str = None,
    timeout: float = 300,
    params: dict = None,
    db_dict: dict = None,
    **kwargs,
) -> str:
    """Exec primitive (Juju action)

    Not supported for KDUs deployed with Helm: this method always raises.

    :param cluster_uuid: The UUID of the cluster or namespace:cluster
    :param kdu_instance: The unique name of the KDU instance
    :param primitive_name: Name of action that will be executed
    :param timeout: Timeout for action execution
    :param params: Dictionary of all the parameters needed for the action
    :param db_dict: Dictionary for any additional data
    :param kwargs: Additional parameters (None yet)

    :raises K8sException: always
    """
    unsupported_msg = (
        "KDUs deployed with Helm don't support actions "
        "different from rollback, upgrade and status"
    )
    raise K8sException(unsupported_msg)
724
async def get_services(
    self, cluster_uuid: str, kdu_instance: str, namespace: str
) -> list:
    """
    Returns a list of services defined for the specified kdu instance.

    :param cluster_uuid: UUID of a K8s cluster known by OSM
    :param kdu_instance: unique name for the KDU instance
    :param namespace: K8s namespace used by the KDU instance
    :return: If successful, it will return a list of services, Each service
        can have the following data:
        - `name` of the service
        - `type` type of service in the k8 cluster
        - `ports` List of ports offered by the service, for each port includes at least
        name, port, protocol
        - `cluster_ip` Internal ip to be used inside k8s cluster
        - `external_ip` List of external ips (in case they are available)
    """

    cluster_id = self._get_namespace_cluster_id(cluster_uuid)[1]
    self.log.debug(
        "get_services: cluster_uuid: {}, kdu_instance: {}".format(
            cluster_uuid, kdu_instance
        )
    )

    # sync local dir
    self.fs.sync(from_path=cluster_id)

    # names of the services of the kdu
    names = await self._get_services(cluster_id, kdu_instance, namespace)

    # details of each service, requested one after the other
    service_list = [
        await self._get_service(cluster_id, service_name, namespace)
        for service_name in names
    ]

    # sync fs
    self.fs.reverse_sync(from_path=cluster_id)

    return service_list
766
async def get_service(
    self, cluster_uuid: str, service_name: str, namespace: str
) -> object:
    """
    Obtain the data of one service.

    :param cluster_uuid: the 'cluster' or 'namespace:cluster'
    :param service_name: name of the service
    :param namespace: K8s namespace of the service
    :return: the result of _get_service for that service
    """

    self.log.debug(
        "get service, service_name: {}, namespace: {}, cluster_uuid: {}".format(
            service_name, namespace, cluster_uuid
        )
    )

    cluster_id = self._get_namespace_cluster_id(cluster_uuid)[1]

    # sync local dir
    self.fs.sync(from_path=cluster_id)

    service_data = await self._get_service(cluster_id, service_name, namespace)

    # sync fs
    self.fs.reverse_sync(from_path=cluster_id)

    return service_data
788
async def status_kdu(self, cluster_uuid: str, kdu_instance: str, **kwargs) -> str:
    """
    This call retrieves the current state of a given KDU instance. It allows
    to retrieve the _composition_ (i.e. K8s objects) and _specific values_ of
    the configuration parameters applied to a given instance. This call is
    based on the `status` call.

    :param cluster_uuid: UUID of a K8s cluster known by OSM
    :param kdu_instance: unique name for the KDU instance
    :param kwargs: Additional parameters (None yet)
    :return: If successful, it will return the following vector of arguments:
    - K8s `namespace` in the cluster where the KDU lives
    - `state` of the KDU instance. It can be:
          - UNKNOWN
          - DEPLOYED
          - DELETED
          - SUPERSEDED
          - FAILED or
          - DELETING
    - List of `resources` (objects) that this release consists of, sorted by kind,
      and the status of those resources
    - Last `deployment_time`.
    :raises K8sException: when the instance is not found in the cluster
    """
    self.log.debug(
        "status_kdu: cluster_uuid: {}, kdu_instance: {}".format(
            cluster_uuid, kdu_instance
        )
    )

    cluster_id = self._get_namespace_cluster_id(cluster_uuid)[1]

    # sync local dir
    self.fs.sync(from_path=cluster_id)

    # get instance: needed to obtain namespace
    releases = await self._instances_list(cluster_id=cluster_id)
    release = next(
        (rel for rel in releases if rel.get("name") == kdu_instance), None
    )
    if release is None:
        # instance does not exist
        raise K8sException(
            "Instance name: {} not found in cluster: {}".format(
                kdu_instance, cluster_id
            )
        )

    status = await self._status_kdu(
        cluster_id=cluster_id,
        kdu_instance=kdu_instance,
        namespace=release["namespace"],
        show_error_log=True,
        return_text=True,
    )

    # sync fs
    self.fs.reverse_sync(from_path=cluster_id)

    return status
849
async def values_kdu(self, kdu_model: str, repo_url: str = None) -> str:
    """
    Inspect the values of a kdu model.

    :param kdu_model: chart reference
    :param repo_url: optional repository url
    :return: the result of the "values" inspect command
    """

    self.log.debug(
        "inspect kdu_model values {} from (optional) repo: {}".format(
            kdu_model, repo_url
        )
    )

    chart_values = await self._exec_inspect_comand(
        inspect_command="values", kdu_model=kdu_model, repo_url=repo_url
    )
    return chart_values
861
async def help_kdu(self, kdu_model: str, repo_url: str = None) -> str:
    """
    Inspect the readme of a kdu model.

    :param kdu_model: chart reference
    :param repo_url: optional repository url
    :return: the result of the "readme" inspect command
    """

    self.log.debug(
        "inspect kdu_model {} readme.md from repo: {}".format(kdu_model, repo_url)
    )

    chart_readme = await self._exec_inspect_comand(
        inspect_command="readme", kdu_model=kdu_model, repo_url=repo_url
    )
    return chart_readme
871
async def synchronize_repos(self, cluster_uuid: str):
    """
    Make the repositories registered in helm match the ones in the database.

    Repositories of the database that are missing locally, or whose url
    differs, are (re)added. Local repositories that are not in the database
    are removed, except the one named "stable".

    :param cluster_uuid: the 'cluster' or 'namespace:cluster'
    :return: tuple (deleted_repo_list, added_repo_dict). deleted_repo_list
        holds the database ids of the repositories removed because their url
        changed and the names of the local repositories removed because they
        are not in the database; added_repo_dict maps database id to name
    :raises K8sException: when a repository cannot be added
    :raises Exception: on any other error
    """

    self.log.debug("synchronize repos for cluster helm-id: {}".format(cluster_uuid))
    try:
        db_repo_ids = self._get_helm_chart_repos_ids(cluster_uuid)
        db_repo_dict = self._get_db_repos_dict(db_repo_ids)

        local_repo_list = await self.repo_list(cluster_uuid)
        local_repo_dict = {repo["name"]: repo["url"] for repo in local_repo_list}

        deleted_repo_list = []
        added_repo_dict = {}

        # iterate over the list of repos in the database that should be
        # added if not present
        for db_repo in db_repo_dict.values():
            # obtained before the try so the error message below can always
            # be built
            repo_id = db_repo.get("_id")
            try:
                # check if it is already present
                curr_repo_url = local_repo_dict.get(db_repo["name"])
                if curr_repo_url != db_repo["url"]:
                    if curr_repo_url:
                        self.log.debug(
                            "repo {} url changed, delete and add again".format(
                                db_repo["name"]
                            )
                        )
                        await self.repo_remove(cluster_uuid, db_repo["name"])
                        deleted_repo_list.append(repo_id)

                    # add repo
                    self.log.debug("add repo {}".format(db_repo["name"]))
                    await self.repo_add(
                        cluster_uuid, db_repo["name"], db_repo["url"]
                    )
                    added_repo_dict[repo_id] = db_repo["name"]
            except Exception as e:
                raise K8sException(
                    "Error adding repo id: {}, err_msg: {} ".format(
                        repo_id, repr(e)
                    )
                ) from e

        # Delete repos that are present but not in nbi_list
        for repo_name in local_repo_dict:
            if not db_repo_dict.get(repo_name) and repo_name != "stable":
                self.log.debug("delete repo {}".format(repo_name))
                try:
                    await self.repo_remove(cluster_uuid, repo_name)
                    deleted_repo_list.append(repo_name)
                except Exception as e:
                    # a repo that cannot be deleted does not abort the
                    # synchronization
                    self.log.warning(
                        "Error deleting repo, name: {}, err_msg: {}".format(
                            repo_name, str(e)
                        )
                    )

        return deleted_repo_list, added_repo_dict

    except K8sException:
        raise
    except Exception as e:
        # unexpected errors are logged and reported with a common message
        self.log.error("Error synchronizing repos: {}".format(e))
        raise Exception("Error synchronizing repos: {}".format(e)) from e
937
938 def _get_db_repos_dict(self, repo_ids: list):
939 db_repos_dict = {}
940 for repo_id in repo_ids:
941 db_repo = self.db.get_one("k8srepos", {"_id": repo_id})
942 db_repos_dict[db_repo["name"]] = db_repo
943 return db_repos_dict
944
945 """
946 ####################################################################################
947 ################################### TO BE IMPLEMENTED SUBCLASSES ###################
948 ####################################################################################
949 """
950
@abc.abstractmethod
def _init_paths_env(self, cluster_name: str, create_if_not_exist: bool = True):
    """
    Creates the base cluster and kube dirs and returns them.
    It also creates the helm3 dirs according to the new directory specification;
    those paths are not returned but assigned to helm environment variables.

    Callers in this class read the "kube_config" entry of the returned paths
    and pass the returned environment to the helm commands.

    :param cluster_name: cluster_name
    :param create_if_not_exist: presumably whether missing dirs are created;
        confirm in the subclass implementations
    :return: Dictionary with config_paths and dictionary with helm environment variables
    """
961
@abc.abstractmethod
async def _cluster_init(self, cluster_id, namespace, paths, env):
    """
    Implements the helm version dependent cluster initialization

    Called by init_env, which returns this method's result as the flag
    "connector has installed some software in the cluster".
    """
967
@abc.abstractmethod
async def _instances_list(self, cluster_id):
    """
    Implements the helm version dependent helm instances list

    Callers in this class iterate the result and read the "name" and
    "namespace" entries of each element.
    """
973
@abc.abstractmethod
async def _get_services(self, cluster_id, kdu_instance, namespace):
    """
    Implements the helm version dependent method to obtain services from a helm instance

    get_services passes each element of the result to _get_service as the
    service name.
    """
979
@abc.abstractmethod
async def _status_kdu(
    self,
    cluster_id: str,
    kdu_instance: str,
    namespace: str = None,
    show_error_log: bool = False,
    return_text: bool = False,
):
    """
    Implements the helm version dependent method to obtain status of a helm instance

    status_kdu calls it with show_error_log=True and return_text=True and
    returns its result unchanged.
    """
992
@abc.abstractmethod
def _get_install_command(
    self, kdu_model, kdu_instance, namespace, params_str, version, atomic, timeout
) -> str:
    """
    Obtain command to be executed to install the indicated instance
    """
1000
@abc.abstractmethod
def _get_upgrade_command(
    self, kdu_model, kdu_instance, namespace, params_str, version, atomic, timeout
) -> str:
    """
    Obtain command to be executed to upgrade the indicated instance

    upgrade passes the chart without version as kdu_model and the version
    (or None) separately.
    """
1008
@abc.abstractmethod
def _get_rollback_command(self, kdu_instance, namespace, revision) -> str:
    """
    Obtain command to be executed to rollback the indicated instance
    to the given revision
    """
1014
@abc.abstractmethod
def _get_uninstall_command(self, kdu_instance: str, namespace: str) -> str:
    """
    Obtain command to be executed to delete the indicated instance
    from the given namespace
    """
1020
@abc.abstractmethod
def _get_inspect_command(
    self, show_command: str, kdu_model: str, repo_str: str, version: str
):
    """
    Obtain command to be executed to obtain information about the kdu

    Not called in the visible part of this file.
    """
1028
@abc.abstractmethod
async def _uninstall_sw(self, cluster_id: str, namespace: str):
    """
    Method call to uninstall cluster software for helm. This method is dependent
    of helm version
    For Helm v2 it will be called when Tiller must be uninstalled
    For Helm v3 it does nothing and does not need to be called
    """
1037
@abc.abstractmethod
def _get_helm_chart_repos_ids(self, cluster_uuid) -> list:
    """
    Obtains the cluster repos identifiers

    synchronize_repos passes the result to _get_db_repos_dict, which uses
    each element as the "_id" of a "k8srepos" database record.
    """
1043
1044 """
1045 ####################################################################################
1046 ################################### P R I V A T E ##################################
1047 ####################################################################################
1048 """
1049
1050 @staticmethod
1051 def _check_file_exists(filename: str, exception_if_not_exists: bool = False):
1052 if os.path.exists(filename):
1053 return True
1054 else:
1055 msg = "File {} does not exist".format(filename)
1056 if exception_if_not_exists:
1057 raise K8sException(msg)
1058
1059 @staticmethod
1060 def _remove_multiple_spaces(strobj):
1061 strobj = strobj.strip()
1062 while " " in strobj:
1063 strobj = strobj.replace(" ", " ")
1064 return strobj
1065
1066 @staticmethod
1067 def _output_to_lines(output: str) -> list:
1068 output_lines = list()
1069 lines = output.splitlines(keepends=False)
1070 for line in lines:
1071 line = line.strip()
1072 if len(line) > 0:
1073 output_lines.append(line)
1074 return output_lines
1075
1076 @staticmethod
1077 def _output_to_table(output: str) -> list:
1078 output_table = list()
1079 lines = output.splitlines(keepends=False)
1080 for line in lines:
1081 line = line.replace("\t", " ")
1082 line_list = list()
1083 output_table.append(line_list)
1084 cells = line.split(sep=" ")
1085 for cell in cells:
1086 cell = cell.strip()
1087 if len(cell) > 0:
1088 line_list.append(cell)
1089 return output_table
1090
1091 @staticmethod
1092 def _parse_services(output: str) -> list:
1093 lines = output.splitlines(keepends=False)
1094 services = []
1095 for line in lines:
1096 line = line.replace("\t", " ")
1097 cells = line.split(sep=" ")
1098 if len(cells) > 0 and cells[0].startswith("service/"):
1099 elems = cells[0].split(sep="/")
1100 if len(elems) > 1:
1101 services.append(elems[1])
1102 return services
1103
1104 @staticmethod
1105 def _get_deep(dictionary: dict, members: tuple):
1106 target = dictionary
1107 value = None
1108 try:
1109 for m in members:
1110 value = target.get(m)
1111 if not value:
1112 return None
1113 else:
1114 target = value
1115 except Exception:
1116 pass
1117 return value
1118
1119 # find key:value in several lines
1120 @staticmethod
1121 def _find_in_lines(p_lines: list, p_key: str) -> str:
1122 for line in p_lines:
1123 try:
1124 if line.startswith(p_key + ":"):
1125 parts = line.split(":")
1126 the_value = parts[1].strip()
1127 return the_value
1128 except Exception:
1129 # ignore it
1130 pass
1131 return None
1132
1133 @staticmethod
1134 def _lower_keys_list(input_list: list):
1135 """
1136 Transform the keys in a list of dictionaries to lower case and returns a new list
1137 of dictionaries
1138 """
1139 new_list = []
1140 for dictionary in input_list:
1141 new_dict = dict((k.lower(), v) for k, v in dictionary.items())
1142 new_list.append(new_dict)
1143 return new_list
1144
1145 def _local_exec(self, command: str) -> (str, int):
1146 command = self._remove_multiple_spaces(command)
1147 self.log.debug("Executing sync local command: {}".format(command))
1148 # raise exception if fails
1149 output = ""
1150 try:
1151 output = subprocess.check_output(
1152 command, shell=True, universal_newlines=True
1153 )
1154 return_code = 0
1155 self.log.debug(output)
1156 except Exception:
1157 return_code = 1
1158
1159 return output, return_code
1160
async def _local_async_exec(
    self,
    command: str,
    raise_exception_on_error: bool = False,
    show_error_log: bool = True,
    encode_utf8: bool = False,
    env: dict = None,
) -> (str, int):
    """
    Execute a local command asynchronously, without a shell, and collect its
    output

    :param command: command line; runs of spaces are collapsed and the result
        is split into arguments with shlex
    :param raise_exception_on_error: raise K8sException when the command ends
        with a non-zero return code or cannot be executed
    :param show_error_log: include the output in the debug log when the return
        code is not zero
    :param encode_utf8: return the str() of the UTF-8 encoded output, with the
        escaped line breaks turned into real ones
    :param env: extra environment variables, applied on top of a copy of the
        environment of this process
    :return: tuple (output, return_code). The output is the decoded, stripped
        standard error if the command wrote anything there, otherwise its
        standard output. ("", -1) is returned if the execution itself fails
        and raise_exception_on_error is False
    :raises K8sException: see raise_exception_on_error
    """
    cleaned = K8sHelmBaseConnector._remove_multiple_spaces(command)
    self.log.debug(
        "Executing async local command: {}, env: {}".format(cleaned, env)
    )

    # argument vector for exec
    argv = shlex.split(cleaned)

    child_env = os.environ.copy()
    if env:
        child_env.update(env)

    try:
        process = await asyncio.create_subprocess_exec(
            *argv,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
            env=child_env,
        )

        # returns once the command has terminated
        captured_out, captured_err = await process.communicate()
        return_code = process.returncode
        failed = return_code != 0

        # the later stream wins: anything on stderr replaces stdout
        output = ""
        for captured in (captured_out, captured_err):
            if captured:
                output = captured.decode("utf-8").strip()

        if failed and show_error_log:
            self.log.debug(
                "Return code (FAIL): {}\nOutput:\n{}".format(return_code, output)
            )
        else:
            self.log.debug("Return code: {}".format(return_code))

        if failed and raise_exception_on_error:
            raise K8sException(output)

        if encode_utf8:
            output = str(output.encode("utf-8").strip()).replace("\\n", "\n")

        return output, return_code

    except (asyncio.CancelledError, K8sException):
        # cancellation and our own error go up untouched
        raise
    except Exception as e:
        self.log.error("Exception executing command: {} -> {}".format(argv, e))
        if not raise_exception_on_error:
            return "", -1
        raise K8sException(e) from e
1230
1231 async def _local_async_exec_pipe(
1232 self,
1233 command1: str,
1234 command2: str,
1235 raise_exception_on_error: bool = True,
1236 show_error_log: bool = True,
1237 encode_utf8: bool = False,
1238 env: dict = None,
1239 ):
1240
1241 command1 = K8sHelmBaseConnector._remove_multiple_spaces(command1)
1242 command2 = K8sHelmBaseConnector._remove_multiple_spaces(command2)
1243 command = "{} | {}".format(command1, command2)
1244 self.log.debug(
1245 "Executing async local command: {}, env: {}".format(command, env)
1246 )
1247
1248 # split command
1249 command1 = shlex.split(command1)
1250 command2 = shlex.split(command2)
1251
1252 environ = os.environ.copy()
1253 if env:
1254 environ.update(env)
1255
1256 try:
1257 read, write = os.pipe()
1258 await asyncio.create_subprocess_exec(*command1, stdout=write, env=environ)
1259 os.close(write)
1260 process_2 = await asyncio.create_subprocess_exec(
1261 *command2, stdin=read, stdout=asyncio.subprocess.PIPE, env=environ
1262 )
1263 os.close(read)
1264 stdout, stderr = await process_2.communicate()
1265
1266 return_code = process_2.returncode
1267
1268 output = ""
1269 if stdout:
1270 output = stdout.decode("utf-8").strip()
1271 # output = stdout.decode()
1272 if stderr:
1273 output = stderr.decode("utf-8").strip()
1274 # output = stderr.decode()
1275
1276 if return_code != 0 and show_error_log:
1277 self.log.debug(
1278 "Return code (FAIL): {}\nOutput:\n{}".format(return_code, output)
1279 )
1280 else:
1281 self.log.debug("Return code: {}".format(return_code))
1282
1283 if raise_exception_on_error and return_code != 0:
1284 raise K8sException(output)
1285
1286 if encode_utf8:
1287 output = output.encode("utf-8").strip()
1288 output = str(output).replace("\\n", "\n")
1289
1290 return output, return_code
1291 except asyncio.CancelledError:
1292 raise
1293 except K8sException:
1294 raise
1295 except Exception as e:
1296 msg = "Exception executing command: {} -> {}".format(command, e)
1297 self.log.error(msg)
1298 if raise_exception_on_error:
1299 raise K8sException(e) from e
1300 else:
1301 return "", -1
1302
async def _get_service(self, cluster_id, service_name, namespace):
    """
    Obtains the data of the specified service in the k8cluster.

    :param cluster_id: id of a K8s cluster known by OSM
    :param service_name: name of the K8s service in the specified namespace
    :param namespace: K8s namespace used by the KDU instance
    :return: If successful, it will return a service with the following data:
    - `name` of the service
    - `type` type of service in the k8 cluster
    - `ports` List of ports offered by the service, for each port includes at least
    name, port, protocol
    - `cluster_ip` Internal ip to be used inside k8s cluster
    - `external_ip` List of external ips (in case they are available). Only
    present for services of type LoadBalancer; it is an empty list while the
    balancer has no ingress with an ip
    :raises K8sException: if the kubectl command fails
    """

    # init config, env
    paths, env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )

    command = "{} --kubeconfig={} --namespace={} get service {} -o=yaml".format(
        self.kubectl_command, paths["kube_config"], namespace, service_name
    )

    output, _rc = await self._local_async_exec(
        command=command, raise_exception_on_error=True, env=env
    )

    data = yaml.load(output, Loader=yaml.SafeLoader)

    service = {
        "name": service_name,
        "type": self._get_deep(data, ("spec", "type")),
        "ports": self._get_deep(data, ("spec", "ports")),
        "cluster_ip": self._get_deep(data, ("spec", "clusterIP")),
    }
    if service["type"] == "LoadBalancer":
        ip_map_list = self._get_deep(data, ("status", "loadBalancer", "ingress"))
        # _get_deep returns None while the status has no ingress entries, and
        # an ingress entry may carry no "ip" key: neither case may break the
        # query, so such entries simply contribute no ip
        service["external_ip"] = [
            elem["ip"] for elem in ip_map_list or [] if elem.get("ip")
        ]

    return service
1346
1347 async def _exec_inspect_comand(
1348 self, inspect_command: str, kdu_model: str, repo_url: str = None
1349 ):
1350 """
1351 Obtains information about a kdu, no cluster (no env)
1352 """
1353
1354 repo_str = ""
1355 if repo_url:
1356 repo_str = " --repo {}".format(repo_url)
1357
1358 idx = kdu_model.find("/")
1359 if idx >= 0:
1360 idx += 1
1361 kdu_model = kdu_model[idx:]
1362
1363 version = ""
1364 if ":" in kdu_model:
1365 parts = kdu_model.split(sep=":")
1366 if len(parts) == 2:
1367 version = "--version {}".format(str(parts[1]))
1368 kdu_model = parts[0]
1369
1370 full_command = self._get_inspect_command(
1371 inspect_command, kdu_model, repo_str, version
1372 )
1373 output, _rc = await self._local_async_exec(
1374 command=full_command, encode_utf8=True
1375 )
1376
1377 return output
1378
async def _store_status(
    self,
    cluster_id: str,
    operation: str,
    kdu_instance: str,
    namespace: str = None,
    check_every: float = 10,
    db_dict: dict = None,
    run_once: bool = False,
):
    """
    Read the status of a KDU instance and write it to the database, once or
    repeatedly

    Nothing is raised to the caller: cancellation ends the task silently and
    any other error is logged at debug level, after which the loop goes on
    (or ends, with run_once).

    :param cluster_id: cluster identifier, passed to _status_kdu
    :param operation: operation name, passed to write_app_status_to_db
    :param kdu_instance: instance whose status is read
    :param namespace: namespace, passed to _status_kdu
    :param check_every: seconds slept before every status read, the first one
        included
    :param db_dict: passed to write_app_status_to_db
    :param run_once: perform a single iteration instead of looping until
        cancelled or until the database write fails
    """
    while True:
        try:
            # the wait comes first, so even a run_once call reads the status
            # only after check_every seconds
            await asyncio.sleep(check_every)
            detailed_status = await self._status_kdu(
                cluster_id=cluster_id,
                kdu_instance=kdu_instance,
                namespace=namespace,
                return_text=False,
            )
            # a status without "info" makes this line fail; that is handled by
            # the generic handler below like any other error
            status = detailed_status.get("info").get("description")
            self.log.debug("KDU {} STATUS: {}.".format(kdu_instance, status))
            # write status to db
            result = await self.write_app_status_to_db(
                db_dict=db_dict,
                status=str(status),
                detailed_status=str(detailed_status),
                operation=operation,
            )
            if not result:
                self.log.info("Error writing in database. Task exiting...")
                return
        except asyncio.CancelledError:
            # cancellation is the regular way to stop the task: do not re-raise
            self.log.debug("Task cancelled")
            return
        except Exception as e:
            self.log.debug(
                "_store_status exception: {}".format(str(e)), exc_info=True
            )
            pass
        finally:
            # runs after every iteration, whether it succeeded or failed
            if run_once:
                return
1421
1422 # params for use in -f file
1423 # returns values file option and filename (in order to delete it at the end)
1424 def _params_to_file_option(self, cluster_id: str, params: dict) -> (str, str):
1425
1426 if params and len(params) > 0:
1427 self._init_paths_env(cluster_name=cluster_id, create_if_not_exist=True)
1428
1429 def get_random_number():
1430 r = random.randrange(start=1, stop=99999999)
1431 s = str(r)
1432 while len(s) < 10:
1433 s = "0" + s
1434 return s
1435
1436 params2 = dict()
1437 for key in params:
1438 value = params.get(key)
1439 if "!!yaml" in str(value):
1440 value = yaml.load(value[7:])
1441 params2[key] = value
1442
1443 values_file = get_random_number() + ".yaml"
1444 with open(values_file, "w") as stream:
1445 yaml.dump(params2, stream, indent=4, default_flow_style=False)
1446
1447 return "-f {}".format(values_file), values_file
1448
1449 return "", None
1450
1451 # params for use in --set option
1452 @staticmethod
1453 def _params_to_set_option(params: dict) -> str:
1454 params_str = ""
1455 if params and len(params) > 0:
1456 start = True
1457 for key in params:
1458 value = params.get(key, None)
1459 if value is not None:
1460 if start:
1461 params_str += "--set "
1462 start = False
1463 else:
1464 params_str += ","
1465 params_str += "{}={}".format(key, value)
1466 return params_str
1467
1468 @staticmethod
def generate_kdu_instance_name(**kwargs):
    """
    Generate a name for a KDU instance out of the name of its chart

    The name is built as follows:
    - for an embedded chart (absolute path) or a URL only the last path
      component is used, ignoring trailing slashes
    - every character that is neither a letter nor a number becomes "-"
    - the result is cut to 35 characters and prefixed with "a" if it does not
      start with a letter (this includes an empty result)
    - "-" and a 10-digit, zero padded random number are appended
    - everything is converted to lower case

    :param kwargs: must contain `kdu_model`, the chart reference
    :return: the generated name
    :raises KeyError: if `kdu_model` is not provided
    """
    chart_name = kwargs["kdu_model"]
    # embedded chart (file or dir) or URL: keep the last portion only. The
    # trailing slashes are dropped first, otherwise "/dir/chart/" would leave
    # nothing to build the name from.
    if chart_name.startswith("/") or "://" in chart_name:
        chart_name = chart_name.rstrip("/")
        chart_name = chart_name[chart_name.rfind("/") + 1 :]

    name = "".join(
        c if c.isalpha() or c.isnumeric() else "-" for c in chart_name
    )
    name = name[:35]

    # if does not start with alpha character, prefix 'a'. The slice keeps the
    # check valid for an empty name.
    if not name[:1].isalpha():
        name = "a" + name

    suffix = str(random.randrange(start=1, stop=99999999)).rjust(10, "0")
    return "{}-{}".format(name, suffix).lower()