Bug 1964 fixed: removed the variable cluster_uuid from init_env method
[osm/N2VC.git] / n2vc / k8s_helm_conn.py
##
# Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
# This file is part of OSM
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# For those usages not covered by the Apache License, Version 2.0 please
# contact with: nfvlabs@tid.es
##
import asyncio
import os
import yaml

from n2vc.k8s_helm_base_conn import K8sHelmBaseConnector
from n2vc.exceptions import K8sException


class K8sHelmConnector(K8sHelmBaseConnector):

    """
    ####################################################################################
    ################################### P U B L I C ####################################
    ####################################################################################
    """

    def __init__(
        self,
        fs: object,
        db: object,
        kubectl_command: str = "/usr/bin/kubectl",
        helm_command: str = "/usr/bin/helm",
        log: object = None,
        on_update_db=None,
    ):
        """
        Initializes helm connector for helm v2

        :param fs: file system for kubernetes and helm configuration
        :param db: database object to write current operation status
        :param kubectl_command: path to kubectl executable
        :param helm_command: path to helm executable
        :param log: logger
        :param on_update_db: callback called when k8s connector updates database
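
        Usage sketch (illustrative; `fs`, `db` and `log` stand for objects
        provided by the OSM framework)::

            k8s_helm_conn = K8sHelmConnector(fs=fs, db=db, log=log)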
        """

        # parent class
        K8sHelmBaseConnector.__init__(
            self,
            db=db,
            log=log,
            fs=fs,
            kubectl_command=kubectl_command,
            helm_command=helm_command,
            on_update_db=on_update_db,
        )

        self.log.info("Initializing K8S Helm2 connector")

        # initialize helm client-only
        self.log.debug("Initializing helm client-only...")
        command = "{} init --client-only {} ".format(
            self._helm_command,
            "--stable-repo-url {}".format(self._stable_repo_url)
            if self._stable_repo_url
            else "--skip-repos",
        )
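        # For illustration, with the default helm path and a stable repo URL
        # configured, the rendered command would resemble:
        #   /usr/bin/helm init --client-only --stable-repo-url <stable_repo_url>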
        try:
            asyncio.ensure_future(
                self._local_async_exec(command=command, raise_exception_on_error=False)
            )
            # loop = asyncio.get_event_loop()
            # loop.run_until_complete(self._local_async_exec(command=command,
            # raise_exception_on_error=False))
        except Exception as e:
            self.log.warning(
                msg="helm init failed (it was already initialized): {}".format(e)
            )

        self.log.info("K8S Helm2 connector initialized")

    async def install(
        self,
        cluster_uuid: str,
        kdu_model: str,
        kdu_instance: str,
        atomic: bool = True,
        timeout: float = 300,
        params: dict = None,
        db_dict: dict = None,
        kdu_name: str = None,
        namespace: str = None,
        **kwargs,
    ):
        """
        Deploys a new KDU instance. It would implicitly rely on the `install` call
        to deploy the Chart/Bundle properly parametrized (in practice, this call
        would happen before any _initial-config-primitive_ of the VNF is called).

        :param cluster_uuid: UUID of a K8s cluster known by OSM
        :param kdu_model: chart reference (string), which can be any of these options:
            - a name of a chart available via the repos known by OSM
            - a path to a packaged chart
            - a path to an unpacked chart directory or a URL
        :param kdu_instance: Kdu instance name
        :param atomic: If set, the installation process purges the chart/bundle on
            failure and waits until all the K8s objects are active
        :param timeout: Time in seconds to wait for the install of the chart/bundle
            (defaults to Helm default timeout: 300s)
        :param params: dictionary of key-value pairs for instantiation parameters
            (overriding default values)
        :param dict db_dict: where to write into database when the status changes.
            It contains a dict with {collection: <str>, filter: {}, path: <str>},
            e.g. {collection: "nsrs", filter: {_id: <nsd-id>},
            path: "_admin.deployed.K8S.3"}
        :param kdu_name: Name of the KDU instance to be installed
        :param namespace: K8s namespace to use for the KDU instance
        :param kwargs: Additional parameters (None yet)
        :return: True if successful
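
        Example (illustrative values; identifiers are hypothetical)::

            await k8s_helm_conn.install(
                cluster_uuid="<cluster-uuid>",
                kdu_model="stable/openldap",
                kdu_instance="ldap-instance-1",
                namespace="osm",
                db_dict={
                    "collection": "nsrs",
                    "filter": {"_id": "<nsr-id>"},
                    "path": "_admin.deployed.K8S.0",
                },
            )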
        """
        self.log.debug("installing {} in cluster {}".format(kdu_model, cluster_uuid))

        # sync local dir
        self.fs.sync(from_path=cluster_uuid)

        # init env, paths
        paths, env = self._init_paths_env(
            cluster_name=cluster_uuid, create_if_not_exist=True
        )

        await self._install_impl(
            cluster_uuid,
            kdu_model,
            paths,
            env,
            kdu_instance,
            atomic=atomic,
            timeout=timeout,
            params=params,
            db_dict=db_dict,
            kdu_name=kdu_name,
            namespace=namespace,
        )

        # sync fs
        self.fs.reverse_sync(from_path=cluster_uuid)

        self.log.debug("Returning kdu_instance {}".format(kdu_instance))
        return True

    async def inspect_kdu(self, kdu_model: str, repo_url: str = None) -> str:

        self.log.debug(
            "inspect kdu_model {} from (optional) repo: {}".format(kdu_model, repo_url)
        )

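        # An empty inspect_command is expected to render a plain
        # "helm inspect <chart>" (see _get_inspect_command below), i.e. the
        # full chart information.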
        return await self._exec_inspect_comand(
            inspect_command="", kdu_model=kdu_model, repo_url=repo_url
        )

    """
    ####################################################################################
    ################################### P R I V A T E ##################################
    ####################################################################################
    """

    def _init_paths_env(self, cluster_name: str, create_if_not_exist: bool = True):
        """
        Creates base cluster and kube dirs if needed and returns them.
        Also creates the helm2 home dir according to the directory specification;
        paths are returned together with the environment variables that must be
        provided to execute commands.

        Helm 2 directory specification uses the helm_home dir:

        The environment variables set for these paths are:
        - Helm home: $HELM_HOME
        - helm kubeconfig: $KUBECONFIG

        :param cluster_name: name of the cluster
        :return: Dictionary with config_paths and dictionary with helm environment variables
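
        For illustration, for a cluster named "cluster-1" the returned values
        would resemble::

            paths = {
                "kube_dir": "<fs-path>/cluster-1/.kube",
                "kube_config": "<fs-path>/cluster-1/.kube/config",
                "cluster_dir": "<fs-path>/cluster-1",
                "helm_dir": "<fs-path>/cluster-1/.helm",
            }
            env = {"HELM_HOME": "<fs-path>/cluster-1/.helm",
                   "KUBECONFIG": "<fs-path>/cluster-1/.kube/config"}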
        """
        base = self.fs.path
        if base.endswith("/") or base.endswith("\\"):
            base = base[:-1]

        # base dir for cluster
        cluster_dir = base + "/" + cluster_name

        # kube dir
        kube_dir = cluster_dir + "/" + ".kube"
        if create_if_not_exist and not os.path.exists(kube_dir):
            self.log.debug("Creating dir {}".format(kube_dir))
            os.makedirs(kube_dir)

        # helm home dir
        helm_dir = cluster_dir + "/" + ".helm"
        if create_if_not_exist and not os.path.exists(helm_dir):
            self.log.debug("Creating dir {}".format(helm_dir))
            os.makedirs(helm_dir)

        config_filename = kube_dir + "/config"

        # 2 - Prepare dictionary with paths
        paths = {
            "kube_dir": kube_dir,
            "kube_config": config_filename,
            "cluster_dir": cluster_dir,
            "helm_dir": helm_dir,
        }

        for file_name, file in paths.items():
            if "dir" in file_name and not os.path.exists(file):
                err_msg = "{} dir does not exist".format(file)
                self.log.error(err_msg)
                raise K8sException(err_msg)

        # 3 - Prepare environment variables
        env = {"HELM_HOME": helm_dir, "KUBECONFIG": config_filename}

        return paths, env

    async def _get_services(self, cluster_id, kdu_instance, namespace, kubeconfig):

        # init config, env
        paths, env = self._init_paths_env(
            cluster_name=cluster_id, create_if_not_exist=True
        )

        command1 = "env KUBECONFIG={} {} get manifest {} ".format(
            kubeconfig, self._helm_command, kdu_instance
        )
        command2 = "{} get --namespace={} -f -".format(self.kubectl_command, namespace)
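        # The two commands are piped together, roughly:
        #   helm get manifest <kdu_instance> | kubectl get --namespace=<ns> -f -
        # so kubectl reports the live state of every object in the release
        # manifest.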
        output, _rc = await self._local_async_exec_pipe(
            command1, command2, env=env, raise_exception_on_error=True
        )
        services = self._parse_services(output)

        return services

    async def _cluster_init(
        self, cluster_id: str, namespace: str, paths: dict, env: dict
    ):
        """
        Implements the helm version dependent cluster initialization:
        For helm2 it initializes the Tiller environment if needed
        """

        # check if tiller pod is up in cluster
        command = "{} --kubeconfig={} --namespace={} get deployments".format(
            self.kubectl_command, paths["kube_config"], namespace
        )
        output, _rc = await self._local_async_exec(
            command=command, raise_exception_on_error=True, env=env
        )

        output_table = self._output_to_table(output=output)

        # find the 'tiller' deployment among all deployments
        already_initialized = False
        try:
            for row in output_table:
                if row[0].startswith("tiller-deploy"):
                    already_initialized = True
                    break
        except Exception:
            pass

        # helm init
        n2vc_installed_sw = False
        if not already_initialized:
            self.log.info(
                "Initializing helm in client and server: {}".format(cluster_id)
            )
            command = "{} --kubeconfig={} --namespace kube-system create serviceaccount {}".format(
                self.kubectl_command, paths["kube_config"], self.service_account
            )
            _, _rc = await self._local_async_exec(
                command=command, raise_exception_on_error=False, env=env
            )

            command = (
                "{} --kubeconfig={} create clusterrolebinding osm-tiller-cluster-rule "
                "--clusterrole=cluster-admin --serviceaccount=kube-system:{}"
            ).format(self.kubectl_command, paths["kube_config"], self.service_account)
            _, _rc = await self._local_async_exec(
                command=command, raise_exception_on_error=False, env=env
            )

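            # Bootstrap Tiller into the target namespace using the service
            # account and cluster-role binding created above; --home points
            # helm at the per-cluster .helm dir prepared by _init_paths_env().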
            command = (
                "{} --kubeconfig={} --tiller-namespace={} --home={} --service-account {} "
                " {} init"
            ).format(
                self._helm_command,
                paths["kube_config"],
                namespace,
                paths["helm_dir"],
                self.service_account,
                "--stable-repo-url {}".format(self._stable_repo_url)
                if self._stable_repo_url
                else "--skip-repos",
            )
            _, _rc = await self._local_async_exec(
                command=command, raise_exception_on_error=True, env=env
            )
            n2vc_installed_sw = True
        else:
            # check client helm installation
            check_file = paths["helm_dir"] + "/repository/repositories.yaml"
            if not self._check_file_exists(
                filename=check_file, exception_if_not_exists=False
            ):
                self.log.info("Initializing helm in client: {}".format(cluster_id))
                command = (
                    "{} --kubeconfig={} --tiller-namespace={} "
                    "--home={} init --client-only {} "
                ).format(
                    self._helm_command,
                    paths["kube_config"],
                    namespace,
                    paths["helm_dir"],
                    "--stable-repo-url {}".format(self._stable_repo_url)
                    if self._stable_repo_url
                    else "--skip-repos",
                )
                output, _rc = await self._local_async_exec(
                    command=command, raise_exception_on_error=True, env=env
                )
            else:
                self.log.info("Helm client already initialized")

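        # If the configured stable repo URL changed, replace the registered
        # "stable" repo so later chart operations use the new location.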
        repo_list = await self.repo_list(cluster_id)
        for repo in repo_list:
            if repo["name"] == "stable" and repo["url"] != self._stable_repo_url:
                self.log.debug("Add new stable repo url: {}".format(self._stable_repo_url))
                await self.repo_remove(cluster_id, "stable")
                if self._stable_repo_url:
                    await self.repo_add(cluster_id, "stable", self._stable_repo_url)
                break

        return n2vc_installed_sw

    async def _uninstall_sw(self, cluster_id: str, namespace: str):
        # uninstall Tiller if necessary

        self.log.debug("Uninstalling tiller from cluster {}".format(cluster_id))

        # init paths, env
        paths, env = self._init_paths_env(
            cluster_name=cluster_id, create_if_not_exist=True
        )

        if not namespace:
            # find namespace for tiller pod
            command = "{} --kubeconfig={} get deployments --all-namespaces".format(
                self.kubectl_command, paths["kube_config"]
            )
            output, _rc = await self._local_async_exec(
                command=command, raise_exception_on_error=False, env=env
            )
            output_table = self._output_to_table(output=output)
            namespace = None
            for r in output_table:
                try:
                    if "tiller-deploy" in r[1]:
                        namespace = r[0]
                        break
                except Exception:
                    pass
            else:
                msg = "Tiller deployment not found in cluster {}".format(cluster_id)
                self.log.error(msg)

        self.log.debug("namespace for tiller: {}".format(namespace))

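        # "helm reset" removes Tiller from the cluster; afterwards the RBAC
        # objects created at init time are deleted (errors ignored for
        # backward compatibility with clusters not initialized by OSM).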
        if namespace:
            # uninstall tiller from cluster
            self.log.debug("Uninstalling tiller from cluster {}".format(cluster_id))
            command = "{} --kubeconfig={} --home={} reset".format(
                self._helm_command, paths["kube_config"], paths["helm_dir"]
            )
            self.log.debug("resetting: {}".format(command))
            output, _rc = await self._local_async_exec(
                command=command, raise_exception_on_error=True, env=env
            )
            # Delete clusterrolebinding and serviceaccount.
            # Ignore errors for backward compatibility
            command = (
                "{} --kubeconfig={} delete clusterrolebinding.rbac.authorization.k8s."
                "io/osm-tiller-cluster-rule"
            ).format(self.kubectl_command, paths["kube_config"])
            output, _rc = await self._local_async_exec(
                command=command, raise_exception_on_error=False, env=env
            )
            command = (
                "{} --kubeconfig={} --namespace {} delete serviceaccount/{}".format(
                    self.kubectl_command,
                    paths["kube_config"],
                    namespace,
                    self.service_account,
                )
            )
            output, _rc = await self._local_async_exec(
                command=command, raise_exception_on_error=False, env=env
            )

        else:
            self.log.debug("namespace not found")

    async def _instances_list(self, cluster_id):

        # init paths, env
        paths, env = self._init_paths_env(
            cluster_name=cluster_id, create_if_not_exist=True
        )

        command = "{} list --output yaml".format(self._helm_command)

        output, _rc = await self._local_async_exec(
            command=command, raise_exception_on_error=True, env=env
        )

        if output and len(output) > 0:
            # parse yaml and update keys to lower case to unify with helm3
            instances = yaml.load(output, Loader=yaml.SafeLoader).get("Releases")
            new_instances = []
            for instance in instances:
                new_instance = dict((k.lower(), v) for k, v in instance.items())
                new_instances.append(new_instance)
            return new_instances
        else:
            return []

    def _get_inspect_command(
        self, show_command: str, kdu_model: str, repo_str: str, version: str
    ):
        inspect_command = "{} inspect {} {}{} {}".format(
            self._helm_command, show_command, kdu_model, repo_str, version
        )
        return inspect_command

    async def _status_kdu(
        self,
        cluster_id: str,
        kdu_instance: str,
        namespace: str = None,
        show_error_log: bool = False,
        return_text: bool = False,
    ):

        self.log.debug(
            "status of kdu_instance: {}, namespace: {} ".format(kdu_instance, namespace)
        )

        # init config, env
        paths, env = self._init_paths_env(
            cluster_name=cluster_id, create_if_not_exist=True
        )
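        # "helm status" prints the release status; --output yaml lets us parse
        # it below with yaml.load (e.g. the info.status.resources field).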
        command = ("env KUBECONFIG={} {} status {} --output yaml").format(
            paths["kube_config"], self._helm_command, kdu_instance
        )
        output, rc = await self._local_async_exec(
            command=command,
            raise_exception_on_error=True,
            show_error_log=show_error_log,
            env=env,
        )

        if return_text:
            return str(output)

        if rc != 0:
            return None

        data = yaml.load(output, Loader=yaml.SafeLoader)

        # remove field 'notes'
        try:
            del data.get("info").get("status")["notes"]
        except KeyError:
            pass

        # parse field 'resources'
        try:
            resources = str(data.get("info").get("status").get("resources"))
            resource_table = self._output_to_table(resources)
            data.get("info").get("status")["resources"] = resource_table
        except Exception:
            pass

        # set description to lowercase (unify with helm3)
        try:
            data.get("info")["description"] = data.get("info").pop("Description")
        except KeyError:
            pass

        return data

    def _get_helm_chart_repos_ids(self, cluster_uuid) -> list:
        repo_ids = []
        cluster_filter = {"_admin.helm-chart.id": cluster_uuid}
        cluster = self.db.get_one("k8sclusters", cluster_filter)
        if cluster:
            repo_ids = cluster.get("_admin").get("helm_chart_repos") or []
            return repo_ids
        else:
            raise K8sException(
                "k8scluster with helm-id: {} not found".format(cluster_uuid)
            )

    async def _is_install_completed(self, cluster_id: str, kdu_instance: str) -> bool:
        # init config, env
        paths, env = self._init_paths_env(
            cluster_name=cluster_id, create_if_not_exist=True
        )

        status = await self._status_kdu(
            cluster_id=cluster_id, kdu_instance=kdu_instance, return_text=False
        )

        # extract info.status.resources -> str
        # format:
        #   ==> v1/Deployment
        #   NAME                    READY  UP-TO-DATE  AVAILABLE  AGE
        #   halting-horse-mongodb   0/1    1           0          0s
        #   halting-petit-mongodb   1/1    1           0          0s
        # blank line
        resources = K8sHelmBaseConnector._get_deep(
            status, ("info", "status", "resources")
        )

        # convert to table
        resources = K8sHelmBaseConnector._output_to_table(resources)

        num_lines = len(resources)
        index = 0
        ready = True
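        # Walk the table: a "==>" row starts a resource section; if the section
        # header contains READY, check each following row's current/total count
        # (e.g. "0/1" means not ready) until a blank line ends the section.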
        while index < num_lines:
            try:
                line1 = resources[index]
                index += 1
                # find '==>' in column 0
                if line1[0] == "==>":
                    line2 = resources[index]
                    index += 1
                    # find READY in column 1
                    if line2[1] == "READY":
                        # read next lines
                        line3 = resources[index]
                        index += 1
                        while len(line3) > 1 and index < num_lines:
                            ready_value = line3[1]
                            parts = ready_value.split(sep="/")
                            current = int(parts[0])
                            total = int(parts[1])
                            if current < total:
                                self.log.debug("NOT READY:\n {}".format(line3))
                                ready = False
                            line3 = resources[index]
                            index += 1

            except Exception:
                pass

        return ready

    def _get_install_command(
        self,
        kdu_model,
        kdu_instance,
        namespace,
        params_str,
        version,
        atomic,
        timeout,
        kubeconfig,
    ) -> str:

        timeout_str = ""
        if timeout:
            timeout_str = "--timeout {}".format(timeout)

        # atomic
        atomic_str = ""
        if atomic:
            atomic_str = "--atomic"
        # namespace
        namespace_str = ""
        if namespace:
            namespace_str = "--namespace {}".format(namespace)

        # version
        version_str = ""
        if version:
            version_str = "--version {}".format(version)

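        # Example of a rendered command (illustrative values; params_str is
        # assumed to carry "--set" style options built by the base connector):
        #   env KUBECONFIG=/path/to/kubeconfig /usr/bin/helm install --atomic
        #       --output yaml --set replicas=2 --timeout 300 --name=ldap-1
        #       --namespace osm stable/openldap --version 1.2.3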
        command = (
            "env KUBECONFIG={kubeconfig} {helm} install {atomic} --output yaml "
            "{params} {timeout} --name={name} {ns} {model} {ver}".format(
                kubeconfig=kubeconfig,
                helm=self._helm_command,
                atomic=atomic_str,
                params=params_str,
                timeout=timeout_str,
                name=kdu_instance,
                ns=namespace_str,
                model=kdu_model,
                ver=version_str,
            )
        )
        return command

    def _get_upgrade_command(
        self,
        kdu_model,
        kdu_instance,
        namespace,
        params_str,
        version,
        atomic,
        timeout,
        kubeconfig,
    ) -> str:

        timeout_str = ""
        if timeout:
            timeout_str = "--timeout {}".format(timeout)

        # atomic
        atomic_str = ""
        if atomic:
            atomic_str = "--atomic"

        # version
        version_str = ""
        if version:
            version_str = "--version {}".format(version)

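        # The upgrade command mirrors the install command but targets an
        # existing release name, e.g. (illustrative):
        #   env KUBECONFIG=<cfg> helm upgrade --atomic --output yaml
        #       --timeout 300 ldap-1 stable/openldap --version 1.2.4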
        command = (
            "env KUBECONFIG={kubeconfig} {helm} upgrade {atomic} --output yaml {params} {timeout} {name} {model} {ver}"
        ).format(
            kubeconfig=kubeconfig,
            helm=self._helm_command,
            atomic=atomic_str,
            params=params_str,
            timeout=timeout_str,
            name=kdu_instance,
            model=kdu_model,
            ver=version_str,
        )
        return command

    def _get_rollback_command(
        self, kdu_instance, namespace, revision, kubeconfig
    ) -> str:
        return "env KUBECONFIG={} {} rollback {} {} --wait".format(
            kubeconfig, self._helm_command, kdu_instance, revision
        )

    def _get_uninstall_command(
        self, kdu_instance: str, namespace: str, kubeconfig: str
    ) -> str:
        return "env KUBECONFIG={} {} delete --purge {}".format(
            kubeconfig, self._helm_command, kdu_instance
        )