442ed06b93ef7896f61492d504887d8b531e86c8
2 # Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3 # This file is part of OSM
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact with: nfvlabs@tid.es
import asyncio
import os

import yaml

from n2vc.exceptions import K8sException
from n2vc.k8s_helm_base_conn import K8sHelmBaseConnector
30 class K8sHelmConnector(K8sHelmBaseConnector
):
33 ####################################################################################
34 ################################### P U B L I C ####################################
35 ####################################################################################
42 kubectl_command
: str = "/usr/bin/kubectl",
43 helm_command
: str = "/usr/bin/helm",
46 vca_config
: dict = None,
49 Initializes helm connector for helm v2
51 :param fs: file system for kubernetes and helm configuration
52 :param db: database object to write current operation status
53 :param kubectl_command: path to kubectl executable
54 :param helm_command: path to helm executable
56 :param on_update_db: callback called when k8s connector updates database
60 K8sHelmBaseConnector
.__init
__(
65 kubectl_command
=kubectl_command
,
66 helm_command
=helm_command
,
67 on_update_db
=on_update_db
,
68 vca_config
=vca_config
,
71 self
.log
.info("Initializing K8S Helm2 connector")
73 # initialize helm client-only
74 self
.log
.debug("Initializing helm client-only...")
75 command
= "{} init --client-only".format(self
._helm
_command
)
77 asyncio
.ensure_future(
78 self
._local
_async
_exec
(command
=command
, raise_exception_on_error
=False)
80 # loop = asyncio.get_event_loop()
81 # loop.run_until_complete(self._local_async_exec(command=command,
82 # raise_exception_on_error=False))
83 except Exception as e
:
85 msg
="helm init failed (it was already initialized): {}".format(e
)
88 self
.log
.info("K8S Helm2 connector initialized")
99 namespace
: str = None,
101 _
, cluster_id
= self
._get
_namespace
_cluster
_id
(cluster_uuid
)
102 self
.log
.debug("installing {} in cluster {}".format(kdu_model
, cluster_id
))
105 self
.fs
.sync(from_path
=cluster_id
)
108 paths
, env
= self
._init
_paths
_env
(
109 cluster_name
=cluster_id
, create_if_not_exist
=True
112 kdu_instance
= await self
._install
_impl
(cluster_id
,
124 self
.fs
.reverse_sync(from_path
=cluster_id
)
126 self
.log
.debug("Returning kdu_instance {}".format(kdu_instance
))
async def inspect_kdu(self, kdu_model: str, repo_url: str = None) -> str:
    """
    Obtain the full chart description ("helm inspect" with no sub-command).

    :param kdu_model: chart name to inspect (optionally "<repo>/<chart>")
    :param repo_url: optional chart repository url to inspect from
    :return: raw text output of the inspect command
    """
    self.log.debug(
        "inspect kdu_model {} from (optional) repo: {}".format(kdu_model, repo_url)
    )

    # empty inspect_command selects the plain "inspect" output (chart + values)
    return await self._exec_inspect_comand(
        inspect_command="", kdu_model=kdu_model, repo_url=repo_url
    )
140 ####################################################################################
141 ################################### P R I V A T E ##################################
142 ####################################################################################
145 def _init_paths_env(self
, cluster_name
: str, create_if_not_exist
: bool = True):
147 Creates and returns base cluster and kube dirs and returns them.
148 Also created helm3 dirs according to new directory specification, paths are
149 returned and also environment variables that must be provided to execute commands
151 Helm 2 directory specification uses helm_home dir:
153 The variables assigned for this paths are:
154 - Helm hone: $HELM_HOME
155 - helm kubeconfig: $KUBECONFIG
157 :param cluster_name: cluster_name
158 :return: Dictionary with config_paths and dictionary with helm environment variables
161 if base
.endswith("/") or base
.endswith("\\"):
164 # base dir for cluster
165 cluster_dir
= base
+ "/" + cluster_name
168 kube_dir
= cluster_dir
+ "/" + ".kube"
169 if create_if_not_exist
and not os
.path
.exists(kube_dir
):
170 self
.log
.debug("Creating dir {}".format(kube_dir
))
171 os
.makedirs(kube_dir
)
174 helm_dir
= cluster_dir
+ "/" + ".helm"
175 if create_if_not_exist
and not os
.path
.exists(helm_dir
):
176 self
.log
.debug("Creating dir {}".format(helm_dir
))
177 os
.makedirs(helm_dir
)
179 config_filename
= kube_dir
+ "/config"
181 # 2 - Prepare dictionary with paths
183 "kube_dir": kube_dir
,
184 "kube_config": config_filename
,
185 "cluster_dir": cluster_dir
,
186 "helm_dir": helm_dir
,
189 for file_name
, file in paths
.items():
190 if "dir" in file_name
and not os
.path
.exists(file):
191 err_msg
= "{} dir does not exist".format(file)
192 self
.log
.error(err_msg
)
193 raise K8sException(err_msg
)
195 # 3 - Prepare environment variables
196 env
= {"HELM_HOME": helm_dir
, "KUBECONFIG": config_filename
}
200 async def _get_services(self
, cluster_id
, kdu_instance
, namespace
):
203 paths
, env
= self
._init
_paths
_env
(
204 cluster_name
=cluster_id
, create_if_not_exist
=True
207 command1
= "{} get manifest {} ".format(self
._helm
_command
, kdu_instance
)
208 command2
= "{} get --namespace={} -f -".format(self
.kubectl_command
, namespace
)
209 output
, _rc
= await self
._local
_async
_exec
_pipe
(
210 command1
, command2
, env
=env
, raise_exception_on_error
=True
212 services
= self
._parse
_services
(output
)
216 async def _cluster_init(self
, cluster_id
: str, namespace
: str,
217 paths
: dict, env
: dict):
219 Implements the helm version dependent cluster initialization:
220 For helm2 it initialized tiller environment if needed
223 # check if tiller pod is up in cluster
224 command
= "{} --kubeconfig={} --namespace={} get deployments".format(
225 self
.kubectl_command
, paths
["kube_config"], namespace
227 output
, _rc
= await self
._local
_async
_exec
(
228 command
=command
, raise_exception_on_error
=True, env
=env
231 output_table
= self
._output
_to
_table
(output
=output
)
233 # find 'tiller' pod in all pods
234 already_initialized
= False
236 for row
in output_table
:
237 if row
[0].startswith("tiller-deploy"):
238 already_initialized
= True
244 n2vc_installed_sw
= False
245 if not already_initialized
:
247 "Initializing helm in client and server: {}".format(cluster_id
)
249 command
= "{} --kubeconfig={} --namespace kube-system create serviceaccount {}".format(
250 self
.kubectl_command
, paths
["kube_config"], self
.service_account
252 _
, _rc
= await self
._local
_async
_exec
(
253 command
=command
, raise_exception_on_error
=False, env
=env
257 "{} --kubeconfig={} create clusterrolebinding osm-tiller-cluster-rule "
258 "--clusterrole=cluster-admin --serviceaccount=kube-system:{}"
259 ).format(self
.kubectl_command
, paths
["kube_config"], self
.service_account
)
260 _
, _rc
= await self
._local
_async
_exec
(
261 command
=command
, raise_exception_on_error
=False, env
=env
265 "{} --kubeconfig={} --tiller-namespace={} --home={} --service-account {} "
266 " --stable-repo-url {} init"
269 paths
["kube_config"],
272 self
.service_account
,
273 self
._stable
_repo
_url
275 _
, _rc
= await self
._local
_async
_exec
(
276 command
=command
, raise_exception_on_error
=True, env
=env
278 n2vc_installed_sw
= True
280 # check client helm installation
281 check_file
= paths
["helm_dir"] + "/repository/repositories.yaml"
282 if not self
._check
_file
_exists
(
283 filename
=check_file
, exception_if_not_exists
=False
285 self
.log
.info("Initializing helm in client: {}".format(cluster_id
))
287 "{} --kubeconfig={} --tiller-namespace={} "
288 "--home={} init --client-only"
291 paths
["kube_config"],
295 output
, _rc
= await self
._local
_async
_exec
(
296 command
=command
, raise_exception_on_error
=True, env
=env
299 self
.log
.info("Helm client already initialized")
301 # remove old stable repo and add new one
302 cluster_uuid
= "{}:{}".format(namespace
, cluster_id
)
303 repo_list
= await self
.repo_list(cluster_uuid
)
304 for repo
in repo_list
:
305 if repo
["name"] == "stable" and repo
["url"] != self
._stable
_repo
_url
:
306 self
.log
.debug("Add new stable repo url: {}")
307 await self
.repo_remove(cluster_uuid
,
309 await self
.repo_add(cluster_uuid
,
311 self
._stable
_repo
_url
)
314 return n2vc_installed_sw
316 async def _uninstall_sw(self
, cluster_id
: str, namespace
: str):
317 # uninstall Tiller if necessary
319 self
.log
.debug("Uninstalling tiller from cluster {}".format(cluster_id
))
322 paths
, env
= self
._init
_paths
_env
(
323 cluster_name
=cluster_id
, create_if_not_exist
=True
327 # find namespace for tiller pod
328 command
= "{} --kubeconfig={} get deployments --all-namespaces".format(
329 self
.kubectl_command
, paths
["kube_config"]
331 output
, _rc
= await self
._local
_async
_exec
(
332 command
=command
, raise_exception_on_error
=False, env
=env
334 output_table
= self
._output
_to
_table
(output
=output
)
336 for r
in output_table
:
338 if "tiller-deploy" in r
[1]:
344 msg
= "Tiller deployment not found in cluster {}".format(cluster_id
)
347 self
.log
.debug("namespace for tiller: {}".format(namespace
))
350 # uninstall tiller from cluster
351 self
.log
.debug("Uninstalling tiller from cluster {}".format(cluster_id
))
352 command
= "{} --kubeconfig={} --home={} reset".format(
353 self
._helm
_command
, paths
["kube_config"], paths
["helm_dir"]
355 self
.log
.debug("resetting: {}".format(command
))
356 output
, _rc
= await self
._local
_async
_exec
(
357 command
=command
, raise_exception_on_error
=True, env
=env
359 # Delete clusterrolebinding and serviceaccount.
360 # Ignore if errors for backward compatibility
362 "{} --kubeconfig={} delete clusterrolebinding.rbac.authorization.k8s."
363 "io/osm-tiller-cluster-rule"
364 ).format(self
.kubectl_command
, paths
["kube_config"])
365 output
, _rc
= await self
._local
_async
_exec
(
366 command
=command
, raise_exception_on_error
=False, env
=env
368 command
= "{} --kubeconfig={} --namespace kube-system delete serviceaccount/{}".format(
369 self
.kubectl_command
, paths
["kube_config"], self
.service_account
371 output
, _rc
= await self
._local
_async
_exec
(
372 command
=command
, raise_exception_on_error
=False, env
=env
376 self
.log
.debug("namespace not found")
378 async def _instances_list(self
, cluster_id
):
381 paths
, env
= self
._init
_paths
_env
(
382 cluster_name
=cluster_id
, create_if_not_exist
=True
385 command
= "{} list --output yaml".format(self
._helm
_command
)
387 output
, _rc
= await self
._local
_async
_exec
(
388 command
=command
, raise_exception_on_error
=True, env
=env
391 if output
and len(output
) > 0:
392 # parse yaml and update keys to lower case to unify with helm3
393 instances
= yaml
.load(output
, Loader
=yaml
.SafeLoader
).get("Releases")
395 for instance
in instances
:
396 new_instance
= dict((k
.lower(), v
) for k
, v
in instance
.items())
397 new_instances
.append(new_instance
)
402 def _get_inspect_command(self
, show_command
: str, kdu_model
: str, repo_str
: str,
404 inspect_command
= "{} inspect {} {}{} {}".format(
405 self
._helm
_command
, show_command
, kdu_model
, repo_str
, version
407 return inspect_command
409 async def _status_kdu(
413 namespace
: str = None,
414 show_error_log
: bool = False,
415 return_text
: bool = False,
419 "status of kdu_instance: {}, namespace: {} ".format(kdu_instance
, namespace
)
423 paths
, env
= self
._init
_paths
_env
(
424 cluster_name
=cluster_id
, create_if_not_exist
=True
426 command
= "{} status {} --output yaml".format(self
._helm
_command
, kdu_instance
)
427 output
, rc
= await self
._local
_async
_exec
(
429 raise_exception_on_error
=True,
430 show_error_log
=show_error_log
,
440 data
= yaml
.load(output
, Loader
=yaml
.SafeLoader
)
442 # remove field 'notes'
444 del data
.get("info").get("status")["notes"]
448 # parse field 'resources'
450 resources
= str(data
.get("info").get("status").get("resources"))
451 resource_table
= self
._output
_to
_table
(resources
)
452 data
.get("info").get("status")["resources"] = resource_table
456 # set description to lowercase (unify with helm3)
458 data
.get("info")["description"] = data
.get("info").pop("Description")
464 def _get_helm_chart_repos_ids(self
, cluster_uuid
) -> list:
466 cluster_filter
= {"_admin.helm-chart.id": cluster_uuid
}
467 cluster
= self
.db
.get_one("k8sclusters", cluster_filter
)
469 repo_ids
= cluster
.get("_admin").get("helm_chart_repos") or []
473 "k8cluster with helm-id : {} not found".format(cluster_uuid
)
476 async def _is_install_completed(self
, cluster_id
: str, kdu_instance
: str) -> bool:
478 status
= await self
._status
_kdu
(
479 cluster_id
=cluster_id
, kdu_instance
=kdu_instance
, return_text
=False
482 # extract info.status.resources-> str
485 # NAME READY UP-TO-DATE AVAILABLE AGE
486 # halting-horse-mongodb 0/1 1 0 0s
487 # halting-petit-mongodb 1/1 1 0 0s
489 resources
= K8sHelmBaseConnector
._get
_deep
(
490 status
, ("info", "status", "resources")
494 resources
= K8sHelmBaseConnector
._output
_to
_table
(resources
)
496 num_lines
= len(resources
)
499 while index
< num_lines
:
501 line1
= resources
[index
]
503 # find '==>' in column 0
504 if line1
[0] == "==>":
505 line2
= resources
[index
]
507 # find READY in column 1
508 if line2
[1] == "READY":
510 line3
= resources
[index
]
512 while len(line3
) > 1 and index
< num_lines
:
513 ready_value
= line3
[1]
514 parts
= ready_value
.split(sep
="/")
515 current
= int(parts
[0])
516 total
= int(parts
[1])
518 self
.log
.debug("NOT READY:\n {}".format(line3
))
520 line3
= resources
[index
]
528 def _get_install_command(
529 self
, kdu_model
, kdu_instance
, namespace
, params_str
, version
, atomic
, timeout
534 timeout_str
= "--timeout {}".format(timeout
)
539 atomic_str
= "--atomic"
543 namespace_str
= "--namespace {}".format(namespace
)
548 version_str
= version_str
= "--version {}".format(version
)
551 "{helm} install {atomic} --output yaml "
552 "{params} {timeout} --name={name} {ns} {model} {ver}".format(
553 helm
=self
._helm
_command
,
565 def _get_upgrade_command(
566 self
, kdu_model
, kdu_instance
, namespace
, params_str
, version
, atomic
, timeout
571 timeout_str
= "--timeout {}".format(timeout
)
576 atomic_str
= "--atomic"
581 version_str
= "--version {}".format(version
)
583 command
= "{helm} upgrade {atomic} --output yaml {params} {timeout} {name} {model} {ver}"\
584 .format(helm
=self
._helm
_command
,
594 def _get_rollback_command(self
, kdu_instance
, namespace
, revision
) -> str:
595 return "{} rollback {} {} --wait".format(
596 self
._helm
_command
, kdu_instance
, revision
599 def _get_uninstall_command(self
, kdu_instance
: str, namespace
: str) -> str:
600 return "{} delete --purge {}".format(self
._helm
_command
, kdu_instance
)