##
# Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
# This file is part of OSM
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# For those usages not covered by the Apache License, Version 2.0 please
# contact with: nfvlabs@tid.es
##
import asyncio
from typing import Union
import os
import yaml

from n2vc.k8s_helm_base_conn import K8sHelmBaseConnector
from n2vc.exceptions import K8sException


class K8sHelmConnector(K8sHelmBaseConnector):

    """
    ####################################################################################
    ################################### P U B L I C ####################################
    ####################################################################################
    """

39 def __init__(
40 self,
41 fs: object,
42 db: object,
43 kubectl_command: str = "/usr/bin/kubectl",
44 helm_command: str = "/usr/bin/helm",
45 log: object = None,
46 on_update_db=None,
47 ):
48 """
49 Initializes helm connector for helm v2
50
51 :param fs: file system for kubernetes and helm configuration
52 :param db: database object to write current operation status
53 :param kubectl_command: path to kubectl executable
54 :param helm_command: path to helm executable
55 :param log: logger
56 :param on_update_db: callback called when k8s connector updates database
57 """
58
59 # parent class
60 K8sHelmBaseConnector.__init__(
61 self,
62 db=db,
63 log=log,
64 fs=fs,
65 kubectl_command=kubectl_command,
66 helm_command=helm_command,
67 on_update_db=on_update_db,
68 )
69
70 self.log.info("Initializing K8S Helm2 connector")
71
72 # initialize helm client-only
73 self.log.debug("Initializing helm client-only...")
74 command = "{} init --client-only {} ".format(
75 self._helm_command,
76 "--stable-repo-url {}".format(self._stable_repo_url)
77 if self._stable_repo_url
78 else "--skip-repos",
79 )
80 try:
81 asyncio.ensure_future(
82 self._local_async_exec(command=command, raise_exception_on_error=False)
83 )
84 # loop = asyncio.get_event_loop()
85 # loop.run_until_complete(self._local_async_exec(command=command,
86 # raise_exception_on_error=False))
87 except Exception as e:
88 self.warning(
89 msg="helm init failed (it was already initialized): {}".format(e)
90 )
91
92 self.log.info("K8S Helm2 connector initialized")
93
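    # Example: constructing the connector (illustrative sketch only; the `fs`, `db`
    # and `logger` objects are hypothetical stand-ins for the OSM filesystem,
    # database and logging instances normally injected by the caller):
    #
    #   connector = K8sHelmConnector(
    #       fs=fs,
    #       db=db,
    #       kubectl_command="/usr/bin/kubectl",
    #       helm_command="/usr/bin/helm",
    #       log=logger,
    #   )
    #
    # On creation the connector schedules a background
    # "helm init --client-only ..." run to prepare the local helm2 client.
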
    async def install(
        self,
        cluster_uuid: str,
        kdu_model: str,
        kdu_instance: str,
        atomic: bool = True,
        timeout: float = 300,
        params: dict = None,
        db_dict: dict = None,
        kdu_name: str = None,
        namespace: str = None,
        **kwargs,
    ):
        """
        Deploys a new KDU instance. It would implicitly rely on the `install` call
        to deploy the Chart/Bundle properly parametrized (in practice, this call
        would happen before any _initial-config-primitive_ of the VNF is called).

        :param cluster_uuid: UUID of a K8s cluster known by OSM
        :param kdu_model: chart reference (string), which can be any of these options:
            - a name of a chart available via the repos known by OSM
            - a path to a packaged chart
            - a path to an unpacked chart directory or a URL
        :param kdu_instance: Kdu instance name
        :param atomic: If set, the installation process purges the chart/bundle on
            failure and waits until all the K8s objects are active
        :param timeout: Time in seconds to wait for the install of the chart/bundle
            (defaults to Helm default timeout: 300s)
        :param params: dictionary of key-value pairs for instantiation parameters
            (overriding default values)
        :param dict db_dict: where to write into database when the status changes.
            It contains a dict with {collection: <str>, filter: {}, path: <str>},
            e.g. {collection: "nsrs", filter: {_id: <nsd-id>},
            path: "_admin.deployed.K8S.3"}
        :param kdu_name: Name of the KDU instance to be installed
        :param namespace: K8s namespace to use for the KDU instance
        :param kwargs: Additional parameters (None yet)
        :return: True if successful
        """
        _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
        self.log.debug("installing {} in cluster {}".format(kdu_model, cluster_id))

        # sync local dir
        self.fs.sync(from_path=cluster_id)

        # init env, paths
        paths, env = self._init_paths_env(
            cluster_name=cluster_id, create_if_not_exist=True
        )

        await self._install_impl(
            cluster_id,
            kdu_model,
            paths,
            env,
            kdu_instance,
            atomic=atomic,
            timeout=timeout,
            params=params,
            db_dict=db_dict,
            kdu_name=kdu_name,
            namespace=namespace,
        )

        # sync fs
        self.fs.reverse_sync(from_path=cluster_id)

        self.log.debug("Returning kdu_instance {}".format(kdu_instance))
        return True

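    # Example call (illustrative only; the cluster UUID, chart name and instance
    # name below are hypothetical):
    #
    #   installed = await connector.install(
    #       cluster_uuid="kube-system:myk8s-0001",
    #       kdu_model="stable/openldap",
    #       kdu_instance="openldap-0001",
    #       atomic=True,
    #       timeout=300,
    #       params={"replicaCount": "2"},
    #       db_dict={
    #           "collection": "nsrs",
    #           "filter": {"_id": "<nsr-id>"},
    #           "path": "_admin.deployed.K8S.0",
    #       },
    #   )
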
    async def inspect_kdu(self, kdu_model: str, repo_url: str = None) -> str:
        self.log.debug(
            "inspect kdu_model {} from (optional) repo: {}".format(kdu_model, repo_url)
        )

        return await self._exec_inspect_comand(
            inspect_command="", kdu_model=kdu_model, repo_url=repo_url
        )

    """
    ####################################################################################
    ################################### P R I V A T E ##################################
    ####################################################################################
    """

    def _init_paths_env(self, cluster_name: str, create_if_not_exist: bool = True):
        """
        Creates the base cluster and kube dirs and returns them.
        It also creates the helm2 dirs according to the new directory specification;
        the paths are returned together with the environment variables that must be
        provided to execute commands.

        Helm 2 directory specification uses the helm_home dir:

        The environment variables assigned for these paths are:
        - helm home: $HELM_HOME
        - helm kubeconfig: $KUBECONFIG

        :param cluster_name: cluster_name
        :return: Dictionary with config_paths and dictionary with helm environment variables
        """
        base = self.fs.path
        if base.endswith("/") or base.endswith("\\"):
            base = base[:-1]

        # base dir for cluster
        cluster_dir = base + "/" + cluster_name

        # kube dir
        kube_dir = cluster_dir + "/" + ".kube"
        if create_if_not_exist and not os.path.exists(kube_dir):
            self.log.debug("Creating dir {}".format(kube_dir))
            os.makedirs(kube_dir)

        # helm home dir
        helm_dir = cluster_dir + "/" + ".helm"
        if create_if_not_exist and not os.path.exists(helm_dir):
            self.log.debug("Creating dir {}".format(helm_dir))
            os.makedirs(helm_dir)

        config_filename = kube_dir + "/config"

        # 2 - Prepare dictionary with paths
        paths = {
            "kube_dir": kube_dir,
            "kube_config": config_filename,
            "cluster_dir": cluster_dir,
            "helm_dir": helm_dir,
        }

        for file_name, file in paths.items():
            if "dir" in file_name and not os.path.exists(file):
                err_msg = "{} dir does not exist".format(file)
                self.log.error(err_msg)
                raise K8sException(err_msg)

        # 3 - Prepare environment variables
        env = {"HELM_HOME": helm_dir, "KUBECONFIG": config_filename}

        return paths, env

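    # For a hypothetical cluster id "myk8s-0001" and fs.path "/app/storage/", the
    # returned structures would look roughly like this (illustrative only):
    #
    #   paths = {
    #       "kube_dir": "/app/storage/myk8s-0001/.kube",
    #       "kube_config": "/app/storage/myk8s-0001/.kube/config",
    #       "cluster_dir": "/app/storage/myk8s-0001",
    #       "helm_dir": "/app/storage/myk8s-0001/.helm",
    #   }
    #   env = {
    #       "HELM_HOME": "/app/storage/myk8s-0001/.helm",
    #       "KUBECONFIG": "/app/storage/myk8s-0001/.kube/config",
    #   }
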
    async def _get_services(self, cluster_id, kdu_instance, namespace, kubeconfig):
        # init config, env
        paths, env = self._init_paths_env(
            cluster_name=cluster_id, create_if_not_exist=True
        )

        command1 = "env KUBECONFIG={} {} get manifest {} ".format(
            kubeconfig, self._helm_command, kdu_instance
        )
        command2 = "{} get --namespace={} -f -".format(self.kubectl_command, namespace)
        output, _rc = await self._local_async_exec_pipe(
            command1, command2, env=env, raise_exception_on_error=True
        )
        services = self._parse_services(output)

        return services

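    # The two commands above are executed as a shell-style pipeline, roughly
    # equivalent to (paths and names shown are illustrative):
    #
    #   env KUBECONFIG=/app/storage/myk8s-0001/.kube/config \
    #       /usr/bin/helm get manifest openldap-0001 \
    #       | /usr/bin/kubectl get --namespace=myns -f -
    #
    # i.e. the rendered manifest of the release is fed to kubectl, and the
    # resulting resource listing is then reduced to services by _parse_services().
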
    async def _cluster_init(
        self, cluster_id: str, namespace: str, paths: dict, env: dict
    ):
        """
        Implements the helm version dependent cluster initialization:
        For helm2 it initializes the Tiller environment if needed
        """

        # check if tiller pod is up in cluster
        command = "{} --kubeconfig={} --namespace={} get deployments".format(
            self.kubectl_command, paths["kube_config"], namespace
        )
        output, _rc = await self._local_async_exec(
            command=command, raise_exception_on_error=True, env=env
        )

        output_table = self._output_to_table(output=output)

        # find 'tiller' pod in all pods
        already_initialized = False
        try:
            for row in output_table:
                if row[0].startswith("tiller-deploy"):
                    already_initialized = True
                    break
        except Exception:
            pass

        # helm init
        n2vc_installed_sw = False
        if not already_initialized:
            self.log.info(
                "Initializing helm in client and server: {}".format(cluster_id)
            )
            command = "{} --kubeconfig={} --namespace kube-system create serviceaccount {}".format(
                self.kubectl_command, paths["kube_config"], self.service_account
            )
            _, _rc = await self._local_async_exec(
                command=command, raise_exception_on_error=False, env=env
            )

            command = (
                "{} --kubeconfig={} create clusterrolebinding osm-tiller-cluster-rule "
                "--clusterrole=cluster-admin --serviceaccount=kube-system:{}"
            ).format(self.kubectl_command, paths["kube_config"], self.service_account)
            _, _rc = await self._local_async_exec(
                command=command, raise_exception_on_error=False, env=env
            )

            command = (
                "{} init --kubeconfig={} --tiller-namespace={} --home={} --service-account {} "
                " {}"
            ).format(
                self._helm_command,
                paths["kube_config"],
                namespace,
                paths["helm_dir"],
                self.service_account,
                "--stable-repo-url {}".format(self._stable_repo_url)
                if self._stable_repo_url
                else "--skip-repos",
            )
            _, _rc = await self._local_async_exec(
                command=command, raise_exception_on_error=True, env=env
            )
            n2vc_installed_sw = True
        else:
            # check client helm installation
            check_file = paths["helm_dir"] + "/repository/repositories.yaml"
            if not self._check_file_exists(
                filename=check_file, exception_if_not_exists=False
            ):
                self.log.info("Initializing helm in client: {}".format(cluster_id))
                command = (
                    "{} init --kubeconfig={} --tiller-namespace={} "
                    "--home={} --client-only {} "
                ).format(
                    self._helm_command,
                    paths["kube_config"],
                    namespace,
                    paths["helm_dir"],
                    "--stable-repo-url {}".format(self._stable_repo_url)
                    if self._stable_repo_url
                    else "--skip-repos",
                )
                output, _rc = await self._local_async_exec(
                    command=command, raise_exception_on_error=True, env=env
                )
            else:
                self.log.info("Helm client already initialized")

        # remove old stable repo and add new one
        cluster_uuid = "{}:{}".format(namespace, cluster_id)
        repo_list = await self.repo_list(cluster_uuid)
        for repo in repo_list:
            if repo["name"] == "stable" and repo["url"] != self._stable_repo_url:
                self.log.debug(
                    "Add new stable repo url: {}".format(self._stable_repo_url)
                )
                await self.repo_remove(cluster_uuid, "stable")
                if self._stable_repo_url:
                    await self.repo_add(cluster_uuid, "stable", self._stable_repo_url)
                break

        return n2vc_installed_sw

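    # When Tiller is not yet deployed, the rendered initialization command looks
    # roughly like this (service account name, repo URL and paths are illustrative):
    #
    #   /usr/bin/helm init --kubeconfig=/app/storage/myk8s-0001/.kube/config \
    #       --tiller-namespace=kube-system --home=/app/storage/myk8s-0001/.helm \
    #       --service-account osm --stable-repo-url https://charts.helm.sh/stable
    #
    # If no stable repo URL is configured, "--skip-repos" is passed instead.
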
    async def _uninstall_sw(self, cluster_id: str, namespace: str):
        # uninstall Tiller if necessary

        self.log.debug("Uninstalling tiller from cluster {}".format(cluster_id))

        # init paths, env
        paths, env = self._init_paths_env(
            cluster_name=cluster_id, create_if_not_exist=True
        )

        if not namespace:
            # find namespace for tiller pod
            command = "{} --kubeconfig={} get deployments --all-namespaces".format(
                self.kubectl_command, paths["kube_config"]
            )
            output, _rc = await self._local_async_exec(
                command=command, raise_exception_on_error=False, env=env
            )
            output_table = self._output_to_table(output=output)
            namespace = None
            for r in output_table:
                try:
                    if "tiller-deploy" in r[1]:
                        namespace = r[0]
                        break
                except Exception:
                    pass
            else:
                msg = "Tiller deployment not found in cluster {}".format(cluster_id)
                self.log.error(msg)

            self.log.debug("namespace for tiller: {}".format(namespace))

        if namespace:
            # uninstall tiller from cluster
            self.log.debug("Uninstalling tiller from cluster {}".format(cluster_id))
            command = "{} --kubeconfig={} --home={} reset".format(
                self._helm_command, paths["kube_config"], paths["helm_dir"]
            )
            self.log.debug("resetting: {}".format(command))
            output, _rc = await self._local_async_exec(
                command=command, raise_exception_on_error=True, env=env
            )
            # Delete clusterrolebinding and serviceaccount.
            # Ignore if errors for backward compatibility
            command = (
                "{} --kubeconfig={} delete clusterrolebinding.rbac.authorization.k8s."
                "io/osm-tiller-cluster-rule"
            ).format(self.kubectl_command, paths["kube_config"])
            output, _rc = await self._local_async_exec(
                command=command, raise_exception_on_error=False, env=env
            )
            command = "{} --kubeconfig={} --namespace kube-system delete serviceaccount/{}".format(
                self.kubectl_command, paths["kube_config"], self.service_account
            )
            output, _rc = await self._local_async_exec(
                command=command, raise_exception_on_error=False, env=env
            )

        else:
            self.log.debug("namespace not found")

    async def _instances_list(self, cluster_id):
        # init paths, env
        paths, env = self._init_paths_env(
            cluster_name=cluster_id, create_if_not_exist=True
        )

        command = "{} list --output yaml".format(self._helm_command)

        output, _rc = await self._local_async_exec(
            command=command, raise_exception_on_error=True, env=env
        )

        if output and len(output) > 0:
            # parse yaml and update keys to lower case to unify with helm3
            instances = yaml.load(output, Loader=yaml.SafeLoader).get("Releases")
            new_instances = []
            for instance in instances:
                new_instance = dict((k.lower(), v) for k, v in instance.items())
                new_instances.append(new_instance)
            return new_instances
        else:
            return []

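    # Sketch of the key normalization above (field names are illustrative; the
    # exact set depends on the helm2 version):
    #
    #   {"Name": "openldap-0001", "Status": "DEPLOYED", "Namespace": "myns"}
    #   -> {"name": "openldap-0001", "status": "DEPLOYED", "namespace": "myns"}
    #
    # so that callers can use the same lower-case keys for helm2 and helm3.
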
    def _get_inspect_command(
        self, show_command: str, kdu_model: str, repo_str: str, version: str
    ):
        inspect_command = "{} inspect {} {}{} {}".format(
            self._helm_command, show_command, kdu_model, repo_str, version
        )
        return inspect_command

    async def _status_kdu(
        self,
        cluster_id: str,
        kdu_instance: str,
        namespace: str = None,
        yaml_format: bool = False,
        show_error_log: bool = False,
    ) -> Union[str, dict]:
        self.log.debug(
            "status of kdu_instance: {}, namespace: {} ".format(kdu_instance, namespace)
        )

        # init config, env
        paths, env = self._init_paths_env(
            cluster_name=cluster_id, create_if_not_exist=True
        )
        command = ("env KUBECONFIG={} {} status {} --output yaml").format(
            paths["kube_config"], self._helm_command, kdu_instance
        )
        output, rc = await self._local_async_exec(
            command=command,
            raise_exception_on_error=True,
            show_error_log=show_error_log,
            env=env,
        )

        if yaml_format:
            return str(output)

        if rc != 0:
            return None

        data = yaml.load(output, Loader=yaml.SafeLoader)

        # remove field 'notes'
        try:
            del data.get("info").get("status")["notes"]
        except KeyError:
            pass

        # parse the manifest to a list of dictionaries
        if "manifest" in data:
            manifest_str = data.get("manifest")
            manifest_docs = yaml.load_all(manifest_str, Loader=yaml.SafeLoader)

            data["manifest"] = []
            for doc in manifest_docs:
                data["manifest"].append(doc)

        # parse field 'resources'
        try:
            resources = str(data.get("info").get("status").get("resources"))
            resource_table = self._output_to_table(resources)
            data.get("info").get("status")["resources"] = resource_table
        except Exception:
            pass

        # set description to lowercase (unify with helm3)
        try:
            data.get("info")["description"] = data.get("info").pop("Description")
        except KeyError:
            pass

        return data

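    # Shape of the returned dict (illustrative; the exact fields depend on the
    # release and the helm2 version):
    #
    #   {
    #       "info": {
    #           "status": {"resources": [[...], ...]},
    #           "description": "Install complete",
    #       },
    #       "manifest": [{"apiVersion": "v1", "kind": "Service", ...}, ...],
    #   }
    #
    # "notes" is stripped, "resources" is converted to a table (list of rows) and
    # "Description" is renamed to lower-case "description" to match helm3.
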
    def _get_helm_chart_repos_ids(self, cluster_uuid) -> list:
        repo_ids = []
        cluster_filter = {"_admin.helm-chart.id": cluster_uuid}
        cluster = self.db.get_one("k8sclusters", cluster_filter)
        if cluster:
            repo_ids = cluster.get("_admin").get("helm_chart_repos") or []
            return repo_ids
        else:
            raise K8sException(
                "k8scluster with helm-id: {} not found".format(cluster_uuid)
            )

    async def _is_install_completed(self, cluster_id: str, kdu_instance: str) -> bool:
        # init config, env
        paths, env = self._init_paths_env(
            cluster_name=cluster_id, create_if_not_exist=True
        )

        status = await self._status_kdu(
            cluster_id=cluster_id, kdu_instance=kdu_instance, yaml_format=False
        )

        # extract info.status.resources-> str
        # format:
        #     ==> v1/Deployment
        #     NAME                    READY   UP-TO-DATE   AVAILABLE   AGE
        #     halting-horse-mongodb   0/1     1            0           0s
        #     halting-petit-mongodb   1/1     1            0           0s
        # blank line
        resources = K8sHelmBaseConnector._get_deep(
            status, ("info", "status", "resources")
        )

        # convert to table
        resources = K8sHelmBaseConnector._output_to_table(resources)

        num_lines = len(resources)
        index = 0
        ready = True
        while index < num_lines:
            try:
                line1 = resources[index]
                index += 1
                # find '==>' in column 0
                if line1[0] == "==>":
                    line2 = resources[index]
                    index += 1
                    # find READY in column 1
                    if line2[1] == "READY":
                        # read next lines
                        line3 = resources[index]
                        index += 1
                        while len(line3) > 1 and index < num_lines:
                            ready_value = line3[1]
                            parts = ready_value.split(sep="/")
                            current = int(parts[0])
                            total = int(parts[1])
                            if current < total:
                                self.log.debug("NOT READY:\n {}".format(line3))
                                ready = False
                            line3 = resources[index]
                            index += 1

            except Exception:
                pass

        return ready

    def _get_install_command(
        self,
        kdu_model,
        kdu_instance,
        namespace,
        params_str,
        version,
        atomic,
        timeout,
        kubeconfig,
    ) -> str:
        timeout_str = ""
        if timeout:
            timeout_str = "--timeout {}".format(timeout)

        # atomic
        atomic_str = ""
        if atomic:
            atomic_str = "--atomic"
        # namespace
        namespace_str = ""
        if namespace:
            namespace_str = "--namespace {}".format(namespace)

        # version
        version_str = ""
        if version:
            version_str = "--version {}".format(version)

        command = (
            "env KUBECONFIG={kubeconfig} {helm} install {atomic} --output yaml "
            "{params} {timeout} --name={name} {ns} {model} {ver}".format(
                kubeconfig=kubeconfig,
                helm=self._helm_command,
                atomic=atomic_str,
                params=params_str,
                timeout=timeout_str,
                name=kdu_instance,
                ns=namespace_str,
                model=kdu_model,
                ver=version_str,
            )
        )
        return command

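    # With atomic=True, a 600 s timeout and a pinned chart version, the rendered
    # command would look roughly like this (paths and names are illustrative):
    #
    #   env KUBECONFIG=/app/storage/myk8s-0001/.kube/config /usr/bin/helm install \
    #       --atomic --output yaml --timeout 600 --name=openldap-0001 \
    #       --namespace myns stable/openldap --version 1.2.2
    #
    # Note the helm2-specific "--name=" flag; helm3 takes the release name as a
    # positional argument instead.
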
    def _get_upgrade_command(
        self,
        kdu_model,
        kdu_instance,
        namespace,
        params_str,
        version,
        atomic,
        timeout,
        kubeconfig,
    ) -> str:
        timeout_str = ""
        if timeout:
            timeout_str = "--timeout {}".format(timeout)

        # atomic
        atomic_str = ""
        if atomic:
            atomic_str = "--atomic"

        # version
        version_str = ""
        if version:
            version_str = "--version {}".format(version)

        command = (
            "env KUBECONFIG={kubeconfig} {helm} upgrade {atomic} --output yaml {params} {timeout} {name} {model} {ver}"
        ).format(
            kubeconfig=kubeconfig,
            helm=self._helm_command,
            atomic=atomic_str,
            params=params_str,
            timeout=timeout_str,
            name=kdu_instance,
            model=kdu_model,
            ver=version_str,
        )
        return command

    def _get_rollback_command(
        self, kdu_instance, namespace, revision, kubeconfig
    ) -> str:
        return "env KUBECONFIG={} {} rollback {} {} --wait".format(
            kubeconfig, self._helm_command, kdu_instance, revision
        )

    def _get_uninstall_command(
        self, kdu_instance: str, namespace: str, kubeconfig: str
    ) -> str:
        return "env KUBECONFIG={} {} delete --purge {}".format(
            kubeconfig, self._helm_command, kdu_instance
        )
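
    # Rendered examples of the two helpers above (illustrative values):
    #
    #   env KUBECONFIG=/app/storage/myk8s-0001/.kube/config /usr/bin/helm \
    #       rollback openldap-0001 2 --wait
    #
    #   env KUBECONFIG=/app/storage/myk8s-0001/.kube/config /usr/bin/helm \
    #       delete --purge openldap-0001
    #
    # "delete --purge" is the helm2 equivalent of helm3's "uninstall"; the
    # namespace argument is accepted for interface compatibility but is not used
    # by either command here.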