Fixes for scaling proxy and native charm
[osm/N2VC.git] / n2vc / k8s_helm3_conn.py
1 ##
2 # Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3 # This file is part of OSM
4 # All Rights Reserved.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
15 # implied.
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
18 #
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact with: nfvlabs@tid.es
21 ##
22 import os
23 import yaml
24
25 from n2vc.k8s_helm_base_conn import K8sHelmBaseConnector
26 from n2vc.exceptions import K8sException
27
28
29 class K8sHelm3Connector(K8sHelmBaseConnector):
30
31 """
32 ####################################################################################
33 ################################### P U B L I C ####################################
34 ####################################################################################
35 """
36
37 def __init__(
38 self,
39 fs: object,
40 db: object,
41 kubectl_command: str = "/usr/bin/kubectl",
42 helm_command: str = "/usr/bin/helm3",
43 log: object = None,
44 on_update_db=None,
45 vca_config: dict = None,
46 ):
47 """
48 Initializes helm connector for helm v3
49
50 :param fs: file system for kubernetes and helm configuration
51 :param db: database object to write current operation status
52 :param kubectl_command: path to kubectl executable
53 :param helm_command: path to helm executable
54 :param log: logger
55 :param on_update_db: callback called when k8s connector updates database
56 """
57
58 # parent class
59 K8sHelmBaseConnector.__init__(self,
60 db=db,
61 log=log,
62 fs=fs,
63 kubectl_command=kubectl_command,
64 helm_command=helm_command,
65 on_update_db=on_update_db,
66 vca_config=vca_config)
67
68 self.log.info("K8S Helm3 connector initialized")
69
70 async def install(
71 self,
72 cluster_uuid: str,
73 kdu_model: str,
74 kdu_instance: str,
75 atomic: bool = True,
76 timeout: float = 300,
77 params: dict = None,
78 db_dict: dict = None,
79 kdu_name: str = None,
80 namespace: str = None,
81 ):
82 _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
83 self.log.debug("installing {} in cluster {}".format(kdu_model, cluster_id))
84
85 # sync local dir
86 self.fs.sync(from_path=cluster_id)
87
88 # init env, paths
89 paths, env = self._init_paths_env(
90 cluster_name=cluster_id, create_if_not_exist=True
91 )
92
93 # for helm3 if namespace does not exist must create it
94 if namespace and namespace != "kube-system":
95 namespaces = await self._get_namespaces(cluster_id)
96 if namespace not in namespaces:
97 await self._create_namespace(cluster_id, namespace)
98
99 await self._install_impl(
100 cluster_id,
101 kdu_model,
102 paths,
103 env,
104 kdu_instance,
105 atomic=atomic,
106 timeout=timeout,
107 params=params,
108 db_dict=db_dict,
109 kdu_name=kdu_name,
110 namespace=namespace,
111 )
112
113 # sync fs
114 self.fs.reverse_sync(from_path=cluster_id)
115
116 self.log.debug("Returning kdu_instance {}".format(kdu_instance))
117 return True
118
119 async def inspect_kdu(self, kdu_model: str, repo_url: str = None) -> str:
120
121 self.log.debug(
122 "inspect kdu_model {} from (optional) repo: {}".format(kdu_model, repo_url)
123 )
124
125 return await self._exec_inspect_comand(
126 inspect_command="all", kdu_model=kdu_model, repo_url=repo_url
127 )
128
129 """
130 ####################################################################################
131 ################################### P R I V A T E ##################################
132 ####################################################################################
133 """
134
135 def _init_paths_env(self, cluster_name: str, create_if_not_exist: bool = True):
136 """
137 Creates and returns base cluster and kube dirs and returns them.
138 Also created helm3 dirs according to new directory specification, paths are
139 returned and also environment variables that must be provided to execute commands
140
141 Helm 3 directory specification uses XDG categories for variable support:
142 - Cache: $XDG_CACHE_HOME, for example, ${HOME}/.cache/helm/
143 - Configuration: $XDG_CONFIG_HOME, for example, ${HOME}/.config/helm/
144 - Data: $XDG_DATA_HOME, for example ${HOME}/.local/share/helm
145
146 The variables assigned for this paths are:
147 (In the documentation the variables names are $HELM_PATH_CACHE, $HELM_PATH_CONFIG,
148 $HELM_PATH_DATA but looking and helm env the variable names are different)
149 - Cache: $HELM_CACHE_HOME
150 - Config: $HELM_CONFIG_HOME
151 - Data: $HELM_DATA_HOME
152 - helm kubeconfig: $KUBECONFIG
153
154 :param cluster_name: cluster_name
155 :return: Dictionary with config_paths and dictionary with helm environment variables
156 """
157
158 base = self.fs.path
159 if base.endswith("/") or base.endswith("\\"):
160 base = base[:-1]
161
162 # base dir for cluster
163 cluster_dir = base + "/" + cluster_name
164
165 # kube dir
166 kube_dir = cluster_dir + "/" + ".kube"
167 if create_if_not_exist and not os.path.exists(kube_dir):
168 self.log.debug("Creating dir {}".format(kube_dir))
169 os.makedirs(kube_dir)
170
171 helm_path_cache = cluster_dir + "/.cache/helm"
172 if create_if_not_exist and not os.path.exists(helm_path_cache):
173 self.log.debug("Creating dir {}".format(helm_path_cache))
174 os.makedirs(helm_path_cache)
175
176 helm_path_config = cluster_dir + "/.config/helm"
177 if create_if_not_exist and not os.path.exists(helm_path_config):
178 self.log.debug("Creating dir {}".format(helm_path_config))
179 os.makedirs(helm_path_config)
180
181 helm_path_data = cluster_dir + "/.local/share/helm"
182 if create_if_not_exist and not os.path.exists(helm_path_data):
183 self.log.debug("Creating dir {}".format(helm_path_data))
184 os.makedirs(helm_path_data)
185
186 config_filename = kube_dir + "/config"
187
188 # 2 - Prepare dictionary with paths
189 paths = {
190 "kube_dir": kube_dir,
191 "kube_config": config_filename,
192 "cluster_dir": cluster_dir
193 }
194
195 # 3 - Prepare environment variables
196 env = {
197 "HELM_CACHE_HOME": helm_path_cache,
198 "HELM_CONFIG_HOME": helm_path_config,
199 "HELM_DATA_HOME": helm_path_data,
200 "KUBECONFIG": config_filename
201 }
202
203 for file_name, file in paths.items():
204 if "dir" in file_name and not os.path.exists(file):
205 err_msg = "{} dir does not exist".format(file)
206 self.log.error(err_msg)
207 raise K8sException(err_msg)
208
209 return paths, env
210
211 async def _get_namespaces(self,
212 cluster_id: str):
213
214 self.log.debug("get namespaces cluster_id {}".format(cluster_id))
215
216 # init config, env
217 paths, env = self._init_paths_env(
218 cluster_name=cluster_id, create_if_not_exist=True
219 )
220
221 command = "{} --kubeconfig={} get namespaces -o=yaml".format(
222 self.kubectl_command, paths["kube_config"]
223 )
224 output, _rc = await self._local_async_exec(
225 command=command, raise_exception_on_error=True, env=env
226 )
227
228 data = yaml.load(output, Loader=yaml.SafeLoader)
229 namespaces = [item["metadata"]["name"] for item in data["items"]]
230 self.log.debug(f"namespaces {namespaces}")
231
232 return namespaces
233
234 async def _create_namespace(self,
235 cluster_id: str,
236 namespace: str):
237
238 self.log.debug(f"create namespace: {cluster_id} for cluster_id: {namespace}")
239
240 # init config, env
241 paths, env = self._init_paths_env(
242 cluster_name=cluster_id, create_if_not_exist=True
243 )
244
245 command = "{} --kubeconfig={} create namespace {}".format(
246 self.kubectl_command, paths["kube_config"], namespace
247 )
248 _, _rc = await self._local_async_exec(
249 command=command, raise_exception_on_error=True, env=env
250 )
251 self.log.debug(f"namespace {namespace} created")
252
253 return _rc
254
255 async def _get_services(self, cluster_id: str, kdu_instance: str, namespace: str):
256
257 # init config, env
258 paths, env = self._init_paths_env(
259 cluster_name=cluster_id, create_if_not_exist=True
260 )
261
262 command1 = "{} get manifest {} --namespace={}".format(
263 self._helm_command, kdu_instance, namespace
264 )
265 command2 = "{} get --namespace={} -f -".format(
266 self.kubectl_command, namespace
267 )
268 output, _rc = await self._local_async_exec_pipe(
269 command1, command2, env=env, raise_exception_on_error=True
270 )
271 services = self._parse_services(output)
272
273 return services
274
275 async def _cluster_init(self, cluster_id, namespace, paths, env):
276 """
277 Implements the helm version dependent cluster initialization:
278 For helm3 it creates the namespace if it is not created
279 """
280 if namespace != "kube-system":
281 namespaces = await self._get_namespaces(cluster_id)
282 if namespace not in namespaces:
283 await self._create_namespace(cluster_id, namespace)
284
285 # If default repo is not included add
286 cluster_uuid = "{}:{}".format(namespace, cluster_id)
287 repo_list = await self.repo_list(cluster_uuid)
288 for repo in repo_list:
289 self.log.debug("repo")
290 if repo["name"] == "stable":
291 self.log.debug("Default repo already present")
292 break
293 else:
294 await self.repo_add(cluster_uuid,
295 "stable",
296 self._stable_repo_url)
297
298 # Returns False as no software needs to be uninstalled
299 return False
300
    async def _uninstall_sw(self, cluster_id: str, namespace: str):
        """No-op counterpart of _cluster_init: helm3 installs no cluster-side
        software on registration, so there is nothing to remove here."""
        # nothing to do to uninstall sw
        pass
