Update helm repo after adding the repo
[osm/N2VC.git] / n2vc / k8s_helm3_conn.py
1 ##
2 # Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3 # This file is part of OSM
4 # All Rights Reserved.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
15 # implied.
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
18 #
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact with: nfvlabs@tid.es
21 ##
22 from typing import Union
23 import os
24 import yaml
25
26 from n2vc.k8s_helm_base_conn import K8sHelmBaseConnector
27 from n2vc.exceptions import K8sException
28
29
30 class K8sHelm3Connector(K8sHelmBaseConnector):
31
32 """
33 ####################################################################################
34 ################################### P U B L I C ####################################
35 ####################################################################################
36 """
37
def __init__(
    self,
    fs: object,
    db: object,
    kubectl_command: str = "/usr/bin/kubectl",
    helm_command: str = "/usr/bin/helm3",
    log: object = None,
    on_update_db=None,
):
    """
    Build the connector used to drive helm v3.

    Every argument is forwarded unchanged to the K8sHelmBaseConnector
    initializer; this class keeps no extra state of its own here.

    :param fs: file system for kubernetes and helm configuration
    :param db: database object to write current operation status
    :param kubectl_command: path to kubectl executable
    :param helm_command: path to helm executable
    :param log: logger
    :param on_update_db: callback called when k8s connector updates database
    """

    # Collect the parent arguments once so the delegation reads as one call
    base_kwargs = {
        "fs": fs,
        "db": db,
        "log": log,
        "kubectl_command": kubectl_command,
        "helm_command": helm_command,
        "on_update_db": on_update_db,
    }
    K8sHelmBaseConnector.__init__(self, **base_kwargs)

    # self.log is expected to be set by the parent initializer above
    self.log.info("K8S Helm3 connector initialized")
70
async def install(
    self,
    cluster_uuid: str,
    kdu_model: str,
    kdu_instance: str,
    atomic: bool = True,
    timeout: float = 300,
    params: dict = None,
    db_dict: dict = None,
    kdu_name: str = None,
    namespace: str = None,
    **kwargs,
):
    """Install a helm chart

    The local file system is synchronised for the cluster before the
    installation (``fs.sync``) and pushed back afterwards
    (``fs.reverse_sync``). The installation itself is delegated to
    ``self._install_impl``.

    :param cluster_uuid str: The UUID of the cluster to install to
    :param kdu_model str: The name or path of a bundle to install
    :param kdu_instance: Kdu instance name
    :param atomic bool: If set, waits until the model is active and resets
                        the cluster on failure.
    :param timeout int: The time, in seconds, to wait for the install
                        to finish
    :param params dict: Key-value pairs of instantiation parameters
    :param db_dict: passed through to ``_install_impl`` unchanged
    :param kdu_name: Name of the KDU instance to be installed
    :param namespace: K8s namespace to use for the KDU instance

    :param kwargs: Additional parameters (None yet)

    :return: True if successful
    :raises K8sException: if the namespace is missing, cannot be created and
                          still does not exist after the failed attempt
    """

    self.log.debug("installing {} in cluster {}".format(kdu_model, cluster_uuid))

    # sync local dir
    self.fs.sync(from_path=cluster_uuid)

    # init env, paths
    paths, env = self._init_paths_env(
        cluster_name=cluster_uuid, create_if_not_exist=True
    )

    # for helm3 if namespace does not exist must create it
    if namespace and namespace != "kube-system":
        if not await self._namespace_exists(cluster_uuid, namespace):
            try:
                await self._create_namespace(cluster_uuid, namespace)
            except Exception as e:
                # A failed creation is only fatal when the namespace is
                # still missing afterwards; if it exists now, carry on.
                if not await self._namespace_exists(cluster_uuid, namespace):
                    err_msg = (
                        "namespace {} does not exist in cluster_id {} "
                        "error message: {}".format(namespace, cluster_uuid, e)
                    )
                    self.log.error(err_msg)
                    raise K8sException(err_msg) from e

    await self._install_impl(
        cluster_uuid,
        kdu_model,
        paths,
        env,
        kdu_instance,
        atomic=atomic,
        timeout=timeout,
        params=params,
        db_dict=db_dict,
        kdu_name=kdu_name,
        namespace=namespace,
    )

    # sync fs
    self.fs.reverse_sync(from_path=cluster_uuid)

    self.log.debug("Returning kdu_instance {}".format(kdu_instance))
    return True
