Bug 2064 fixed
[osm/N2VC.git] / n2vc / k8s_helm3_conn.py
1 ##
2 # Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3 # This file is part of OSM
4 # All Rights Reserved.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
15 # implied.
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
18 #
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact with: nfvlabs@tid.es
21 ##
22 from typing import Union
23 import os
24 import yaml
25
26 from n2vc.k8s_helm_base_conn import K8sHelmBaseConnector
27 from n2vc.exceptions import K8sException
28
29
30 class K8sHelm3Connector(K8sHelmBaseConnector):
31
32 """
33 ####################################################################################
34 ################################### P U B L I C ####################################
35 ####################################################################################
36 """
37
38 def __init__(
39 self,
40 fs: object,
41 db: object,
42 kubectl_command: str = "/usr/bin/kubectl",
43 helm_command: str = "/usr/bin/helm3",
44 log: object = None,
45 on_update_db=None,
46 ):
47 """
48 Initializes helm connector for helm v3
49
50 :param fs: file system for kubernetes and helm configuration
51 :param db: database object to write current operation status
52 :param kubectl_command: path to kubectl executable
53 :param helm_command: path to helm executable
54 :param log: logger
55 :param on_update_db: callback called when k8s connector updates database
56 """
57
58 # parent class
59 K8sHelmBaseConnector.__init__(
60 self,
61 db=db,
62 log=log,
63 fs=fs,
64 kubectl_command=kubectl_command,
65 helm_command=helm_command,
66 on_update_db=on_update_db,
67 )
68
69 self.log.info("K8S Helm3 connector initialized")
70
71 async def install(
72 self,
73 cluster_uuid: str,
74 kdu_model: str,
75 kdu_instance: str,
76 atomic: bool = True,
77 timeout: float = 300,
78 params: dict = None,
79 db_dict: dict = None,
80 kdu_name: str = None,
81 namespace: str = None,
82 **kwargs,
83 ):
84 """Install a helm chart
85
86 :param cluster_uuid str: The UUID of the cluster to install to
87 :param kdu_model str: chart/reference (string), which can be either
88 of these options:
89 - a name of chart available via the repos known by OSM
90 (e.g. stable/openldap, stable/openldap:1.2.4)
91 - a path to a packaged chart (e.g. mychart.tgz)
92 - a path to an unpacked chart directory or a URL (e.g. mychart)
93 :param kdu_instance: Kdu instance name
94 :param atomic bool: If set, waits until the model is active and resets
95 the cluster on failure.
96 :param timeout int: The time, in seconds, to wait for the install
97 to finish
98 :param params dict: Key-value pairs of instantiation parameters
99 :param kdu_name: Name of the KDU instance to be installed
100 :param namespace: K8s namespace to use for the KDU instance
101
102 :param kwargs: Additional parameters (None yet)
103
104 :return: True if successful
105 """
106
107 self.log.debug("installing {} in cluster {}".format(kdu_model, cluster_uuid))
108
109 # sync local dir
110 self.fs.sync(from_path=cluster_uuid)
111
112 # init env, paths
113 paths, env = self._init_paths_env(
114 cluster_name=cluster_uuid, create_if_not_exist=True
115 )
116
117 # for helm3 if namespace does not exist must create it
118 if namespace and namespace != "kube-system":
119 if not await self._namespace_exists(cluster_uuid, namespace):
120 try:
121 await self._create_namespace(cluster_uuid, namespace)
122 except Exception as e:
123 if not await self._namespace_exists(cluster_uuid, namespace):
124 err_msg = (
125 "namespace {} does not exist in cluster_id {} "
126 "error message: ".format(namespace, e)
127 )
128 self.log.error(err_msg)
129 raise K8sException(err_msg)
130
131 await self._install_impl(
132 cluster_uuid,
133 kdu_model,
134 paths,
135 env,
136 kdu_instance,
137 atomic=atomic,
138 timeout=timeout,
139 params=params,
140 db_dict=db_dict,
141 kdu_name=kdu_name,
142 namespace=namespace,
143 )
144
145 # sync fs
146 self.fs.reverse_sync(from_path=cluster_uuid)
147
148 self.log.debug("Returning kdu_instance {}".format(kdu_instance))
149 return True
150
151 async def inspect_kdu(self, kdu_model: str, repo_url: str = None) -> str:
152
153 self.log.debug(
154 "inspect kdu_model {} from (optional) repo: {}".format(kdu_model, repo_url)
155 )
156
157 return await self._exec_inspect_command(
158 inspect_command="all", kdu_model=kdu_model, repo_url=repo_url
159 )
160
161 """
162 ####################################################################################
163 ################################### P R I V A T E ##################################
164 ####################################################################################
165 """
166
167 def _init_paths_env(self, cluster_name: str, create_if_not_exist: bool = True):
168 """
169 Creates and returns base cluster and kube dirs and returns them.
170 Also created helm3 dirs according to new directory specification, paths are
171 returned and also environment variables that must be provided to execute commands
172
173 Helm 3 directory specification uses XDG categories for variable support:
174 - Cache: $XDG_CACHE_HOME, for example, ${HOME}/.cache/helm/
175 - Configuration: $XDG_CONFIG_HOME, for example, ${HOME}/.config/helm/
176 - Data: $XDG_DATA_HOME, for example ${HOME}/.local/share/helm
177
178 The variables assigned for this paths are:
179 (In the documentation the variables names are $HELM_PATH_CACHE, $HELM_PATH_CONFIG,
180 $HELM_PATH_DATA but looking and helm env the variable names are different)
181 - Cache: $HELM_CACHE_HOME
182 - Config: $HELM_CONFIG_HOME
183 - Data: $HELM_DATA_HOME
184 - helm kubeconfig: $KUBECONFIG
185
186 :param cluster_name: cluster_name
187 :return: Dictionary with config_paths and dictionary with helm environment variables
188 """
189
190 base = self.fs.path
191 if base.endswith("/") or base.endswith("\\"):
192 base = base[:-1]
193
194 # base dir for cluster
195 cluster_dir = base + "/" + cluster_name
196
197 # kube dir
198 kube_dir = cluster_dir + "/" + ".kube"
199 if create_if_not_exist and not os.path.exists(kube_dir):
200 self.log.debug("Creating dir {}".format(kube_dir))
201 os.makedirs(kube_dir)
202
203 helm_path_cache = cluster_dir + "/.cache/helm"
204 if create_if_not_exist and not os.path.exists(helm_path_cache):
205 self.log.debug("Creating dir {}".format(helm_path_cache))
206 os.makedirs(helm_path_cache)
207
208 helm_path_config = cluster_dir + "/.config/helm"
209 if create_if_not_exist and not os.path.exists(helm_path_config):
210 self.log.debug("Creating dir {}".format(helm_path_config))
211 os.makedirs(helm_path_config)
212
213 helm_path_data = cluster_dir + "/.local/share/helm"
214 if create_if_not_exist and not os.path.exists(helm_path_data):
215 self.log.debug("Creating dir {}".format(helm_path_data))
216 os.makedirs(helm_path_data)
217
218 config_filename = kube_dir + "/config"
219
220 # 2 - Prepare dictionary with paths
221 paths = {
222 "kube_dir": kube_dir,
223 "kube_config": config_filename,
224 "cluster_dir": cluster_dir,
225 }
226
227 # 3 - Prepare environment variables
228 env = {
229 "HELM_CACHE_HOME": helm_path_cache,
230 "HELM_CONFIG_HOME": helm_path_config,
231 "HELM_DATA_HOME": helm_path_data,
232 "KUBECONFIG": config_filename,
233 }
234
235 for file_name, file in paths.items():
236 if "dir" in file_name and not os.path.exists(file):
237 err_msg = "{} dir does not exist".format(file)
238 self.log.error(err_msg)
239 raise K8sException(err_msg)
240
241 return paths, env
242
243 async def _namespace_exists(self, cluster_id, namespace) -> bool:
244 self.log.debug(
245 "checking if namespace {} exists cluster_id {}".format(
246 namespace, cluster_id
247 )
248 )
249 namespaces = await self._get_namespaces(cluster_id)
250 return namespace in namespaces if namespaces else False
251
252 async def _get_namespaces(self, cluster_id: str):
253
254 self.log.debug("get namespaces cluster_id {}".format(cluster_id))
255
256 # init config, env
257 paths, env = self._init_paths_env(
258 cluster_name=cluster_id, create_if_not_exist=True
259 )
260
261 command = "{} --kubeconfig={} get namespaces -o=yaml".format(
262 self.kubectl_command, paths["kube_config"]
263 )
264 output, _rc = await self._local_async_exec(
265 command=command, raise_exception_on_error=True, env=env
266 )
267
268 data = yaml.load(output, Loader=yaml.SafeLoader)
269 namespaces = [item["metadata"]["name"] for item in data["items"]]
270 self.log.debug(f"namespaces {namespaces}")
271
272 return namespaces
273
274 async def _create_namespace(self, cluster_id: str, namespace: str):
275
276 self.log.debug(f"create namespace: {cluster_id} for cluster_id: {namespace}")
277
278 # init config, env
279 paths, env = self._init_paths_env(
280 cluster_name=cluster_id, create_if_not_exist=True
281 )
282
283 command = "{} --kubeconfig={} create namespace {}".format(
284 self.kubectl_command, paths["kube_config"], namespace
285 )
286 _, _rc = await self._local_async_exec(
287 command=command, raise_exception_on_error=True, env=env
288 )
289 self.log.debug(f"namespace {namespace} created")
290
291 return _rc
292
293 async def _get_services(
294 self, cluster_id: str, kdu_instance: str, namespace: str, kubeconfig: str
295 ):
296
297 # init config, env
298 paths, env = self._init_paths_env(
299 cluster_name=cluster_id, create_if_not_exist=True
300 )
301
302 command1 = "env KUBECONFIG={} {} get manifest {} --namespace={}".format(
303 kubeconfig, self._helm_command, kdu_instance, namespace
304 )
305 command2 = "{} get --namespace={} -f -".format(self.kubectl_command, namespace)
306 output, _rc = await self._local_async_exec_pipe(
307 command1, command2, env=env, raise_exception_on_error=True
308 )
309 services = self._parse_services(output)
310
311 return services
312
313 async def _cluster_init(self, cluster_id, namespace, paths, env):
314 """
315 Implements the helm version dependent cluster initialization:
316 For helm3 it creates the namespace if it is not created
317 """
318 if namespace != "kube-system":
319 namespaces = await self._get_namespaces(cluster_id)
320 if namespace not in namespaces:
321 await self._create_namespace(cluster_id, namespace)
322
323 repo_list = await self.repo_list(cluster_id)
324 stable_repo = [repo for repo in repo_list if repo["name"] == "stable"]
325 if not stable_repo and self._stable_repo_url:
326 await self.repo_add(cluster_id, "stable", self._stable_repo_url)
327
328 # Returns False as no software needs to be uninstalled
329 return False
330
331 async def _uninstall_sw(self, cluster_id: str, namespace: str):
332 # nothing to do to uninstall sw
333 pass
334
335 async def _instances_list(self, cluster_id: str):
336
337 # init paths, env
338 paths, env = self._init_paths_env(
339 cluster_name=cluster_id, create_if_not_exist=True
340 )
341
342 command = "{} list --all-namespaces --output yaml".format(self._helm_command)
343 output, _rc = await self._local_async_exec(
344 command=command, raise_exception_on_error=True, env=env
345 )
346
347 if output and len(output) > 0:
348 self.log.debug("instances list output: {}".format(output))
349 return yaml.load(output, Loader=yaml.SafeLoader)
350 else:
351 return []
352
353 def _get_inspect_command(
354 self, inspect_command: str, kdu_model: str, repo_str: str, version: str
355 ):
356 """Generates the command to obtain the information about an Helm Chart package
357 (┬┤helm show ...┬┤ command)
358
359 Args:
360 show_command: the second part of the command (`helm show <show_command>`)
361 kdu_model: The name or path of an Helm Chart
362 repo_url: Helm Chart repository url
363 version: constraint with specific version of the Chart to use
364
365 Returns:
366 str: the generated Helm Chart command
367 """
368
369 inspect_command = "{} show {} {}{} {}".format(
370 self._helm_command, inspect_command, kdu_model, repo_str, version
371 )
372 return inspect_command
373
374 def _get_get_command(
375 self, get_command: str, kdu_instance: str, namespace: str, kubeconfig: str
376 ):
377 get_command = (
378 "env KUBECONFIG={} {} get {} {} --namespace={} --output yaml".format(
379 kubeconfig, self._helm_command, get_command, kdu_instance, namespace
380 )
381 )
382 return get_command
383
384 async def _status_kdu(
385 self,
386 cluster_id: str,
387 kdu_instance: str,
388 namespace: str = None,
389 yaml_format: bool = False,
390 show_error_log: bool = False,
391 ) -> Union[str, dict]:
392
393 self.log.debug(
394 "status of kdu_instance: {}, namespace: {} ".format(kdu_instance, namespace)
395 )
396
397 if not namespace:
398 namespace = "kube-system"
399
400 # init config, env
401 paths, env = self._init_paths_env(
402 cluster_name=cluster_id, create_if_not_exist=True
403 )
404 command = "env KUBECONFIG={} {} status {} --namespace={} --output yaml".format(
405 paths["kube_config"], self._helm_command, kdu_instance, namespace
406 )
407
408 output, rc = await self._local_async_exec(
409 command=command,
410 raise_exception_on_error=True,
411 show_error_log=show_error_log,
412 env=env,
413 )
414
415 if yaml_format:
416 return str(output)
417
418 if rc != 0:
419 return None
420
421 data = yaml.load(output, Loader=yaml.SafeLoader)
422
423 # remove field 'notes' and manifest
424 try:
425 del data.get("info")["notes"]
426 except KeyError:
427 pass
428
429 # parse the manifest to a list of dictionaries
430 if "manifest" in data:
431 manifest_str = data.get("manifest")
432 manifest_docs = yaml.load_all(manifest_str, Loader=yaml.SafeLoader)
433
434 data["manifest"] = []
435 for doc in manifest_docs:
436 data["manifest"].append(doc)
437
438 return data
439
440 def _get_install_command(
441 self,
442 kdu_model: str,
443 kdu_instance: str,
444 namespace: str,
445 params_str: str,
446 version: str,
447 atomic: bool,
448 timeout: float,
449 kubeconfig: str,
450 ) -> str:
451
452 timeout_str = ""
453 if timeout:
454 timeout_str = "--timeout {}s".format(timeout)
455
456 # atomic
457 atomic_str = ""
458 if atomic:
459 atomic_str = "--atomic"
460 # namespace
461 namespace_str = ""
462 if namespace:
463 namespace_str = "--namespace {}".format(namespace)
464
465 # version
466 version_str = ""
467 if version:
468 version_str = "--version {}".format(version)
469
470 command = (
471 "env KUBECONFIG={kubeconfig} {helm} install {name} {atomic} --output yaml "
472 "{params} {timeout} {ns} {model} {ver}".format(
473 kubeconfig=kubeconfig,
474 helm=self._helm_command,
475 name=kdu_instance,
476 atomic=atomic_str,
477 params=params_str,
478 timeout=timeout_str,
479 ns=namespace_str,
480 model=kdu_model,
481 ver=version_str,
482 )
483 )
484 return command
485
486 def _get_upgrade_scale_command(
487 self,
488 kdu_model: str,
489 kdu_instance: str,
490 namespace: str,
491 scale: int,
492 version: str,
493 atomic: bool,
494 replica_str: str,
495 timeout: float,
496 resource_name: str,
497 kubeconfig: str,
498 ) -> str:
499
500 timeout_str = ""
501 if timeout:
502 timeout_str = "--timeout {}s".format(timeout)
503
504 # atomic
505 atomic_str = ""
506 if atomic:
507 atomic_str = "--atomic"
508
509 # version
510 version_str = ""
511 if version:
512 version_str = "--version {}".format(version)
513
514 # namespace
515 namespace_str = ""
516 if namespace:
517 namespace_str = "--namespace {}".format(namespace)
518
519 # scale
520 if resource_name:
521 scale_dict = {"{}.{}".format(resource_name, replica_str): scale}
522 else:
523 scale_dict = {replica_str: scale}
524
525 scale_str = self._params_to_set_option(scale_dict)
526
527 command = (
528 "env KUBECONFIG={kubeconfig} {helm} upgrade {name} {model} {namespace} {atomic} --output yaml {scale} "
529 "{timeout} {ver}"
530 ).format(
531 helm=self._helm_command,
532 name=kdu_instance,
533 namespace=namespace_str,
534 atomic=atomic_str,
535 scale=scale_str,
536 timeout=timeout_str,
537 model=kdu_model,
538 ver=version_str,
539 kubeconfig=kubeconfig,
540 )
541 return command
542
543 def _get_upgrade_command(
544 self,
545 kdu_model: str,
546 kdu_instance: str,
547 namespace: str,
548 params_str: str,
549 version: str,
550 atomic: bool,
551 timeout: float,
552 kubeconfig: str,
553 ) -> str:
554
555 timeout_str = ""
556 if timeout:
557 timeout_str = "--timeout {}s".format(timeout)
558
559 # atomic
560 atomic_str = ""
561 if atomic:
562 atomic_str = "--atomic"
563
564 # version
565 version_str = ""
566 if version:
567 version_str = "--version {}".format(version)
568
569 # namespace
570 namespace_str = ""
571 if namespace:
572 namespace_str = "--namespace {}".format(namespace)
573
574 command = (
575 "env KUBECONFIG={kubeconfig} {helm} upgrade {name} {model} {namespace} {atomic} "
576 "--output yaml {params} {timeout} {ver}"
577 ).format(
578 kubeconfig=kubeconfig,
579 helm=self._helm_command,
580 name=kdu_instance,
581 namespace=namespace_str,
582 atomic=atomic_str,
583 params=params_str,
584 timeout=timeout_str,
585 model=kdu_model,
586 ver=version_str,
587 )
588 return command
589
590 def _get_rollback_command(
591 self, kdu_instance: str, namespace: str, revision: float, kubeconfig: str
592 ) -> str:
593 return "env KUBECONFIG={} {} rollback {} {} --namespace={} --wait".format(
594 kubeconfig, self._helm_command, kdu_instance, revision, namespace
595 )
596
597 def _get_uninstall_command(
598 self, kdu_instance: str, namespace: str, kubeconfig: str
599 ) -> str:
600
601 return "env KUBECONFIG={} {} uninstall {} --namespace={}".format(
602 kubeconfig, self._helm_command, kdu_instance, namespace
603 )
604
605 def _get_helm_chart_repos_ids(self, cluster_uuid) -> list:
606 repo_ids = []
607 cluster_filter = {"_admin.helm-chart-v3.id": cluster_uuid}
608 cluster = self.db.get_one("k8sclusters", cluster_filter)
609 if cluster:
610 repo_ids = cluster.get("_admin").get("helm_chart_repos") or []
611 return repo_ids
612 else:
613 raise K8sException(
614 "k8cluster with helm-id : {} not found".format(cluster_uuid)
615 )