4baadae203a35455cd3b6bd9729535339f34ae7c
[osm/N2VC.git] / n2vc / k8s_helm3_conn.py
1 ##
2 # Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3 # This file is part of OSM
4 # All Rights Reserved.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
15 # implied.
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
18 #
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact with: nfvlabs@tid.es
21 ##
22 from typing import Union
23 import os
24 import yaml
25
26 from n2vc.k8s_helm_base_conn import K8sHelmBaseConnector
27 from n2vc.exceptions import K8sException
28
29
30 class K8sHelm3Connector(K8sHelmBaseConnector):
31
32 """
33 ####################################################################################
34 ################################### P U B L I C ####################################
35 ####################################################################################
36 """
37
38 def __init__(
39 self,
40 fs: object,
41 db: object,
42 kubectl_command: str = "/usr/bin/kubectl",
43 helm_command: str = "/usr/bin/helm3",
44 log: object = None,
45 on_update_db=None,
46 ):
47 """
48 Initializes helm connector for helm v3
49
50 :param fs: file system for kubernetes and helm configuration
51 :param db: database object to write current operation status
52 :param kubectl_command: path to kubectl executable
53 :param helm_command: path to helm executable
54 :param log: logger
55 :param on_update_db: callback called when k8s connector updates database
56 """
57
58 # parent class
59 K8sHelmBaseConnector.__init__(
60 self,
61 db=db,
62 log=log,
63 fs=fs,
64 kubectl_command=kubectl_command,
65 helm_command=helm_command,
66 on_update_db=on_update_db,
67 )
68
69 self.log.info("K8S Helm3 connector initialized")
70
71 async def install(
72 self,
73 cluster_uuid: str,
74 kdu_model: str,
75 kdu_instance: str,
76 atomic: bool = True,
77 timeout: float = 300,
78 params: dict = None,
79 db_dict: dict = None,
80 kdu_name: str = None,
81 namespace: str = None,
82 **kwargs,
83 ):
84 """Install a helm chart
85
86 :param cluster_uuid str: The UUID of the cluster to install to
87 :param kdu_model str: chart/reference (string), which can be either
88 of these options:
89 - a name of chart available via the repos known by OSM
90 (e.g. stable/openldap, stable/openldap:1.2.4)
91 - a path to a packaged chart (e.g. mychart.tgz)
92 - a path to an unpacked chart directory or a URL (e.g. mychart)
93 :param kdu_instance: Kdu instance name
94 :param atomic bool: If set, waits until the model is active and resets
95 the cluster on failure.
96 :param timeout int: The time, in seconds, to wait for the install
97 to finish
98 :param params dict: Key-value pairs of instantiation parameters
99 :param kdu_name: Name of the KDU instance to be installed
100 :param namespace: K8s namespace to use for the KDU instance
101
102 :param kwargs: Additional parameters (None yet)
103
104 :return: True if successful
105 """
106
107 self.log.debug("installing {} in cluster {}".format(kdu_model, cluster_uuid))
108
109 # sync local dir
110 self.fs.sync(from_path=cluster_uuid)
111
112 # init env, paths
113 paths, env = self._init_paths_env(
114 cluster_name=cluster_uuid, create_if_not_exist=True
115 )
116
117 # for helm3 if namespace does not exist must create it
118 if namespace and namespace != "kube-system":
119 if not await self._namespace_exists(cluster_uuid, namespace):
120 try:
121 # TODO: refactor to use kubernetes API client
122 await self._create_namespace(cluster_uuid, namespace)
123 except Exception as e:
124 if not await self._namespace_exists(cluster_uuid, namespace):
125 err_msg = (
126 "namespace {} does not exist in cluster_id {} "
127 "error message: ".format(namespace, e)
128 )
129 self.log.error(err_msg)
130 raise K8sException(err_msg)
131
132 await self._install_impl(
133 cluster_uuid,
134 kdu_model,
135 paths,
136 env,
137 kdu_instance,
138 atomic=atomic,
139 timeout=timeout,
140 params=params,
141 db_dict=db_dict,
142 kdu_name=kdu_name,
143 namespace=namespace,
144 )
145
146 # sync fs
147 self.fs.reverse_sync(from_path=cluster_uuid)
148
149 self.log.debug("Returning kdu_instance {}".format(kdu_instance))
150 return True
151
152 async def inspect_kdu(self, kdu_model: str, repo_url: str = None) -> str:
153 self.log.debug(
154 "inspect kdu_model {} from (optional) repo: {}".format(kdu_model, repo_url)
155 )
156
157 return await self._exec_inspect_command(
158 inspect_command="all", kdu_model=kdu_model, repo_url=repo_url
159 )
160
161 """
162 ####################################################################################
163 ################################### P R I V A T E ##################################
164 ####################################################################################
165 """
166
167 def _init_paths_env(self, cluster_name: str, create_if_not_exist: bool = True):
168 """
169 Creates and returns base cluster and kube dirs and returns them.
170 Also created helm3 dirs according to new directory specification, paths are
171 returned and also environment variables that must be provided to execute commands
172
173 Helm 3 directory specification uses XDG categories for variable support:
174 - Cache: $XDG_CACHE_HOME, for example, ${HOME}/.cache/helm/
175 - Configuration: $XDG_CONFIG_HOME, for example, ${HOME}/.config/helm/
176 - Data: $XDG_DATA_HOME, for example ${HOME}/.local/share/helm
177
178 The variables assigned for this paths are:
179 (In the documentation the variables names are $HELM_PATH_CACHE, $HELM_PATH_CONFIG,
180 $HELM_PATH_DATA but looking and helm env the variable names are different)
181 - Cache: $HELM_CACHE_HOME
182 - Config: $HELM_CONFIG_HOME
183 - Data: $HELM_DATA_HOME
184 - helm kubeconfig: $KUBECONFIG
185
186 :param cluster_name: cluster_name
187 :return: Dictionary with config_paths and dictionary with helm environment variables
188 """
189
190 base = self.fs.path
191 if base.endswith("/") or base.endswith("\\"):
192 base = base[:-1]
193
194 # base dir for cluster
195 cluster_dir = base + "/" + cluster_name
196
197 # kube dir
198 kube_dir = cluster_dir + "/" + ".kube"
199 if create_if_not_exist and not os.path.exists(kube_dir):
200 self.log.debug("Creating dir {}".format(kube_dir))
201 os.makedirs(kube_dir)
202
203 helm_path_cache = cluster_dir + "/.cache/helm"
204 if create_if_not_exist and not os.path.exists(helm_path_cache):
205 self.log.debug("Creating dir {}".format(helm_path_cache))
206 os.makedirs(helm_path_cache)
207
208 helm_path_config = cluster_dir + "/.config/helm"
209 if create_if_not_exist and not os.path.exists(helm_path_config):
210 self.log.debug("Creating dir {}".format(helm_path_config))
211 os.makedirs(helm_path_config)
212
213 helm_path_data = cluster_dir + "/.local/share/helm"
214 if create_if_not_exist and not os.path.exists(helm_path_data):
215 self.log.debug("Creating dir {}".format(helm_path_data))
216 os.makedirs(helm_path_data)
217
218 config_filename = kube_dir + "/config"
219
220 # 2 - Prepare dictionary with paths
221 paths = {
222 "kube_dir": kube_dir,
223 "kube_config": config_filename,
224 "cluster_dir": cluster_dir,
225 }
226
227 # 3 - Prepare environment variables
228 env = {
229 "HELM_CACHE_HOME": helm_path_cache,
230 "HELM_CONFIG_HOME": helm_path_config,
231 "HELM_DATA_HOME": helm_path_data,
232 "KUBECONFIG": config_filename,
233 }
234
235 for file_name, file in paths.items():
236 if "dir" in file_name and not os.path.exists(file):
237 err_msg = "{} dir does not exist".format(file)
238 self.log.error(err_msg)
239 raise K8sException(err_msg)
240
241 return paths, env
242
243 async def _namespace_exists(self, cluster_id, namespace) -> bool:
244 self.log.debug(
245 "checking if namespace {} exists cluster_id {}".format(
246 namespace, cluster_id
247 )
248 )
249 namespaces = await self._get_namespaces(cluster_id)
250 return namespace in namespaces if namespaces else False
251
252 async def _get_namespaces(self, cluster_id: str):
253 self.log.debug("get namespaces cluster_id {}".format(cluster_id))
254
255 # init config, env
256 paths, env = self._init_paths_env(
257 cluster_name=cluster_id, create_if_not_exist=True
258 )
259
260 command = "{} --kubeconfig={} get namespaces -o=yaml".format(
261 self.kubectl_command, paths["kube_config"]
262 )
263 output, _rc = await self._local_async_exec(
264 command=command, raise_exception_on_error=True, env=env
265 )
266
267 data = yaml.load(output, Loader=yaml.SafeLoader)
268 namespaces = [item["metadata"]["name"] for item in data["items"]]
269 self.log.debug(f"namespaces {namespaces}")
270
271 return namespaces
272
273 async def _create_namespace(self, cluster_id: str, namespace: str):
274 self.log.debug(f"create namespace: {cluster_id} for cluster_id: {namespace}")
275
276 # init config, env
277 paths, env = self._init_paths_env(
278 cluster_name=cluster_id, create_if_not_exist=True
279 )
280
281 command = "{} --kubeconfig={} create namespace {}".format(
282 self.kubectl_command, paths["kube_config"], namespace
283 )
284 _, _rc = await self._local_async_exec(
285 command=command, raise_exception_on_error=True, env=env
286 )
287 self.log.debug(f"namespace {namespace} created")
288
289 return _rc
290
291 async def _get_services(
292 self, cluster_id: str, kdu_instance: str, namespace: str, kubeconfig: str
293 ):
294 # init config, env
295 paths, env = self._init_paths_env(
296 cluster_name=cluster_id, create_if_not_exist=True
297 )
298
299 command1 = "env KUBECONFIG={} {} get manifest {} --namespace={}".format(
300 kubeconfig, self._helm_command, kdu_instance, namespace
301 )
302 command2 = "{} get --namespace={} -f -".format(self.kubectl_command, namespace)
303 output, _rc = await self._local_async_exec_pipe(
304 command1, command2, env=env, raise_exception_on_error=True
305 )
306 services = self._parse_services(output)
307
308 return services
309
310 async def _cluster_init(self, cluster_id, namespace, paths, env):
311 """
312 Implements the helm version dependent cluster initialization:
313 For helm3 it creates the namespace if it is not created
314 """
315 if namespace != "kube-system":
316 namespaces = await self._get_namespaces(cluster_id)
317 if namespace not in namespaces:
318 # TODO: refactor to use kubernetes API client
319 await self._create_namespace(cluster_id, namespace)
320
321 repo_list = await self.repo_list(cluster_id)
322 stable_repo = [repo for repo in repo_list if repo["name"] == "stable"]
323 if not stable_repo and self._stable_repo_url:
324 await self.repo_add(cluster_id, "stable", self._stable_repo_url)
325
326 # Returns False as no software needs to be uninstalled
327 return False
328
329 async def _uninstall_sw(self, cluster_id: str, namespace: str):
330 # nothing to do to uninstall sw
331 pass
332
333 async def _instances_list(self, cluster_id: str):
334 # init paths, env
335 paths, env = self._init_paths_env(
336 cluster_name=cluster_id, create_if_not_exist=True
337 )
338
339 command = "{} list --all-namespaces --output yaml".format(self._helm_command)
340 output, _rc = await self._local_async_exec(
341 command=command, raise_exception_on_error=True, env=env
342 )
343
344 if output and len(output) > 0:
345 self.log.debug("instances list output: {}".format(output))
346 return yaml.load(output, Loader=yaml.SafeLoader)
347 else:
348 return []
349
350 def _get_inspect_command(
351 self, show_command: str, kdu_model: str, repo_str: str, version: str
352 ):
353 """Generates the command to obtain the information about an Helm Chart package
354 (´helm show ...´ command)
355
356 Args:
357 show_command: the second part of the command (`helm show <show_command>`)
358 kdu_model: The name or path of an Helm Chart
359 repo_url: Helm Chart repository url
360 version: constraint with specific version of the Chart to use
361
362 Returns:
363 str: the generated Helm Chart command
364 """
365
366 inspect_command = "{} show {} {}{} {}".format(
367 self._helm_command, show_command, kdu_model, repo_str, version
368 )
369 return inspect_command
370
371 def _get_get_command(
372 self, get_command: str, kdu_instance: str, namespace: str, kubeconfig: str
373 ):
374 get_command = (
375 "env KUBECONFIG={} {} get {} {} --namespace={} --output yaml".format(
376 kubeconfig, self._helm_command, get_command, kdu_instance, namespace
377 )
378 )
379 return get_command
380
381 async def _status_kdu(
382 self,
383 cluster_id: str,
384 kdu_instance: str,
385 namespace: str = None,
386 yaml_format: bool = False,
387 show_error_log: bool = False,
388 ) -> Union[str, dict]:
389 self.log.debug(
390 "status of kdu_instance: {}, namespace: {} ".format(kdu_instance, namespace)
391 )
392
393 if not namespace:
394 namespace = "kube-system"
395
396 # init config, env
397 paths, env = self._init_paths_env(
398 cluster_name=cluster_id, create_if_not_exist=True
399 )
400 command = "env KUBECONFIG={} {} status {} --namespace={} --output yaml".format(
401 paths["kube_config"], self._helm_command, kdu_instance, namespace
402 )
403
404 output, rc = await self._local_async_exec(
405 command=command,
406 raise_exception_on_error=True,
407 show_error_log=show_error_log,
408 env=env,
409 )
410
411 if yaml_format:
412 return str(output)
413
414 if rc != 0:
415 return None
416
417 data = yaml.load(output, Loader=yaml.SafeLoader)
418
419 # remove field 'notes' and manifest
420 try:
421 del data.get("info")["notes"]
422 except KeyError:
423 pass
424
425 # parse the manifest to a list of dictionaries
426 if "manifest" in data:
427 manifest_str = data.get("manifest")
428 manifest_docs = yaml.load_all(manifest_str, Loader=yaml.SafeLoader)
429
430 data["manifest"] = []
431 for doc in manifest_docs:
432 data["manifest"].append(doc)
433
434 return data
435
436 def _get_install_command(
437 self,
438 kdu_model: str,
439 kdu_instance: str,
440 namespace: str,
441 params_str: str,
442 version: str,
443 atomic: bool,
444 timeout: float,
445 kubeconfig: str,
446 ) -> str:
447 timeout_str = ""
448 if timeout:
449 timeout_str = "--timeout {}s".format(timeout)
450
451 # atomic
452 atomic_str = ""
453 if atomic:
454 atomic_str = "--atomic"
455 # namespace
456 namespace_str = ""
457 if namespace:
458 namespace_str = "--namespace {}".format(namespace)
459
460 # version
461 version_str = ""
462 if version:
463 version_str = "--version {}".format(version)
464
465 command = (
466 "env KUBECONFIG={kubeconfig} {helm} install {name} {atomic} --output yaml "
467 "{params} {timeout} {ns} {model} {ver}".format(
468 kubeconfig=kubeconfig,
469 helm=self._helm_command,
470 name=kdu_instance,
471 atomic=atomic_str,
472 params=params_str,
473 timeout=timeout_str,
474 ns=namespace_str,
475 model=kdu_model,
476 ver=version_str,
477 )
478 )
479 return command
480
481 def _get_upgrade_scale_command(
482 self,
483 kdu_model: str,
484 kdu_instance: str,
485 namespace: str,
486 scale: int,
487 version: str,
488 atomic: bool,
489 replica_str: str,
490 timeout: float,
491 resource_name: str,
492 kubeconfig: str,
493 ) -> str:
494 """Generates the command to scale a Helm Chart release
495
496 Args:
497 kdu_model (str): Kdu model name, corresponding to the Helm local location or repository
498 kdu_instance (str): KDU instance, corresponding to the Helm Chart release in question
499 namespace (str): Namespace where this KDU instance is deployed
500 scale (int): Scale count
501 version (str): Constraint with specific version of the Chart to use
502 atomic (bool): If set, upgrade process rolls back changes made in case of failed upgrade.
503 The --wait flag will be set automatically if --atomic is used
504 replica_str (str): The key under resource_name key where the scale count is stored
505 timeout (float): The time, in seconds, to wait
506 resource_name (str): The KDU's resource to scale
507 kubeconfig (str): Kubeconfig file path
508
509 Returns:
510 str: command to scale a Helm Chart release
511 """
512
513 # scale
514 if resource_name:
515 scale_dict = {"{}.{}".format(resource_name, replica_str): scale}
516 else:
517 scale_dict = {replica_str: scale}
518
519 scale_str = self._params_to_set_option(scale_dict)
520
521 return self._get_upgrade_command(
522 kdu_model=kdu_model,
523 kdu_instance=kdu_instance,
524 namespace=namespace,
525 params_str=scale_str,
526 version=version,
527 atomic=atomic,
528 timeout=timeout,
529 kubeconfig=kubeconfig,
530 )
531
532 def _get_upgrade_command(
533 self,
534 kdu_model: str,
535 kdu_instance: str,
536 namespace: str,
537 params_str: str,
538 version: str,
539 atomic: bool,
540 timeout: float,
541 kubeconfig: str,
542 force: bool = False,
543 ) -> str:
544 """Generates the command to upgrade a Helm Chart release
545
546 Args:
547 kdu_model (str): Kdu model name, corresponding to the Helm local location or repository
548 kdu_instance (str): KDU instance, corresponding to the Helm Chart release in question
549 namespace (str): Namespace where this KDU instance is deployed
550 params_str (str): Params used to upgrade the Helm Chart release
551 version (str): Constraint with specific version of the Chart to use
552 atomic (bool): If set, upgrade process rolls back changes made in case of failed upgrade.
553 The --wait flag will be set automatically if --atomic is used
554 timeout (float): The time, in seconds, to wait
555 kubeconfig (str): Kubeconfig file path
556 force (bool): If set, helm forces resource updates through a replacement strategy. This may recreate pods.
557 Returns:
558 str: command to upgrade a Helm Chart release
559 """
560
561 timeout_str = ""
562 if timeout:
563 timeout_str = "--timeout {}s".format(timeout)
564
565 # atomic
566 atomic_str = ""
567 if atomic:
568 atomic_str = "--atomic"
569
570 # force
571 force_str = ""
572 if force:
573 force_str = "--force "
574
575 # version
576 version_str = ""
577 if version:
578 version_str = "--version {}".format(version)
579
580 # namespace
581 namespace_str = ""
582 if namespace:
583 namespace_str = "--namespace {}".format(namespace)
584
585 command = (
586 "env KUBECONFIG={kubeconfig} {helm} upgrade {name} {model} {namespace} {atomic} {force}"
587 "--output yaml {params} {timeout} --reuse-values {ver}"
588 ).format(
589 kubeconfig=kubeconfig,
590 helm=self._helm_command,
591 name=kdu_instance,
592 namespace=namespace_str,
593 atomic=atomic_str,
594 force=force_str,
595 params=params_str,
596 timeout=timeout_str,
597 model=kdu_model,
598 ver=version_str,
599 )
600 return command
601
602 def _get_rollback_command(
603 self, kdu_instance: str, namespace: str, revision: float, kubeconfig: str
604 ) -> str:
605 return "env KUBECONFIG={} {} rollback {} {} --namespace={} --wait".format(
606 kubeconfig, self._helm_command, kdu_instance, revision, namespace
607 )
608
609 def _get_uninstall_command(
610 self, kdu_instance: str, namespace: str, kubeconfig: str
611 ) -> str:
612 return "env KUBECONFIG={} {} uninstall {} --namespace={}".format(
613 kubeconfig, self._helm_command, kdu_instance, namespace
614 )
615
616 def _get_helm_chart_repos_ids(self, cluster_uuid) -> list:
617 repo_ids = []
618 cluster_filter = {"_admin.helm-chart-v3.id": cluster_uuid}
619 cluster = self.db.get_one("k8sclusters", cluster_filter)
620 if cluster:
621 repo_ids = cluster.get("_admin").get("helm_chart_repos") or []
622 return repo_ids
623 else:
624 raise K8sException(
625 "k8cluster with helm-id : {} not found".format(cluster_uuid)
626 )