145
async def inspect_kdu(self, kdu_model: str, repo_url: str = None) -> str:
    """
    Return the result of the "all" inspect command for a chart.

    :param kdu_model: chart reference to inspect
    :param repo_url: optional repository URL, forwarded as-is
    :return: whatever ``self._exec_inspect_command`` returns
    """
    debug_msg = "inspect kdu_model {} from (optional) repo: {}".format(
        kdu_model, repo_url
    )
    self.log.debug(debug_msg)

    inspection = await self._exec_inspect_command(
        inspect_command="all", kdu_model=kdu_model, repo_url=repo_url
    )
    return inspection
155
156 """
157 ####################################################################################
158 ################################### P R I V A T E ##################################
159 ####################################################################################
160 """
161
def _init_paths_env(self, cluster_name: str, create_if_not_exist: bool = True):
    """
    Creates (optionally) the base cluster and kube dirs and returns them.
    Also creates the helm3 dirs according to the new directory specification;
    the paths are returned together with the environment variables that must
    be provided to execute commands.

    Helm 3 directory specification uses XDG categories for variable support:
    - Cache: $XDG_CACHE_HOME, for example, ${HOME}/.cache/helm/
    - Configuration: $XDG_CONFIG_HOME, for example, ${HOME}/.config/helm/
    - Data: $XDG_DATA_HOME, for example ${HOME}/.local/share/helm

    The variables assigned for these paths are:
    (In the documentation the variable names are $HELM_PATH_CACHE,
    $HELM_PATH_CONFIG, $HELM_PATH_DATA but looking at helm env the variable
    names are different)
    - Cache: $HELM_CACHE_HOME
    - Config: $HELM_CONFIG_HOME
    - Data: $HELM_DATA_HOME
    - helm kubeconfig: $KUBECONFIG

    :param cluster_name: cluster_name, used as directory name under fs.path
    :param create_if_not_exist: create the missing directories when True
    :return: Dictionary with config_paths and dictionary with helm
             environment variables
    :raises K8sException: if the cluster dir or the kube dir does not exist
                          after the (optional) creation step
    """

    base = self.fs.path
    # drop a single trailing separator so the joins below do not double it
    if base.endswith(("/", "\\")):
        base = base[:-1]

    # base dir for cluster
    cluster_dir = base + "/" + cluster_name

    # kube dir and helm3 dirs
    kube_dir = cluster_dir + "/" + ".kube"
    helm_path_cache = cluster_dir + "/.cache/helm"
    helm_path_config = cluster_dir + "/.config/helm"
    helm_path_data = cluster_dir + "/.local/share/helm"

    if create_if_not_exist:
        for directory in (
            kube_dir,
            helm_path_cache,
            helm_path_config,
            helm_path_data,
        ):
            if not os.path.exists(directory):
                self.log.debug("Creating dir {}".format(directory))
                # exist_ok: the directory may be created by someone else
                # between the check above and this call
                os.makedirs(directory, exist_ok=True)

    config_filename = kube_dir + "/config"

    # 2 - Prepare dictionary with paths
    paths = {
        "kube_dir": kube_dir,
        "kube_config": config_filename,
        "cluster_dir": cluster_dir,
    }

    # 3 - Prepare environment variables
    env = {
        "HELM_CACHE_HOME": helm_path_cache,
        "HELM_CONFIG_HOME": helm_path_config,
        "HELM_DATA_HOME": helm_path_data,
        "KUBECONFIG": config_filename,
    }

    # Only the "*dir*" entries are checked; kube_config is a file path that
    # is not required to exist at this point
    for path_name, path_value in paths.items():
        if "dir" in path_name and not os.path.exists(path_value):
            err_msg = "{} dir does not exist".format(path_value)
            self.log.error(err_msg)
            raise K8sException(err_msg)

    return paths, env
237
async def _namespace_exists(self, cluster_id, namespace) -> bool:
    """
    Tell whether a namespace is among those reported for the cluster.

    :param cluster_id: cluster identifier, forwarded to ``_get_namespaces``
    :param namespace: namespace name to look for
    :return: False when no namespaces are reported, otherwise the result of
             the membership check
    """
    self.log.debug(
        "checking if namespace {} exists cluster_id {}".format(namespace, cluster_id)
    )
    known = await self._get_namespaces(cluster_id)
    if not known:
        return False
    return namespace in known
