# Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
# This file is part of OSM
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# For those usages not covered by the Apache License, Version 2.0 please
# contact with: nfvlabs@tid.es
import asyncio
import os

import yaml

from n2vc.exceptions import K8sException
from n2vc.k8s_helm_base_conn import K8sHelmBaseConnector
class K8sHelmConnector(K8sHelmBaseConnector):
    """
    ####################################################################################
    ################################### P U B L I C ####################################
    ####################################################################################
    """

    def __init__(
        self,
        fs: object,
        db: object,
        kubectl_command: str = "/usr/bin/kubectl",
        helm_command: str = "/usr/bin/helm",
        log: object = None,
        on_update_db=None,
    ):
        """
        Initializes helm connector for helm v2

        :param fs: file system for kubernetes and helm configuration
        :param db: database object to write current operation status
        :param kubectl_command: path to kubectl executable
        :param helm_command: path to helm executable
        :param log: logger
        :param on_update_db: callback called when k8s connector updates database
        """

        # initialize the common helm machinery in the parent class
        K8sHelmBaseConnector.__init__(
            self,
            db=db,
            log=log,
            fs=fs,
            kubectl_command=kubectl_command,
            helm_command=helm_command,
            on_update_db=on_update_db,
        )

        self.log.info("Initializing K8S Helm2 connector")

        # fire-and-forget a client-only `helm init` so the local helm home exists;
        # failures are tolerated because helm may already be initialized
        self.log.debug("Initializing helm client-only...")
        command = "{} init --client-only".format(self._helm_command)
        try:
            asyncio.ensure_future(
                self._local_async_exec(command=command, raise_exception_on_error=False)
            )
            # loop = asyncio.get_event_loop()
            # loop.run_until_complete(self._local_async_exec(command=command,
            # raise_exception_on_error=False))
        except Exception as e:
            self.warning(
                msg="helm init failed (it was already initialized): {}".format(e)
            )

        self.log.info("K8S Helm2 connector initialized")
