##
# Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
# This file is part of OSM
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# For those usages not covered by the Apache License, Version 2.0 please
# contact with: nfvlabs@tid.es
##
import os
import yaml

from n2vc.k8s_helm_base_conn import K8sHelmBaseConnector
from n2vc.exceptions import K8sException


class K8sHelm3Connector(K8sHelmBaseConnector):

    """
    ####################################################################################
    ################################### P U B L I C ####################################
    ####################################################################################
    """

    def __init__(
        self,
        fs: object,
        db: object,
        kubectl_command: str = "/usr/bin/kubectl",
        helm_command: str = "/usr/bin/helm3",
        log: object = None,
        on_update_db=None,
    ):
        """
        Initializes helm connector for helm v3

        :param fs: file system for kubernetes and helm configuration
        :param db: database object to write current operation status
        :param kubectl_command: path to kubectl executable
        :param helm_command: path to helm executable
        :param log: logger
        :param on_update_db: callback called when k8s connector updates database
        """

        # parent class
        K8sHelmBaseConnector.__init__(
            self,
            db=db,
            log=log,
            fs=fs,
            kubectl_command=kubectl_command,
            helm_command=helm_command,
            on_update_db=on_update_db,
        )

        self.log.info("K8S Helm3 connector initialized")

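    # Illustrative usage (a sketch; the surrounding objects are created by the
    # calling OSM module and the names below are hypothetical):
    #
    #   connector = K8sHelm3Connector(fs=fs, db=db, log=logger)
    #
    # The public coroutines below (install, inspect_kdu, ...) are then awaited on
    # this instance.
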
    async def install(
        self,
        cluster_uuid: str,
        kdu_model: str,
        kdu_instance: str,
        atomic: bool = True,
        timeout: float = 300,
        params: dict = None,
        db_dict: dict = None,
        kdu_name: str = None,
        namespace: str = None,
        **kwargs,
    ):
        """Install a helm chart

        :param cluster_uuid str: The UUID of the cluster to install to
        :param kdu_model str: The name or path of a bundle to install
        :param kdu_instance: Kdu instance name
        :param atomic bool: If set, waits until the model is active and resets
                            the cluster on failure.
        :param timeout float: The time, in seconds, to wait for the install
                              to finish
        :param params dict: Key-value pairs of instantiation parameters
        :param db_dict dict: Database entry (collection, filter, path) where the
                             operation status is written
        :param kdu_name: Name of the KDU instance to be installed
        :param namespace: K8s namespace to use for the KDU instance

        :param kwargs: Additional parameters (None yet)

        :return: True if successful
        """

        self.log.debug("installing {} in cluster {}".format(kdu_model, cluster_uuid))

        # sync local dir
        self.fs.sync(from_path=cluster_uuid)

        # init env, paths
        paths, env = self._init_paths_env(
            cluster_name=cluster_uuid, create_if_not_exist=True
        )

        # for helm3, if the namespace does not exist it must be created
        if namespace and namespace != "kube-system":
            if not await self._namespace_exists(cluster_uuid, namespace):
                try:
                    await self._create_namespace(cluster_uuid, namespace)
                except Exception as e:
                    if not await self._namespace_exists(cluster_uuid, namespace):
                        err_msg = (
                            "namespace {} does not exist in cluster_id {}, "
                            "error message: {}".format(namespace, cluster_uuid, e)
                        )
                        self.log.error(err_msg)
                        raise K8sException(err_msg)

        await self._install_impl(
            cluster_uuid,
            kdu_model,
            paths,
            env,
            kdu_instance,
            atomic=atomic,
            timeout=timeout,
            params=params,
            db_dict=db_dict,
            kdu_name=kdu_name,
            namespace=namespace,
        )

        # sync fs
        self.fs.reverse_sync(from_path=cluster_uuid)

        self.log.debug("Returning kdu_instance {}".format(kdu_instance))
        return True

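    # Illustrative call to install() above (a sketch; cluster id, chart and
    # instance names are hypothetical):
    #
    #   installed = await connector.install(
    #       cluster_uuid="cluster-1234",
    #       kdu_model="stable/openldap",
    #       kdu_instance="openldap-0001",
    #       namespace="osm-ns",
    #       params={"replicaCount": 2},
    #   )
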
    async def inspect_kdu(self, kdu_model: str, repo_url: str = None) -> str:

        self.log.debug(
            "inspect kdu_model {} from (optional) repo: {}".format(kdu_model, repo_url)
        )

        return await self._exec_inspect_command(
            inspect_command="all", kdu_model=kdu_model, repo_url=repo_url
        )

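    # Illustrative call to inspect_kdu() above (a sketch; chart name and repo URL
    # are hypothetical). With inspect_command="all" it resolves, via the base class,
    # to a "helm show all" invocation (see _get_inspect_command below):
    #
    #   chart_info = await connector.inspect_kdu(
    #       kdu_model="openldap", repo_url="https://charts.example.com/stable"
    #   )
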
155 """
156 ####################################################################################
157 ################################### P R I V A T E ##################################
158 ####################################################################################
159 """
160
    def _init_paths_env(self, cluster_name: str, create_if_not_exist: bool = True):
        """
        Creates the base cluster and kube dirs if needed and returns them.
        Also creates the helm3 dirs according to the new directory specification; the
        paths are returned together with the environment variables that must be
        provided to execute commands.

        The Helm 3 directory specification uses XDG categories:
        - Cache: $XDG_CACHE_HOME, for example ${HOME}/.cache/helm/
        - Configuration: $XDG_CONFIG_HOME, for example ${HOME}/.config/helm/
        - Data: $XDG_DATA_HOME, for example ${HOME}/.local/share/helm

        The variables assigned to these paths are:
        (The documentation names them $HELM_PATH_CACHE, $HELM_PATH_CONFIG and
        $HELM_PATH_DATA, but looking at "helm env" the variable names are different)
        - Cache: $HELM_CACHE_HOME
        - Config: $HELM_CONFIG_HOME
        - Data: $HELM_DATA_HOME
        - helm kubeconfig: $KUBECONFIG

        :param cluster_name: cluster name
        :return: Dictionary with config paths and dictionary with helm environment variables
        """

        base = self.fs.path
        if base.endswith("/") or base.endswith("\\"):
            base = base[:-1]

        # base dir for cluster
        cluster_dir = base + "/" + cluster_name

        # kube dir
        kube_dir = cluster_dir + "/" + ".kube"
        if create_if_not_exist and not os.path.exists(kube_dir):
            self.log.debug("Creating dir {}".format(kube_dir))
            os.makedirs(kube_dir)

        helm_path_cache = cluster_dir + "/.cache/helm"
        if create_if_not_exist and not os.path.exists(helm_path_cache):
            self.log.debug("Creating dir {}".format(helm_path_cache))
            os.makedirs(helm_path_cache)

        helm_path_config = cluster_dir + "/.config/helm"
        if create_if_not_exist and not os.path.exists(helm_path_config):
            self.log.debug("Creating dir {}".format(helm_path_config))
            os.makedirs(helm_path_config)

        helm_path_data = cluster_dir + "/.local/share/helm"
        if create_if_not_exist and not os.path.exists(helm_path_data):
            self.log.debug("Creating dir {}".format(helm_path_data))
            os.makedirs(helm_path_data)

        config_filename = kube_dir + "/config"

        # 2 - Prepare dictionary with paths
        paths = {
            "kube_dir": kube_dir,
            "kube_config": config_filename,
            "cluster_dir": cluster_dir,
        }

        # 3 - Prepare environment variables
        env = {
            "HELM_CACHE_HOME": helm_path_cache,
            "HELM_CONFIG_HOME": helm_path_config,
            "HELM_DATA_HOME": helm_path_data,
            "KUBECONFIG": config_filename,
        }

        for file_name, file in paths.items():
            if "dir" in file_name and not os.path.exists(file):
                err_msg = "{} dir does not exist".format(file)
                self.log.error(err_msg)
                raise K8sException(err_msg)

        return paths, env

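    # Illustrative output of _init_paths_env() above, assuming a hypothetical
    # self.fs.path of "/app/storage" and cluster_name "cluster-1234":
    #
    #   paths["kube_config"]    -> /app/storage/cluster-1234/.kube/config
    #   env["HELM_CACHE_HOME"]  -> /app/storage/cluster-1234/.cache/helm
    #   env["HELM_CONFIG_HOME"] -> /app/storage/cluster-1234/.config/helm
    #   env["HELM_DATA_HOME"]   -> /app/storage/cluster-1234/.local/share/helm
    #   env["KUBECONFIG"]       -> /app/storage/cluster-1234/.kube/config
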
    async def _namespace_exists(self, cluster_id, namespace) -> bool:
        self.log.debug(
            "checking if namespace {} exists in cluster_id {}".format(
                namespace, cluster_id
            )
        )
        namespaces = await self._get_namespaces(cluster_id)
        return namespace in namespaces if namespaces else False

    async def _get_namespaces(self, cluster_id: str):

        self.log.debug("get namespaces cluster_id {}".format(cluster_id))

        # init config, env
        paths, env = self._init_paths_env(
            cluster_name=cluster_id, create_if_not_exist=True
        )

        command = "{} --kubeconfig={} get namespaces -o=yaml".format(
            self.kubectl_command, paths["kube_config"]
        )
        output, _rc = await self._local_async_exec(
            command=command, raise_exception_on_error=True, env=env
        )

        data = yaml.load(output, Loader=yaml.SafeLoader)
        namespaces = [item["metadata"]["name"] for item in data["items"]]
        self.log.debug(f"namespaces {namespaces}")

        return namespaces

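    # With the default kubectl path and the hypothetical storage layout used above,
    # _get_namespaces() ends up executing a command of the form:
    #
    #   /usr/bin/kubectl --kubeconfig=/app/storage/cluster-1234/.kube/config \
    #       get namespaces -o=yaml
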
    async def _create_namespace(self, cluster_id: str, namespace: str):

        self.log.debug(f"create namespace: {namespace} for cluster_id: {cluster_id}")

        # init config, env
        paths, env = self._init_paths_env(
            cluster_name=cluster_id, create_if_not_exist=True
        )

        command = "{} --kubeconfig={} create namespace {}".format(
            self.kubectl_command, paths["kube_config"], namespace
        )
        _, _rc = await self._local_async_exec(
            command=command, raise_exception_on_error=True, env=env
        )
        self.log.debug(f"namespace {namespace} created")

        return _rc

    async def _get_services(
        self, cluster_id: str, kdu_instance: str, namespace: str, kubeconfig: str
    ):

        # init config, env
        paths, env = self._init_paths_env(
            cluster_name=cluster_id, create_if_not_exist=True
        )

        command1 = "env KUBECONFIG={} {} get manifest {} --namespace={}".format(
            kubeconfig, self._helm_command, kdu_instance, namespace
        )
        command2 = "{} get --namespace={} -f -".format(self.kubectl_command, namespace)
        output, _rc = await self._local_async_exec_pipe(
            command1, command2, env=env, raise_exception_on_error=True
        )
        services = self._parse_services(output)

        return services

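    # _get_services() above pipes "helm get manifest" into "kubectl get -f -";
    # with illustrative values the two piped commands look like:
    #
    #   env KUBECONFIG=<kubeconfig> /usr/bin/helm3 get manifest openldap-0001 --namespace=osm-ns
    #   /usr/bin/kubectl get --namespace=osm-ns -f -
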
    async def _cluster_init(self, cluster_id, namespace, paths, env):
        """
        Implements the helm version dependent cluster initialization:
        for helm3 it creates the namespace if it does not exist yet
        """
        if namespace != "kube-system":
            namespaces = await self._get_namespaces(cluster_id)
            if namespace not in namespaces:
                await self._create_namespace(cluster_id, namespace)

        repo_list = await self.repo_list(cluster_id)
        stable_repo = [repo for repo in repo_list if repo["name"] == "stable"]
        if not stable_repo and self._stable_repo_url:
            await self.repo_add(cluster_id, "stable", self._stable_repo_url)

        # Returns False as no software needs to be uninstalled
        return False

    async def _uninstall_sw(self, cluster_id: str, namespace: str):
        # nothing to do to uninstall sw
        pass

    async def _instances_list(self, cluster_id: str):

        # init paths, env
        paths, env = self._init_paths_env(
            cluster_name=cluster_id, create_if_not_exist=True
        )

        command = "{} list --all-namespaces --output yaml".format(self._helm_command)
        output, _rc = await self._local_async_exec(
            command=command, raise_exception_on_error=True, env=env
        )

        if output and len(output) > 0:
            self.log.debug("instances list output: {}".format(output))
            return yaml.load(output, Loader=yaml.SafeLoader)
        else:
            return []

    def _get_inspect_command(
        self, inspect_command: str, kdu_model: str, repo_str: str, version: str
    ):
        inspect_command = "{} show {} {}{} {}".format(
            self._helm_command, inspect_command, kdu_model, repo_str, version
        )
        return inspect_command

    def _get_get_command(
        self, get_command: str, kdu_instance: str, namespace: str, kubeconfig: str
    ):
        get_command = (
            "env KUBECONFIG={} {} get {} {} --namespace={} --output yaml".format(
                kubeconfig, self._helm_command, get_command, kdu_instance, namespace
            )
        )
        return get_command

    async def _status_kdu(
        self,
        cluster_id: str,
        kdu_instance: str,
        namespace: str = None,
        show_error_log: bool = False,
        return_text: bool = False,
    ):

        self.log.debug(
            "status of kdu_instance: {}, namespace: {} ".format(kdu_instance, namespace)
        )

        if not namespace:
            namespace = "kube-system"

        # init config, env
        paths, env = self._init_paths_env(
            cluster_name=cluster_id, create_if_not_exist=True
        )
        command = "env KUBECONFIG={} {} status {} --namespace={} --output yaml".format(
            paths["kube_config"], self._helm_command, kdu_instance, namespace
        )

        output, rc = await self._local_async_exec(
            command=command,
            raise_exception_on_error=True,
            show_error_log=show_error_log,
            env=env,
        )

        if return_text:
            return str(output)

        if rc != 0:
            return None

        data = yaml.load(output, Loader=yaml.SafeLoader)

        # remove the 'notes' field
        try:
            del data["info"]["notes"]
        except KeyError:
            pass

        # parse the manifest to a list of dictionaries
        if "manifest" in data:
            manifest_str = data.get("manifest")
            manifest_docs = yaml.load_all(manifest_str, Loader=yaml.SafeLoader)

            data["manifest"] = []
            for doc in manifest_docs:
                data["manifest"].append(doc)

        return data

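    # _status_kdu() above runs a command of the form (values illustrative):
    #
    #   env KUBECONFIG=/app/storage/cluster-1234/.kube/config /usr/bin/helm3 \
    #       status openldap-0001 --namespace=osm-ns --output yaml
    #
    # and returns the parsed YAML with the "notes" field removed and, when present,
    # the manifest expanded into a list of documents.
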
    def _get_install_command(
        self,
        kdu_model: str,
        kdu_instance: str,
        namespace: str,
        params_str: str,
        version: str,
        atomic: bool,
        timeout: float,
        kubeconfig: str,
    ) -> str:

        timeout_str = ""
        if timeout:
            timeout_str = "--timeout {}s".format(timeout)

        # atomic
        atomic_str = ""
        if atomic:
            atomic_str = "--atomic"
        # namespace
        namespace_str = ""
        if namespace:
            namespace_str = "--namespace {}".format(namespace)

        # version
        version_str = ""
        if version:
            version_str = "--version {}".format(version)

        command = (
            "env KUBECONFIG={kubeconfig} {helm} install {name} {atomic} --output yaml "
            "{params} {timeout} {ns} {model} {ver}".format(
                kubeconfig=kubeconfig,
                helm=self._helm_command,
                name=kdu_instance,
                atomic=atomic_str,
                params=params_str,
                timeout=timeout_str,
                ns=namespace_str,
                model=kdu_model,
                ver=version_str,
            )
        )
        return command

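    # Example of the command string produced by _get_install_command() above (all
    # values are illustrative; params_str is built by the base class from the
    # instantiation parameters):
    #
    #   env KUBECONFIG=<kubeconfig> /usr/bin/helm3 install openldap-0001 --atomic \
    #       --output yaml --set replicaCount=2 --timeout 300s --namespace osm-ns \
    #       stable/openldap --version 1.2.3
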
    def _get_upgrade_scale_command(
        self,
        kdu_model: str,
        kdu_instance: str,
        namespace: str,
        scale: int,
        version: str,
        atomic: bool,
        replica_str: str,
        timeout: float,
        resource_name: str,
        kubeconfig: str,
    ) -> str:

        timeout_str = ""
        if timeout:
            timeout_str = "--timeout {}s".format(timeout)

        # atomic
        atomic_str = ""
        if atomic:
            atomic_str = "--atomic"

        # version
        version_str = ""
        if version:
            version_str = "--version {}".format(version)

        # namespace
        namespace_str = ""
        if namespace:
            namespace_str = "--namespace {}".format(namespace)

        # scale
        if resource_name:
            scale_dict = {"{}.{}".format(resource_name, replica_str): scale}
        else:
            scale_dict = {replica_str: scale}

        scale_str = self._params_to_set_option(scale_dict)

        command = (
            "env KUBECONFIG={kubeconfig} {helm} upgrade {name} {model} {namespace} {atomic} --output yaml {scale} "
            "{timeout} {ver}"
        ).format(
            helm=self._helm_command,
            name=kdu_instance,
            namespace=namespace_str,
            atomic=atomic_str,
            scale=scale_str,
            timeout=timeout_str,
            model=kdu_model,
            ver=version_str,
            kubeconfig=kubeconfig,
        )
        return command

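    # In _get_upgrade_scale_command() above the new replica count is passed as a
    # "--set" option derived from scale_dict; e.g. resource_name="mysql",
    # replica_str="replicaCount" and scale=3 yield {"mysql.replicaCount": 3}, which
    # the base-class _params_to_set_option helper renders into the {scale} part of
    # the command.
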
    def _get_upgrade_command(
        self,
        kdu_model: str,
        kdu_instance: str,
        namespace: str,
        params_str: str,
        version: str,
        atomic: bool,
        timeout: float,
        kubeconfig: str,
    ) -> str:

        timeout_str = ""
        if timeout:
            timeout_str = "--timeout {}s".format(timeout)

        # atomic
        atomic_str = ""
        if atomic:
            atomic_str = "--atomic"

        # version
        version_str = ""
        if version:
            version_str = "--version {}".format(version)

        # namespace
        namespace_str = ""
        if namespace:
            namespace_str = "--namespace {}".format(namespace)

        command = (
            "env KUBECONFIG={kubeconfig} {helm} upgrade {name} {model} {namespace} {atomic} "
            "--output yaml {params} {timeout} {ver}"
        ).format(
            kubeconfig=kubeconfig,
            helm=self._helm_command,
            name=kdu_instance,
            namespace=namespace_str,
            atomic=atomic_str,
            params=params_str,
            timeout=timeout_str,
            model=kdu_model,
            ver=version_str,
        )
        return command

    def _get_rollback_command(
        self, kdu_instance: str, namespace: str, revision: float, kubeconfig: str
    ) -> str:
        return "env KUBECONFIG={} {} rollback {} {} --namespace={} --wait".format(
            kubeconfig, self._helm_command, kdu_instance, revision, namespace
        )

    def _get_uninstall_command(
        self, kdu_instance: str, namespace: str, kubeconfig: str
    ) -> str:

        return "env KUBECONFIG={} {} uninstall {} --namespace={}".format(
            kubeconfig, self._helm_command, kdu_instance, namespace
        )

    def _get_helm_chart_repos_ids(self, cluster_uuid) -> list:
        repo_ids = []
        cluster_filter = {"_admin.helm-chart-v3.id": cluster_uuid}
        cluster = self.db.get_one("k8sclusters", cluster_filter)
        if cluster:
            repo_ids = cluster.get("_admin").get("helm_chart_repos") or []
            return repo_ids
        else:
            raise K8sException(
                "k8s cluster with helm-id {} not found".format(cluster_uuid)
            )
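
    # _get_helm_chart_repos_ids() above expects the "k8sclusters" collection entries
    # to hold the cluster id under "_admin.helm-chart-v3.id" and the configured
    # repositories under "_admin.helm_chart_repos" (shape inferred from the lookup
    # and the fields read in the method).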