Updating python dependencies
[osm/N2VC.git] / n2vc / k8s_helm3_conn.py
1 ##
2 # Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3 # This file is part of OSM
4 # All Rights Reserved.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
15 # implied.
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
18 #
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact with: nfvlabs@tid.es
21 ##
22 from typing import Union
23 import os
24 import yaml
25
26 from n2vc.k8s_helm_base_conn import K8sHelmBaseConnector
27 from n2vc.exceptions import K8sException
28
29
30 class K8sHelm3Connector(K8sHelmBaseConnector):
31
32 """
33 ####################################################################################
34 ################################### P U B L I C ####################################
35 ####################################################################################
36 """
37
def __init__(
    self,
    fs: object,
    db: object,
    kubectl_command: str = "/usr/bin/kubectl",
    helm_command: str = "/usr/bin/helm3",
    log: object = None,
    on_update_db=None,
):
    """
    Builds the helm connector for helm v3.

    Every argument is handed over unchanged to K8sHelmBaseConnector.__init__.

    :param fs: file system for kubernetes and helm configuration
    :param db: database object to write current operation status
    :param kubectl_command: path to kubectl executable
    :param helm_command: path to helm executable
    :param log: logger
    :param on_update_db: callback called when k8s connector updates database
    """

    # the parent class performs the whole initialization
    parent_args = dict(
        fs=fs,
        db=db,
        log=log,
        helm_command=helm_command,
        kubectl_command=kubectl_command,
        on_update_db=on_update_db,
    )
    K8sHelmBaseConnector.__init__(self, **parent_args)

    self.log.info("K8S Helm3 connector initialized")
70
async def install(
    self,
    cluster_uuid: str,
    kdu_model: str,
    kdu_instance: str,
    atomic: bool = True,
    timeout: float = 300,
    params: dict = None,
    db_dict: dict = None,
    kdu_name: str = None,
    namespace: str = None,
    **kwargs,
):
    """Install a helm chart

    :param cluster_uuid str: The UUID of the cluster to install to
    :param kdu_model str: chart/reference (string), which can be either
        of these options:
        - a name of chart available via the repos known by OSM
          (e.g. stable/openldap, stable/openldap:1.2.4)
        - a path to a packaged chart (e.g. mychart.tgz)
        - a path to an unpacked chart directory or a URL (e.g. mychart)
    :param kdu_instance: Kdu instance name
    :param atomic bool: If set, waits until the model is active and resets
        the cluster on failure.
    :param timeout float: The time, in seconds, to wait for the install
        to finish
    :param params dict: Key-value pairs of instantiation parameters
    :param db_dict dict: passed through unchanged to _install_impl
    :param kdu_name: Name of the KDU instance to be installed
    :param namespace: K8s namespace to use for the KDU instance
    :param kwargs: Additional parameters (None yet); accepted and not used here

    :raises K8sException: if the namespace could not be created and still
        does not exist afterwards
    :return: True if successful
    """

    self.log.debug("installing {} in cluster {}".format(kdu_model, cluster_uuid))

    # sync local dir
    self.fs.sync(from_path=cluster_uuid)

    # init env, paths
    paths, env = self._init_paths_env(
        cluster_name=cluster_uuid, create_if_not_exist=True
    )

    # for helm3 if namespace does not exist must create it
    if namespace and namespace != "kube-system":
        if not await self._namespace_exists(cluster_uuid, namespace):
            try:
                await self._create_namespace(cluster_uuid, namespace)
            except Exception as e:
                # a failed creation is tolerated when the namespace turns out
                # to exist after all; only a still-missing namespace is fatal
                if not await self._namespace_exists(cluster_uuid, namespace):
                    err_msg = (
                        "namespace {} does not exist in cluster_id {} "
                        "error message: {}".format(namespace, cluster_uuid, e)
                    )
                    self.log.error(err_msg)
                    raise K8sException(err_msg) from e

    await self._install_impl(
        cluster_uuid,
        kdu_model,
        paths,
        env,
        kdu_instance,
        atomic=atomic,
        timeout=timeout,
        params=params,
        db_dict=db_dict,
        kdu_name=kdu_name,
        namespace=namespace,
    )

    # sync fs
    self.fs.reverse_sync(from_path=cluster_uuid)

    self.log.debug("Returning kdu_instance {}".format(kdu_instance))
    return True