246
async def _get_namespaces(self, cluster_id: str):
    """
    List the namespace names of a cluster using kubectl.

    Runs ``kubectl get namespaces -o=yaml`` against the cluster kubeconfig
    and collects ``metadata.name`` of every returned item.

    :param cluster_id: cluster identifier used to locate paths and env
    :return: list with the namespace names
    """

    self.log.debug("get namespaces cluster_id {}".format(cluster_id))

    # init config, env
    paths, env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )
    kubeconfig_file = paths["kube_config"]

    command = "{} --kubeconfig={} get namespaces -o=yaml".format(
        self.kubectl_command, kubeconfig_file
    )
    output, _rc = await self._local_async_exec(
        command=command, raise_exception_on_error=True, env=env
    )

    parsed = yaml.load(output, Loader=yaml.SafeLoader)
    namespaces = []
    for item in parsed["items"]:
        namespaces.append(item["metadata"]["name"])
    self.log.debug(f"namespaces {namespaces}")

    return namespaces
268
async def _create_namespace(self, cluster_id: str, namespace: str):
    """
    Create a namespace in the cluster using kubectl.

    Runs ``kubectl create namespace <namespace>`` against the cluster
    kubeconfig; the command is executed with raise_exception_on_error=True.

    :param cluster_id: cluster identifier used to locate paths and env
    :param namespace: name of the namespace to create
    :return: return code reported by ``self._local_async_exec``
    """

    # the values were previously logged under each other's label
    self.log.debug(f"create namespace: {namespace} for cluster_id: {cluster_id}")

    # init config, env
    paths, env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )

    command = "{} --kubeconfig={} create namespace {}".format(
        self.kubectl_command, paths["kube_config"], namespace
    )
    _, _rc = await self._local_async_exec(
        command=command, raise_exception_on_error=True, env=env
    )
    self.log.debug(f"namespace {namespace} created")

    return _rc
287
async def _get_services(
    self, cluster_id: str, kdu_instance: str, namespace: str, kubeconfig: str
):
    """
    Obtain the services of a KDU instance.

    Pipes ``helm get manifest`` of the instance into ``kubectl get -f -``
    and hands the piped output to ``self._parse_services``.

    :param cluster_id: cluster identifier used to build the helm environment
    :param kdu_instance: helm release name
    :param namespace: namespace passed to both commands
    :param kubeconfig: kubeconfig path set for the helm command
    :return: result of ``self._parse_services`` on the command output
    """

    # init config, env (only the environment is needed here)
    _paths, env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )

    helm_manifest_cmd = (
        f"env KUBECONFIG={kubeconfig} {self._helm_command} "
        f"get manifest {kdu_instance} --namespace={namespace}"
    )
    kubectl_get_cmd = f"{self.kubectl_command} get --namespace={namespace} -f -"

    output, _rc = await self._local_async_exec_pipe(
        helm_manifest_cmd, kubectl_get_cmd, env=env, raise_exception_on_error=True
    )
    return self._parse_services(output)
307
async def _cluster_init(self, cluster_id, namespace, paths, env):
    """
    Helm version dependent cluster initialization.

    For helm3 this creates the namespace when it is not listed yet (except
    for "kube-system") and adds the "stable" repo when it is not registered
    and a stable repo URL is configured.

    :param cluster_id: cluster identifier
    :param namespace: namespace to ensure
    :param paths: not used by this implementation
    :param env: not used by this implementation
    :return: always False, as no software needs to be uninstalled
    """
    if namespace != "kube-system":
        existing = await self._get_namespaces(cluster_id)
        if namespace not in existing:
            await self._create_namespace(cluster_id, namespace)

    repos = await self.repo_list(cluster_id)
    # walk every entry (no early exit) while looking for the stable repo
    stable_found = False
    for repo in repos:
        if repo["name"] == "stable":
            stable_found = True

    if not stable_found and self._stable_repo_url:
        await self.repo_add(cluster_id, "stable", self._stable_repo_url)

    # Returns False as no software needs to be uninstalled
    return False
325
async def _uninstall_sw(self, cluster_id: str, namespace: str):
    """
    No-op for helm3: there is nothing to do to uninstall sw.

    :param cluster_id: unused
    :param namespace: unused
    :return: None
    """
    return None
329
async def _instances_list(self, cluster_id: str):
    """
    List the helm releases of a cluster across all namespaces.

    Runs ``helm list --all-namespaces --output yaml`` and parses the output.

    :param cluster_id: cluster identifier used to build the helm environment
    :return: parsed YAML output, or an empty list when there is no output
    """

    # init paths, env (only the environment is needed here)
    _paths, env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )

    command = "{} list --all-namespaces --output yaml".format(self._helm_command)
    output, _rc = await self._local_async_exec(
        command=command, raise_exception_on_error=True, env=env
    )

    if not output:
        return []

    self.log.debug("instances list output: {}".format(output))
    return yaml.load(output, Loader=yaml.SafeLoader)
