##
# Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
# This file is part of OSM
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# For those usages not covered by the Apache License, Version 2.0 please
# contact with: nfvlabs@tid.es
##
import asyncio
import os
import yaml

from n2vc.k8s_helm_base_conn import K8sHelmBaseConnector
from n2vc.exceptions import K8sException


class K8sHelmConnector(K8sHelmBaseConnector):

    """
    ####################################################################################
    ################################### P U B L I C ####################################
    ####################################################################################
    """

    def __init__(
        self,
        fs: object,
        db: object,
        kubectl_command: str = "/usr/bin/kubectl",
        helm_command: str = "/usr/bin/helm",
        log: object = None,
        on_update_db=None,
    ):
        """
        Initializes helm connector for helm v2

        :param fs: file system for kubernetes and helm configuration
        :param db: database object to write current operation status
        :param kubectl_command: path to kubectl executable
        :param helm_command: path to helm executable
        :param log: logger
        :param on_update_db: callback called when k8s connector updates database
        """

        # parent class
        K8sHelmBaseConnector.__init__(
            self,
            db=db,
            log=log,
            fs=fs,
            kubectl_command=kubectl_command,
            helm_command=helm_command,
            on_update_db=on_update_db,
        )

        self.log.info("Initializing K8S Helm2 connector")

        # initialize helm client-only
        self.log.debug("Initializing helm client-only...")
        command = "{} init --client-only {} ".format(
            self._helm_command,
            "--stable-repo-url {}".format(self._stable_repo_url)
            if self._stable_repo_url
            else "--skip-repos",
        )
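        # The rendered command looks like (URL shown is illustrative):
        #   /usr/bin/helm init --client-only --stable-repo-url https://charts.helm.sh/stable
        # or, when no stable repo URL is configured:
        #   /usr/bin/helm init --client-only --skip-repos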
        try:
            asyncio.ensure_future(
                self._local_async_exec(command=command, raise_exception_on_error=False)
            )
            # loop = asyncio.get_event_loop()
            # loop.run_until_complete(self._local_async_exec(command=command,
            # raise_exception_on_error=False))
        except Exception as e:
            self.log.warning(
                msg="helm init failed (it may have been initialized already): {}".format(e)
            )

        self.log.info("K8S Helm2 connector initialized")

    async def install(
        self,
        cluster_uuid: str,
        kdu_model: str,
        kdu_instance: str,
        atomic: bool = True,
        timeout: float = 300,
        params: dict = None,
        db_dict: dict = None,
        kdu_name: str = None,
        namespace: str = None,
        **kwargs,
    ):
        """
        Deploys a new KDU instance. It would implicitly rely on the `install` call
        to deploy the Chart/Bundle properly parametrized (in practice, this call
        would happen before any _initial-config-primitive_ of the VNF is called).

        :param cluster_uuid: UUID of a K8s cluster known by OSM
        :param kdu_model: chart reference (string), which can be either of these
            options:
            - a name of a chart available via the repos known by OSM
            - a path to a packaged chart
            - a path to an unpacked chart directory or a URL
        :param kdu_instance: Kdu instance name
        :param atomic: If set, the installation process purges the chart/bundle on
            failure and waits until all the K8s objects are active
        :param timeout: Time in seconds to wait for the install of the chart/bundle
            (defaults to Helm default timeout: 300s)
        :param params: dictionary of key-value pairs for instantiation parameters
            (overriding default values)
        :param dict db_dict: where to write into database when the status changes.
            It contains a dict with {collection: <str>, filter: {}, path: <str>},
            e.g. {collection: "nsrs", filter: {_id: <nsd-id>},
            path: "_admin.deployed.K8S.3"}
        :param kdu_name: Name of the KDU instance to be installed
        :param namespace: K8s namespace to use for the KDU instance
        :param kwargs: Additional parameters (None yet)
        :return: True if successful
        """
        self.log.debug("installing {} in cluster {}".format(kdu_model, cluster_uuid))

        # sync local dir
        self.fs.sync(from_path=cluster_uuid)

        # init env, paths
        paths, env = self._init_paths_env(
            cluster_name=cluster_uuid, create_if_not_exist=True
        )

        await self._install_impl(
            cluster_uuid,
            kdu_model,
            paths,
            env,
            kdu_instance,
            atomic=atomic,
            timeout=timeout,
            params=params,
            db_dict=db_dict,
            kdu_name=kdu_name,
            namespace=namespace,
        )

        # sync fs
        self.fs.reverse_sync(from_path=cluster_uuid)

        self.log.debug("Returning kdu_instance {}".format(kdu_instance))
        return True

    async def inspect_kdu(self, kdu_model: str, repo_url: str = None) -> str:

        self.log.debug(
            "inspect kdu_model {} from (optional) repo: {}".format(kdu_model, repo_url)
        )

        return await self._exec_inspect_comand(
            inspect_command="", kdu_model=kdu_model, repo_url=repo_url
        )

    """
    ####################################################################################
    ################################### P R I V A T E ##################################
    ####################################################################################
    """

    def _init_paths_env(self, cluster_name: str, create_if_not_exist: bool = True):
        """
        Creates the base cluster and kube dirs if needed and returns their paths,
        together with the environment variables that must be provided to execute
        helm commands.

        The Helm 2 directory layout is based on a helm home dir. The environment
        variables assigned to these paths are:
        - helm home: $HELM_HOME
        - helm kubeconfig: $KUBECONFIG

        :param cluster_name: cluster_name
        :return: Dictionary with config_paths and dictionary with helm environment
            variables
        """
        base = self.fs.path
        if base.endswith("/") or base.endswith("\\"):
            base = base[:-1]

        # base dir for cluster
        cluster_dir = base + "/" + cluster_name

        # kube dir
        kube_dir = cluster_dir + "/" + ".kube"
        if create_if_not_exist and not os.path.exists(kube_dir):
            self.log.debug("Creating dir {}".format(kube_dir))
            os.makedirs(kube_dir)

        # helm home dir
        helm_dir = cluster_dir + "/" + ".helm"
        if create_if_not_exist and not os.path.exists(helm_dir):
            self.log.debug("Creating dir {}".format(helm_dir))
            os.makedirs(helm_dir)

        config_filename = kube_dir + "/config"

        # prepare dictionary with paths
        paths = {
            "kube_dir": kube_dir,
            "kube_config": config_filename,
            "cluster_dir": cluster_dir,
            "helm_dir": helm_dir,
        }
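        # Resulting layout under self.fs.path (example, for an illustrative
        # cluster id "cluster-1"):
        #   <fs.path>/cluster-1/.kube/config   -> kubeconfig for the cluster
        #   <fs.path>/cluster-1/.helm/         -> helm 2 home directory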

        for file_name, file in paths.items():
            if "dir" in file_name and not os.path.exists(file):
                err_msg = "{} dir does not exist".format(file)
                self.log.error(err_msg)
                raise K8sException(err_msg)

        # prepare environment variables
        env = {"HELM_HOME": helm_dir, "KUBECONFIG": config_filename}

        return paths, env

    async def _get_services(self, cluster_id, kdu_instance, namespace, kubeconfig):

        # init config, env
        paths, env = self._init_paths_env(
            cluster_name=cluster_id, create_if_not_exist=True
        )

        command1 = "env KUBECONFIG={} {} get manifest {} ".format(
            kubeconfig, self._helm_command, kdu_instance
        )
        command2 = "{} get --namespace={} -f -".format(self.kubectl_command, namespace)
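        # The two commands are piped: the release manifest printed by
        # "helm get manifest" is fed to "kubectl get -f -", which resolves the
        # current state of every object declared in it, e.g. (illustrative):
        #   env KUBECONFIG=... helm get manifest ldap-0012345 \
        #       | kubectl get --namespace=osm -f -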
        output, _rc = await self._local_async_exec_pipe(
            command1, command2, env=env, raise_exception_on_error=True
        )
        services = self._parse_services(output)

        return services

    async def _cluster_init(
        self, cluster_id: str, namespace: str, paths: dict, env: dict
    ):
        """
        Implements the helm version dependent cluster initialization:
        for helm2 it initializes the tiller environment if needed
        """

        # check if tiller pod is up in cluster
        command = "{} --kubeconfig={} --namespace={} get deployments".format(
            self.kubectl_command, paths["kube_config"], namespace
        )
        output, _rc = await self._local_async_exec(
            command=command, raise_exception_on_error=True, env=env
        )

        output_table = self._output_to_table(output=output)
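        # _output_to_table splits the raw kubectl output into rows of columns,
        # e.g. (illustrative):
        #   [["NAME", "READY", "UP-TO-DATE", "AVAILABLE", "AGE"],
        #    ["tiller-deploy", "1/1", "1", "1", "2d"]]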

        # find 'tiller' pod in all pods
        already_initialized = False
        try:
            for row in output_table:
                if row[0].startswith("tiller-deploy"):
                    already_initialized = True
                    break
        except Exception:
            pass

        # helm init
        n2vc_installed_sw = False
        if not already_initialized:
            self.log.info(
                "Initializing helm in client and server: {}".format(cluster_id)
            )
            command = "{} --kubeconfig={} --namespace kube-system create serviceaccount {}".format(
                self.kubectl_command, paths["kube_config"], self.service_account
            )
            _, _rc = await self._local_async_exec(
                command=command, raise_exception_on_error=False, env=env
            )

            command = (
                "{} --kubeconfig={} create clusterrolebinding osm-tiller-cluster-rule "
                "--clusterrole=cluster-admin --serviceaccount=kube-system:{}"
            ).format(self.kubectl_command, paths["kube_config"], self.service_account)
            _, _rc = await self._local_async_exec(
                command=command, raise_exception_on_error=False, env=env
            )

            command = (
                "{} --kubeconfig={} --tiller-namespace={} --home={} --service-account {} "
                " {} init"
            ).format(
                self._helm_command,
                paths["kube_config"],
                namespace,
                paths["helm_dir"],
                self.service_account,
                "--stable-repo-url {}".format(self._stable_repo_url)
                if self._stable_repo_url
                else "--skip-repos",
            )
            _, _rc = await self._local_async_exec(
                command=command, raise_exception_on_error=True, env=env
            )
            n2vc_installed_sw = True
        else:
            # check client helm installation
            check_file = paths["helm_dir"] + "/repository/repositories.yaml"
            if not self._check_file_exists(
                filename=check_file, exception_if_not_exists=False
            ):
                self.log.info("Initializing helm in client: {}".format(cluster_id))
                command = (
                    "{} --kubeconfig={} --tiller-namespace={} "
                    "--home={} init --client-only {} "
                ).format(
                    self._helm_command,
                    paths["kube_config"],
                    namespace,
                    paths["helm_dir"],
                    "--stable-repo-url {}".format(self._stable_repo_url)
                    if self._stable_repo_url
                    else "--skip-repos",
                )
                output, _rc = await self._local_async_exec(
                    command=command, raise_exception_on_error=True, env=env
                )
            else:
                self.log.info("Helm client already initialized")

        # if a different stable repo url is configured, replace the cluster's
        # "stable" repo with it
        repo_list = await self.repo_list(cluster_id)
        for repo in repo_list:
            if repo["name"] == "stable" and repo["url"] != self._stable_repo_url:
                self.log.debug(
                    "Add new stable repo url: {}".format(self._stable_repo_url)
                )
                await self.repo_remove(cluster_id, "stable")
                if self._stable_repo_url:
                    await self.repo_add(cluster_id, "stable", self._stable_repo_url)
                break

        return n2vc_installed_sw

    async def _uninstall_sw(self, cluster_id: str, namespace: str):
        # uninstall Tiller if necessary

        self.log.debug("Uninstalling tiller from cluster {}".format(cluster_id))

        # init paths, env
        paths, env = self._init_paths_env(
            cluster_name=cluster_id, create_if_not_exist=True
        )

        if not namespace:
            # find namespace for tiller pod
            command = "{} --kubeconfig={} get deployments --all-namespaces".format(
                self.kubectl_command, paths["kube_config"]
            )
            output, _rc = await self._local_async_exec(
                command=command, raise_exception_on_error=False, env=env
            )
            output_table = self._output_to_table(output=output)
            namespace = None
            for r in output_table:
                try:
                    if "tiller-deploy" in r[1]:
                        namespace = r[0]
                        break
                except Exception:
                    pass
            else:
                msg = "Tiller deployment not found in cluster {}".format(cluster_id)
                self.log.error(msg)

            self.log.debug("namespace for tiller: {}".format(namespace))

        if namespace:
            # uninstall tiller from cluster
            self.log.debug("Uninstalling tiller from cluster {}".format(cluster_id))
            command = "{} --kubeconfig={} --home={} reset".format(
                self._helm_command, paths["kube_config"], paths["helm_dir"]
            )
            self.log.debug("resetting: {}".format(command))
            output, _rc = await self._local_async_exec(
                command=command, raise_exception_on_error=True, env=env
            )
            # Delete clusterrolebinding and serviceaccount.
            # Ignore errors for backward compatibility
            command = (
                "{} --kubeconfig={} delete clusterrolebinding.rbac.authorization.k8s."
                "io/osm-tiller-cluster-rule"
            ).format(self.kubectl_command, paths["kube_config"])
            output, _rc = await self._local_async_exec(
                command=command, raise_exception_on_error=False, env=env
            )
            command = (
                "{} --kubeconfig={} --namespace {} delete serviceaccount/{}".format(
                    self.kubectl_command,
                    paths["kube_config"],
                    namespace,
                    self.service_account,
                )
            )
            output, _rc = await self._local_async_exec(
                command=command, raise_exception_on_error=False, env=env
            )

        else:
            self.log.debug("namespace not found")

    async def _instances_list(self, cluster_id):

        # init paths, env
        paths, env = self._init_paths_env(
            cluster_name=cluster_id, create_if_not_exist=True
        )

        command = "{} list --output yaml".format(self._helm_command)

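        # "helm list --output yaml" returns a document whose keys are
        # capitalized, e.g. (abridged, illustrative):
        #   Releases:
        #   - Name: ldap-0012345
        #     Status: DEPLOYED
        # The keys are lowercased below to match the helm3 connector output.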
        output, _rc = await self._local_async_exec(
            command=command, raise_exception_on_error=True, env=env
        )

        if output and len(output) > 0:
            # parse yaml and update keys to lower case to unify with helm3
            # (guard against a missing "Releases" key)
            instances = yaml.load(output, Loader=yaml.SafeLoader).get("Releases") or []
            new_instances = []
            for instance in instances:
                new_instance = dict((k.lower(), v) for k, v in instance.items())
                new_instances.append(new_instance)
            return new_instances
        else:
            return []

    def _get_inspect_command(
        self, show_command: str, kdu_model: str, repo_str: str, version: str
    ):
        inspect_command = "{} inspect {} {}{} {}".format(
            self._helm_command, show_command, kdu_model, repo_str, version
        )
        return inspect_command

    async def _status_kdu(
        self,
        cluster_id: str,
        kdu_instance: str,
        namespace: str = None,
        show_error_log: bool = False,
        return_text: bool = False,
    ):

        self.log.debug(
            "status of kdu_instance: {}, namespace: {} ".format(kdu_instance, namespace)
        )

        # init config, env
        paths, env = self._init_paths_env(
            cluster_name=cluster_id, create_if_not_exist=True
        )
        command = "env KUBECONFIG={} {} status {} --output yaml".format(
            paths["kube_config"], self._helm_command, kdu_instance
        )
        output, rc = await self._local_async_exec(
            command=command,
            raise_exception_on_error=True,
            show_error_log=show_error_log,
            env=env,
        )
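        # "helm status <release> --output yaml" returns a document roughly shaped
        # like this (abridged, illustrative; the keys match the accesses below):
        #   info:
        #     status:
        #       resources: "==> v1/Service ..."
        #       notes: "..."
        #     Description: Install complete
        #   manifest: "..."
        # The blocks below normalize that structure.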

        if return_text:
            return str(output)

        if rc != 0:
            return None

        data = yaml.load(output, Loader=yaml.SafeLoader)

        # remove field 'notes'
        try:
            del data.get("info").get("status")["notes"]
        except KeyError:
            pass

        # parse the manifest to a list of dictionaries
        if "manifest" in data:
            manifest_str = data.get("manifest")
            manifest_docs = yaml.load_all(manifest_str, Loader=yaml.SafeLoader)

            data["manifest"] = []
            for doc in manifest_docs:
                data["manifest"].append(doc)

        # parse field 'resources'
        try:
            resources = str(data.get("info").get("status").get("resources"))
            resource_table = self._output_to_table(resources)
            data.get("info").get("status")["resources"] = resource_table
        except Exception:
            pass

        # set description to lowercase (unify with helm3)
        try:
            data.get("info")["description"] = data.get("info").pop("Description")
        except KeyError:
            pass

        return data

    def _get_helm_chart_repos_ids(self, cluster_uuid) -> list:
        repo_ids = []
        cluster_filter = {"_admin.helm-chart.id": cluster_uuid}
        cluster = self.db.get_one("k8sclusters", cluster_filter)
        if cluster:
            repo_ids = cluster.get("_admin").get("helm_chart_repos") or []
            return repo_ids
        else:
            raise K8sException(
                "k8s cluster with helm-id: {} not found".format(cluster_uuid)
            )

    async def _is_install_completed(self, cluster_id: str, kdu_instance: str) -> bool:
        # init config, env
        paths, env = self._init_paths_env(
            cluster_name=cluster_id, create_if_not_exist=True
        )

        status = await self._status_kdu(
            cluster_id=cluster_id, kdu_instance=kdu_instance, return_text=False
        )

        # extract info.status.resources -> str
        # format:
        #   ==> v1/Deployment
        #   NAME                    READY  UP-TO-DATE  AVAILABLE  AGE
        #   halting-horse-mongodb   0/1    1           0          0s
        #   halting-petit-mongodb   1/1    1           0          0s
        # blank line
        resources = K8sHelmBaseConnector._get_deep(
            status, ("info", "status", "resources")
        )

        # convert to table
        resources = K8sHelmBaseConnector._output_to_table(resources)
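        # After _output_to_table the resources above become rows of columns,
        # e.g. (illustrative):
        #   [["==>", "v1/Deployment"],
        #    ["NAME", "READY", "UP-TO-DATE", "AVAILABLE", "AGE"],
        #    ["halting-horse-mongodb", "0/1", "1", "0", "0s"], ...]
        # A release is considered ready when every READY column reads n/n.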

        num_lines = len(resources)
        index = 0
        ready = True
        while index < num_lines:
            try:
                line1 = resources[index]
                index += 1
                # find '==>' in column 0
                if line1[0] == "==>":
                    line2 = resources[index]
                    index += 1
                    # find READY in column 1
                    if line2[1] == "READY":
                        # read next lines
                        line3 = resources[index]
                        index += 1
                        while len(line3) > 1 and index < num_lines:
                            ready_value = line3[1]
                            parts = ready_value.split(sep="/")
                            current = int(parts[0])
                            total = int(parts[1])
                            if current < total:
                                self.log.debug("NOT READY:\n {}".format(line3))
                                ready = False
                            line3 = resources[index]
                            index += 1

            except Exception:
                pass

        return ready

    def _get_install_command(
        self,
        kdu_model,
        kdu_instance,
        namespace,
        params_str,
        version,
        atomic,
        timeout,
        kubeconfig,
    ) -> str:

        timeout_str = ""
        if timeout:
            timeout_str = "--timeout {}".format(timeout)

        # atomic
        atomic_str = ""
        if atomic:
            atomic_str = "--atomic"
        # namespace
        namespace_str = ""
        if namespace:
            namespace_str = "--namespace {}".format(namespace)

        # version
        version_str = ""
        if version:
            version_str = "--version {}".format(version)
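        # Example rendered command (all values illustrative):
        #   env KUBECONFIG=/app/storage/cluster-1/.kube/config /usr/bin/helm
        #   install --atomic --output yaml --timeout 300
        #   --name=ldap-0012345 --namespace osm stable/openldap --version 1.2.3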

        command = (
            "env KUBECONFIG={kubeconfig} {helm} install {atomic} --output yaml "
            "{params} {timeout} --name={name} {ns} {model} {ver}".format(
                kubeconfig=kubeconfig,
                helm=self._helm_command,
                atomic=atomic_str,
                params=params_str,
                timeout=timeout_str,
                name=kdu_instance,
                ns=namespace_str,
                model=kdu_model,
                ver=version_str,
            )
        )
        return command

    def _get_upgrade_command(
        self,
        kdu_model,
        kdu_instance,
        namespace,
        params_str,
        version,
        atomic,
        timeout,
        kubeconfig,
    ) -> str:

        timeout_str = ""
        if timeout:
            timeout_str = "--timeout {}".format(timeout)

        # atomic
        atomic_str = ""
        if atomic:
            atomic_str = "--atomic"

        # version
        version_str = ""
        if version:
            version_str = "--version {}".format(version)
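        # Example rendered command (all values illustrative):
        #   env KUBECONFIG=/app/storage/cluster-1/.kube/config /usr/bin/helm
        #   upgrade --atomic --output yaml --timeout 300 ldap-0012345
        #   stable/openldap --version 1.2.4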

        command = (
            "env KUBECONFIG={kubeconfig} {helm} upgrade {atomic} --output yaml "
            "{params} {timeout} {name} {model} {ver}"
        ).format(
            kubeconfig=kubeconfig,
            helm=self._helm_command,
            atomic=atomic_str,
            params=params_str,
            timeout=timeout_str,
            name=kdu_instance,
            model=kdu_model,
            ver=version_str,
        )
        return command

    def _get_rollback_command(
        self, kdu_instance, namespace, revision, kubeconfig
    ) -> str:
        return "env KUBECONFIG={} {} rollback {} {} --wait".format(
            kubeconfig, self._helm_command, kdu_instance, revision
        )

    def _get_uninstall_command(
        self, kdu_instance: str, namespace: str, kubeconfig: str
    ) -> str:
        return "env KUBECONFIG={} {} delete --purge {}".format(
            kubeconfig, self._helm_command, kdu_instance
        )