2 # Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3 # This file is part of OSM
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact with: nfvlabs@tid.es
import asyncio
import os

import yaml

from n2vc.exceptions import K8sException
from n2vc.k8s_helm_base_conn import K8sHelmBaseConnector
class K8sHelmConnector(K8sHelmBaseConnector):

    ####################################################################################
    ################################### P U B L I C ####################################
    ####################################################################################

    def __init__(
        self,
        fs: object,
        db: object,
        kubectl_command: str = "/usr/bin/kubectl",
        helm_command: str = "/usr/bin/helm",
        log: object = None,
        on_update_db=None,
        vca_config: dict = None,
    ):
        """
        Initializes helm connector for helm v2

        :param fs: file system for kubernetes and helm configuration
        :param db: database object to write current operation status
        :param kubectl_command: path to kubectl executable
        :param helm_command: path to helm executable
        :param log: logger
        :param on_update_db: callback called when k8s connector updates database
        :param vca_config: optional VCA configuration dictionary
        """

        # delegate common wiring (fs, db, log, command paths) to the base class
        K8sHelmBaseConnector.__init__(
            self,
            db=db,
            log=log,
            fs=fs,
            kubectl_command=kubectl_command,
            helm_command=helm_command,
            on_update_db=on_update_db,
            vca_config=vca_config,
        )

        self.log.info("Initializing K8S Helm2 connector")

        # initialize helm client-only; fire-and-forget, a failure normally
        # just means helm was already initialized for this client
        self.log.debug("Initializing helm client-only...")
        command = "{} init --client-only --stable-repo-url {} ".format(
            self._helm_command, self._stable_repo_url
        )
        try:
            asyncio.ensure_future(
                self._local_async_exec(command=command, raise_exception_on_error=False)
            )
            # loop = asyncio.get_event_loop()
            # loop.run_until_complete(self._local_async_exec(command=command,
            # raise_exception_on_error=False))
        except Exception as e:
            # NOTE(review): original invocation line is not visible in this chunk;
            # reconstructed as a warning log — confirm against upstream
            self.log.warning(
                msg="helm init failed (it was already initialized): {}".format(e)
            )

        self.log.info("K8S Helm2 connector initialized")
