Changed stable repository URL for helm3
n2vc/k8s_helm_conn.py
##
# Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
# This file is part of OSM
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# For those usages not covered by the Apache License, Version 2.0 please
# contact with: nfvlabs@tid.es
##
import asyncio
import os
import yaml

from n2vc.k8s_helm_base_conn import K8sHelmBaseConnector
from n2vc.exceptions import K8sException


class K8sHelmConnector(K8sHelmBaseConnector):

    """
    ####################################################################################
    ################################### P U B L I C ####################################
    ####################################################################################
    """

    def __init__(
        self,
        fs: object,
        db: object,
        kubectl_command: str = "/usr/bin/kubectl",
        helm_command: str = "/usr/bin/helm",
        log: object = None,
        on_update_db=None,
    ):
        """
        Initializes helm connector for helm v2

        :param fs: file system for kubernetes and helm configuration
        :param db: database object to write current operation status
        :param kubectl_command: path to kubectl executable
        :param helm_command: path to helm executable
        :param log: logger
        :param on_update_db: callback called when k8s connector updates database
        """

        # parent class
        K8sHelmBaseConnector.__init__(
            self,
            db=db,
            log=log,
            fs=fs,
            kubectl_command=kubectl_command,
            helm_command=helm_command,
            on_update_db=on_update_db,
        )

        self.log.info("Initializing K8S Helm2 connector")

        # initialize helm client-only
        self.log.debug("Initializing helm client-only...")
        command = "{} init --client-only".format(self._helm_command)
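        # Note: the init coroutine is scheduled with ensure_future instead of
        # being awaited, because __init__ is synchronous; any failure is only
        # logged. With the default helm path the resulting command is:
        #   /usr/bin/helm init --client-only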
        try:
            asyncio.ensure_future(
                self._local_async_exec(command=command, raise_exception_on_error=False)
            )
            # loop = asyncio.get_event_loop()
            # loop.run_until_complete(self._local_async_exec(command=command,
            # raise_exception_on_error=False))
        except Exception as e:
            self.log.warning(
                msg="helm init failed (it was already initialized): {}".format(e)
            )

        self.log.info("K8S Helm2 connector initialized")

    async def install(
        self,
        cluster_uuid: str,
        kdu_model: str,
        atomic: bool = True,
        timeout: float = 300,
        params: dict = None,
        db_dict: dict = None,
        kdu_name: str = None,
        namespace: str = None,
    ):
        _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
        self.log.debug("installing {} in cluster {}".format(kdu_model, cluster_id))

        # sync local dir
        self.fs.sync(from_path=cluster_id)

        # init env, paths
        paths, env = self._init_paths_env(
            cluster_name=cluster_id, create_if_not_exist=True
        )

        kdu_instance = await self._install_impl(
            cluster_id,
            kdu_model,
            paths,
            env,
            atomic=atomic,
            timeout=timeout,
            params=params,
            db_dict=db_dict,
            kdu_name=kdu_name,
            namespace=namespace,
        )

        # sync fs
        self.fs.reverse_sync(from_path=cluster_id)

        self.log.debug("Returning kdu_instance {}".format(kdu_instance))
        return kdu_instance

    async def inspect_kdu(self, kdu_model: str, repo_url: str = None) -> str:

        self.log.debug(
            "inspect kdu_model {} from (optional) repo: {}".format(kdu_model, repo_url)
        )

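        # An empty inspect_command makes _get_inspect_command build a plain
        # "helm inspect <chart>", which prints both chart metadata and values.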
        return await self._exec_inspect_comand(
            inspect_command="", kdu_model=kdu_model, repo_url=repo_url
        )

    """
    ####################################################################################
    ################################### P R I V A T E ##################################
    ####################################################################################
    """

    def _init_paths_env(self, cluster_name: str, create_if_not_exist: bool = True):
        """
        Creates and returns the base cluster and kube dirs, plus the helm home
        dir required by the helm2 directory specification. Returns the paths
        together with the environment variables that must be provided to
        execute commands.

        Helm 2 directory specification uses the helm_home dir:

        The variables assigned to these paths are:
        - Helm home: $HELM_HOME
        - helm kubeconfig: $KUBECONFIG

        :param cluster_name: cluster_name
        :return: Dictionary with config_paths and dictionary with helm environment variables
        """
        base = self.fs.path
        if base.endswith("/") or base.endswith("\\"):
            base = base[:-1]

        # base dir for cluster
        cluster_dir = base + "/" + cluster_name

        # kube dir
        kube_dir = cluster_dir + "/" + ".kube"
        if create_if_not_exist and not os.path.exists(kube_dir):
            self.log.debug("Creating dir {}".format(kube_dir))
            os.makedirs(kube_dir)

        # helm home dir
        helm_dir = cluster_dir + "/" + ".helm"
        if create_if_not_exist and not os.path.exists(helm_dir):
            self.log.debug("Creating dir {}".format(helm_dir))
            os.makedirs(helm_dir)

        config_filename = kube_dir + "/config"

        # 2 - Prepare dictionary with paths
        paths = {
            "kube_dir": kube_dir,
            "kube_config": config_filename,
            "cluster_dir": cluster_dir,
            "helm_dir": helm_dir,
        }
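        # Resulting layout for a cluster named "c1" (illustrative):
        #   <fs.path>/c1/.kube/config  -> $KUBECONFIG
        #   <fs.path>/c1/.helm         -> $HELM_HOME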

        for file_name, file in paths.items():
            if "dir" in file_name and not os.path.exists(file):
                err_msg = "{} dir does not exist".format(file)
                self.log.error(err_msg)
                raise K8sException(err_msg)

        # 3 - Prepare environment variables
        env = {"HELM_HOME": helm_dir, "KUBECONFIG": config_filename}

        return paths, env

    async def _get_services(self, cluster_id, kdu_instance, namespace):

        # init config, env
        paths, env = self._init_paths_env(
            cluster_name=cluster_id, create_if_not_exist=True
        )

        command1 = "{} get manifest {} ".format(self._helm_command, kdu_instance)
        command2 = "{} get --namespace={} -f -".format(self.kubectl_command, namespace)
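        # "helm get manifest" prints every resource in the release; piping it
        # into "kubectl get -f -" resolves their live state, from which the
        # services are then filtered.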
        output, _rc = await self._local_async_exec_pipe(
            command1, command2, env=env, raise_exception_on_error=True
        )
        services = self._parse_services(output)

        return services

    async def _cluster_init(
        self, cluster_id: str, namespace: str, paths: dict, env: dict
    ):
        """
        Implements the helm version dependent cluster initialization:
        for helm2 it initializes the tiller environment if needed
        """

        # check if tiller pod is up in cluster
        command = "{} --kubeconfig={} --namespace={} get deployments".format(
            self.kubectl_command, paths["kube_config"], namespace
        )
        output, _rc = await self._local_async_exec(
            command=command, raise_exception_on_error=True, env=env
        )

        output_table = self._output_to_table(output=output)

        # find the 'tiller-deploy' deployment among the returned rows
        already_initialized = False
        try:
            for row in output_table:
                if row[0].startswith("tiller-deploy"):
                    already_initialized = True
                    break
        except Exception:
            pass

        # helm init
        n2vc_installed_sw = False
        if not already_initialized:
            self.log.info(
                "Initializing helm in client and server: {}".format(cluster_id)
            )
            command = "{} --kubeconfig={} --namespace kube-system create serviceaccount {}".format(
                self.kubectl_command, paths["kube_config"], self.service_account
            )
            _, _rc = await self._local_async_exec(
                command=command, raise_exception_on_error=False, env=env
            )

            command = (
                "{} --kubeconfig={} create clusterrolebinding osm-tiller-cluster-rule "
                "--clusterrole=cluster-admin --serviceaccount=kube-system:{}"
            ).format(self.kubectl_command, paths["kube_config"], self.service_account)
            _, _rc = await self._local_async_exec(
                command=command, raise_exception_on_error=False, env=env
            )

            command = (
                "{} --kubeconfig={} --tiller-namespace={} --home={} --service-account {} "
                "init"
            ).format(
                self._helm_command,
                paths["kube_config"],
                namespace,
                paths["helm_dir"],
                self.service_account,
            )
            _, _rc = await self._local_async_exec(
                command=command, raise_exception_on_error=True, env=env
            )
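            # record that this connector installed tiller, so the caller can
            # later remove it when the cluster is deregistered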
            n2vc_installed_sw = True
        else:
            # check client helm installation
            check_file = paths["helm_dir"] + "/repository/repositories.yaml"
            if not self._check_file_exists(
                filename=check_file, exception_if_not_exists=False
            ):
                self.log.info("Initializing helm in client: {}".format(cluster_id))
                command = (
                    "{} --kubeconfig={} --tiller-namespace={} "
                    "--home={} init --client-only"
                ).format(
                    self._helm_command,
                    paths["kube_config"],
                    namespace,
                    paths["helm_dir"],
                )
                output, _rc = await self._local_async_exec(
                    command=command, raise_exception_on_error=True, env=env
                )
            else:
                self.log.info("Helm client already initialized")

        return n2vc_installed_sw

    async def _uninstall_sw(self, cluster_id: str, namespace: str):
        # uninstall Tiller if necessary

        self.log.debug("Uninstalling tiller from cluster {}".format(cluster_id))

        # init paths, env
        paths, env = self._init_paths_env(
            cluster_name=cluster_id, create_if_not_exist=True
        )

        if not namespace:
            # find namespace for tiller pod
            command = "{} --kubeconfig={} get deployments --all-namespaces".format(
                self.kubectl_command, paths["kube_config"]
            )
            output, _rc = await self._local_async_exec(
                command=command, raise_exception_on_error=False, env=env
            )
            output_table = self._output_to_table(output=output)
            namespace = None
            for r in output_table:
                try:
                    if "tiller-deploy" in r[1]:
                        namespace = r[0]
                        break
                except Exception:
                    pass
            else:
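                # for-else: the loop completed without a break, i.e. no
                # tiller-deploy row was found in any namespace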
                msg = "Tiller deployment not found in cluster {}".format(cluster_id)
                self.log.error(msg)

        self.log.debug("namespace for tiller: {}".format(namespace))

        if namespace:
            # uninstall tiller from cluster
            self.log.debug("Uninstalling tiller from cluster {}".format(cluster_id))
            command = "{} --kubeconfig={} --home={} reset".format(
                self._helm_command, paths["kube_config"], paths["helm_dir"]
            )
            self.log.debug("resetting: {}".format(command))
            output, _rc = await self._local_async_exec(
                command=command, raise_exception_on_error=True, env=env
            )
            # Delete clusterrolebinding and serviceaccount.
            # Ignore errors for backward compatibility
            command = (
                "{} --kubeconfig={} delete clusterrolebinding.rbac.authorization.k8s."
                "io/osm-tiller-cluster-rule"
            ).format(self.kubectl_command, paths["kube_config"])
            output, _rc = await self._local_async_exec(
                command=command, raise_exception_on_error=False, env=env
            )
            command = "{} --kubeconfig={} --namespace kube-system delete serviceaccount/{}".format(
                self.kubectl_command, paths["kube_config"], self.service_account
            )
            output, _rc = await self._local_async_exec(
                command=command, raise_exception_on_error=False, env=env
            )

        else:
            self.log.debug("namespace not found")

    async def _instances_list(self, cluster_id):

        # init paths, env
        paths, env = self._init_paths_env(
            cluster_name=cluster_id, create_if_not_exist=True
        )

        command = "{} list --output yaml".format(self._helm_command)
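        # e.g. "/usr/bin/helm list --output yaml"; the target cluster and helm
        # home are selected via $KUBECONFIG and $HELM_HOME passed in env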

        output, _rc = await self._local_async_exec(
            command=command, raise_exception_on_error=True, env=env
        )

        if output and len(output) > 0:
            # parse yaml and update keys to lower case to unify with helm3
            instances = yaml.load(output, Loader=yaml.SafeLoader).get("Releases")
            new_instances = []
            for instance in instances:
                new_instance = dict((k.lower(), v) for k, v in instance.items())
                new_instances.append(new_instance)
            return new_instances
        else:
            return []

    def _get_inspect_command(
        self, show_command: str, kdu_model: str, repo_str: str, version: str
    ):
        inspect_command = "{} inspect {} {}{} {}".format(
            self._helm_command, show_command, kdu_model, repo_str, version
        )
        return inspect_command

    async def _status_kdu(
        self,
        cluster_id: str,
        kdu_instance: str,
        namespace: str = None,
        show_error_log: bool = False,
        return_text: bool = False,
    ):

        self.log.debug(
            "status of kdu_instance: {}, namespace: {} ".format(kdu_instance, namespace)
        )

        # init config, env
        paths, env = self._init_paths_env(
            cluster_name=cluster_id, create_if_not_exist=True
        )
        command = "{} status {} --output yaml".format(self._helm_command, kdu_instance)
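        # note: helm2 status addresses the release by name only; the namespace
        # argument is used just for logging here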
        output, rc = await self._local_async_exec(
            command=command,
            raise_exception_on_error=True,
            show_error_log=show_error_log,
            env=env,
        )

        if return_text:
            return str(output)

        if rc != 0:
            return None

        data = yaml.load(output, Loader=yaml.SafeLoader)

        # remove field 'notes'
        try:
            del data.get("info").get("status")["notes"]
        except KeyError:
            pass

        # parse field 'resources'
        try:
            resources = str(data.get("info").get("status").get("resources"))
            resource_table = self._output_to_table(resources)
            data.get("info").get("status")["resources"] = resource_table
        except Exception:
            pass

        # set description to lowercase (unify with helm3)
        try:
            data.get("info")["description"] = data.get("info").pop("Description")
        except KeyError:
            pass

        return data

    def _get_helm_chart_repos_ids(self, cluster_uuid) -> list:
        repo_ids = []
        cluster_filter = {"_admin.helm-chart.id": cluster_uuid}
        cluster = self.db.get_one("k8sclusters", cluster_filter)
        if cluster:
            repo_ids = cluster.get("_admin").get("helm_chart_repos") or []
            return repo_ids
        else:
            raise K8sException(
                "k8s cluster with helm-id: {} not found".format(cluster_uuid)
            )

    async def _is_install_completed(self, cluster_id: str, kdu_instance: str) -> bool:

        status = await self._status_kdu(
            cluster_id=cluster_id, kdu_instance=kdu_instance, return_text=False
        )

        # extract info.status.resources -> str
        # format:
        #   ==> v1/Deployment
        #   NAME                    READY  UP-TO-DATE  AVAILABLE  AGE
        #   halting-horse-mongodb   0/1    1           0          0s
        #   halting-petit-mongodb   1/1    1           0          0s
        #   blank line
        resources = K8sHelmBaseConnector._get_deep(
            status, ("info", "status", "resources")
        )

        # convert to table
        resources = K8sHelmBaseConnector._output_to_table(resources)

        num_lines = len(resources)
        index = 0
        ready = True
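        # scan the table: each "==>" row starts a resource kind; when its
        # header has a READY column, every following row is checked and an
        # "x/y" value with x < y marks the install as not ready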
        while index < num_lines:
            try:
                line1 = resources[index]
                index += 1
                # find '==>' in column 0
                if line1[0] == "==>":
                    line2 = resources[index]
                    index += 1
                    # find READY in column 1
                    if line2[1] == "READY":
                        # read next lines
                        line3 = resources[index]
                        index += 1
                        while len(line3) > 1 and index < num_lines:
                            ready_value = line3[1]
                            parts = ready_value.split(sep="/")
                            current = int(parts[0])
                            total = int(parts[1])
                            if current < total:
                                self.log.debug("NOT READY:\n {}".format(line3))
                                ready = False
                            line3 = resources[index]
                            index += 1

            except Exception:
                pass

        return ready

    def _get_install_command(
        self, kdu_model, kdu_instance, namespace, params_str, version, atomic, timeout
    ) -> str:

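        # Example result, with illustrative values:
        #   /usr/bin/helm install --atomic --output yaml --timeout 300
        #   --name=stable-openldap-0001 --namespace kube-system openldap --version 1.2.2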
        timeout_str = ""
        if timeout:
            timeout_str = "--timeout {}".format(timeout)

        # atomic
        atomic_str = ""
        if atomic:
            atomic_str = "--atomic"
        # namespace
        namespace_str = ""
        if namespace:
            namespace_str = "--namespace {}".format(namespace)

        # version
        version_str = ""
        if version:
            version_str = "--version {}".format(version)

        command = (
            "{helm} install {atomic} --output yaml "
            "{params} {timeout} --name={name} {ns} {model} {ver}".format(
                helm=self._helm_command,
                atomic=atomic_str,
                params=params_str,
                timeout=timeout_str,
                name=kdu_instance,
                ns=namespace_str,
                model=kdu_model,
                ver=version_str,
            )
        )
        return command

    def _get_upgrade_command(
        self, kdu_model, kdu_instance, namespace, params_str, version, atomic, timeout
    ) -> str:

        timeout_str = ""
        if timeout:
            timeout_str = "--timeout {}".format(timeout)

        # atomic
        atomic_str = ""
        if atomic:
            atomic_str = "--atomic"

        # version
        version_str = ""
        if version:
            version_str = "--version {}".format(version)

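        # namespace is ignored here: helm2 upgrade addresses the release by
        # name only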
        command = (
            "{helm} upgrade {atomic} --output yaml {params} {timeout} {name} {model} {ver}"
        ).format(
            helm=self._helm_command,
            atomic=atomic_str,
            params=params_str,
            timeout=timeout_str,
            name=kdu_instance,
            model=kdu_model,
            ver=version_str,
        )
        return command

    def _get_rollback_command(self, kdu_instance, namespace, revision) -> str:
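        # namespace is unused: helm2 rollback takes only the release name and
        # the revision number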
        return "{} rollback {} {} --wait".format(
            self._helm_command, kdu_instance, revision
        )

    def _get_uninstall_command(self, kdu_instance: str, namespace: str) -> str:
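        # "--purge" also removes the release name from the tiller store so it
        # can be reused later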
        return "{} delete --purge {}".format(self._helm_command, kdu_instance)