##
# Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
# This file is part of OSM
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# For those usages not covered by the Apache License, Version 2.0 please
# contact with: nfvlabs@tid.es
##
import asyncio
import os
import yaml

from n2vc.k8s_helm_base_conn import K8sHelmBaseConnector
from n2vc.exceptions import K8sException


class K8sHelmConnector(K8sHelmBaseConnector):

    """
    ####################################################################################
    ################################### P U B L I C ####################################
    ####################################################################################
    """

    def __init__(
        self,
        fs: object,
        db: object,
        kubectl_command: str = "/usr/bin/kubectl",
        helm_command: str = "/usr/bin/helm",
        log: object = None,
        on_update_db=None,
    ):
        """
        Initializes helm connector for helm v2

        :param fs: file system for kubernetes and helm configuration
        :param db: database object to write current operation status
        :param kubectl_command: path to kubectl executable
        :param helm_command: path to helm executable
        :param log: logger
        :param on_update_db: callback called when k8s connector updates database
        """

        # parent class
        K8sHelmBaseConnector.__init__(
            self,
            db=db,
            log=log,
            fs=fs,
            kubectl_command=kubectl_command,
            helm_command=helm_command,
            on_update_db=on_update_db,
        )

        self.log.info("Initializing K8S Helm2 connector")

        # initialize helm client-only
        self.log.debug("Initializing helm client-only...")
        command = "{} init --client-only {} ".format(
            self._helm_command,
            "--stable-repo-url {}".format(self._stable_repo_url)
            if self._stable_repo_url
            else "--skip-repos",
        )
        try:
            asyncio.ensure_future(
                self._local_async_exec(command=command, raise_exception_on_error=False)
            )
            # loop = asyncio.get_event_loop()
            # loop.run_until_complete(self._local_async_exec(command=command,
            #     raise_exception_on_error=False))
        except Exception as e:
            self.log.warning(
                msg="helm init failed (helm was possibly already initialized): {}".format(e)
            )

        self.log.info("K8S Helm2 connector initialized")

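    # Illustrative usage sketch (not part of the module): `fs` and `db` are the
    # OSM filesystem and database objects normally injected by the caller; the
    # cluster UUID below is a made-up placeholder.
    #
    #   helm_conn = K8sHelmConnector(fs=fs, db=db, log=log)
    #   await helm_conn.install(
    #       cluster_uuid="kube-system:1b2a3c4d",
    #       kdu_model="stable/openldap",
    #       kdu_instance="openldap-0001",
    #   )
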
    async def install(
        self,
        cluster_uuid: str,
        kdu_model: str,
        kdu_instance: str,
        atomic: bool = True,
        timeout: float = 300,
        params: dict = None,
        db_dict: dict = None,
        kdu_name: str = None,
        namespace: str = None,
        **kwargs,
    ):
106 """
107 Deploys of a new KDU instance. It would implicitly rely on the `install` call
108 to deploy the Chart/Bundle properly parametrized (in practice, this call would
109 happen before any _initial-config-primitive_of the VNF is called).
110
111 :param cluster_uuid: UUID of a K8s cluster known by OSM
112 :param kdu_model: chart/ reference (string), which can be either
113 of these options:
114 - a name of chart available via the repos known by OSM
115 - a path to a packaged chart
116 - a path to an unpacked chart directory or a URL
117 :param kdu_instance: Kdu instance name
118 :param atomic: If set, installation process purges chart/bundle on fail, also
119 will wait until all the K8s objects are active
120 :param timeout: Time in seconds to wait for the install of the chart/bundle
121 (defaults to Helm default timeout: 300s)
122 :param params: dictionary of key-value pairs for instantiation parameters
123 (overriding default values)
124 :param dict db_dict: where to write into database when the status changes.
125 It contains a dict with {collection: <str>, filter: {},
126 path: <str>},
127 e.g. {collection: "nsrs", filter:
128 {_id: <nsd-id>, path: "_admin.deployed.K8S.3"}
129 :param kdu_name: Name of the KDU instance to be installed
130 :param namespace: K8s namespace to use for the KDU instance
131 :param kwargs: Additional parameters (None yet)
132 :return: True if successful
133 """
        _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
        self.log.debug("installing {} in cluster {}".format(kdu_model, cluster_id))

        # sync local dir
        self.fs.sync(from_path=cluster_id)

        # init env, paths
        paths, env = self._init_paths_env(
            cluster_name=cluster_id, create_if_not_exist=True
        )

        await self._install_impl(
            cluster_id,
            kdu_model,
            paths,
            env,
            kdu_instance,
            atomic=atomic,
            timeout=timeout,
            params=params,
            db_dict=db_dict,
            kdu_name=kdu_name,
            namespace=namespace,
        )

        # sync fs
        self.fs.reverse_sync(from_path=cluster_id)

        self.log.debug("Returning kdu_instance {}".format(kdu_instance))
        return True

    async def inspect_kdu(self, kdu_model: str, repo_url: str = None) -> str:

        self.log.debug(
            "inspect kdu_model {} from (optional) repo: {}".format(kdu_model, repo_url)
        )

        return await self._exec_inspect_command(
            inspect_command="", kdu_model=kdu_model, repo_url=repo_url
        )

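    # Illustrative call sketch (hypothetical chart name); with an empty
    # inspect_command, _get_inspect_command below builds a plain
    # "helm inspect <chart>" invocation:
    #
    #   info = await helm_conn.inspect_kdu("stable/openldap")
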
175 """
176 ####################################################################################
177 ################################### P R I V A T E ##################################
178 ####################################################################################
179 """
180
181 def _init_paths_env(self, cluster_name: str, create_if_not_exist: bool = True):
182 """
183 Creates and returns base cluster and kube dirs and returns them.
184 Also created helm3 dirs according to new directory specification, paths are
185 returned and also environment variables that must be provided to execute commands
186
187 Helm 2 directory specification uses helm_home dir:
188
189 The variables assigned for this paths are:
190 - Helm hone: $HELM_HOME
191 - helm kubeconfig: $KUBECONFIG
192
193 :param cluster_name: cluster_name
194 :return: Dictionary with config_paths and dictionary with helm environment variables
195 """
        base = self.fs.path
        if base.endswith("/") or base.endswith("\\"):
            base = base[:-1]

        # base dir for cluster
        cluster_dir = base + "/" + cluster_name

        # kube dir
        kube_dir = cluster_dir + "/" + ".kube"
        if create_if_not_exist and not os.path.exists(kube_dir):
            self.log.debug("Creating dir {}".format(kube_dir))
            os.makedirs(kube_dir)

        # helm home dir
        helm_dir = cluster_dir + "/" + ".helm"
        if create_if_not_exist and not os.path.exists(helm_dir):
            self.log.debug("Creating dir {}".format(helm_dir))
            os.makedirs(helm_dir)

        config_filename = kube_dir + "/config"

        # 2 - Prepare dictionary with paths
        paths = {
            "kube_dir": kube_dir,
            "kube_config": config_filename,
            "cluster_dir": cluster_dir,
            "helm_dir": helm_dir,
        }

        for file_name, file in paths.items():
            if "dir" in file_name and not os.path.exists(file):
                err_msg = "{} dir does not exist".format(file)
                self.log.error(err_msg)
                raise K8sException(err_msg)

        # 3 - Prepare environment variables
        env = {"HELM_HOME": helm_dir, "KUBECONFIG": config_filename}

        return paths, env

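    # For a cluster named "c1" and fs.path "/app/storage", the return values
    # would look like this (illustrative):
    #
    #   paths = {"kube_dir": "/app/storage/c1/.kube",
    #            "kube_config": "/app/storage/c1/.kube/config",
    #            "cluster_dir": "/app/storage/c1",
    #            "helm_dir": "/app/storage/c1/.helm"}
    #   env = {"HELM_HOME": "/app/storage/c1/.helm",
    #          "KUBECONFIG": "/app/storage/c1/.kube/config"}
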
    async def _get_services(self, cluster_id, kdu_instance, namespace, kubeconfig):

        # init config, env
        paths, env = self._init_paths_env(
            cluster_name=cluster_id, create_if_not_exist=True
        )

        command1 = "env KUBECONFIG={} {} get manifest {} ".format(
            kubeconfig, self._helm_command, kdu_instance
        )
        command2 = "{} get --namespace={} -f -".format(self.kubectl_command, namespace)
        output, _rc = await self._local_async_exec_pipe(
            command1, command2, env=env, raise_exception_on_error=True
        )
        services = self._parse_services(output)

        return services

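    # The two commands above are piped together, i.e. roughly equivalent to
    # running (illustrative):
    #
    #   env KUBECONFIG=<kubeconfig> helm get manifest <kdu_instance> \
    #       | kubectl get --namespace=<namespace> -f -
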
    async def _cluster_init(
        self, cluster_id: str, namespace: str, paths: dict, env: dict
    ):
        """
        Implements the helm version dependent cluster initialization:
        for helm2 it initializes the tiller environment if needed
        """

        # check if tiller pod is up in cluster
        command = "{} --kubeconfig={} --namespace={} get deployments".format(
            self.kubectl_command, paths["kube_config"], namespace
        )
        output, _rc = await self._local_async_exec(
            command=command, raise_exception_on_error=True, env=env
        )

        output_table = self._output_to_table(output=output)

        # find 'tiller' pod in all pods
        already_initialized = False
        try:
            for row in output_table:
                if row[0].startswith("tiller-deploy"):
                    already_initialized = True
                    break
        except Exception:
            pass

        # helm init
        n2vc_installed_sw = False
        if not already_initialized:
            self.log.info(
                "Initializing helm in client and server: {}".format(cluster_id)
            )
            command = "{} --kubeconfig={} --namespace kube-system create serviceaccount {}".format(
                self.kubectl_command, paths["kube_config"], self.service_account
            )
            _, _rc = await self._local_async_exec(
                command=command, raise_exception_on_error=False, env=env
            )

            command = (
                "{} --kubeconfig={} create clusterrolebinding osm-tiller-cluster-rule "
                "--clusterrole=cluster-admin --serviceaccount=kube-system:{}"
            ).format(self.kubectl_command, paths["kube_config"], self.service_account)
            _, _rc = await self._local_async_exec(
                command=command, raise_exception_on_error=False, env=env
            )

            command = (
                "{} --kubeconfig={} --tiller-namespace={} --home={} --service-account {} "
                " {} init"
            ).format(
                self._helm_command,
                paths["kube_config"],
                namespace,
                paths["helm_dir"],
                self.service_account,
                "--stable-repo-url {}".format(self._stable_repo_url)
                if self._stable_repo_url
                else "--skip-repos",
            )
            _, _rc = await self._local_async_exec(
                command=command, raise_exception_on_error=True, env=env
            )
            n2vc_installed_sw = True
        else:
            # check client helm installation
            check_file = paths["helm_dir"] + "/repository/repositories.yaml"
            if not self._check_file_exists(
                filename=check_file, exception_if_not_exists=False
            ):
                self.log.info("Initializing helm in client: {}".format(cluster_id))
                command = (
                    "{} --kubeconfig={} --tiller-namespace={} "
                    "--home={} init --client-only {} "
                ).format(
                    self._helm_command,
                    paths["kube_config"],
                    namespace,
                    paths["helm_dir"],
                    "--stable-repo-url {}".format(self._stable_repo_url)
                    if self._stable_repo_url
                    else "--skip-repos",
                )
                output, _rc = await self._local_async_exec(
                    command=command, raise_exception_on_error=True, env=env
                )
            else:
                self.log.info("Helm client already initialized")

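        # On a fresh cluster the bootstrap above is roughly equivalent to
        # running (illustrative, with <sa> = self.service_account):
        #
        #   kubectl --namespace kube-system create serviceaccount <sa>
        #   kubectl create clusterrolebinding osm-tiller-cluster-rule \
        #       --clusterrole=cluster-admin --serviceaccount=kube-system:<sa>
        #   helm init --service-account <sa> --stable-repo-url <url>
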
        # remove old stable repo and add new one
        cluster_uuid = "{}:{}".format(namespace, cluster_id)
        repo_list = await self.repo_list(cluster_uuid)
        for repo in repo_list:
            if repo["name"] == "stable" and repo["url"] != self._stable_repo_url:
                self.log.debug(
                    "Add new stable repo url: {}".format(self._stable_repo_url)
                )
                await self.repo_remove(cluster_uuid, "stable")
                if self._stable_repo_url:
                    await self.repo_add(cluster_uuid, "stable", self._stable_repo_url)
                break

        return n2vc_installed_sw

    async def _uninstall_sw(self, cluster_id: str, namespace: str):
        # uninstall Tiller if necessary

        self.log.debug("Uninstalling tiller from cluster {}".format(cluster_id))

        # init paths, env
        paths, env = self._init_paths_env(
            cluster_name=cluster_id, create_if_not_exist=True
        )

        if not namespace:
            # find namespace for tiller pod
            command = "{} --kubeconfig={} get deployments --all-namespaces".format(
                self.kubectl_command, paths["kube_config"]
            )
            output, _rc = await self._local_async_exec(
                command=command, raise_exception_on_error=False, env=env
            )
            output_table = self._output_to_table(output=output)
            namespace = None
            for r in output_table:
                try:
                    if "tiller-deploy" in r[1]:
                        namespace = r[0]
                        break
                except Exception:
                    pass
            else:
                msg = "Tiller deployment not found in cluster {}".format(cluster_id)
                self.log.error(msg)

            self.log.debug("namespace for tiller: {}".format(namespace))

        if namespace:
            # uninstall tiller from cluster
            self.log.debug("Uninstalling tiller from cluster {}".format(cluster_id))
            command = "{} --kubeconfig={} --home={} reset".format(
                self._helm_command, paths["kube_config"], paths["helm_dir"]
            )
            self.log.debug("resetting: {}".format(command))
            output, _rc = await self._local_async_exec(
                command=command, raise_exception_on_error=True, env=env
            )
            # Delete clusterrolebinding and serviceaccount.
            # Ignore errors for backward compatibility
            command = (
                "{} --kubeconfig={} delete clusterrolebinding.rbac.authorization.k8s."
                "io/osm-tiller-cluster-rule"
            ).format(self.kubectl_command, paths["kube_config"])
            output, _rc = await self._local_async_exec(
                command=command, raise_exception_on_error=False, env=env
            )
            command = "{} --kubeconfig={} --namespace kube-system delete serviceaccount/{}".format(
                self.kubectl_command, paths["kube_config"], self.service_account
            )
            output, _rc = await self._local_async_exec(
                command=command, raise_exception_on_error=False, env=env
            )

        else:
            self.log.debug("namespace not found")

    async def _instances_list(self, cluster_id):

        # init paths, env
        paths, env = self._init_paths_env(
            cluster_name=cluster_id, create_if_not_exist=True
        )

        command = "{} list --output yaml".format(self._helm_command)

        output, _rc = await self._local_async_exec(
            command=command, raise_exception_on_error=True, env=env
        )

        if output and len(output) > 0:
            # parse yaml and update keys to lower case to unify with helm3
            instances = yaml.load(output, Loader=yaml.SafeLoader).get("Releases")
            new_instances = []
            for instance in instances:
                new_instance = dict((k.lower(), v) for k, v in instance.items())
                new_instances.append(new_instance)
            return new_instances
        else:
            return []

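    # Key normalization sketch (illustrative values): a helm2 release entry
    # such as {"Name": "ldap-0001", "Status": "DEPLOYED"} is returned as
    # {"name": "ldap-0001", "status": "DEPLOYED"}, matching helm3-style keys.
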
    def _get_inspect_command(
        self, show_command: str, kdu_model: str, repo_str: str, version: str
    ):
        inspect_command = "{} inspect {} {}{} {}".format(
            self._helm_command, show_command, kdu_model, repo_str, version
        )
        return inspect_command

    def _get_get_command(
        self, get_command: str, kdu_instance: str, namespace: str, kubeconfig: str
    ):
        get_command = "env KUBECONFIG={} {} get {} {} --output yaml".format(
            kubeconfig, self._helm_command, get_command, kdu_instance
        )
        return get_command

    async def _status_kdu(
        self,
        cluster_id: str,
        kdu_instance: str,
        namespace: str = None,
        show_error_log: bool = False,
        return_text: bool = False,
    ):

        self.log.debug(
            "status of kdu_instance: {}, namespace: {} ".format(kdu_instance, namespace)
        )

        # init config, env
        paths, env = self._init_paths_env(
            cluster_name=cluster_id, create_if_not_exist=True
        )
        command = "env KUBECONFIG={} {} status {} --output yaml".format(
            paths["kube_config"], self._helm_command, kdu_instance
        )
        output, rc = await self._local_async_exec(
            command=command,
            raise_exception_on_error=True,
            show_error_log=show_error_log,
            env=env,
        )

        if return_text:
            return str(output)

        if rc != 0:
            return None

        data = yaml.load(output, Loader=yaml.SafeLoader)

        # remove field 'notes'
        try:
            del data.get("info").get("status")["notes"]
        except KeyError:
            pass

        # parse field 'resources'
        try:
            resources = str(data.get("info").get("status").get("resources"))
            resource_table = self._output_to_table(resources)
            data.get("info").get("status")["resources"] = resource_table
        except Exception:
            pass

        # set description to lowercase (unify with helm3)
        try:
            data.get("info")["description"] = data.get("info").pop("Description")
        except KeyError:
            pass

        return data

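    # After the post-processing above, the returned dict resembles this
    # (illustrative, heavily trimmed helm2 status output):
    #
    #   {"name": "ldap-0001",
    #    "info": {"status": {"code": 1, "resources": [...]},
    #             "description": "Install complete"}}
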
    def _get_helm_chart_repos_ids(self, cluster_uuid) -> list:
        repo_ids = []
        cluster_filter = {"_admin.helm-chart.id": cluster_uuid}
        cluster = self.db.get_one("k8sclusters", cluster_filter)
        if cluster:
            repo_ids = cluster.get("_admin").get("helm_chart_repos") or []
            return repo_ids
        else:
            raise K8sException(
                "k8s cluster with helm-id {} not found".format(cluster_uuid)
            )

    async def _is_install_completed(self, cluster_id: str, kdu_instance: str) -> bool:
        # init config, env
        paths, env = self._init_paths_env(
            cluster_name=cluster_id, create_if_not_exist=True
        )

        status = await self._status_kdu(
            cluster_id=cluster_id, kdu_instance=kdu_instance, return_text=False
        )

        # extract info.status.resources -> str
        # format:
        #   ==> v1/Deployment
        #   NAME                    READY   UP-TO-DATE   AVAILABLE   AGE
        #   halting-horse-mongodb   0/1     1            0           0s
        #   halting-petit-mongodb   1/1     1            0           0s
        #   blank line
        resources = K8sHelmBaseConnector._get_deep(
            status, ("info", "status", "resources")
        )

        # convert to table
        resources = K8sHelmBaseConnector._output_to_table(resources)

        num_lines = len(resources)
        index = 0
        ready = True
        while index < num_lines:
            try:
                line1 = resources[index]
                index += 1
                # find '==>' in column 0
                if line1[0] == "==>":
                    line2 = resources[index]
                    index += 1
                    # find READY in column 1
                    if line2[1] == "READY":
                        # read next lines
                        line3 = resources[index]
                        index += 1
                        while len(line3) > 1 and index < num_lines:
                            ready_value = line3[1]
                            parts = ready_value.split(sep="/")
                            current = int(parts[0])
                            total = int(parts[1])
                            if current < total:
                                self.log.debug("NOT READY:\n {}".format(line3))
                                ready = False
                            line3 = resources[index]
                            index += 1

            except Exception:
                pass

        return ready

    def _get_install_command(
        self,
        kdu_model,
        kdu_instance,
        namespace,
        params_str,
        version,
        atomic,
        timeout,
        kubeconfig,
    ) -> str:

        timeout_str = ""
        if timeout:
            timeout_str = "--timeout {}".format(timeout)

        # atomic
        atomic_str = ""
        if atomic:
            atomic_str = "--atomic"
        # namespace
        namespace_str = ""
        if namespace:
            namespace_str = "--namespace {}".format(namespace)

        # version
        version_str = ""
        if version:
            version_str = "--version {}".format(version)

        command = (
            "env KUBECONFIG={kubeconfig} {helm} install {atomic} --output yaml "
            "{params} {timeout} --name={name} {ns} {model} {ver}".format(
                kubeconfig=kubeconfig,
                helm=self._helm_command,
                atomic=atomic_str,
                params=params_str,
                timeout=timeout_str,
                name=kdu_instance,
                ns=namespace_str,
                model=kdu_model,
                ver=version_str,
            )
        )
        return command

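    # With illustrative values, the generated command looks like:
    #
    #   env KUBECONFIG=/app/storage/c1/.kube/config helm install --atomic \
    #       --output yaml --timeout 300 --name=ldap-0001 --namespace osm \
    #       stable/openldap --version 1.2.3
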
    def _get_upgrade_scale_command(
        self,
        kdu_model: str,
        kdu_instance: str,
        namespace: str,
        scale: int,
        version: str,
        atomic: bool,
        replica_str: str,
        timeout: float,
        resource_name: str,
        kubeconfig: str,
    ) -> str:

        timeout_str = ""
        if timeout:
            timeout_str = "--timeout {}s".format(timeout)

        # atomic
        atomic_str = ""
        if atomic:
            atomic_str = "--atomic"

        # version
        version_str = ""
        if version:
            version_str = "--version {}".format(version)

        # scale
        if resource_name:
            scale_dict = {"{}.{}".format(resource_name, replica_str): scale}
        else:
            scale_dict = {replica_str: scale}

        scale_str = self._params_to_set_option(scale_dict)

        command = (
            "env KUBECONFIG={kubeconfig} {helm} upgrade {atomic} --output yaml {scale} {timeout} {name} {model} {ver}"
        ).format(
            helm=self._helm_command,
            name=kdu_instance,
            atomic=atomic_str,
            scale=scale_str,
            timeout=timeout_str,
            model=kdu_model,
            ver=version_str,
            kubeconfig=kubeconfig,
        )
        return command

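    # Scale sketch (illustrative values): with resource_name="mongo",
    # replica_str="replicaCount" and scale=3, scale_dict becomes
    # {"mongo.replicaCount": 3}, which _params_to_set_option is expected to
    # render as a "--set" fragment of the upgrade command.
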
    def _get_upgrade_command(
        self,
        kdu_model,
        kdu_instance,
        namespace,
        params_str,
        version,
        atomic,
        timeout,
        kubeconfig,
    ) -> str:

        timeout_str = ""
        if timeout:
            timeout_str = "--timeout {}".format(timeout)

        # atomic
        atomic_str = ""
        if atomic:
            atomic_str = "--atomic"

        # version
        version_str = ""
        if version:
            version_str = "--version {}".format(version)

        command = (
            "env KUBECONFIG={kubeconfig} {helm} upgrade {atomic} --output yaml {params} {timeout} {name} {model} {ver}"
        ).format(
            kubeconfig=kubeconfig,
            helm=self._helm_command,
            atomic=atomic_str,
            params=params_str,
            timeout=timeout_str,
            name=kdu_instance,
            model=kdu_model,
            ver=version_str,
        )
        return command

    def _get_rollback_command(
        self, kdu_instance, namespace, revision, kubeconfig
    ) -> str:
        return "env KUBECONFIG={} {} rollback {} {} --wait".format(
            kubeconfig, self._helm_command, kdu_instance, revision
        )

    def _get_uninstall_command(
        self, kdu_instance: str, namespace: str, kubeconfig: str
    ) -> str:
        return "env KUBECONFIG={} {} delete --purge {}".format(
            kubeconfig, self._helm_command, kdu_instance
        )