Feature 10239: Distributed VCA
n2vc/k8s_helm_conn.py (osm/N2VC.git)
##
# Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
# This file is part of OSM
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# For those usages not covered by the Apache License, Version 2.0 please
# contact with: nfvlabs@tid.es
##
import asyncio
import os
import yaml

from n2vc.k8s_helm_base_conn import K8sHelmBaseConnector
from n2vc.exceptions import K8sException

class K8sHelmConnector(K8sHelmBaseConnector):

    """
    ####################################################################################
    ################################### P U B L I C ####################################
    ####################################################################################
    """

    def __init__(
        self,
        fs: object,
        db: object,
        kubectl_command: str = "/usr/bin/kubectl",
        helm_command: str = "/usr/bin/helm",
        log: object = None,
        on_update_db=None,
        vca_config: dict = None,
    ):
        """
        Initializes the helm connector for helm v2

        :param fs: file system for kubernetes and helm configuration
        :param db: database object to write current operation status
        :param kubectl_command: path to kubectl executable
        :param helm_command: path to helm executable
        :param log: logger
        :param on_update_db: callback called when the k8s connector updates the database
        :param vca_config: optional dictionary with additional VCA configuration
        """

        # parent class
        K8sHelmBaseConnector.__init__(
            self,
            db=db,
            log=log,
            fs=fs,
            kubectl_command=kubectl_command,
            helm_command=helm_command,
            on_update_db=on_update_db,
            vca_config=vca_config,
        )

        self.log.info("Initializing K8S Helm2 connector")

        # initialize helm client-only
        self.log.debug("Initializing helm client-only...")
        command = "{} init --client-only --stable-repo-url {} ".format(
            self._helm_command, self._stable_repo_url
        )
        try:
            asyncio.ensure_future(
                self._local_async_exec(command=command, raise_exception_on_error=False)
            )
            # loop = asyncio.get_event_loop()
            # loop.run_until_complete(self._local_async_exec(command=command,
            #     raise_exception_on_error=False))
        except Exception as e:
            self.log.warning(
                msg="helm init failed (it may have been already initialized): {}".format(e)
            )

        self.log.info("K8S Helm2 connector initialized")

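    # Illustrative construction (a sketch, not executed here). FsLocal and
    # DbMongo are the osm_common implementations this connector is typically
    # wired to; treat the exact wiring below as an assumption:
    #
    #     import logging
    #     from osm_common.dbmongo import DbMongo
    #     from osm_common.fslocal import FsLocal
    #
    #     fs = FsLocal()   # fs.fs_connect(config) must be called beforehand
    #     db = DbMongo()   # db.db_connect(config) likewise
    #     k8s = K8sHelmConnector(fs=fs, db=db, log=logging.getLogger("helm2"))
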
    async def install(
        self,
        cluster_uuid: str,
        kdu_model: str,
        kdu_instance: str,
        atomic: bool = True,
        timeout: float = 300,
        params: dict = None,
        db_dict: dict = None,
        kdu_name: str = None,
        namespace: str = None,
        **kwargs,
    ):
        """
        Deploys a new KDU instance. It would implicitly rely on the `install` call
        to deploy the Chart/Bundle properly parametrized (in practice, this call
        would happen before any _initial-config-primitive_ of the VNF is called).

        :param cluster_uuid: UUID of a K8s cluster known by OSM
        :param kdu_model: chart reference (string), which can be either
            of these options:
            - a name of a chart available via the repos known by OSM
            - a path to a packaged chart
            - a path to an unpacked chart directory or a URL
        :param kdu_instance: KDU instance name
        :param atomic: If set, the installation process purges the chart/bundle on
            failure and waits until all the K8s objects are active
        :param timeout: Time in seconds to wait for the install of the chart/bundle
            (defaults to Helm default timeout: 300s)
        :param params: dictionary of key-value pairs for instantiation parameters
            (overriding default values)
        :param dict db_dict: where to write into database when the status changes.
            It contains a dict with {collection: <str>, filter: {}, path: <str>},
            e.g. {collection: "nsrs", filter: {_id: <nsr-id>},
            path: "_admin.deployed.K8S.3"}
        :param kdu_name: Name of the KDU instance to be installed
        :param namespace: K8s namespace to use for the KDU instance
        :param kwargs: Additional parameters (none yet)
        :return: True if successful
        """
        _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
        self.log.debug("installing {} in cluster {}".format(kdu_model, cluster_id))

        # sync local dir
        self.fs.sync(from_path=cluster_id)

        # init env, paths
        paths, env = self._init_paths_env(
            cluster_name=cluster_id, create_if_not_exist=True
        )

        await self._install_impl(
            cluster_id,
            kdu_model,
            paths,
            env,
            kdu_instance,
            atomic=atomic,
            timeout=timeout,
            params=params,
            db_dict=db_dict,
            kdu_name=kdu_name,
            namespace=namespace,
        )

        # sync fs
        self.fs.reverse_sync(from_path=cluster_id)

        self.log.debug("Returning kdu_instance {}".format(kdu_instance))
        return True

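    # Illustrative call (a sketch; the identifiers and values below are
    # assumptions, not taken from this file):
    #
    #     done = await k8s.install(
    #         cluster_uuid="<namespace>:<cluster-id>",
    #         kdu_model="stable/mongodb",
    #         kdu_instance="mongo-0001",
    #         atomic=True,
    #         timeout=600,
    #         params={"replicaCount": 2},
    #         db_dict={
    #             "collection": "nsrs",
    #             "filter": {"_id": "<nsr-id>"},
    #             "path": "_admin.deployed.K8S.0",
    #         },
    #     )
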
    async def inspect_kdu(self, kdu_model: str, repo_url: str = None) -> str:

        self.log.debug(
            "inspect kdu_model {} from (optional) repo: {}".format(kdu_model, repo_url)
        )

        return await self._exec_inspect_comand(
            inspect_command="", kdu_model=kdu_model, repo_url=repo_url
        )

    """
    ####################################################################################
    ################################### P R I V A T E ##################################
    ####################################################################################
    """

    def _init_paths_env(self, cluster_name: str, create_if_not_exist: bool = True):
        """
        Creates (if needed) the base cluster and kube dirs and returns their paths,
        together with the environment variables that must be provided to execute
        commands.

        The Helm 2 directory specification uses a helm home dir. The environment
        variables assigned for these paths are:
        - Helm home: $HELM_HOME
        - helm kubeconfig: $KUBECONFIG

        :param cluster_name: cluster_name
        :return: Dictionary with config paths and dictionary with helm environment
            variables
        """
        base = self.fs.path
        if base.endswith("/") or base.endswith("\\"):
            base = base[:-1]

        # base dir for cluster
        cluster_dir = base + "/" + cluster_name

        # kube dir
        kube_dir = cluster_dir + "/" + ".kube"
        if create_if_not_exist and not os.path.exists(kube_dir):
            self.log.debug("Creating dir {}".format(kube_dir))
            os.makedirs(kube_dir)

        # helm home dir
        helm_dir = cluster_dir + "/" + ".helm"
        if create_if_not_exist and not os.path.exists(helm_dir):
            self.log.debug("Creating dir {}".format(helm_dir))
            os.makedirs(helm_dir)

        config_filename = kube_dir + "/config"

        # 2 - Prepare dictionary with paths
        paths = {
            "kube_dir": kube_dir,
            "kube_config": config_filename,
            "cluster_dir": cluster_dir,
            "helm_dir": helm_dir,
        }

        for file_name, file in paths.items():
            if "dir" in file_name and not os.path.exists(file):
                err_msg = "{} dir does not exist".format(file)
                self.log.error(err_msg)
                raise K8sException(err_msg)

        # 3 - Prepare environment variables
        env = {"HELM_HOME": helm_dir, "KUBECONFIG": config_filename}

        return paths, env

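    # Shape of the returned values (illustrative, assuming fs.path is
    # "/app/storage" and cluster_name is "c-1"):
    #
    #     paths = {
    #         "kube_dir": "/app/storage/c-1/.kube",
    #         "kube_config": "/app/storage/c-1/.kube/config",
    #         "cluster_dir": "/app/storage/c-1",
    #         "helm_dir": "/app/storage/c-1/.helm",
    #     }
    #     env = {"HELM_HOME": "/app/storage/c-1/.helm",
    #            "KUBECONFIG": "/app/storage/c-1/.kube/config"}
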
    async def _get_services(self, cluster_id, kdu_instance, namespace):

        # init config, env
        paths, env = self._init_paths_env(
            cluster_name=cluster_id, create_if_not_exist=True
        )

        # pipe the release manifest into kubectl to resolve the deployed objects
        command1 = "{} get manifest {} ".format(self._helm_command, kdu_instance)
        command2 = "{} get --namespace={} -f -".format(self.kubectl_command, namespace)
        output, _rc = await self._local_async_exec_pipe(
            command1, command2, env=env, raise_exception_on_error=True
        )
        services = self._parse_services(output)

        return services

    async def _cluster_init(
        self, cluster_id: str, namespace: str, paths: dict, env: dict
    ):
        """
        Implements the helm version dependent cluster initialization:
        for helm2 it initializes the Tiller environment if needed
        """

        # check if tiller pod is up in cluster
        command = "{} --kubeconfig={} --namespace={} get deployments".format(
            self.kubectl_command, paths["kube_config"], namespace
        )
        output, _rc = await self._local_async_exec(
            command=command, raise_exception_on_error=True, env=env
        )

        output_table = self._output_to_table(output=output)

        # find 'tiller' pod in all pods
        already_initialized = False
        try:
            for row in output_table:
                if row[0].startswith("tiller-deploy"):
                    already_initialized = True
                    break
        except Exception:
            pass

        # helm init
        n2vc_installed_sw = False
        if not already_initialized:
            self.log.info(
                "Initializing helm in client and server: {}".format(cluster_id)
            )
            command = (
                "{} --kubeconfig={} --namespace kube-system create serviceaccount {}"
            ).format(self.kubectl_command, paths["kube_config"], self.service_account)
            _, _rc = await self._local_async_exec(
                command=command, raise_exception_on_error=False, env=env
            )

            command = (
                "{} --kubeconfig={} create clusterrolebinding osm-tiller-cluster-rule "
                "--clusterrole=cluster-admin --serviceaccount=kube-system:{}"
            ).format(self.kubectl_command, paths["kube_config"], self.service_account)
            _, _rc = await self._local_async_exec(
                command=command, raise_exception_on_error=False, env=env
            )

            command = (
                "{} --kubeconfig={} --tiller-namespace={} --home={} --service-account {} "
                "--stable-repo-url {} init"
            ).format(
                self._helm_command,
                paths["kube_config"],
                namespace,
                paths["helm_dir"],
                self.service_account,
                self._stable_repo_url,
            )
            _, _rc = await self._local_async_exec(
                command=command, raise_exception_on_error=True, env=env
            )
            n2vc_installed_sw = True
        else:
            # check client helm installation
            check_file = paths["helm_dir"] + "/repository/repositories.yaml"
            if not self._check_file_exists(
                filename=check_file, exception_if_not_exists=False
            ):
                self.log.info("Initializing helm in client: {}".format(cluster_id))
                command = (
                    "{} --kubeconfig={} --tiller-namespace={} "
                    "--home={} init --client-only --stable-repo-url {} "
                ).format(
                    self._helm_command,
                    paths["kube_config"],
                    namespace,
                    paths["helm_dir"],
                    self._stable_repo_url,
                )
                output, _rc = await self._local_async_exec(
                    command=command, raise_exception_on_error=True, env=env
                )
            else:
                self.log.info("Helm client already initialized")

        # remove the old stable repo and add the new one
        cluster_uuid = "{}:{}".format(namespace, cluster_id)
        repo_list = await self.repo_list(cluster_uuid)
        for repo in repo_list:
            if repo["name"] == "stable" and repo["url"] != self._stable_repo_url:
                self.log.debug(
                    "Add new stable repo url: {}".format(self._stable_repo_url)
                )
                await self.repo_remove(cluster_uuid, "stable")
                await self.repo_add(cluster_uuid, "stable", self._stable_repo_url)
                break

        return n2vc_installed_sw

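    # Commands issued above for a fresh cluster, written out for reference
    # (a sketch; the service account name "osm" and the concrete stable repo
    # URL are assumptions resolved from the base connector at runtime):
    #
    #     kubectl --kubeconfig=<kube_config> --namespace kube-system \
    #         create serviceaccount osm
    #     kubectl --kubeconfig=<kube_config> create clusterrolebinding \
    #         osm-tiller-cluster-rule --clusterrole=cluster-admin \
    #         --serviceaccount=kube-system:osm
    #     helm --kubeconfig=<kube_config> --tiller-namespace=<namespace> \
    #         --home=<helm_dir> --service-account osm \
    #         --stable-repo-url <stable_repo_url> init
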
    async def _uninstall_sw(self, cluster_id: str, namespace: str):
        # uninstall Tiller if necessary

        self.log.debug("Uninstalling tiller from cluster {}".format(cluster_id))

        # init paths, env
        paths, env = self._init_paths_env(
            cluster_name=cluster_id, create_if_not_exist=True
        )

        if not namespace:
            # find namespace for tiller pod
            command = "{} --kubeconfig={} get deployments --all-namespaces".format(
                self.kubectl_command, paths["kube_config"]
            )
            output, _rc = await self._local_async_exec(
                command=command, raise_exception_on_error=False, env=env
            )
            output_table = self._output_to_table(output=output)
            namespace = None
            for r in output_table:
                try:
                    if "tiller-deploy" in r[1]:
                        namespace = r[0]
                        break
                except Exception:
                    pass
            else:
                # for-else: no tiller deployment was found in any namespace
                msg = "Tiller deployment not found in cluster {}".format(cluster_id)
                self.log.error(msg)

            self.log.debug("namespace for tiller: {}".format(namespace))

        if namespace:
            # uninstall tiller from cluster
            self.log.debug("Uninstalling tiller from cluster {}".format(cluster_id))
            command = "{} --kubeconfig={} --home={} reset".format(
                self._helm_command, paths["kube_config"], paths["helm_dir"]
            )
            self.log.debug("resetting: {}".format(command))
            output, _rc = await self._local_async_exec(
                command=command, raise_exception_on_error=True, env=env
            )
            # Delete clusterrolebinding and serviceaccount.
            # Ignore errors for backward compatibility
            command = (
                "{} --kubeconfig={} delete clusterrolebinding.rbac.authorization.k8s."
                "io/osm-tiller-cluster-rule"
            ).format(self.kubectl_command, paths["kube_config"])
            output, _rc = await self._local_async_exec(
                command=command, raise_exception_on_error=False, env=env
            )
            command = (
                "{} --kubeconfig={} --namespace kube-system delete serviceaccount/{}"
            ).format(self.kubectl_command, paths["kube_config"], self.service_account)
            output, _rc = await self._local_async_exec(
                command=command, raise_exception_on_error=False, env=env
            )

        else:
            self.log.debug("namespace not found")

    async def _instances_list(self, cluster_id):

        # init paths, env
        paths, env = self._init_paths_env(
            cluster_name=cluster_id, create_if_not_exist=True
        )

        command = "{} list --output yaml".format(self._helm_command)

        output, _rc = await self._local_async_exec(
            command=command, raise_exception_on_error=True, env=env
        )

        if output and len(output) > 0:
            # parse yaml and update keys to lower case to unify with helm3
            instances = yaml.load(output, Loader=yaml.SafeLoader).get("Releases") or []
            new_instances = []
            for instance in instances:
                new_instance = dict((k.lower(), v) for k, v in instance.items())
                new_instances.append(new_instance)
            return new_instances
        else:
            return []

    def _get_inspect_command(
        self, show_command: str, kdu_model: str, repo_str: str, version: str
    ):
        inspect_command = "{} inspect {} {}{} {}".format(
            self._helm_command, show_command, kdu_model, repo_str, version
        )
        return inspect_command

    async def _status_kdu(
        self,
        cluster_id: str,
        kdu_instance: str,
        namespace: str = None,
        show_error_log: bool = False,
        return_text: bool = False,
    ):

        self.log.debug(
            "status of kdu_instance: {}, namespace: {} ".format(kdu_instance, namespace)
        )

        # init config, env
        paths, env = self._init_paths_env(
            cluster_name=cluster_id, create_if_not_exist=True
        )
        command = "{} status {} --output yaml".format(self._helm_command, kdu_instance)
        output, rc = await self._local_async_exec(
            command=command,
            raise_exception_on_error=True,
            show_error_log=show_error_log,
            env=env,
        )

        if return_text:
            return str(output)

        if rc != 0:
            return None

        data = yaml.load(output, Loader=yaml.SafeLoader)

        # remove field 'notes'
        try:
            del data.get("info").get("status")["notes"]
        except KeyError:
            pass

        # parse field 'resources'
        try:
            resources = str(data.get("info").get("status").get("resources"))
            resource_table = self._output_to_table(resources)
            data.get("info").get("status")["resources"] = resource_table
        except Exception:
            pass

        # set description to lowercase (unify with helm3)
        try:
            data.get("info")["description"] = data.get("info").pop("Description")
        except KeyError:
            pass

        return data

    def _get_helm_chart_repos_ids(self, cluster_uuid) -> list:
        repo_ids = []
        cluster_filter = {"_admin.helm-chart.id": cluster_uuid}
        cluster = self.db.get_one("k8sclusters", cluster_filter)
        if cluster:
            repo_ids = cluster.get("_admin").get("helm_chart_repos") or []
            return repo_ids
        else:
            raise K8sException(
                "k8scluster with helm-id {} not found".format(cluster_uuid)
            )

    async def _is_install_completed(self, cluster_id: str, kdu_instance: str) -> bool:

        status = await self._status_kdu(
            cluster_id=cluster_id, kdu_instance=kdu_instance, return_text=False
        )

        # extract info.status.resources -> str
        # format:
        #   ==> v1/Deployment
        #   NAME                    READY  UP-TO-DATE  AVAILABLE  AGE
        #   halting-horse-mongodb   0/1    1           0          0s
        #   halting-petit-mongodb   1/1    1           0          0s
        #   blank line
        resources = K8sHelmBaseConnector._get_deep(
            status, ("info", "status", "resources")
        )

        # convert to table
        resources = K8sHelmBaseConnector._output_to_table(resources)

        num_lines = len(resources)
        index = 0
        ready = True
        while index < num_lines:
            try:
                line1 = resources[index]
                index += 1
                # find '==>' in column 0
                if line1[0] == "==>":
                    line2 = resources[index]
                    index += 1
                    # find READY in column 1
                    if line2[1] == "READY":
                        # read the following lines until a blank one
                        line3 = resources[index]
                        index += 1
                        while len(line3) > 1 and index < num_lines:
                            ready_value = line3[1]
                            parts = ready_value.split(sep="/")
                            current = int(parts[0])
                            total = int(parts[1])
                            if current < total:
                                self.log.debug("NOT READY:\n    {}".format(line3))
                                ready = False
                            line3 = resources[index]
                            index += 1

            except Exception:
                pass

        return ready

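    # Worked example (illustrative): given the resources table sketched in the
    # comment above, "halting-horse-mongodb" reports READY 0/1, so
    # current (0) < total (1), ready is set to False and the install is
    # reported as not completed; once every row under a READY header reports
    # n/n, the method returns True.
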
    def _get_install_command(
        self, kdu_model, kdu_instance, namespace, params_str, version, atomic, timeout
    ) -> str:

        # timeout
        timeout_str = ""
        if timeout:
            timeout_str = "--timeout {}".format(timeout)

        # atomic
        atomic_str = ""
        if atomic:
            atomic_str = "--atomic"

        # namespace
        namespace_str = ""
        if namespace:
            namespace_str = "--namespace {}".format(namespace)

        # version
        version_str = ""
        if version:
            version_str = "--version {}".format(version)

        command = (
            "{helm} install {atomic} --output yaml "
            "{params} {timeout} --name={name} {ns} {model} {ver}".format(
                helm=self._helm_command,
                atomic=atomic_str,
                params=params_str,
                timeout=timeout_str,
                name=kdu_instance,
                ns=namespace_str,
                model=kdu_model,
                ver=version_str,
            )
        )
        return command

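    # Example of the command line produced above (illustrative values):
    #
    #     /usr/bin/helm install --atomic --output yaml --set replicaCount=2 \
    #         --timeout 600 --name=mongo-0001 --namespace ns-1 \
    #         stable/mongodb --version 7.8.2
    #
    # _get_upgrade_command below builds the analogous "helm upgrade" line,
    # passing the release name positionally and omitting --namespace.
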
    def _get_upgrade_command(
        self, kdu_model, kdu_instance, namespace, params_str, version, atomic, timeout
    ) -> str:

        # timeout
        timeout_str = ""
        if timeout:
            timeout_str = "--timeout {}".format(timeout)

        # atomic
        atomic_str = ""
        if atomic:
            atomic_str = "--atomic"

        # version
        version_str = ""
        if version:
            version_str = "--version {}".format(version)

        command = (
            "{helm} upgrade {atomic} --output yaml {params} {timeout} "
            "{name} {model} {ver}".format(
                helm=self._helm_command,
                atomic=atomic_str,
                params=params_str,
                timeout=timeout_str,
                name=kdu_instance,
                model=kdu_model,
                ver=version_str,
            )
        )
        return command

    def _get_rollback_command(self, kdu_instance, namespace, revision) -> str:
        return "{} rollback {} {} --wait".format(
            self._helm_command, kdu_instance, revision
        )

    def _get_uninstall_command(self, kdu_instance: str, namespace: str) -> str:
        return "{} delete --purge {}".format(self._helm_command, kdu_instance)