100 namespace
: str = None,
102 _
, cluster_id
= self
._get
_namespace
_cluster
_id
(cluster_uuid
)
103 self
.log
.debug("installing {} in cluster {}".format(kdu_model
, cluster_id
))
106 self
.fs
.sync(from_path
=cluster_id
)
109 paths
, env
= self
._init
_paths
_env
(
110 cluster_name
=cluster_id
, create_if_not_exist
=True
113 kdu_instance
= await self
._install
_impl
(cluster_id
,
125 self
.fs
.reverse_sync(from_path
=cluster_id
)
127 self
.log
.debug("Returning kdu_instance {}".format(kdu_instance
))
130 async def inspect_kdu(self
, kdu_model
: str, repo_url
: str = None) -> str:
133 "inspect kdu_model {} from (optional) repo: {}".format(kdu_model
, repo_url
)
136 return await self
._exec
_inspect
_comand
(
137 inspect_command
="", kdu_model
=kdu_model
, repo_url
=repo_url
141 ####################################################################################
142 ################################### P R I V A T E ##################################
143 ####################################################################################
146 def _init_paths_env(self
, cluster_name
: str, create_if_not_exist
: bool = True):
148 Creates and returns base cluster and kube dirs and returns them.
149 Also created helm3 dirs according to new directory specification, paths are
150 returned and also environment variables that must be provided to execute commands
152 Helm 2 directory specification uses helm_home dir:
154 The variables assigned for this paths are:
155 - Helm hone: $HELM_HOME
156 - helm kubeconfig: $KUBECONFIG
158 :param cluster_name: cluster_name
159 :return: Dictionary with config_paths and dictionary with helm environment variables
162 if base
.endswith("/") or base
.endswith("\\"):
165 # base dir for cluster
166 cluster_dir
= base
+ "/" + cluster_name
169 kube_dir
= cluster_dir
+ "/" + ".kube"
170 if create_if_not_exist
and not os
.path
.exists(kube_dir
):
171 self
.log
.debug("Creating dir {}".format(kube_dir
))
172 os
.makedirs(kube_dir
)
175 helm_dir
= cluster_dir
+ "/" + ".helm"
176 if create_if_not_exist
and not os
.path
.exists(helm_dir
):
177 self
.log
.debug("Creating dir {}".format(helm_dir
))
178 os
.makedirs(helm_dir
)
180 config_filename
= kube_dir
+ "/config"
182 # 2 - Prepare dictionary with paths
184 "kube_dir": kube_dir
,
185 "kube_config": config_filename
,
186 "cluster_dir": cluster_dir
,
187 "helm_dir": helm_dir
,
190 for file_name
, file in paths
.items():
191 if "dir" in file_name
and not os
.path
.exists(file):
192 err_msg
= "{} dir does not exist".format(file)
193 self
.log
.error(err_msg
)
194 raise K8sException(err_msg
)
196 # 3 - Prepare environment variables
197 env
= {"HELM_HOME": helm_dir
, "KUBECONFIG": config_filename
}
201 async def _get_services(self
, cluster_id
, kdu_instance
, namespace
):
204 paths
, env
= self
._init
_paths
_env
(
205 cluster_name
=cluster_id
, create_if_not_exist
=True
208 command1
= "{} get manifest {} ".format(self
._helm
_command
, kdu_instance
)
209 command2
= "{} get --namespace={} -f -".format(self
.kubectl_command
, namespace
)
210 output
, _rc
= await self
._local
_async
_exec
_pipe
(
211 command1
, command2
, env
=env
, raise_exception_on_error
=True
213 services
= self
._parse
_services
(output
)
217 async def _cluster_init(self
, cluster_id
: str, namespace
: str,
218 paths
: dict, env
: dict):
220 Implements the helm version dependent cluster initialization:
221 For helm2 it initialized tiller environment if needed
224 # check if tiller pod is up in cluster
225 command
= "{} --kubeconfig={} --namespace={} get deployments".format(
226 self
.kubectl_command
, paths
["kube_config"], namespace
228 output
, _rc
= await self
._local
_async
_exec
(
229 command
=command
, raise_exception_on_error
=True, env
=env
232 output_table
= self
._output
_to
_table
(output
=output
)
234 # find 'tiller' pod in all pods
235 already_initialized
= False
237 for row
in output_table
:
238 if row
[0].startswith("tiller-deploy"):
239 already_initialized
= True
245 n2vc_installed_sw
= False
246 if not already_initialized
:
248 "Initializing helm in client and server: {}".format(cluster_id
)
250 command
= "{} --kubeconfig={} --namespace kube-system create serviceaccount {}".format(
251 self
.kubectl_command
, paths
["kube_config"], self
.service_account
253 _
, _rc
= await self
._local
_async
_exec
(
254 command
=command
, raise_exception_on_error
=False, env
=env
258 "{} --kubeconfig={} create clusterrolebinding osm-tiller-cluster-rule "
259 "--clusterrole=cluster-admin --serviceaccount=kube-system:{}"
260 ).format(self
.kubectl_command
, paths
["kube_config"], self
.service_account
)
261 _
, _rc
= await self
._local
_async
_exec
(
262 command
=command
, raise_exception_on_error
=False, env
=env
266 "{} --kubeconfig={} --tiller-namespace={} --home={} --service-account {} "
267 " --stable-repo-url {} init"
270 paths
["kube_config"],
273 self
.service_account
,
274 self
._stable
_repo
_url
276 _
, _rc
= await self
._local
_async
_exec
(
277 command
=command
, raise_exception_on_error
=True, env
=env
279 n2vc_installed_sw
= True
281 # check client helm installation
282 check_file
= paths
["helm_dir"] + "/repository/repositories.yaml"
283 if not self
._check
_file
_exists
(
284 filename
=check_file
, exception_if_not_exists
=False
286 self
.log
.info("Initializing helm in client: {}".format(cluster_id
))
288 "{} --kubeconfig={} --tiller-namespace={} "
289 "--home={} init --client-only --stable-repo-url {} "
292 paths
["kube_config"],
295 self
._stable
_repo
_url
,
297 output
, _rc
= await self
._local
_async
_exec
(
298 command
=command
, raise_exception_on_error
=True, env
=env
301 self
.log
.info("Helm client already initialized")
303 # remove old stable repo and add new one
304 cluster_uuid
= "{}:{}".format(namespace
, cluster_id
)
305 repo_list
= await self
.repo_list(cluster_uuid
)
306 for repo
in repo_list
:
307 if repo
["name"] == "stable" and repo
["url"] != self
._stable
_repo
_url
:
308 self
.log
.debug("Add new stable repo url: {}")
309 await self
.repo_remove(cluster_uuid
,
311 await self
.repo_add(cluster_uuid
,
313 self
._stable
_repo
_url
)
316 return n2vc_installed_sw
318 async def _uninstall_sw(self
, cluster_id
: str, namespace
: str):
319 # uninstall Tiller if necessary
321 self
.log
.debug("Uninstalling tiller from cluster {}".format(cluster_id
))
324 paths
, env
= self
._init
_paths
_env
(
325 cluster_name
=cluster_id
, create_if_not_exist
=True
329 # find namespace for tiller pod
330 command
= "{} --kubeconfig={} get deployments --all-namespaces".format(
331 self
.kubectl_command
, paths
["kube_config"]
333 output
, _rc
= await self
._local
_async
_exec
(
334 command
=command
, raise_exception_on_error
=False, env
=env
336 output_table
= self
._output
_to
_table
(output
=output
)
338 for r
in output_table
:
340 if "tiller-deploy" in r
[1]:
346 msg
= "Tiller deployment not found in cluster {}".format(cluster_id
)
349 self
.log
.debug("namespace for tiller: {}".format(namespace
))
352 # uninstall tiller from cluster
353 self
.log
.debug("Uninstalling tiller from cluster {}".format(cluster_id
))
354 command
= "{} --kubeconfig={} --home={} reset".format(
355 self
._helm
_command
, paths
["kube_config"], paths
["helm_dir"]
357 self
.log
.debug("resetting: {}".format(command
))
358 output
, _rc
= await self
._local
_async
_exec
(
359 command
=command
, raise_exception_on_error
=True, env
=env
361 # Delete clusterrolebinding and serviceaccount.
362 # Ignore if errors for backward compatibility
364 "{} --kubeconfig={} delete clusterrolebinding.rbac.authorization.k8s."
365 "io/osm-tiller-cluster-rule"
366 ).format(self
.kubectl_command
, paths
["kube_config"])
367 output
, _rc
= await self
._local
_async
_exec
(
368 command
=command
, raise_exception_on_error
=False, env
=env
370 command
= "{} --kubeconfig={} --namespace kube-system delete serviceaccount/{}".format(
371 self
.kubectl_command
, paths
["kube_config"], self
.service_account
373 output
, _rc
= await self
._local
_async
_exec
(
374 command
=command
, raise_exception_on_error
=False, env
=env
378 self
.log
.debug("namespace not found")
380 async def _instances_list(self
, cluster_id
):
383 paths
, env
= self
._init
_paths
_env
(
384 cluster_name
=cluster_id
, create_if_not_exist
=True
387 command
= "{} list --output yaml".format(self
._helm
_command
)
389 output
, _rc
= await self
._local
_async
_exec
(
390 command
=command
, raise_exception_on_error
=True, env
=env
393 if output
and len(output
) > 0:
394 # parse yaml and update keys to lower case to unify with helm3
395 instances
= yaml
.load(output
, Loader
=yaml
.SafeLoader
).get("Releases")
397 for instance
in instances
:
398 new_instance
= dict((k
.lower(), v
) for k
, v
in instance
.items())
399 new_instances
.append(new_instance
)
404 def _get_inspect_command(self
, show_command
: str, kdu_model
: str, repo_str
: str,
406 inspect_command
= "{} inspect {} {}{} {}".format(
407 self
._helm
_command
, show_command
, kdu_model
, repo_str
, version
409 return inspect_command
411 async def _status_kdu(
415 namespace
: str = None,
416 show_error_log
: bool = False,
417 return_text
: bool = False,
421 "status of kdu_instance: {}, namespace: {} ".format(kdu_instance
, namespace
)
425 paths
, env
= self
._init
_paths
_env
(
426 cluster_name
=cluster_id
, create_if_not_exist
=True
428 command
= "{} status {} --output yaml".format(self
._helm
_command
, kdu_instance
)
429 output
, rc
= await self
._local
_async
_exec
(
431 raise_exception_on_error
=True,
432 show_error_log
=show_error_log
,
442 data
= yaml
.load(output
, Loader
=yaml
.SafeLoader
)
444 # remove field 'notes'
446 del data
.get("info").get("status")["notes"]
450 # parse field 'resources'
452 resources
= str(data
.get("info").get("status").get("resources"))
453 resource_table
= self
._output
_to
_table
(resources
)
454 data
.get("info").get("status")["resources"] = resource_table
458 # set description to lowercase (unify with helm3)
460 data
.get("info")["description"] = data
.get("info").pop("Description")
466 def _get_helm_chart_repos_ids(self
, cluster_uuid
) -> list:
468 cluster_filter
= {"_admin.helm-chart.id": cluster_uuid
}
469 cluster
= self
.db
.get_one("k8sclusters", cluster_filter
)
471 repo_ids
= cluster
.get("_admin").get("helm_chart_repos") or []
475 "k8cluster with helm-id : {} not found".format(cluster_uuid
)
478 async def _is_install_completed(self
, cluster_id
: str, kdu_instance
: str) -> bool:
480 status
= await self
._status
_kdu
(
481 cluster_id
=cluster_id
, kdu_instance
=kdu_instance
, return_text
=False
484 # extract info.status.resources-> str
487 # NAME READY UP-TO-DATE AVAILABLE AGE
488 # halting-horse-mongodb 0/1 1 0 0s
489 # halting-petit-mongodb 1/1 1 0 0s
491 resources
= K8sHelmBaseConnector
._get
_deep
(
492 status
, ("info", "status", "resources")
496 resources
= K8sHelmBaseConnector
._output
_to
_table
(resources
)
498 num_lines
= len(resources
)
501 while index
< num_lines
:
503 line1
= resources
[index
]
505 # find '==>' in column 0
506 if line1
[0] == "==>":
507 line2
= resources
[index
]
509 # find READY in column 1
510 if line2
[1] == "READY":
512 line3
= resources
[index
]
514 while len(line3
) > 1 and index
< num_lines
:
515 ready_value
= line3
[1]
516 parts
= ready_value
.split(sep
="/")
517 current
= int(parts
[0])
518 total
= int(parts
[1])
520 self
.log
.debug("NOT READY:\n {}".format(line3
))
522 line3
= resources
[index
]
530 def _get_install_command(
531 self
, kdu_model
, kdu_instance
, namespace
, params_str
, version
, atomic
, timeout
536 timeout_str
= "--timeout {}".format(timeout
)
541 atomic_str
= "--atomic"
545 namespace_str
= "--namespace {}".format(namespace
)
550 version_str
= version_str
= "--version {}".format(version
)
553 "{helm} install {atomic} --output yaml "
554 "{params} {timeout} --name={name} {ns} {model} {ver}".format(
555 helm
=self
._helm
_command
,
567 def _get_upgrade_command(
568 self
, kdu_model
, kdu_instance
, namespace
, params_str
, version
, atomic
, timeout
573 timeout_str
= "--timeout {}".format(timeout
)
578 atomic_str
= "--atomic"
583 version_str
= "--version {}".format(version
)
585 command
= "{helm} upgrade {atomic} --output yaml {params} {timeout} {name} {model} {ver}"\
586 .format(helm
=self
._helm
_command
,
596 def _get_rollback_command(self
, kdu_instance
, namespace
, revision
) -> str:
597 return "{} rollback {} {} --wait".format(
598 self
._helm
_command
, kdu_instance
, revision
601 def _get_uninstall_command(self
, kdu_instance
: str, namespace
: str) -> str:
602 return "{} delete --purge {}".format(self
._helm
_command
, kdu_instance
)