Bug 1400: Fix stable repo urls that have changed
n2vc/k8s_helm_conn.py
##
# Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
# This file is part of OSM
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# For those usages not covered by the Apache License, Version 2.0 please
# contact with: nfvlabs@tid.es
##

import asyncio
import os
import random
import shutil
import subprocess
import time
from uuid import uuid4

from n2vc.exceptions import K8sException
from n2vc.k8s_conn import K8sConnector
import yaml


class K8sHelmConnector(K8sConnector):

    """
    ####################################################################################
    ################################### P U B L I C ####################################
    ####################################################################################
    """
    service_account = "osm"
    _STABLE_REPO_URL = "https://charts.helm.sh/stable"

    def __init__(
        self,
        fs: object,
        db: object,
        kubectl_command: str = "/usr/bin/kubectl",
        helm_command: str = "/usr/bin/helm",
        log: object = None,
        on_update_db=None,
        vca_config: dict = None,
    ):
        """
        :param fs: file system for kubernetes and helm configuration
        :param db: database object to write current operation status
        :param kubectl_command: path to kubectl executable
        :param helm_command: path to helm executable
        :param log: logger
        :param on_update_db: callback called when k8s connector updates database
        :param vca_config: optional configuration; the "stablerepourl" key
            overrides the default stable repository URL
        """

        # parent class
        K8sConnector.__init__(self, db=db, log=log, on_update_db=on_update_db)

        self.log.info("Initializing K8S Helm connector")

        # random numbers for release name generation
        random.seed(time.time())

        # the file system
        self.fs = fs

        # exception if kubectl is not installed
        self.kubectl_command = kubectl_command
        self._check_file_exists(filename=kubectl_command, exception_if_not_exists=True)

        # exception if helm is not installed
        self._helm_command = helm_command
        self._check_file_exists(filename=helm_command, exception_if_not_exists=True)

        # obtain stable repo url from config or apply default
        if not vca_config or not vca_config.get("stablerepourl"):
            self._stable_repo_url = self._STABLE_REPO_URL
        else:
            self._stable_repo_url = vca_config.get("stablerepourl")
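
        # Illustrative sketch (assumed values, not part of the original code):
        # a caller could pass
        #   vca_config = {"stablerepourl": "https://charts.helm.sh/stable"}
        # and self._stable_repo_url would take that value instead of the
        # default _STABLE_REPO_URL.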

        # initialize helm client-only
        self.log.debug("Initializing helm client-only...")
        command = "{} init --client-only --stable-repo-url {}".format(
            self._helm_command, self._stable_repo_url
        )
        try:
            asyncio.ensure_future(
                self._local_async_exec(command=command, raise_exception_on_error=False)
            )
            # loop = asyncio.get_event_loop()
            # loop.run_until_complete(self._local_async_exec(command=command,
            #     raise_exception_on_error=False))
        except Exception as e:
            self.log.warning(
                "helm init failed (it was already initialized): {}".format(e)
            )

        self.log.info("K8S Helm connector initialized")

    @staticmethod
    def _get_namespace_cluster_id(cluster_uuid: str) -> (str, str):
        """
        Parses a cluster_uuid stored in the database, which can be either
        'namespace:cluster_id' or, for backward compatibility, a bare cluster_id
        """
        namespace, _, cluster_id = cluster_uuid.rpartition(':')
        return namespace, cluster_id
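
    # Illustrative examples (assumed values, not part of the original code):
    #   _get_namespace_cluster_id("kube-system:1234") -> ("kube-system", "1234")
    #   _get_namespace_cluster_id("1234") -> ("", "1234")
    # rpartition(':') yields an empty namespace when no separator is present.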

    async def init_env(
        self, k8s_creds: str, namespace: str = "kube-system", reuse_cluster_uuid=None
    ) -> (str, bool):
        """
        It prepares a given K8s cluster environment to run Charts on both sides:
            client (OSM)
            server (Tiller)

        :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid
            '.kube/config'
        :param namespace: optional namespace to be used for helm. By default,
            'kube-system' will be used
        :param reuse_cluster_uuid: existing cluster uuid for reuse
        :return: uuid of the K8s cluster and True if connector has installed some
            software in the cluster
            (on error, an exception will be raised)
        """

        if reuse_cluster_uuid:
            namespace_, cluster_id = self._get_namespace_cluster_id(reuse_cluster_uuid)
            namespace = namespace_ or namespace
        else:
            cluster_id = str(uuid4())
            cluster_uuid = "{}:{}".format(namespace, cluster_id)

        self.log.debug(
            "Initializing K8S Cluster {}. namespace: {}".format(cluster_id, namespace)
        )

        # create config filename
        _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
            cluster_name=cluster_id, create_if_not_exist=True
        )
        with open(config_filename, "w") as f:
            f.write(k8s_creds)

        # check if tiller pod is up in cluster
        command = "{} --kubeconfig={} --namespace={} get deployments".format(
            self.kubectl_command, config_filename, namespace
        )
        output, _rc = await self._local_async_exec(
            command=command, raise_exception_on_error=True
        )

        output_table = self._output_to_table(output=output)

        # find 'tiller' pod in all pods
        already_initialized = False
        try:
            for row in output_table:
                if row[0].startswith("tiller-deploy"):
                    already_initialized = True
                    break
        except Exception:
            pass

        # helm init
        n2vc_installed_sw = False
        if not already_initialized:
            self.log.info(
                "Initializing helm in client and server: {}".format(cluster_id)
            )
            command = (
                "{} --kubeconfig={} --namespace kube-system create serviceaccount {}"
            ).format(self.kubectl_command, config_filename, self.service_account)
            _, _rc = await self._local_async_exec(
                command=command, raise_exception_on_error=False
            )

            command = (
                "{} --kubeconfig={} create clusterrolebinding osm-tiller-cluster-rule "
                "--clusterrole=cluster-admin --serviceaccount=kube-system:{}"
            ).format(self.kubectl_command, config_filename, self.service_account)
            _, _rc = await self._local_async_exec(
                command=command, raise_exception_on_error=False
            )

            command = (
                "{} --kubeconfig={} --tiller-namespace={} --home={} --service-account {} "
                "--stable-repo-url {} init"
            ).format(
                self._helm_command,
                config_filename,
                namespace,
                helm_dir,
                self.service_account,
                self._stable_repo_url,
            )
            _, _rc = await self._local_async_exec(
                command=command, raise_exception_on_error=True
            )
            n2vc_installed_sw = True
        else:
            # check client helm installation
            check_file = helm_dir + "/repository/repositories.yaml"
            if not self._check_file_exists(
                filename=check_file, exception_if_not_exists=False
            ):
                self.log.info("Initializing helm in client: {}".format(cluster_id))
                command = (
                    "{} --kubeconfig={} --tiller-namespace={} "
                    "--home={} init --client-only --stable-repo-url {} "
                ).format(
                    self._helm_command,
                    config_filename,
                    namespace,
                    helm_dir,
                    self._stable_repo_url,
                )
                output, _rc = await self._local_async_exec(
                    command=command, raise_exception_on_error=True
                )
            else:
                self.log.info("Helm client already initialized")

        # remove old stable repo and add new one
        cluster_uuid = "{}:{}".format(namespace, cluster_id)
        repo_list = await self.repo_list(cluster_uuid)
        for repo in repo_list:
            if repo["Name"] == "stable" and repo["URL"] != self._stable_repo_url:
                self.log.debug(
                    "Add new stable repo url: {}".format(self._stable_repo_url)
                )
                await self.repo_remove(cluster_uuid, "stable")
                await self.repo_add(cluster_uuid, "stable", self._stable_repo_url)
                break

        self.log.info("Cluster {} initialized".format(cluster_id))

        return cluster_uuid, n2vc_installed_sw
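
    # Minimal usage sketch (hypothetical caller code, not part of the original):
    #   kubeconfig = open("admin.conf").read()   # assumed credentials file
    #   cluster_uuid, installed = await connector.init_env(k8s_creds=kubeconfig)
    #   # -> e.g. ("kube-system:<uuid4>", True) when this connector installed Tiller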

    async def repo_add(
        self, cluster_uuid: str, name: str, url: str, repo_type: str = "chart"
    ):
        _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
        self.log.debug(
            "Cluster {}, adding {} repository {}. URL: {}".format(
                cluster_id, repo_type, name, url
            )
        )

        # config filename
        _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
            cluster_name=cluster_id, create_if_not_exist=True
        )

        # helm repo update
        command = "{} --kubeconfig={} --home={} repo update".format(
            self._helm_command, config_filename, helm_dir
        )
        self.log.debug("updating repo: {}".format(command))
        await self._local_async_exec(command=command, raise_exception_on_error=False)

        # helm repo add name url
        command = "{} --kubeconfig={} --home={} repo add {} {}".format(
            self._helm_command, config_filename, helm_dir, name, url
        )
        self.log.debug("adding repo: {}".format(command))
        await self._local_async_exec(command=command, raise_exception_on_error=True)

    async def repo_list(self, cluster_uuid: str) -> list:
        """
        Get the list of registered repositories

        :return: list of registered repositories: [ {"Name": ..., "URL": ...}, ... ]
        """

        _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
        self.log.debug("list repositories for cluster {}".format(cluster_id))

        # config filename
        _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
            cluster_name=cluster_id, create_if_not_exist=True
        )

        command = "{} --kubeconfig={} --home={} repo list --output yaml".format(
            self._helm_command, config_filename, helm_dir
        )

        output, _rc = await self._local_async_exec(
            command=command, raise_exception_on_error=True
        )
        if output and len(output) > 0:
            return yaml.load(output, Loader=yaml.SafeLoader)
        else:
            return []

    async def repo_remove(self, cluster_uuid: str, name: str):
        """
        Remove a repository from OSM

        :param cluster_uuid: the cluster or 'namespace:cluster'
        :param name: repo name in OSM
        :return: True if successful
        """

        _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
        self.log.debug("remove repository {} for cluster {}".format(name, cluster_id))

        # config filename
        _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
            cluster_name=cluster_id, create_if_not_exist=True
        )

        command = "{} --kubeconfig={} --home={} repo remove {}".format(
            self._helm_command, config_filename, helm_dir, name
        )

        await self._local_async_exec(command=command, raise_exception_on_error=True)

    async def reset(
        self, cluster_uuid: str, force: bool = False, uninstall_sw: bool = False
    ) -> bool:

        namespace, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
        self.log.debug(
            "Resetting K8s environment. cluster uuid: {} uninstall={}".format(
                cluster_id, uninstall_sw
            )
        )

        # get kube and helm directories
        _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
            cluster_name=cluster_id, create_if_not_exist=False
        )

        # uninstall releases if needed.
        if uninstall_sw:
            releases = await self.instances_list(cluster_uuid=cluster_uuid)
            if len(releases) > 0:
                if force:
                    for r in releases:
                        try:
                            kdu_instance = r.get("Name")
                            chart = r.get("Chart")
                            self.log.debug(
                                "Uninstalling {} -> {}".format(chart, kdu_instance)
                            )
                            await self.uninstall(
                                cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
                            )
                        except Exception as e:
                            self.log.error(
                                "Error uninstalling release {}: {}".format(
                                    kdu_instance, e
                                )
                            )
                else:
                    msg = (
                        "Cluster uuid: {} has releases and force is not set. "
                        "Leaving the K8s helm environment"
                    ).format(cluster_id)
                    self.log.warn(msg)
                    # Allow removing the k8s cluster without removing Tiller
                    uninstall_sw = False

        if uninstall_sw:

            self.log.debug("Uninstalling tiller from cluster {}".format(cluster_id))

            if not namespace:
                # find namespace for tiller pod
                command = "{} --kubeconfig={} get deployments --all-namespaces".format(
                    self.kubectl_command, config_filename
                )
                output, _rc = await self._local_async_exec(
                    command=command, raise_exception_on_error=False
                )
                output_table = K8sHelmConnector._output_to_table(output=output)
                namespace = None
                for r in output_table:
                    try:
                        if "tiller-deploy" in r[1]:
                            namespace = r[0]
                            break
                    except Exception:
                        pass
                else:
                    msg = "Tiller deployment not found in cluster {}".format(cluster_id)
                    self.log.error(msg)

                self.log.debug("namespace for tiller: {}".format(namespace))

            if namespace:
                # uninstall tiller from cluster
                self.log.debug(
                    "Uninstalling tiller from cluster {}".format(cluster_id)
                )
                command = "{} --kubeconfig={} --home={} reset".format(
                    self._helm_command, config_filename, helm_dir
                )
                self.log.debug("resetting: {}".format(command))
                output, _rc = await self._local_async_exec(
                    command=command, raise_exception_on_error=True
                )
                # Delete clusterrolebinding and serviceaccount.
                # Ignore errors for backward compatibility
                command = (
                    "{} --kubeconfig={} delete clusterrolebinding.rbac.authorization.k8s."
                    "io/osm-tiller-cluster-rule"
                ).format(self.kubectl_command, config_filename)
                output, _rc = await self._local_async_exec(
                    command=command, raise_exception_on_error=False
                )
                command = (
                    "{} --kubeconfig={} --namespace kube-system delete serviceaccount/{}"
                ).format(self.kubectl_command, config_filename, self.service_account)
                output, _rc = await self._local_async_exec(
                    command=command, raise_exception_on_error=False
                )

            else:
                self.log.debug("namespace not found")

        # delete cluster directory
        direct = self.fs.path + "/" + cluster_id
        self.log.debug("Removing directory {}".format(direct))
        shutil.rmtree(direct, ignore_errors=True)

        return True

    async def install(
        self,
        cluster_uuid: str,
        kdu_model: str,
        atomic: bool = True,
        timeout: float = 300,
        params: dict = None,
        db_dict: dict = None,
        kdu_name: str = None,
        namespace: str = None,
    ):

        _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
        self.log.debug("installing {} in cluster {}".format(kdu_model, cluster_id))

        # config filename
        _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
            cluster_name=cluster_id, create_if_not_exist=True
        )

        # params to str
        # params_str = K8sHelmConnector._params_to_set_option(params)
        params_str, file_to_delete = self._params_to_file_option(
            cluster_id=cluster_id, params=params
        )

        timeout_str = ""
        if timeout:
            timeout_str = "--timeout {}".format(timeout)

        # atomic
        atomic_str = ""
        if atomic:
            atomic_str = "--atomic"
        # namespace
        namespace_str = ""
        if namespace:
            namespace_str = "--namespace {}".format(namespace)

        # version
        version_str = ""
        if ":" in kdu_model:
            parts = kdu_model.split(sep=":")
            if len(parts) == 2:
                version_str = "--version {}".format(parts[1])
                kdu_model = parts[0]

        # generate a name for the release. Then, check if already exists
        kdu_instance = None
        while kdu_instance is None:
            kdu_instance = K8sHelmConnector._generate_release_name(kdu_model)
            try:
                result = await self._status_kdu(
                    cluster_id=cluster_id,
                    kdu_instance=kdu_instance,
                    show_error_log=False,
                )
                if result is not None:
                    # instance already exists: generate a new one
                    kdu_instance = None
            except K8sException:
                pass

        # helm repo install
        command = (
            "{helm} install {atomic} --output yaml --kubeconfig={config} --home={dir} "
            "{params} {timeout} --name={name} {ns} {model} {ver}".format(
                helm=self._helm_command,
                atomic=atomic_str,
                config=config_filename,
                dir=helm_dir,
                params=params_str,
                timeout=timeout_str,
                name=kdu_instance,
                ns=namespace_str,
                model=kdu_model,
                ver=version_str,
            )
        )
        self.log.debug("installing: {}".format(command))

        if atomic:
            # exec helm in a task
            exec_task = asyncio.ensure_future(
                coro_or_future=self._local_async_exec(
                    command=command, raise_exception_on_error=False
                )
            )

            # write status in another task
            status_task = asyncio.ensure_future(
                coro_or_future=self._store_status(
                    cluster_id=cluster_id,
                    kdu_instance=kdu_instance,
                    db_dict=db_dict,
                    operation="install",
                    run_once=False,
                )
            )

            # wait for execution task
            await asyncio.wait([exec_task])

            # cancel status task
            status_task.cancel()

            output, rc = exec_task.result()

        else:

            output, rc = await self._local_async_exec(
                command=command, raise_exception_on_error=False
            )

        # remove temporal values yaml file
        if file_to_delete:
            os.remove(file_to_delete)

        # write final status
        await self._store_status(
            cluster_id=cluster_id,
            kdu_instance=kdu_instance,
            db_dict=db_dict,
            operation="install",
            run_once=True,
            check_every=0,
        )

        if rc != 0:
            msg = "Error executing command: {}\nOutput: {}".format(command, output)
            self.log.error(msg)
            raise K8sException(msg)

        self.log.debug("Returning kdu_instance {}".format(kdu_instance))
        return kdu_instance

    async def instances_list(self, cluster_uuid: str) -> list:
        """
        returns a list of deployed releases in a cluster

        :param cluster_uuid: the 'cluster' or 'namespace:cluster'
        :return:
        """

        _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
        self.log.debug("list releases for cluster {}".format(cluster_id))

        # config filename
        _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
            cluster_name=cluster_id, create_if_not_exist=True
        )

        command = "{} --kubeconfig={} --home={} list --output yaml".format(
            self._helm_command, config_filename, helm_dir
        )

        output, _rc = await self._local_async_exec(
            command=command, raise_exception_on_error=True
        )

        if output and len(output) > 0:
            return yaml.load(output, Loader=yaml.SafeLoader).get("Releases")
        else:
            return []
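
    # For reference, a sketch of the helm 2 "list --output yaml" structure this
    # method parses (values hypothetical; the exact output may vary with the
    # helm version):
    #   Releases:
    #   - Name: a-mongodb-0000123456
    #     Chart: mongodb-7.x
    #     Revision: 1
    #     Status: DEPLOYED
    #     Namespace: kube-system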

    async def upgrade(
        self,
        cluster_uuid: str,
        kdu_instance: str,
        kdu_model: str = None,
        atomic: bool = True,
        timeout: float = 300,
        params: dict = None,
        db_dict: dict = None,
    ):

        _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
        self.log.debug("upgrading {} in cluster {}".format(kdu_model, cluster_id))

        # config filename
        _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
            cluster_name=cluster_id, create_if_not_exist=True
        )

        # params to str
        # params_str = K8sHelmConnector._params_to_set_option(params)
        params_str, file_to_delete = self._params_to_file_option(
            cluster_id=cluster_id, params=params
        )

        timeout_str = ""
        if timeout:
            timeout_str = "--timeout {}".format(timeout)

        # atomic
        atomic_str = ""
        if atomic:
            atomic_str = "--atomic"

        # version
        version_str = ""
        if kdu_model and ":" in kdu_model:
            parts = kdu_model.split(sep=":")
            if len(parts) == 2:
                version_str = "--version {}".format(parts[1])
                kdu_model = parts[0]

        # helm repo upgrade
        command = (
            "{} upgrade {} --output yaml --kubeconfig={} --home={} {} {} {} {} {}"
        ).format(
            self._helm_command,
            atomic_str,
            config_filename,
            helm_dir,
            params_str,
            timeout_str,
            kdu_instance,
            kdu_model,
            version_str,
        )
        self.log.debug("upgrading: {}".format(command))

        if atomic:

            # exec helm in a task
            exec_task = asyncio.ensure_future(
                coro_or_future=self._local_async_exec(
                    command=command, raise_exception_on_error=False
                )
            )
            # write status in another task
            status_task = asyncio.ensure_future(
                coro_or_future=self._store_status(
                    cluster_id=cluster_id,
                    kdu_instance=kdu_instance,
                    db_dict=db_dict,
                    operation="upgrade",
                    run_once=False,
                )
            )

            # wait for execution task
            await asyncio.wait([exec_task])

            # cancel status task
            status_task.cancel()
            output, rc = exec_task.result()

        else:

            output, rc = await self._local_async_exec(
                command=command, raise_exception_on_error=False
            )

        # remove temporal values yaml file
        if file_to_delete:
            os.remove(file_to_delete)

        # write final status
        await self._store_status(
            cluster_id=cluster_id,
            kdu_instance=kdu_instance,
            db_dict=db_dict,
            operation="upgrade",
            run_once=True,
            check_every=0,
        )

        if rc != 0:
            msg = "Error executing command: {}\nOutput: {}".format(command, output)
            self.log.error(msg)
            raise K8sException(msg)

        # return new revision number
        instance = await self.get_instance_info(
            cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
        )
        if instance:
            revision = int(instance.get("Revision"))
            self.log.debug("New revision: {}".format(revision))
            return revision
        else:
            return 0

    async def rollback(
        self, cluster_uuid: str, kdu_instance: str, revision=0, db_dict: dict = None
    ):

        _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
        self.log.debug(
            "rollback kdu_instance {} to revision {} from cluster {}".format(
                kdu_instance, revision, cluster_id
            )
        )

        # config filename
        _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
            cluster_name=cluster_id, create_if_not_exist=True
        )

        command = "{} rollback --kubeconfig={} --home={} {} {} --wait".format(
            self._helm_command, config_filename, helm_dir, kdu_instance, revision
        )

        # exec helm in a task
        exec_task = asyncio.ensure_future(
            coro_or_future=self._local_async_exec(
                command=command, raise_exception_on_error=False
            )
        )
        # write status in another task
        status_task = asyncio.ensure_future(
            coro_or_future=self._store_status(
                cluster_id=cluster_id,
                kdu_instance=kdu_instance,
                db_dict=db_dict,
                operation="rollback",
                run_once=False,
            )
        )

        # wait for execution task
        await asyncio.wait([exec_task])

        # cancel status task
        status_task.cancel()

        output, rc = exec_task.result()

        # write final status
        await self._store_status(
            cluster_id=cluster_id,
            kdu_instance=kdu_instance,
            db_dict=db_dict,
            operation="rollback",
            run_once=True,
            check_every=0,
        )

        if rc != 0:
            msg = "Error executing command: {}\nOutput: {}".format(command, output)
            self.log.error(msg)
            raise K8sException(msg)

        # return new revision number
        instance = await self.get_instance_info(
            cluster_uuid=cluster_uuid, kdu_instance=kdu_instance
        )
        if instance:
            revision = int(instance.get("Revision"))
            self.log.debug("New revision: {}".format(revision))
            return revision
        else:
            return 0

    async def uninstall(self, cluster_uuid: str, kdu_instance: str):
        """
        Removes an existing KDU instance. It would implicitly use the `delete` call
        (this call would happen after all _terminate-config-primitive_ of the VNF
        are invoked).

        :param cluster_uuid: UUID of a K8s cluster known by OSM, or namespace:cluster_id
        :param kdu_instance: unique name for the KDU instance to be deleted
        :return: True if successful
        """

        _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
        self.log.debug(
            "uninstall kdu_instance {} from cluster {}".format(
                kdu_instance, cluster_id
            )
        )

        # config filename
        _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
            cluster_name=cluster_id, create_if_not_exist=True
        )

        command = "{} --kubeconfig={} --home={} delete --purge {}".format(
            self._helm_command, config_filename, helm_dir, kdu_instance
        )

        output, _rc = await self._local_async_exec(
            command=command, raise_exception_on_error=True
        )

        return self._output_to_table(output)

    async def exec_primitive(
        self,
        cluster_uuid: str = None,
        kdu_instance: str = None,
        primitive_name: str = None,
        timeout: float = 300,
        params: dict = None,
        db_dict: dict = None,
    ) -> str:
        """Exec primitive (Juju action)

        :param cluster_uuid str: The UUID of the cluster or namespace:cluster
        :param kdu_instance str: The unique name of the KDU instance
        :param primitive_name: Name of action that will be executed
        :param timeout: Timeout for action execution
        :param params: Dictionary of all the parameters needed for the action
        :param db_dict: Dictionary for any additional data

        :return: Returns the output of the action
        """
        raise K8sException(
            "KDUs deployed with Helm don't support actions "
            "different from rollback, upgrade and status"
        )

    async def inspect_kdu(self, kdu_model: str, repo_url: str = None) -> str:

        self.log.debug(
            "inspect kdu_model {} from (optional) repo: {}".format(kdu_model, repo_url)
        )

        return await self._exec_inspect_comand(
            inspect_command="", kdu_model=kdu_model, repo_url=repo_url
        )

    async def values_kdu(self, kdu_model: str, repo_url: str = None) -> str:

        self.log.debug(
            "inspect kdu_model values {} from (optional) repo: {}".format(
                kdu_model, repo_url
            )
        )

        return await self._exec_inspect_comand(
            inspect_command="values", kdu_model=kdu_model, repo_url=repo_url
        )

    async def help_kdu(self, kdu_model: str, repo_url: str = None) -> str:

        self.log.debug(
            "inspect kdu_model {} readme.md from repo: {}".format(kdu_model, repo_url)
        )

        return await self._exec_inspect_comand(
            inspect_command="readme", kdu_model=kdu_model, repo_url=repo_url
        )

    async def status_kdu(self, cluster_uuid: str, kdu_instance: str) -> str:

        # call internal function
        _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
        return await self._status_kdu(
            cluster_id=cluster_id,
            kdu_instance=kdu_instance,
            show_error_log=True,
            return_text=True,
        )

    async def get_services(
        self, cluster_uuid: str, kdu_instance: str, namespace: str
    ) -> list:

        _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
        self.log.debug(
            "get_services: cluster_uuid: {}, kdu_instance: {}".format(
                cluster_uuid, kdu_instance
            )
        )

        status = await self._status_kdu(
            cluster_id, kdu_instance, return_text=False
        )

        service_names = self._parse_helm_status_service_info(status)
        service_list = []
        for service in service_names:
            service = await self.get_service(cluster_uuid, service, namespace)
            service_list.append(service)

        return service_list

    async def get_service(
        self, cluster_uuid: str, service_name: str, namespace: str
    ) -> object:

        self.log.debug(
            "get service, service_name: {}, namespace: {}, cluster_uuid: {}".format(
                service_name, namespace, cluster_uuid
            )
        )

        # get paths
        _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
        _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
            cluster_name=cluster_id, create_if_not_exist=True
        )

        command = "{} --kubeconfig={} --namespace={} get service {} -o=yaml".format(
            self.kubectl_command, config_filename, namespace, service_name
        )

        output, _rc = await self._local_async_exec(
            command=command, raise_exception_on_error=True
        )

        data = yaml.load(output, Loader=yaml.SafeLoader)

        service = {
            "name": service_name,
            "type": self._get_deep(data, ("spec", "type")),
            "ports": self._get_deep(data, ("spec", "ports")),
            "cluster_ip": self._get_deep(data, ("spec", "clusterIP")),
        }
        if service["type"] == "LoadBalancer":
            ip_map_list = self._get_deep(data, ("status", "loadBalancer", "ingress"))
            ip_list = [elem["ip"] for elem in ip_map_list]
            service["external_ip"] = ip_list

        return service
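
    # Shape of the returned dict (values hypothetical; keys as built above):
    #   {
    #       "name": "my-service",
    #       "type": "LoadBalancer",
    #       "ports": [...],                  # as reported under spec.ports
    #       "cluster_ip": "10.0.0.1",
    #       "external_ip": ["172.21.1.5"],   # only for LoadBalancer services
    #   }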

    async def synchronize_repos(self, cluster_uuid: str):

        _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
        self.log.debug("synchronize repos for cluster helm-id: {}".format(cluster_id))
        try:
            update_repos_timeout = (
                300  # max timeout to sync a single repo; more than this is too much
            )
            db_k8scluster = self.db.get_one(
                "k8sclusters", {"_admin.helm-chart.id": cluster_uuid}
            )
            if db_k8scluster:
                nbi_repo_list = (
                    db_k8scluster.get("_admin").get("helm_chart_repos") or []
                )
                cluster_repo_dict = (
                    db_k8scluster.get("_admin").get("helm_charts_added") or {}
                )
                # elements that must be deleted
                deleted_repo_list = []
                added_repo_dict = {}
                # self.log.debug("helm_chart_repos: {}".format(nbi_repo_list))
                # self.log.debug("helm_charts_added: {}".format(cluster_repo_dict))

                # obtain repos to add: registered by nbi but not added
                repos_to_add = [
                    repo for repo in nbi_repo_list if not cluster_repo_dict.get(repo)
                ]

                # obtain repos to delete: added by cluster but not in nbi list
                repos_to_delete = [
                    repo
                    for repo in cluster_repo_dict.keys()
                    if repo not in nbi_repo_list
                ]

                # delete repos: must delete first, then add, because there may be
                # different repos with the same name but different id and url
                if repos_to_delete:
                    self.log.debug("repos to delete: {}".format(repos_to_delete))
                    for repo_id in repos_to_delete:
                        # try to delete repos
                        try:
                            repo_delete_task = asyncio.ensure_future(
                                self.repo_remove(
                                    cluster_uuid=cluster_uuid,
                                    name=cluster_repo_dict[repo_id],
                                )
                            )
                            await asyncio.wait_for(
                                repo_delete_task, update_repos_timeout
                            )
                        except Exception as e:
                            self.log.warning(
                                "Error deleting repo, id: {}, name: {}, err_msg: {}".format(
                                    repo_id, cluster_repo_dict[repo_id], str(e)
                                )
                            )
                        # always add to the deleted list, even on error, because
                        # if the repo is not there deleting it again raises an
                        # error
                        deleted_repo_list.append(repo_id)

                # add repos
                if repos_to_add:
                    self.log.debug("repos to add: {}".format(repos_to_add))
                    for repo_id in repos_to_add:
                        # obtain the repo data from the db
                        # if there is an error getting the repo in the database we
                        # will ignore this repo and continue, because there is a
                        # possible race condition where the repo has been deleted
                        # while processing
                        db_repo = self.db.get_one("k8srepos", {"_id": repo_id})
                        self.log.debug(
                            "obtained repo: id: {}, name: {}, url: {}".format(
                                repo_id, db_repo["name"], db_repo["url"]
                            )
                        )
                        try:
                            repo_add_task = asyncio.ensure_future(
                                self.repo_add(
                                    cluster_uuid=cluster_uuid,
                                    name=db_repo["name"],
                                    url=db_repo["url"],
                                    repo_type="chart",
                                )
                            )
                            await asyncio.wait_for(repo_add_task, update_repos_timeout)
                            added_repo_dict[repo_id] = db_repo["name"]
                            self.log.debug(
                                "added repo: id: {}, name: {}".format(
                                    repo_id, db_repo["name"]
                                )
                            )
                        except Exception as e:
                            # deal with errors adding the repo; adding a repo that
                            # already exists does not raise any error.
                            # Do not raise here, because a wrong repo added by
                            # anyone could prevent instantiating any ns
                            self.log.error(
                                "Error adding repo id: {}, err_msg: {} ".format(
                                    repo_id, repr(e)
                                )
                            )

                return deleted_repo_list, added_repo_dict

            else:  # else db_k8scluster does not exist
                raise K8sException(
                    "k8scluster with helm-id: {} not found".format(cluster_uuid)
                )

        except Exception as e:
            self.log.error("Error synchronizing repos: {}".format(str(e)))
            raise K8sException("Error synchronizing repos")

1039 """
1040 ####################################################################################
1041 ################################### P R I V A T E ##################################
1042 ####################################################################################
1043 """
1044
    async def _exec_inspect_comand(
        self, inspect_command: str, kdu_model: str, repo_url: str = None
    ):

        repo_str = ""
        if repo_url:
            repo_str = " --repo {}".format(repo_url)
            idx = kdu_model.find("/")
            if idx >= 0:
                idx += 1
                kdu_model = kdu_model[idx:]

        inspect_command = "{} inspect {} {}{}".format(
            self._helm_command, inspect_command, kdu_model, repo_str
        )
        output, _rc = await self._local_async_exec(
            command=inspect_command, encode_utf8=True
        )

        return output

    async def _status_kdu(
        self,
        cluster_id: str,
        kdu_instance: str,
        show_error_log: bool = False,
        return_text: bool = False,
    ):

        self.log.debug("status of kdu_instance {}".format(kdu_instance))

        # config filename
        _kube_dir, helm_dir, config_filename, _cluster_dir = self._get_paths(
            cluster_name=cluster_id, create_if_not_exist=True
        )

        command = "{} --kubeconfig={} --home={} status {} --output yaml".format(
            self._helm_command, config_filename, helm_dir, kdu_instance
        )

        output, rc = await self._local_async_exec(
            command=command,
            raise_exception_on_error=True,
            show_error_log=show_error_log,
        )

        if return_text:
            return str(output)

        if rc != 0:
            return None

        data = yaml.load(output, Loader=yaml.SafeLoader)

        # remove field 'notes'
        try:
            del data.get("info").get("status")["notes"]
        except KeyError:
            pass

        # parse field 'resources'
        try:
            resources = str(data.get("info").get("status").get("resources"))
            resource_table = self._output_to_table(resources)
            data.get("info").get("status")["resources"] = resource_table
        except Exception:
            pass

        return data

    async def get_instance_info(self, cluster_uuid: str, kdu_instance: str):
        instances = await self.instances_list(cluster_uuid=cluster_uuid)
        for instance in instances:
            if instance.get("Name") == kdu_instance:
                return instance
        self.log.debug("Instance {} not found".format(kdu_instance))
        return None

    @staticmethod
    def _generate_release_name(chart_name: str):
        # check embedded chart (file or dir)
        if chart_name.startswith("/"):
            # extract file or directory name
            chart_name = chart_name[chart_name.rfind("/") + 1 :]
        # check URL
        elif "://" in chart_name:
            # extract last portion of URL
            chart_name = chart_name[chart_name.rfind("/") + 1 :]

        name = ""
        for c in chart_name:
            if c.isalpha() or c.isnumeric():
                name += c
            else:
                name += "-"
        if len(name) > 35:
            name = name[0:35]

        # if it does not start with an alpha character, prefix 'a'
        if not name[0].isalpha():
            name = "a" + name

        name += "-"

        def get_random_number():
            r = random.randrange(start=1, stop=99999999)
            s = str(r)
            s = s.rjust(10, "0")
            return s

        name = name + get_random_number()
        return name.lower()
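
    # Illustrative example (hypothetical input, not part of the original code):
    #   _generate_release_name("stable/mongodb") replaces every
    #   non-alphanumeric character with '-' (only local paths and URLs have
    #   their prefix stripped) and appends a zero-padded random number,
    #   e.g. "stable-mongodb-0012345678".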

    async def _store_status(
        self,
        cluster_id: str,
        operation: str,
        kdu_instance: str,
        check_every: float = 10,
        db_dict: dict = None,
        run_once: bool = False,
    ):
        previous_exception = None
        while True:
            try:
                await asyncio.sleep(check_every)
                detailed_status = await self._status_kdu(
                    cluster_id=cluster_id,
                    kdu_instance=kdu_instance,
                    return_text=False,
                )
                status = detailed_status.get("info").get("Description")
                self.log.debug('KDU {} STATUS: {}.'.format(kdu_instance, status))
                # write status to db
                result = await self.write_app_status_to_db(
                    db_dict=db_dict,
                    status=str(status),
                    detailed_status=str(detailed_status),
                    operation=operation,
                )
                if not result:
                    self.log.info("Error writing in database. Task exiting...")
                    return
            except asyncio.CancelledError:
                self.log.debug("Task cancelled")
                return
            except Exception as e:
                # log only once in the while loop
                if str(previous_exception) != str(e):
                    self.log.debug("_store_status exception: {}".format(str(e)))
                previous_exception = e
            finally:
                if run_once:
                    return

    async def _is_install_completed(self, cluster_id: str, kdu_instance: str) -> bool:

        status = await self._status_kdu(
            cluster_id=cluster_id, kdu_instance=kdu_instance, return_text=False
        )

        # extract info.status.resources -> str
        # format:
        #   ==> v1/Deployment
        #   NAME                    READY   UP-TO-DATE   AVAILABLE   AGE
        #   halting-horse-mongodb   0/1     1            0           0s
        #   halting-petit-mongodb   1/1     1            0           0s
        #   blank line
        resources = K8sHelmConnector._get_deep(status, ("info", "status", "resources"))

        # convert to table
        resources = K8sHelmConnector._output_to_table(resources)

        num_lines = len(resources)
        index = 0
        ready = True  # assume ready until a non-ready resource is found
        while index < num_lines:
            try:
                line1 = resources[index]
                index += 1
                # find '==>' in column 0
                if line1[0] == "==>":
                    line2 = resources[index]
                    index += 1
                    # find READY in column 1
                    if line2[1] == "READY":
                        # read next lines
                        line3 = resources[index]
                        index += 1
                        while len(line3) > 1 and index < num_lines:
                            ready_value = line3[1]
                            parts = ready_value.split(sep="/")
                            current = int(parts[0])
                            total = int(parts[1])
                            if current < total:
                                self.log.debug("NOT READY:\n {}".format(line3))
                                ready = False
                            line3 = resources[index]
                            index += 1

            except Exception:
                pass

        return ready

    def _parse_helm_status_service_info(self, status):

        # extract info.status.resources -> str
        # format:
        #   ==> v1/Deployment
        #   NAME                    READY   UP-TO-DATE   AVAILABLE   AGE
        #   halting-horse-mongodb   0/1     1            0           0s
        #   halting-petit-mongodb   1/1     1            0           0s
        #   blank line
        resources = K8sHelmConnector._get_deep(status, ("info", "status", "resources"))

        service_list = []
        first_line_skipped = service_found = False
        for line in resources:
            if not service_found:
                if len(line) >= 2 and line[0] == "==>" and line[1] == "v1/Service":
                    service_found = True
                    continue
            else:
                if len(line) >= 2 and line[0] == "==>":
                    service_found = first_line_skipped = False
                    continue
                if not line:
                    continue
                if not first_line_skipped:
                    first_line_skipped = True
                    continue
                service_list.append(line[0])

        return service_list

    @staticmethod
    def _get_deep(dictionary: dict, members: tuple):
        target = dictionary
        value = None
        try:
            for m in members:
                value = target.get(m)
                if not value:
                    return None
                else:
                    target = value
        except Exception:
            pass
        return value
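
    # Illustrative example (hypothetical data, not part of the original code):
    #   _get_deep({"info": {"status": {"resources": "table"}}},
    #             ("info", "status", "resources")) -> "table"
    # Returns None as soon as any intermediate key is missing or falsy.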

    # find key:value in several lines
    @staticmethod
    def _find_in_lines(p_lines: list, p_key: str) -> str:
        for line in p_lines:
            try:
                if line.startswith(p_key + ":"):
                    parts = line.split(":")
                    the_value = parts[1].strip()
                    return the_value
            except Exception:
                # ignore it
                pass
        return None
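
    # Illustrative example (hypothetical lines, not part of the original code):
    #   _find_in_lines(["NAME: myrelease", "STATUS: DEPLOYED"], "STATUS")
    #   -> "DEPLOYED"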

    # params for use in -f file
    # returns values file option and filename (in order to delete it at the end)
    def _params_to_file_option(self, cluster_id: str, params: dict) -> (str, str):

        if params and len(params) > 0:
            self._get_paths(cluster_name=cluster_id, create_if_not_exist=True)

            def get_random_number():
                r = random.randrange(start=1, stop=99999999)
                s = str(r)
                while len(s) < 10:
                    s = "0" + s
                return s

            params2 = dict()
            for key in params:
                value = params.get(key)
                if "!!yaml" in str(value):
                    value = yaml.load(value[7:], Loader=yaml.SafeLoader)
                params2[key] = value

            values_file = get_random_number() + ".yaml"
            with open(values_file, "w") as stream:
                yaml.dump(params2, stream, indent=4, default_flow_style=False)

            return "-f {}".format(values_file), values_file

        return "", None
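
    # Illustrative example (hypothetical params, not part of the original code):
    #   _params_to_file_option(cluster_id, {"replicas": 2, "tags": "!!yaml [a, b]"})
    # writes a temporary "<random>.yaml" with replicas: 2 and tags: [a, b]
    # (the "!!yaml " prefix marks a value to be parsed as YAML) and returns
    # ("-f <random>.yaml", "<random>.yaml") so the caller can delete the file.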

    # params for use in --set option
    @staticmethod
    def _params_to_set_option(params: dict) -> str:
        params_str = ""
        if params and len(params) > 0:
            start = True
            for key in params:
                value = params.get(key, None)
                if value is not None:
                    if start:
                        params_str += "--set "
                        start = False
                    else:
                        params_str += ","
                    params_str += "{}={}".format(key, value)
        return params_str
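
    # Illustrative example (hypothetical params, not part of the original code):
    #   _params_to_set_option({"replicas": 2, "service.type": "NodePort"})
    #   -> "--set replicas=2,service.type=NodePort"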

    @staticmethod
    def _output_to_lines(output: str) -> list:
        output_lines = list()
        lines = output.splitlines(keepends=False)
        for line in lines:
            line = line.strip()
            if len(line) > 0:
                output_lines.append(line)
        return output_lines

    @staticmethod
    def _output_to_table(output: str) -> list:
        output_table = list()
        lines = output.splitlines(keepends=False)
        for line in lines:
            line = line.replace("\t", " ")
            line_list = list()
            output_table.append(line_list)
            cells = line.split(sep=" ")
            for cell in cells:
                cell = cell.strip()
                if len(cell) > 0:
                    line_list.append(cell)
        return output_table
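
    # Illustrative example (hypothetical output, not part of the original code):
    #   _output_to_table("NAME  READY\ntiller-deploy  1/1")
    #   -> [["NAME", "READY"], ["tiller-deploy", "1/1"]]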

    def _get_paths(
        self, cluster_name: str, create_if_not_exist: bool = False
    ) -> (str, str, str, str):
        """
        Returns kube and helm directories

        :param cluster_name:
        :param create_if_not_exist:
        :return: kube and helm directories, config filename and cluster dir.
            Raises an exception if they do not exist and cannot be created
        """

        base = self.fs.path
        if base.endswith("/") or base.endswith("\\"):
            base = base[:-1]

        # base dir for cluster
        cluster_dir = base + "/" + cluster_name
        if create_if_not_exist and not os.path.exists(cluster_dir):
            self.log.debug("Creating dir {}".format(cluster_dir))
            os.makedirs(cluster_dir)
        if not os.path.exists(cluster_dir):
            msg = "Base cluster dir {} does not exist".format(cluster_dir)
            self.log.error(msg)
            raise K8sException(msg)

        # kube dir
        kube_dir = cluster_dir + "/" + ".kube"
        if create_if_not_exist and not os.path.exists(kube_dir):
            self.log.debug("Creating dir {}".format(kube_dir))
            os.makedirs(kube_dir)
        if not os.path.exists(kube_dir):
            msg = "Kube config dir {} does not exist".format(kube_dir)
            self.log.error(msg)
            raise K8sException(msg)

        # helm home dir
        helm_dir = cluster_dir + "/" + ".helm"
        if create_if_not_exist and not os.path.exists(helm_dir):
            self.log.debug("Creating dir {}".format(helm_dir))
            os.makedirs(helm_dir)
        if not os.path.exists(helm_dir):
            msg = "Helm config dir {} does not exist".format(helm_dir)
            self.log.error(msg)
            raise K8sException(msg)

        config_filename = kube_dir + "/config"
        return kube_dir, helm_dir, config_filename, cluster_dir

    @staticmethod
    def _remove_multiple_spaces(strobj):
        strobj = strobj.strip()
        while "  " in strobj:
            strobj = strobj.replace("  ", " ")
        return strobj
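
    # Illustrative example (not part of the original code):
    #   _remove_multiple_spaces("  helm   list   ") -> "helm list"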

    def _local_exec(self, command: str) -> (str, int):
        command = K8sHelmConnector._remove_multiple_spaces(command)
        self.log.debug("Executing sync local command: {}".format(command))
        # raise exception if it fails
        output = ""
        try:
            output = subprocess.check_output(
                command, shell=True, universal_newlines=True
            )
            return_code = 0
            self.log.debug(output)
        except Exception:
            return_code = 1

        return output, return_code

    async def _local_async_exec(
        self,
        command: str,
        raise_exception_on_error: bool = False,
        show_error_log: bool = True,
        encode_utf8: bool = False,
    ) -> (str, int):

        command = K8sHelmConnector._remove_multiple_spaces(command)
        self.log.debug("Executing async local command: {}".format(command))

        # split command
        command = command.split(sep=" ")

        try:
            process = await asyncio.create_subprocess_exec(
                *command, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
            )

            # wait for command to terminate
            stdout, stderr = await process.communicate()

            return_code = process.returncode

            output = ""
            if stdout:
                output = stdout.decode("utf-8").strip()
                # output = stdout.decode()
            if stderr:
                output = stderr.decode("utf-8").strip()
                # output = stderr.decode()

            if return_code != 0 and show_error_log:
                self.log.debug(
                    "Return code (FAIL): {}\nOutput:\n{}".format(return_code, output)
                )
            else:
                self.log.debug("Return code: {}".format(return_code))

            if raise_exception_on_error and return_code != 0:
                raise K8sException(output)

            if encode_utf8:
                output = output.encode("utf-8").strip()
                output = str(output).replace("\\n", "\n")

            return output, return_code

        except asyncio.CancelledError:
            raise
        except K8sException:
            raise
        except Exception as e:
            msg = "Exception executing command: {} -> {}".format(command, e)
            self.log.error(msg)
            if raise_exception_on_error:
                raise K8sException(e) from e
            else:
                return "", -1

    def _check_file_exists(self, filename: str, exception_if_not_exists: bool = False):
        # self.log.debug('Checking if file {} exists...'.format(filename))
        if os.path.exists(filename):
            return True
        else:
            msg = "File {} does not exist".format(filename)
            if exception_if_not_exists:
                # self.log.error(msg)
                raise K8sException(msg)
            return False