n2vc/k8s_helm_conn.py (osm/N2VC.git)
##
# Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
# This file is part of OSM
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# For those usages not covered by the Apache License, Version 2.0 please
# contact with: nfvlabs@tid.es
##
import asyncio
from typing import Union
import os
import yaml

from n2vc.k8s_helm_base_conn import K8sHelmBaseConnector
from n2vc.exceptions import K8sException


class K8sHelmConnector(K8sHelmBaseConnector):

    """
    ####################################################################################
    ################################### P U B L I C ####################################
    ####################################################################################
    """

    def __init__(
        self,
        fs: object,
        db: object,
        kubectl_command: str = "/usr/bin/kubectl",
        helm_command: str = "/usr/bin/helm",
        log: object = None,
        on_update_db=None,
    ):
        """
        Initializes helm connector for helm v2

        :param fs: file system for kubernetes and helm configuration
        :param db: database object to write current operation status
        :param kubectl_command: path to kubectl executable
        :param helm_command: path to helm executable
        :param log: logger
        :param on_update_db: callback called when k8s connector updates database
        """

        # parent class
        K8sHelmBaseConnector.__init__(
            self,
            db=db,
            log=log,
            fs=fs,
            kubectl_command=kubectl_command,
            helm_command=helm_command,
            on_update_db=on_update_db,
        )

        self.log.info("Initializing K8S Helm2 connector")

        # initialize helm client-only
        self.log.debug("Initializing helm client-only...")
        command = "{} init --client-only {} ".format(
            self._helm_command,
            "--stable-repo-url {}".format(self._stable_repo_url)
            if self._stable_repo_url
            else "--skip-repos",
        )
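        # Resulting command is, for example (assuming the default helm binary path and
        # a configured stable repo URL; values are illustrative):
        #   /usr/bin/helm init --client-only --stable-repo-url <stable-repo-url>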
        try:
            asyncio.ensure_future(
                self._local_async_exec(command=command, raise_exception_on_error=False)
            )
            # loop = asyncio.get_event_loop()
            # loop.run_until_complete(self._local_async_exec(command=command,
            # raise_exception_on_error=False))
        except Exception as e:
            self.log.warning(
                msg="helm init failed (it was already initialized): {}".format(e)
            )

        self.log.info("K8S Helm2 connector initialized")

    async def install(
        self,
        cluster_uuid: str,
        kdu_model: str,
        kdu_instance: str,
        atomic: bool = True,
        timeout: float = 300,
        params: dict = None,
        db_dict: dict = None,
        kdu_name: str = None,
        namespace: str = None,
        **kwargs,
    ):
        """
        Deploys a new KDU instance. It would implicitly rely on the `install` call
        to deploy the Chart/Bundle properly parametrized (in practice, this call would
        happen before any _initial-config-primitive_ of the VNF is called).

        :param cluster_uuid: UUID of a K8s cluster known by OSM
        :param kdu_model: chart reference (string), which can be either
            of these options:
            - a name of chart available via the repos known by OSM
            - a path to a packaged chart
            - a path to an unpacked chart directory or a URL
        :param kdu_instance: Kdu instance name
        :param atomic: If set, the installation process purges the chart/bundle on
            failure and waits until all the K8s objects are active
        :param timeout: Time in seconds to wait for the install of the chart/bundle
            (defaults to Helm default timeout: 300s)
        :param params: dictionary of key-value pairs for instantiation parameters
            (overriding default values)
        :param dict db_dict: where to write into database when the status changes.
            It contains a dict with {collection: <str>, filter: {}, path: <str>},
            e.g. {collection: "nsrs", filter: {_id: <nsd-id>},
            path: "_admin.deployed.K8S.3"}
        :param kdu_name: Name of the KDU instance to be installed
        :param namespace: K8s namespace to use for the KDU instance
        :param kwargs: Additional parameters (None yet)
        :return: True if successful
        """
        self.log.debug("installing {} in cluster {}".format(kdu_model, cluster_uuid))

        # sync local dir
        self.fs.sync(from_path=cluster_uuid)

        # init env, paths
        paths, env = self._init_paths_env(
            cluster_name=cluster_uuid, create_if_not_exist=True
        )

        await self._install_impl(
            cluster_uuid,
            kdu_model,
            paths,
            env,
            kdu_instance,
            atomic=atomic,
            timeout=timeout,
            params=params,
            db_dict=db_dict,
            kdu_name=kdu_name,
            namespace=namespace,
        )

        # sync fs
        self.fs.reverse_sync(from_path=cluster_uuid)

        self.log.debug("Returning kdu_instance {}".format(kdu_instance))
        return True

    async def inspect_kdu(self, kdu_model: str, repo_url: str = None) -> str:

        self.log.debug(
            "inspect kdu_model {} from (optional) repo: {}".format(kdu_model, repo_url)
        )

        return await self._exec_inspect_comand(
            inspect_command="", kdu_model=kdu_model, repo_url=repo_url
        )

    """
    ####################################################################################
    ################################### P R I V A T E ##################################
    ####################################################################################
    """

    def _init_paths_env(self, cluster_name: str, create_if_not_exist: bool = True):
182 """
183 Creates and returns base cluster and kube dirs and returns them.
184 Also created helm3 dirs according to new directory specification, paths are
185 returned and also environment variables that must be provided to execute commands
186
187 Helm 2 directory specification uses helm_home dir:
188
189 The variables assigned for this paths are:
190 - Helm hone: $HELM_HOME
191 - helm kubeconfig: $KUBECONFIG
192
193 :param cluster_name: cluster_name
194 :return: Dictionary with config_paths and dictionary with helm environment variables
195 """
        base = self.fs.path
        if base.endswith("/") or base.endswith("\\"):
            base = base[:-1]

        # base dir for cluster
        cluster_dir = base + "/" + cluster_name

        # kube dir
        kube_dir = cluster_dir + "/" + ".kube"
        if create_if_not_exist and not os.path.exists(kube_dir):
            self.log.debug("Creating dir {}".format(kube_dir))
            os.makedirs(kube_dir)

        # helm home dir
        helm_dir = cluster_dir + "/" + ".helm"
        if create_if_not_exist and not os.path.exists(helm_dir):
            self.log.debug("Creating dir {}".format(helm_dir))
            os.makedirs(helm_dir)

        config_filename = kube_dir + "/config"

        # 2 - Prepare dictionary with paths
        paths = {
            "kube_dir": kube_dir,
            "kube_config": config_filename,
            "cluster_dir": cluster_dir,
            "helm_dir": helm_dir,
        }

        for file_name, file in paths.items():
            if "dir" in file_name and not os.path.exists(file):
                err_msg = "{} dir does not exist".format(file)
                self.log.error(err_msg)
                raise K8sException(err_msg)

        # 3 - Prepare environment variables
        env = {"HELM_HOME": helm_dir, "KUBECONFIG": config_filename}

        return paths, env

    async def _get_services(self, cluster_id, kdu_instance, namespace, kubeconfig):

        # init config, env
        paths, env = self._init_paths_env(
            cluster_name=cluster_id, create_if_not_exist=True
        )

        command1 = "env KUBECONFIG={} {} get manifest {} ".format(
            kubeconfig, self._helm_command, kdu_instance
        )
        command2 = "{} get --namespace={} -f -".format(self.kubectl_command, namespace)
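        # The two commands are executed as a pipe: the rendered manifest produced by
        # "helm get manifest <release>" is fed into "kubectl get --namespace=<ns> -f -",
        # which lists the K8s objects (including services) belonging to the release.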
        output, _rc = await self._local_async_exec_pipe(
            command1, command2, env=env, raise_exception_on_error=True
        )
        services = self._parse_services(output)

        return services

    async def _cluster_init(
        self, cluster_id: str, namespace: str, paths: dict, env: dict
    ):
        """
        Implements the helm version dependent cluster initialization:
        For helm2 it initializes the tiller environment if needed
        """

        # check if a tiller deployment is already present in the cluster
        command = "{} --kubeconfig={} --namespace={} get deployments".format(
            self.kubectl_command, paths["kube_config"], namespace
        )
        output, _rc = await self._local_async_exec(
            command=command, raise_exception_on_error=True, env=env
        )

        output_table = self._output_to_table(output=output)

        # find 'tiller-deploy' among the deployments
        already_initialized = False
        try:
            for row in output_table:
                if row[0].startswith("tiller-deploy"):
                    already_initialized = True
                    break
        except Exception:
            pass

        # helm init
        n2vc_installed_sw = False
        if not already_initialized:
            self.log.info(
                "Initializing helm in client and server: {}".format(cluster_id)
            )
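            # Create a dedicated service account for tiller and bind it to the
            # cluster-admin role so tiller can manage resources on behalf of helm.
            # Errors are ignored here (raise_exception_on_error=False), e.g. when the
            # objects already exist.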
            command = "{} --kubeconfig={} --namespace kube-system create serviceaccount {}".format(
                self.kubectl_command, paths["kube_config"], self.service_account
            )
            _, _rc = await self._local_async_exec(
                command=command, raise_exception_on_error=False, env=env
            )

            command = (
                "{} --kubeconfig={} create clusterrolebinding osm-tiller-cluster-rule "
                "--clusterrole=cluster-admin --serviceaccount=kube-system:{}"
            ).format(self.kubectl_command, paths["kube_config"], self.service_account)
            _, _rc = await self._local_async_exec(
                command=command, raise_exception_on_error=False, env=env
            )

            command = (
                "{} --kubeconfig={} --tiller-namespace={} --home={} --service-account {} "
                " {} init"
            ).format(
                self._helm_command,
                paths["kube_config"],
                namespace,
                paths["helm_dir"],
                self.service_account,
                "--stable-repo-url {}".format(self._stable_repo_url)
                if self._stable_repo_url
                else "--skip-repos",
            )
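            # Effective command, with illustrative values (or "--skip-repos" when no
            # stable repo URL is configured):
            #   /usr/bin/helm --kubeconfig=<kube_config> --tiller-namespace=<namespace>
            #       --home=<helm_dir> --service-account <sa> --stable-repo-url <url> init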
            _, _rc = await self._local_async_exec(
                command=command, raise_exception_on_error=True, env=env
            )
            n2vc_installed_sw = True
        else:
            # check client helm installation
            check_file = paths["helm_dir"] + "/repository/repositories.yaml"
            if not self._check_file_exists(
                filename=check_file, exception_if_not_exists=False
            ):
                self.log.info("Initializing helm in client: {}".format(cluster_id))
                command = (
                    "{} --kubeconfig={} --tiller-namespace={} "
                    "--home={} init --client-only {} "
                ).format(
                    self._helm_command,
                    paths["kube_config"],
                    namespace,
                    paths["helm_dir"],
                    "--stable-repo-url {}".format(self._stable_repo_url)
                    if self._stable_repo_url
                    else "--skip-repos",
                )
                output, _rc = await self._local_async_exec(
                    command=command, raise_exception_on_error=True, env=env
                )
            else:
                self.log.info("Helm client already initialized")

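        # Keep the "stable" repo in sync with the configured stable repo URL; when no
        # stable repo URL is configured, the "stable" repo is simply removed.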
        repo_list = await self.repo_list(cluster_id)
        for repo in repo_list:
            if repo["name"] == "stable" and repo["url"] != self._stable_repo_url:
                self.log.debug(
                    "Add new stable repo url: {}".format(self._stable_repo_url)
                )
                await self.repo_remove(cluster_id, "stable")
                if self._stable_repo_url:
                    await self.repo_add(cluster_id, "stable", self._stable_repo_url)
                break

        return n2vc_installed_sw

    async def _uninstall_sw(self, cluster_id: str, namespace: str):
        # uninstall Tiller if necessary

        self.log.debug("Uninstalling tiller from cluster {}".format(cluster_id))

        # init paths, env
        paths, env = self._init_paths_env(
            cluster_name=cluster_id, create_if_not_exist=True
        )

        if not namespace:
            # find namespace for tiller pod
            command = "{} --kubeconfig={} get deployments --all-namespaces".format(
                self.kubectl_command, paths["kube_config"]
            )
            output, _rc = await self._local_async_exec(
                command=command, raise_exception_on_error=False, env=env
            )
            output_table = self._output_to_table(output=output)
            namespace = None
            for r in output_table:
                try:
                    if "tiller-deploy" in r[1]:
                        namespace = r[0]
                        break
                except Exception:
                    pass
            else:
                msg = "Tiller deployment not found in cluster {}".format(cluster_id)
                self.log.error(msg)

            self.log.debug("namespace for tiller: {}".format(namespace))

        if namespace:
            # uninstall tiller from cluster
            self.log.debug("Uninstalling tiller from cluster {}".format(cluster_id))
            command = "{} --kubeconfig={} --home={} reset".format(
                self._helm_command, paths["kube_config"], paths["helm_dir"]
            )
            self.log.debug("resetting: {}".format(command))
            output, _rc = await self._local_async_exec(
                command=command, raise_exception_on_error=True, env=env
            )
            # Delete the clusterrolebinding and the serviceaccount.
            # Ignore errors, for backward compatibility.
            command = (
                "{} --kubeconfig={} delete clusterrolebinding.rbac.authorization.k8s."
                "io/osm-tiller-cluster-rule"
            ).format(self.kubectl_command, paths["kube_config"])
            output, _rc = await self._local_async_exec(
                command=command, raise_exception_on_error=False, env=env
            )
            command = (
                "{} --kubeconfig={} --namespace {} delete serviceaccount/{}".format(
                    self.kubectl_command,
                    paths["kube_config"],
                    namespace,
                    self.service_account,
                )
            )
            output, _rc = await self._local_async_exec(
                command=command, raise_exception_on_error=False, env=env
            )

        else:
            self.log.debug("namespace not found")

    async def _instances_list(self, cluster_id):

        # init paths, env
        paths, env = self._init_paths_env(
            cluster_name=cluster_id, create_if_not_exist=True
        )

        command = "{} list --output yaml".format(self._helm_command)

        output, _rc = await self._local_async_exec(
            command=command, raise_exception_on_error=True, env=env
        )

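        # helm2 returns the releases under a top-level "Releases" key, e.g.
        # (illustrative):
        #   Releases:
        #   - Name: ldap-0001
        #     Namespace: osm
        #     Status: DEPLOYED
        # Keys are lower-cased below so the result has the same shape as with helm3.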
        if output and len(output) > 0:
            # parse yaml and update keys to lower case to unify with helm3
            instances = yaml.load(output, Loader=yaml.SafeLoader).get("Releases")
            new_instances = []
            for instance in instances:
                new_instance = dict((k.lower(), v) for k, v in instance.items())
                new_instances.append(new_instance)
            return new_instances
        else:
            return []

    def _get_inspect_command(
        self, show_command: str, kdu_model: str, repo_str: str, version: str
    ):
        inspect_command = "{} inspect {} {}{} {}".format(
            self._helm_command, show_command, kdu_model, repo_str, version
        )
        return inspect_command

    async def _status_kdu(
        self,
        cluster_id: str,
        kdu_instance: str,
        namespace: str = None,
        yaml_format: bool = False,
        show_error_log: bool = False,
    ) -> Union[str, dict]:

        self.log.debug(
            "status of kdu_instance: {}, namespace: {} ".format(kdu_instance, namespace)
        )

        # init config, env
        paths, env = self._init_paths_env(
            cluster_name=cluster_id, create_if_not_exist=True
        )
        command = ("env KUBECONFIG={} {} status {} --output yaml").format(
            paths["kube_config"], self._helm_command, kdu_instance
        )
        output, rc = await self._local_async_exec(
            command=command,
            raise_exception_on_error=True,
            show_error_log=show_error_log,
            env=env,
        )

        if yaml_format:
            return str(output)

        if rc != 0:
            return None

        data = yaml.load(output, Loader=yaml.SafeLoader)

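        # helm2 uses capitalized field names (e.g. "Description") and returns the
        # resources as a plain text table; the steps below normalize both so the
        # returned dict looks like the one produced by the helm3 connector.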
        # remove field 'notes'
        try:
            del data.get("info").get("status")["notes"]
        except KeyError:
            pass

        # parse the manifest to a list of dictionaries
        if "manifest" in data:
            manifest_str = data.get("manifest")
            manifest_docs = yaml.load_all(manifest_str, Loader=yaml.SafeLoader)

            data["manifest"] = []
            for doc in manifest_docs:
                data["manifest"].append(doc)

        # parse field 'resources'
        try:
            resources = str(data.get("info").get("status").get("resources"))
            resource_table = self._output_to_table(resources)
            data.get("info").get("status")["resources"] = resource_table
        except Exception:
            pass

        # set description to lowercase (unify with helm3)
        try:
            data.get("info")["description"] = data.get("info").pop("Description")
        except KeyError:
            pass

        return data

    def _get_helm_chart_repos_ids(self, cluster_uuid) -> list:
        repo_ids = []
        cluster_filter = {"_admin.helm-chart.id": cluster_uuid}
        cluster = self.db.get_one("k8sclusters", cluster_filter)
        if cluster:
            repo_ids = cluster.get("_admin").get("helm_chart_repos") or []
            return repo_ids
        else:
            raise K8sException(
                "k8s cluster with helm-id: {} not found".format(cluster_uuid)
            )

    async def _is_install_completed(self, cluster_id: str, kdu_instance: str) -> bool:
        # init config, env
        paths, env = self._init_paths_env(
            cluster_name=cluster_id, create_if_not_exist=True
        )

        status = await self._status_kdu(
            cluster_id=cluster_id, kdu_instance=kdu_instance, yaml_format=False
        )

        # extract info.status.resources -> str
        # format:
        #   ==> v1/Deployment
        #   NAME                    READY   UP-TO-DATE   AVAILABLE   AGE
        #   halting-horse-mongodb   0/1     1            0           0s
        #   halting-petit-mongodb   1/1     1            0           0s
        #   (blank line)
        resources = K8sHelmBaseConnector._get_deep(
            status, ("info", "status", "resources")
        )

        # convert to table
        resources = K8sHelmBaseConnector._output_to_table(resources)

        num_lines = len(resources)
        index = 0
        ready = True
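        # Walk the table section by section: each "==>" line starts a new resource
        # kind, the following line is the column header, and the install is considered
        # not completed while any row shows fewer ready replicas than desired in its
        # READY column (e.g. "0/1").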
        while index < num_lines:
            try:
                line1 = resources[index]
                index += 1
                # find '==>' in column 0
                if line1[0] == "==>":
                    line2 = resources[index]
                    index += 1
                    # find READY in column 1
                    if line2[1] == "READY":
                        # read next lines
                        line3 = resources[index]
                        index += 1
                        while len(line3) > 1 and index < num_lines:
                            ready_value = line3[1]
                            parts = ready_value.split(sep="/")
                            current = int(parts[0])
                            total = int(parts[1])
                            if current < total:
                                self.log.debug("NOT READY:\n {}".format(line3))
                                ready = False
                            line3 = resources[index]
                            index += 1

            except Exception:
                pass

        return ready

    def _get_install_command(
        self,
        kdu_model,
        kdu_instance,
        namespace,
        params_str,
        version,
        atomic,
        timeout,
        kubeconfig,
    ) -> str:

        timeout_str = ""
        if timeout:
            timeout_str = "--timeout {}".format(timeout)

        # atomic
        atomic_str = ""
        if atomic:
            atomic_str = "--atomic"
        # namespace
        namespace_str = ""
        if namespace:
            namespace_str = "--namespace {}".format(namespace)

        # version
        version_str = ""
        if version:
            version_str = "--version {}".format(version)

        command = (
            "env KUBECONFIG={kubeconfig} {helm} install {atomic} --output yaml "
            "{params} {timeout} --name={name} {ns} {model} {ver}".format(
                kubeconfig=kubeconfig,
                helm=self._helm_command,
                atomic=atomic_str,
                params=params_str,
                timeout=timeout_str,
                name=kdu_instance,
                ns=namespace_str,
                model=kdu_model,
                ver=version_str,
            )
        )
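        # Illustrative result (all values are examples):
        #   env KUBECONFIG=<kube_config> /usr/bin/helm install --atomic --output yaml
        #       --timeout 300 --name=ldap-0001 --namespace osm stable/openldap
        #       --version 1.2.7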
        return command

    def _get_upgrade_command(
        self,
        kdu_model,
        kdu_instance,
        namespace,
        params_str,
        version,
        atomic,
        timeout,
        kubeconfig,
    ) -> str:

        timeout_str = ""
        if timeout:
            timeout_str = "--timeout {}".format(timeout)

        # atomic
        atomic_str = ""
        if atomic:
            atomic_str = "--atomic"

        # version
        version_str = ""
        if version:
            version_str = "--version {}".format(version)

        command = (
            "env KUBECONFIG={kubeconfig} {helm} upgrade {atomic} --output yaml {params} {timeout} {name} {model} {ver}"
        ).format(
            kubeconfig=kubeconfig,
            helm=self._helm_command,
            atomic=atomic_str,
            params=params_str,
            timeout=timeout_str,
            name=kdu_instance,
            model=kdu_model,
            ver=version_str,
        )
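        # Illustrative result (all values are examples):
        #   env KUBECONFIG=<kube_config> /usr/bin/helm upgrade --atomic --output yaml
        #       --timeout 300 ldap-0001 stable/openldap --version 1.2.8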
        return command

    def _get_rollback_command(
        self, kdu_instance, namespace, revision, kubeconfig
    ) -> str:
        return "env KUBECONFIG={} {} rollback {} {} --wait".format(
            kubeconfig, self._helm_command, kdu_instance, revision
        )

    def _get_uninstall_command(
        self, kdu_instance: str, namespace: str, kubeconfig: str
    ) -> str:
        return "env KUBECONFIG={} {} delete --purge {}".format(
            kubeconfig, self._helm_command, kdu_instance
        )