Code Coverage

Cobertura Coverage Report > n2vc > k8s_helm_conn.py

Trend

Classes       100%
Lines          50%
Conditionals  100%

File Coverage summary

Name              Classes      Lines           Conditionals
k8s_helm_conn.py  100% (1/1)   50% (115/231)   100% (0/0)

Coverage Breakdown by Class

Name              Lines           Conditionals
k8s_helm_conn.py  50% (115/231)   N/A
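
Line coverage is executed lines over executable lines: 115 / 231 ≈ 49.8%, shown rounded as 50%. The file defines a single class (1/1 = 100%), and Cobertura records no branch points for it, so the empty conditional ratio 0/0 is reported as 100%.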

Source

n2vc/k8s_helm_conn.py
1 ##
2 # Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3 # This file is part of OSM
4 # All Rights Reserved.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
9 #
10 #    http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
15 # implied.
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
18 #
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact with: nfvlabs@tid.es
21 ##
22 1 import asyncio
23 1 import os
24 1 import yaml
25
26 1 from n2vc.k8s_helm_base_conn import K8sHelmBaseConnector
27 1 from n2vc.exceptions import K8sException
28
29
30 1 class K8sHelmConnector(K8sHelmBaseConnector):
31
32     """
33     ####################################################################################
34     ################################### P U B L I C ####################################
35     ####################################################################################
36     """
37
38 1     def __init__(
39         self,
40         fs: object,
41         db: object,
42         kubectl_command: str = "/usr/bin/kubectl",
43         helm_command: str = "/usr/bin/helm",
44         log: object = None,
45         on_update_db=None,
46         vca_config: dict = None,
47     ):
48         """
49         Initializes helm connector for helm v2
50
51         :param fs: file system for kubernetes and helm configuration
52         :param db: database object to write current operation status
53         :param kubectl_command: path to kubectl executable
54         :param helm_command: path to helm executable
55         :param log: logger
56         :param on_update_db: callback called when k8s connector updates database
57         """
58
59         # parent class
60 1         K8sHelmBaseConnector.__init__(
61             self,
62             db=db,
63             log=log,
64             fs=fs,
65             kubectl_command=kubectl_command,
66             helm_command=helm_command,
67             on_update_db=on_update_db,
68             vca_config=vca_config,
69         )
70
71 1         self.log.info("Initializing K8S Helm2 connector")
72
73         # initialize helm client-only
74 1         self.log.debug("Initializing helm client-only...")
75 1         command = "{} init --client-only --stable-repo-url {} ".format(
76             self._helm_command, self._stable_repo_url)
77 1         try:
78 1             asyncio.ensure_future(
79                 self._local_async_exec(command=command, raise_exception_on_error=False)
80             )
81             # loop = asyncio.get_event_loop()
82             # loop.run_until_complete(self._local_async_exec(command=command,
83             # raise_exception_on_error=False))
84 0         except Exception as e:
85 0             self.log.warning(
86                 msg="helm init failed (it was already initialized): {}".format(e)
87             )
88
89 1         self.log.info("K8S Helm2 connector initialized")
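        # A minimal usage sketch, assuming hypothetical `fs` and `db` objects that
        # implement the OSM common filesystem and database interfaces:
        #
        #     import logging
        #     connector = K8sHelmConnector(fs=fs, db=db, log=logging.getLogger("helm2"))
        #
        # Construction only schedules the client-only `helm init` above; no cluster
        # access happens until methods such as install() are invoked.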
90
91 1     async def install(
92             self,
93             cluster_uuid: str,
94             kdu_model: str,
95             kdu_instance: str,
96             atomic: bool = True,
97             timeout: float = 300,
98             params: dict = None,
99             db_dict: dict = None,
100             kdu_name: str = None,
101             namespace: str = None,
102     ):
103 1         _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
104 1         self.log.debug("installing {} in cluster {}".format(kdu_model, cluster_id))
105
106         # sync local dir
107 1         self.fs.sync(from_path=cluster_id)
108
109         # init env, paths
110 1         paths, env = self._init_paths_env(
111             cluster_name=cluster_id, create_if_not_exist=True
112         )
113
114 1         await self._install_impl(
115             cluster_id,
116             kdu_model,
117             paths,
118             env,
119             kdu_instance,
120             atomic=atomic,
121             timeout=timeout,
122             params=params,
123             db_dict=db_dict,
124             kdu_name=kdu_name,
125             namespace=namespace,
126         )
127
128         # sync fs
129 1         self.fs.reverse_sync(from_path=cluster_id)
130
131 1         self.log.debug("Install completed for kdu_instance {}".format(kdu_instance))
132 1         return True
133
134 1     async def inspect_kdu(self, kdu_model: str, repo_url: str = None) -> str:
135
136 1         self.log.debug(
137             "inspect kdu_model {} from (optional) repo: {}".format(kdu_model, repo_url)
138         )
139
140 1         return await self._exec_inspect_comand(
141             inspect_command="", kdu_model=kdu_model, repo_url=repo_url
142         )
143
144     """
145     ####################################################################################
146     ################################### P R I V A T E ##################################
147     ####################################################################################
148     """
149
150 1     def _init_paths_env(self, cluster_name: str, create_if_not_exist: bool = True):
151         """
152         Creates the base cluster and kube dirs if they do not exist and returns them.
153         It also creates the helm2 home dir according to the directory specification;
154         the paths are returned together with the environment variables that must be provided to execute commands
155
156         The Helm 2 directory specification uses a helm_home dir:
157
158         The variables assigned for these paths are:
159         - Helm home: $HELM_HOME
160         - helm kubeconfig: $KUBECONFIG
161
162         :param cluster_name: cluster name
163         :return: Dictionary with config paths and dictionary with helm environment variables
164         """
165 1         base = self.fs.path
166 1         if base.endswith("/") or base.endswith("\\"):
167 1             base = base[:-1]
168
169         # base dir for cluster
170 1         cluster_dir = base + "/" + cluster_name
171
172         # kube dir
173 1         kube_dir = cluster_dir + "/" + ".kube"
174 1         if create_if_not_exist and not os.path.exists(kube_dir):
175 1             self.log.debug("Creating dir {}".format(kube_dir))
176 1             os.makedirs(kube_dir)
177
178         # helm home dir
179 1         helm_dir = cluster_dir + "/" + ".helm"
180 1         if create_if_not_exist and not os.path.exists(helm_dir):
181 1             self.log.debug("Creating dir {}".format(helm_dir))
182 1             os.makedirs(helm_dir)
183
184 1         config_filename = kube_dir + "/config"
185
186         # 2 - Prepare dictionary with paths
187 1         paths = {
188             "kube_dir": kube_dir,
189             "kube_config": config_filename,
190             "cluster_dir": cluster_dir,
191             "helm_dir": helm_dir,
192         }
193
194 1         for file_name, file in paths.items():
195 1             if "dir" in file_name and not os.path.exists(file):
196 0                 err_msg = "{} dir does not exist".format(file)
197 0                 self.log.error(err_msg)
198 0                 raise K8sException(err_msg)
199
200         # 3 - Prepare environment variables
201 1         env = {"HELM_HOME": helm_dir, "KUBECONFIG": config_filename}
202
203 1         return paths, env
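        # Illustrative result, assuming self.fs.path == "/app/storage" (hypothetical)
        # and cluster_name == "c1":
        #
        #     paths == {
        #         "kube_dir": "/app/storage/c1/.kube",
        #         "kube_config": "/app/storage/c1/.kube/config",
        #         "cluster_dir": "/app/storage/c1",
        #         "helm_dir": "/app/storage/c1/.helm",
        #     }
        #     env == {"HELM_HOME": "/app/storage/c1/.helm",
        #             "KUBECONFIG": "/app/storage/c1/.kube/config"}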
204
205 1     async def _get_services(self, cluster_id, kdu_instance, namespace):
206
207         # init config, env
208 1         paths, env = self._init_paths_env(
209             cluster_name=cluster_id, create_if_not_exist=True
210         )
211
212 1         command1 = "{} get manifest {} ".format(self._helm_command, kdu_instance)
213 1         command2 = "{} get --namespace={} -f -".format(self.kubectl_command, namespace)
214 1         output, _rc = await self._local_async_exec_pipe(
215             command1, command2, env=env, raise_exception_on_error=True
216         )
217 1         services = self._parse_services(output)
218
219 1         return services
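        # The two commands are piped together, so the effective shell invocation is:
        #
        #     /usr/bin/helm get manifest <kdu_instance> | /usr/bin/kubectl get --namespace=<namespace> -f -
        #
        # kubectl thus lists the live objects declared in the release manifest, and
        # _parse_services() extracts the Service entries from that listing.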
220
221 1     async def _cluster_init(self, cluster_id: str, namespace: str,
222                             paths: dict, env: dict):
223         """
224         Implements the helm version dependent cluster initialization:
225         For helm2 it initialized tiller environment if needed
226         """
227
228         # check if tiller pod is up in cluster
229 0         command = "{} --kubeconfig={} --namespace={} get deployments".format(
230             self.kubectl_command, paths["kube_config"], namespace
231         )
232 0         output, _rc = await self._local_async_exec(
233             command=command, raise_exception_on_error=True, env=env
234         )
235
236 0         output_table = self._output_to_table(output=output)
237
238         # find 'tiller' pod in all pods
239 0         already_initialized = False
240 0         try:
241 0             for row in output_table:
242 0                 if row[0].startswith("tiller-deploy"):
243 0                     already_initialized = True
244 0                     break
245 0         except Exception:
246 0             pass
247
248         # helm init
249 0         n2vc_installed_sw = False
250 0         if not already_initialized:
251 0             self.log.info(
252                 "Initializing helm in client and server: {}".format(cluster_id)
253             )
254 0             command = "{} --kubeconfig={} --namespace kube-system create serviceaccount {}".format(
255                 self.kubectl_command, paths["kube_config"], self.service_account
256             )
257 0             _, _rc = await self._local_async_exec(
258                 command=command, raise_exception_on_error=False, env=env
259             )
260
261 0             command = (
262                 "{} --kubeconfig={} create clusterrolebinding osm-tiller-cluster-rule "
263                 "--clusterrole=cluster-admin --serviceaccount=kube-system:{}"
264             ).format(self.kubectl_command, paths["kube_config"], self.service_account)
265 0             _, _rc = await self._local_async_exec(
266                 command=command, raise_exception_on_error=False, env=env
267             )
268
269 0             command = (
270                 "{} --kubeconfig={} --tiller-namespace={} --home={} --service-account {} "
271                 " --stable-repo-url {} init"
272             ).format(
273                 self._helm_command,
274                 paths["kube_config"],
275                 namespace,
276                 paths["helm_dir"],
277                 self.service_account,
278                 self._stable_repo_url
279             )
280 0             _, _rc = await self._local_async_exec(
281                 command=command, raise_exception_on_error=True, env=env
282             )
283 0             n2vc_installed_sw = True
284         else:
285             # check client helm installation
286 0             check_file = paths["helm_dir"] + "/repository/repositories.yaml"
287 0             if not self._check_file_exists(
288                 filename=check_file, exception_if_not_exists=False
289             ):
290 0                 self.log.info("Initializing helm in client: {}".format(cluster_id))
291 0                 command = (
292                     "{} --kubeconfig={} --tiller-namespace={} "
293                     "--home={} init --client-only --stable-repo-url {} "
294                 ).format(
295                     self._helm_command,
296                     paths["kube_config"],
297                     namespace,
298                     paths["helm_dir"],
299                     self._stable_repo_url,
300                 )
301 0                 output, _rc = await self._local_async_exec(
302                     command=command, raise_exception_on_error=True, env=env
303                 )
304             else:
305 0                 self.log.info("Helm client already initialized")
306
307         # remove old stable repo and add new one
308 0         cluster_uuid = "{}:{}".format(namespace, cluster_id)
309 0         repo_list = await self.repo_list(cluster_uuid)
310 0         for repo in repo_list:
311 0             if repo["name"] == "stable" and repo["url"] != self._stable_repo_url:
312 0                 self.log.debug("Add new stable repo url: {}".format(self._stable_repo_url))
313 0                 await self.repo_remove(cluster_uuid,
314                                        "stable")
315 0                 await self.repo_add(cluster_uuid,
316                                     "stable",
317                                     self._stable_repo_url)
318 0                 break
319
320 0         return n2vc_installed_sw
321
322 1     async def _uninstall_sw(self, cluster_id: str, namespace: str):
323         # uninstall Tiller if necessary
324
325 1         self.log.debug("Uninstalling tiller from cluster {}".format(cluster_id))
326
327         # init paths, env
328 1         paths, env = self._init_paths_env(
329             cluster_name=cluster_id, create_if_not_exist=True
330         )
331
332 1         if not namespace:
333             # find namespace for tiller pod
334 0             command = "{} --kubeconfig={} get deployments --all-namespaces".format(
335                 self.kubectl_command, paths["kube_config"]
336             )
337 0             output, _rc = await self._local_async_exec(
338                 command=command, raise_exception_on_error=False, env=env
339             )
340 0             output_table = self._output_to_table(output=output)
341 0             namespace = None
342 0             for r in output_table:
343 0                 try:
344 0                     if "tiller-deploy" in r[1]:
345 0                         namespace = r[0]
346 0                         break
347 0                 except Exception:
348 0                     pass
349             else:  # for/else: loop finished without break, no tiller deployment found
350 0                 msg = "Tiller deployment not found in cluster {}".format(cluster_id)
351 0                 self.log.error(msg)
352
353 0             self.log.debug("namespace for tiller: {}".format(namespace))
354
355 1         if namespace:
356             # uninstall tiller from cluster
357 1             self.log.debug("Uninstalling tiller from cluster {}".format(cluster_id))
358 1             command = "{} --kubeconfig={} --home={} reset".format(
359                 self._helm_command, paths["kube_config"], paths["helm_dir"]
360             )
361 1             self.log.debug("resetting: {}".format(command))
362 1             output, _rc = await self._local_async_exec(
363                 command=command, raise_exception_on_error=True, env=env
364             )
365             # Delete clusterrolebinding and serviceaccount.
366             # Ignore if errors for backward compatibility
367 1             command = (
368                 "{} --kubeconfig={} delete clusterrolebinding.rbac.authorization.k8s."
369                 "io/osm-tiller-cluster-rule"
370             ).format(self.kubectl_command, paths["kube_config"])
371 1             output, _rc = await self._local_async_exec(
372                 command=command, raise_exception_on_error=False, env=env
373             )
374 1             command = "{} --kubeconfig={} --namespace kube-system delete serviceaccount/{}".format(
375                 self.kubectl_command, paths["kube_config"], self.service_account
376             )
377 1             output, _rc = await self._local_async_exec(
378                 command=command, raise_exception_on_error=False, env=env
379             )
380
381         else:
382 0             self.log.debug("namespace not found")
383
384 1     async def _instances_list(self, cluster_id):
385
386         # init paths, env
387 1         paths, env = self._init_paths_env(
388             cluster_name=cluster_id, create_if_not_exist=True
389         )
390
391 1         command = "{} list --output yaml".format(self._helm_command)
392
393 1         output, _rc = await self._local_async_exec(
394             command=command, raise_exception_on_error=True, env=env
395         )
396
397 1         if output and len(output) > 0:
398             # parse yaml and update keys to lower case to unify with helm3
399 0             instances = yaml.load(output, Loader=yaml.SafeLoader).get("Releases")
400 0             new_instances = []
401 0             for instance in instances:
402 0                 new_instance = dict((k.lower(), v) for k, v in instance.items())
403 0                 new_instances.append(new_instance)
404 0             return new_instances
405         else:
406 1             return []
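        # helm2 `list --output yaml` emits capitalized keys, e.g. (illustrative):
        #
        #     Releases:
        #     - Name: mongo-1
        #       Namespace: osm
        #       Status: DEPLOYED
        #
        # which the loop above lowercases to {"name": "mongo-1", "namespace": "osm",
        # "status": "DEPLOYED"} so callers see the same shape as with helm3.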
407
408 1     def _get_inspect_command(self, show_command: str, kdu_model: str, repo_str: str,
409                              version: str):
410 1         inspect_command = "{} inspect {} {}{} {}".format(
411             self._helm_command, show_command, kdu_model, repo_str, version
412         )
413 1         return inspect_command
414
415 1     async def _status_kdu(
416         self,
417         cluster_id: str,
418         kdu_instance: str,
419         namespace: str = None,
420         show_error_log: bool = False,
421         return_text: bool = False,
422     ):
423
424 1         self.log.debug(
425             "status of kdu_instance: {}, namespace: {} ".format(kdu_instance, namespace)
426         )
427
428         # init config, env
429 1         paths, env = self._init_paths_env(
430             cluster_name=cluster_id, create_if_not_exist=True
431         )
432 1         command = "{} status {} --output yaml".format(self._helm_command, kdu_instance)
433 1         output, rc = await self._local_async_exec(
434             command=command,
435             raise_exception_on_error=True,
436             show_error_log=show_error_log,
437             env=env,
438         )
439
440 1         if return_text:
441 1             return str(output)
442
443 0         if rc != 0:
444 0             return None
445
446 0         data = yaml.load(output, Loader=yaml.SafeLoader)
447
448         # remove field 'notes'
449 0         try:
450 0             del data.get("info").get("status")["notes"]
451 0         except KeyError:
452 0             pass
453
454         # parse field 'resources'
455 0         try:
456 0             resources = str(data.get("info").get("status").get("resources"))
457 0             resource_table = self._output_to_table(resources)
458 0             data.get("info").get("status")["resources"] = resource_table
459 0         except Exception:
460 0             pass
461
462         # set description to lowercase (unify with helm3)
463 0         try:
464 0             data.get("info")["description"] = data.get("info").pop("Description")
465 0         except KeyError:
466 0             pass
467
468 0         return data
469
470 1     def _get_helm_chart_repos_ids(self, cluster_uuid) -> list:
471 0         repo_ids = []
472 0         cluster_filter = {"_admin.helm-chart.id": cluster_uuid}
473 0         cluster = self.db.get_one("k8sclusters", cluster_filter)
474 0         if cluster:
475 0             repo_ids = cluster.get("_admin").get("helm_chart_repos") or []
476 0             return repo_ids
477         else:
478 0             raise K8sException(
479                 "k8scluster with helm-id: {} not found".format(cluster_uuid)
480             )
481
482 1     async def _is_install_completed(self, cluster_id: str, kdu_instance: str) -> bool:
483
484 0         status = await self._status_kdu(
485             cluster_id=cluster_id, kdu_instance=kdu_instance, return_text=False
486         )
487
488         # extract info.status.resources-> str
489         # format:
490         #       ==> v1/Deployment
491         #       NAME                    READY   UP-TO-DATE   AVAILABLE   AGE
492         #       halting-horse-mongodb   0/1     1            0           0s
493         #       halting-petit-mongodb   1/1     1            0           0s
494         # blank line
495 0         resources = K8sHelmBaseConnector._get_deep(
496             status, ("info", "status", "resources")
497         )
498
499         # convert to table
500 0         resources = K8sHelmBaseConnector._output_to_table(resources)
501
502 0         num_lines = len(resources)
503 0         index = 0
504 0         ready = True
505 0         while index < num_lines:
506 0             try:
507 0                 line1 = resources[index]
508 0                 index += 1
509                 # find '==>' in column 0
510 0                 if line1[0] == "==>":
511 0                     line2 = resources[index]
512 0                     index += 1
513                     # find READY in column 1
514 0                     if line2[1] == "READY":
515                         # read next lines
516 0                         line3 = resources[index]
517 0                         index += 1
518 0                         while len(line3) > 1 and index < num_lines:
519 0                             ready_value = line3[1]
520 0                             parts = ready_value.split(sep="/")
521 0                             current = int(parts[0])
522 0                             total = int(parts[1])
523 0                             if current < total:
524 0                                 self.log.debug("NOT READY:\n    {}".format(line3))
525 0                                 ready = False
526 0                             line3 = resources[index]
527 0                             index += 1
528
529 0             except Exception:
530 0                 pass
531
532 0         return ready
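        # Worked example of the READY check: for the sample row
        # ["halting-horse-mongodb", "0/1", "1", "0", "0s"], ready_value is "0/1",
        # giving current=0 and total=1; current < total marks the install as not
        # ready, whereas a "1/1" row leaves `ready` True.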
533
534 1     def _get_install_command(
535         self, kdu_model, kdu_instance, namespace, params_str, version, atomic, timeout
536     ) -> str:
537
538 1         timeout_str = ""
539 1         if timeout:
540 1             timeout_str = "--timeout {}".format(timeout)
541
542         # atomic
543 1         atomic_str = ""
544 1         if atomic:
545 1             atomic_str = "--atomic"
546         # namespace
547 1         namespace_str = ""
548 1         if namespace:
549 1             namespace_str = "--namespace {}".format(namespace)
550
551         # version
552 1         version_str = ""
553 1         if version:
554 1             version_str = "--version {}".format(version)
555
556 1         command = (
557             "{helm} install {atomic} --output yaml  "
558             "{params} {timeout} --name={name} {ns} {model} {ver}".format(
559                 helm=self._helm_command,
560                 atomic=atomic_str,
561                 params=params_str,
562                 timeout=timeout_str,
563                 name=kdu_instance,
564                 ns=namespace_str,
565                 model=kdu_model,
566                 ver=version_str,
567             )
568         )
569 1         return command
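        # Rendered command for hypothetical arguments (kdu_model="stable/mongodb",
        # kdu_instance="mongo-1", namespace="osm", params_str="", version="7.8.4",
        # atomic=True, timeout=300), modulo whitespace:
        #
        #     /usr/bin/helm install --atomic --output yaml --timeout 300 --name=mongo-1 --namespace osm stable/mongodb --version 7.8.4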
570
571 1     def _get_upgrade_command(
572         self, kdu_model, kdu_instance, namespace, params_str, version, atomic, timeout
573     ) -> str:
574
575 1         timeout_str = ""
576 1         if timeout:
577 1             timeout_str = "--timeout {}".format(timeout)
578
579         # atomic
580 1         atomic_str = ""
581 1         if atomic:
582 1             atomic_str = "--atomic"
583
584         # version
585 1         version_str = ""
586 1         if version:
587 1             version_str = "--version {}".format(version)
588
589 1         command = "{helm} upgrade {atomic} --output yaml {params} {timeout} {name} {model} {ver}"\
590             .format(helm=self._helm_command,
591                     atomic=atomic_str,
592                     params=params_str,
593                     timeout=timeout_str,
594                     name=kdu_instance,
595                     model=kdu_model,
596                     ver=version_str
597                     )
598 1         return command
599
600 1     def _get_rollback_command(self, kdu_instance, namespace, revision) -> str:
601 1         return "{} rollback {} {} --wait".format(
602             self._helm_command, kdu_instance, revision
603         )
604
605 1     def _get_uninstall_command(self, kdu_instance: str, namespace: str) -> str:
606 1         return "{} delete --purge  {}".format(self._helm_command, kdu_instance)
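        # For example, kdu_instance "mongo-1" renders as:
        #
        #     /usr/bin/helm delete --purge  mongo-1
        #
        # The namespace parameter is unused because helm2 identifies releases by
        # name alone; tiller tracks the release's namespace server-side.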