2 # Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3 # This file is part of OSM
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact with: nfvlabs@tid.es
26 from n2vc
.k8s_helm_base_conn
import K8sHelmBaseConnector
27 from n2vc
.exceptions
import K8sException
class K8sHelmConnector(K8sHelmBaseConnector):

    ####################################################################################
    ################################### P U B L I C ####################################
    ####################################################################################

    def __init__(
        self,
        fs: object,
        db: object,
        kubectl_command: str = "/usr/bin/kubectl",
        helm_command: str = "/usr/bin/helm",
        log: object = None,
        on_update_db=None,
        vca_config: dict = None,
    ):
        """
        Initializes helm connector for helm v2

        :param fs: file system for kubernetes and helm configuration
        :param db: database object to write current operation status
        :param kubectl_command: path to kubectl executable
        :param helm_command: path to helm executable
        :param log: logger
        :param on_update_db: callback called when k8s connector updates database
        """

        # delegate common state setup to the base connector
        K8sHelmBaseConnector.__init__(
            self,
            db=db,
            log=log,
            fs=fs,
            kubectl_command=kubectl_command,
            helm_command=helm_command,
            on_update_db=on_update_db,
            vca_config=vca_config,
        )

        self.log.info("Initializing K8S Helm2 connector")

        # initialize helm client-only
        self.log.debug("Initializing helm client-only...")
        command = "{} init --client-only --stable-repo-url {} ".format(
            self._helm_command, self._stable_repo_url
        )
        try:
            # fire-and-forget: errors are swallowed because a failure here
            # normally just means helm was already initialized
            asyncio.ensure_future(
                self._local_async_exec(command=command, raise_exception_on_error=False)
            )
            # loop = asyncio.get_event_loop()
            # loop.run_until_complete(self._local_async_exec(command=command,
            # raise_exception_on_error=False))
        except Exception as e:
            self.log.warning(
                msg="helm init failed (it was already initialized): {}".format(e)
            )

        self.log.info("K8S Helm2 connector initialized")
