[osm/N2VC.git] / n2vc / k8s_helm_conn.py
1 ##
2 # Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3 # This file is part of OSM
4 # All Rights Reserved.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
15 # implied.
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
18 #
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact with: nfvlabs@tid.es
21 ##
22 import asyncio
23 import os
24 import yaml
25
26 from n2vc.k8s_helm_base_conn import K8sHelmBaseConnector
27 from n2vc.exceptions import K8sException
28
29
30 class K8sHelmConnector(K8sHelmBaseConnector):
31
32 """
33 ####################################################################################
34 ################################### P U B L I C ####################################
35 ####################################################################################
36 """
37
38 def __init__(
39 self,
40 fs: object,
41 db: object,
42 kubectl_command: str = "/usr/bin/kubectl",
43 helm_command: str = "/usr/bin/helm",
44 log: object = None,
45 on_update_db=None,
46 ):
47 """
48 Initializes helm connector for helm v2
49
50 :param fs: file system for kubernetes and helm configuration
51 :param db: database object to write current operation status
52 :param kubectl_command: path to kubectl executable
53 :param helm_command: path to helm executable
54 :param log: logger
55 :param on_update_db: callback called when k8s connector updates database
56 """
57
58 # parent class
59 K8sHelmBaseConnector.__init__(
60 self,
61 db=db,
62 log=log,
63 fs=fs,
64 kubectl_command=kubectl_command,
65 helm_command=helm_command,
66 on_update_db=on_update_db,
67 )
68
69 self.log.info("Initializing K8S Helm2 connector")
70
71 # initialize helm client-only
72 self.log.debug("Initializing helm client-only...")
73 command = "{} init --client-only {} ".format(
74 self._helm_command,
75 "--stable-repo-url {}".format(self._stable_repo_url)
76 if self._stable_repo_url
77 else "--skip-repos",
78 )
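# Illustrative resolved command, assuming the default helm path and a stable repo
# URL configured in the base connector:
#   /usr/bin/helm init --client-only --stable-repo-url <stable_repo_url>
# or, when no stable repo URL is configured:
#   /usr/bin/helm init --client-only --skip-repos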
79 try:
80 asyncio.ensure_future(
81 self._local_async_exec(command=command, raise_exception_on_error=False)
82 )
83 # loop = asyncio.get_event_loop()
84 # loop.run_until_complete(self._local_async_exec(command=command,
85 # raise_exception_on_error=False))
86 except Exception as e:
87 self.log.warning(
88 msg="helm init failed (it was already initialized): {}".format(e)
89 )
90
91 self.log.info("K8S Helm2 connector initialized")
92
93 async def install(
94 self,
95 cluster_uuid: str,
96 kdu_model: str,
97 kdu_instance: str,
98 atomic: bool = True,
99 timeout: float = 300,
100 params: dict = None,
101 db_dict: dict = None,
102 kdu_name: str = None,
103 namespace: str = None,
104 **kwargs,
105 ):
106 """
107 Deploys a new KDU instance. It implicitly relies on the `install` call
108 to deploy the Chart/Bundle properly parametrized (in practice, this call
109 happens before any _initial-config-primitive_ of the VNF is called).
110
111 :param cluster_uuid: UUID of a K8s cluster known by OSM
112 :param kdu_model: chart reference (string), which can be any of
113 these options:
114 - the name of a chart available in the repos known by OSM
115 - a path to a packaged chart
116 - a path to an unpacked chart directory or a URL
117 :param kdu_instance: Kdu instance name
118 :param atomic: If set, installation process purges chart/bundle on fail, also
119 will wait until all the K8s objects are active
120 :param timeout: Time in seconds to wait for the install of the chart/bundle
121 (defaults to Helm default timeout: 300s)
122 :param params: dictionary of key-value pairs for instantiation parameters
123 (overriding default values)
124 :param dict db_dict: where to write in the database when the status changes.
125 It contains a dict with {collection: <str>, filter: {},
126 path: <str>},
127 e.g. {collection: "nsrs", filter:
128 {_id: <nsd-id>}, path: "_admin.deployed.K8S.3"}
129 :param kdu_name: Name of the KDU instance to be installed
130 :param namespace: K8s namespace to use for the KDU instance
131 :param kwargs: Additional parameters (None yet)
132 :return: True if successful
133 """
134 _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
135 self.log.debug("installing {} in cluster {}".format(kdu_model, cluster_id))
136
137 # sync local dir
138 self.fs.sync(from_path=cluster_id)
139
140 # init env, paths
141 paths, env = self._init_paths_env(
142 cluster_name=cluster_id, create_if_not_exist=True
143 )
144
145 await self._install_impl(
146 cluster_id,
147 kdu_model,
148 paths,
149 env,
150 kdu_instance,
151 atomic=atomic,
152 timeout=timeout,
153 params=params,
154 db_dict=db_dict,
155 kdu_name=kdu_name,
156 namespace=namespace,
157 )
158
159 # sync fs
160 self.fs.reverse_sync(from_path=cluster_id)
161
162 self.log.debug("Returning kdu_instance {}".format(kdu_instance))
163 return True
164
165 async def inspect_kdu(self, kdu_model: str, repo_url: str = None) -> str:
166
167 self.log.debug(
168 "inspect kdu_model {} from (optional) repo: {}".format(kdu_model, repo_url)
169 )
170
171 return await self._exec_inspect_comand(
172 inspect_command="", kdu_model=kdu_model, repo_url=repo_url
173 )
174
175 """
176 ####################################################################################
177 ################################### P R I V A T E ##################################
178 ####################################################################################
179 """
180
181 def _init_paths_env(self, cluster_name: str, create_if_not_exist: bool = True):
182 """
183 Creates the base cluster, kube and helm dirs if they do not exist and returns
184 their paths, together with the environment variables that must be provided to
185 execute helm commands.
186
187 The Helm 2 directory specification uses the helm_home dir:
188
189 The variables assigned for these paths are:
190 - Helm home: $HELM_HOME
191 - helm kubeconfig: $KUBECONFIG
192
193 :param cluster_name: cluster name
194 :return: Dictionary with config paths and dictionary with helm environment variables
195 """
196 base = self.fs.path
197 if base.endswith("/") or base.endswith("\\"):
198 base = base[:-1]
199
200 # base dir for cluster
201 cluster_dir = base + "/" + cluster_name
202
203 # kube dir
204 kube_dir = cluster_dir + "/" + ".kube"
205 if create_if_not_exist and not os.path.exists(kube_dir):
206 self.log.debug("Creating dir {}".format(kube_dir))
207 os.makedirs(kube_dir)
208
209 # helm home dir
210 helm_dir = cluster_dir + "/" + ".helm"
211 if create_if_not_exist and not os.path.exists(helm_dir):
212 self.log.debug("Creating dir {}".format(helm_dir))
213 os.makedirs(helm_dir)
214
215 config_filename = kube_dir + "/config"
216
217 # 2 - Prepare dictionary with paths
218 paths = {
219 "kube_dir": kube_dir,
220 "kube_config": config_filename,
221 "cluster_dir": cluster_dir,
222 "helm_dir": helm_dir,
223 }
224
225 for file_name, file in paths.items():
226 if "dir" in file_name and not os.path.exists(file):
227 err_msg = "{} dir does not exist".format(file)
228 self.log.error(err_msg)
229 raise K8sException(err_msg)
230
231 # 3 - Prepare environment variables
232 env = {"HELM_HOME": helm_dir, "KUBECONFIG": config_filename}
233
234 return paths, env
235
236 async def _get_services(self, cluster_id, kdu_instance, namespace, kubeconfig):
237
238 # init config, env
239 paths, env = self._init_paths_env(
240 cluster_name=cluster_id, create_if_not_exist=True
241 )
242
243 command1 = "env KUBECONFIG={} {} get manifest {} ".format(
244 kubeconfig, self._helm_command, kdu_instance
245 )
246 command2 = "{} get --namespace={} -f -".format(self.kubectl_command, namespace)
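# The two commands are executed as a pipeline, roughly equivalent to (illustrative):
#   env KUBECONFIG=<kubeconfig> helm get manifest <kdu_instance> | kubectl get --namespace=<namespace> -f -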
247 output, _rc = await self._local_async_exec_pipe(
248 command1, command2, env=env, raise_exception_on_error=True
249 )
250 services = self._parse_services(output)
251
252 return services
253
254 async def _cluster_init(
255 self, cluster_id: str, namespace: str, paths: dict, env: dict
256 ):
257 """
258 Implements the helm version dependent cluster initialization:
259 For helm2 it initializes the Tiller environment if needed
260 """
261
262 # check if tiller pod is up in cluster
263 command = "{} --kubeconfig={} --namespace={} get deployments".format(
264 self.kubectl_command, paths["kube_config"], namespace
265 )
266 output, _rc = await self._local_async_exec(
267 command=command, raise_exception_on_error=True, env=env
268 )
269
270 output_table = self._output_to_table(output=output)
271
272 # find 'tiller' pod in all pods
273 already_initialized = False
274 try:
275 for row in output_table:
276 if row[0].startswith("tiller-deploy"):
277 already_initialized = True
278 break
279 except Exception:
280 pass
281
282 # helm init
283 n2vc_installed_sw = False
284 if not already_initialized:
285 self.log.info(
286 "Initializing helm in client and server: {}".format(cluster_id)
287 )
288 command = "{} --kubeconfig={} --namespace kube-system create serviceaccount {}".format(
289 self.kubectl_command, paths["kube_config"], self.service_account
290 )
291 _, _rc = await self._local_async_exec(
292 command=command, raise_exception_on_error=False, env=env
293 )
294
295 command = (
296 "{} --kubeconfig={} create clusterrolebinding osm-tiller-cluster-rule "
297 "--clusterrole=cluster-admin --serviceaccount=kube-system:{}"
298 ).format(self.kubectl_command, paths["kube_config"], self.service_account)
299 _, _rc = await self._local_async_exec(
300 command=command, raise_exception_on_error=False, env=env
301 )
302
303 command = (
304 "{} --kubeconfig={} --tiller-namespace={} --home={} --service-account {} "
305 " {} init"
306 ).format(
307 self._helm_command,
308 paths["kube_config"],
309 namespace,
310 paths["helm_dir"],
311 self.service_account,
312 "--stable-repo-url {}".format(self._stable_repo_url)
313 if self._stable_repo_url
314 else "--skip-repos",
315 )
316 _, _rc = await self._local_async_exec(
317 command=command, raise_exception_on_error=True, env=env
318 )
319 n2vc_installed_sw = True
320 else:
321 # check client helm installation
322 check_file = paths["helm_dir"] + "/repository/repositories.yaml"
323 if not self._check_file_exists(
324 filename=check_file, exception_if_not_exists=False
325 ):
326 self.log.info("Initializing helm in client: {}".format(cluster_id))
327 command = (
328 "{} --kubeconfig={} --tiller-namespace={} "
329 "--home={} init --client-only {} "
330 ).format(
331 self._helm_command,
332 paths["kube_config"],
333 namespace,
334 paths["helm_dir"],
335 "--stable-repo-url {}".format(self._stable_repo_url)
336 if self._stable_repo_url
337 else "--skip-repos",
338 )
339 output, _rc = await self._local_async_exec(
340 command=command, raise_exception_on_error=True, env=env
341 )
342 else:
343 self.log.info("Helm client already initialized")
344
345 # remove old stable repo and add new one
346 cluster_uuid = "{}:{}".format(namespace, cluster_id)
347 repo_list = await self.repo_list(cluster_uuid)
348 for repo in repo_list:
349 if repo["name"] == "stable" and repo["url"] != self._stable_repo_url:
350 self.log.debug("Add new stable repo url: {}")
351 await self.repo_remove(cluster_uuid, "stable")
352 if self._stable_repo_url:
353 await self.repo_add(cluster_uuid, "stable", self._stable_repo_url)
354 break
355
356 return n2vc_installed_sw
357
358 async def _uninstall_sw(self, cluster_id: str, namespace: str):
359 # uninstall Tiller if necessary
360
361 self.log.debug("Uninstalling tiller from cluster {}".format(cluster_id))
362
363 # init paths, env
364 paths, env = self._init_paths_env(
365 cluster_name=cluster_id, create_if_not_exist=True
366 )
367
368 if not namespace:
369 # find namespace for tiller pod
370 command = "{} --kubeconfig={} get deployments --all-namespaces".format(
371 self.kubectl_command, paths["kube_config"]
372 )
373 output, _rc = await self._local_async_exec(
374 command=command, raise_exception_on_error=False, env=env
375 )
376 output_table = self._output_to_table(output=output)
377 namespace = None
378 for r in output_table:
379 try:
380 if "tiller-deploy" in r[1]:
381 namespace = r[0]
382 break
383 except Exception:
384 pass
385 else:
386 msg = "Tiller deployment not found in cluster {}".format(cluster_id)
387 self.log.error(msg)
388
389 self.log.debug("namespace for tiller: {}".format(namespace))
390
391 if namespace:
392 # uninstall tiller from cluster
393 self.log.debug("Uninstalling tiller from cluster {}".format(cluster_id))
394 command = "{} --kubeconfig={} --home={} reset".format(
395 self._helm_command, paths["kube_config"], paths["helm_dir"]
396 )
397 self.log.debug("resetting: {}".format(command))
398 output, _rc = await self._local_async_exec(
399 command=command, raise_exception_on_error=True, env=env
400 )
401 # Delete clusterrolebinding and serviceaccount.
402 # Ignore if errors for backward compatibility
403 command = (
404 "{} --kubeconfig={} delete clusterrolebinding.rbac.authorization.k8s."
405 "io/osm-tiller-cluster-rule"
406 ).format(self.kubectl_command, paths["kube_config"])
407 output, _rc = await self._local_async_exec(
408 command=command, raise_exception_on_error=False, env=env
409 )
410 command = "{} --kubeconfig={} --namespace kube-system delete serviceaccount/{}".format(
411 self.kubectl_command, paths["kube_config"], self.service_account
412 )
413 output, _rc = await self._local_async_exec(
414 command=command, raise_exception_on_error=False, env=env
415 )
416
417 else:
418 self.log.debug("namespace not found")
419
420 async def _instances_list(self, cluster_id):
421
422 # init paths, env
423 paths, env = self._init_paths_env(
424 cluster_name=cluster_id, create_if_not_exist=True
425 )
426
427 command = "{} list --output yaml".format(self._helm_command)
428
429 output, _rc = await self._local_async_exec(
430 command=command, raise_exception_on_error=True, env=env
431 )
432
433 if output and len(output) > 0:
434 # parse yaml and update keys to lower case to unify with helm3
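# Illustrative helm2 `helm list --output yaml` output (hypothetical values);
# the top-level key is "Releases" and entry keys are capitalized:
#   Releases:
#   - Name: mongodb-0001
#     Namespace: kube-system
#     Status: DEPLOYED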
435 instances = yaml.load(output, Loader=yaml.SafeLoader).get("Releases")
436 new_instances = []
437 for instance in instances:
438 new_instance = dict((k.lower(), v) for k, v in instance.items())
439 new_instances.append(new_instance)
440 return new_instances
441 else:
442 return []
443
444 def _get_inspect_command(
445 self, show_command: str, kdu_model: str, repo_str: str, version: str
446 ):
447 inspect_command = "{} inspect {} {}{} {}".format(
448 self._helm_command, show_command, kdu_model, repo_str, version
449 )
450 return inspect_command
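# Illustrative result (hypothetical chart; repo_str and version are passed in
# already formatted by the caller, e.g. " --repo <url>" and "--version <x.y.z>"):
#   /usr/bin/helm inspect values stable/mongodb --repo <repo_url> --version <x.y.z>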
451
452 async def _status_kdu(
453 self,
454 cluster_id: str,
455 kdu_instance: str,
456 namespace: str = None,
457 show_error_log: bool = False,
458 return_text: bool = False,
459 ):
460
461 self.log.debug(
462 "status of kdu_instance: {}, namespace: {} ".format(kdu_instance, namespace)
463 )
464
465 # init config, env
466 paths, env = self._init_paths_env(
467 cluster_name=cluster_id, create_if_not_exist=True
468 )
469 command = ("env KUBECONFIG={} {} status {} --output yaml").format(
470 paths["kube_config"], self._helm_command, kdu_instance
471 )
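# e.g. (illustrative):
#   env KUBECONFIG=/app/storage/<cluster_id>/.kube/config /usr/bin/helm status mongodb-0001 --output yaml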
472 output, rc = await self._local_async_exec(
473 command=command,
474 raise_exception_on_error=True,
475 show_error_log=show_error_log,
476 env=env,
477 )
478
479 if return_text:
480 return str(output)
481
482 if rc != 0:
483 return None
484
485 data = yaml.load(output, Loader=yaml.SafeLoader)
486
487 # remove field 'notes'
488 try:
489 del data.get("info").get("status")["notes"]
490 except KeyError:
491 pass
492
493 # parse field 'resources'
494 try:
495 resources = str(data.get("info").get("status").get("resources"))
496 resource_table = self._output_to_table(resources)
497 data.get("info").get("status")["resources"] = resource_table
498 except Exception:
499 pass
500
501 # set description to lowercase (unify with helm3)
502 try:
503 data.get("info")["description"] = data.get("info").pop("Description")
504 except KeyError:
505 pass
506
507 return data
508
509 def _get_helm_chart_repos_ids(self, cluster_uuid) -> list:
510 repo_ids = []
511 cluster_filter = {"_admin.helm-chart.id": cluster_uuid}
512 cluster = self.db.get_one("k8sclusters", cluster_filter)
513 if cluster:
514 repo_ids = cluster.get("_admin").get("helm_chart_repos") or []
515 return repo_ids
516 else:
517 raise K8sException(
518 "k8cluster with helm-id : {} not found".format(cluster_uuid)
519 )
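# Illustrative k8sclusters record fragment this lookup expects (hypothetical ids):
#   {"_admin": {"helm-chart": {"id": "<cluster_uuid>"},
#               "helm_chart_repos": ["<repo_id_1>", "<repo_id_2>"]}}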
520
521 async def _is_install_completed(self, cluster_id: str, kdu_instance: str) -> bool:
522 # init config, env
523 paths, env = self._init_paths_env(
524 cluster_name=cluster_id, create_if_not_exist=True
525 )
526
527 status = await self._status_kdu(
528 cluster_id=cluster_id, kdu_instance=kdu_instance, return_text=False
529 )
530
531 # extract info.status.resources-> str
532 # format:
533 # ==> v1/Deployment
534 # NAME READY UP-TO-DATE AVAILABLE AGE
535 # halting-horse-mongodb 0/1 1 0 0s
536 # halting-petit-mongodb 1/1 1 0 0s
537 # blank line
538 resources = K8sHelmBaseConnector._get_deep(
539 status, ("info", "status", "resources")
540 )
541
542 # convert to table
543 resources = K8sHelmBaseConnector._output_to_table(resources)
544
545 num_lines = len(resources)
546 index = 0
547 ready = True
548 while index < num_lines:
549 try:
550 line1 = resources[index]
551 index += 1
552 # find '==>' in column 0
553 if line1[0] == "==>":
554 line2 = resources[index]
555 index += 1
556 # find READY in column 1
557 if line2[1] == "READY":
558 # read next lines
559 line3 = resources[index]
560 index += 1
561 while len(line3) > 1 and index < num_lines:
562 ready_value = line3[1]
563 parts = ready_value.split(sep="/")
564 current = int(parts[0])
565 total = int(parts[1])
566 if current < total:
567 self.log.debug("NOT READY:\n {}".format(line3))
568 ready = False
569 line3 = resources[index]
570 index += 1
571
572 except Exception:
573 pass
574
575 return ready
576
577 def _get_install_command(
578 self,
579 kdu_model,
580 kdu_instance,
581 namespace,
582 params_str,
583 version,
584 atomic,
585 timeout,
586 kubeconfig,
587 ) -> str:
588
589 timeout_str = ""
590 if timeout:
591 timeout_str = "--timeout {}".format(timeout)
592
593 # atomic
594 atomic_str = ""
595 if atomic:
596 atomic_str = "--atomic"
597 # namespace
598 namespace_str = ""
599 if namespace:
600 namespace_str = "--namespace {}".format(namespace)
601
602 # version
603 version_str = ""
604 if version:
605 version_str = "--version {}".format(version)
606
607 command = (
608 "env KUBECONFIG={kubeconfig} {helm} install {atomic} --output yaml "
609 "{params} {timeout} --name={name} {ns} {model} {ver}".format(
610 kubeconfig=kubeconfig,
611 helm=self._helm_command,
612 atomic=atomic_str,
613 params=params_str,
614 timeout=timeout_str,
615 name=kdu_instance,
616 ns=namespace_str,
617 model=kdu_model,
618 ver=version_str,
619 )
620 )
621 return command
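# Illustrative resolved command (hypothetical values; params_str is built by the
# base connector, e.g. "--set replicaCount=2"):
#   env KUBECONFIG=/app/storage/<cluster_id>/.kube/config /usr/bin/helm install --atomic --output yaml \
#       --set replicaCount=2 --timeout 300 --name=mongodb-0001 --namespace kube-system \
#       stable/mongodb --version 7.8.10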
622
623 def _get_upgrade_command(
624 self,
625 kdu_model,
626 kdu_instance,
627 namespace,
628 params_str,
629 version,
630 atomic,
631 timeout,
632 kubeconfig,
633 ) -> str:
634
635 timeout_str = ""
636 if timeout:
637 timeout_str = "--timeout {}".format(timeout)
638
639 # atomic
640 atomic_str = ""
641 if atomic:
642 atomic_str = "--atomic"
643
644 # version
645 version_str = ""
646 if version:
647 version_str = "--version {}".format(version)
648
649 command = (
650 "env KUBECONFIG={kubeconfig} {helm} upgrade {atomic} --output yaml {params} {timeout} {name} {model} {ver}"
651 ).format(
652 kubeconfig=kubeconfig,
653 helm=self._helm_command,
654 atomic=atomic_str,
655 params=params_str,
656 timeout=timeout_str,
657 name=kdu_instance,
658 model=kdu_model,
659 ver=version_str,
660 )
661 return command
662
663 def _get_rollback_command(
664 self, kdu_instance, namespace, revision, kubeconfig
665 ) -> str:
666 return "env KUBECONFIG={} {} rollback {} {} --wait".format(
667 kubeconfig, self._helm_command, kdu_instance, revision
668 )
669
670 def _get_uninstall_command(
671 self, kdu_instance: str, namespace: str, kubeconfig: str
672 ) -> str:
673 return "env KUBECONFIG={} {} delete --purge {}".format(
674 kubeconfig, self._helm_command, kdu_instance
675 )