Fix bug 1298
n2vc/k8s_helm_conn.py
##
# Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
# This file is part of OSM
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# For those usages not covered by the Apache License, Version 2.0 please
# contact with: nfvlabs@tid.es
##
import asyncio
import os
import yaml

from n2vc.k8s_helm_base_conn import K8sHelmBaseConnector
from n2vc.exceptions import K8sException


class K8sHelmConnector(K8sHelmBaseConnector):

    """
    ####################################################################################
    ################################### P U B L I C ####################################
    ####################################################################################
    """

    def __init__(
        self,
        fs: object,
        db: object,
        kubectl_command: str = "/usr/bin/kubectl",
        helm_command: str = "/usr/bin/helm",
        log: object = None,
        on_update_db=None,
    ):
        """
        Initializes helm connector for helm v2

        :param fs: file system for kubernetes and helm configuration
        :param db: database object to write current operation status
        :param kubectl_command: path to kubectl executable
        :param helm_command: path to helm executable
        :param log: logger
        :param on_update_db: callback called when k8s connector updates database
        """

        # parent class
        K8sHelmBaseConnector.__init__(
            self,
            db=db,
            log=log,
            fs=fs,
            kubectl_command=kubectl_command,
            helm_command=helm_command,
            on_update_db=on_update_db,
        )

        self.log.info("Initializing K8S Helm2 connector")

        # initialize helm client-only
        self.log.debug("Initializing helm client-only...")
        command = "{} init --client-only".format(self._helm_command)
        try:
            asyncio.ensure_future(
                self._local_async_exec(command=command, raise_exception_on_error=False)
            )
        except Exception as e:
            self.log.warning(
                msg="helm init failed (it was already initialized): {}".format(e)
            )

        self.log.info("K8S Helm2 connector initialized")

    async def inspect_kdu(self, kdu_model: str, repo_url: str = None) -> str:
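        """
        Returns the chart information for the given kdu model: the empty
        sub-command maps to a plain 'helm inspect', which prints the chart
        metadata together with its default values.

        :param kdu_model: chart name (optionally with version) or path
        :param repo_url: optional repository URL where the chart is located
        :return: inspect output as text
        """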

        self.log.debug(
            "inspect kdu_model {} from (optional) repo: {}".format(kdu_model, repo_url)
        )

        return await self._exec_inspect_comand(
            inspect_command="", kdu_model=kdu_model, repo_url=repo_url
        )

98 """
99 ####################################################################################
100 ################################### P R I V A T E ##################################
101 ####################################################################################
102 """
103
    def _init_paths_env(self, cluster_name: str, create_if_not_exist: bool = True):
        """
        Creates and returns the base cluster and kube dirs, together with the
        environment variables that must be provided to execute helm commands.

        Helm 2 uses a helm home directory; the environment variables set for
        these paths are:
        - helm home: $HELM_HOME
        - kubeconfig: $KUBECONFIG

        :param cluster_name: cluster name
        :return: Dictionary with config paths and dictionary with helm environment variables
        """
        base = self.fs.path
        if base.endswith("/") or base.endswith("\\"):
            base = base[:-1]

        # base dir for cluster
        cluster_dir = base + "/" + cluster_name

        # kube dir
        kube_dir = cluster_dir + "/" + ".kube"
        if create_if_not_exist and not os.path.exists(kube_dir):
            self.log.debug("Creating dir {}".format(kube_dir))
            os.makedirs(kube_dir)

        # helm home dir
        helm_dir = cluster_dir + "/" + ".helm"
        if create_if_not_exist and not os.path.exists(helm_dir):
            self.log.debug("Creating dir {}".format(helm_dir))
            os.makedirs(helm_dir)

        config_filename = kube_dir + "/config"

        # 2 - Prepare dictionary with paths
        paths = {
            "kube_dir": kube_dir,
            "kube_config": config_filename,
            "cluster_dir": cluster_dir,
            "helm_dir": helm_dir,
        }

        for file_name, file in paths.items():
            if "dir" in file_name and not os.path.exists(file):
                err_msg = "{} dir does not exist".format(file)
                self.log.error(err_msg)
                raise K8sException(err_msg)

        # 3 - Prepare environment variables
        env = {"HELM_HOME": helm_dir, "KUBECONFIG": config_filename}

        return paths, env

    async def _get_services(self, cluster_id, kdu_instance, namespace):
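        """
        Returns the services deployed by a kdu instance: pipes the output of
        'helm get manifest' into 'kubectl get -f -' and parses the result.
        """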

        # init config, env
        paths, env = self._init_paths_env(
            cluster_name=cluster_id, create_if_not_exist=True
        )

        command1 = "{} get manifest {} ".format(self._helm_command, kdu_instance)
        command2 = "{} get --namespace={} -f -".format(self.kubectl_command, namespace)
        output, _rc = await self._local_async_exec_pipe(
            command1, command2, env=env, raise_exception_on_error=True
        )
        services = self._parse_services(output)

        return services

    async def _cluster_init(
        self, cluster_id: str, namespace: str, paths: dict, env: dict
    ):
        """
        Implements the helm version dependent cluster initialization:
        for helm2 it initializes the tiller environment if needed
        """

        # check if tiller pod is up in cluster
        command = "{} --kubeconfig={} --namespace={} get deployments".format(
            self.kubectl_command, paths["kube_config"], namespace
        )
        output, _rc = await self._local_async_exec(
            command=command, raise_exception_on_error=True, env=env
        )

        output_table = self._output_to_table(output=output)

        # find the 'tiller-deploy' deployment among the returned rows
        already_initialized = False
        try:
            for row in output_table:
                if row[0].startswith("tiller-deploy"):
                    already_initialized = True
                    break
        except Exception:
            pass

        # helm init
        n2vc_installed_sw = False
        if not already_initialized:
            self.log.info(
                "Initializing helm in client and server: {}".format(cluster_id)
            )
            command = (
                "{} --kubeconfig={} --namespace kube-system create serviceaccount {}"
            ).format(self.kubectl_command, paths["kube_config"], self.service_account)
            _, _rc = await self._local_async_exec(
                command=command, raise_exception_on_error=False, env=env
            )

            command = (
                "{} --kubeconfig={} create clusterrolebinding osm-tiller-cluster-rule "
                "--clusterrole=cluster-admin --serviceaccount=kube-system:{}"
            ).format(self.kubectl_command, paths["kube_config"], self.service_account)
            _, _rc = await self._local_async_exec(
                command=command, raise_exception_on_error=False, env=env
            )

            command = (
                "{} --kubeconfig={} --tiller-namespace={} --home={} --service-account {} "
                "init"
            ).format(
                self._helm_command,
                paths["kube_config"],
                namespace,
                paths["helm_dir"],
                self.service_account,
            )
            _, _rc = await self._local_async_exec(
                command=command, raise_exception_on_error=True, env=env
            )
            n2vc_installed_sw = True
        else:
            # check client helm installation
            check_file = paths["helm_dir"] + "/repository/repositories.yaml"
            if not self._check_file_exists(
                filename=check_file, exception_if_not_exists=False
            ):
                self.log.info("Initializing helm in client: {}".format(cluster_id))
                command = (
                    "{} --kubeconfig={} --tiller-namespace={} "
                    "--home={} init --client-only"
                ).format(
                    self._helm_command,
                    paths["kube_config"],
                    namespace,
                    paths["helm_dir"],
                )
                output, _rc = await self._local_async_exec(
                    command=command, raise_exception_on_error=True, env=env
                )
            else:
                self.log.info("Helm client already initialized")

        return n2vc_installed_sw

    async def _uninstall_sw(self, cluster_id: str, namespace: str):
        """
        Uninstalls Tiller from the cluster if necessary, together with the
        serviceaccount and clusterrolebinding created for it.
        """

        self.log.debug("Uninstalling tiller from cluster {}".format(cluster_id))

        # init paths, env
        paths, env = self._init_paths_env(
            cluster_name=cluster_id, create_if_not_exist=True
        )

        if not namespace:
            # find namespace for tiller pod
            command = "{} --kubeconfig={} get deployments --all-namespaces".format(
                self.kubectl_command, paths["kube_config"]
            )
            output, _rc = await self._local_async_exec(
                command=command, raise_exception_on_error=False, env=env
            )
            output_table = self._output_to_table(output=output)
            namespace = None
            for r in output_table:
                try:
                    if "tiller-deploy" in r[1]:
                        namespace = r[0]
                        break
                except Exception:
                    pass
            else:
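                # for/else: reached only when the loop did not break, i.e. no
                # tiller-deploy deployment was found in any namespace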
                msg = "Tiller deployment not found in cluster {}".format(cluster_id)
                self.log.error(msg)

        self.log.debug("namespace for tiller: {}".format(namespace))

        if namespace:
            # uninstall tiller from cluster
            self.log.debug("Uninstalling tiller from cluster {}".format(cluster_id))
            command = "{} --kubeconfig={} --home={} reset".format(
                self._helm_command, paths["kube_config"], paths["helm_dir"]
            )
            self.log.debug("resetting: {}".format(command))
            output, _rc = await self._local_async_exec(
                command=command, raise_exception_on_error=True, env=env
            )
            # Delete clusterrolebinding and serviceaccount.
            # Ignore errors for backward compatibility
            command = (
                "{} --kubeconfig={} delete clusterrolebinding.rbac.authorization.k8s."
                "io/osm-tiller-cluster-rule"
            ).format(self.kubectl_command, paths["kube_config"])
            output, _rc = await self._local_async_exec(
                command=command, raise_exception_on_error=False, env=env
            )
            command = (
                "{} --kubeconfig={} --namespace kube-system delete serviceaccount/{}"
            ).format(self.kubectl_command, paths["kube_config"], self.service_account)
            output, _rc = await self._local_async_exec(
                command=command, raise_exception_on_error=False, env=env
            )

        else:
            self.log.debug("namespace not found")

    async def _instances_list(self, cluster_id):
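        """
        Returns the list of releases in the cluster ('helm list'), with keys
        lower-cased to match the output format of the helm3 connector.
        """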

        # init paths, env
        paths, env = self._init_paths_env(
            cluster_name=cluster_id, create_if_not_exist=True
        )

        command = "{} list --output yaml".format(self._helm_command)

        output, _rc = await self._local_async_exec(
            command=command, raise_exception_on_error=True, env=env
        )

        if output and len(output) > 0:
            # parse yaml and update keys to lower case to unify with helm3
            instances = yaml.load(output, Loader=yaml.SafeLoader).get("Releases")
            new_instances = []
            for instance in instances:
                new_instance = dict((k.lower(), v) for k, v in instance.items())
                new_instances.append(new_instance)
            return new_instances
        else:
            return []

    def _get_inspect_command(
        self, show_command: str, kdu_model: str, repo_str: str, version: str
    ):
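        """
        Builds the helm2 inspect command for the given sub-command (e.g.
        'values' or 'readme'; empty for a plain inspect), model, repo and version.
        """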
        inspect_command = "{} inspect {} {}{} {}".format(
            self._helm_command, show_command, kdu_model, repo_str, version
        )
        return inspect_command

    async def _status_kdu(
        self,
        cluster_id: str,
        kdu_instance: str,
        namespace: str = None,
        show_error_log: bool = False,
        return_text: bool = False,
    ):
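        """
        Returns the status of a kdu instance ('helm status --output yaml').
        The result is normalized towards the helm3 connector format: the
        'notes' field is dropped, 'resources' is parsed into a table and
        'Description' is lower-cased to 'description'.
        """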

        self.log.debug(
            "status of kdu_instance: {}, namespace: {} ".format(kdu_instance, namespace)
        )

        # init config, env
        paths, env = self._init_paths_env(
            cluster_name=cluster_id, create_if_not_exist=True
        )
        command = "{} status {} --output yaml".format(self._helm_command, kdu_instance)
        output, rc = await self._local_async_exec(
            command=command,
            raise_exception_on_error=True,
            show_error_log=show_error_log,
            env=env,
        )

        if return_text:
            return str(output)

        if rc != 0:
            return None

        data = yaml.load(output, Loader=yaml.SafeLoader)

        # remove field 'notes'
        try:
            del data.get("info").get("status")["notes"]
        except KeyError:
            pass

        # parse field 'resources'
        try:
            resources = str(data.get("info").get("status").get("resources"))
            resource_table = self._output_to_table(resources)
            data.get("info").get("status")["resources"] = resource_table
        except Exception:
            pass

        # set description to lowercase (unify with helm3)
        try:
            data.get("info")["description"] = data.get("info").pop("Description")
        except KeyError:
            pass

        return data

    async def _is_install_completed(self, cluster_id: str, kdu_instance: str) -> bool:
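        """
        Checks whether all resources of a release are ready: walks the
        'resources' table of the kdu status and, for every READY column with a
        'current/total' value, requires current >= total.
        """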

        status = await self._status_kdu(
            cluster_id=cluster_id, kdu_instance=kdu_instance, return_text=False
        )

        # extract info.status.resources -> str
        # format:
        #   ==> v1/Deployment
        #   NAME                    READY   UP-TO-DATE   AVAILABLE   AGE
        #   halting-horse-mongodb   0/1     1            0           0s
        #   halting-petit-mongodb   1/1     1            0           0s
        #   blank line
        resources = K8sHelmBaseConnector._get_deep(
            status, ("info", "status", "resources")
        )

        # convert to table
        resources = K8sHelmBaseConnector._output_to_table(resources)

        num_lines = len(resources)
        index = 0
        ready = True
        while index < num_lines:
            try:
                line1 = resources[index]
                index += 1
                # find '==>' in column 0
                if line1[0] == "==>":
                    line2 = resources[index]
                    index += 1
                    # find READY in column 1
                    if line2[1] == "READY":
                        # read next lines
                        line3 = resources[index]
                        index += 1
                        while len(line3) > 1 and index < num_lines:
                            ready_value = line3[1]
                            parts = ready_value.split(sep="/")
                            current = int(parts[0])
                            total = int(parts[1])
                            if current < total:
                                self.log.debug("NOT READY:\n {}".format(line3))
                                ready = False
                            line3 = resources[index]
                            index += 1

            except Exception:
                pass

        return ready

    def _get_install_command(
        self, kdu_model, kdu_instance, namespace, params_str, version, atomic, timeout
    ) -> str:
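        """
        Builds the helm2 install command: 'helm install --atomic --output yaml
        --timeout <t> --name=<instance> --namespace <ns> <model> --version <v>',
        omitting each optional flag whose argument is not provided.
        """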

        timeout_str = ""
        if timeout:
            timeout_str = "--timeout {}".format(timeout)

        # atomic
        atomic_str = ""
        if atomic:
            atomic_str = "--atomic"
        # namespace
        namespace_str = ""
        if namespace:
            namespace_str = "--namespace {}".format(namespace)

        # version
        version_str = ""
        if version:
            version_str = "--version {}".format(version)

        command = (
            "{helm} install {atomic} --output yaml "
            "{params} {timeout} --name={name} {ns} {model} {ver}".format(
                helm=self._helm_command,
                atomic=atomic_str,
                params=params_str,
                timeout=timeout_str,
                name=kdu_instance,
                ns=namespace_str,
                model=kdu_model,
                ver=version_str,
            )
        )
        return command

    def _get_upgrade_command(
        self, kdu_model, kdu_instance, namespace, params_str, version, atomic, timeout
    ) -> str:
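        """
        Builds the helm2 upgrade command. Note that namespace is accepted for
        interface compatibility but not used: helm2 resolves the release by name.
        """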

        timeout_str = ""
        if timeout:
            timeout_str = "--timeout {}".format(timeout)

        # atomic
        atomic_str = ""
        if atomic:
            atomic_str = "--atomic"

        # version
        version_str = ""
        if version:
            version_str = "--version {}".format(version)

        command = (
            "{helm} upgrade {atomic} --output yaml "
            "{params} {timeout} {name} {model} {ver}".format(
                helm=self._helm_command,
                atomic=atomic_str,
                params=params_str,
                timeout=timeout_str,
                name=kdu_instance,
                model=kdu_model,
                ver=version_str,
            )
        )
        return command

    def _get_rollback_command(self, kdu_instance, namespace, revision) -> str:
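        # namespace is unused: helm2 identifies the release by name alone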
        return "{} rollback {} {} --wait".format(
            self._helm_command, kdu_instance, revision
        )

    def _get_uninstall_command(self, kdu_instance: str, namespace: str) -> str:
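        # namespace is unused here as well; '--purge' also removes the release
        # history so the name can be reused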
        return "{} delete --purge {}".format(self._helm_command, kdu_instance)