# Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
# This file is part of OSM
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# For those usages not covered by the Apache License, Version 2.0 please
# contact with: nfvlabs@tid.es
import asyncio
import os

import yaml

from n2vc.k8s_helm_base_conn import K8sHelmBaseConnector
from n2vc.exceptions import K8sException
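

# Typical usage (illustrative sketch only; the OSM engine normally creates this
# connector and supplies its own `fs`, `db` and `log` objects, the names below
# are placeholders):
#
#   connector = K8sHelmConnector(fs=fs, db=db, log=log, on_update_db=None)
#   kdu_instance = await connector.install(
#       cluster_uuid=cluster_uuid, kdu_model="stable/openldap", kdu_instance=name
#   )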
class K8sHelmConnector(K8sHelmBaseConnector):
    ####################################################################################
    ################################### P U B L I C ####################################
    ####################################################################################

    def __init__(
        self,
        fs: object,
        db: object,
        kubectl_command: str = "/usr/bin/kubectl",
        helm_command: str = "/usr/bin/helm",
        log: object = None,
        on_update_db=None,
    ):
        """
        Initializes helm connector for helm v2

        :param fs: file system for kubernetes and helm configuration
        :param db: database object to write current operation status
        :param kubectl_command: path to kubectl executable
        :param helm_command: path to helm executable
        :param log: logger
        :param on_update_db: callback called when k8s connector updates database
        """

        # parent class constructor
        K8sHelmBaseConnector.__init__(
            self,
            db=db,
            log=log,
            fs=fs,
            kubectl_command=kubectl_command,
            helm_command=helm_command,
            on_update_db=on_update_db,
        )

        self.log.info("Initializing K8S Helm2 connector")

        # initialize helm client-only
        self.log.debug("Initializing helm client-only...")
        command = "{} init --client-only".format(self._helm_command)
        try:
            asyncio.ensure_future(
                self._local_async_exec(command=command, raise_exception_on_error=False)
            )
            # loop = asyncio.get_event_loop()
            # loop.run_until_complete(self._local_async_exec(command=command,
            # raise_exception_on_error=False))
        except Exception as e:
            self.log.warning(
                "helm init failed (it was already initialized): {}".format(e)
            )

        self.log.info("K8S Helm2 connector initialized")

    async def install(
        self,
        cluster_uuid: str,
        kdu_model: str,
        kdu_instance: str,
        atomic: bool = True,
        timeout: float = 300,
        params: dict = None,
        db_dict: dict = None,
        kdu_name: str = None,
        namespace: str = None,
        **kwargs,
    ):
        _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
        self.log.debug("installing {} in cluster {}".format(kdu_model, cluster_id))

        # sync local dir
        self.fs.sync(from_path=cluster_id)

        # init env, paths
        paths, env = self._init_paths_env(
            cluster_name=cluster_id, create_if_not_exist=True
        )

        kdu_instance = await self._install_impl(
            cluster_id,
            kdu_model,
            paths,
            env,
            kdu_instance,
            atomic=atomic,
            timeout=timeout,
            params=params,
            db_dict=db_dict,
            kdu_name=kdu_name,
            namespace=namespace,
        )

        # sync fs
        self.fs.reverse_sync(from_path=cluster_id)

        self.log.debug("Returning kdu_instance {}".format(kdu_instance))
        return kdu_instance
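
    # The fs.sync()/fs.reverse_sync() calls above keep the per-cluster kube/helm
    # configuration directory in sync with the OSM file-system backend before and
    # after the helm operation.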

    async def inspect_kdu(self, kdu_model: str, repo_url: str = None) -> str:

        self.log.debug(
            "inspect kdu_model {} from (optional) repo: {}".format(kdu_model, repo_url)
        )

        return await self._exec_inspect_comand(
            inspect_command="", kdu_model=kdu_model, repo_url=repo_url
        )

    ####################################################################################
    ################################### P R I V A T E ##################################
    ####################################################################################

    def _init_paths_env(self, cluster_name: str, create_if_not_exist: bool = True):
        """
        Creates and returns the base cluster and kube dirs and, if needed, the helm2
        dirs according to the directory specification. The paths are returned together
        with the environment variables that must be provided to execute helm commands.

        Helm 2 directory specification uses a helm_home dir.

        The variables assigned for these paths are:
        - Helm home: $HELM_HOME
        - helm kubeconfig: $KUBECONFIG

        :param cluster_name: cluster_name
        :return: Dictionary with config paths and dictionary with helm environment variables
        """
        base = self.fs.path
        if base.endswith("/") or base.endswith("\\"):
            base = base[:-1]

        # base dir for cluster
        cluster_dir = base + "/" + cluster_name

        # kube dir
        kube_dir = cluster_dir + "/" + ".kube"
        if create_if_not_exist and not os.path.exists(kube_dir):
            self.log.debug("Creating dir {}".format(kube_dir))
            os.makedirs(kube_dir)

        # helm home dir
        helm_dir = cluster_dir + "/" + ".helm"
        if create_if_not_exist and not os.path.exists(helm_dir):
            self.log.debug("Creating dir {}".format(helm_dir))
            os.makedirs(helm_dir)

        config_filename = kube_dir + "/config"

        # 2 - Prepare dictionary with paths
        paths = {
            "kube_dir": kube_dir,
            "kube_config": config_filename,
            "cluster_dir": cluster_dir,
            "helm_dir": helm_dir,
        }

        for file_name, file in paths.items():
            if "dir" in file_name and not os.path.exists(file):
                err_msg = "{} dir does not exist".format(file)
                self.log.error(err_msg)
                raise K8sException(err_msg)

        # 3 - Prepare environment variables
        env = {"HELM_HOME": helm_dir, "KUBECONFIG": config_filename}

        return paths, env
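
    # For illustration, with self.fs.path == "/app/storage" and cluster_name "cl1"
    # (hypothetical values) the helper above yields:
    #   paths = {
    #       "kube_dir": "/app/storage/cl1/.kube",
    #       "kube_config": "/app/storage/cl1/.kube/config",
    #       "cluster_dir": "/app/storage/cl1",
    #       "helm_dir": "/app/storage/cl1/.helm",
    #   }
    #   env = {"HELM_HOME": "/app/storage/cl1/.helm",
    #          "KUBECONFIG": "/app/storage/cl1/.kube/config"}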

    async def _get_services(self, cluster_id, kdu_instance, namespace):

        # init config, env
        paths, env = self._init_paths_env(
            cluster_name=cluster_id, create_if_not_exist=True
        )

        command1 = "{} get manifest {} ".format(self._helm_command, kdu_instance)
        command2 = "{} get --namespace={} -f -".format(self.kubectl_command, namespace)
        output, _rc = await self._local_async_exec_pipe(
            command1, command2, env=env, raise_exception_on_error=True
        )
        services = self._parse_services(output)

        return services
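
    # The pipe above is equivalent to running (names are illustrative):
    #   helm get manifest <kdu_instance> | kubectl get --namespace=<namespace> -f -
    # i.e. the rendered manifest of the release is fed to kubectl so the live
    # resources can be listed, and _parse_services() (from the base connector)
    # extracts the services from that listing.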

    async def _cluster_init(self, cluster_id: str, namespace: str,
                            paths: dict, env: dict):
        """
        Implements the helm version dependent cluster initialization:
        for helm2 it initializes the tiller environment if needed
        """

        # check if tiller pod is up in cluster
        command = "{} --kubeconfig={} --namespace={} get deployments".format(
            self.kubectl_command, paths["kube_config"], namespace
        )
        output, _rc = await self._local_async_exec(
            command=command, raise_exception_on_error=True, env=env
        )

        output_table = self._output_to_table(output=output)

        # find 'tiller' pod in all pods
        already_initialized = False
        try:
            for row in output_table:
                if row[0].startswith("tiller-deploy"):
                    already_initialized = True
                    break
        except Exception:
            pass

        # helm init
        n2vc_installed_sw = False
        if not already_initialized:
            self.log.info(
                "Initializing helm in client and server: {}".format(cluster_id)
            )
            command = "{} --kubeconfig={} --namespace kube-system create serviceaccount {}".format(
                self.kubectl_command, paths["kube_config"], self.service_account
            )
            _, _rc = await self._local_async_exec(
                command=command, raise_exception_on_error=False, env=env
            )

            command = (
                "{} --kubeconfig={} create clusterrolebinding osm-tiller-cluster-rule "
                "--clusterrole=cluster-admin --serviceaccount=kube-system:{}"
            ).format(self.kubectl_command, paths["kube_config"], self.service_account)
            _, _rc = await self._local_async_exec(
                command=command, raise_exception_on_error=False, env=env
            )

            command = (
                "{} --kubeconfig={} --tiller-namespace={} --home={} --service-account {} "
                "init"
            ).format(
                self._helm_command,
                paths["kube_config"],
                namespace,
                paths["helm_dir"],
                self.service_account,
            )
            _, _rc = await self._local_async_exec(
                command=command, raise_exception_on_error=True, env=env
            )
            n2vc_installed_sw = True
        else:
            # check client helm installation
            check_file = paths["helm_dir"] + "/repository/repositories.yaml"
            if not self._check_file_exists(
                filename=check_file, exception_if_not_exists=False
            ):
                self.log.info("Initializing helm in client: {}".format(cluster_id))
                command = (
                    "{} --kubeconfig={} --tiller-namespace={} "
                    "--home={} init --client-only"
                ).format(
                    self._helm_command,
                    paths["kube_config"],
                    namespace,
                    paths["helm_dir"],
                )
                output, _rc = await self._local_async_exec(
                    command=command, raise_exception_on_error=True, env=env
                )
            else:
                self.log.info("Helm client already initialized")

        return n2vc_installed_sw
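
    # For a cluster without tiller, the initialization above ends up running commands
    # of this shape (sketch with illustrative values):
    #   kubectl --kubeconfig=<cfg> --namespace kube-system create serviceaccount <sa>
    #   kubectl --kubeconfig=<cfg> create clusterrolebinding osm-tiller-cluster-rule \
    #       --clusterrole=cluster-admin --serviceaccount=kube-system:<sa>
    #   helm --kubeconfig=<cfg> --tiller-namespace=<ns> --home=<helm_dir> \
    #       --service-account <sa> init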

    async def _uninstall_sw(self, cluster_id: str, namespace: str):
        # uninstall Tiller if necessary

        self.log.debug("Uninstalling tiller from cluster {}".format(cluster_id))

        # init paths, env
        paths, env = self._init_paths_env(
            cluster_name=cluster_id, create_if_not_exist=True
        )

        if not namespace:
            # find namespace for tiller pod
            command = "{} --kubeconfig={} get deployments --all-namespaces".format(
                self.kubectl_command, paths["kube_config"]
            )
            output, _rc = await self._local_async_exec(
                command=command, raise_exception_on_error=False, env=env
            )
            output_table = self._output_to_table(output=output)

            namespace = None
            for r in output_table:
                try:
                    if "tiller-deploy" in r[1]:
                        namespace = r[0]
                        break
                except Exception:
                    pass
            else:
                msg = "Tiller deployment not found in cluster {}".format(cluster_id)
                self.log.error(msg)

            self.log.debug("namespace for tiller: {}".format(namespace))

        if namespace:
            # uninstall tiller from cluster
            self.log.debug("Uninstalling tiller from cluster {}".format(cluster_id))
            command = "{} --kubeconfig={} --home={} reset".format(
                self._helm_command, paths["kube_config"], paths["helm_dir"]
            )
            self.log.debug("resetting: {}".format(command))
            output, _rc = await self._local_async_exec(
                command=command, raise_exception_on_error=True, env=env
            )

            # Delete clusterrolebinding and serviceaccount.
            # Ignore if errors for backward compatibility
            command = (
                "{} --kubeconfig={} delete clusterrolebinding.rbac.authorization.k8s."
                "io/osm-tiller-cluster-rule"
            ).format(self.kubectl_command, paths["kube_config"])
            output, _rc = await self._local_async_exec(
                command=command, raise_exception_on_error=False, env=env
            )

            command = "{} --kubeconfig={} --namespace kube-system delete serviceaccount/{}".format(
                self.kubectl_command, paths["kube_config"], self.service_account
            )
            output, _rc = await self._local_async_exec(
                command=command, raise_exception_on_error=False, env=env
            )

        else:
            self.log.debug("namespace not found")

    async def _instances_list(self, cluster_id):

        # init paths, env
        paths, env = self._init_paths_env(
            cluster_name=cluster_id, create_if_not_exist=True
        )

        command = "{} list --output yaml".format(self._helm_command)

        output, _rc = await self._local_async_exec(
            command=command, raise_exception_on_error=True, env=env
        )

        if output and len(output) > 0:
            # parse yaml and update keys to lower case to unify with helm3
            instances = yaml.load(output, Loader=yaml.SafeLoader).get("Releases")
            new_instances = []
            for instance in instances:
                new_instance = dict((k.lower(), v) for k, v in instance.items())
                new_instances.append(new_instance)
            return new_instances
        else:
            return []
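
    # Example of the key normalization above (illustrative values): a helm2 release
    # entry such as {"Name": "myrel", "Status": "DEPLOYED", "Namespace": "myns"} is
    # returned as {"name": "myrel", "status": "DEPLOYED", "namespace": "myns"}, so
    # callers see the same lowercase keys produced for helm3.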

    def _get_inspect_command(self, show_command: str, kdu_model: str, repo_str: str,
                             version: str):
        inspect_command = "{} inspect {} {}{} {}".format(
            self._helm_command, show_command, kdu_model, repo_str, version
        )
        return inspect_command

    async def _status_kdu(
        self,
        cluster_id: str,
        kdu_instance: str,
        namespace: str = None,
        show_error_log: bool = False,
        return_text: bool = False,
    ):

        self.log.debug(
            "status of kdu_instance: {}, namespace: {} ".format(kdu_instance, namespace)
        )

        # init config, env
        paths, env = self._init_paths_env(
            cluster_name=cluster_id, create_if_not_exist=True
        )
        command = "{} status {} --output yaml".format(self._helm_command, kdu_instance)
        output, rc = await self._local_async_exec(
            command=command,
            raise_exception_on_error=True,
            show_error_log=show_error_log,
            env=env,
        )

        if return_text:
            return str(output)

        if rc != 0:
            return None

        data = yaml.load(output, Loader=yaml.SafeLoader)

        # remove field 'notes'
        try:
            del data.get("info").get("status")["notes"]
        except KeyError:
            pass

        # parse field 'resources'
        try:
            resources = str(data.get("info").get("status").get("resources"))
            resource_table = self._output_to_table(resources)
            data.get("info").get("status")["resources"] = resource_table
        except Exception:
            pass

        # set description to lowercase (unify with helm3)
        try:
            data.get("info")["description"] = data.get("info").pop("Description")
        except KeyError:
            pass

        return data

    def _get_helm_chart_repos_ids(self, cluster_uuid) -> list:
        repo_ids = []
        cluster_filter = {"_admin.helm-chart.id": cluster_uuid}
        cluster = self.db.get_one("k8sclusters", cluster_filter)
        if cluster:
            repo_ids = cluster.get("_admin").get("helm_chart_repos") or []
            return repo_ids
        else:
            raise K8sException(
                "k8s cluster with helm-id {} not found".format(cluster_uuid)
            )

    async def _is_install_completed(self, cluster_id: str, kdu_instance: str) -> bool:

        status = await self._status_kdu(
            cluster_id=cluster_id, kdu_instance=kdu_instance, return_text=False
        )

        # extract info.status.resources -> str
        # format:
        #       ==> v1/Deployment
        #       NAME                    READY   UP-TO-DATE   AVAILABLE   AGE
        #       halting-horse-mongodb   0/1     1            0           0s
        #       halting-petit-mongodb   1/1     1            0           0s
        resources = K8sHelmBaseConnector._get_deep(
            status, ("info", "status", "resources")
        )

        # convert to table
        resources = K8sHelmBaseConnector._output_to_table(resources)

        num_lines = len(resources)
        index = 0
        ready = True
        while index < num_lines:
            try:
                line1 = resources[index]
                index += 1
                # find '==>' in column 0
                if line1[0] == "==>":
                    line2 = resources[index]
                    index += 1
                    # find READY in column 1
                    if line2[1] == "READY":
                        # read next lines
                        line3 = resources[index]
                        index += 1
                        while len(line3) > 1 and index < num_lines:
                            ready_value = line3[1]
                            parts = ready_value.split(sep="/")
                            current = int(parts[0])
                            total = int(parts[1])
                            if current < total:
                                self.log.debug("NOT READY:\n    {}".format(line3))
                                ready = False
                            line3 = resources[index]
                            index += 1
            except Exception:
                pass

        return ready
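
    # Example of the readiness check above: a resources row whose READY column reads
    # "0/1" gives current=0 < total=1, so the install is reported as not completed;
    # "1/1" (current == total) leaves `ready` True.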

    def _get_install_command(
        self, kdu_model, kdu_instance, namespace, params_str, version, atomic, timeout
    ) -> str:

        # timeout
        timeout_str = ""
        if timeout:
            timeout_str = "--timeout {}".format(timeout)

        # atomic
        atomic_str = ""
        if atomic:
            atomic_str = "--atomic"

        # namespace
        namespace_str = ""
        if namespace:
            namespace_str = "--namespace {}".format(namespace)

        # version
        version_str = ""
        if version:
            version_str = "--version {}".format(version)

        command = (
            "{helm} install {atomic} --output yaml "
            "{params} {timeout} --name={name} {ns} {model} {ver}".format(
                helm=self._helm_command,
                atomic=atomic_str,
                params=params_str,
                timeout=timeout_str,
                name=kdu_instance,
                ns=namespace_str,
                model=kdu_model,
                ver=version_str,
            )
        )
        return command
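
    # A generated install command has this shape (illustrative values; <params> stands
    # for the already-formatted parameter string received by this method):
    #   helm install --atomic --output yaml <params> --timeout 300 --name=myrel
    #   --namespace myns stable/openldap --version 1.2.3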

    def _get_upgrade_command(
        self, kdu_model, kdu_instance, namespace, params_str, version, atomic, timeout
    ) -> str:

        # timeout
        timeout_str = ""
        if timeout:
            timeout_str = "--timeout {}".format(timeout)

        # atomic
        atomic_str = ""
        if atomic:
            atomic_str = "--atomic"

        # version
        version_str = ""
        if version:
            version_str = "--version {}".format(version)

        command = (
            "{helm} upgrade {atomic} --output yaml {params} {timeout} {name} {model} {ver}"
        ).format(
            helm=self._helm_command,
            atomic=atomic_str,
            params=params_str,
            timeout=timeout_str,
            name=kdu_instance,
            model=kdu_model,
            ver=version_str,
        )
        return command

    def _get_rollback_command(self, kdu_instance, namespace, revision) -> str:
        return "{} rollback {} {} --wait".format(
            self._helm_command, kdu_instance, revision
        )

    def _get_uninstall_command(self, kdu_instance: str, namespace: str) -> str:
        return "{} delete --purge {}".format(self._helm_command, kdu_instance)
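
    # Note: "helm delete --purge" is the helm2 command that removes a release together
    # with its stored history; in helm v3 the equivalent operation is "helm uninstall".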