304
305 async def _instances_list(self, cluster_id: str):
306
307 # init paths, env
308 paths, env = self._init_paths_env(
309 cluster_name=cluster_id, create_if_not_exist=True
310 )
311
312 command = "{} list --all-namespaces --output yaml".format(
313 self._helm_command
314 )
315 output, _rc = await self._local_async_exec(
316 command=command, raise_exception_on_error=True, env=env
317 )
318
319 if output and len(output) > 0:
320 self.log.debug("instances list output: {}".format(output))
321 return yaml.load(output, Loader=yaml.SafeLoader)
322 else:
323 return []
324
325 def _get_inspect_command(self, inspect_command: str, kdu_model: str, repo_str: str,
326 version: str):
327 inspect_command = "{} show {} {}{} {}".format(
328 self._helm_command, inspect_command, kdu_model, repo_str, version
329 )
330 return inspect_command
331
332 async def _status_kdu(
333 self,
334 cluster_id: str,
335 kdu_instance: str,
336 namespace: str = None,
337 show_error_log: bool = False,
338 return_text: bool = False,
339 ):
340
341 self.log.debug("status of kdu_instance: {}, namespace: {} ".format(kdu_instance, namespace))
342
343 if not namespace:
344 namespace = "kube-system"
345
346 # init config, env
347 paths, env = self._init_paths_env(
348 cluster_name=cluster_id, create_if_not_exist=True
349 )
350 command = "{} status {} --namespace={} --output yaml".format(
351 self._helm_command, kdu_instance, namespace
352 )
353
354 output, rc = await self._local_async_exec(
355 command=command,
356 raise_exception_on_error=True,
357 show_error_log=show_error_log,
358 env=env
359 )
360
361 if return_text:
362 return str(output)
363
364 if rc != 0:
365 return None
366
367 data = yaml.load(output, Loader=yaml.SafeLoader)
368
369 # remove field 'notes' and manifest
370 try:
371 del data.get("info")["notes"]
372 del data["manifest"]
373 except KeyError:
374 pass
375
376 # unable to parse 'resources' as currently it is not included in helm3
377 return data
378
379 def _get_install_command(self, kdu_model: str, kdu_instance: str, namespace: str,
380 params_str: str, version: str, atomic: bool, timeout: float) -> str:
381
382 timeout_str = ""
383 if timeout:
384 timeout_str = "--timeout {}s".format(timeout)
385
386 # atomic
387 atomic_str = ""
388 if atomic:
389 atomic_str = "--atomic"
390 # namespace
391 namespace_str = ""
392 if namespace:
393 namespace_str = "--namespace {}".format(namespace)
394
395 # version
396 version_str = ""
397 if version:
398 version_str = "--version {}".format(version)
399
400 command = (
401 "{helm} install {name} {atomic} --output yaml "
402 "{params} {timeout} {ns} {model} {ver}".format(
403 helm=self._helm_command,
404 name=kdu_instance,
405 atomic=atomic_str,
406 params=params_str,
407 timeout=timeout_str,
408 ns=namespace_str,
409 model=kdu_model,
410 ver=version_str,
411 )
412 )
413 return command
414
415 def _get_upgrade_command(self, kdu_model: str, kdu_instance: str, namespace: str,
416 params_str: str, version: str, atomic: bool, timeout: float) -> str:
417
418 timeout_str = ""
419 if timeout:
420 timeout_str = "--timeout {}s".format(timeout)
421
422 # atomic
423 atomic_str = ""
424 if atomic:
425 atomic_str = "--atomic"
426
427 # version
428 version_str = ""
429 if version:
430 version_str = "--version {}".format(version)
431
432 # namespace
433 namespace_str = ""
434 if namespace:
435 namespace_str = "--namespace {}".format(namespace)
436
437 command = (
438 "{helm} upgrade {name} {model} {namespace} {atomic} --output yaml {params} "
439 "{timeout} {ver}".format(
440 helm=self._helm_command,
441 name=kdu_instance,
442 namespace=namespace_str,
443 atomic=atomic_str,
444 params=params_str,
445 timeout=timeout_str,
446 model=kdu_model,
447 ver=version_str,
448 )
449 )
450 return command
451
452 def _get_rollback_command(self, kdu_instance: str, namespace: str, revision: float) -> str:
453 return "{} rollback {} {} --namespace={} --wait".format(
454 self._helm_command, kdu_instance, revision, namespace
455 )
456
457 def _get_uninstall_command(self, kdu_instance: str, namespace: str) -> str:
458
459 return "{} uninstall {} --namespace={}".format(
460 self._helm_command, kdu_instance, namespace)
461
462 def _get_helm_chart_repos_ids(self, cluster_uuid) -> list:
463 repo_ids = []
464 cluster_filter = {"_admin.helm-chart-v3.id": cluster_uuid}
465 cluster = self.db.get_one("k8sclusters", cluster_filter)
466 if cluster:
467 repo_ids = cluster.get("_admin").get("helm_chart_repos") or []
468 return repo_ids
469 else:
470 raise K8sException(
471 "k8cluster with helm-id : {} not found".format(cluster_uuid)
472 )