##
# Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
# This file is part of OSM
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# For those usages not covered by the Apache License, Version 2.0 please
# contact with: nfvlabs@tid.es
##
import asyncio
from typing import Union
import os
import yaml

from n2vc.k8s_helm_base_conn import K8sHelmBaseConnector
from n2vc.exceptions import K8sException


class K8sHelmConnector(K8sHelmBaseConnector):

    """
    ####################################################################################
    ################################### P U B L I C ####################################
    ####################################################################################
    """

    def __init__(
        self,
        fs: object,
        db: object,
        kubectl_command: str = "/usr/bin/kubectl",
        helm_command: str = "/usr/bin/helm",
        log: object = None,
        on_update_db=None,
    ):
        """
        Initializes helm connector for helm v2

        :param fs: file system for kubernetes and helm configuration
        :param db: database object to write current operation status
        :param kubectl_command: path to kubectl executable
        :param helm_command: path to helm executable
        :param log: logger
        :param on_update_db: callback called when k8s connector updates database
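
        Example (illustrative sketch only; the ``fs``, ``db`` and ``log`` objects
        are assumed to be provided by the caller, e.g. by OSM common services)::

            connector = K8sHelmConnector(fs=fs, db=db, log=logger)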
        """

        # parent class
        K8sHelmBaseConnector.__init__(
            self,
            db=db,
            log=log,
            fs=fs,
            kubectl_command=kubectl_command,
            helm_command=helm_command,
            on_update_db=on_update_db,
        )

        self.log.info("Initializing K8S Helm2 connector")

        # initialize helm client-only
        self.log.debug("Initializing helm client-only...")
        command = "{} init --client-only {} ".format(
            self._helm_command,
            "--stable-repo-url {}".format(self._stable_repo_url)
            if self._stable_repo_url
            else "--skip-repos",
        )
        try:
            asyncio.ensure_future(
                self._local_async_exec(command=command, raise_exception_on_error=False)
            )
            # loop = asyncio.get_event_loop()
            # loop.run_until_complete(self._local_async_exec(command=command,
            # raise_exception_on_error=False))
        except Exception as e:
            self.log.warning(
                "helm init failed (it was already initialized): {}".format(e)
            )

        self.log.info("K8S Helm2 connector initialized")

    async def install(
        self,
        cluster_uuid: str,
        kdu_model: str,
        kdu_instance: str,
        atomic: bool = True,
        timeout: float = 300,
        params: dict = None,
        db_dict: dict = None,
        kdu_name: str = None,
        namespace: str = None,
        **kwargs,
    ):
        """
        Deploys a new KDU instance. It would implicitly rely on the `install` call
        to deploy the Chart/Bundle properly parametrized (in practice, this call would
        happen before any _initial-config-primitive_ of the VNF is called).

        :param cluster_uuid: UUID of a K8s cluster known by OSM
        :param kdu_model: chart/reference (string), which can be either
            of these options:
            - a name of a chart available via the repos known by OSM
              (e.g. stable/openldap, stable/openldap:1.2.4)
            - a path to a packaged chart (e.g. mychart.tgz)
            - a path to an unpacked chart directory or a URL (e.g. mychart)
        :param kdu_instance: Kdu instance name
        :param atomic: If set, the installation process purges the chart/bundle on
            failure and also waits until all the K8s objects are active
        :param timeout: Time in seconds to wait for the install of the chart/bundle
            (defaults to Helm default timeout: 300s)
        :param params: dictionary of key-value pairs for instantiation parameters
            (overriding default values)
        :param dict db_dict: where to write into database when the status changes.
            It contains a dict with {collection: <str>, filter: {}, path: <str>},
            e.g. {collection: "nsrs", filter: {_id: <nsd-id>},
            path: "_admin.deployed.K8S.3"}
        :param kdu_name: Name of the KDU instance to be installed
        :param namespace: K8s namespace to use for the KDU instance
        :param kwargs: Additional parameters (None yet)
        :return: True if successful
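
        Example (illustrative sketch; cluster UUID, chart and instance names are
        hypothetical)::

            await connector.install(
                cluster_uuid="cluster-1234",
                kdu_model="stable/openldap:1.2.4",
                kdu_instance="openldap-0001",
                params={"replicaCount": 2},
            )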
        """
        self.log.debug("installing {} in cluster {}".format(kdu_model, cluster_uuid))

        # sync local dir
        self.fs.sync(from_path=cluster_uuid)

        # init env, paths
        paths, env = self._init_paths_env(
            cluster_name=cluster_uuid, create_if_not_exist=True
        )

        await self._install_impl(
            cluster_uuid,
            kdu_model,
            paths,
            env,
            kdu_instance,
            atomic=atomic,
            timeout=timeout,
            params=params,
            db_dict=db_dict,
            kdu_name=kdu_name,
            namespace=namespace,
        )

        # sync fs
        self.fs.reverse_sync(from_path=cluster_uuid)

        self.log.debug("Returning kdu_instance {}".format(kdu_instance))
        return True

    async def inspect_kdu(self, kdu_model: str, repo_url: str = None) -> str:

        self.log.debug(
            "inspect kdu_model {} from (optional) repo: {}".format(kdu_model, repo_url)
        )

        return await self._exec_inspect_command(
            inspect_command="", kdu_model=kdu_model, repo_url=repo_url
        )

    """
    ####################################################################################
    ################################### P R I V A T E ##################################
    ####################################################################################
    """

    def _init_paths_env(self, cluster_name: str, create_if_not_exist: bool = True):
        """
        Creates the base cluster and kube dirs and returns them.
        Also creates the helm dirs according to the new directory specification;
        the paths are returned together with the environment variables that must
        be provided to execute commands.

        Helm 2 directory specification uses the helm_home dir:

        The variables assigned for these paths are:
        - Helm home: $HELM_HOME
        - helm kubeconfig: $KUBECONFIG

        :param cluster_name: cluster_name
        :return: Dictionary with config_paths and dictionary with helm environment variables
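
        Example of the returned values (illustrative; ``<base>`` stands for
        ``self.fs.path`` and ``mycluster`` for the cluster name)::

            paths = {
                "kube_dir": "<base>/mycluster/.kube",
                "kube_config": "<base>/mycluster/.kube/config",
                "cluster_dir": "<base>/mycluster",
                "helm_dir": "<base>/mycluster/.helm",
            }
            env = {"HELM_HOME": paths["helm_dir"], "KUBECONFIG": paths["kube_config"]}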
        """
        base = self.fs.path
        if base.endswith("/") or base.endswith("\\"):
            base = base[:-1]

        # base dir for cluster
        cluster_dir = base + "/" + cluster_name

        # kube dir
        kube_dir = cluster_dir + "/" + ".kube"
        if create_if_not_exist and not os.path.exists(kube_dir):
            self.log.debug("Creating dir {}".format(kube_dir))
            os.makedirs(kube_dir)

        # helm home dir
        helm_dir = cluster_dir + "/" + ".helm"
        if create_if_not_exist and not os.path.exists(helm_dir):
            self.log.debug("Creating dir {}".format(helm_dir))
            os.makedirs(helm_dir)

        config_filename = kube_dir + "/config"

        # 2 - Prepare dictionary with paths
        paths = {
            "kube_dir": kube_dir,
            "kube_config": config_filename,
            "cluster_dir": cluster_dir,
            "helm_dir": helm_dir,
        }

        for file_name, file in paths.items():
            if "dir" in file_name and not os.path.exists(file):
                err_msg = "{} dir does not exist".format(file)
                self.log.error(err_msg)
                raise K8sException(err_msg)

        # 3 - Prepare environment variables
        env = {"HELM_HOME": helm_dir, "KUBECONFIG": config_filename}

        return paths, env

    async def _get_services(self, cluster_id, kdu_instance, namespace, kubeconfig):

        # init config, env
        paths, env = self._init_paths_env(
            cluster_name=cluster_id, create_if_not_exist=True
        )

        command1 = "env KUBECONFIG={} {} get manifest {} ".format(
            kubeconfig, self._helm_command, kdu_instance
        )
        command2 = "{} get --namespace={} -f -".format(self.kubectl_command, namespace)
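        # The two commands are executed as a pipe; the effective shell line is
        # roughly (values illustrative):
        #   env KUBECONFIG=<kubeconfig> helm get manifest <kdu_instance> \
        #       | kubectl get --namespace=<namespace> -f -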
        output, _rc = await self._local_async_exec_pipe(
            command1, command2, env=env, raise_exception_on_error=True
        )
        services = self._parse_services(output)

        return services

    async def _cluster_init(
        self, cluster_id: str, namespace: str, paths: dict, env: dict
    ):
        """
        Implements the helm-version-dependent cluster initialization:
        for helm2, it initializes the tiller environment if needed
        """

        # check if tiller pod is up in cluster
        command = "{} --kubeconfig={} --namespace={} get deployments".format(
            self.kubectl_command, paths["kube_config"], namespace
        )
        output, _rc = await self._local_async_exec(
            command=command, raise_exception_on_error=True, env=env
        )

        output_table = self._output_to_table(output=output)

        # find 'tiller' pod in all pods
        already_initialized = False
        try:
            for row in output_table:
                if row[0].startswith("tiller-deploy"):
                    already_initialized = True
                    break
        except Exception:
            pass

        # helm init
        n2vc_installed_sw = False
        if not already_initialized:
            self.log.info(
                "Initializing helm in client and server: {}".format(cluster_id)
            )
            command = "{} --kubeconfig={} --namespace kube-system create serviceaccount {}".format(
                self.kubectl_command, paths["kube_config"], self.service_account
            )
            _, _rc = await self._local_async_exec(
                command=command, raise_exception_on_error=False, env=env
            )

            command = (
                "{} --kubeconfig={} create clusterrolebinding osm-tiller-cluster-rule "
                "--clusterrole=cluster-admin --serviceaccount=kube-system:{}"
            ).format(self.kubectl_command, paths["kube_config"], self.service_account)
            _, _rc = await self._local_async_exec(
                command=command, raise_exception_on_error=False, env=env
            )

            command = (
                "{} --kubeconfig={} --tiller-namespace={} --home={} --service-account {} "
                " {} init"
            ).format(
                self._helm_command,
                paths["kube_config"],
                namespace,
                paths["helm_dir"],
                self.service_account,
                "--stable-repo-url {}".format(self._stable_repo_url)
                if self._stable_repo_url
                else "--skip-repos",
            )
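            # The rendered command looks roughly like (values illustrative):
            #   helm --kubeconfig=<cfg> --tiller-namespace=<ns> --home=<helm_dir> \
            #       --service-account <sa> --stable-repo-url <url> init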
            _, _rc = await self._local_async_exec(
                command=command, raise_exception_on_error=True, env=env
            )
            n2vc_installed_sw = True
        else:
            # check client helm installation
            check_file = paths["helm_dir"] + "/repository/repositories.yaml"
            if not self._check_file_exists(
                filename=check_file, exception_if_not_exists=False
            ):
                self.log.info("Initializing helm in client: {}".format(cluster_id))
                command = (
                    "{} --kubeconfig={} --tiller-namespace={} "
                    "--home={} init --client-only {} "
                ).format(
                    self._helm_command,
                    paths["kube_config"],
                    namespace,
                    paths["helm_dir"],
                    "--stable-repo-url {}".format(self._stable_repo_url)
                    if self._stable_repo_url
                    else "--skip-repos",
                )
                output, _rc = await self._local_async_exec(
                    command=command, raise_exception_on_error=True, env=env
                )
            else:
                self.log.info("Helm client already initialized")

        repo_list = await self.repo_list(cluster_id)
        for repo in repo_list:
            if repo["name"] == "stable" and repo["url"] != self._stable_repo_url:
                self.log.debug(
                    "Add new stable repo url: {}".format(self._stable_repo_url)
                )
                await self.repo_remove(cluster_id, "stable")
                if self._stable_repo_url:
                    await self.repo_add(cluster_id, "stable", self._stable_repo_url)
                break

        return n2vc_installed_sw

    async def _uninstall_sw(self, cluster_id: str, namespace: str):
        # uninstall Tiller if necessary

        self.log.debug("Uninstalling tiller from cluster {}".format(cluster_id))

        # init paths, env
        paths, env = self._init_paths_env(
            cluster_name=cluster_id, create_if_not_exist=True
        )

        if not namespace:
            # find namespace for tiller pod
            command = "{} --kubeconfig={} get deployments --all-namespaces".format(
                self.kubectl_command, paths["kube_config"]
            )
            output, _rc = await self._local_async_exec(
                command=command, raise_exception_on_error=False, env=env
            )
            output_table = self._output_to_table(output=output)
            namespace = None
            for r in output_table:
                try:
                    if "tiller-deploy" in r[1]:
                        namespace = r[0]
                        break
                except Exception:
                    pass
            else:
                msg = "Tiller deployment not found in cluster {}".format(cluster_id)
                self.log.error(msg)

            self.log.debug("namespace for tiller: {}".format(namespace))

        if namespace:
            # uninstall tiller from cluster
            self.log.debug("Uninstalling tiller from cluster {}".format(cluster_id))
            command = "{} --kubeconfig={} --home={} reset".format(
                self._helm_command, paths["kube_config"], paths["helm_dir"]
            )
            self.log.debug("resetting: {}".format(command))
            output, _rc = await self._local_async_exec(
                command=command, raise_exception_on_error=True, env=env
            )
            # Delete clusterrolebinding and serviceaccount.
            # Ignore errors, for backward compatibility
            command = (
                "{} --kubeconfig={} delete clusterrolebinding.rbac.authorization.k8s."
                "io/osm-tiller-cluster-rule"
            ).format(self.kubectl_command, paths["kube_config"])
            output, _rc = await self._local_async_exec(
                command=command, raise_exception_on_error=False, env=env
            )
            command = (
                "{} --kubeconfig={} --namespace {} delete serviceaccount/{}".format(
                    self.kubectl_command,
                    paths["kube_config"],
                    namespace,
                    self.service_account,
                )
            )
            output, _rc = await self._local_async_exec(
                command=command, raise_exception_on_error=False, env=env
            )

        else:
            self.log.debug("namespace not found")

    async def _instances_list(self, cluster_id):

        # init paths, env
        paths, env = self._init_paths_env(
            cluster_name=cluster_id, create_if_not_exist=True
        )

        command = "{} list --output yaml".format(self._helm_command)

        output, _rc = await self._local_async_exec(
            command=command, raise_exception_on_error=True, env=env
        )

        if output and len(output) > 0:
            # parse yaml and update keys to lower case to unify with helm3
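            # (helm2 capitalizes keys, e.g. "Name"/"Status" become "name"/"status")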
            instances = yaml.load(output, Loader=yaml.SafeLoader).get("Releases")
            new_instances = []
            for instance in instances:
                new_instance = dict((k.lower(), v) for k, v in instance.items())
                new_instances.append(new_instance)
            return new_instances
        else:
            return []

    def _get_inspect_command(
        self, show_command: str, kdu_model: str, repo_str: str, version: str
    ):
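        # Example rendered command (illustrative values; repo_str and version are
        # assumed to arrive pre-formatted from the caller):
        #   helm inspect values stable/openldap --repo <url> --version 1.2.4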
        inspect_command = "{} inspect {} {}{} {}".format(
            self._helm_command, show_command, kdu_model, repo_str, version
        )
        return inspect_command

    def _get_get_command(
        self, get_command: str, kdu_instance: str, namespace: str, kubeconfig: str
    ):
        get_command = "env KUBECONFIG={} {} get {} {} --output yaml".format(
            kubeconfig, self._helm_command, get_command, kdu_instance
        )
        return get_command

    async def _status_kdu(
        self,
        cluster_id: str,
        kdu_instance: str,
        namespace: str = None,
        yaml_format: bool = False,
        show_error_log: bool = False,
    ) -> Union[str, dict]:

        self.log.debug(
            "status of kdu_instance: {}, namespace: {} ".format(kdu_instance, namespace)
        )

        # init config, env
        paths, env = self._init_paths_env(
            cluster_name=cluster_id, create_if_not_exist=True
        )
        command = ("env KUBECONFIG={} {} status {} --output yaml").format(
            paths["kube_config"], self._helm_command, kdu_instance
        )
        output, rc = await self._local_async_exec(
            command=command,
            raise_exception_on_error=True,
            show_error_log=show_error_log,
            env=env,
        )

        if yaml_format:
            return str(output)

        if rc != 0:
            return None

        data = yaml.load(output, Loader=yaml.SafeLoader)

        # remove field 'notes'
        try:
            del data.get("info").get("status")["notes"]
        except KeyError:
            pass

        # parse the manifest to a list of dictionaries
        if "manifest" in data:
            manifest_str = data.get("manifest")
            manifest_docs = yaml.load_all(manifest_str, Loader=yaml.SafeLoader)

            data["manifest"] = []
            for doc in manifest_docs:
                data["manifest"].append(doc)
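            # "manifest" now holds one dict per YAML document in the release manifest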

        # parse field 'resources'
        try:
            resources = str(data.get("info").get("status").get("resources"))
            resource_table = self._output_to_table(resources)
            data.get("info").get("status")["resources"] = resource_table
        except Exception:
            pass

        # set description to lowercase (unify with helm3)
        try:
            data.get("info")["description"] = data.get("info").pop("Description")
        except KeyError:
            pass

        return data

    def _get_helm_chart_repos_ids(self, cluster_uuid) -> list:
        repo_ids = []
        cluster_filter = {"_admin.helm-chart.id": cluster_uuid}
        cluster = self.db.get_one("k8sclusters", cluster_filter)
        if cluster:
            repo_ids = cluster.get("_admin").get("helm_chart_repos") or []
            return repo_ids
        else:
            raise K8sException(
                "k8scluster with helm-id : {} not found".format(cluster_uuid)
            )

    async def _is_install_completed(self, cluster_id: str, kdu_instance: str) -> bool:
        # init config, env
        paths, env = self._init_paths_env(
            cluster_name=cluster_id, create_if_not_exist=True
        )

        status = await self._status_kdu(
            cluster_id=cluster_id, kdu_instance=kdu_instance, yaml_format=False
        )

        # extract info.status.resources-> str
        # format:
        #   ==> v1/Deployment
        #   NAME                    READY   UP-TO-DATE   AVAILABLE   AGE
        #   halting-horse-mongodb   0/1     1            0           0s
        #   halting-petit-mongodb   1/1     1            0           0s
        #   blank line
        resources = K8sHelmBaseConnector._get_deep(
            status, ("info", "status", "resources")
        )

        # convert to table
        resources = K8sHelmBaseConnector._output_to_table(resources)

        num_lines = len(resources)
        index = 0
        ready = True
        while index < num_lines:
            try:
                line1 = resources[index]
                index += 1
                # find '==>' in column 0
                if line1[0] == "==>":
                    line2 = resources[index]
                    index += 1
                    # find READY in column 1
                    if line2[1] == "READY":
                        # read next lines
                        line3 = resources[index]
                        index += 1
                        while len(line3) > 1 and index < num_lines:
                            ready_value = line3[1]
                            parts = ready_value.split(sep="/")
                            current = int(parts[0])
                            total = int(parts[1])
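                            # e.g. READY "0/1" -> current=0, total=1: not all replicas up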
                            if current < total:
                                self.log.debug("NOT READY:\n    {}".format(line3))
                                ready = False
                            line3 = resources[index]
                            index += 1

            except Exception:
                pass

        return ready

    def _get_install_command(
        self,
        kdu_model,
        kdu_instance,
        namespace,
        params_str,
        version,
        atomic,
        timeout,
        kubeconfig,
    ) -> str:

        timeout_str = ""
        if timeout:
            timeout_str = "--timeout {}".format(timeout)

        # atomic
        atomic_str = ""
        if atomic:
            atomic_str = "--atomic"
        # namespace
        namespace_str = ""
        if namespace:
            namespace_str = "--namespace {}".format(namespace)

        # version
        version_str = ""
        if version:
            version_str = "--version {}".format(version)

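        # Example rendered command (illustrative values):
        #   env KUBECONFIG=/path/kube/config /usr/bin/helm install --atomic --output yaml \
        #       --timeout 300 --name=openldap-0001 --namespace osm stable/openldap --version 1.2.4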
        command = (
            "env KUBECONFIG={kubeconfig} {helm} install {atomic} --output yaml "
            "{params} {timeout} --name={name} {ns} {model} {ver}".format(
                kubeconfig=kubeconfig,
                helm=self._helm_command,
                atomic=atomic_str,
                params=params_str,
                timeout=timeout_str,
                name=kdu_instance,
                ns=namespace_str,
                model=kdu_model,
                ver=version_str,
            )
        )
        return command

    def _get_upgrade_scale_command(
        self,
        kdu_model: str,
        kdu_instance: str,
        namespace: str,
        scale: int,
        version: str,
        atomic: bool,
        replica_str: str,
        timeout: float,
        resource_name: str,
        kubeconfig: str,
    ) -> str:

        timeout_str = ""
        if timeout:
            timeout_str = "--timeout {}s".format(timeout)

        # atomic
        atomic_str = ""
        if atomic:
            atomic_str = "--atomic"

        # version
        version_str = ""
        if version:
            version_str = "--version {}".format(version)

        # scale
        if resource_name:
            scale_dict = {"{}.{}".format(resource_name, replica_str): scale}
        else:
            scale_dict = {replica_str: scale}

        scale_str = self._params_to_set_option(scale_dict)
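        # e.g. resource_name="mongo", replica_str="replicaCount", scale=3 would yield
        # a "--set mongo.replicaCount=3" option (exact rendering is delegated to
        # _params_to_set_option)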

        command = (
            "env KUBECONFIG={kubeconfig} {helm} upgrade {atomic} --output yaml {scale} {timeout} {name} {model} {ver}"
        ).format(
            helm=self._helm_command,
            name=kdu_instance,
            atomic=atomic_str,
            scale=scale_str,
            timeout=timeout_str,
            model=kdu_model,
            ver=version_str,
            kubeconfig=kubeconfig,
        )
        return command

    def _get_upgrade_command(
        self,
        kdu_model,
        kdu_instance,
        namespace,
        params_str,
        version,
        atomic,
        timeout,
        kubeconfig,
    ) -> str:

        timeout_str = ""
        if timeout:
            timeout_str = "--timeout {}".format(timeout)

        # atomic
        atomic_str = ""
        if atomic:
            atomic_str = "--atomic"

        # version
        version_str = ""
        if version:
            version_str = "--version {}".format(version)

        command = (
            "env KUBECONFIG={kubeconfig} {helm} upgrade {atomic} --output yaml {params} {timeout} {name} {model} {ver}"
        ).format(
            kubeconfig=kubeconfig,
            helm=self._helm_command,
            atomic=atomic_str,
            params=params_str,
            timeout=timeout_str,
            name=kdu_instance,
            model=kdu_model,
            ver=version_str,
        )
        return command

    def _get_rollback_command(
        self, kdu_instance, namespace, revision, kubeconfig
    ) -> str:
        return "env KUBECONFIG={} {} rollback {} {} --wait".format(
            kubeconfig, self._helm_command, kdu_instance, revision
        )

    def _get_uninstall_command(
        self, kdu_instance: str, namespace: str, kubeconfig: str
    ) -> str:
        return "env KUBECONFIG={} {} delete --purge {}".format(
            kubeconfig, self._helm_command, kdu_instance
        )