Bug 1962 fixed: removed the variable cluster_uuid from init_env method
[osm/N2VC.git] / n2vc/k8s_helm_conn.py
##
# Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
# This file is part of OSM
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# For those usages not covered by the Apache License, Version 2.0 please
# contact with: nfvlabs@tid.es
##
import asyncio
import os
import yaml

from n2vc.k8s_helm_base_conn import K8sHelmBaseConnector
from n2vc.exceptions import K8sException

class K8sHelmConnector(K8sHelmBaseConnector):

    """
    ####################################################################################
    ################################### P U B L I C ####################################
    ####################################################################################
    """

    def __init__(
        self,
        fs: object,
        db: object,
        kubectl_command: str = "/usr/bin/kubectl",
        helm_command: str = "/usr/bin/helm",
        log: object = None,
        on_update_db=None,
    ):
        """
        Initializes helm connector for helm v2

        :param fs: file system for kubernetes and helm configuration
        :param db: database object to write current operation status
        :param kubectl_command: path to kubectl executable
        :param helm_command: path to helm executable
        :param log: logger
        :param on_update_db: callback called when k8s connector updates database
        """

        # parent class
        K8sHelmBaseConnector.__init__(
            self,
            db=db,
            log=log,
            fs=fs,
            kubectl_command=kubectl_command,
            helm_command=helm_command,
            on_update_db=on_update_db,
        )

        self.log.info("Initializing K8S Helm2 connector")

        # initialize helm client-only
        self.log.debug("Initializing helm client-only...")
        command = "{} init --client-only {} ".format(
            self._helm_command,
            "--stable-repo-url {}".format(self._stable_repo_url)
            if self._stable_repo_url
            else "--skip-repos",
        )
        try:
            asyncio.ensure_future(
                self._local_async_exec(command=command, raise_exception_on_error=False)
            )
            # loop = asyncio.get_event_loop()
            # loop.run_until_complete(self._local_async_exec(command=command,
            # raise_exception_on_error=False))
        except Exception as e:
            self.log.warning(
                "helm init failed (it was already initialized): {}".format(e)
            )

        self.log.info("K8S Helm2 connector initialized")

    async def install(
        self,
        cluster_uuid: str,
        kdu_model: str,
        kdu_instance: str,
        atomic: bool = True,
        timeout: float = 300,
        params: dict = None,
        db_dict: dict = None,
        kdu_name: str = None,
        namespace: str = None,
        **kwargs,
    ):
106 """
107 Deploys of a new KDU instance. It would implicitly rely on the `install` call
108 to deploy the Chart/Bundle properly parametrized (in practice, this call would
109 happen before any _initial-config-primitive_of the VNF is called).
110
111 :param cluster_uuid: UUID of a K8s cluster known by OSM
112 :param kdu_model: chart/ reference (string), which can be either
113 of these options:
114 - a name of chart available via the repos known by OSM
115 - a path to a packaged chart
116 - a path to an unpacked chart directory or a URL
117 :param kdu_instance: Kdu instance name
118 :param atomic: If set, installation process purges chart/bundle on fail, also
119 will wait until all the K8s objects are active
120 :param timeout: Time in seconds to wait for the install of the chart/bundle
121 (defaults to Helm default timeout: 300s)
122 :param params: dictionary of key-value pairs for instantiation parameters
123 (overriding default values)
124 :param dict db_dict: where to write into database when the status changes.
125 It contains a dict with {collection: <str>, filter: {},
126 path: <str>},
127 e.g. {collection: "nsrs", filter:
128 {_id: <nsd-id>, path: "_admin.deployed.K8S.3"}
129 :param kdu_name: Name of the KDU instance to be installed
130 :param namespace: K8s namespace to use for the KDU instance
131 :param kwargs: Additional parameters (None yet)
132 :return: True if successful
133 """
        self.log.debug("installing {} in cluster {}".format(kdu_model, cluster_uuid))

        # sync local dir
        self.fs.sync(from_path=cluster_uuid)

        # init env, paths
        paths, env = self._init_paths_env(
            cluster_name=cluster_uuid, create_if_not_exist=True
        )

        await self._install_impl(
            cluster_uuid,
            kdu_model,
            paths,
            env,
            kdu_instance,
            atomic=atomic,
            timeout=timeout,
            params=params,
            db_dict=db_dict,
            kdu_name=kdu_name,
            namespace=namespace,
        )

        # sync fs
        self.fs.reverse_sync(from_path=cluster_uuid)

        self.log.debug("Returning kdu_instance {}".format(kdu_instance))
        return True

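    # A minimal usage sketch of install() (illustrative only: the fs/db objects,
    # event loop handling, cluster UUID, chart name and parameter values below are
    # assumptions, not values taken from this repository):
    #
    #     conn = K8sHelmConnector(fs=fs, db=db, log=logger)
    #     await conn.install(
    #         cluster_uuid="<cluster-uuid>",
    #         kdu_model="stable/openldap",
    #         kdu_instance="openldap-0001",
    #         atomic=True,
    #         timeout=300,
    #         params={"replicaCount": "2"},
    #         namespace="kube-system",
    #     )
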
    async def inspect_kdu(self, kdu_model: str, repo_url: str = None) -> str:

        self.log.debug(
            "inspect kdu_model {} from (optional) repo: {}".format(kdu_model, repo_url)
        )

        return await self._exec_inspect_command(
            inspect_command="", kdu_model=kdu_model, repo_url=repo_url
        )

    """
    ####################################################################################
    ################################### P R I V A T E ##################################
    ####################################################################################
    """

    def _init_paths_env(self, cluster_name: str, create_if_not_exist: bool = True):
        """
        Creates the base cluster and kube dirs if needed and returns their paths,
        together with the environment variables that must be provided to execute
        helm commands.

        Helm 2 uses the helm_home directory specification. The variables assigned
        for these paths are:
        - Helm home: $HELM_HOME
        - helm kubeconfig: $KUBECONFIG

        :param cluster_name: cluster_name
        :return: Dictionary with config paths and dictionary with helm environment variables
        """
        base = self.fs.path
        if base.endswith("/") or base.endswith("\\"):
            base = base[:-1]

        # base dir for cluster
        cluster_dir = base + "/" + cluster_name

        # kube dir
        kube_dir = cluster_dir + "/" + ".kube"
        if create_if_not_exist and not os.path.exists(kube_dir):
            self.log.debug("Creating dir {}".format(kube_dir))
            os.makedirs(kube_dir)

        # helm home dir
        helm_dir = cluster_dir + "/" + ".helm"
        if create_if_not_exist and not os.path.exists(helm_dir):
            self.log.debug("Creating dir {}".format(helm_dir))
            os.makedirs(helm_dir)

        config_filename = kube_dir + "/config"

        # 2 - Prepare dictionary with paths
        paths = {
            "kube_dir": kube_dir,
            "kube_config": config_filename,
            "cluster_dir": cluster_dir,
            "helm_dir": helm_dir,
        }

        for file_name, file in paths.items():
            if "dir" in file_name and not os.path.exists(file):
                err_msg = "{} dir does not exist".format(file)
                self.log.error(err_msg)
                raise K8sException(err_msg)

        # 3 - Prepare environment variables
        env = {"HELM_HOME": helm_dir, "KUBECONFIG": config_filename}

        return paths, env

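    # Illustrative shape of the values returned by _init_paths_env for a cluster
    # named "<cluster>", assuming self.fs.path is "/app/storage" (that base path is
    # an assumption, not taken from this file):
    #
    #     paths = {
    #         "kube_dir": "/app/storage/<cluster>/.kube",
    #         "kube_config": "/app/storage/<cluster>/.kube/config",
    #         "cluster_dir": "/app/storage/<cluster>",
    #         "helm_dir": "/app/storage/<cluster>/.helm",
    #     }
    #     env = {"HELM_HOME": paths["helm_dir"], "KUBECONFIG": paths["kube_config"]}
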
    async def _get_services(self, cluster_id, kdu_instance, namespace, kubeconfig):

        # init config, env
        paths, env = self._init_paths_env(
            cluster_name=cluster_id, create_if_not_exist=True
        )

        command1 = "env KUBECONFIG={} {} get manifest {} ".format(
            kubeconfig, self._helm_command, kdu_instance
        )
        command2 = "{} get --namespace={} -f -".format(self.kubectl_command, namespace)
        output, _rc = await self._local_async_exec_pipe(
            command1, command2, env=env, raise_exception_on_error=True
        )
        services = self._parse_services(output)

        return services

    async def _cluster_init(
        self, cluster_id: str, namespace: str, paths: dict, env: dict
    ):
        """
        Implements the helm version dependent cluster initialization:
        for helm2 it initializes the tiller environment if needed
        """

        # check if tiller pod is up in cluster
        command = "{} --kubeconfig={} --namespace={} get deployments".format(
            self.kubectl_command, paths["kube_config"], namespace
        )
        output, _rc = await self._local_async_exec(
            command=command, raise_exception_on_error=True, env=env
        )

        output_table = self._output_to_table(output=output)

        # find 'tiller' pod in all pods
        already_initialized = False
        try:
            for row in output_table:
                if row[0].startswith("tiller-deploy"):
                    already_initialized = True
                    break
        except Exception:
            pass

        # helm init
        n2vc_installed_sw = False
        if not already_initialized:
            self.log.info(
                "Initializing helm in client and server: {}".format(cluster_id)
            )
            command = "{} --kubeconfig={} --namespace kube-system create serviceaccount {}".format(
                self.kubectl_command, paths["kube_config"], self.service_account
            )
            _, _rc = await self._local_async_exec(
                command=command, raise_exception_on_error=False, env=env
            )

            command = (
                "{} --kubeconfig={} create clusterrolebinding osm-tiller-cluster-rule "
                "--clusterrole=cluster-admin --serviceaccount=kube-system:{}"
            ).format(self.kubectl_command, paths["kube_config"], self.service_account)
            _, _rc = await self._local_async_exec(
                command=command, raise_exception_on_error=False, env=env
            )

            command = (
                "{} --kubeconfig={} --tiller-namespace={} --home={} --service-account {} "
                " {} init"
            ).format(
                self._helm_command,
                paths["kube_config"],
                namespace,
                paths["helm_dir"],
                self.service_account,
                "--stable-repo-url {}".format(self._stable_repo_url)
                if self._stable_repo_url
                else "--skip-repos",
            )
            _, _rc = await self._local_async_exec(
                command=command, raise_exception_on_error=True, env=env
            )
            n2vc_installed_sw = True
        else:
            # check client helm installation
            check_file = paths["helm_dir"] + "/repository/repositories.yaml"
            if not self._check_file_exists(
                filename=check_file, exception_if_not_exists=False
            ):
                self.log.info("Initializing helm in client: {}".format(cluster_id))
                command = (
                    "{} --kubeconfig={} --tiller-namespace={} "
                    "--home={} init --client-only {} "
                ).format(
                    self._helm_command,
                    paths["kube_config"],
                    namespace,
                    paths["helm_dir"],
                    "--stable-repo-url {}".format(self._stable_repo_url)
                    if self._stable_repo_url
                    else "--skip-repos",
                )
                output, _rc = await self._local_async_exec(
                    command=command, raise_exception_on_error=True, env=env
                )
            else:
                self.log.info("Helm client already initialized")

        repo_list = await self.repo_list(cluster_id)
        for repo in repo_list:
            if repo["name"] == "stable" and repo["url"] != self._stable_repo_url:
                self.log.debug(
                    "Add new stable repo url: {}".format(self._stable_repo_url)
                )
                await self.repo_remove(cluster_id, "stable")
                if self._stable_repo_url:
                    await self.repo_add(cluster_id, "stable", self._stable_repo_url)
                break

        return n2vc_installed_sw

    async def _uninstall_sw(self, cluster_id: str, namespace: str):
        # uninstall Tiller if necessary

        self.log.debug("Uninstalling tiller from cluster {}".format(cluster_id))

        # init paths, env
        paths, env = self._init_paths_env(
            cluster_name=cluster_id, create_if_not_exist=True
        )

        if not namespace:
            # find namespace for tiller pod
            command = "{} --kubeconfig={} get deployments --all-namespaces".format(
                self.kubectl_command, paths["kube_config"]
            )
            output, _rc = await self._local_async_exec(
                command=command, raise_exception_on_error=False, env=env
            )
            output_table = self._output_to_table(output=output)
            namespace = None
            for r in output_table:
                try:
                    if "tiller-deploy" in r[1]:
                        namespace = r[0]
                        break
                except Exception:
                    pass
            else:
                msg = "Tiller deployment not found in cluster {}".format(cluster_id)
                self.log.error(msg)

            self.log.debug("namespace for tiller: {}".format(namespace))

        if namespace:
            # uninstall tiller from cluster
            self.log.debug("Uninstalling tiller from cluster {}".format(cluster_id))
            command = "{} --kubeconfig={} --home={} reset".format(
                self._helm_command, paths["kube_config"], paths["helm_dir"]
            )
            self.log.debug("resetting: {}".format(command))
            output, _rc = await self._local_async_exec(
                command=command, raise_exception_on_error=True, env=env
            )
            # Delete clusterrolebinding and serviceaccount.
            # Ignore if errors for backward compatibility
            command = (
                "{} --kubeconfig={} delete clusterrolebinding.rbac.authorization.k8s."
                "io/osm-tiller-cluster-rule"
            ).format(self.kubectl_command, paths["kube_config"])
            output, _rc = await self._local_async_exec(
                command=command, raise_exception_on_error=False, env=env
            )
            command = (
                "{} --kubeconfig={} --namespace {} delete serviceaccount/{}".format(
                    self.kubectl_command,
                    paths["kube_config"],
                    namespace,
                    self.service_account,
                )
            )
            output, _rc = await self._local_async_exec(
                command=command, raise_exception_on_error=False, env=env
            )

        else:
            self.log.debug("namespace not found")

    async def _instances_list(self, cluster_id):

        # init paths, env
        paths, env = self._init_paths_env(
            cluster_name=cluster_id, create_if_not_exist=True
        )

        command = "{} list --output yaml".format(self._helm_command)

        output, _rc = await self._local_async_exec(
            command=command, raise_exception_on_error=True, env=env
        )

        if output and len(output) > 0:
            # parse yaml and update keys to lower case to unify with helm3
            instances = yaml.load(output, Loader=yaml.SafeLoader).get("Releases")
            new_instances = []
            for instance in instances:
                new_instance = dict((k.lower(), v) for k, v in instance.items())
                new_instances.append(new_instance)
            return new_instances
        else:
            return []

    def _get_inspect_command(
        self, show_command: str, kdu_model: str, repo_str: str, version: str
    ):
        inspect_command = "{} inspect {} {}{} {}".format(
            self._helm_command, show_command, kdu_model, repo_str, version
        )
        return inspect_command

    def _get_get_command(
        self, get_command: str, kdu_instance: str, namespace: str, kubeconfig: str
    ):
        get_command = "env KUBECONFIG={} {} get {} {} --output yaml".format(
            kubeconfig, self._helm_command, get_command, kdu_instance
        )
        return get_command

    async def _status_kdu(
        self,
        cluster_id: str,
        kdu_instance: str,
        namespace: str = None,
        show_error_log: bool = False,
        return_text: bool = False,
    ):

        self.log.debug(
            "status of kdu_instance: {}, namespace: {} ".format(kdu_instance, namespace)
        )

        # init config, env
        paths, env = self._init_paths_env(
            cluster_name=cluster_id, create_if_not_exist=True
        )
        command = ("env KUBECONFIG={} {} status {} --output yaml").format(
            paths["kube_config"], self._helm_command, kdu_instance
        )
        output, rc = await self._local_async_exec(
            command=command,
            raise_exception_on_error=True,
            show_error_log=show_error_log,
            env=env,
        )

        if return_text:
            return str(output)

        if rc != 0:
            return None

        data = yaml.load(output, Loader=yaml.SafeLoader)

        # remove field 'notes'
        try:
            del data.get("info").get("status")["notes"]
        except KeyError:
            pass

        # parse field 'resources'
        try:
            resources = str(data.get("info").get("status").get("resources"))
            resource_table = self._output_to_table(resources)
            data.get("info").get("status")["resources"] = resource_table
        except Exception:
            pass

        # set description to lowercase (unify with helm3)
        try:
            data.get("info")["description"] = data.get("info").pop("Description")
        except KeyError:
            pass

        return data

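    # Abridged shape of the dict returned by _status_kdu after the post-processing
    # above (only the fields this method touches are shown; the concrete values are
    # illustrative assumptions, not captured output):
    #
    #     {
    #         "info": {
    #             "status": {"resources": [["==>", "v1/Deployment"], ...]},
    #             "description": "Install complete",
    #         },
    #     }
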
    def _get_helm_chart_repos_ids(self, cluster_uuid) -> list:
        repo_ids = []
        cluster_filter = {"_admin.helm-chart.id": cluster_uuid}
        cluster = self.db.get_one("k8sclusters", cluster_filter)
        if cluster:
            repo_ids = cluster.get("_admin").get("helm_chart_repos") or []
            return repo_ids
        else:
            raise K8sException(
                "k8s cluster with helm-id: {} not found".format(cluster_uuid)
            )

    async def _is_install_completed(self, cluster_id: str, kdu_instance: str) -> bool:
        # init config, env
        paths, env = self._init_paths_env(
            cluster_name=cluster_id, create_if_not_exist=True
        )

        status = await self._status_kdu(
            cluster_id=cluster_id, kdu_instance=kdu_instance, return_text=False
        )

        # extract info.status.resources -> str
        # format:
        #     ==> v1/Deployment
        #     NAME                    READY   UP-TO-DATE   AVAILABLE   AGE
        #     halting-horse-mongodb   0/1     1            0           0s
        #     halting-petit-mongodb   1/1     1            0           0s
        #     blank line
        resources = K8sHelmBaseConnector._get_deep(
            status, ("info", "status", "resources")
        )

        # convert to table
        resources = K8sHelmBaseConnector._output_to_table(resources)

        num_lines = len(resources)
        index = 0
        ready = True
        while index < num_lines:
            try:
                line1 = resources[index]
                index += 1
                # find '==>' in column 0
                if line1[0] == "==>":
                    line2 = resources[index]
                    index += 1
                    # find READY in column 1
                    if line2[1] == "READY":
                        # read next lines
                        line3 = resources[index]
                        index += 1
                        while len(line3) > 1 and index < num_lines:
                            ready_value = line3[1]
                            parts = ready_value.split(sep="/")
                            current = int(parts[0])
                            total = int(parts[1])
                            if current < total:
                                self.log.debug("NOT READY:\n {}".format(line3))
                                ready = False
                            line3 = resources[index]
                            index += 1

            except Exception:
                pass

        return ready

    def _get_install_command(
        self,
        kdu_model,
        kdu_instance,
        namespace,
        params_str,
        version,
        atomic,
        timeout,
        kubeconfig,
    ) -> str:

        timeout_str = ""
        if timeout:
            timeout_str = "--timeout {}".format(timeout)

        # atomic
        atomic_str = ""
        if atomic:
            atomic_str = "--atomic"
        # namespace
        namespace_str = ""
        if namespace:
            namespace_str = "--namespace {}".format(namespace)

        # version
        version_str = ""
        if version:
            version_str = "--version {}".format(version)

        command = (
            "env KUBECONFIG={kubeconfig} {helm} install {atomic} --output yaml "
            "{params} {timeout} --name={name} {ns} {model} {ver}".format(
                kubeconfig=kubeconfig,
                helm=self._helm_command,
                atomic=atomic_str,
                params=params_str,
                timeout=timeout_str,
                name=kdu_instance,
                ns=namespace_str,
                model=kdu_model,
                ver=version_str,
            )
        )
        return command

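    # Example of a helm2 command line produced by _get_install_command, wrapped here
    # for readability (the kubeconfig path, release name, chart, params and version
    # below are illustrative assumptions):
    #
    #     env KUBECONFIG=/app/storage/<cluster>/.kube/config /usr/bin/helm install
    #         --atomic --output yaml --set replicaCount=2 --timeout 300
    #         --name=openldap-0001 --namespace kube-system stable/openldap --version 1.2.3
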
    def _get_upgrade_scale_command(
        self,
        kdu_model: str,
        kdu_instance: str,
        namespace: str,
        scale: int,
        version: str,
        atomic: bool,
        replica_str: str,
        timeout: float,
        resource_name: str,
        kubeconfig: str,
    ) -> str:

        timeout_str = ""
        if timeout:
            timeout_str = "--timeout {}s".format(timeout)

        # atomic
        atomic_str = ""
        if atomic:
            atomic_str = "--atomic"

        # version
        version_str = ""
        if version:
            version_str = "--version {}".format(version)

        # scale
        if resource_name:
            scale_dict = {"{}.{}".format(resource_name, replica_str): scale}
        else:
            scale_dict = {replica_str: scale}

        scale_str = self._params_to_set_option(scale_dict)

        command = (
            "env KUBECONFIG={kubeconfig} {helm} upgrade {atomic} --output yaml {scale} {timeout} {name} {model} {ver}"
        ).format(
            helm=self._helm_command,
            name=kdu_instance,
            atomic=atomic_str,
            scale=scale_str,
            timeout=timeout_str,
            model=kdu_model,
            ver=version_str,
            kubeconfig=kubeconfig,
        )
        return command

    def _get_upgrade_command(
        self,
        kdu_model,
        kdu_instance,
        namespace,
        params_str,
        version,
        atomic,
        timeout,
        kubeconfig,
    ) -> str:

        timeout_str = ""
        if timeout:
            timeout_str = "--timeout {}".format(timeout)

        # atomic
        atomic_str = ""
        if atomic:
            atomic_str = "--atomic"

        # version
        version_str = ""
        if version:
            version_str = "--version {}".format(version)

        command = (
            "env KUBECONFIG={kubeconfig} {helm} upgrade {atomic} --output yaml {params} {timeout} {name} {model} {ver}"
        ).format(
            kubeconfig=kubeconfig,
            helm=self._helm_command,
            atomic=atomic_str,
            params=params_str,
            timeout=timeout_str,
            name=kdu_instance,
            model=kdu_model,
            ver=version_str,
        )
        return command

    def _get_rollback_command(
        self, kdu_instance, namespace, revision, kubeconfig
    ) -> str:
        return "env KUBECONFIG={} {} rollback {} {} --wait".format(
            kubeconfig, self._helm_command, kdu_instance, revision
        )

    def _get_uninstall_command(
        self, kdu_instance: str, namespace: str, kubeconfig: str
    ) -> str:
        return "env KUBECONFIG={} {} delete --purge {}".format(
            kubeconfig, self._helm_command, kdu_instance
        )