88 async def inspect_kdu(self
, kdu_model
: str, repo_url
: str = None) -> str:
91 "inspect kdu_model {} from (optional) repo: {}".format(kdu_model
, repo_url
)
94 return await self
._exec
_inspect
_comand
(
95 inspect_command
="", kdu_model
=kdu_model
, repo_url
=repo_url
99 ####################################################################################
100 ################################### P R I V A T E ##################################
101 ####################################################################################
104 def _init_paths_env(self
, cluster_name
: str, create_if_not_exist
: bool = True):
106 Creates and returns base cluster and kube dirs and returns them.
107 Also created helm3 dirs according to new directory specification, paths are
108 returned and also environment variables that must be provided to execute commands
110 Helm 2 directory specification uses helm_home dir:
112 The variables assigned for this paths are:
113 - Helm hone: $HELM_HOME
114 - helm kubeconfig: $KUBECONFIG
116 :param cluster_name: cluster_name
117 :return: Dictionary with config_paths and dictionary with helm environment variables
120 if base
.endswith("/") or base
.endswith("\\"):
123 # base dir for cluster
124 cluster_dir
= base
+ "/" + cluster_name
127 kube_dir
= cluster_dir
+ "/" + ".kube"
128 if create_if_not_exist
and not os
.path
.exists(kube_dir
):
129 self
.log
.debug("Creating dir {}".format(kube_dir
))
130 os
.makedirs(kube_dir
)
133 helm_dir
= cluster_dir
+ "/" + ".helm"
134 if create_if_not_exist
and not os
.path
.exists(helm_dir
):
135 self
.log
.debug("Creating dir {}".format(helm_dir
))
136 os
.makedirs(helm_dir
)
138 config_filename
= kube_dir
+ "/config"
140 # 2 - Prepare dictionary with paths
142 "kube_dir": kube_dir
,
143 "kube_config": config_filename
,
144 "cluster_dir": cluster_dir
,
145 "helm_dir": helm_dir
,
148 for file_name
, file in paths
.items():
149 if "dir" in file_name
and not os
.path
.exists(file):
150 err_msg
= "{} dir does not exist".format(file)
151 self
.log
.error(err_msg
)
152 raise K8sException(err_msg
)
154 # 3 - Prepare environment variables
155 env
= {"HELM_HOME": helm_dir
, "KUBECONFIG": config_filename
}
159 async def _get_services(self
, cluster_id
, kdu_instance
, namespace
):
162 paths
, env
= self
._init
_paths
_env
(
163 cluster_name
=cluster_id
, create_if_not_exist
=True
166 command1
= "{} get manifest {} ".format(self
._helm
_command
, kdu_instance
)
167 command2
= "{} get --namespace={} -f -".format(self
.kubectl_command
, namespace
)
168 output
, _rc
= await self
._local
_async
_exec
_pipe
(
169 command1
, command2
, env
=env
, raise_exception_on_error
=True
171 services
= self
._parse
_services
(output
)
175 async def _cluster_init(self
, cluster_id
: str, namespace
: str,
176 paths
: dict, env
: dict):
178 Implements the helm version dependent cluster initialization:
179 For helm2 it initialized tiller environment if needed
182 # check if tiller pod is up in cluster
183 command
= "{} --kubeconfig={} --namespace={} get deployments".format(
184 self
.kubectl_command
, paths
["kube_config"], namespace
186 output
, _rc
= await self
._local
_async
_exec
(
187 command
=command
, raise_exception_on_error
=True, env
=env
190 output_table
= self
._output
_to
_table
(output
=output
)
192 # find 'tiller' pod in all pods
193 already_initialized
= False
195 for row
in output_table
:
196 if row
[0].startswith("tiller-deploy"):
197 already_initialized
= True
203 n2vc_installed_sw
= False
204 if not already_initialized
:
206 "Initializing helm in client and server: {}".format(cluster_id
)
208 command
= "{} --kubeconfig={} --namespace kube-system create serviceaccount {}".format(
209 self
.kubectl_command
, paths
["kube_config"], self
.service_account
211 _
, _rc
= await self
._local
_async
_exec
(
212 command
=command
, raise_exception_on_error
=False, env
=env
216 "{} --kubeconfig={} create clusterrolebinding osm-tiller-cluster-rule "
217 "--clusterrole=cluster-admin --serviceaccount=kube-system:{}"
218 ).format(self
.kubectl_command
, paths
["kube_config"], self
.service_account
)
219 _
, _rc
= await self
._local
_async
_exec
(
220 command
=command
, raise_exception_on_error
=False, env
=env
224 "{} --kubeconfig={} --tiller-namespace={} --home={} --service-account {} "
228 paths
["kube_config"],
231 self
.service_account
,
233 _
, _rc
= await self
._local
_async
_exec
(
234 command
=command
, raise_exception_on_error
=True, env
=env
236 n2vc_installed_sw
= True
238 # check client helm installation
239 check_file
= paths
["helm_dir"] + "/repository/repositories.yaml"
240 if not self
._check
_file
_exists
(
241 filename
=check_file
, exception_if_not_exists
=False
243 self
.log
.info("Initializing helm in client: {}".format(cluster_id
))
245 "{} --kubeconfig={} --tiller-namespace={} "
246 "--home={} init --client-only"
249 paths
["kube_config"],
253 output
, _rc
= await self
._local
_async
_exec
(
254 command
=command
, raise_exception_on_error
=True, env
=env
257 self
.log
.info("Helm client already initialized")
259 return n2vc_installed_sw
261 async def _uninstall_sw(self
, cluster_id
: str, namespace
: str):
262 # uninstall Tiller if necessary
264 self
.log
.debug("Uninstalling tiller from cluster {}".format(cluster_id
))
267 paths
, env
= self
._init
_paths
_env
(
268 cluster_name
=cluster_id
, create_if_not_exist
=True
272 # find namespace for tiller pod
273 command
= "{} --kubeconfig={} get deployments --all-namespaces".format(
274 self
.kubectl_command
, paths
["kube_config"]
276 output
, _rc
= await self
._local
_async
_exec
(
277 command
=command
, raise_exception_on_error
=False, env
=env
279 output_table
= self
._output
_to
_table
(output
=output
)
281 for r
in output_table
:
283 if "tiller-deploy" in r
[1]:
289 msg
= "Tiller deployment not found in cluster {}".format(cluster_id
)
292 self
.log
.debug("namespace for tiller: {}".format(namespace
))
295 # uninstall tiller from cluster
296 self
.log
.debug("Uninstalling tiller from cluster {}".format(cluster_id
))
297 command
= "{} --kubeconfig={} --home={} reset".format(
298 self
._helm
_command
, paths
["kube_config"], paths
["helm_dir"]
300 self
.log
.debug("resetting: {}".format(command
))
301 output
, _rc
= await self
._local
_async
_exec
(
302 command
=command
, raise_exception_on_error
=True, env
=env
304 # Delete clusterrolebinding and serviceaccount.
305 # Ignore if errors for backward compatibility
307 "{} --kubeconfig={} delete clusterrolebinding.rbac.authorization.k8s."
308 "io/osm-tiller-cluster-rule"
309 ).format(self
.kubectl_command
, paths
["kube_config"])
310 output
, _rc
= await self
._local
_async
_exec
(
311 command
=command
, raise_exception_on_error
=False, env
=env
313 command
= "{} --kubeconfig={} --namespace kube-system delete serviceaccount/{}".format(
314 self
.kubectl_command
, paths
["kube_config"], self
.service_account
316 output
, _rc
= await self
._local
_async
_exec
(
317 command
=command
, raise_exception_on_error
=False, env
=env
321 self
.log
.debug("namespace not found")
323 async def _instances_list(self
, cluster_id
):
326 paths
, env
= self
._init
_paths
_env
(
327 cluster_name
=cluster_id
, create_if_not_exist
=True
330 command
= "{} list --output yaml".format(self
._helm
_command
)
332 output
, _rc
= await self
._local
_async
_exec
(
333 command
=command
, raise_exception_on_error
=True, env
=env
336 if output
and len(output
) > 0:
337 # parse yaml and update keys to lower case to unify with helm3
338 instances
= yaml
.load(output
, Loader
=yaml
.SafeLoader
).get("Releases")
340 for instance
in instances
:
341 new_instance
= dict((k
.lower(), v
) for k
, v
in instance
.items())
342 new_instances
.append(new_instance
)
347 def _get_inspect_command(self
, show_command
: str, kdu_model
: str, repo_str
: str,
349 inspect_command
= "{} inspect {} {}{} {}".format(
350 self
._helm
_command
, show_command
, kdu_model
, repo_str
, version
352 return inspect_command
354 async def _status_kdu(
358 namespace
: str = None,
359 show_error_log
: bool = False,
360 return_text
: bool = False,
364 "status of kdu_instance: {}, namespace: {} ".format(kdu_instance
, namespace
)
368 paths
, env
= self
._init
_paths
_env
(
369 cluster_name
=cluster_id
, create_if_not_exist
=True
371 command
= "{} status {} --output yaml".format(self
._helm
_command
, kdu_instance
)
372 output
, rc
= await self
._local
_async
_exec
(
374 raise_exception_on_error
=True,
375 show_error_log
=show_error_log
,
385 data
= yaml
.load(output
, Loader
=yaml
.SafeLoader
)
387 # remove field 'notes'
389 del data
.get("info").get("status")["notes"]
393 # parse field 'resources'
395 resources
= str(data
.get("info").get("status").get("resources"))
396 resource_table
= self
._output
_to
_table
(resources
)
397 data
.get("info").get("status")["resources"] = resource_table
401 # set description to lowercase (unify with helm3)
403 data
.get("info")["description"] = data
.get("info").pop("Description")
409 async def _is_install_completed(self
, cluster_id
: str, kdu_instance
: str) -> bool:
411 status
= await self
._status
_kdu
(
412 cluster_id
=cluster_id
, kdu_instance
=kdu_instance
, return_text
=False
415 # extract info.status.resources-> str
418 # NAME READY UP-TO-DATE AVAILABLE AGE
419 # halting-horse-mongodb 0/1 1 0 0s
420 # halting-petit-mongodb 1/1 1 0 0s
422 resources
= K8sHelmBaseConnector
._get
_deep
(
423 status
, ("info", "status", "resources")
427 resources
= K8sHelmBaseConnector
._output
_to
_table
(resources
)
429 num_lines
= len(resources
)
432 while index
< num_lines
:
434 line1
= resources
[index
]
436 # find '==>' in column 0
437 if line1
[0] == "==>":
438 line2
= resources
[index
]
440 # find READY in column 1
441 if line2
[1] == "READY":
443 line3
= resources
[index
]
445 while len(line3
) > 1 and index
< num_lines
:
446 ready_value
= line3
[1]
447 parts
= ready_value
.split(sep
="/")
448 current
= int(parts
[0])
449 total
= int(parts
[1])
451 self
.log
.debug("NOT READY:\n {}".format(line3
))
453 line3
= resources
[index
]
461 def _get_install_command(
462 self
, kdu_model
, kdu_instance
, namespace
, params_str
, version
, atomic
, timeout
467 timeout_str
= "--timeout {}".format(timeout
)
472 atomic_str
= "--atomic"
476 namespace_str
= "--namespace {}".format(namespace
)
481 version_str
= version_str
= "--version {}".format(version
)
484 "{helm} install {atomic} --output yaml "
485 "{params} {timeout} --name={name} {ns} {model} {ver}".format(
486 helm
=self
._helm
_command
,
498 def _get_upgrade_command(
499 self
, kdu_model
, kdu_instance
, namespace
, params_str
, version
, atomic
, timeout
504 timeout_str
= "--timeout {}".format(timeout
)
509 atomic_str
= "--atomic"
514 version_str
= "--version {}".format(version
)
516 command
= "{helm} upgrade {atomic} --output yaml {params} {timeout} {name} {model} {ver}"\
517 .format(helm
=self
._helm
_command
,
527 def _get_rollback_command(self
, kdu_instance
, namespace
, revision
) -> str:
528 return "{} rollback {} {} --wait".format(
529 self
._helm
_command
, kdu_instance
, revision
532 def _get_uninstall_command(self
, kdu_instance
: str, namespace
: str) -> str:
533 return "{} delete --purge {}".format(self
._helm
_command
, kdu_instance
)