Fix bug 1400: Change repo stable for helm2
[osm/N2VC.git] / n2vc / k8s_helm_conn.py
##
# Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
# This file is part of OSM
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# For those usages not covered by the Apache License, Version 2.0 please
# contact with: nfvlabs@tid.es
##
import asyncio
import os
import yaml

from n2vc.k8s_helm_base_conn import K8sHelmBaseConnector
from n2vc.exceptions import K8sException


class K8sHelmConnector(K8sHelmBaseConnector):

    """
    ####################################################################################
    ################################### P U B L I C ####################################
    ####################################################################################
    """

    def __init__(
        self,
        fs: object,
        db: object,
        kubectl_command: str = "/usr/bin/kubectl",
        helm_command: str = "/usr/bin/helm",
        log: object = None,
        on_update_db=None,
        vca_config: dict = None,
    ):
        """
        Initializes helm connector for helm v2

        :param fs: file system for kubernetes and helm configuration
        :param db: database object to write current operation status
        :param kubectl_command: path to kubectl executable
        :param helm_command: path to helm executable
        :param log: logger
        :param on_update_db: callback called when k8s connector updates database
        """

        # parent class
        K8sHelmBaseConnector.__init__(
            self,
            db=db,
            log=log,
            fs=fs,
            kubectl_command=kubectl_command,
            helm_command=helm_command,
            on_update_db=on_update_db,
            vca_config=vca_config,
        )

        self.log.info("Initializing K8S Helm2 connector")

        # initialize helm client-only
        self.log.debug("Initializing helm client-only...")
        command = "{} init --client-only".format(self._helm_command)
        try:
            asyncio.ensure_future(
                self._local_async_exec(command=command, raise_exception_on_error=False)
            )
            # loop = asyncio.get_event_loop()
            # loop.run_until_complete(self._local_async_exec(command=command,
            # raise_exception_on_error=False))
        except Exception as e:
            self.log.warning(
                msg="helm init failed (it was already initialized): {}".format(e)
            )

        self.log.info("K8S Helm2 connector initialized")

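    # Illustrative usage (a minimal sketch, not part of the original module; "fs", "db",
    # "vca_config", "db_dict" and "cluster_uuid" are assumed to be provided by the caller,
    # e.g. the OSM LCM, and "stable/openldap" is just a hypothetical chart reference):
    #
    #   k8s_helm = K8sHelmConnector(fs=fs, db=db, log=logger, vca_config=vca_config)
    #   kdu_instance = await k8s_helm.install(cluster_uuid, "stable/openldap", db_dict=db_dict)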
    async def install(
        self,
        cluster_uuid: str,
        kdu_model: str,
        atomic: bool = True,
        timeout: float = 300,
        params: dict = None,
        db_dict: dict = None,
        kdu_name: str = None,
        namespace: str = None,
    ):
        _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
        self.log.debug("installing {} in cluster {}".format(kdu_model, cluster_id))

        # sync local dir
        self.fs.sync(from_path=cluster_id)

        # init env, paths
        paths, env = self._init_paths_env(
            cluster_name=cluster_id, create_if_not_exist=True
        )

        kdu_instance = await self._install_impl(
            cluster_id,
            kdu_model,
            paths,
            env,
            atomic=atomic,
            timeout=timeout,
            params=params,
            db_dict=db_dict,
            kdu_name=kdu_name,
            namespace=namespace,
        )

        # sync fs
        self.fs.reverse_sync(from_path=cluster_id)

        self.log.debug("Returning kdu_instance {}".format(kdu_instance))
        return kdu_instance

    async def inspect_kdu(self, kdu_model: str, repo_url: str = None) -> str:

        self.log.debug(
            "inspect kdu_model {} from (optional) repo: {}".format(kdu_model, repo_url)
        )

        return await self._exec_inspect_comand(
            inspect_command="", kdu_model=kdu_model, repo_url=repo_url
        )

    """
    ####################################################################################
    ################################### P R I V A T E ##################################
    ####################################################################################
    """

    def _init_paths_env(self, cluster_name: str, create_if_not_exist: bool = True):
        """
        Creates the base cluster and kube dirs if needed and returns their paths,
        together with the environment variables that must be provided to execute
        helm commands.

        Helm 2 uses a helm home dir. The variables assigned for these paths are:
        - Helm home: $HELM_HOME
        - helm kubeconfig: $KUBECONFIG

        :param cluster_name: cluster_name
        :return: Dictionary with config paths and dictionary with helm environment variables
        """
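        # Illustrative example of the returned values (a sketch; the "/app/storage" base
        # path and the cluster id are assumptions, the real base comes from self.fs.path):
        #   paths = {
        #       "kube_dir": "/app/storage/<cluster_id>/.kube",
        #       "kube_config": "/app/storage/<cluster_id>/.kube/config",
        #       "cluster_dir": "/app/storage/<cluster_id>",
        #       "helm_dir": "/app/storage/<cluster_id>/.helm",
        #   }
        #   env = {"HELM_HOME": paths["helm_dir"], "KUBECONFIG": paths["kube_config"]}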
        base = self.fs.path
        if base.endswith("/") or base.endswith("\\"):
            base = base[:-1]

        # base dir for cluster
        cluster_dir = base + "/" + cluster_name

        # kube dir
        kube_dir = cluster_dir + "/" + ".kube"
        if create_if_not_exist and not os.path.exists(kube_dir):
            self.log.debug("Creating dir {}".format(kube_dir))
            os.makedirs(kube_dir)

        # helm home dir
        helm_dir = cluster_dir + "/" + ".helm"
        if create_if_not_exist and not os.path.exists(helm_dir):
            self.log.debug("Creating dir {}".format(helm_dir))
            os.makedirs(helm_dir)

        config_filename = kube_dir + "/config"

        # 2 - Prepare dictionary with paths
        paths = {
            "kube_dir": kube_dir,
            "kube_config": config_filename,
            "cluster_dir": cluster_dir,
            "helm_dir": helm_dir,
        }

        for file_name, file in paths.items():
            if "dir" in file_name and not os.path.exists(file):
                err_msg = "{} dir does not exist".format(file)
                self.log.error(err_msg)
                raise K8sException(err_msg)

        # 3 - Prepare environment variables
        env = {"HELM_HOME": helm_dir, "KUBECONFIG": config_filename}

        return paths, env

    async def _get_services(self, cluster_id, kdu_instance, namespace):

        # init config, env
        paths, env = self._init_paths_env(
            cluster_name=cluster_id, create_if_not_exist=True
        )

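        # The two commands below are executed as a pipe, roughly equivalent to running:
        #   helm get manifest <kdu_instance> | kubectl get --namespace=<namespace> -f -
        # i.e. the rendered manifest is fed to kubectl to list the resources it created.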
        command1 = "{} get manifest {} ".format(self._helm_command, kdu_instance)
        command2 = "{} get --namespace={} -f -".format(self.kubectl_command, namespace)
        output, _rc = await self._local_async_exec_pipe(
            command1, command2, env=env, raise_exception_on_error=True
        )
        services = self._parse_services(output)

        return services

    async def _cluster_init(self, cluster_id: str, namespace: str,
                            paths: dict, env: dict):
        """
        Implements the helm version dependent cluster initialization:
        For helm2 it initializes the tiller environment if needed
        """
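        # Flow sketch (derived from the code below, not an exhaustive description):
        #   1. kubectl get deployments in the tiller namespace to check if tiller-deploy exists
        #   2. if not, create the service account and the osm-tiller-cluster-rule
        #      clusterrolebinding, then run "helm init" with --stable-repo-url set to
        #      self._stable_repo_url
        #   3. otherwise, initialize only the local helm client dirs if they are missing
        #   4. finally, replace the "stable" repo entry when its URL differs from the
        #      configured stable repo URL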

        # check if tiller pod is up in cluster
        command = "{} --kubeconfig={} --namespace={} get deployments".format(
            self.kubectl_command, paths["kube_config"], namespace
        )
        output, _rc = await self._local_async_exec(
            command=command, raise_exception_on_error=True, env=env
        )

        output_table = self._output_to_table(output=output)

        # find 'tiller' pod in all pods
        already_initialized = False
        try:
            for row in output_table:
                if row[0].startswith("tiller-deploy"):
                    already_initialized = True
                    break
        except Exception:
            pass

        # helm init
        n2vc_installed_sw = False
        if not already_initialized:
            self.log.info(
                "Initializing helm in client and server: {}".format(cluster_id)
            )
            command = "{} --kubeconfig={} --namespace kube-system create serviceaccount {}".format(
                self.kubectl_command, paths["kube_config"], self.service_account
            )
            _, _rc = await self._local_async_exec(
                command=command, raise_exception_on_error=False, env=env
            )

            command = (
                "{} --kubeconfig={} create clusterrolebinding osm-tiller-cluster-rule "
                "--clusterrole=cluster-admin --serviceaccount=kube-system:{}"
            ).format(self.kubectl_command, paths["kube_config"], self.service_account)
            _, _rc = await self._local_async_exec(
                command=command, raise_exception_on_error=False, env=env
            )

            command = (
                "{} --kubeconfig={} --tiller-namespace={} --home={} --service-account {} "
                " --stable-repo-url {} init"
            ).format(
                self._helm_command,
                paths["kube_config"],
                namespace,
                paths["helm_dir"],
                self.service_account,
                self._stable_repo_url,
            )
            _, _rc = await self._local_async_exec(
                command=command, raise_exception_on_error=True, env=env
            )
            n2vc_installed_sw = True
        else:
            # check client helm installation
            check_file = paths["helm_dir"] + "/repository/repositories.yaml"
            if not self._check_file_exists(
                filename=check_file, exception_if_not_exists=False
            ):
                self.log.info("Initializing helm in client: {}".format(cluster_id))
                command = (
                    "{} --kubeconfig={} --tiller-namespace={} "
                    "--home={} init --client-only"
                ).format(
                    self._helm_command,
                    paths["kube_config"],
                    namespace,
                    paths["helm_dir"],
                )
                output, _rc = await self._local_async_exec(
                    command=command, raise_exception_on_error=True, env=env
                )
            else:
                self.log.info("Helm client already initialized")

        # remove old stable repo and add new one
        cluster_uuid = "{}:{}".format(namespace, cluster_id)
        repo_list = await self.repo_list(cluster_uuid)
        for repo in repo_list:
            if repo["name"] == "stable" and repo["url"] != self._stable_repo_url:
                self.log.debug("Add new stable repo url: {}".format(self._stable_repo_url))
                await self.repo_remove(cluster_uuid, "stable")
                await self.repo_add(cluster_uuid, "stable", self._stable_repo_url)
                break

        return n2vc_installed_sw

    async def _uninstall_sw(self, cluster_id: str, namespace: str):
        # uninstall Tiller if necessary

        self.log.debug("Uninstalling tiller from cluster {}".format(cluster_id))

        # init paths, env
        paths, env = self._init_paths_env(
            cluster_name=cluster_id, create_if_not_exist=True
        )

        if not namespace:
            # find namespace for tiller pod
            command = "{} --kubeconfig={} get deployments --all-namespaces".format(
                self.kubectl_command, paths["kube_config"]
            )
            output, _rc = await self._local_async_exec(
                command=command, raise_exception_on_error=False, env=env
            )
            output_table = self._output_to_table(output=output)
            namespace = None
            for r in output_table:
                try:
                    if "tiller-deploy" in r[1]:
                        namespace = r[0]
                        break
                except Exception:
                    pass
            else:
                msg = "Tiller deployment not found in cluster {}".format(cluster_id)
                self.log.error(msg)

            self.log.debug("namespace for tiller: {}".format(namespace))

        if namespace:
            # uninstall tiller from cluster
            self.log.debug("Uninstalling tiller from cluster {}".format(cluster_id))
            command = "{} --kubeconfig={} --home={} reset".format(
                self._helm_command, paths["kube_config"], paths["helm_dir"]
            )
            self.log.debug("resetting: {}".format(command))
            output, _rc = await self._local_async_exec(
                command=command, raise_exception_on_error=True, env=env
            )
            # Delete clusterrolebinding and serviceaccount.
            # Ignore errors, for backward compatibility
            command = (
                "{} --kubeconfig={} delete clusterrolebinding.rbac.authorization.k8s."
                "io/osm-tiller-cluster-rule"
            ).format(self.kubectl_command, paths["kube_config"])
            output, _rc = await self._local_async_exec(
                command=command, raise_exception_on_error=False, env=env
            )
            command = "{} --kubeconfig={} --namespace kube-system delete serviceaccount/{}".format(
                self.kubectl_command, paths["kube_config"], self.service_account
            )
            output, _rc = await self._local_async_exec(
                command=command, raise_exception_on_error=False, env=env
            )

        else:
            self.log.debug("namespace not found")

    async def _instances_list(self, cluster_id):

        # init paths, env
        paths, env = self._init_paths_env(
            cluster_name=cluster_id, create_if_not_exist=True
        )

        command = "{} list --output yaml".format(self._helm_command)

        output, _rc = await self._local_async_exec(
            command=command, raise_exception_on_error=True, env=env
        )

        if output and len(output) > 0:
            # parse yaml and update keys to lower case to unify with helm3
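            # Illustrative example (hypothetical values): helm2 lists releases under a
            # "Releases" key with capitalized fields such as
            #   {"Name": "stable-openldap-0001", "Namespace": "osm", "Status": "DEPLOYED"}
            # which are normalized below to lowercase keys
            #   {"name": "stable-openldap-0001", "namespace": "osm", "status": "DEPLOYED"}
            # so callers see the same shape as with the helm3 connector.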
            instances = yaml.load(output, Loader=yaml.SafeLoader).get("Releases")
            new_instances = []
            for instance in instances:
                new_instance = dict((k.lower(), v) for k, v in instance.items())
                new_instances.append(new_instance)
            return new_instances
        else:
            return []

    def _get_inspect_command(self, show_command: str, kdu_model: str, repo_str: str,
                             version: str):
        inspect_command = "{} inspect {} {}{} {}".format(
            self._helm_command, show_command, kdu_model, repo_str, version
        )
        return inspect_command

    async def _status_kdu(
        self,
        cluster_id: str,
        kdu_instance: str,
        namespace: str = None,
        show_error_log: bool = False,
        return_text: bool = False,
    ):

        self.log.debug(
            "status of kdu_instance: {}, namespace: {} ".format(kdu_instance, namespace)
        )

        # init config, env
        paths, env = self._init_paths_env(
            cluster_name=cluster_id, create_if_not_exist=True
        )
        command = "{} status {} --output yaml".format(self._helm_command, kdu_instance)
        output, rc = await self._local_async_exec(
            command=command,
            raise_exception_on_error=True,
            show_error_log=show_error_log,
            env=env,
        )

        if return_text:
            return str(output)

        if rc != 0:
            return None

        data = yaml.load(output, Loader=yaml.SafeLoader)
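        # Expected structure (a sketch based on helm2 "status --output yaml"; fields other
        # than the ones handled below are not guaranteed here):
        #   data["info"]["status"]["notes"]      -> chart notes, removed below
        #   data["info"]["status"]["resources"]  -> plain-text resource listing, converted to a table below
        #   data["info"]["Description"]          -> renamed to "description" for parity with helm3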

        # remove field 'notes'
        try:
            del data.get("info").get("status")["notes"]
        except KeyError:
            pass

        # parse field 'resources'
        try:
            resources = str(data.get("info").get("status").get("resources"))
            resource_table = self._output_to_table(resources)
            data.get("info").get("status")["resources"] = resource_table
        except Exception:
            pass

        # set description to lowercase (unify with helm3)
        try:
            data.get("info")["description"] = data.get("info").pop("Description")
        except KeyError:
            pass

        return data

    def _get_helm_chart_repos_ids(self, cluster_uuid) -> list:
        repo_ids = []
        cluster_filter = {"_admin.helm-chart.id": cluster_uuid}
        cluster = self.db.get_one("k8sclusters", cluster_filter)
        if cluster:
            repo_ids = cluster.get("_admin").get("helm_chart_repos") or []
            return repo_ids
        else:
            raise K8sException(
                "k8s cluster with helm-id {} not found".format(cluster_uuid)
            )

    async def _is_install_completed(self, cluster_id: str, kdu_instance: str) -> bool:

        status = await self._status_kdu(
            cluster_id=cluster_id, kdu_instance=kdu_instance, return_text=False
        )

        # extract info.status.resources -> str
        # format:
        #   ==> v1/Deployment
        #   NAME                    READY   UP-TO-DATE   AVAILABLE   AGE
        #   halting-horse-mongodb   0/1     1            0           0s
        #   halting-petit-mongodb   1/1     1            0           0s
        #   blank line
        resources = K8sHelmBaseConnector._get_deep(
            status, ("info", "status", "resources")
        )

        # convert to table
        resources = K8sHelmBaseConnector._output_to_table(resources)

        num_lines = len(resources)
        index = 0
        ready = True
        while index < num_lines:
            try:
                line1 = resources[index]
                index += 1
                # find '==>' in column 0
                if line1[0] == "==>":
                    line2 = resources[index]
                    index += 1
                    # find READY in column 1
                    if line2[1] == "READY":
                        # read next lines
                        line3 = resources[index]
                        index += 1
                        while len(line3) > 1 and index < num_lines:
                            ready_value = line3[1]
                            parts = ready_value.split(sep="/")
                            current = int(parts[0])
                            total = int(parts[1])
                            if current < total:
                                self.log.debug("NOT READY:\n {}".format(line3))
                                ready = False
                            line3 = resources[index]
                            index += 1

            except Exception:
                pass

        return ready

    def _get_install_command(
        self, kdu_model, kdu_instance, namespace, params_str, version, atomic, timeout
    ) -> str:

        timeout_str = ""
        if timeout:
            timeout_str = "--timeout {}".format(timeout)

        # atomic
        atomic_str = ""
        if atomic:
            atomic_str = "--atomic"
        # namespace
        namespace_str = ""
        if namespace:
            namespace_str = "--namespace {}".format(namespace)

        # version
        version_str = ""
        if version:
            version_str = "--version {}".format(version)

        command = (
            "{helm} install {atomic} --output yaml "
            "{params} {timeout} --name={name} {ns} {model} {ver}".format(
                helm=self._helm_command,
                atomic=atomic_str,
                params=params_str,
                timeout=timeout_str,
                name=kdu_instance,
                ns=namespace_str,
                model=kdu_model,
                ver=version_str,
            )
        )
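        # Illustrative result (hypothetical values, wrapped here for readability;
        # params_str is shown as a single --set flag, its real content is built by the caller):
        #   /usr/bin/helm install --atomic --output yaml --set replicaCount=2 --timeout 300
        #     --name=stable-openldap-0001 --namespace osm stable/openldap --version 1.2.3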
        return command

    def _get_upgrade_command(
        self, kdu_model, kdu_instance, namespace, params_str, version, atomic, timeout
    ) -> str:

        timeout_str = ""
        if timeout:
            timeout_str = "--timeout {}".format(timeout)

        # atomic
        atomic_str = ""
        if atomic:
            atomic_str = "--atomic"

        # version
        version_str = ""
        if version:
            version_str = "--version {}".format(version)

        command = (
            "{helm} upgrade {atomic} --output yaml {params} {timeout} {name} {model} {ver}"
        ).format(
            helm=self._helm_command,
            atomic=atomic_str,
            params=params_str,
            timeout=timeout_str,
            name=kdu_instance,
            model=kdu_model,
            ver=version_str,
        )
        return command

    def _get_rollback_command(self, kdu_instance, namespace, revision) -> str:
        return "{} rollback {} {} --wait".format(
            self._helm_command, kdu_instance, revision
        )

    def _get_uninstall_command(self, kdu_instance: str, namespace: str) -> str:
        return "{} delete --purge {}".format(self._helm_command, kdu_instance)