##
# Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
# This file is part of OSM
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# For those usages not covered by the Apache License, Version 2.0 please
# contact with: nfvlabs@tid.es
##
import asyncio
from typing import Union
import os
import yaml

from n2vc.k8s_helm_base_conn import K8sHelmBaseConnector
from n2vc.exceptions import K8sException


class K8sHelmConnector(K8sHelmBaseConnector):

    """
    ####################################################################################
    ################################### P U B L I C ####################################
    ####################################################################################
    """

    def __init__(
        self,
        fs: object,
        db: object,
        kubectl_command: str = "/usr/bin/kubectl",
        helm_command: str = "/usr/bin/helm",
        log: object = None,
        on_update_db=None,
    ):
        """
        Initializes helm connector for helm v2

        :param fs: file system for kubernetes and helm configuration
        :param db: database object to write current operation status
        :param kubectl_command: path to kubectl executable
        :param helm_command: path to helm executable
        :param log: logger
        :param on_update_db: callback called when k8s connector updates database
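
        A minimal construction sketch (illustrative only; ``fs``, ``db`` and ``log``
        are objects supplied by the caller, not defined in this module)::

            helm2 = K8sHelmConnector(fs=fs, db=db, log=log)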
        """

        # parent class
        K8sHelmBaseConnector.__init__(
            self,
            db=db,
            log=log,
            fs=fs,
            kubectl_command=kubectl_command,
            helm_command=helm_command,
            on_update_db=on_update_db,
        )

        self.log.info("Initializing K8S Helm2 connector")

        # initialize helm client-only
        self.log.debug("Initializing helm client-only...")
        command = "{} init --client-only {} ".format(
            self._helm_command,
            "--stable-repo-url {}".format(self._stable_repo_url)
            if self._stable_repo_url
            else "--skip-repos",
        )
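        # For reference, the rendered command looks roughly like the line below;
        # the stable repo URL is whatever the base connector was configured with,
        # so the value shown here is only an illustration:
        #   /usr/bin/helm init --client-only --stable-repo-url https://charts.helm.sh/stable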
        try:
            asyncio.ensure_future(
                self._local_async_exec(command=command, raise_exception_on_error=False)
            )
            # loop = asyncio.get_event_loop()
            # loop.run_until_complete(self._local_async_exec(command=command,
            # raise_exception_on_error=False))
        except Exception as e:
            self.log.warning(
                msg="helm init failed (it was already initialized): {}".format(e)
            )

        self.log.info("K8S Helm2 connector initialized")

    async def install(
        self,
        cluster_uuid: str,
        kdu_model: str,
        kdu_instance: str,
        atomic: bool = True,
        timeout: float = 300,
        params: dict = None,
        db_dict: dict = None,
        kdu_name: str = None,
        namespace: str = None,
        **kwargs,
    ):
        """
        Deploys a new KDU instance. It would implicitly rely on the `install` call
        to deploy the Chart/Bundle properly parametrized (in practice, this call would
        happen before any _initial-config-primitive_ of the VNF is called).

        :param cluster_uuid: UUID of a K8s cluster known by OSM
        :param kdu_model: chart reference (string), which can be any of these options:
            - a name of a chart available via the repos known by OSM
            - a path to a packaged chart
            - a path to an unpacked chart directory or a URL
        :param kdu_instance: KDU instance name
        :param atomic: If set, the installation process purges the chart/bundle on
            failure and waits until all the K8s objects are active
        :param timeout: Time in seconds to wait for the install of the chart/bundle
            (defaults to Helm default timeout: 300s)
        :param params: dictionary of key-value pairs for instantiation parameters
            (overriding default values)
        :param dict db_dict: where to write into database when the status changes.
            It contains a dict with {collection: <str>, filter: {}, path: <str>},
            e.g. {collection: "nsrs", filter: {_id: <nsd-id>}, path: "_admin.deployed.K8S.3"}
        :param kdu_name: Name of the KDU instance to be installed
        :param namespace: K8s namespace to use for the KDU instance
        :param kwargs: Additional parameters (None yet)
        :return: True if successful
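
        A minimal usage sketch (illustrative only; ``helm2`` is an already
        constructed K8sHelmConnector instance, and the cluster UUID, chart name
        and parameter values are placeholders, not values used elsewhere in this
        module)::

            success = await helm2.install(
                cluster_uuid="kube-system:0123-cluster-id",
                kdu_model="stable/mongodb",
                kdu_instance="mongodb-0001",
                atomic=True,
                timeout=600,
                params={"replicaSet.enabled": True},
                namespace="osm",
            )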
        """
        _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
        self.log.debug("installing {} in cluster {}".format(kdu_model, cluster_id))

        # sync local dir
        self.fs.sync(from_path=cluster_id)

        # init env, paths
        paths, env = self._init_paths_env(
            cluster_name=cluster_id, create_if_not_exist=True
        )

        await self._install_impl(
            cluster_id,
            kdu_model,
            paths,
            env,
            kdu_instance,
            atomic=atomic,
            timeout=timeout,
            params=params,
            db_dict=db_dict,
            kdu_name=kdu_name,
            namespace=namespace,
        )

        # sync fs
        self.fs.reverse_sync(from_path=cluster_id)

        self.log.debug("Returning kdu_instance {}".format(kdu_instance))
        return True

    async def inspect_kdu(self, kdu_model: str, repo_url: str = None) -> str:

        self.log.debug(
            "inspect kdu_model {} from (optional) repo: {}".format(kdu_model, repo_url)
        )

        return await self._exec_inspect_comand(
            inspect_command="", kdu_model=kdu_model, repo_url=repo_url
        )

    """
    ####################################################################################
    ################################### P R I V A T E ##################################
    ####################################################################################
    """

    def _init_paths_env(self, cluster_name: str, create_if_not_exist: bool = True):
        """
        Creates and returns the base cluster and kube dirs.
        Also creates the helm home dir according to the directory specification, and
        returns the paths together with the environment variables that must be
        provided to execute helm commands.

        Helm 2 directory specification uses a helm_home dir:

        The variables assigned for these paths are:
        - Helm home: $HELM_HOME
        - helm kubeconfig: $KUBECONFIG

        :param cluster_name: cluster_name
        :return: Dictionary with config_paths and dictionary with helm environment variables
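
        For illustration, assuming the local fs base path is "/app/storage" and the
        cluster name is "mycluster" (both placeholders), the returned values would
        look roughly like::

            paths = {
                "kube_dir": "/app/storage/mycluster/.kube",
                "kube_config": "/app/storage/mycluster/.kube/config",
                "cluster_dir": "/app/storage/mycluster",
                "helm_dir": "/app/storage/mycluster/.helm",
            }
            env = {"HELM_HOME": paths["helm_dir"], "KUBECONFIG": paths["kube_config"]}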
        """
        base = self.fs.path
        if base.endswith("/") or base.endswith("\\"):
            base = base[:-1]

        # base dir for cluster
        cluster_dir = base + "/" + cluster_name

        # kube dir
        kube_dir = cluster_dir + "/" + ".kube"
        if create_if_not_exist and not os.path.exists(kube_dir):
            self.log.debug("Creating dir {}".format(kube_dir))
            os.makedirs(kube_dir)

        # helm home dir
        helm_dir = cluster_dir + "/" + ".helm"
        if create_if_not_exist and not os.path.exists(helm_dir):
            self.log.debug("Creating dir {}".format(helm_dir))
            os.makedirs(helm_dir)

        config_filename = kube_dir + "/config"

        # 2 - Prepare dictionary with paths
        paths = {
            "kube_dir": kube_dir,
            "kube_config": config_filename,
            "cluster_dir": cluster_dir,
            "helm_dir": helm_dir,
        }

        for file_name, file in paths.items():
            if "dir" in file_name and not os.path.exists(file):
                err_msg = "{} dir does not exist".format(file)
                self.log.error(err_msg)
                raise K8sException(err_msg)

        # 3 - Prepare environment variables
        env = {"HELM_HOME": helm_dir, "KUBECONFIG": config_filename}

        return paths, env

    async def _get_services(self, cluster_id, kdu_instance, namespace, kubeconfig):

        # init config, env
        paths, env = self._init_paths_env(
            cluster_name=cluster_id, create_if_not_exist=True
        )

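        # The release manifest obtained with "helm get manifest" is piped into
        # "kubectl get -f -" so that kubectl reports the current state of the
        # objects declared by the release; _parse_services then extracts the
        # service entries from that output.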
        command1 = "env KUBECONFIG={} {} get manifest {} ".format(
            kubeconfig, self._helm_command, kdu_instance
        )
        command2 = "{} get --namespace={} -f -".format(self.kubectl_command, namespace)
        output, _rc = await self._local_async_exec_pipe(
            command1, command2, env=env, raise_exception_on_error=True
        )
        services = self._parse_services(output)

        return services

    async def _cluster_init(
        self, cluster_id: str, namespace: str, paths: dict, env: dict
    ):
        """
        Implements the helm version dependent cluster initialization:
        For helm2 it initializes the Tiller environment if needed
        """
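        # Rough sequence when Tiller is not yet deployed (the exact commands are
        # built below from the connector configuration and cluster paths):
        #   kubectl ... create serviceaccount <service_account>
        #   kubectl ... create clusterrolebinding osm-tiller-cluster-rule ...
        #   helm init --kubeconfig=... --tiller-namespace=... --service-account ...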

        # check if tiller pod is up in cluster
        command = "{} --kubeconfig={} --namespace={} get deployments".format(
            self.kubectl_command, paths["kube_config"], namespace
        )
        output, _rc = await self._local_async_exec(
            command=command, raise_exception_on_error=True, env=env
        )

        output_table = self._output_to_table(output=output)

        # find 'tiller' pod in all pods
        already_initialized = False
        try:
            for row in output_table:
                if row[0].startswith("tiller-deploy"):
                    already_initialized = True
                    break
        except Exception:
            pass

        # helm init
        n2vc_installed_sw = False
        if not already_initialized:
            self.log.info(
                "Initializing helm in client and server: {}".format(cluster_id)
            )
            command = "{} --kubeconfig={} --namespace kube-system create serviceaccount {}".format(
                self.kubectl_command, paths["kube_config"], self.service_account
            )
            _, _rc = await self._local_async_exec(
                command=command, raise_exception_on_error=False, env=env
            )

            command = (
                "{} --kubeconfig={} create clusterrolebinding osm-tiller-cluster-rule "
                "--clusterrole=cluster-admin --serviceaccount=kube-system:{}"
            ).format(self.kubectl_command, paths["kube_config"], self.service_account)
            _, _rc = await self._local_async_exec(
                command=command, raise_exception_on_error=False, env=env
            )

            command = (
                "{} init --kubeconfig={} --tiller-namespace={} --home={} --service-account {} "
                " {}"
            ).format(
                self._helm_command,
                paths["kube_config"],
                namespace,
                paths["helm_dir"],
                self.service_account,
                "--stable-repo-url {}".format(self._stable_repo_url)
                if self._stable_repo_url
                else "--skip-repos",
            )
            _, _rc = await self._local_async_exec(
                command=command, raise_exception_on_error=True, env=env
            )
            n2vc_installed_sw = True
        else:
            # check client helm installation
            check_file = paths["helm_dir"] + "/repository/repositories.yaml"
            if not self._check_file_exists(
                filename=check_file, exception_if_not_exists=False
            ):
                self.log.info("Initializing helm in client: {}".format(cluster_id))
                command = (
                    "{} init --kubeconfig={} --tiller-namespace={} "
                    "--home={} --client-only {} "
                ).format(
                    self._helm_command,
                    paths["kube_config"],
                    namespace,
                    paths["helm_dir"],
                    "--stable-repo-url {}".format(self._stable_repo_url)
                    if self._stable_repo_url
                    else "--skip-repos",
                )
                output, _rc = await self._local_async_exec(
                    command=command, raise_exception_on_error=True, env=env
                )
            else:
                self.log.info("Helm client already initialized")

        # remove old stable repo and add new one
        cluster_uuid = "{}:{}".format(namespace, cluster_id)
        repo_list = await self.repo_list(cluster_uuid)
        for repo in repo_list:
            if repo["name"] == "stable" and repo["url"] != self._stable_repo_url:
                self.log.debug(
                    "Add new stable repo url: {}".format(self._stable_repo_url)
                )
                await self.repo_remove(cluster_uuid, "stable")
                if self._stable_repo_url:
                    await self.repo_add(cluster_uuid, "stable", self._stable_repo_url)
                break

        return n2vc_installed_sw

    async def _uninstall_sw(self, cluster_id: str, namespace: str):
        # uninstall Tiller if necessary

        self.log.debug("Uninstalling tiller from cluster {}".format(cluster_id))

        # init paths, env
        paths, env = self._init_paths_env(
            cluster_name=cluster_id, create_if_not_exist=True
        )

        if not namespace:
            # find namespace for tiller pod
            command = "{} --kubeconfig={} get deployments --all-namespaces".format(
                self.kubectl_command, paths["kube_config"]
            )
            output, _rc = await self._local_async_exec(
                command=command, raise_exception_on_error=False, env=env
            )
            output_table = self._output_to_table(output=output)
            namespace = None
            for r in output_table:
                try:
                    if "tiller-deploy" in r[1]:
                        namespace = r[0]
                        break
                except Exception:
                    pass
            else:
                msg = "Tiller deployment not found in cluster {}".format(cluster_id)
                self.log.error(msg)

            self.log.debug("namespace for tiller: {}".format(namespace))

        if namespace:
            # uninstall tiller from cluster
            self.log.debug("Uninstalling tiller from cluster {}".format(cluster_id))
            command = "{} --kubeconfig={} --home={} reset".format(
                self._helm_command, paths["kube_config"], paths["helm_dir"]
            )
            self.log.debug("resetting: {}".format(command))
            output, _rc = await self._local_async_exec(
                command=command, raise_exception_on_error=True, env=env
            )
            # Delete clusterrolebinding and serviceaccount.
            # Ignore if errors for backward compatibility
            command = (
                "{} --kubeconfig={} delete clusterrolebinding.rbac.authorization.k8s."
                "io/osm-tiller-cluster-rule"
            ).format(self.kubectl_command, paths["kube_config"])
            output, _rc = await self._local_async_exec(
                command=command, raise_exception_on_error=False, env=env
            )
            command = "{} --kubeconfig={} --namespace kube-system delete serviceaccount/{}".format(
                self.kubectl_command, paths["kube_config"], self.service_account
            )
            output, _rc = await self._local_async_exec(
                command=command, raise_exception_on_error=False, env=env
            )

        else:
            self.log.debug("namespace not found")

    async def _instances_list(self, cluster_id):

        # init paths, env
        paths, env = self._init_paths_env(
            cluster_name=cluster_id, create_if_not_exist=True
        )

        command = "{} list --output yaml".format(self._helm_command)

        output, _rc = await self._local_async_exec(
            command=command, raise_exception_on_error=True, env=env
        )

        if output and len(output) > 0:
            # parse yaml and update keys to lower case to unify with helm3
            instances = yaml.load(output, Loader=yaml.SafeLoader).get("Releases")
            new_instances = []
            for instance in instances:
                new_instance = dict((k.lower(), v) for k, v in instance.items())
                new_instances.append(new_instance)
            return new_instances
        else:
            return []

    def _get_inspect_command(
        self, show_command: str, kdu_model: str, repo_str: str, version: str
    ):
        inspect_command = "{} inspect {} {}{} {}".format(
            self._helm_command, show_command, kdu_model, repo_str, version
        )
        return inspect_command

    async def _status_kdu(
        self,
        cluster_id: str,
        kdu_instance: str,
        namespace: str = None,
        yaml_format: bool = False,
        show_error_log: bool = False,
    ) -> Union[str, dict]:

        self.log.debug(
            "status of kdu_instance: {}, namespace: {} ".format(kdu_instance, namespace)
        )

        # init config, env
        paths, env = self._init_paths_env(
            cluster_name=cluster_id, create_if_not_exist=True
        )
        command = ("env KUBECONFIG={} {} status {} --output yaml").format(
            paths["kube_config"], self._helm_command, kdu_instance
        )
        output, rc = await self._local_async_exec(
            command=command,
            raise_exception_on_error=True,
            show_error_log=show_error_log,
            env=env,
        )

        if yaml_format:
            return str(output)

        if rc != 0:
            return None

        data = yaml.load(output, Loader=yaml.SafeLoader)

        # remove field 'notes'
        try:
            del data.get("info").get("status")["notes"]
        except KeyError:
            pass

        # parse the manifest to a list of dictionaries
        if "manifest" in data:
            manifest_str = data.get("manifest")
            manifest_docs = yaml.load_all(manifest_str, Loader=yaml.SafeLoader)

            data["manifest"] = []
            for doc in manifest_docs:
                data["manifest"].append(doc)

        # parse field 'resources'
        try:
            resources = str(data.get("info").get("status").get("resources"))
            resource_table = self._output_to_table(resources)
            data.get("info").get("status")["resources"] = resource_table
        except Exception:
            pass

        # set description to lowercase (unify with helm3)
        try:
            data.get("info")["description"] = data.get("info").pop("Description")
        except KeyError:
            pass

        return data

    def _get_helm_chart_repos_ids(self, cluster_uuid) -> list:
        repo_ids = []
        cluster_filter = {"_admin.helm-chart.id": cluster_uuid}
        cluster = self.db.get_one("k8sclusters", cluster_filter)
        if cluster:
            repo_ids = cluster.get("_admin").get("helm_chart_repos") or []
            return repo_ids
        else:
            raise K8sException(
                "k8scluster with helm-id: {} not found".format(cluster_uuid)
            )

    async def _is_install_completed(self, cluster_id: str, kdu_instance: str) -> bool:
        # init config, env
        paths, env = self._init_paths_env(
            cluster_name=cluster_id, create_if_not_exist=True
        )

        status = await self._status_kdu(
            cluster_id=cluster_id, kdu_instance=kdu_instance, yaml_format=False
        )

        # extract info.status.resources -> str
        # format:
        #   ==> v1/Deployment
        #   NAME                    READY  UP-TO-DATE  AVAILABLE  AGE
        #   halting-horse-mongodb   0/1    1           0          0s
        #   halting-petit-mongodb   1/1    1           0          0s
        # blank line
        resources = K8sHelmBaseConnector._get_deep(
            status, ("info", "status", "resources")
        )

        # convert to table
        resources = K8sHelmBaseConnector._output_to_table(resources)

        num_lines = len(resources)
        index = 0
        ready = True
        while index < num_lines:
            try:
                line1 = resources[index]
                index += 1
                # find '==>' in column 0
                if line1[0] == "==>":
                    line2 = resources[index]
                    index += 1
                    # find READY in column 1
                    if line2[1] == "READY":
                        # read next lines
                        line3 = resources[index]
                        index += 1
                        while len(line3) > 1 and index < num_lines:
                            ready_value = line3[1]
                            parts = ready_value.split(sep="/")
                            current = int(parts[0])
                            total = int(parts[1])
                            if current < total:
                                self.log.debug("NOT READY:\n {}".format(line3))
                                ready = False
                            line3 = resources[index]
                            index += 1

            except Exception:
                pass

        return ready

    def _get_install_command(
        self,
        kdu_model,
        kdu_instance,
        namespace,
        params_str,
        version,
        atomic,
        timeout,
        kubeconfig,
    ) -> str:

        timeout_str = ""
        if timeout:
            timeout_str = "--timeout {}".format(timeout)

        # atomic
        atomic_str = ""
        if atomic:
            atomic_str = "--atomic"
        # namespace
        namespace_str = ""
        if namespace:
            namespace_str = "--namespace {}".format(namespace)

        # version
        version_str = ""
        if version:
            version_str = "--version {}".format(version)

        command = (
            "env KUBECONFIG={kubeconfig} {helm} install {atomic} --output yaml "
            "{params} {timeout} --name={name} {ns} {model} {ver}".format(
                kubeconfig=kubeconfig,
                helm=self._helm_command,
                atomic=atomic_str,
                params=params_str,
                timeout=timeout_str,
                name=kdu_instance,
                ns=namespace_str,
                model=kdu_model,
                ver=version_str,
            )
        )
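        # For illustration only, with placeholder values the rendered command
        # looks roughly like:
        #   env KUBECONFIG=.kube/config /usr/bin/helm install --atomic --output yaml
        #       --set replicas=2 --timeout 300 --name=mongodb-0001 --namespace osm
        #       stable/mongodb --version 7.0.0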
        return command

    def _get_upgrade_command(
        self,
        kdu_model,
        kdu_instance,
        namespace,
        params_str,
        version,
        atomic,
        timeout,
        kubeconfig,
    ) -> str:

        timeout_str = ""
        if timeout:
            timeout_str = "--timeout {}".format(timeout)

        # atomic
        atomic_str = ""
        if atomic:
            atomic_str = "--atomic"

        # version
        version_str = ""
        if version:
            version_str = "--version {}".format(version)

        command = (
            "env KUBECONFIG={kubeconfig} {helm} upgrade {atomic} --output yaml {params} {timeout} {name} {model} {ver}"
        ).format(
            kubeconfig=kubeconfig,
            helm=self._helm_command,
            atomic=atomic_str,
            params=params_str,
            timeout=timeout_str,
            name=kdu_instance,
            model=kdu_model,
            ver=version_str,
        )
        return command

    def _get_rollback_command(
        self, kdu_instance, namespace, revision, kubeconfig
    ) -> str:
        return "env KUBECONFIG={} {} rollback {} {} --wait".format(
            kubeconfig, self._helm_command, kdu_instance, revision
        )

    def _get_uninstall_command(
        self, kdu_instance: str, namespace: str, kubeconfig: str
    ) -> str:
        return "env KUBECONFIG={} {} delete --purge {}".format(
            kubeconfig, self._helm_command, kdu_instance
        )