# n2vc/k8s_helm_conn.py
##
# Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
# This file is part of OSM
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# For those usages not covered by the Apache License, Version 2.0 please
# contact with: nfvlabs@tid.es
##
import asyncio
import os
import yaml

from n2vc.k8s_helm_base_conn import K8sHelmBaseConnector
from n2vc.exceptions import K8sException

class K8sHelmConnector(K8sHelmBaseConnector):

    """
    ####################################################################################
    ################################### P U B L I C ####################################
    ####################################################################################
    """

    def __init__(
        self,
        fs: object,
        db: object,
        kubectl_command: str = "/usr/bin/kubectl",
        helm_command: str = "/usr/bin/helm",
        log: object = None,
        on_update_db=None,
        vca_config: dict = None,
    ):
        """
        Initializes helm connector for helm v2

        :param fs: file system for kubernetes and helm configuration
        :param db: database object to write current operation status
        :param kubectl_command: path to kubectl executable
        :param helm_command: path to helm executable
        :param log: logger
        :param on_update_db: callback called when k8s connector updates database
        :param vca_config: optional dictionary with extra VCA configuration
        """

        # parent class
        K8sHelmBaseConnector.__init__(
            self,
            db=db,
            log=log,
            fs=fs,
            kubectl_command=kubectl_command,
            helm_command=helm_command,
            on_update_db=on_update_db,
            vca_config=vca_config,
        )

        self.log.info("Initializing K8S Helm2 connector")

        # initialize helm client-only
        self.log.debug("Initializing helm client-only...")
        command = "{} init --client-only --stable-repo-url {} ".format(
            self._helm_command, self._stable_repo_url
        )
        try:
            # schedule the client-only init in the background; errors are only
            # logged (raise_exception_on_error=False), not raised
            asyncio.ensure_future(
                self._local_async_exec(command=command, raise_exception_on_error=False)
            )
            # loop = asyncio.get_event_loop()
            # loop.run_until_complete(self._local_async_exec(command=command,
            # raise_exception_on_error=False))
        except Exception as e:
            self.log.warning(
                msg="helm init failed (it was already initialized): {}".format(e)
            )

        self.log.info("K8S Helm2 connector initialized")

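    # Illustrative sketch (not part of the original module): typical creation
    # of this connector by a caller, assuming `fs` and `db` objects from
    # osm_common (names here are hypothetical):
    #
    #     connector = K8sHelmConnector(fs=fs, db=db, log=logger, vca_config=cfg)
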
    async def install(
        self,
        cluster_uuid: str,
        kdu_model: str,
        kdu_instance: str,
        atomic: bool = True,
        timeout: float = 300,
        params: dict = None,
        db_dict: dict = None,
        kdu_name: str = None,
        namespace: str = None,
        **kwargs,
    ):
        """
        Deploys a new KDU instance. It would implicitly rely on the `install` call
        to deploy the Chart/Bundle properly parametrized (in practice, this call
        would happen before any _initial-config-primitive_ of the VNF is called).

        :param cluster_uuid: UUID of a K8s cluster known by OSM
        :param kdu_model: chart/bundle reference (string), which can be either
            of these options:
            - a name of chart available via the repos known by OSM
            - a path to a packaged chart
            - a path to an unpacked chart directory or a URL
        :param kdu_instance: Kdu instance name
        :param atomic: If set, the installation process purges the chart/bundle on
            failure and waits until all the K8s objects are active
        :param timeout: Time in seconds to wait for the install of the chart/bundle
            (defaults to Helm default timeout: 300s)
        :param params: dictionary of key-value pairs for instantiation parameters
            (overriding default values)
        :param dict db_dict: where to write into database when the status changes.
            It contains a dict with {collection: <str>, filter: {}, path: <str>},
            e.g. {collection: "nsrs", filter: {_id: <nsd-id>},
            path: "_admin.deployed.K8S.3"}
        :param kdu_name: Name of the KDU instance to be installed
        :param namespace: K8s namespace to use for the KDU instance
        :param kwargs: Additional parameters (None yet)
        :return: True if successful
        """
        _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
        self.log.debug("installing {} in cluster {}".format(kdu_model, cluster_id))

        # sync local dir
        self.fs.sync(from_path=cluster_id)

        # init env, paths
        paths, env = self._init_paths_env(
            cluster_name=cluster_id, create_if_not_exist=True
        )

        await self._install_impl(
            cluster_id,
            kdu_model,
            paths,
            env,
            kdu_instance,
            atomic=atomic,
            timeout=timeout,
            params=params,
            db_dict=db_dict,
            kdu_name=kdu_name,
            namespace=namespace,
        )

        # sync fs
        self.fs.reverse_sync(from_path=cluster_id)

        self.log.debug("Returning kdu_instance {}".format(kdu_instance))
        return True
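
    # Illustrative usage from an async caller (hypothetical values; the
    # cluster_uuid format "<namespace>:<cluster-id>" matches _cluster_init):
    #
    #     await connector.install(
    #         cluster_uuid="<namespace>:<cluster-id>",
    #         kdu_model="stable/mongodb",
    #         kdu_instance="mongo-0001",
    #         namespace="osm",
    #     )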

    async def inspect_kdu(self, kdu_model: str, repo_url: str = None) -> str:

        self.log.debug(
            "inspect kdu_model {} from (optional) repo: {}".format(kdu_model, repo_url)
        )

        # an empty inspect_command runs a plain `helm inspect`, which prints
        # both the chart metadata and its default values
        return await self._exec_inspect_comand(
            inspect_command="", kdu_model=kdu_model, repo_url=repo_url
        )

    """
    ####################################################################################
    ################################### P R I V A T E ##################################
    ####################################################################################
    """

    def _init_paths_env(self, cluster_name: str, create_if_not_exist: bool = True):
        """
        Creates the base cluster and kube dirs if they do not exist and returns
        them, together with the helm dirs laid out according to the directory
        specification. Both the paths and the environment variables that must be
        provided to execute commands are returned.

        Helm 2 directory specification uses a helm_home dir.

        The environment variables assigned for these paths are:
        - helm home: $HELM_HOME
        - helm kubeconfig: $KUBECONFIG

        :param cluster_name: cluster_name
        :return: Dictionary with config_paths and dictionary with helm environment variables
        """
        base = self.fs.path
        if base.endswith("/") or base.endswith("\\"):
            base = base[:-1]

        # base dir for cluster
        cluster_dir = base + "/" + cluster_name

        # kube dir
        kube_dir = cluster_dir + "/" + ".kube"
        if create_if_not_exist and not os.path.exists(kube_dir):
            self.log.debug("Creating dir {}".format(kube_dir))
            os.makedirs(kube_dir)

        # helm home dir
        helm_dir = cluster_dir + "/" + ".helm"
        if create_if_not_exist and not os.path.exists(helm_dir):
            self.log.debug("Creating dir {}".format(helm_dir))
            os.makedirs(helm_dir)

        config_filename = kube_dir + "/config"

        # 2 - Prepare dictionary with paths
        paths = {
            "kube_dir": kube_dir,
            "kube_config": config_filename,
            "cluster_dir": cluster_dir,
            "helm_dir": helm_dir,
        }

        for name, path in paths.items():
            if "dir" in name and not os.path.exists(path):
                err_msg = "{} dir does not exist".format(path)
                self.log.error(err_msg)
                raise K8sException(err_msg)

        # 3 - Prepare environment variables
        env = {"HELM_HOME": helm_dir, "KUBECONFIG": config_filename}

        return paths, env
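
    # For a cluster id "<cluster-id>" and an fs path "/app/storage" (path is
    # illustrative), the structures returned above look like:
    #
    #   paths = {
    #       "kube_dir": "/app/storage/<cluster-id>/.kube",
    #       "kube_config": "/app/storage/<cluster-id>/.kube/config",
    #       "cluster_dir": "/app/storage/<cluster-id>",
    #       "helm_dir": "/app/storage/<cluster-id>/.helm",
    #   }
    #   env = {"HELM_HOME": paths["helm_dir"], "KUBECONFIG": paths["kube_config"]}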

    async def _get_services(self, cluster_id, kdu_instance, namespace):

        # init config, env
        paths, env = self._init_paths_env(
            cluster_name=cluster_id, create_if_not_exist=True
        )

        command1 = "{} get manifest {} ".format(self._helm_command, kdu_instance)
        command2 = "{} get --namespace={} -f -".format(self.kubectl_command, namespace)
        output, _rc = await self._local_async_exec_pipe(
            command1, command2, env=env, raise_exception_on_error=True
        )
        services = self._parse_services(output)

        return services
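
    # The two commands above form a pipeline equivalent to (illustrative):
    #
    #   helm get manifest <kdu_instance> | kubectl get --namespace=<ns> -f -
    #
    # i.e. the rendered release manifest is fed to kubectl, which lists the
    # live objects it declares; _parse_services() then keeps only the services.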

    async def _cluster_init(
        self, cluster_id: str, namespace: str, paths: dict, env: dict
    ):
        """
        Implements the helm version dependent cluster initialization:
        for helm2 it initializes the tiller environment if needed
        """

        # check if tiller pod is up in cluster
        command = "{} --kubeconfig={} --namespace={} get deployments".format(
            self.kubectl_command, paths["kube_config"], namespace
        )
        output, _rc = await self._local_async_exec(
            command=command, raise_exception_on_error=True, env=env
        )

        output_table = self._output_to_table(output=output)

        # find 'tiller' pod in all pods
        already_initialized = False
        try:
            for row in output_table:
                if row[0].startswith("tiller-deploy"):
                    already_initialized = True
                    break
        except Exception:
            pass

        # helm init
        n2vc_installed_sw = False
        if not already_initialized:
            self.log.info(
                "Initializing helm in client and server: {}".format(cluster_id)
            )
            command = "{} --kubeconfig={} --namespace kube-system create serviceaccount {}".format(
                self.kubectl_command, paths["kube_config"], self.service_account
            )
            _, _rc = await self._local_async_exec(
                command=command, raise_exception_on_error=False, env=env
            )

            command = (
                "{} --kubeconfig={} create clusterrolebinding osm-tiller-cluster-rule "
                "--clusterrole=cluster-admin --serviceaccount=kube-system:{}"
            ).format(self.kubectl_command, paths["kube_config"], self.service_account)
            _, _rc = await self._local_async_exec(
                command=command, raise_exception_on_error=False, env=env
            )

            command = (
                "{} --kubeconfig={} --tiller-namespace={} --home={} --service-account {} "
                " --stable-repo-url {} init"
            ).format(
                self._helm_command,
                paths["kube_config"],
                namespace,
                paths["helm_dir"],
                self.service_account,
                self._stable_repo_url,
            )
            _, _rc = await self._local_async_exec(
                command=command, raise_exception_on_error=True, env=env
            )
            n2vc_installed_sw = True
        else:
            # check client helm installation
            check_file = paths["helm_dir"] + "/repository/repositories.yaml"
            if not self._check_file_exists(
                filename=check_file, exception_if_not_exists=False
            ):
                self.log.info("Initializing helm in client: {}".format(cluster_id))
                command = (
                    "{} --kubeconfig={} --tiller-namespace={} "
                    "--home={} init --client-only --stable-repo-url {} "
                ).format(
                    self._helm_command,
                    paths["kube_config"],
                    namespace,
                    paths["helm_dir"],
                    self._stable_repo_url,
                )
                output, _rc = await self._local_async_exec(
                    command=command, raise_exception_on_error=True, env=env
                )
            else:
                self.log.info("Helm client already initialized")

        # remove old stable repo and add new one
        cluster_uuid = "{}:{}".format(namespace, cluster_id)
        repo_list = await self.repo_list(cluster_uuid)
        for repo in repo_list:
            if repo["name"] == "stable" and repo["url"] != self._stable_repo_url:
                self.log.debug(
                    "Adding new stable repo url: {}".format(self._stable_repo_url)
                )
                await self.repo_remove(cluster_uuid, "stable")
                await self.repo_add(cluster_uuid, "stable", self._stable_repo_url)
                break

        return n2vc_installed_sw
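
    # With the parameters above, the helm init command rendered for a fresh
    # cluster looks roughly like this (placeholders are illustrative):
    #
    #   helm --kubeconfig=<kube_config> --tiller-namespace=<namespace> \
    #       --home=<helm_dir> --service-account <service-account> \
    #       --stable-repo-url <stable-repo-url> init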

    async def _uninstall_sw(self, cluster_id: str, namespace: str):
        # uninstall Tiller if necessary

        self.log.debug("Uninstalling tiller from cluster {}".format(cluster_id))

        # init paths, env
        paths, env = self._init_paths_env(
            cluster_name=cluster_id, create_if_not_exist=True
        )

        if not namespace:
            # find namespace for tiller pod
            command = "{} --kubeconfig={} get deployments --all-namespaces".format(
                self.kubectl_command, paths["kube_config"]
            )
            output, _rc = await self._local_async_exec(
                command=command, raise_exception_on_error=False, env=env
            )
            output_table = self._output_to_table(output=output)
            namespace = None
            for r in output_table:
                try:
                    if "tiller-deploy" in r[1]:
                        namespace = r[0]
                        break
                except Exception:
                    pass
            else:
                # for/else: runs only when no tiller-deploy row was found (no break)
                msg = "Tiller deployment not found in cluster {}".format(cluster_id)
                self.log.error(msg)

            self.log.debug("namespace for tiller: {}".format(namespace))

        if namespace:
            # uninstall tiller from cluster
            self.log.debug("Uninstalling tiller from cluster {}".format(cluster_id))
            command = "{} --kubeconfig={} --home={} reset".format(
                self._helm_command, paths["kube_config"], paths["helm_dir"]
            )
            self.log.debug("resetting: {}".format(command))
            output, _rc = await self._local_async_exec(
                command=command, raise_exception_on_error=True, env=env
            )
            # Delete clusterrolebinding and serviceaccount.
            # Ignore errors for backward compatibility
            command = (
                "{} --kubeconfig={} delete clusterrolebinding.rbac.authorization.k8s."
                "io/osm-tiller-cluster-rule"
            ).format(self.kubectl_command, paths["kube_config"])
            output, _rc = await self._local_async_exec(
                command=command, raise_exception_on_error=False, env=env
            )
            command = "{} --kubeconfig={} --namespace kube-system delete serviceaccount/{}".format(
                self.kubectl_command, paths["kube_config"], self.service_account
            )
            output, _rc = await self._local_async_exec(
                command=command, raise_exception_on_error=False, env=env
            )

        else:
            self.log.debug("namespace not found")

    async def _instances_list(self, cluster_id):

        # init paths, env
        paths, env = self._init_paths_env(
            cluster_name=cluster_id, create_if_not_exist=True
        )

        command = "{} list --output yaml".format(self._helm_command)

        output, _rc = await self._local_async_exec(
            command=command, raise_exception_on_error=True, env=env
        )

        if output:
            # parse yaml and update keys to lower case to unify with helm3
            instances = yaml.load(output, Loader=yaml.SafeLoader).get("Releases")
            new_instances = []
            for instance in instances:
                new_instance = dict((k.lower(), v) for k, v in instance.items())
                new_instances.append(new_instance)
            return new_instances
        else:
            return []

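    # For reference, helm v2 `helm list --output yaml` returns a document with
    # a capitalized top-level "Releases" key, e.g. (abridged, illustrative):
    #
    #   Releases:
    #   - Name: mongo-0001
    #     Namespace: osm
    #     Status: DEPLOYED
    #
    # which is why the keys are lowercased above to match helm3 output.
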
    def _get_inspect_command(
        self, show_command: str, kdu_model: str, repo_str: str, version: str
    ):
        inspect_command = "{} inspect {} {}{} {}".format(
            self._helm_command, show_command, kdu_model, repo_str, version
        )
        return inspect_command
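
    # Rendered example, assuming show_command="values", repo_str=" --repo <url>"
    # and version="--version 1.0" (both strings are built by the caller and may
    # be empty; the values here are illustrative):
    #
    #   helm inspect values <kdu_model> --repo <url> --version 1.0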

    async def _status_kdu(
        self,
        cluster_id: str,
        kdu_instance: str,
        namespace: str = None,
        show_error_log: bool = False,
        return_text: bool = False,
    ):

        self.log.debug(
            "status of kdu_instance: {}, namespace: {} ".format(kdu_instance, namespace)
        )

        # init config, env
        paths, env = self._init_paths_env(
            cluster_name=cluster_id, create_if_not_exist=True
        )
        command = "{} status {} --output yaml".format(self._helm_command, kdu_instance)
        output, rc = await self._local_async_exec(
            command=command,
            raise_exception_on_error=True,
            show_error_log=show_error_log,
            env=env,
        )

        if return_text:
            return str(output)

        if rc != 0:
            return None

        data = yaml.load(output, Loader=yaml.SafeLoader)

        # remove field 'notes'
        try:
            del data.get("info").get("status")["notes"]
        except KeyError:
            pass

        # parse field 'resources'
        try:
            resources = str(data.get("info").get("status").get("resources"))
            resource_table = self._output_to_table(resources)
            data.get("info").get("status")["resources"] = resource_table
        except Exception:
            pass

        # set description to lowercase (unify with helm3)
        try:
            data.get("info")["description"] = data.get("info").pop("Description")
        except KeyError:
            pass

        return data
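
    # After the normalisation above, the returned dict exposes (abridged,
    # illustrative helm v2 status output):
    #
    #   data["info"]["status"]["resources"]  # table parsed by _output_to_table
    #   data["info"]["description"]          # key lowercased to match helm3
    #
    # with the verbose "notes" field removed.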

    def _get_helm_chart_repos_ids(self, cluster_uuid) -> list:
        repo_ids = []
        cluster_filter = {"_admin.helm-chart.id": cluster_uuid}
        cluster = self.db.get_one("k8sclusters", cluster_filter)
        if cluster:
            repo_ids = cluster.get("_admin").get("helm_chart_repos") or []
            return repo_ids
        else:
            raise K8sException(
                "k8scluster with helm-id: {} not found".format(cluster_uuid)
            )

    async def _is_install_completed(self, cluster_id: str, kdu_instance: str) -> bool:

        status = await self._status_kdu(
            cluster_id=cluster_id, kdu_instance=kdu_instance, return_text=False
        )

        # extract info.status.resources -> str
        # format:
        #   ==> v1/Deployment
        #   NAME                    READY   UP-TO-DATE   AVAILABLE   AGE
        #   halting-horse-mongodb   0/1     1            0           0s
        #   halting-petit-mongodb   1/1     1            0           0s
        #   blank line
        resources = K8sHelmBaseConnector._get_deep(
            status, ("info", "status", "resources")
        )

        # convert to table
        resources = K8sHelmBaseConnector._output_to_table(resources)

        num_lines = len(resources)
        index = 0
        ready = True
        while index < num_lines:
            try:
                line1 = resources[index]
                index += 1
                # find '==>' in column 0
                if line1[0] == "==>":
                    line2 = resources[index]
                    index += 1
                    # find READY in column 1
                    if line2[1] == "READY":
                        # read next lines
                        line3 = resources[index]
                        index += 1
                        while len(line3) > 1 and index < num_lines:
                            ready_value = line3[1]
                            parts = ready_value.split(sep="/")
                            current = int(parts[0])
                            total = int(parts[1])
                            if current < total:
                                self.log.debug("NOT READY:\n {}".format(line3))
                                ready = False
                            line3 = resources[index]
                            index += 1

            except Exception:
                pass

        return ready

    def _get_install_command(
        self, kdu_model, kdu_instance, namespace, params_str, version, atomic, timeout
    ) -> str:

        timeout_str = ""
        if timeout:
            timeout_str = "--timeout {}".format(timeout)

        # atomic
        atomic_str = ""
        if atomic:
            atomic_str = "--atomic"

        # namespace
        namespace_str = ""
        if namespace:
            namespace_str = "--namespace {}".format(namespace)

        # version
        version_str = ""
        if version:
            version_str = "--version {}".format(version)

        command = (
            "{helm} install {atomic} --output yaml "
            "{params} {timeout} --name={name} {ns} {model} {ver}".format(
                helm=self._helm_command,
                atomic=atomic_str,
                params=params_str,
                timeout=timeout_str,
                name=kdu_instance,
                ns=namespace_str,
                model=kdu_model,
                ver=version_str,
            )
        )
        return command
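
    # Rendered example for kdu_model="stable/mongodb", kdu_instance="mongo-0001",
    # namespace="osm", version="1.0", timeout=300 and an empty params_str
    # (hypothetical values):
    #
    #   helm install --atomic --output yaml --timeout 300 \
    #       --name=mongo-0001 --namespace osm stable/mongodb --version 1.0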

    def _get_upgrade_command(
        self, kdu_model, kdu_instance, namespace, params_str, version, atomic, timeout
    ) -> str:

        timeout_str = ""
        if timeout:
            timeout_str = "--timeout {}".format(timeout)

        # atomic
        atomic_str = ""
        if atomic:
            atomic_str = "--atomic"

        # version
        version_str = ""
        if version:
            version_str = "--version {}".format(version)

        # note: namespace is not used in the helm2 upgrade command; the release
        # name already identifies the deployment
        command = "{helm} upgrade {atomic} --output yaml {params} {timeout} {name} {model} {ver}".format(
            helm=self._helm_command,
            atomic=atomic_str,
            params=params_str,
            timeout=timeout_str,
            name=kdu_instance,
            model=kdu_model,
            ver=version_str,
        )
        return command

    def _get_rollback_command(self, kdu_instance, namespace, revision) -> str:
        return "{} rollback {} {} --wait".format(
            self._helm_command, kdu_instance, revision
        )

    def _get_uninstall_command(self, kdu_instance: str, namespace: str) -> str:
        return "{} delete --purge {}".format(self._helm_command, kdu_instance)