Bug 1651 fix
n2vc/k8s_helm_conn.py (osm/N2VC.git)
##
# Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
# This file is part of OSM
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# For those usages not covered by the Apache License, Version 2.0 please
# contact with: nfvlabs@tid.es
##
import asyncio
import os
import yaml

from n2vc.k8s_helm_base_conn import K8sHelmBaseConnector
from n2vc.exceptions import K8sException


class K8sHelmConnector(K8sHelmBaseConnector):

    """
    ####################################################################################
    ################################### P U B L I C ####################################
    ####################################################################################
    """

    def __init__(
        self,
        fs: object,
        db: object,
        kubectl_command: str = "/usr/bin/kubectl",
        helm_command: str = "/usr/bin/helm",
        log: object = None,
        on_update_db=None,
    ):
        """
        Initializes helm connector for helm v2

        :param fs: file system for kubernetes and helm configuration
        :param db: database object to write current operation status
        :param kubectl_command: path to kubectl executable
        :param helm_command: path to helm executable
        :param log: logger
        :param on_update_db: callback called when k8s connector updates database
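
        Usage (illustrative; fs, db and logger are hypothetical caller-provided
        objects):
            conn = K8sHelmConnector(fs=fs, db=db, log=logger)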
56 """
57
58 # parent class
59 K8sHelmBaseConnector.__init__(
60 self,
61 db=db,
62 log=log,
63 fs=fs,
64 kubectl_command=kubectl_command,
65 helm_command=helm_command,
66 on_update_db=on_update_db,
67 )
68
69 self.log.info("Initializing K8S Helm2 connector")
70
71 # initialize helm client-only
72 self.log.debug("Initializing helm client-only...")
73 command = "{} init --client-only {} ".format(
74 self._helm_command,
75 "--stable-repo-url {}".format(self._stable_repo_url)
76 if self._stable_repo_url
77 else "--skip-repos",
78 )
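        # e.g. "/usr/bin/helm init --client-only --stable-repo-url <url>" when a
        # stable repo URL is configured, or "... --skip-repos" otherwise
        # (illustrative rendering of the format string above)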
        try:
            asyncio.ensure_future(
                self._local_async_exec(command=command, raise_exception_on_error=False)
            )
            # loop = asyncio.get_event_loop()
            # loop.run_until_complete(self._local_async_exec(command=command,
            # raise_exception_on_error=False))
        except Exception as e:
            self.log.warning(
                msg="helm init failed (it was already initialized): {}".format(e)
            )

        self.log.info("K8S Helm2 connector initialized")

    async def install(
        self,
        cluster_uuid: str,
        kdu_model: str,
        kdu_instance: str,
        atomic: bool = True,
        timeout: float = 300,
        params: dict = None,
        db_dict: dict = None,
        kdu_name: str = None,
        namespace: str = None,
        **kwargs,
    ):
        """
        Deploys a new KDU instance. It would implicitly rely on the `install` call
        to deploy the Chart/Bundle properly parametrized (in practice, this call
        would happen before any _initial-config-primitive_ of the VNF is called).

        :param cluster_uuid: UUID of a K8s cluster known by OSM
        :param kdu_model: chart reference (string), which can be either
            of these options:
            - a name of chart available via the repos known by OSM
            - a path to a packaged chart
            - a path to an unpacked chart directory or a URL
        :param kdu_instance: KDU instance name
        :param atomic: if set, the installation process purges the chart/bundle on
            failure and waits until all the K8s objects are active
        :param timeout: time in seconds to wait for the install of the chart/bundle
            (defaults to Helm default timeout: 300s)
        :param params: dictionary of key-value pairs for instantiation parameters
            (overriding default values)
        :param dict db_dict: where to write into database when the status changes.
            It contains a dict with {collection: <str>, filter: {}, path: <str>},
            e.g. {collection: "nsrs", filter: {_id: <nsd-id>},
            path: "_admin.deployed.K8S.3"}
        :param kdu_name: name of the KDU instance to be installed
        :param namespace: K8s namespace to use for the KDU instance
        :param kwargs: additional parameters (none yet)
        :return: True if successful
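
        Example (illustrative; identifiers are hypothetical):
            await conn.install(
                cluster_uuid="kube-system:1b2a...",
                kdu_model="stable/openldap",
                kdu_instance="openldap-0012345",
                db_dict={"collection": "nsrs", "filter": {"_id": "<nsr-id>"},
                         "path": "_admin.deployed.K8S.0"},
            )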
133 """
134 _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
135 self.log.debug("installing {} in cluster {}".format(kdu_model, cluster_id))
136
137 # sync local dir
138 self.fs.sync(from_path=cluster_id)
139
140 # init env, paths
141 paths, env = self._init_paths_env(
142 cluster_name=cluster_id, create_if_not_exist=True
143 )
144
145 await self._install_impl(
146 cluster_id,
147 kdu_model,
148 paths,
149 env,
150 kdu_instance,
151 atomic=atomic,
152 timeout=timeout,
153 params=params,
154 db_dict=db_dict,
155 kdu_name=kdu_name,
156 namespace=namespace,
157 )
158
159 # sync fs
160 self.fs.reverse_sync(from_path=cluster_id)
161
162 self.log.debug("Returning kdu_instance {}".format(kdu_instance))
163 return True
164
    async def inspect_kdu(self, kdu_model: str, repo_url: str = None) -> str:

        self.log.debug(
            "inspect kdu_model {} from (optional) repo: {}".format(kdu_model, repo_url)
        )

        return await self._exec_inspect_comand(
            inspect_command="", kdu_model=kdu_model, repo_url=repo_url
        )

    """
    ####################################################################################
    ################################### P R I V A T E ##################################
    ####################################################################################
    """

    def _init_paths_env(self, cluster_name: str, create_if_not_exist: bool = True):
        """
        Creates and returns the base cluster and kube dirs. Helm dirs are also
        created according to the new directory specification; the paths are
        returned together with the environment variables that must be provided
        to execute commands.

        Helm 2 directory specification uses the helm_home dir:

        The variables assigned for these paths are:
        - Helm home: $HELM_HOME
        - helm kubeconfig: $KUBECONFIG

        :param cluster_name: cluster_name
        :return: Dictionary with config_paths and dictionary with helm environment variables
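
        Example (illustrative): for fs.path "/app/storage" and cluster_name
        "1b2a...", paths would include "kube_dir": "/app/storage/1b2a.../.kube"
        and "helm_dir": "/app/storage/1b2a.../.helm", and env would be
        {"HELM_HOME": "/app/storage/1b2a.../.helm",
        "KUBECONFIG": "/app/storage/1b2a.../.kube/config"}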
195 """
196 base = self.fs.path
197 if base.endswith("/") or base.endswith("\\"):
198 base = base[:-1]
199
200 # base dir for cluster
201 cluster_dir = base + "/" + cluster_name
202
203 # kube dir
204 kube_dir = cluster_dir + "/" + ".kube"
205 if create_if_not_exist and not os.path.exists(kube_dir):
206 self.log.debug("Creating dir {}".format(kube_dir))
207 os.makedirs(kube_dir)
208
209 # helm home dir
210 helm_dir = cluster_dir + "/" + ".helm"
211 if create_if_not_exist and not os.path.exists(helm_dir):
212 self.log.debug("Creating dir {}".format(helm_dir))
213 os.makedirs(helm_dir)
214
215 config_filename = kube_dir + "/config"
216
217 # 2 - Prepare dictionary with paths
218 paths = {
219 "kube_dir": kube_dir,
220 "kube_config": config_filename,
221 "cluster_dir": cluster_dir,
222 "helm_dir": helm_dir,
223 }
224
225 for file_name, file in paths.items():
226 if "dir" in file_name and not os.path.exists(file):
227 err_msg = "{} dir does not exist".format(file)
228 self.log.error(err_msg)
229 raise K8sException(err_msg)
230
231 # 3 - Prepare environment variables
232 env = {"HELM_HOME": helm_dir, "KUBECONFIG": config_filename}
233
234 return paths, env
235
    async def _get_services(self, cluster_id, kdu_instance, namespace):

        # init config, env
        paths, env = self._init_paths_env(
            cluster_name=cluster_id, create_if_not_exist=True
        )

        command1 = "{} get manifest {} ".format(self._helm_command, kdu_instance)
        command2 = "{} get --namespace={} -f -".format(self.kubectl_command, namespace)
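        # e.g. "/usr/bin/helm get manifest myrelease" piped into
        # "/usr/bin/kubectl get --namespace=myns -f -" (illustrative names)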
        output, _rc = await self._local_async_exec_pipe(
            command1, command2, env=env, raise_exception_on_error=True
        )
        services = self._parse_services(output)

        return services

    async def _cluster_init(
        self, cluster_id: str, namespace: str, paths: dict, env: dict
    ):
        """
        Implements the helm version dependent cluster initialization:
        for helm2 it initializes the Tiller environment if needed
        """

        # check if tiller pod is up in cluster
        command = "{} --kubeconfig={} --namespace={} get deployments".format(
            self.kubectl_command, paths["kube_config"], namespace
        )
        output, _rc = await self._local_async_exec(
            command=command, raise_exception_on_error=True, env=env
        )

        output_table = self._output_to_table(output=output)

        # find 'tiller-deploy' among the deployments
        already_initialized = False
        try:
            for row in output_table:
                if row[0].startswith("tiller-deploy"):
                    already_initialized = True
                    break
        except Exception:
            pass

        # helm init
        n2vc_installed_sw = False
        if not already_initialized:
            self.log.info(
                "Initializing helm in client and server: {}".format(cluster_id)
            )
            command = "{} --kubeconfig={} --namespace kube-system create serviceaccount {}".format(
                self.kubectl_command, paths["kube_config"], self.service_account
            )
            _, _rc = await self._local_async_exec(
                command=command, raise_exception_on_error=False, env=env
            )

            command = (
                "{} --kubeconfig={} create clusterrolebinding osm-tiller-cluster-rule "
                "--clusterrole=cluster-admin --serviceaccount=kube-system:{}"
            ).format(self.kubectl_command, paths["kube_config"], self.service_account)
            _, _rc = await self._local_async_exec(
                command=command, raise_exception_on_error=False, env=env
            )

            command = (
                "{} --kubeconfig={} --tiller-namespace={} --home={} --service-account {} "
                " {} init"
            ).format(
                self._helm_command,
                paths["kube_config"],
                namespace,
                paths["helm_dir"],
                self.service_account,
                "--stable-repo-url {}".format(self._stable_repo_url)
                if self._stable_repo_url
                else "--skip-repos",
            )
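            # illustrative rendering: "/usr/bin/helm --kubeconfig=<cfg>
            # --tiller-namespace=<ns> --home=<helm_dir> --service-account <account>
            #  --stable-repo-url <url> init"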
            _, _rc = await self._local_async_exec(
                command=command, raise_exception_on_error=True, env=env
            )
            n2vc_installed_sw = True
        else:
            # check client helm installation
            check_file = paths["helm_dir"] + "/repository/repositories.yaml"
            if not self._check_file_exists(
                filename=check_file, exception_if_not_exists=False
            ):
                self.log.info("Initializing helm in client: {}".format(cluster_id))
                command = (
                    "{} --kubeconfig={} --tiller-namespace={} "
                    "--home={} init --client-only {} "
                ).format(
                    self._helm_command,
                    paths["kube_config"],
                    namespace,
                    paths["helm_dir"],
                    "--stable-repo-url {}".format(self._stable_repo_url)
                    if self._stable_repo_url
                    else "--skip-repos",
                )
                output, _rc = await self._local_async_exec(
                    command=command, raise_exception_on_error=True, env=env
                )
            else:
                self.log.info("Helm client already initialized")

        # remove old stable repo and add new one
        cluster_uuid = "{}:{}".format(namespace, cluster_id)
        repo_list = await self.repo_list(cluster_uuid)
        for repo in repo_list:
            if repo["name"] == "stable" and repo["url"] != self._stable_repo_url:
                self.log.debug(
                    "Add new stable repo url: {}".format(self._stable_repo_url)
                )
                await self.repo_remove(cluster_uuid, "stable")
                if self._stable_repo_url:
                    await self.repo_add(cluster_uuid, "stable", self._stable_repo_url)
                break

        return n2vc_installed_sw

    async def _uninstall_sw(self, cluster_id: str, namespace: str):
        # uninstall Tiller if necessary

        self.log.debug("Uninstalling tiller from cluster {}".format(cluster_id))

        # init paths, env
        paths, env = self._init_paths_env(
            cluster_name=cluster_id, create_if_not_exist=True
        )

        if not namespace:
            # find namespace for tiller pod
            command = "{} --kubeconfig={} get deployments --all-namespaces".format(
                self.kubectl_command, paths["kube_config"]
            )
            output, _rc = await self._local_async_exec(
                command=command, raise_exception_on_error=False, env=env
            )
            output_table = self._output_to_table(output=output)
            namespace = None
            for r in output_table:
                try:
                    if "tiller-deploy" in r[1]:
                        namespace = r[0]
                        break
                except Exception:
                    pass
            else:
                msg = "Tiller deployment not found in cluster {}".format(cluster_id)
                self.log.error(msg)

            self.log.debug("namespace for tiller: {}".format(namespace))

        if namespace:
            # uninstall tiller from cluster
            self.log.debug("Uninstalling tiller from cluster {}".format(cluster_id))
            command = "{} --kubeconfig={} --home={} reset".format(
                self._helm_command, paths["kube_config"], paths["helm_dir"]
            )
            self.log.debug("resetting: {}".format(command))
            output, _rc = await self._local_async_exec(
                command=command, raise_exception_on_error=True, env=env
            )
            # Delete clusterrolebinding and serviceaccount.
            # Ignore errors for backward compatibility
            command = (
                "{} --kubeconfig={} delete clusterrolebinding.rbac.authorization.k8s."
                "io/osm-tiller-cluster-rule"
            ).format(self.kubectl_command, paths["kube_config"])
            output, _rc = await self._local_async_exec(
                command=command, raise_exception_on_error=False, env=env
            )
            command = "{} --kubeconfig={} --namespace kube-system delete serviceaccount/{}".format(
                self.kubectl_command, paths["kube_config"], self.service_account
            )
            output, _rc = await self._local_async_exec(
                command=command, raise_exception_on_error=False, env=env
            )

        else:
            self.log.debug("namespace not found")

    async def _instances_list(self, cluster_id):

        # init paths, env
        paths, env = self._init_paths_env(
            cluster_name=cluster_id, create_if_not_exist=True
        )

        command = "{} list --output yaml".format(self._helm_command)

        output, _rc = await self._local_async_exec(
            command=command, raise_exception_on_error=True, env=env
        )
        if output and len(output) > 0:
            # parse yaml and update keys to lower case to unify with helm3
            instances = yaml.load(output, Loader=yaml.SafeLoader).get("Releases")
            new_instances = []
            for instance in instances:
                new_instance = dict((k.lower(), v) for k, v in instance.items())
                new_instances.append(new_instance)
            return new_instances
        else:
            return []

    def _get_inspect_command(
        self, show_command: str, kdu_model: str, repo_str: str, version: str
    ):
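        # e.g. "/usr/bin/helm inspect values stable/openldap --repo <url> --version 1.2.3"
        # (illustrative; the exact repo_str/version fragments are built by the
        # caller and may be empty)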
        inspect_command = "{} inspect {} {}{} {}".format(
            self._helm_command, show_command, kdu_model, repo_str, version
        )
        return inspect_command

    async def _status_kdu(
        self,
        cluster_id: str,
        kdu_instance: str,
        namespace: str = None,
        show_error_log: bool = False,
        return_text: bool = False,
    ):

        self.log.debug(
            "status of kdu_instance: {}, namespace: {} ".format(kdu_instance, namespace)
        )

        # init config, env
        paths, env = self._init_paths_env(
            cluster_name=cluster_id, create_if_not_exist=True
        )
        command = "{} status {} --output yaml".format(self._helm_command, kdu_instance)
        output, rc = await self._local_async_exec(
            command=command,
            raise_exception_on_error=True,
            show_error_log=show_error_log,
            env=env,
        )

        if return_text:
            return str(output)

        if rc != 0:
            return None

        data = yaml.load(output, Loader=yaml.SafeLoader)
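        # helm2 status yaml is shaped roughly like
        # {"info": {"status": {"notes": ..., "resources": "<text table>"},
        #           "Description": ...}} (illustrative); it is normalized below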

        # remove field 'notes'
        try:
            del data.get("info").get("status")["notes"]
        except KeyError:
            pass

        # parse field 'resources'
        try:
            resources = str(data.get("info").get("status").get("resources"))
            resource_table = self._output_to_table(resources)
            data.get("info").get("status")["resources"] = resource_table
        except Exception:
            pass

        # set description key to lowercase (unify with helm3)
        try:
            data.get("info")["description"] = data.get("info").pop("Description")
        except KeyError:
            pass

        return data

    def _get_helm_chart_repos_ids(self, cluster_uuid) -> list:
        repo_ids = []
        cluster_filter = {"_admin.helm-chart.id": cluster_uuid}
        cluster = self.db.get_one("k8sclusters", cluster_filter)
        if cluster:
            repo_ids = cluster.get("_admin").get("helm_chart_repos") or []
            return repo_ids
        else:
            raise K8sException(
                "k8cluster with helm-id: {} not found".format(cluster_uuid)
            )

    async def _is_install_completed(self, cluster_id: str, kdu_instance: str) -> bool:

        status = await self._status_kdu(
            cluster_id=cluster_id, kdu_instance=kdu_instance, return_text=False
        )

        # extract info.status.resources -> str
        # format:
        #     ==> v1/Deployment
        #     NAME                    READY   UP-TO-DATE   AVAILABLE   AGE
        #     halting-horse-mongodb   0/1     1            0           0s
        #     halting-petit-mongodb   1/1     1            0           0s
        # blank line
        resources = K8sHelmBaseConnector._get_deep(
            status, ("info", "status", "resources")
        )

        # convert to table
        resources = K8sHelmBaseConnector._output_to_table(resources)

        num_lines = len(resources)
        index = 0
        ready = True
        while index < num_lines:
            try:
                line1 = resources[index]
                index += 1
                # find '==>' in column 0
                if line1[0] == "==>":
                    line2 = resources[index]
                    index += 1
                    # find READY in column 1
                    if line2[1] == "READY":
                        # read next lines
                        line3 = resources[index]
                        index += 1
                        while len(line3) > 1 and index < num_lines:
                            ready_value = line3[1]
                            parts = ready_value.split(sep="/")
                            current = int(parts[0])
                            total = int(parts[1])
                            if current < total:
                                self.log.debug("NOT READY:\n {}".format(line3))
                                ready = False
                            line3 = resources[index]
                            index += 1

            except Exception:
                pass

        return ready

    def _get_install_command(
        self, kdu_model, kdu_instance, namespace, params_str, version, atomic, timeout
    ) -> str:

        # timeout
        timeout_str = ""
        if timeout:
            timeout_str = "--timeout {}".format(timeout)

        # atomic
        atomic_str = ""
        if atomic:
            atomic_str = "--atomic"

        # namespace
        namespace_str = ""
        if namespace:
            namespace_str = "--namespace {}".format(namespace)

        # version
        version_str = ""
        if version:
            version_str = "--version {}".format(version)

        command = (
            "{helm} install {atomic} --output yaml "
            "{params} {timeout} --name={name} {ns} {model} {ver}".format(
                helm=self._helm_command,
                atomic=atomic_str,
                params=params_str,
                timeout=timeout_str,
                name=kdu_instance,
                ns=namespace_str,
                model=kdu_model,
                ver=version_str,
            )
        )
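        # illustrative rendering (params_str shown as a hypothetical "--set" flag):
        # "/usr/bin/helm install --atomic --output yaml --set replicas=2
        #  --timeout 300 --name=ldap-1 --namespace osm stable/openldap --version 1.2.3"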
        return command

    def _get_upgrade_command(
        self, kdu_model, kdu_instance, namespace, params_str, version, atomic, timeout
    ) -> str:

        # timeout
        timeout_str = ""
        if timeout:
            timeout_str = "--timeout {}".format(timeout)

        # atomic
        atomic_str = ""
        if atomic:
            atomic_str = "--atomic"

        # version
        version_str = ""
        if version:
            version_str = "--version {}".format(version)

        command = "{helm} upgrade {atomic} --output yaml {params} {timeout} {name} {model} {ver}".format(
            helm=self._helm_command,
            atomic=atomic_str,
            params=params_str,
            timeout=timeout_str,
            name=kdu_instance,
            model=kdu_model,
            ver=version_str,
        )
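        # illustrative rendering: "/usr/bin/helm upgrade --atomic --output yaml
        # --set replicas=2 --timeout 300 ldap-1 stable/openldap --version 1.2.3"
        # (namespace is accepted for interface compatibility; helm2 upgrade
        # addresses the release through Tiller, so it is not used here)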
        return command

    def _get_rollback_command(self, kdu_instance, namespace, revision) -> str:
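        # namespace is accepted for interface compatibility with helm3 but is not
        # used by helm2 rollback (Tiller resolves the release by name)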
        return "{} rollback {} {} --wait".format(
            self._helm_command, kdu_instance, revision
        )

    def _get_uninstall_command(self, kdu_instance: str, namespace: str) -> str:
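        # helm2 "delete --purge" removes the release record from the Tiller store
        # so the name can be reused; namespace is unused here (helm3 instead
        # scopes releases by namespace)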
        return "{} delete --purge {}".format(self._helm_command, kdu_instance)