fix 1208: add native charm support for rhel and fix centos support
[osm/N2VC.git] / n2vc / k8s_helm_conn.py
1 ##
2 # Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3 # This file is part of OSM
4 # All Rights Reserved.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
15 # implied.
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
18 #
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact with: nfvlabs@tid.es
21 ##
22 import asyncio
23 import os
24 import yaml
25
26 from n2vc.k8s_helm_base_conn import K8sHelmBaseConnector
27 from n2vc.exceptions import K8sException
28
29
30 class K8sHelmConnector(K8sHelmBaseConnector):
31
32 """
33 ####################################################################################
34 ################################### P U B L I C ####################################
35 ####################################################################################
36 """
37
38 def __init__(
39 self,
40 fs: object,
41 db: object,
42 kubectl_command: str = "/usr/bin/kubectl",
43 helm_command: str = "/usr/bin/helm",
44 log: object = None,
45 on_update_db=None,
46 vca_config: dict = None,
47 ):
48 """
49 Initializes helm connector for helm v2
50
51 :param fs: file system for kubernetes and helm configuration
52 :param db: database object to write current operation status
53 :param kubectl_command: path to kubectl executable
54 :param helm_command: path to helm executable
55 :param log: logger
56 :param on_update_db: callback called when k8s connector updates database
57 """
58
59 # parent class
60 K8sHelmBaseConnector.__init__(
61 self,
62 db=db,
63 log=log,
64 fs=fs,
65 kubectl_command=kubectl_command,
66 helm_command=helm_command,
67 on_update_db=on_update_db,
68 vca_config=vca_config,
69 )
70
71 self.log.info("Initializing K8S Helm2 connector")
72
73 # initialize helm client-only
74 self.log.debug("Initializing helm client-only...")
75 command = "{} init --client-only --stable-repo-url {} ".format(
76 self._helm_command, self._stable_repo_url)
77 try:
78 asyncio.ensure_future(
79 self._local_async_exec(command=command, raise_exception_on_error=False)
80 )
81 # loop = asyncio.get_event_loop()
82 # loop.run_until_complete(self._local_async_exec(command=command,
83 # raise_exception_on_error=False))
84 except Exception as e:
85 self.warning(
86 msg="helm init failed (it was already initialized): {}".format(e)
87 )
88
89 self.log.info("K8S Helm2 connector initialized")
90
91 async def install(
92 self,
93 cluster_uuid: str,
94 kdu_model: str,
95 kdu_instance: str,
96 atomic: bool = True,
97 timeout: float = 300,
98 params: dict = None,
99 db_dict: dict = None,
100 kdu_name: str = None,
101 namespace: str = None,
102 ):
103 _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
104 self.log.debug("installing {} in cluster {}".format(kdu_model, cluster_id))
105
106 # sync local dir
107 self.fs.sync(from_path=cluster_id)
108
109 # init env, paths
110 paths, env = self._init_paths_env(
111 cluster_name=cluster_id, create_if_not_exist=True
112 )
113
114 await self._install_impl(
115 cluster_id,
116 kdu_model,
117 paths,
118 env,
119 kdu_instance,
120 atomic=atomic,
121 timeout=timeout,
122 params=params,
123 db_dict=db_dict,
124 kdu_name=kdu_name,
125 namespace=namespace,
126 )
127
128 # sync fs
129 self.fs.reverse_sync(from_path=cluster_id)
130
131 self.log.debug("Returning kdu_instance {}".format(kdu_instance))
132 return True
133
134 async def inspect_kdu(self, kdu_model: str, repo_url: str = None) -> str:
135
136 self.log.debug(
137 "inspect kdu_model {} from (optional) repo: {}".format(kdu_model, repo_url)
138 )
139
140 return await self._exec_inspect_comand(
141 inspect_command="", kdu_model=kdu_model, repo_url=repo_url
142 )
143
144 """
145 ####################################################################################
146 ################################### P R I V A T E ##################################
147 ####################################################################################
148 """
149
150 def _init_paths_env(self, cluster_name: str, create_if_not_exist: bool = True):
151 """
152 Creates and returns base cluster and kube dirs and returns them.
153 Also created helm3 dirs according to new directory specification, paths are
154 returned and also environment variables that must be provided to execute commands
155
156 Helm 2 directory specification uses helm_home dir:
157
158 The variables assigned for this paths are:
159 - Helm hone: $HELM_HOME
160 - helm kubeconfig: $KUBECONFIG
161
162 :param cluster_name: cluster_name
163 :return: Dictionary with config_paths and dictionary with helm environment variables
164 """
165 base = self.fs.path
166 if base.endswith("/") or base.endswith("\\"):
167 base = base[:-1]
168
169 # base dir for cluster
170 cluster_dir = base + "/" + cluster_name
171
172 # kube dir
173 kube_dir = cluster_dir + "/" + ".kube"
174 if create_if_not_exist and not os.path.exists(kube_dir):
175 self.log.debug("Creating dir {}".format(kube_dir))
176 os.makedirs(kube_dir)
177
178 # helm home dir
179 helm_dir = cluster_dir + "/" + ".helm"
180 if create_if_not_exist and not os.path.exists(helm_dir):
181 self.log.debug("Creating dir {}".format(helm_dir))
182 os.makedirs(helm_dir)
183
184 config_filename = kube_dir + "/config"
185
186 # 2 - Prepare dictionary with paths
187 paths = {
188 "kube_dir": kube_dir,
189 "kube_config": config_filename,
190 "cluster_dir": cluster_dir,
191 "helm_dir": helm_dir,
192 }
193
194 for file_name, file in paths.items():
195 if "dir" in file_name and not os.path.exists(file):
196 err_msg = "{} dir does not exist".format(file)
197 self.log.error(err_msg)
198 raise K8sException(err_msg)
199
200 # 3 - Prepare environment variables
201 env = {"HELM_HOME": helm_dir, "KUBECONFIG": config_filename}
202
203 return paths, env
204
205 async def _get_services(self, cluster_id, kdu_instance, namespace):
206
207 # init config, env
208 paths, env = self._init_paths_env(
209 cluster_name=cluster_id, create_if_not_exist=True
210 )
211
212 command1 = "{} get manifest {} ".format(self._helm_command, kdu_instance)
213 command2 = "{} get --namespace={} -f -".format(self.kubectl_command, namespace)
214 output, _rc = await self._local_async_exec_pipe(
215 command1, command2, env=env, raise_exception_on_error=True
216 )
217 services = self._parse_services(output)
218
219 return services
220
221 async def _cluster_init(self, cluster_id: str, namespace: str,
222 paths: dict, env: dict):
223 """
224 Implements the helm version dependent cluster initialization:
225 For helm2 it initialized tiller environment if needed
226 """
227
228 # check if tiller pod is up in cluster
229 command = "{} --kubeconfig={} --namespace={} get deployments".format(
230 self.kubectl_command, paths["kube_config"], namespace
231 )
232 output, _rc = await self._local_async_exec(
233 command=command, raise_exception_on_error=True, env=env
234 )
235
236 output_table = self._output_to_table(output=output)
237
238 # find 'tiller' pod in all pods
239 already_initialized = False
240 try:
241 for row in output_table:
242 if row[0].startswith("tiller-deploy"):
243 already_initialized = True
244 break
245 except Exception:
246 pass
247
248 # helm init
249 n2vc_installed_sw = False
250 if not already_initialized:
251 self.log.info(
252 "Initializing helm in client and server: {}".format(cluster_id)
253 )
254 command = "{} --kubeconfig={} --namespace kube-system create serviceaccount {}".format(
255 self.kubectl_command, paths["kube_config"], self.service_account
256 )
257 _, _rc = await self._local_async_exec(
258 command=command, raise_exception_on_error=False, env=env
259 )
260
261 command = (
262 "{} --kubeconfig={} create clusterrolebinding osm-tiller-cluster-rule "
263 "--clusterrole=cluster-admin --serviceaccount=kube-system:{}"
264 ).format(self.kubectl_command, paths["kube_config"], self.service_account)
265 _, _rc = await self._local_async_exec(
266 command=command, raise_exception_on_error=False, env=env
267 )
268
269 command = (
270 "{} --kubeconfig={} --tiller-namespace={} --home={} --service-account {} "
271 " --stable-repo-url {} init"
272 ).format(
273 self._helm_command,
274 paths["kube_config"],
275 namespace,
276 paths["helm_dir"],
277 self.service_account,
278 self._stable_repo_url
279 )
280 _, _rc = await self._local_async_exec(
281 command=command, raise_exception_on_error=True, env=env
282 )
283 n2vc_installed_sw = True
284 else:
285 # check client helm installation
286 check_file = paths["helm_dir"] + "/repository/repositories.yaml"
287 if not self._check_file_exists(
288 filename=check_file, exception_if_not_exists=False
289 ):
290 self.log.info("Initializing helm in client: {}".format(cluster_id))
291 command = (
292 "{} --kubeconfig={} --tiller-namespace={} "
293 "--home={} init --client-only --stable-repo-url {} "
294 ).format(
295 self._helm_command,
296 paths["kube_config"],
297 namespace,
298 paths["helm_dir"],
299 self._stable_repo_url,
300 )
301 output, _rc = await self._local_async_exec(
302 command=command, raise_exception_on_error=True, env=env
303 )
304 else:
305 self.log.info("Helm client already initialized")
306
307 # remove old stable repo and add new one
308 cluster_uuid = "{}:{}".format(namespace, cluster_id)
309 repo_list = await self.repo_list(cluster_uuid)
310 for repo in repo_list:
311 if repo["name"] == "stable" and repo["url"] != self._stable_repo_url:
312 self.log.debug("Add new stable repo url: {}")
313 await self.repo_remove(cluster_uuid,
314 "stable")
315 await self.repo_add(cluster_uuid,
316 "stable",
317 self._stable_repo_url)
318 break
319
320 return n2vc_installed_sw
321
322 async def _uninstall_sw(self, cluster_id: str, namespace: str):
323 # uninstall Tiller if necessary
324
325 self.log.debug("Uninstalling tiller from cluster {}".format(cluster_id))
326
327 # init paths, env
328 paths, env = self._init_paths_env(
329 cluster_name=cluster_id, create_if_not_exist=True
330 )
331
332 if not namespace:
333 # find namespace for tiller pod
334 command = "{} --kubeconfig={} get deployments --all-namespaces".format(
335 self.kubectl_command, paths["kube_config"]
336 )
337 output, _rc = await self._local_async_exec(
338 command=command, raise_exception_on_error=False, env=env
339 )
340 output_table = self._output_to_table(output=output)
341 namespace = None
342 for r in output_table:
343 try:
344 if "tiller-deploy" in r[1]:
345 namespace = r[0]
346 break
347 except Exception:
348 pass
349 else:
350 msg = "Tiller deployment not found in cluster {}".format(cluster_id)
351 self.log.error(msg)
352
353 self.log.debug("namespace for tiller: {}".format(namespace))
354
355 if namespace:
356 # uninstall tiller from cluster
357 self.log.debug("Uninstalling tiller from cluster {}".format(cluster_id))
358 command = "{} --kubeconfig={} --home={} reset".format(
359 self._helm_command, paths["kube_config"], paths["helm_dir"]
360 )
361 self.log.debug("resetting: {}".format(command))
362 output, _rc = await self._local_async_exec(
363 command=command, raise_exception_on_error=True, env=env
364 )
365 # Delete clusterrolebinding and serviceaccount.
366 # Ignore if errors for backward compatibility
367 command = (
368 "{} --kubeconfig={} delete clusterrolebinding.rbac.authorization.k8s."
369 "io/osm-tiller-cluster-rule"
370 ).format(self.kubectl_command, paths["kube_config"])
371 output, _rc = await self._local_async_exec(
372 command=command, raise_exception_on_error=False, env=env
373 )
374 command = "{} --kubeconfig={} --namespace kube-system delete serviceaccount/{}".format(
375 self.kubectl_command, paths["kube_config"], self.service_account
376 )
377 output, _rc = await self._local_async_exec(
378 command=command, raise_exception_on_error=False, env=env
379 )
380
381 else:
382 self.log.debug("namespace not found")
383
384 async def _instances_list(self, cluster_id):
385
386 # init paths, env
387 paths, env = self._init_paths_env(
388 cluster_name=cluster_id, create_if_not_exist=True
389 )
390
391 command = "{} list --output yaml".format(self._helm_command)
392
393 output, _rc = await self._local_async_exec(
394 command=command, raise_exception_on_error=True, env=env
395 )
396
397 if output and len(output) > 0:
398 # parse yaml and update keys to lower case to unify with helm3
399 instances = yaml.load(output, Loader=yaml.SafeLoader).get("Releases")
400 new_instances = []
401 for instance in instances:
402 new_instance = dict((k.lower(), v) for k, v in instance.items())
403 new_instances.append(new_instance)
404 return new_instances
405 else:
406 return []
407
408 def _get_inspect_command(self, show_command: str, kdu_model: str, repo_str: str,
409 version: str):
410 inspect_command = "{} inspect {} {}{} {}".format(
411 self._helm_command, show_command, kdu_model, repo_str, version
412 )
413 return inspect_command
414
415 async def _status_kdu(
416 self,
417 cluster_id: str,
418 kdu_instance: str,
419 namespace: str = None,
420 show_error_log: bool = False,
421 return_text: bool = False,
422 ):
423
424 self.log.debug(
425 "status of kdu_instance: {}, namespace: {} ".format(kdu_instance, namespace)
426 )
427
428 # init config, env
429 paths, env = self._init_paths_env(
430 cluster_name=cluster_id, create_if_not_exist=True
431 )
432 command = "{} status {} --output yaml".format(self._helm_command, kdu_instance)
433 output, rc = await self._local_async_exec(
434 command=command,
435 raise_exception_on_error=True,
436 show_error_log=show_error_log,
437 env=env,
438 )
439
440 if return_text:
441 return str(output)
442
443 if rc != 0:
444 return None
445
446 data = yaml.load(output, Loader=yaml.SafeLoader)
447
448 # remove field 'notes'
449 try:
450 del data.get("info").get("status")["notes"]
451 except KeyError:
452 pass
453
454 # parse field 'resources'
455 try:
456 resources = str(data.get("info").get("status").get("resources"))
457 resource_table = self._output_to_table(resources)
458 data.get("info").get("status")["resources"] = resource_table
459 except Exception:
460 pass
461
462 # set description to lowercase (unify with helm3)
463 try:
464 data.get("info")["description"] = data.get("info").pop("Description")
465 except KeyError:
466 pass
467
468 return data
469
470 def _get_helm_chart_repos_ids(self, cluster_uuid) -> list:
471 repo_ids = []
472 cluster_filter = {"_admin.helm-chart.id": cluster_uuid}
473 cluster = self.db.get_one("k8sclusters", cluster_filter)
474 if cluster:
475 repo_ids = cluster.get("_admin").get("helm_chart_repos") or []
476 return repo_ids
477 else:
478 raise K8sException(
479 "k8cluster with helm-id : {} not found".format(cluster_uuid)
480 )
481
482 async def _is_install_completed(self, cluster_id: str, kdu_instance: str) -> bool:
483
484 status = await self._status_kdu(
485 cluster_id=cluster_id, kdu_instance=kdu_instance, return_text=False
486 )
487
488 # extract info.status.resources-> str
489 # format:
490 # ==> v1/Deployment
491 # NAME READY UP-TO-DATE AVAILABLE AGE
492 # halting-horse-mongodb 0/1 1 0 0s
493 # halting-petit-mongodb 1/1 1 0 0s
494 # blank line
495 resources = K8sHelmBaseConnector._get_deep(
496 status, ("info", "status", "resources")
497 )
498
499 # convert to table
500 resources = K8sHelmBaseConnector._output_to_table(resources)
501
502 num_lines = len(resources)
503 index = 0
504 ready = True
505 while index < num_lines:
506 try:
507 line1 = resources[index]
508 index += 1
509 # find '==>' in column 0
510 if line1[0] == "==>":
511 line2 = resources[index]
512 index += 1
513 # find READY in column 1
514 if line2[1] == "READY":
515 # read next lines
516 line3 = resources[index]
517 index += 1
518 while len(line3) > 1 and index < num_lines:
519 ready_value = line3[1]
520 parts = ready_value.split(sep="/")
521 current = int(parts[0])
522 total = int(parts[1])
523 if current < total:
524 self.log.debug("NOT READY:\n {}".format(line3))
525 ready = False
526 line3 = resources[index]
527 index += 1
528
529 except Exception:
530 pass
531
532 return ready
533
534 def _get_install_command(
535 self, kdu_model, kdu_instance, namespace, params_str, version, atomic, timeout
536 ) -> str:
537
538 timeout_str = ""
539 if timeout:
540 timeout_str = "--timeout {}".format(timeout)
541
542 # atomic
543 atomic_str = ""
544 if atomic:
545 atomic_str = "--atomic"
546 # namespace
547 namespace_str = ""
548 if namespace:
549 namespace_str = "--namespace {}".format(namespace)
550
551 # version
552 version_str = ""
553 if version:
554 version_str = version_str = "--version {}".format(version)
555
556 command = (
557 "{helm} install {atomic} --output yaml "
558 "{params} {timeout} --name={name} {ns} {model} {ver}".format(
559 helm=self._helm_command,
560 atomic=atomic_str,
561 params=params_str,
562 timeout=timeout_str,
563 name=kdu_instance,
564 ns=namespace_str,
565 model=kdu_model,
566 ver=version_str,
567 )
568 )
569 return command
570
571 def _get_upgrade_command(
572 self, kdu_model, kdu_instance, namespace, params_str, version, atomic, timeout
573 ) -> str:
574
575 timeout_str = ""
576 if timeout:
577 timeout_str = "--timeout {}".format(timeout)
578
579 # atomic
580 atomic_str = ""
581 if atomic:
582 atomic_str = "--atomic"
583
584 # version
585 version_str = ""
586 if version:
587 version_str = "--version {}".format(version)
588
589 command = "{helm} upgrade {atomic} --output yaml {params} {timeout} {name} {model} {ver}"\
590 .format(helm=self._helm_command,
591 atomic=atomic_str,
592 params=params_str,
593 timeout=timeout_str,
594 name=kdu_instance,
595 model=kdu_model,
596 ver=version_str
597 )
598 return command
599
600 def _get_rollback_command(self, kdu_instance, namespace, revision) -> str:
601 return "{} rollback {} {} --wait".format(
602 self._helm_command, kdu_instance, revision
603 )
604
605 def _get_uninstall_command(self, kdu_instance: str, namespace: str) -> str:
606 return "{} delete --purge {}".format(self._helm_command, kdu_instance)