347
def _get_inspect_command(
    self, inspect_command: str, kdu_model: str, repo_str: str, version: str
):
    """
    Build the ``helm show`` command line.

    :param inspect_command: ``show`` sub-command (e.g. "all")
    :param kdu_model: chart reference
    :param repo_str: repository option text, appended right after the model
                     with no separator
    :param version: version option text, placed last
    :return: command string
    """
    return (
        f"{self._helm_command} show {inspect_command} "
        f"{kdu_model}{repo_str} {version}"
    )
355
def _get_get_command(
    self, get_command: str, kdu_instance: str, namespace: str, kubeconfig: str
):
    """
    Build the ``helm get`` command line with YAML output.

    :param get_command: ``get`` sub-command
    :param kdu_instance: helm release name
    :param namespace: namespace of the release
    :param kubeconfig: kubeconfig path set through KUBECONFIG
    :return: command string
    """
    return (
        f"env KUBECONFIG={kubeconfig} {self._helm_command} get {get_command} "
        f"{kdu_instance} --namespace={namespace} --output yaml"
    )
365
async def _status_kdu(
    self,
    cluster_id: str,
    kdu_instance: str,
    namespace: str = None,
    yaml_format: bool = False,
    show_error_log: bool = False,
) -> Union[str, dict, None]:
    """
    Get the status of a helm release through ``helm status --output yaml``.

    :param cluster_id: cluster identifier used to locate paths and env
    :param kdu_instance: helm release name
    :param namespace: namespace of the release; "kube-system" when empty
    :param yaml_format: when True the raw command output is returned as str
    :param show_error_log: forwarded to ``self._local_async_exec``
    :return: raw output (yaml_format=True), None when the return code is not
             0, otherwise the parsed status as a dict without ``info.notes``
             and with ``manifest`` parsed into a list of documents
    """

    self.log.debug(
        "status of kdu_instance: {}, namespace: {} ".format(kdu_instance, namespace)
    )

    if not namespace:
        namespace = "kube-system"

    # init config, env
    paths, env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )
    command = "env KUBECONFIG={} {} status {} --namespace={} --output yaml".format(
        paths["kube_config"], self._helm_command, kdu_instance, namespace
    )

    output, rc = await self._local_async_exec(
        command=command,
        raise_exception_on_error=True,
        show_error_log=show_error_log,
        env=env,
    )

    if yaml_format:
        return str(output)

    if rc != 0:
        return None

    data = yaml.load(output, Loader=yaml.SafeLoader)

    # remove field 'notes'; tolerate a missing or empty 'info' section
    # instead of failing with TypeError on None
    info = data.get("info")
    if isinstance(info, dict):
        info.pop("notes", None)

    # parse the manifest to a list of dictionaries
    if "manifest" in data:
        manifest_str = data.get("manifest")
        manifest_docs = yaml.load_all(manifest_str, Loader=yaml.SafeLoader)
        data["manifest"] = list(manifest_docs)

    return data
421
def _get_install_command(
    self,
    kdu_model: str,
    kdu_instance: str,
    namespace: str,
    params_str: str,
    version: str,
    atomic: bool,
    timeout: float,
    kubeconfig: str,
) -> str:
    """
    Build the ``helm install`` command line with YAML output.

    Optional flags collapse to an empty string when their value is falsy;
    the surrounding blanks of the template are kept in that case.

    :param kdu_model: chart reference to install
    :param kdu_instance: helm release name
    :param namespace: adds ``--namespace <namespace>`` when set
    :param params_str: already formatted parameters option text
    :param version: adds ``--version <version>`` when set
    :param atomic: adds ``--atomic`` when true
    :param timeout: adds ``--timeout <timeout>s`` when set
    :param kubeconfig: kubeconfig path set through KUBECONFIG
    :return: command string
    """
    timeout_opt = "--timeout {}s".format(timeout) if timeout else ""
    atomic_opt = "--atomic" if atomic else ""
    namespace_opt = "--namespace {}".format(namespace) if namespace else ""
    version_opt = "--version {}".format(version) if version else ""

    return (
        f"env KUBECONFIG={kubeconfig} {self._helm_command} install {kdu_instance} "
        f"{atomic_opt} --output yaml {params_str} {timeout_opt} {namespace_opt} "
        f"{kdu_model} {version_opt}"
    )