100 kdu_name
: str = None,
101 namespace
: str = None,
103 _
, cluster_id
= self
._get
_namespace
_cluster
_id
(cluster_uuid
)
104 self
.log
.debug("installing {} in cluster {}".format(kdu_model
, cluster_id
))
107 self
.fs
.sync(from_path
=cluster_id
)
110 paths
, env
= self
._init
_paths
_env
(
111 cluster_name
=cluster_id
, create_if_not_exist
=True
114 await self
._install
_impl
(
129 self
.fs
.reverse_sync(from_path
=cluster_id
)
131 self
.log
.debug("Returning kdu_instance {}".format(kdu_instance
))
134 async def inspect_kdu(self
, kdu_model
: str, repo_url
: str = None) -> str:
137 "inspect kdu_model {} from (optional) repo: {}".format(kdu_model
, repo_url
)
140 return await self
._exec
_inspect
_comand
(
141 inspect_command
="", kdu_model
=kdu_model
, repo_url
=repo_url
145 ####################################################################################
146 ################################### P R I V A T E ##################################
147 ####################################################################################
150 def _init_paths_env(self
, cluster_name
: str, create_if_not_exist
: bool = True):
152 Creates and returns base cluster and kube dirs and returns them.
153 Also created helm3 dirs according to new directory specification, paths are
154 returned and also environment variables that must be provided to execute commands
156 Helm 2 directory specification uses helm_home dir:
158 The variables assigned for this paths are:
159 - Helm hone: $HELM_HOME
160 - helm kubeconfig: $KUBECONFIG
162 :param cluster_name: cluster_name
163 :return: Dictionary with config_paths and dictionary with helm environment variables
166 if base
.endswith("/") or base
.endswith("\\"):
169 # base dir for cluster
170 cluster_dir
= base
+ "/" + cluster_name
173 kube_dir
= cluster_dir
+ "/" + ".kube"
174 if create_if_not_exist
and not os
.path
.exists(kube_dir
):
175 self
.log
.debug("Creating dir {}".format(kube_dir
))
176 os
.makedirs(kube_dir
)
179 helm_dir
= cluster_dir
+ "/" + ".helm"
180 if create_if_not_exist
and not os
.path
.exists(helm_dir
):
181 self
.log
.debug("Creating dir {}".format(helm_dir
))
182 os
.makedirs(helm_dir
)
184 config_filename
= kube_dir
+ "/config"
186 # 2 - Prepare dictionary with paths
188 "kube_dir": kube_dir
,
189 "kube_config": config_filename
,
190 "cluster_dir": cluster_dir
,
191 "helm_dir": helm_dir
,
194 for file_name
, file in paths
.items():
195 if "dir" in file_name
and not os
.path
.exists(file):
196 err_msg
= "{} dir does not exist".format(file)
197 self
.log
.error(err_msg
)
198 raise K8sException(err_msg
)
200 # 3 - Prepare environment variables
201 env
= {"HELM_HOME": helm_dir
, "KUBECONFIG": config_filename
}
205 async def _get_services(self
, cluster_id
, kdu_instance
, namespace
):
208 paths
, env
= self
._init
_paths
_env
(
209 cluster_name
=cluster_id
, create_if_not_exist
=True
212 command1
= "{} get manifest {} ".format(self
._helm
_command
, kdu_instance
)
213 command2
= "{} get --namespace={} -f -".format(self
.kubectl_command
, namespace
)
214 output
, _rc
= await self
._local
_async
_exec
_pipe
(
215 command1
, command2
, env
=env
, raise_exception_on_error
=True
217 services
= self
._parse
_services
(output
)
221 async def _cluster_init(self
, cluster_id
: str, namespace
: str,
222 paths
: dict, env
: dict):
224 Implements the helm version dependent cluster initialization:
225 For helm2 it initialized tiller environment if needed
228 # check if tiller pod is up in cluster
229 command
= "{} --kubeconfig={} --namespace={} get deployments".format(
230 self
.kubectl_command
, paths
["kube_config"], namespace
232 output
, _rc
= await self
._local
_async
_exec
(
233 command
=command
, raise_exception_on_error
=True, env
=env
236 output_table
= self
._output
_to
_table
(output
=output
)
238 # find 'tiller' pod in all pods
239 already_initialized
= False
241 for row
in output_table
:
242 if row
[0].startswith("tiller-deploy"):
243 already_initialized
= True
249 n2vc_installed_sw
= False
250 if not already_initialized
:
252 "Initializing helm in client and server: {}".format(cluster_id
)
254 command
= "{} --kubeconfig={} --namespace kube-system create serviceaccount {}".format(
255 self
.kubectl_command
, paths
["kube_config"], self
.service_account
257 _
, _rc
= await self
._local
_async
_exec
(
258 command
=command
, raise_exception_on_error
=False, env
=env
262 "{} --kubeconfig={} create clusterrolebinding osm-tiller-cluster-rule "
263 "--clusterrole=cluster-admin --serviceaccount=kube-system:{}"
264 ).format(self
.kubectl_command
, paths
["kube_config"], self
.service_account
)
265 _
, _rc
= await self
._local
_async
_exec
(
266 command
=command
, raise_exception_on_error
=False, env
=env
270 "{} --kubeconfig={} --tiller-namespace={} --home={} --service-account {} "
271 " --stable-repo-url {} init"
274 paths
["kube_config"],
277 self
.service_account
,
278 self
._stable
_repo
_url
280 _
, _rc
= await self
._local
_async
_exec
(
281 command
=command
, raise_exception_on_error
=True, env
=env
283 n2vc_installed_sw
= True
285 # check client helm installation
286 check_file
= paths
["helm_dir"] + "/repository/repositories.yaml"
287 if not self
._check
_file
_exists
(
288 filename
=check_file
, exception_if_not_exists
=False
290 self
.log
.info("Initializing helm in client: {}".format(cluster_id
))
292 "{} --kubeconfig={} --tiller-namespace={} "
293 "--home={} init --client-only --stable-repo-url {} "
296 paths
["kube_config"],
299 self
._stable
_repo
_url
,
301 output
, _rc
= await self
._local
_async
_exec
(
302 command
=command
, raise_exception_on_error
=True, env
=env
305 self
.log
.info("Helm client already initialized")
307 # remove old stable repo and add new one
308 cluster_uuid
= "{}:{}".format(namespace
, cluster_id
)
309 repo_list
= await self
.repo_list(cluster_uuid
)
310 for repo
in repo_list
:
311 if repo
["name"] == "stable" and repo
["url"] != self
._stable
_repo
_url
:
312 self
.log
.debug("Add new stable repo url: {}")
313 await self
.repo_remove(cluster_uuid
,
315 await self
.repo_add(cluster_uuid
,
317 self
._stable
_repo
_url
)
320 return n2vc_installed_sw
322 async def _uninstall_sw(self
, cluster_id
: str, namespace
: str):
323 # uninstall Tiller if necessary
325 self
.log
.debug("Uninstalling tiller from cluster {}".format(cluster_id
))
328 paths
, env
= self
._init
_paths
_env
(
329 cluster_name
=cluster_id
, create_if_not_exist
=True
333 # find namespace for tiller pod
334 command
= "{} --kubeconfig={} get deployments --all-namespaces".format(
335 self
.kubectl_command
, paths
["kube_config"]
337 output
, _rc
= await self
._local
_async
_exec
(
338 command
=command
, raise_exception_on_error
=False, env
=env
340 output_table
= self
._output
_to
_table
(output
=output
)
342 for r
in output_table
:
344 if "tiller-deploy" in r
[1]:
350 msg
= "Tiller deployment not found in cluster {}".format(cluster_id
)
353 self
.log
.debug("namespace for tiller: {}".format(namespace
))
356 # uninstall tiller from cluster
357 self
.log
.debug("Uninstalling tiller from cluster {}".format(cluster_id
))
358 command
= "{} --kubeconfig={} --home={} reset".format(
359 self
._helm
_command
, paths
["kube_config"], paths
["helm_dir"]
361 self
.log
.debug("resetting: {}".format(command
))
362 output
, _rc
= await self
._local
_async
_exec
(
363 command
=command
, raise_exception_on_error
=True, env
=env
365 # Delete clusterrolebinding and serviceaccount.
366 # Ignore if errors for backward compatibility
368 "{} --kubeconfig={} delete clusterrolebinding.rbac.authorization.k8s."
369 "io/osm-tiller-cluster-rule"
370 ).format(self
.kubectl_command
, paths
["kube_config"])
371 output
, _rc
= await self
._local
_async
_exec
(
372 command
=command
, raise_exception_on_error
=False, env
=env
374 command
= "{} --kubeconfig={} --namespace kube-system delete serviceaccount/{}".format(
375 self
.kubectl_command
, paths
["kube_config"], self
.service_account
377 output
, _rc
= await self
._local
_async
_exec
(
378 command
=command
, raise_exception_on_error
=False, env
=env
382 self
.log
.debug("namespace not found")
384 async def _instances_list(self
, cluster_id
):
387 paths
, env
= self
._init
_paths
_env
(
388 cluster_name
=cluster_id
, create_if_not_exist
=True
391 command
= "{} list --output yaml".format(self
._helm
_command
)
393 output
, _rc
= await self
._local
_async
_exec
(
394 command
=command
, raise_exception_on_error
=True, env
=env
397 if output
and len(output
) > 0:
398 # parse yaml and update keys to lower case to unify with helm3
399 instances
= yaml
.load(output
, Loader
=yaml
.SafeLoader
).get("Releases")
401 for instance
in instances
:
402 new_instance
= dict((k
.lower(), v
) for k
, v
in instance
.items())
403 new_instances
.append(new_instance
)
408 def _get_inspect_command(self
, show_command
: str, kdu_model
: str, repo_str
: str,
410 inspect_command
= "{} inspect {} {}{} {}".format(
411 self
._helm
_command
, show_command
, kdu_model
, repo_str
, version
413 return inspect_command
415 async def _status_kdu(
419 namespace
: str = None,
420 show_error_log
: bool = False,
421 return_text
: bool = False,
425 "status of kdu_instance: {}, namespace: {} ".format(kdu_instance
, namespace
)
429 paths
, env
= self
._init
_paths
_env
(
430 cluster_name
=cluster_id
, create_if_not_exist
=True
432 command
= "{} status {} --output yaml".format(self
._helm
_command
, kdu_instance
)
433 output
, rc
= await self
._local
_async
_exec
(
435 raise_exception_on_error
=True,
436 show_error_log
=show_error_log
,
446 data
= yaml
.load(output
, Loader
=yaml
.SafeLoader
)
448 # remove field 'notes'
450 del data
.get("info").get("status")["notes"]
454 # parse field 'resources'
456 resources
= str(data
.get("info").get("status").get("resources"))
457 resource_table
= self
._output
_to
_table
(resources
)
458 data
.get("info").get("status")["resources"] = resource_table
462 # set description to lowercase (unify with helm3)
464 data
.get("info")["description"] = data
.get("info").pop("Description")
470 def _get_helm_chart_repos_ids(self
, cluster_uuid
) -> list:
472 cluster_filter
= {"_admin.helm-chart.id": cluster_uuid
}
473 cluster
= self
.db
.get_one("k8sclusters", cluster_filter
)
475 repo_ids
= cluster
.get("_admin").get("helm_chart_repos") or []
479 "k8cluster with helm-id : {} not found".format(cluster_uuid
)
482 async def _is_install_completed(self
, cluster_id
: str, kdu_instance
: str) -> bool:
484 status
= await self
._status
_kdu
(
485 cluster_id
=cluster_id
, kdu_instance
=kdu_instance
, return_text
=False
488 # extract info.status.resources-> str
491 # NAME READY UP-TO-DATE AVAILABLE AGE
492 # halting-horse-mongodb 0/1 1 0 0s
493 # halting-petit-mongodb 1/1 1 0 0s
495 resources
= K8sHelmBaseConnector
._get
_deep
(
496 status
, ("info", "status", "resources")
500 resources
= K8sHelmBaseConnector
._output
_to
_table
(resources
)
502 num_lines
= len(resources
)
505 while index
< num_lines
:
507 line1
= resources
[index
]
509 # find '==>' in column 0
510 if line1
[0] == "==>":
511 line2
= resources
[index
]
513 # find READY in column 1
514 if line2
[1] == "READY":
516 line3
= resources
[index
]
518 while len(line3
) > 1 and index
< num_lines
:
519 ready_value
= line3
[1]
520 parts
= ready_value
.split(sep
="/")
521 current
= int(parts
[0])
522 total
= int(parts
[1])
524 self
.log
.debug("NOT READY:\n {}".format(line3
))
526 line3
= resources
[index
]
534 def _get_install_command(
535 self
, kdu_model
, kdu_instance
, namespace
, params_str
, version
, atomic
, timeout
540 timeout_str
= "--timeout {}".format(timeout
)
545 atomic_str
= "--atomic"
549 namespace_str
= "--namespace {}".format(namespace
)
554 version_str
= version_str
= "--version {}".format(version
)
557 "{helm} install {atomic} --output yaml "
558 "{params} {timeout} --name={name} {ns} {model} {ver}".format(
559 helm
=self
._helm
_command
,
571 def _get_upgrade_command(
572 self
, kdu_model
, kdu_instance
, namespace
, params_str
, version
, atomic
, timeout
577 timeout_str
= "--timeout {}".format(timeout
)
582 atomic_str
= "--atomic"
587 version_str
= "--version {}".format(version
)
589 command
= "{helm} upgrade {atomic} --output yaml {params} {timeout} {name} {model} {ver}"\
590 .format(helm
=self
._helm
_command
,
600 def _get_rollback_command(self
, kdu_instance
, namespace
, revision
) -> str:
601 return "{} rollback {} {} --wait".format(
602 self
._helm
_command
, kdu_instance
, revision
605 def _get_uninstall_command(self
, kdu_instance
: str, namespace
: str) -> str:
606 return "{} delete --purge {}".format(self
._helm
_command
, kdu_instance
)