150
async def inspect_kdu(self, kdu_model: str, repo_url: str = None) -> str:
    """
    Runs the "all" inspect command for a chart and returns its result.

    :param kdu_model: chart reference handed to _exec_inspect_command
    :param repo_url: optional repository url handed to _exec_inspect_command
    :return: whatever _exec_inspect_command returns for inspect_command="all"
    """

    debug_msg = "inspect kdu_model {} from (optional) repo: {}".format(
        kdu_model, repo_url
    )
    self.log.debug(debug_msg)

    inspection = await self._exec_inspect_command(
        inspect_command="all", kdu_model=kdu_model, repo_url=repo_url
    )
    return inspection
160
161 """
162 ####################################################################################
163 ################################### P R I V A T E ##################################
164 ####################################################################################
165 """
166
167 def _init_paths_env(self, cluster_name: str, create_if_not_exist: bool = True):
168 """
169 Creates and returns base cluster and kube dirs and returns them.
170 Also created helm3 dirs according to new directory specification, paths are
171 returned and also environment variables that must be provided to execute commands
172
173 Helm 3 directory specification uses XDG categories for variable support:
174 - Cache: $XDG_CACHE_HOME, for example, ${HOME}/.cache/helm/
175 - Configuration: $XDG_CONFIG_HOME, for example, ${HOME}/.config/helm/
176 - Data: $XDG_DATA_HOME, for example ${HOME}/.local/share/helm
177
178 The variables assigned for this paths are:
179 (In the documentation the variables names are $HELM_PATH_CACHE, $HELM_PATH_CONFIG,
180 $HELM_PATH_DATA but looking and helm env the variable names are different)
181 - Cache: $HELM_CACHE_HOME
182 - Config: $HELM_CONFIG_HOME
183 - Data: $HELM_DATA_HOME
184 - helm kubeconfig: $KUBECONFIG
185
186 :param cluster_name: cluster_name
187 :return: Dictionary with config_paths and dictionary with helm environment variables
188 """
189
190 base = self.fs.path
191 if base.endswith("/") or base.endswith("\\"):
192 base = base[:-1]
193
194 # base dir for cluster
195 cluster_dir = base + "/" + cluster_name
196
197 # kube dir
198 kube_dir = cluster_dir + "/" + ".kube"
199 if create_if_not_exist and not os.path.exists(kube_dir):
200 self.log.debug("Creating dir {}".format(kube_dir))
201 os.makedirs(kube_dir)
202
203 helm_path_cache = cluster_dir + "/.cache/helm"
204 if create_if_not_exist and not os.path.exists(helm_path_cache):
205 self.log.debug("Creating dir {}".format(helm_path_cache))
206 os.makedirs(helm_path_cache)
207
208 helm_path_config = cluster_dir + "/.config/helm"
209 if create_if_not_exist and not os.path.exists(helm_path_config):
210 self.log.debug("Creating dir {}".format(helm_path_config))
211 os.makedirs(helm_path_config)
212
213 helm_path_data = cluster_dir + "/.local/share/helm"
214 if create_if_not_exist and not os.path.exists(helm_path_data):
215 self.log.debug("Creating dir {}".format(helm_path_data))
216 os.makedirs(helm_path_data)
217
218 config_filename = kube_dir + "/config"
219
220 # 2 - Prepare dictionary with paths
221 paths = {
222 "kube_dir": kube_dir,
223 "kube_config": config_filename,
224 "cluster_dir": cluster_dir,
225 }
226
227 # 3 - Prepare environment variables
228 env = {
229 "HELM_CACHE_HOME": helm_path_cache,
230 "HELM_CONFIG_HOME": helm_path_config,
231 "HELM_DATA_HOME": helm_path_data,
232 "KUBECONFIG": config_filename,
233 }
234
235 for file_name, file in paths.items():
236 if "dir" in file_name and not os.path.exists(file):
237 err_msg = "{} dir does not exist".format(file)
238 self.log.error(err_msg)
239 raise K8sException(err_msg)
240
241 return paths, env
242
243 async def _namespace_exists(self, cluster_id, namespace) -> bool:
244 self.log.debug(
245 "checking if namespace {} exists cluster_id {}".format(
246 namespace, cluster_id
247 )
248 )
249 namespaces = await self._get_namespaces(cluster_id)
250 return namespace in namespaces if namespaces else False
251
async def _get_namespaces(self, cluster_id: str):
    """
    Lists the namespace names reported by `kubectl get namespaces -o=yaml`.

    :param cluster_id: cluster whose kubeconfig (from _init_paths_env) is used
    :return: list with the metadata.name of every item in the kubectl output
    """

    self.log.debug("get namespaces cluster_id {}".format(cluster_id))

    # init config, env
    paths, env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )

    kubectl_cmd = "{} --kubeconfig={} get namespaces -o=yaml".format(
        self.kubectl_command, paths["kube_config"]
    )
    output, _rc = await self._local_async_exec(
        command=kubectl_cmd, raise_exception_on_error=True, env=env
    )

    listing = yaml.load(output, Loader=yaml.SafeLoader)
    namespaces = []
    for item in listing["items"]:
        namespaces.append(item["metadata"]["name"])
    self.log.debug(f"namespaces {namespaces}")

    return namespaces