467
def _get_upgrade_scale_command(
    self,
    kdu_model: str,
    kdu_instance: str,
    namespace: str,
    scale: int,
    version: str,
    atomic: bool,
    replica_str: str,
    timeout: float,
    resource_name: str,
    kubeconfig: str,
) -> str:
    """
    Build the ``helm upgrade`` command line used to scale a release.

    The replica count is passed as a single parameter converted with
    ``self._params_to_set_option``; its key is ``<resource_name>.<replica_str>``
    when a resource name is given, otherwise just ``replica_str``.

    :param kdu_model: chart reference
    :param kdu_instance: helm release name
    :param namespace: adds ``--namespace <namespace>`` when set
    :param scale: value assigned to the replica parameter
    :param version: adds ``--version <version>`` when set
    :param atomic: adds ``--atomic`` when true
    :param replica_str: name of the replica parameter
    :param timeout: adds ``--timeout <timeout>s`` when set
    :param resource_name: optional prefix for the replica parameter
    :param kubeconfig: kubeconfig path set through KUBECONFIG
    :return: command string
    """
    timeout_opt = "--timeout {}s".format(timeout) if timeout else ""
    atomic_opt = "--atomic" if atomic else ""
    version_opt = "--version {}".format(version) if version else ""
    namespace_opt = "--namespace {}".format(namespace) if namespace else ""

    # scale
    scale_key = (
        "{}.{}".format(resource_name, replica_str) if resource_name else replica_str
    )
    scale_opt = self._params_to_set_option({scale_key: scale})

    return (
        f"env KUBECONFIG={kubeconfig} {self._helm_command} upgrade {kdu_instance} "
        f"{kdu_model} {namespace_opt} {atomic_opt} --output yaml {scale_opt} "
        f"{timeout_opt} {version_opt}"
    )
524
def _get_upgrade_command(
    self,
    kdu_model: str,
    kdu_instance: str,
    namespace: str,
    params_str: str,
    version: str,
    atomic: bool,
    timeout: float,
    kubeconfig: str,
) -> str:
    """
    Build the ``helm upgrade`` command line with YAML output.

    Optional flags collapse to an empty string when their value is falsy.

    :param kdu_model: chart reference
    :param kdu_instance: helm release name
    :param namespace: adds ``--namespace <namespace>`` when set
    :param params_str: already formatted parameters option text
    :param version: adds ``--version <version>`` when set
    :param atomic: adds ``--atomic`` when true
    :param timeout: adds ``--timeout <timeout>s`` when set
    :param kubeconfig: kubeconfig path set through KUBECONFIG
    :return: command string
    """
    timeout_opt = "--timeout {}s".format(timeout) if timeout else ""
    atomic_opt = "--atomic" if atomic else ""
    version_opt = "--version {}".format(version) if version else ""
    namespace_opt = "--namespace {}".format(namespace) if namespace else ""

    return (
        f"env KUBECONFIG={kubeconfig} {self._helm_command} upgrade {kdu_instance} "
        f"{kdu_model} {namespace_opt} {atomic_opt} "
        f"--output yaml {params_str} {timeout_opt} {version_opt}"
    )
571
def _get_rollback_command(
    self, kdu_instance: str, namespace: str, revision: float, kubeconfig: str
) -> str:
    """
    Build the ``helm rollback`` command line (with ``--wait``).

    :param kdu_instance: helm release name
    :param namespace: namespace of the release
    :param revision: revision to roll back to
    :param kubeconfig: kubeconfig path set through KUBECONFIG
    :return: command string
    """
    return (
        f"env KUBECONFIG={kubeconfig} {self._helm_command} rollback "
        f"{kdu_instance} {revision} --namespace={namespace} --wait"
    )
578
def _get_uninstall_command(
    self, kdu_instance: str, namespace: str, kubeconfig: str
) -> str:
    """
    Build the ``helm uninstall`` command line.

    :param kdu_instance: helm release name
    :param namespace: namespace of the release
    :param kubeconfig: kubeconfig path set through KUBECONFIG
    :return: command string
    """
    return (
        f"env KUBECONFIG={kubeconfig} {self._helm_command} uninstall "
        f"{kdu_instance} --namespace={namespace}"
    )
586
def _get_helm_chart_repos_ids(self, cluster_uuid) -> list:
    """
    Return the helm chart repo ids stored for a cluster.

    The cluster is looked up in the "k8sclusters" collection by its
    ``_admin.helm-chart-v3.id``.

    :param cluster_uuid: helm v3 id of the cluster
    :return: value of ``_admin.helm_chart_repos``, or an empty list when it
             is missing or empty
    :raises K8sException: when no cluster matches the id
    """
    cluster = self.db.get_one(
        "k8sclusters", {"_admin.helm-chart-v3.id": cluster_uuid}
    )
    if not cluster:
        raise K8sException(
            "k8cluster with helm-id : {} not found".format(cluster_uuid)
        )

    admin_section = cluster.get("_admin")
    return admin_section.get("helm_chart_repos") or []