Revert "Revert "Clean up commented or unused code""
n2vc/k8s_helm_conn.py (osm/N2VC.git)
##
# Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
# This file is part of OSM
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# For those usages not covered by the Apache License, Version 2.0 please
# contact with: nfvlabs@tid.es
##
import asyncio
import os
import yaml

from n2vc.k8s_helm_base_conn import K8sHelmBaseConnector
from n2vc.exceptions import K8sException

class K8sHelmConnector(K8sHelmBaseConnector):

    """
    ####################################################################################
    ################################### P U B L I C ####################################
    ####################################################################################
    """

    def __init__(
        self,
        fs: object,
        db: object,
        kubectl_command: str = "/usr/bin/kubectl",
        helm_command: str = "/usr/bin/helm",
        log: object = None,
        on_update_db=None,
        vca_config: dict = None,
    ):
48 """
49 Initializes helm connector for helm v2
50
51 :param fs: file system for kubernetes and helm configuration
52 :param db: database object to write current operation status
53 :param kubectl_command: path to kubectl executable
54 :param helm_command: path to helm executable
55 :param log: logger
56 :param on_update_db: callback called when k8s connector updates database
57 """

        # parent class
        K8sHelmBaseConnector.__init__(
            self,
            db=db,
            log=log,
            fs=fs,
            kubectl_command=kubectl_command,
            helm_command=helm_command,
            on_update_db=on_update_db,
            vca_config=vca_config,
        )

        self.log.info("Initializing K8S Helm2 connector")

        # initialize helm client-only
        self.log.debug("Initializing helm client-only...")
        command = "{} init --client-only --stable-repo-url {} ".format(
            self._helm_command, self._stable_repo_url
        )
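        # For illustration only: with the default helm_command and a hypothetical
        # stable repo URL, the resulting command would look roughly like
        #   /usr/bin/helm init --client-only --stable-repo-url https://charts.helm.sh/stable
        # (the actual URL comes from self._stable_repo_url provided via vca_config).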
        try:
            asyncio.ensure_future(
                self._local_async_exec(command=command, raise_exception_on_error=False)
            )
            # loop = asyncio.get_event_loop()
            # loop.run_until_complete(self._local_async_exec(command=command,
            # raise_exception_on_error=False))
        except Exception as e:
            self.log.warning(
                msg="helm init failed (it was already initialized): {}".format(e)
            )

        self.log.info("K8S Helm2 connector initialized")
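        # Minimal usage sketch (illustrative, not part of this module): the connector
        # is normally created by higher OSM layers with real FsBase/DbBase objects,
        # roughly along the lines of
        #   k8s_helm = K8sHelmConnector(fs=fs, db=db, log=log, on_update_db=None)
        #   kdu_instance = await k8s_helm.install(cluster_uuid, "stable/openldap")
        # where fs, db, cluster_uuid and the chart name are assumptions of the example.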

    async def install(
        self,
        cluster_uuid: str,
        kdu_model: str,
        atomic: bool = True,
        timeout: float = 300,
        params: dict = None,
        db_dict: dict = None,
        kdu_name: str = None,
        namespace: str = None,
    ):
        _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
        self.log.debug("installing {} in cluster {}".format(kdu_model, cluster_id))

        # sync local dir
        self.fs.sync(from_path=cluster_id)

        # init env, paths
        paths, env = self._init_paths_env(
            cluster_name=cluster_id, create_if_not_exist=True
        )

        kdu_instance = await self._install_impl(
            cluster_id,
            kdu_model,
            paths,
            env,
            atomic=atomic,
            timeout=timeout,
            params=params,
            db_dict=db_dict,
            kdu_name=kdu_name,
            namespace=namespace,
        )

        # sync fs
        self.fs.reverse_sync(from_path=cluster_id)

        self.log.debug("Returning kdu_instance {}".format(kdu_instance))
        return kdu_instance

    async def inspect_kdu(self, kdu_model: str, repo_url: str = None) -> str:

        self.log.debug(
            "inspect kdu_model {} from (optional) repo: {}".format(kdu_model, repo_url)
        )

        return await self._exec_inspect_comand(
            inspect_command="", kdu_model=kdu_model, repo_url=repo_url
        )

140 """
141 ####################################################################################
142 ################################### P R I V A T E ##################################
143 ####################################################################################
144 """
145
    def _init_paths_env(self, cluster_name: str, create_if_not_exist: bool = True):
        """
        Creates and returns the base cluster and kube dirs, together with the helm2
        directories required by the directory specification. The paths are returned
        along with the environment variables that must be set to execute helm commands.

        Helm 2 uses a helm_home directory; the variables assigned to these paths are:
        - helm home: $HELM_HOME
        - helm kubeconfig: $KUBECONFIG

        :param cluster_name: cluster name
        :return: Dictionary with config paths and dictionary with helm environment variables
        """
        base = self.fs.path
        if base.endswith("/") or base.endswith("\\"):
            base = base[:-1]

        # base dir for cluster
        cluster_dir = base + "/" + cluster_name

        # kube dir
        kube_dir = cluster_dir + "/" + ".kube"
        if create_if_not_exist and not os.path.exists(kube_dir):
            self.log.debug("Creating dir {}".format(kube_dir))
            os.makedirs(kube_dir)

        # helm home dir
        helm_dir = cluster_dir + "/" + ".helm"
        if create_if_not_exist and not os.path.exists(helm_dir):
            self.log.debug("Creating dir {}".format(helm_dir))
            os.makedirs(helm_dir)

        config_filename = kube_dir + "/config"

        # 2 - Prepare dictionary with paths
        paths = {
            "kube_dir": kube_dir,
            "kube_config": config_filename,
            "cluster_dir": cluster_dir,
            "helm_dir": helm_dir,
        }

        for file_name, file in paths.items():
            if "dir" in file_name and not os.path.exists(file):
                err_msg = "{} dir does not exist".format(file)
                self.log.error(err_msg)
                raise K8sException(err_msg)

        # 3 - Prepare environment variables
        env = {"HELM_HOME": helm_dir, "KUBECONFIG": config_filename}
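        # Illustrative example of the returned values (assuming, hypothetically,
        # that self.fs.path is "/app/storage" and cluster_name is "cluster-1"):
        #   paths = {
        #       "kube_dir": "/app/storage/cluster-1/.kube",
        #       "kube_config": "/app/storage/cluster-1/.kube/config",
        #       "cluster_dir": "/app/storage/cluster-1",
        #       "helm_dir": "/app/storage/cluster-1/.helm",
        #   }
        #   env = {"HELM_HOME": ".../.helm", "KUBECONFIG": ".../.kube/config"}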

        return paths, env

    async def _get_services(self, cluster_id, kdu_instance, namespace):

        # init config, env
        paths, env = self._init_paths_env(
            cluster_name=cluster_id, create_if_not_exist=True
        )

        command1 = "{} get manifest {} ".format(self._helm_command, kdu_instance)
        command2 = "{} get --namespace={} -f -".format(self.kubectl_command, namespace)
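        # Roughly equivalent shell pipeline (illustrative, with placeholder values):
        #   helm get manifest <kdu_instance> | kubectl get --namespace=<namespace> -f -
        # i.e. the rendered manifest is fed to kubectl to list the created resources.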
        output, _rc = await self._local_async_exec_pipe(
            command1, command2, env=env, raise_exception_on_error=True
        )
        services = self._parse_services(output)

        return services

    async def _cluster_init(self, cluster_id: str, namespace: str,
                            paths: dict, env: dict):
        """
        Implements the helm-version-dependent cluster initialization:
        for helm2 it initializes the tiller environment if needed
        """

        # check if tiller pod is up in cluster
        command = "{} --kubeconfig={} --namespace={} get deployments".format(
            self.kubectl_command, paths["kube_config"], namespace
        )
        output, _rc = await self._local_async_exec(
            command=command, raise_exception_on_error=True, env=env
        )

        output_table = self._output_to_table(output=output)
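        # For reference (illustrative only), the parsed table is expected to look like
        # the usual "kubectl get deployments" output, e.g.:
        #   [["NAME", "READY", "UP-TO-DATE", "AVAILABLE", "AGE"],
        #    ["tiller-deploy", "1/1", "1", "1", "2d"]]
        # so tiller is detected by a first column starting with "tiller-deploy".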

        # find 'tiller' pod in all pods
        already_initialized = False
        try:
            for row in output_table:
                if row[0].startswith("tiller-deploy"):
                    already_initialized = True
                    break
        except Exception:
            pass

        # helm init
        n2vc_installed_sw = False
        if not already_initialized:
            self.log.info(
                "Initializing helm in client and server: {}".format(cluster_id)
            )
            command = "{} --kubeconfig={} --namespace kube-system create serviceaccount {}".format(
                self.kubectl_command, paths["kube_config"], self.service_account
            )
            _, _rc = await self._local_async_exec(
                command=command, raise_exception_on_error=False, env=env
            )

            command = (
                "{} --kubeconfig={} create clusterrolebinding osm-tiller-cluster-rule "
                "--clusterrole=cluster-admin --serviceaccount=kube-system:{}"
            ).format(self.kubectl_command, paths["kube_config"], self.service_account)
            _, _rc = await self._local_async_exec(
                command=command, raise_exception_on_error=False, env=env
            )

            command = (
                "{} --kubeconfig={} --tiller-namespace={} --home={} --service-account {} "
                " --stable-repo-url {} init"
            ).format(
                self._helm_command,
                paths["kube_config"],
                namespace,
                paths["helm_dir"],
                self.service_account,
                self._stable_repo_url,
            )
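            # Illustrative only: with placeholder values this expands to something like
            #   helm --kubeconfig=<cfg> --tiller-namespace=<ns> --home=<helm_dir> \
            #        --service-account <sa> --stable-repo-url <repo_url> init
            # which deploys tiller into the cluster for this helm2 environment.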
            _, _rc = await self._local_async_exec(
                command=command, raise_exception_on_error=True, env=env
            )
            n2vc_installed_sw = True
        else:
            # check client helm installation
            check_file = paths["helm_dir"] + "/repository/repositories.yaml"
            if not self._check_file_exists(
                filename=check_file, exception_if_not_exists=False
            ):
                self.log.info("Initializing helm in client: {}".format(cluster_id))
                command = (
                    "{} --kubeconfig={} --tiller-namespace={} "
                    "--home={} init --client-only --stable-repo-url {} "
                ).format(
                    self._helm_command,
                    paths["kube_config"],
                    namespace,
                    paths["helm_dir"],
                    self._stable_repo_url,
                )
                output, _rc = await self._local_async_exec(
                    command=command, raise_exception_on_error=True, env=env
                )
            else:
                self.log.info("Helm client already initialized")

        # remove old stable repo and add new one
        cluster_uuid = "{}:{}".format(namespace, cluster_id)
        repo_list = await self.repo_list(cluster_uuid)
        for repo in repo_list:
            if repo["name"] == "stable" and repo["url"] != self._stable_repo_url:
                self.log.debug("Add new stable repo url: {}".format(self._stable_repo_url))
                await self.repo_remove(cluster_uuid, "stable")
                await self.repo_add(cluster_uuid, "stable", self._stable_repo_url)
                break

        return n2vc_installed_sw

    async def _uninstall_sw(self, cluster_id: str, namespace: str):
        # uninstall Tiller if necessary

        self.log.debug("Uninstalling tiller from cluster {}".format(cluster_id))

        # init paths, env
        paths, env = self._init_paths_env(
            cluster_name=cluster_id, create_if_not_exist=True
        )

        if not namespace:
            # find namespace for tiller pod
            command = "{} --kubeconfig={} get deployments --all-namespaces".format(
                self.kubectl_command, paths["kube_config"]
            )
            output, _rc = await self._local_async_exec(
                command=command, raise_exception_on_error=False, env=env
            )
            output_table = self._output_to_table(output=output)
            namespace = None
            for r in output_table:
                try:
                    if "tiller-deploy" in r[1]:
                        namespace = r[0]
                        break
                except Exception:
                    pass
            else:
                msg = "Tiller deployment not found in cluster {}".format(cluster_id)
                self.log.error(msg)

            self.log.debug("namespace for tiller: {}".format(namespace))

        if namespace:
            # uninstall tiller from cluster
            self.log.debug("Uninstalling tiller from cluster {}".format(cluster_id))
            command = "{} --kubeconfig={} --home={} reset".format(
                self._helm_command, paths["kube_config"], paths["helm_dir"]
            )
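            # Illustrative example of the reset invocation (hypothetical paths):
            #   helm --kubeconfig=/.../.kube/config --home=/.../.helm reset
            # "helm reset" removes tiller from the cluster in helm2.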
            self.log.debug("resetting: {}".format(command))
            output, _rc = await self._local_async_exec(
                command=command, raise_exception_on_error=True, env=env
            )
            # Delete clusterrolebinding and serviceaccount.
            # Ignore if errors for backward compatibility
            command = (
                "{} --kubeconfig={} delete clusterrolebinding.rbac.authorization.k8s."
                "io/osm-tiller-cluster-rule"
            ).format(self.kubectl_command, paths["kube_config"])
            output, _rc = await self._local_async_exec(
                command=command, raise_exception_on_error=False, env=env
            )
            command = "{} --kubeconfig={} --namespace kube-system delete serviceaccount/{}".format(
                self.kubectl_command, paths["kube_config"], self.service_account
            )
            output, _rc = await self._local_async_exec(
                command=command, raise_exception_on_error=False, env=env
            )

        else:
            self.log.debug("namespace not found")

    async def _instances_list(self, cluster_id):

        # init paths, env
        paths, env = self._init_paths_env(
            cluster_name=cluster_id, create_if_not_exist=True
        )

        command = "{} list --output yaml".format(self._helm_command)
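        # Illustrative shape of the helm2 "helm list --output yaml" response
        # (hypothetical release data), which is why the keys are lowercased below
        # to match the helm3 output format:
        #   Releases:
        #   - Name: my-release
        #     Namespace: default
        #     Status: DEPLOYED
        #     Chart: mychart-1.0.0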

        output, _rc = await self._local_async_exec(
            command=command, raise_exception_on_error=True, env=env
        )

        if output and len(output) > 0:
            # parse yaml and update keys to lower case to unify with helm3
            instances = yaml.load(output, Loader=yaml.SafeLoader).get("Releases")
            new_instances = []
            for instance in instances:
                new_instance = dict((k.lower(), v) for k, v in instance.items())
                new_instances.append(new_instance)
            return new_instances
        else:
            return []

    def _get_inspect_command(self, show_command: str, kdu_model: str, repo_str: str,
                             version: str):
        inspect_command = "{} inspect {} {}{} {}".format(
            self._helm_command, show_command, kdu_model, repo_str, version
        )
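        # Illustrative expansion, assuming repo_str and version already carry their
        # flags (e.g. " --repo <url>" and "--version <x.y.z>"), with hypothetical values:
        #   helm inspect values mychart --repo https://example.org/charts --version 1.0.0
        # show_command, repo_str and version may be empty strings when not applicable.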
        return inspect_command

    async def _status_kdu(
        self,
        cluster_id: str,
        kdu_instance: str,
        namespace: str = None,
        show_error_log: bool = False,
        return_text: bool = False,
    ):

        self.log.debug(
            "status of kdu_instance: {}, namespace: {} ".format(kdu_instance, namespace)
        )

        # init config, env
        paths, env = self._init_paths_env(
            cluster_name=cluster_id, create_if_not_exist=True
        )
        command = "{} status {} --output yaml".format(self._helm_command, kdu_instance)
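        # Illustrative expansion (hypothetical release name):
        #   helm status my-release --output yaml
        # The YAML is post-processed below to resemble the helm3 status structure.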
        output, rc = await self._local_async_exec(
            command=command,
            raise_exception_on_error=True,
            show_error_log=show_error_log,
            env=env,
        )

        if return_text:
            return str(output)

        if rc != 0:
            return None

        data = yaml.load(output, Loader=yaml.SafeLoader)

        # remove field 'notes'
        try:
            del data.get("info").get("status")["notes"]
        except KeyError:
            pass

        # parse field 'resources'
        try:
            resources = str(data.get("info").get("status").get("resources"))
            resource_table = self._output_to_table(resources)
            data.get("info").get("status")["resources"] = resource_table
        except Exception:
            pass

        # set description to lowercase (unify with helm3)
        try:
            data.get("info")["description"] = data.get("info").pop("Description")
        except KeyError:
            pass

        return data

    def _get_helm_chart_repos_ids(self, cluster_uuid) -> list:
        repo_ids = []
        cluster_filter = {"_admin.helm-chart.id": cluster_uuid}
        cluster = self.db.get_one("k8sclusters", cluster_filter)
        if cluster:
            repo_ids = cluster.get("_admin").get("helm_chart_repos") or []
            return repo_ids
        else:
            raise K8sException(
                "k8s cluster with helm-chart id {} not found".format(cluster_uuid)
            )

    async def _is_install_completed(self, cluster_id: str, kdu_instance: str) -> bool:

        status = await self._status_kdu(
            cluster_id=cluster_id, kdu_instance=kdu_instance, return_text=False
        )

        # extract info.status.resources -> str
        # format:
        #   ==> v1/Deployment
        #   NAME                    READY   UP-TO-DATE   AVAILABLE   AGE
        #   halting-horse-mongodb   0/1     1            0           0s
        #   halting-petit-mongodb   1/1     1            0           0s
        # blank line
        resources = K8sHelmBaseConnector._get_deep(
            status, ("info", "status", "resources")
        )

        # convert to table
        resources = K8sHelmBaseConnector._output_to_table(resources)
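        # Worked example of the readiness check below (illustrative): a READY value
        # of "0/1" splits into current=0 and total=1, so current < total and the
        # deployment is reported as not ready; "1/1" gives current == total -> ready.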

        num_lines = len(resources)
        index = 0
        ready = True
        while index < num_lines:
            try:
                line1 = resources[index]
                index += 1
                # find '==>' in column 0
                if line1[0] == "==>":
                    line2 = resources[index]
                    index += 1
                    # find READY in column 1
                    if line2[1] == "READY":
                        # read next lines
                        line3 = resources[index]
                        index += 1
                        while len(line3) > 1 and index < num_lines:
                            ready_value = line3[1]
                            parts = ready_value.split(sep="/")
                            current = int(parts[0])
                            total = int(parts[1])
                            if current < total:
                                self.log.debug("NOT READY:\n {}".format(line3))
                                ready = False
                            line3 = resources[index]
                            index += 1

            except Exception:
                pass

        return ready

    def _get_install_command(
        self, kdu_model, kdu_instance, namespace, params_str, version, atomic, timeout
    ) -> str:

        timeout_str = ""
        if timeout:
            timeout_str = "--timeout {}".format(timeout)

        # atomic
        atomic_str = ""
        if atomic:
            atomic_str = "--atomic"
        # namespace
        namespace_str = ""
        if namespace:
            namespace_str = "--namespace {}".format(namespace)

        # version
        version_str = ""
        if version:
            version_str = "--version {}".format(version)

        command = (
            "{helm} install {atomic} --output yaml "
            "{params} {timeout} --name={name} {ns} {model} {ver}".format(
                helm=self._helm_command,
                atomic=atomic_str,
                params=params_str,
                timeout=timeout_str,
                name=kdu_instance,
                ns=namespace_str,
                model=kdu_model,
                ver=version_str,
            )
        )
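        # Illustrative expansion with hypothetical values:
        #   helm install --atomic --output yaml --timeout 300 --name=my-release \
        #        --namespace default stable/mychart --version 1.0.0
        # (empty option strings simply collapse to extra whitespace in the command).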
        return command

    def _get_upgrade_command(
        self, kdu_model, kdu_instance, namespace, params_str, version, atomic, timeout
    ) -> str:

        timeout_str = ""
        if timeout:
            timeout_str = "--timeout {}".format(timeout)

        # atomic
        atomic_str = ""
        if atomic:
            atomic_str = "--atomic"

        # version
        version_str = ""
        if version:
            version_str = "--version {}".format(version)

        command = (
            "{helm} upgrade {atomic} --output yaml {params} {timeout} {name} {model} {ver}"
        ).format(
            helm=self._helm_command,
            atomic=atomic_str,
            params=params_str,
            timeout=timeout_str,
            name=kdu_instance,
            model=kdu_model,
            ver=version_str,
        )
        return command

    def _get_rollback_command(self, kdu_instance, namespace, revision) -> str:
        return "{} rollback {} {} --wait".format(
            self._helm_command, kdu_instance, revision
        )

    def _get_uninstall_command(self, kdu_instance: str, namespace: str) -> str:
        return "{} delete --purge {}".format(self._helm_command, kdu_instance)