Fix bug 2088 by quoting inputs for commands
[osm/N2VC.git] / n2vc / k8s_helm3_conn.py
1 ##
2 # Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3 # This file is part of OSM
4 # All Rights Reserved.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
15 # implied.
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
18 #
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact with: nfvlabs@tid.es
21 ##
22 from typing import Union
23 from shlex import quote
24 import os
25 import yaml
26
27 from n2vc.k8s_helm_base_conn import K8sHelmBaseConnector
28 from n2vc.exceptions import K8sException
29
30
31 class K8sHelm3Connector(K8sHelmBaseConnector):
32
33 """
34 ####################################################################################
35 ################################### P U B L I C ####################################
36 ####################################################################################
37 """
38
39 def __init__(
40 self,
41 fs: object,
42 db: object,
43 kubectl_command: str = "/usr/bin/kubectl",
44 helm_command: str = "/usr/bin/helm3",
45 log: object = None,
46 on_update_db=None,
47 ):
48 """
49 Initializes helm connector for helm v3
50
51 :param fs: file system for kubernetes and helm configuration
52 :param db: database object to write current operation status
53 :param kubectl_command: path to kubectl executable
54 :param helm_command: path to helm executable
55 :param log: logger
56 :param on_update_db: callback called when k8s connector updates database
57 """
58
59 # parent class
60 K8sHelmBaseConnector.__init__(
61 self,
62 db=db,
63 log=log,
64 fs=fs,
65 kubectl_command=kubectl_command,
66 helm_command=helm_command,
67 on_update_db=on_update_db,
68 )
69
70 self.log.info("K8S Helm3 connector initialized")
71
72 async def install(
73 self,
74 cluster_uuid: str,
75 kdu_model: str,
76 kdu_instance: str,
77 atomic: bool = True,
78 timeout: float = 300,
79 params: dict = None,
80 db_dict: dict = None,
81 kdu_name: str = None,
82 namespace: str = None,
83 **kwargs,
84 ):
85 """Install a helm chart
86
87 :param cluster_uuid str: The UUID of the cluster to install to
88 :param kdu_model str: chart/reference (string), which can be either
89 of these options:
90 - a name of chart available via the repos known by OSM
91 (e.g. stable/openldap, stable/openldap:1.2.4)
92 - a path to a packaged chart (e.g. mychart.tgz)
93 - a path to an unpacked chart directory or a URL (e.g. mychart)
94 :param kdu_instance: Kdu instance name
95 :param atomic bool: If set, waits until the model is active and resets
96 the cluster on failure.
97 :param timeout int: The time, in seconds, to wait for the install
98 to finish
99 :param params dict: Key-value pairs of instantiation parameters
100 :param kdu_name: Name of the KDU instance to be installed
101 :param namespace: K8s namespace to use for the KDU instance
102
103 :param kwargs: Additional parameters (None yet)
104
105 :return: True if successful
106 """
107
108 self.log.debug("installing {} in cluster {}".format(kdu_model, cluster_uuid))
109
110 # sync local dir
111 self.fs.sync(from_path=cluster_uuid)
112
113 # init env, paths
114 paths, env = self._init_paths_env(
115 cluster_name=cluster_uuid, create_if_not_exist=True
116 )
117
118 # for helm3 if namespace does not exist must create it
119 if namespace and namespace != "kube-system":
120 if not await self._namespace_exists(cluster_uuid, namespace):
121 try:
122 await self._create_namespace(cluster_uuid, namespace)
123 except Exception as e:
124 if not await self._namespace_exists(cluster_uuid, namespace):
125 err_msg = (
126 "namespace {} does not exist in cluster_id {} "
127 "error message: ".format(namespace, e)
128 )
129 self.log.error(err_msg)
130 raise K8sException(err_msg)
131
132 await self._install_impl(
133 cluster_uuid,
134 kdu_model,
135 paths,
136 env,
137 kdu_instance,
138 atomic=atomic,
139 timeout=timeout,
140 params=params,
141 db_dict=db_dict,
142 kdu_name=kdu_name,
143 namespace=namespace,
144 )
145
146 # sync fs
147 self.fs.reverse_sync(from_path=cluster_uuid)
148
149 self.log.debug("Returning kdu_instance {}".format(kdu_instance))
150 return True
151
152 async def inspect_kdu(self, kdu_model: str, repo_url: str = None) -> str:
153 self.log.debug(
154 "inspect kdu_model {} from (optional) repo: {}".format(kdu_model, repo_url)
155 )
156
157 return await self._exec_inspect_command(
158 inspect_command="all", kdu_model=kdu_model, repo_url=repo_url
159 )
160
161 """
162 ####################################################################################
163 ################################### P R I V A T E ##################################
164 ####################################################################################
165 """
166
167 def _init_paths_env(self, cluster_name: str, create_if_not_exist: bool = True):
168 """
169 Creates and returns base cluster and kube dirs and returns them.
170 Also created helm3 dirs according to new directory specification, paths are
171 returned and also environment variables that must be provided to execute commands
172
173 Helm 3 directory specification uses XDG categories for variable support:
174 - Cache: $XDG_CACHE_HOME, for example, ${HOME}/.cache/helm/
175 - Configuration: $XDG_CONFIG_HOME, for example, ${HOME}/.config/helm/
176 - Data: $XDG_DATA_HOME, for example ${HOME}/.local/share/helm
177
178 The variables assigned for this paths are:
179 (In the documentation the variables names are $HELM_PATH_CACHE, $HELM_PATH_CONFIG,
180 $HELM_PATH_DATA but looking and helm env the variable names are different)
181 - Cache: $HELM_CACHE_HOME
182 - Config: $HELM_CONFIG_HOME
183 - Data: $HELM_DATA_HOME
184 - helm kubeconfig: $KUBECONFIG
185
186 :param cluster_name: cluster_name
187 :return: Dictionary with config_paths and dictionary with helm environment variables
188 """
189
190 base = self.fs.path
191 if base.endswith("/") or base.endswith("\\"):
192 base = base[:-1]
193
194 # base dir for cluster
195 cluster_dir = base + "/" + cluster_name
196
197 # kube dir
198 kube_dir = cluster_dir + "/" + ".kube"
199 if create_if_not_exist and not os.path.exists(kube_dir):
200 self.log.debug("Creating dir {}".format(kube_dir))
201 os.makedirs(kube_dir)
202
203 helm_path_cache = cluster_dir + "/.cache/helm"
204 if create_if_not_exist and not os.path.exists(helm_path_cache):
205 self.log.debug("Creating dir {}".format(helm_path_cache))
206 os.makedirs(helm_path_cache)
207
208 helm_path_config = cluster_dir + "/.config/helm"
209 if create_if_not_exist and not os.path.exists(helm_path_config):
210 self.log.debug("Creating dir {}".format(helm_path_config))
211 os.makedirs(helm_path_config)
212
213 helm_path_data = cluster_dir + "/.local/share/helm"
214 if create_if_not_exist and not os.path.exists(helm_path_data):
215 self.log.debug("Creating dir {}".format(helm_path_data))
216 os.makedirs(helm_path_data)
217
218 config_filename = kube_dir + "/config"
219
220 # 2 - Prepare dictionary with paths
221 paths = {
222 "kube_dir": kube_dir,
223 "kube_config": config_filename,
224 "cluster_dir": cluster_dir,
225 }
226
227 # 3 - Prepare environment variables
228 env = {
229 "HELM_CACHE_HOME": helm_path_cache,
230 "HELM_CONFIG_HOME": helm_path_config,
231 "HELM_DATA_HOME": helm_path_data,
232 "KUBECONFIG": config_filename,
233 }
234
235 for file_name, file in paths.items():
236 if "dir" in file_name and not os.path.exists(file):
237 err_msg = "{} dir does not exist".format(file)
238 self.log.error(err_msg)
239 raise K8sException(err_msg)
240
241 return paths, env
242
243 async def _namespace_exists(self, cluster_id, namespace) -> bool:
244 self.log.debug(
245 "checking if namespace {} exists cluster_id {}".format(
246 namespace, cluster_id
247 )
248 )
249 namespaces = await self._get_namespaces(cluster_id)
250 return namespace in namespaces if namespaces else False
251
252 async def _get_namespaces(self, cluster_id: str):
253 self.log.debug("get namespaces cluster_id {}".format(cluster_id))
254
255 # init config, env
256 paths, env = self._init_paths_env(
257 cluster_name=cluster_id, create_if_not_exist=True
258 )
259
260 command = "{} --kubeconfig={} get namespaces -o=yaml".format(
261 self.kubectl_command, quote(paths["kube_config"])
262 )
263 output, _rc = await self._local_async_exec(
264 command=command, raise_exception_on_error=True, env=env
265 )
266
267 data = yaml.load(output, Loader=yaml.SafeLoader)
268 namespaces = [item["metadata"]["name"] for item in data["items"]]
269 self.log.debug(f"namespaces {namespaces}")
270
271 return namespaces
272
273 async def _create_namespace(self, cluster_id: str, namespace: str):
274 self.log.debug(f"create namespace: {cluster_id} for cluster_id: {namespace}")
275
276 # init config, env
277 paths, env = self._init_paths_env(
278 cluster_name=cluster_id, create_if_not_exist=True
279 )
280
281 command = "{} --kubeconfig={} create namespace {}".format(
282 self.kubectl_command, quote(paths["kube_config"]), quote(namespace)
283 )
284 _, _rc = await self._local_async_exec(
285 command=command, raise_exception_on_error=True, env=env
286 )
287 self.log.debug(f"namespace {namespace} created")
288
289 return _rc
290
291 async def _get_services(
292 self, cluster_id: str, kdu_instance: str, namespace: str, kubeconfig: str
293 ):
294 # init config, env
295 paths, env = self._init_paths_env(
296 cluster_name=cluster_id, create_if_not_exist=True
297 )
298
299 command1 = "env KUBECONFIG={} {} get manifest {} --namespace={}".format(
300 kubeconfig, self._helm_command, quote(kdu_instance), quote(namespace)
301 )
302 command2 = "{} get --namespace={} -f -".format(
303 self.kubectl_command, quote(namespace)
304 )
305 output, _rc = await self._local_async_exec_pipe(
306 command1, command2, env=env, raise_exception_on_error=True
307 )
308 services = self._parse_services(output)
309
310 return services
311
312 async def _cluster_init(self, cluster_id, namespace, paths, env):
313 """
314 Implements the helm version dependent cluster initialization:
315 For helm3 it creates the namespace if it is not created
316 """
317 if namespace != "kube-system":
318 namespaces = await self._get_namespaces(cluster_id)
319 if namespace not in namespaces:
320 await self._create_namespace(cluster_id, namespace)
321
322 repo_list = await self.repo_list(cluster_id)
323 stable_repo = [repo for repo in repo_list if repo["name"] == "stable"]
324 if not stable_repo and self._stable_repo_url:
325 await self.repo_add(cluster_id, "stable", self._stable_repo_url)
326
327 # Returns False as no software needs to be uninstalled
328 return False
329
330 async def _uninstall_sw(self, cluster_id: str, namespace: str):
331 # nothing to do to uninstall sw
332 pass
333
334 async def _instances_list(self, cluster_id: str):
335 # init paths, env
336 paths, env = self._init_paths_env(
337 cluster_name=cluster_id, create_if_not_exist=True
338 )
339
340 command = "{} list --all-namespaces --output yaml".format(self._helm_command)
341 output, _rc = await self._local_async_exec(
342 command=command, raise_exception_on_error=True, env=env
343 )
344
345 if output and len(output) > 0:
346 self.log.debug("instances list output: {}".format(output))
347 return yaml.load(output, Loader=yaml.SafeLoader)
348 else:
349 return []
350
351 def _get_inspect_command(
352 self, show_command: str, kdu_model: str, repo_str: str, version: str
353 ):
354 """Generates the command to obtain the information about an Helm Chart package
355 (´helm show ...´ command)
356
357 Args:
358 show_command: the second part of the command (`helm show <show_command>`)
359 kdu_model: The name or path of an Helm Chart
360 repo_url: Helm Chart repository url
361 version: constraint with specific version of the Chart to use
362
363 Returns:
364 str: the generated Helm Chart command
365 """
366
367 inspect_command = "{} show {} {}{} {}".format(
368 self._helm_command, show_command, quote(kdu_model), repo_str, version
369 )
370 return inspect_command
371
372 def _get_get_command(
373 self, get_command: str, kdu_instance: str, namespace: str, kubeconfig: str
374 ):
375 get_command = (
376 "env KUBECONFIG={} {} get {} {} --namespace={} --output yaml".format(
377 kubeconfig,
378 self._helm_command,
379 get_command,
380 quote(kdu_instance),
381 quote(namespace),
382 )
383 )
384 return get_command
385
386 async def _status_kdu(
387 self,
388 cluster_id: str,
389 kdu_instance: str,
390 namespace: str = None,
391 yaml_format: bool = False,
392 show_error_log: bool = False,
393 ) -> Union[str, dict]:
394 self.log.debug(
395 "status of kdu_instance: {}, namespace: {} ".format(kdu_instance, namespace)
396 )
397
398 if not namespace:
399 namespace = "kube-system"
400
401 # init config, env
402 paths, env = self._init_paths_env(
403 cluster_name=cluster_id, create_if_not_exist=True
404 )
405 command = "env KUBECONFIG={} {} status {} --namespace={} --output yaml".format(
406 paths["kube_config"],
407 self._helm_command,
408 quote(kdu_instance),
409 quote(namespace),
410 )
411
412 output, rc = await self._local_async_exec(
413 command=command,
414 raise_exception_on_error=True,
415 show_error_log=show_error_log,
416 env=env,
417 )
418
419 if yaml_format:
420 return str(output)
421
422 if rc != 0:
423 return None
424
425 data = yaml.load(output, Loader=yaml.SafeLoader)
426
427 # remove field 'notes' and manifest
428 try:
429 del data.get("info")["notes"]
430 except KeyError:
431 pass
432
433 # parse the manifest to a list of dictionaries
434 if "manifest" in data:
435 manifest_str = data.get("manifest")
436 manifest_docs = yaml.load_all(manifest_str, Loader=yaml.SafeLoader)
437
438 data["manifest"] = []
439 for doc in manifest_docs:
440 data["manifest"].append(doc)
441
442 return data
443
444 def _get_install_command(
445 self,
446 kdu_model: str,
447 kdu_instance: str,
448 namespace: str,
449 params_str: str,
450 version: str,
451 atomic: bool,
452 timeout: float,
453 kubeconfig: str,
454 ) -> str:
455 timeout_str = ""
456 if timeout:
457 timeout_str = "--timeout {}s".format(timeout)
458
459 # atomic
460 atomic_str = ""
461 if atomic:
462 atomic_str = "--atomic"
463 # namespace
464 namespace_str = ""
465 if namespace:
466 namespace_str = "--namespace {}".format(quote(namespace))
467
468 # version
469 version_str = ""
470 if version:
471 version_str = "--version {}".format(version)
472
473 command = (
474 "env KUBECONFIG={kubeconfig} {helm} install {name} {atomic} --output yaml "
475 "{params} {timeout} {ns} {model} {ver}".format(
476 kubeconfig=kubeconfig,
477 helm=self._helm_command,
478 name=quote(kdu_instance),
479 atomic=atomic_str,
480 params=params_str,
481 timeout=timeout_str,
482 ns=namespace_str,
483 model=quote(kdu_model),
484 ver=version_str,
485 )
486 )
487 return command
488
489 def _get_upgrade_scale_command(
490 self,
491 kdu_model: str,
492 kdu_instance: str,
493 namespace: str,
494 scale: int,
495 version: str,
496 atomic: bool,
497 replica_str: str,
498 timeout: float,
499 resource_name: str,
500 kubeconfig: str,
501 ) -> str:
502 """Generates the command to scale a Helm Chart release
503
504 Args:
505 kdu_model (str): Kdu model name, corresponding to the Helm local location or repository
506 kdu_instance (str): KDU instance, corresponding to the Helm Chart release in question
507 namespace (str): Namespace where this KDU instance is deployed
508 scale (int): Scale count
509 version (str): Constraint with specific version of the Chart to use
510 atomic (bool): If set, upgrade process rolls back changes made in case of failed upgrade.
511 The --wait flag will be set automatically if --atomic is used
512 replica_str (str): The key under resource_name key where the scale count is stored
513 timeout (float): The time, in seconds, to wait
514 resource_name (str): The KDU's resource to scale
515 kubeconfig (str): Kubeconfig file path
516
517 Returns:
518 str: command to scale a Helm Chart release
519 """
520
521 # scale
522 if resource_name:
523 scale_dict = {"{}.{}".format(resource_name, replica_str): scale}
524 else:
525 scale_dict = {replica_str: scale}
526
527 scale_str = self._params_to_set_option(scale_dict)
528
529 return self._get_upgrade_command(
530 kdu_model=kdu_model,
531 kdu_instance=kdu_instance,
532 namespace=namespace,
533 params_str=scale_str,
534 version=version,
535 atomic=atomic,
536 timeout=timeout,
537 kubeconfig=kubeconfig,
538 )
539
540 def _get_upgrade_command(
541 self,
542 kdu_model: str,
543 kdu_instance: str,
544 namespace: str,
545 params_str: str,
546 version: str,
547 atomic: bool,
548 timeout: float,
549 kubeconfig: str,
550 ) -> str:
551 """Generates the command to upgrade a Helm Chart release
552
553 Args:
554 kdu_model (str): Kdu model name, corresponding to the Helm local location or repository
555 kdu_instance (str): KDU instance, corresponding to the Helm Chart release in question
556 namespace (str): Namespace where this KDU instance is deployed
557 params_str (str): Params used to upgrade the Helm Chart release
558 version (str): Constraint with specific version of the Chart to use
559 atomic (bool): If set, upgrade process rolls back changes made in case of failed upgrade.
560 The --wait flag will be set automatically if --atomic is used
561 timeout (float): The time, in seconds, to wait
562 kubeconfig (str): Kubeconfig file path
563
564 Returns:
565 str: command to upgrade a Helm Chart release
566 """
567
568 timeout_str = ""
569 if timeout:
570 timeout_str = "--timeout {}s".format(timeout)
571
572 # atomic
573 atomic_str = ""
574 if atomic:
575 atomic_str = "--atomic"
576
577 # version
578 version_str = ""
579 if version:
580 version_str = "--version {}".format(quote(version))
581
582 # namespace
583 namespace_str = ""
584 if namespace:
585 namespace_str = "--namespace {}".format(quote(namespace))
586
587 command = (
588 "env KUBECONFIG={kubeconfig} {helm} upgrade {name} {model} {namespace} {atomic} "
589 "--output yaml {params} {timeout} --reuse-values {ver}"
590 ).format(
591 kubeconfig=kubeconfig,
592 helm=self._helm_command,
593 name=quote(kdu_instance),
594 namespace=namespace_str,
595 atomic=atomic_str,
596 params=params_str,
597 timeout=timeout_str,
598 model=quote(kdu_model),
599 ver=version_str,
600 )
601 return command
602
603 def _get_rollback_command(
604 self, kdu_instance: str, namespace: str, revision: float, kubeconfig: str
605 ) -> str:
606 return "env KUBECONFIG={} {} rollback {} {} --namespace={} --wait".format(
607 kubeconfig,
608 self._helm_command,
609 quote(kdu_instance),
610 revision,
611 quote(namespace),
612 )
613
614 def _get_uninstall_command(
615 self, kdu_instance: str, namespace: str, kubeconfig: str
616 ) -> str:
617 return "env KUBECONFIG={} {} uninstall {} --namespace={}".format(
618 kubeconfig, self._helm_command, quote(kdu_instance), quote(namespace)
619 )
620
621 def _get_helm_chart_repos_ids(self, cluster_uuid) -> list:
622 repo_ids = []
623 cluster_filter = {"_admin.helm-chart-v3.id": cluster_uuid}
624 cluster = self.db.get_one("k8sclusters", cluster_filter)
625 if cluster:
626 repo_ids = cluster.get("_admin").get("helm_chart_repos") or []
627 return repo_ids
628 else:
629 raise K8sException(
630 "k8cluster with helm-id : {} not found".format(cluster_uuid)
631 )