Bug 1965 fixed
n2vc/k8s_helm_conn.py
##
# Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
# This file is part of OSM
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# For those usages not covered by the Apache License, Version 2.0 please
# contact with: nfvlabs@tid.es
##
import asyncio
import os
import yaml

from n2vc.k8s_helm_base_conn import K8sHelmBaseConnector
from n2vc.exceptions import K8sException


class K8sHelmConnector(K8sHelmBaseConnector):

    """
    ####################################################################################
    ################################### P U B L I C ####################################
    ####################################################################################
    """

    def __init__(
        self,
        fs: object,
        db: object,
        kubectl_command: str = "/usr/bin/kubectl",
        helm_command: str = "/usr/bin/helm",
        log: object = None,
        on_update_db=None,
    ):
47 """
48 Initializes helm connector for helm v2
49
50 :param fs: file system for kubernetes and helm configuration
51 :param db: database object to write current operation status
52 :param kubectl_command: path to kubectl executable
53 :param helm_command: path to helm executable
54 :param log: logger
55 :param on_update_db: callback called when k8s connector updates database
56 """

        # parent class
        K8sHelmBaseConnector.__init__(
            self,
            db=db,
            log=log,
            fs=fs,
            kubectl_command=kubectl_command,
            helm_command=helm_command,
            on_update_db=on_update_db,
        )

        self.log.info("Initializing K8S Helm2 connector")

        # initialize helm client-only
        self.log.debug("Initializing helm client-only...")
        command = "{} init --client-only {} ".format(
            self._helm_command,
            "--stable-repo-url {}".format(self._stable_repo_url)
            if self._stable_repo_url
            else "--skip-repos",
        )
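        # e.g. "/usr/bin/helm init --client-only --stable-repo-url <url>"
        # (or "--skip-repos" when no stable repo URL is configured)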
        try:
            asyncio.ensure_future(
                self._local_async_exec(command=command, raise_exception_on_error=False)
            )
            # loop = asyncio.get_event_loop()
            # loop.run_until_complete(self._local_async_exec(command=command,
            # raise_exception_on_error=False))
        except Exception as e:
            self.log.warning(
                msg="helm init failed (it was already initialized): {}".format(e)
            )

        self.log.info("K8S Helm2 connector initialized")

    async def install(
        self,
        cluster_uuid: str,
        kdu_model: str,
        kdu_instance: str,
        atomic: bool = True,
        timeout: float = 300,
        params: dict = None,
        db_dict: dict = None,
        kdu_name: str = None,
        namespace: str = None,
        **kwargs,
    ):
106 """
107 Deploys of a new KDU instance. It would implicitly rely on the `install` call
108 to deploy the Chart/Bundle properly parametrized (in practice, this call would
109 happen before any _initial-config-primitive_of the VNF is called).
110
111 :param cluster_uuid: UUID of a K8s cluster known by OSM
112 :param kdu_model: chart/ reference (string), which can be either
113 of these options:
114 - a name of chart available via the repos known by OSM
115 - a path to a packaged chart
116 - a path to an unpacked chart directory or a URL
117 :param kdu_instance: Kdu instance name
118 :param atomic: If set, installation process purges chart/bundle on fail, also
119 will wait until all the K8s objects are active
120 :param timeout: Time in seconds to wait for the install of the chart/bundle
121 (defaults to Helm default timeout: 300s)
122 :param params: dictionary of key-value pairs for instantiation parameters
123 (overriding default values)
124 :param dict db_dict: where to write into database when the status changes.
125 It contains a dict with {collection: <str>, filter: {},
126 path: <str>},
127 e.g. {collection: "nsrs", filter:
128 {_id: <nsd-id>, path: "_admin.deployed.K8S.3"}
129 :param kdu_name: Name of the KDU instance to be installed
130 :param namespace: K8s namespace to use for the KDU instance
131 :param kwargs: Additional parameters (None yet)
132 :return: True if successful
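
        A minimal usage sketch (the cluster UUID, chart reference and db_dict
        values below are illustrative, not real identifiers)::

            await conn.install(
                cluster_uuid="f3bc-...",
                kdu_model="stable/openldap",
                kdu_instance="openldap-0012",
                params={"replicaCount": 2},
                db_dict={
                    "collection": "nsrs",
                    "filter": {"_id": "<nsr-id>"},
                    "path": "_admin.deployed.K8S.0",
                },
            )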
133 """
        self.log.debug("installing {} in cluster {}".format(kdu_model, cluster_uuid))

        # sync local dir
        self.fs.sync(from_path=cluster_uuid)

        # init env, paths
        paths, env = self._init_paths_env(
            cluster_name=cluster_uuid, create_if_not_exist=True
        )

        await self._install_impl(
            cluster_uuid,
            kdu_model,
            paths,
            env,
            kdu_instance,
            atomic=atomic,
            timeout=timeout,
            params=params,
            db_dict=db_dict,
            kdu_name=kdu_name,
            namespace=namespace,
        )

        # sync fs
        self.fs.reverse_sync(from_path=cluster_uuid)

        self.log.debug("Returning kdu_instance {}".format(kdu_instance))
        return True

    async def inspect_kdu(self, kdu_model: str, repo_url: str = None) -> str:

        self.log.debug(
            "inspect kdu_model {} from (optional) repo: {}".format(kdu_model, repo_url)
        )

        return await self._exec_inspect_command(
            inspect_command="", kdu_model=kdu_model, repo_url=repo_url
        )

174 """
175 ####################################################################################
176 ################################### P R I V A T E ##################################
177 ####################################################################################
178 """
179
    def _init_paths_env(self, cluster_name: str, create_if_not_exist: bool = True):
        """
        Creates (if needed) and returns the base cluster and kube dirs.
        The helm dirs are also created according to the Helm 2 directory
        specification; the paths are returned together with the environment
        variables that must be provided to execute commands.

        The Helm 2 directory specification uses a helm_home dir. The variables
        assigned for these paths are:
        - Helm home: $HELM_HOME
        - helm kubeconfig: $KUBECONFIG

        :param cluster_name: cluster_name
        :return: Dictionary with config_paths and dictionary with helm environment variables
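
        Illustrative shape of the returned values for a cluster named
        "cluster-1" (paths hang from self.fs.path)::

            paths = {
                "kube_dir": "<fs-path>/cluster-1/.kube",
                "kube_config": "<fs-path>/cluster-1/.kube/config",
                "cluster_dir": "<fs-path>/cluster-1",
                "helm_dir": "<fs-path>/cluster-1/.helm",
            }
            env = {"HELM_HOME": paths["helm_dir"],
                   "KUBECONFIG": paths["kube_config"]}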
194 """
        base = self.fs.path
        if base.endswith("/") or base.endswith("\\"):
            base = base[:-1]

        # base dir for cluster
        cluster_dir = base + "/" + cluster_name

        # kube dir
        kube_dir = cluster_dir + "/" + ".kube"
        if create_if_not_exist and not os.path.exists(kube_dir):
            self.log.debug("Creating dir {}".format(kube_dir))
            os.makedirs(kube_dir)

        # helm home dir
        helm_dir = cluster_dir + "/" + ".helm"
        if create_if_not_exist and not os.path.exists(helm_dir):
            self.log.debug("Creating dir {}".format(helm_dir))
            os.makedirs(helm_dir)

        config_filename = kube_dir + "/config"

        # 2 - Prepare dictionary with paths
        paths = {
            "kube_dir": kube_dir,
            "kube_config": config_filename,
            "cluster_dir": cluster_dir,
            "helm_dir": helm_dir,
        }

        for file_name, file in paths.items():
            if "dir" in file_name and not os.path.exists(file):
                err_msg = "{} dir does not exist".format(file)
                self.log.error(err_msg)
                raise K8sException(err_msg)

        # 3 - Prepare environment variables
        env = {"HELM_HOME": helm_dir, "KUBECONFIG": config_filename}

        return paths, env

    async def _get_services(self, cluster_id, kdu_instance, namespace, kubeconfig):

        # init config, env
        paths, env = self._init_paths_env(
            cluster_name=cluster_id, create_if_not_exist=True
        )

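        # Pipe the release manifest into kubectl so the K8s objects of the
        # release can be listed, i.e. the equivalent of:
        #   helm get manifest <instance> | kubectl get --namespace=<ns> -f -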
        command1 = "env KUBECONFIG={} {} get manifest {} ".format(
            kubeconfig, self._helm_command, kdu_instance
        )
        command2 = "{} get --namespace={} -f -".format(self.kubectl_command, namespace)
        output, _rc = await self._local_async_exec_pipe(
            command1, command2, env=env, raise_exception_on_error=True
        )
        services = self._parse_services(output)

        return services

    async def _cluster_init(
        self, cluster_id: str, namespace: str, paths: dict, env: dict
    ):
        """
        Implements the Helm version dependent cluster initialization:
        for Helm 2 it initializes the Tiller environment if needed
        """

        # check if tiller pod is up in cluster
        command = "{} --kubeconfig={} --namespace={} get deployments".format(
            self.kubectl_command, paths["kube_config"], namespace
        )
        output, _rc = await self._local_async_exec(
            command=command, raise_exception_on_error=True, env=env
        )

        output_table = self._output_to_table(output=output)

        # find the 'tiller-deploy' deployment among all deployments
        already_initialized = False
        try:
            for row in output_table:
                if row[0].startswith("tiller-deploy"):
                    already_initialized = True
                    break
        except Exception:
            pass

        # helm init
        n2vc_installed_sw = False
        if not already_initialized:
            self.log.info(
                "Initializing helm in client and server: {}".format(cluster_id)
            )
            command = "{} --kubeconfig={} --namespace kube-system create serviceaccount {}".format(
                self.kubectl_command, paths["kube_config"], self.service_account
            )
            _, _rc = await self._local_async_exec(
                command=command, raise_exception_on_error=False, env=env
            )

            command = (
                "{} --kubeconfig={} create clusterrolebinding osm-tiller-cluster-rule "
                "--clusterrole=cluster-admin --serviceaccount=kube-system:{}"
            ).format(self.kubectl_command, paths["kube_config"], self.service_account)
            _, _rc = await self._local_async_exec(
                command=command, raise_exception_on_error=False, env=env
            )

            command = (
                "{} --kubeconfig={} --tiller-namespace={} --home={} --service-account {} "
                " {} init"
            ).format(
                self._helm_command,
                paths["kube_config"],
                namespace,
                paths["helm_dir"],
                self.service_account,
                "--stable-repo-url {}".format(self._stable_repo_url)
                if self._stable_repo_url
                else "--skip-repos",
            )
            _, _rc = await self._local_async_exec(
                command=command, raise_exception_on_error=True, env=env
            )
            n2vc_installed_sw = True
        else:
            # check client helm installation
            check_file = paths["helm_dir"] + "/repository/repositories.yaml"
            if not self._check_file_exists(
                filename=check_file, exception_if_not_exists=False
            ):
                self.log.info("Initializing helm in client: {}".format(cluster_id))
                command = (
                    "{} --kubeconfig={} --tiller-namespace={} "
                    "--home={} init --client-only {} "
                ).format(
                    self._helm_command,
                    paths["kube_config"],
                    namespace,
                    paths["helm_dir"],
                    "--stable-repo-url {}".format(self._stable_repo_url)
                    if self._stable_repo_url
                    else "--skip-repos",
                )
                output, _rc = await self._local_async_exec(
                    command=command, raise_exception_on_error=True, env=env
                )
            else:
                self.log.info("Helm client already initialized")

        # if the stable repo url has changed, replace the 'stable' repo
        repo_list = await self.repo_list(cluster_id)
        for repo in repo_list:
            if repo["name"] == "stable" and repo["url"] != self._stable_repo_url:
                self.log.debug(
                    "Add new stable repo url: {}".format(self._stable_repo_url)
                )
                await self.repo_remove(cluster_id, "stable")
                if self._stable_repo_url:
                    await self.repo_add(cluster_id, "stable", self._stable_repo_url)
                break

        return n2vc_installed_sw

    async def _uninstall_sw(self, cluster_id: str, namespace: str):
        # uninstall Tiller if necessary

        self.log.debug("Uninstalling tiller from cluster {}".format(cluster_id))

        # init paths, env
        paths, env = self._init_paths_env(
            cluster_name=cluster_id, create_if_not_exist=True
        )

        if not namespace:
            # find namespace for tiller pod
            command = "{} --kubeconfig={} get deployments --all-namespaces".format(
                self.kubectl_command, paths["kube_config"]
            )
            output, _rc = await self._local_async_exec(
                command=command, raise_exception_on_error=False, env=env
            )
            output_table = self._output_to_table(output=output)
            namespace = None
            for r in output_table:
                try:
                    if "tiller-deploy" in r[1]:
                        namespace = r[0]
                        break
                except Exception:
                    pass
            else:
                msg = "Tiller deployment not found in cluster {}".format(cluster_id)
                self.log.error(msg)

            self.log.debug("namespace for tiller: {}".format(namespace))

        if namespace:
            # uninstall tiller from cluster
            self.log.debug("Uninstalling tiller from cluster {}".format(cluster_id))
            command = "{} --kubeconfig={} --home={} reset".format(
                self._helm_command, paths["kube_config"], paths["helm_dir"]
            )
            self.log.debug("resetting: {}".format(command))
            output, _rc = await self._local_async_exec(
                command=command, raise_exception_on_error=True, env=env
            )
            # Delete clusterrolebinding and serviceaccount.
            # Ignore errors for backward compatibility
            command = (
                "{} --kubeconfig={} delete clusterrolebinding.rbac.authorization.k8s."
                "io/osm-tiller-cluster-rule"
            ).format(self.kubectl_command, paths["kube_config"])
            output, _rc = await self._local_async_exec(
                command=command, raise_exception_on_error=False, env=env
            )
            command = (
                "{} --kubeconfig={} --namespace {} delete serviceaccount/{}".format(
                    self.kubectl_command,
                    paths["kube_config"],
                    namespace,
                    self.service_account,
                )
            )
            output, _rc = await self._local_async_exec(
                command=command, raise_exception_on_error=False, env=env
            )

        else:
            self.log.debug("namespace not found")

    async def _instances_list(self, cluster_id):

        # init paths, env
        paths, env = self._init_paths_env(
            cluster_name=cluster_id, create_if_not_exist=True
        )

        command = "{} list --output yaml".format(self._helm_command)

        output, _rc = await self._local_async_exec(
            command=command, raise_exception_on_error=True, env=env
        )

        if output and len(output) > 0:
            # parse yaml and update keys to lower case to unify with helm3
            instances = yaml.load(output, Loader=yaml.SafeLoader).get("Releases")
            new_instances = []
            for instance in instances:
                new_instance = dict((k.lower(), v) for k, v in instance.items())
                new_instances.append(new_instance)
            return new_instances
        else:
            return []

    def _get_inspect_command(
        self, show_command: str, kdu_model: str, repo_str: str, version: str
    ):
        inspect_command = "{} inspect {} {}{} {}".format(
            self._helm_command, show_command, kdu_model, repo_str, version
        )
        return inspect_command

    def _get_get_command(
        self, get_command: str, kdu_instance: str, namespace: str, kubeconfig: str
    ):
        get_command = "env KUBECONFIG={} {} get {} {} --output yaml".format(
            kubeconfig, self._helm_command, get_command, kdu_instance
        )
        return get_command

    async def _status_kdu(
        self,
        cluster_id: str,
        kdu_instance: str,
        namespace: str = None,
        show_error_log: bool = False,
        return_text: bool = False,
    ):
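        # Returns the parsed "helm status" output as a dict (or the raw text
        # when return_text is set); the manifest is split into a list of
        # documents and the resources block is converted into a table.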

        self.log.debug(
            "status of kdu_instance: {}, namespace: {} ".format(kdu_instance, namespace)
        )

        # init config, env
        paths, env = self._init_paths_env(
            cluster_name=cluster_id, create_if_not_exist=True
        )
        command = "env KUBECONFIG={} {} status {} --output yaml".format(
            paths["kube_config"], self._helm_command, kdu_instance
        )
        output, rc = await self._local_async_exec(
            command=command,
            raise_exception_on_error=True,
            show_error_log=show_error_log,
            env=env,
        )

        if return_text:
            return str(output)

        if rc != 0:
            return None

        data = yaml.load(output, Loader=yaml.SafeLoader)

        # remove field 'notes'
        try:
            del data.get("info").get("status")["notes"]
        except KeyError:
            pass

        # parse the manifest to a list of dictionaries
        if "manifest" in data:
            manifest_str = data.get("manifest")
            manifest_docs = yaml.load_all(manifest_str, Loader=yaml.SafeLoader)

            data["manifest"] = []
            for doc in manifest_docs:
                data["manifest"].append(doc)

        # parse field 'resources'
        try:
            resources = str(data.get("info").get("status").get("resources"))
            resource_table = self._output_to_table(resources)
            data.get("info").get("status")["resources"] = resource_table
        except Exception:
            pass

        # set description to lowercase (unify with helm3)
        try:
            data.get("info")["description"] = data.get("info").pop("Description")
        except KeyError:
            pass

        return data

    def _get_helm_chart_repos_ids(self, cluster_uuid) -> list:
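        # Illustrative shape of the relevant "_admin" fields in the cluster
        # document looked up below:
        #   {"_admin": {"helm-chart": {"id": "<uuid>"},
        #               "helm_chart_repos": ["<repo-id>", ...]}}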
        repo_ids = []
        cluster_filter = {"_admin.helm-chart.id": cluster_uuid}
        cluster = self.db.get_one("k8sclusters", cluster_filter)
        if cluster:
            repo_ids = cluster.get("_admin").get("helm_chart_repos") or []
            return repo_ids
        else:
            raise K8sException(
                "k8s cluster with helm-id {} not found".format(cluster_uuid)
            )

    async def _is_install_completed(self, cluster_id: str, kdu_instance: str) -> bool:
        # init config, env
        paths, env = self._init_paths_env(
            cluster_name=cluster_id, create_if_not_exist=True
        )

        status = await self._status_kdu(
            cluster_id=cluster_id, kdu_instance=kdu_instance, return_text=False
        )

        # extract info.status.resources -> str
        # format:
        #   ==> v1/Deployment
        #   NAME                    READY   UP-TO-DATE   AVAILABLE   AGE
        #   halting-horse-mongodb   0/1     1            0           0s
        #   halting-petit-mongodb   1/1     1            0           0s
        #   blank line
        resources = K8sHelmBaseConnector._get_deep(
            status, ("info", "status", "resources")
        )

        # convert to table
        resources = K8sHelmBaseConnector._output_to_table(resources)

        num_lines = len(resources)
        index = 0
        ready = True
        while index < num_lines:
            try:
                line1 = resources[index]
                index += 1
                # find '==>' in column 0
                if line1[0] == "==>":
                    line2 = resources[index]
                    index += 1
                    # find READY in column 1
                    if line2[1] == "READY":
                        # read next lines
                        line3 = resources[index]
                        index += 1
                        while len(line3) > 1 and index < num_lines:
                            ready_value = line3[1]
                            parts = ready_value.split(sep="/")
                            current = int(parts[0])
                            total = int(parts[1])
                            if current < total:
                                self.log.debug("NOT READY:\n {}".format(line3))
                                ready = False
                            line3 = resources[index]
                            index += 1

            except Exception:
                pass

        return ready

    def _get_install_command(
        self,
        kdu_model,
        kdu_instance,
        namespace,
        params_str,
        version,
        atomic,
        timeout,
        kubeconfig,
    ) -> str:
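        # Example of the generated command (shown schematically, with
        # illustrative values for paths, params and versions):
        #   env KUBECONFIG=<kube-config> /usr/bin/helm install --atomic \
        #       --output yaml --set replicaCount=2 --timeout 300 \
        #       --name=openldap-0012 --namespace osm stable/openldap --version 1.2.3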

        timeout_str = ""
        if timeout:
            timeout_str = "--timeout {}".format(timeout)

        # atomic
        atomic_str = ""
        if atomic:
            atomic_str = "--atomic"

        # namespace
        namespace_str = ""
        if namespace:
            namespace_str = "--namespace {}".format(namespace)

        # version
        version_str = ""
        if version:
            version_str = "--version {}".format(version)

        command = (
            "env KUBECONFIG={kubeconfig} {helm} install {atomic} --output yaml "
            "{params} {timeout} --name={name} {ns} {model} {ver}".format(
                kubeconfig=kubeconfig,
                helm=self._helm_command,
                atomic=atomic_str,
                params=params_str,
                timeout=timeout_str,
                name=kdu_instance,
                ns=namespace_str,
                model=kdu_model,
                ver=version_str,
            )
        )
        return command

    def _get_upgrade_scale_command(
        self,
        kdu_model: str,
        kdu_instance: str,
        namespace: str,
        scale: int,
        version: str,
        atomic: bool,
        replica_str: str,
        timeout: float,
        resource_name: str,
        kubeconfig: str,
    ) -> str:

        timeout_str = ""
        if timeout:
            timeout_str = "--timeout {}s".format(timeout)

        # atomic
        atomic_str = ""
        if atomic:
            atomic_str = "--atomic"

        # version
        version_str = ""
        if version:
            version_str = "--version {}".format(version)

        # scale
        if resource_name:
            scale_dict = {"{}.{}".format(resource_name, replica_str): scale}
        else:
            scale_dict = {replica_str: scale}
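        # e.g. resource_name="mysql", replica_str="replicaCount", scale=2
        # gives scale_dict == {"mysql.replicaCount": 2}, which is rendered
        # below as a helm "--set" option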

        scale_str = self._params_to_set_option(scale_dict)

        command = (
            "env KUBECONFIG={kubeconfig} {helm} upgrade {atomic} --output yaml {scale} {timeout} {name} {model} {ver}"
        ).format(
            helm=self._helm_command,
            name=kdu_instance,
            atomic=atomic_str,
            scale=scale_str,
            timeout=timeout_str,
            model=kdu_model,
            ver=version_str,
            kubeconfig=kubeconfig,
        )
        return command

    def _get_upgrade_command(
        self,
        kdu_model,
        kdu_instance,
        namespace,
        params_str,
        version,
        atomic,
        timeout,
        kubeconfig,
    ) -> str:

        timeout_str = ""
        if timeout:
            timeout_str = "--timeout {}".format(timeout)

        # atomic
        atomic_str = ""
        if atomic:
            atomic_str = "--atomic"

        # version
        version_str = ""
        if version:
            version_str = "--version {}".format(version)

        command = (
            "env KUBECONFIG={kubeconfig} {helm} upgrade {atomic} --output yaml {params} {timeout} {name} {model} {ver}"
        ).format(
            kubeconfig=kubeconfig,
            helm=self._helm_command,
            atomic=atomic_str,
            params=params_str,
            timeout=timeout_str,
            name=kdu_instance,
            model=kdu_model,
            ver=version_str,
        )
        return command

    def _get_rollback_command(
        self, kdu_instance, namespace, revision, kubeconfig
    ) -> str:
        return "env KUBECONFIG={} {} rollback {} {} --wait".format(
            kubeconfig, self._helm_command, kdu_instance, revision
        )

    def _get_uninstall_command(
        self, kdu_instance: str, namespace: str, kubeconfig: str
    ) -> str:
        return "env KUBECONFIG={} {} delete --purge {}".format(
            kubeconfig, self._helm_command, kdu_instance
        )