273
274 async def _create_namespace(self, cluster_id: str, namespace: str):
275
276 self.log.debug(f"create namespace: {cluster_id} for cluster_id: {namespace}")
277
278 # init config, env
279 paths, env = self._init_paths_env(
280 cluster_name=cluster_id, create_if_not_exist=True
281 )
282
283 command = "{} --kubeconfig={} create namespace {}".format(
284 self.kubectl_command, paths["kube_config"], namespace
285 )
286 _, _rc = await self._local_async_exec(
287 command=command, raise_exception_on_error=True, env=env
288 )
289 self.log.debug(f"namespace {namespace} created")
290
291 return _rc
292
293 async def _get_services(
294 self, cluster_id: str, kdu_instance: str, namespace: str, kubeconfig: str
295 ):
296
297 # init config, env
298 paths, env = self._init_paths_env(
299 cluster_name=cluster_id, create_if_not_exist=True
300 )
301
302 command1 = "env KUBECONFIG={} {} get manifest {} --namespace={}".format(
303 kubeconfig, self._helm_command, kdu_instance, namespace
304 )
305 command2 = "{} get --namespace={} -f -".format(self.kubectl_command, namespace)
306 output, _rc = await self._local_async_exec_pipe(
307 command1, command2, env=env, raise_exception_on_error=True
308 )
309 services = self._parse_services(output)
310
311 return services
312
313 async def _cluster_init(self, cluster_id, namespace, paths, env):
314 """
315 Implements the helm version dependent cluster initialization:
316 For helm3 it creates the namespace if it is not created
317 """
318 if namespace != "kube-system":
319 namespaces = await self._get_namespaces(cluster_id)
320 if namespace not in namespaces:
321 await self._create_namespace(cluster_id, namespace)
322
323 repo_list = await self.repo_list(cluster_id)
324 stable_repo = [repo for repo in repo_list if repo["name"] == "stable"]
325 if not stable_repo and self._stable_repo_url:
326 await self.repo_add(cluster_id, "stable", self._stable_repo_url)
327
328 # Returns False as no software needs to be uninstalled
329 return False
330
331 async def _uninstall_sw(self, cluster_id: str, namespace: str):
332 # nothing to do to uninstall sw
333 pass
334
335 async def _instances_list(self, cluster_id: str):
336
337 # init paths, env
338 paths, env = self._init_paths_env(
339 cluster_name=cluster_id, create_if_not_exist=True
340 )
341
342 command = "{} list --all-namespaces --output yaml".format(self._helm_command)
343 output, _rc = await self._local_async_exec(
344 command=command, raise_exception_on_error=True, env=env
345 )
346
347 if output and len(output) > 0:
348 self.log.debug("instances list output: {}".format(output))
349 return yaml.load(output, Loader=yaml.SafeLoader)
350 else:
351 return []
352
353 def _get_inspect_command(
354 self, show_command: str, kdu_model: str, repo_str: str, version: str
355 ):
356 """Generates the command to obtain the information about an Helm Chart package
357 (┬┤helm show ...┬┤ command)
358
359 Args:
360 show_command: the second part of the command (`helm show <show_command>`)
361 kdu_model: The name or path of an Helm Chart
362 repo_url: Helm Chart repository url
363 version: constraint with specific version of the Chart to use
364
365 Returns:
366 str: the generated Helm Chart command
367 """
368
369 inspect_command = "{} show {} {}{} {}".format(
370 self._helm_command, show_command, kdu_model, repo_str, version
371 )
372 return inspect_command
373
374 def _get_get_command(
375 self, get_command: str, kdu_instance: str, namespace: str, kubeconfig: str
376 ):
377 get_command = (
378 "env KUBECONFIG={} {} get {} {} --namespace={} --output yaml".format(
379 kubeconfig, self._helm_command, get_command, kdu_instance, namespace
380 )
381 )
382 return get_command
383
384 async def _status_kdu(
385 self,
386 cluster_id: str,
387 kdu_instance: str,
388 namespace: str = None,
389 yaml_format: bool = False,
390 show_error_log: bool = False,
391 ) -> Union[str, dict]:
392
393 self.log.debug(
394 "status of kdu_instance: {}, namespace: {} ".format(kdu_instance, namespace)
395 )
396
397 if not namespace:
398 namespace = "kube-system"
399
400 # init config, env
401 paths, env = self._init_paths_env(
402 cluster_name=cluster_id, create_if_not_exist=True
403 )
404 command = "env KUBECONFIG={} {} status {} --namespace={} --output yaml".format(
405 paths["kube_config"], self._helm_command, kdu_instance, namespace
406 )
407
408 output, rc = await self._local_async_exec(
409 command=command,
410 raise_exception_on_error=True,
411 show_error_log=show_error_log,
412 env=env,
413 )
414
415 if yaml_format:
416 return str(output)
417
418 if rc != 0:
419 return None
420
421 data = yaml.load(output, Loader=yaml.SafeLoader)
422
423 # remove field 'notes' and manifest
424 try:
425 del data.get("info")["notes"]
426 except KeyError:
427 pass
428
429 # parse the manifest to a list of dictionaries
430 if "manifest" in data:
431 manifest_str = data.get("manifest")
432 manifest_docs = yaml.load_all(manifest_str, Loader=yaml.SafeLoader)
433
434 data["manifest"] = []
435 for doc in manifest_docs:
436 data["manifest"].append(doc)
437
438 return data
439
440 def _get_install_command(
441 self,
442 kdu_model: str,
443 kdu_instance: str,
444 namespace: str,
445 params_str: str,
446 version: str,
447 atomic: bool,
448 timeout: float,
449 kubeconfig: str,
450 ) -> str:
451
452 timeout_str = ""
453 if timeout:
454 timeout_str = "--timeout {}s".format(timeout)
455
456 # atomic
457 atomic_str = ""
458 if atomic:
459 atomic_str = "--atomic"
460 # namespace
461 namespace_str = ""
462 if namespace:
463 namespace_str = "--namespace {}".format(namespace)
464
465 # version
466 version_str = ""
467 if version:
468 version_str = "--version {}".format(version)
469
470 command = (
471 "env KUBECONFIG={kubeconfig} {helm} install {name} {atomic} --output yaml "
472 "{params} {timeout} {ns} {model} {ver}".format(
473 kubeconfig=kubeconfig,
474 helm=self._helm_command,
475 name=kdu_instance,
476 atomic=atomic_str,
477 params=params_str,
478 timeout=timeout_str,
479 ns=namespace_str,
480 model=kdu_model,
481 ver=version_str,
482 )
483 )
484 return command
485
486 def _get_upgrade_scale_command(
487 self,
488 kdu_model: str,
489 kdu_instance: str,
490 namespace: str,
491 scale: int,
492 version: str,
493 atomic: bool,
494 replica_str: str,
495 timeout: float,
496 resource_name: str,
497 kubeconfig: str,
498 ) -> str:
499 """Generates the command to scale a Helm Chart release
500
501 Args:
502 kdu_model (str): Kdu model name, corresponding to the Helm local location or repository
503 kdu_instance (str): KDU instance, corresponding to the Helm Chart release in question
504 namespace (str): Namespace where this KDU instance is deployed
505 scale (int): Scale count
506 version (str): Constraint with specific version of the Chart to use
507 atomic (bool): If set, upgrade process rolls back changes made in case of failed upgrade.
508 The --wait flag will be set automatically if --atomic is used
509 replica_str (str): The key under resource_name key where the scale count is stored
510 timeout (float): The time, in seconds, to wait
511 resource_name (str): The KDU's resource to scale
512 kubeconfig (str): Kubeconfig file path
513
514 Returns:
515 str: command to scale a Helm Chart release
516 """
517
518 # scale
519 if resource_name:
520 scale_dict = {"{}.{}".format(resource_name, replica_str): scale}
521 else:
522 scale_dict = {replica_str: scale}
523
524 scale_str = self._params_to_set_option(scale_dict)
525
526 return self._get_upgrade_command(
527 kdu_model=kdu_model,
528 kdu_instance=kdu_instance,
529 namespace=namespace,
530 params_str=scale_str,
531 version=version,
532 atomic=atomic,
533 timeout=timeout,
534 kubeconfig=kubeconfig,
535 )
536
537 def _get_upgrade_command(
538 self,
539 kdu_model: str,
540 kdu_instance: str,
541 namespace: str,
542 params_str: str,
543 version: str,
544 atomic: bool,
545 timeout: float,
546 kubeconfig: str,
547 force: bool = False,
548 ) -> str:
549 """Generates the command to upgrade a Helm Chart release
550
551 Args:
552 kdu_model (str): Kdu model name, corresponding to the Helm local location or repository
553 kdu_instance (str): KDU instance, corresponding to the Helm Chart release in question
554 namespace (str): Namespace where this KDU instance is deployed
555 params_str (str): Params used to upgrade the Helm Chart release
556 version (str): Constraint with specific version of the Chart to use
557 atomic (bool): If set, upgrade process rolls back changes made in case of failed upgrade.
558 The --wait flag will be set automatically if --atomic is used
559 timeout (float): The time, in seconds, to wait
560 kubeconfig (str): Kubeconfig file path
561 force (bool): If set, helm forces resource updates through a replacement strategy. This may recreate pods.
562 Returns:
563 str: command to upgrade a Helm Chart release
564 """
565
566 timeout_str = ""
567 if timeout:
568 timeout_str = "--timeout {}s".format(timeout)
569
570 # atomic
571 atomic_str = ""
572 if atomic:
573 atomic_str = "--atomic"
574
575 # force
576 force_str = ""
577 if force:
578 force_str = "--force "
579
580 # version
581 version_str = ""
582 if version:
583 version_str = "--version {}".format(version)
584
585 # namespace
586 namespace_str = ""
587 if namespace:
588 namespace_str = "--namespace {}".format(namespace)
589
590 command = (
591 "env KUBECONFIG={kubeconfig} {helm} upgrade {name} {model} {namespace} {atomic} {force}"
592 "--output yaml {params} {timeout} --reuse-values {ver}"
593 ).format(
594 kubeconfig=kubeconfig,
595 helm=self._helm_command,
596 name=kdu_instance,
597 namespace=namespace_str,
598 atomic=atomic_str,
599 force=force_str,
600 params=params_str,
601 timeout=timeout_str,
602 model=kdu_model,
603 ver=version_str,
604 )
605 return command
606
607 def _get_rollback_command(
608 self, kdu_instance: str, namespace: str, revision: float, kubeconfig: str
609 ) -> str:
610 return "env KUBECONFIG={} {} rollback {} {} --namespace={} --wait".format(
611 kubeconfig, self._helm_command, kdu_instance, revision, namespace
612 )
613
614 def _get_uninstall_command(
615 self, kdu_instance: str, namespace: str, kubeconfig: str
616 ) -> str:
617
618 return "env KUBECONFIG={} {} uninstall {} --namespace={}".format(
619 kubeconfig, self._helm_command, kdu_instance, namespace
620 )
621
622 def _get_helm_chart_repos_ids(self, cluster_uuid) -> list:
623 repo_ids = []
624 cluster_filter = {"_admin.helm-chart-v3.id": cluster_uuid}
625 cluster = self.db.get_one("k8sclusters", cluster_filter)
626 if cluster:
627 repo_ids = cluster.get("_admin").get("helm_chart_repos") or []
628 return repo_ids
629 else:
630 raise K8sException(
631 "k8cluster with helm-id : {} not found".format(cluster_uuid)
632 )