a3644c807048779fa02f99b1ad4738fa03a598e7
2 # Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3 # This file is part of OSM
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact with: nfvlabs@tid.es
import asyncio
import os
from typing import Union

import yaml

from n2vc.exceptions import K8sException
from n2vc.k8s_helm_base_conn import K8sHelmBaseConnector
class K8sHelmConnector(K8sHelmBaseConnector):
    """Helm v2 flavour of the OSM Kubernetes connector.

    Specializes K8sHelmBaseConnector with the helm2-specific behavior:
    tiller initialization, $HELM_HOME based paths and helm2 CLI syntax.
    """

    ####################################################################################
    ################################### P U B L I C ####################################
    ####################################################################################

    # service account used to run tiller in the cluster
    service_account = "osm"

    def __init__(
        self,
        fs: object,
        db: object,
        kubectl_command: str = "/usr/bin/kubectl",
        helm_command: str = "/usr/bin/helm",
        log: object = None,
        on_update_db=None,
    ):
        """
        Initializes helm connector for helm v2

        :param fs: file system for kubernetes and helm configuration
        :param db: database object to write current operation status
        :param kubectl_command: path to kubectl executable
        :param helm_command: path to helm executable
        :param log: logger
        :param on_update_db: callback called when k8s connector updates database
        """

        # parent class
        K8sHelmBaseConnector.__init__(
            self,
            db=db,
            log=log,
            fs=fs,
            kubectl_command=kubectl_command,
            helm_command=helm_command,
            on_update_db=on_update_db,
        )

        self.log.info("Initializing K8S Helm2 connector")

        # initialize helm client-only
        self.log.debug("Initializing helm client-only...")
        command = "{} init --client-only {} ".format(
            self._helm_command,
            "--stable-repo-url {}".format(self._stable_repo_url)
            if self._stable_repo_url
            else "--skip-repos",
        )
        try:
            # fire-and-forget: init happens in background, failure is tolerated
            # (it usually means helm was already initialized)
            asyncio.ensure_future(
                self._local_async_exec(command=command, raise_exception_on_error=False)
            )
            # loop = asyncio.get_event_loop()
            # loop.run_until_complete(self._local_async_exec(command=command,
            # raise_exception_on_error=False))
        except Exception as e:
            # FIX: log through the logger; a bare self.warning(...) would raise
            # AttributeError since this class defines no such method
            self.log.warning(
                msg="helm init failed (it was already initialized): {}".format(e)
            )

        self.log.info("K8S Helm2 connector initialized")
100 timeout
: float = 300,
102 db_dict
: dict = None,
103 kdu_name
: str = None,
104 namespace
: str = None,
108 Deploys of a new KDU instance. It would implicitly rely on the `install` call
109 to deploy the Chart/Bundle properly parametrized (in practice, this call would
110 happen before any _initial-config-primitive_of the VNF is called).
112 :param cluster_uuid: UUID of a K8s cluster known by OSM
113 :param kdu_model: chart/reference (string), which can be either
115 - a name of chart available via the repos known by OSM
116 (e.g. stable/openldap, stable/openldap:1.2.4)
117 - a path to a packaged chart (e.g. mychart.tgz)
118 - a path to an unpacked chart directory or a URL (e.g. mychart)
119 :param kdu_instance: Kdu instance name
120 :param atomic: If set, installation process purges chart/bundle on fail, also
121 will wait until all the K8s objects are active
122 :param timeout: Time in seconds to wait for the install of the chart/bundle
123 (defaults to Helm default timeout: 300s)
124 :param params: dictionary of key-value pairs for instantiation parameters
125 (overriding default values)
126 :param dict db_dict: where to write into database when the status changes.
127 It contains a dict with {collection: <str>, filter: {},
129 e.g. {collection: "nsrs", filter:
130 {_id: <nsd-id>, path: "_admin.deployed.K8S.3"}
131 :param kdu_name: Name of the KDU instance to be installed
132 :param namespace: K8s namespace to use for the KDU instance
133 :param kwargs: Additional parameters (None yet)
134 :return: True if successful
136 self
.log
.debug("installing {} in cluster {}".format(kdu_model
, cluster_uuid
))
139 self
.fs
.sync(from_path
=cluster_uuid
)
142 paths
, env
= self
._init
_paths
_env
(
143 cluster_name
=cluster_uuid
, create_if_not_exist
=True
146 await self
._install
_impl
(
161 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
163 self
.log
.debug("Returning kdu_instance {}".format(kdu_instance
))
166 async def inspect_kdu(self
, kdu_model
: str, repo_url
: str = None) -> str:
168 "inspect kdu_model {} from (optional) repo: {}".format(kdu_model
, repo_url
)
171 return await self
._exec
_inspect
_command
(
172 inspect_command
="", kdu_model
=kdu_model
, repo_url
=repo_url
176 ####################################################################################
177 ################################### P R I V A T E ##################################
178 ####################################################################################
181 def _init_paths_env(self
, cluster_name
: str, create_if_not_exist
: bool = True):
183 Creates and returns base cluster and kube dirs and returns them.
184 Also created helm3 dirs according to new directory specification, paths are
185 returned and also environment variables that must be provided to execute commands
187 Helm 2 directory specification uses helm_home dir:
189 The variables assigned for this paths are:
190 - Helm hone: $HELM_HOME
191 - helm kubeconfig: $KUBECONFIG
193 :param cluster_name: cluster_name
194 :return: Dictionary with config_paths and dictionary with helm environment variables
197 if base
.endswith("/") or base
.endswith("\\"):
200 # base dir for cluster
201 cluster_dir
= base
+ "/" + cluster_name
204 kube_dir
= cluster_dir
+ "/" + ".kube"
205 if create_if_not_exist
and not os
.path
.exists(kube_dir
):
206 self
.log
.debug("Creating dir {}".format(kube_dir
))
207 os
.makedirs(kube_dir
)
210 helm_dir
= cluster_dir
+ "/" + ".helm"
211 if create_if_not_exist
and not os
.path
.exists(helm_dir
):
212 self
.log
.debug("Creating dir {}".format(helm_dir
))
213 os
.makedirs(helm_dir
)
215 config_filename
= kube_dir
+ "/config"
217 # 2 - Prepare dictionary with paths
219 "kube_dir": kube_dir
,
220 "kube_config": config_filename
,
221 "cluster_dir": cluster_dir
,
222 "helm_dir": helm_dir
,
225 for file_name
, file in paths
.items():
226 if "dir" in file_name
and not os
.path
.exists(file):
227 err_msg
= "{} dir does not exist".format(file)
228 self
.log
.error(err_msg
)
229 raise K8sException(err_msg
)
231 # 3 - Prepare environment variables
232 env
= {"HELM_HOME": helm_dir
, "KUBECONFIG": config_filename
}
236 async def _get_services(self
, cluster_id
, kdu_instance
, namespace
, kubeconfig
):
238 paths
, env
= self
._init
_paths
_env
(
239 cluster_name
=cluster_id
, create_if_not_exist
=True
242 command1
= "env KUBECONFIG={} {} get manifest {} ".format(
243 kubeconfig
, self
._helm
_command
, kdu_instance
245 command2
= "{} get --namespace={} -f -".format(self
.kubectl_command
, namespace
)
246 output
, _rc
= await self
._local
_async
_exec
_pipe
(
247 command1
, command2
, env
=env
, raise_exception_on_error
=True
249 services
= self
._parse
_services
(output
)
253 async def _cluster_init(
254 self
, cluster_id
: str, namespace
: str, paths
: dict, env
: dict
257 Implements the helm version dependent cluster initialization:
258 For helm2 it initialized tiller environment if needed
261 # check if tiller pod is up in cluster
262 command
= "{} --kubeconfig={} --namespace={} get deployments".format(
263 self
.kubectl_command
, paths
["kube_config"], namespace
265 output
, _rc
= await self
._local
_async
_exec
(
266 command
=command
, raise_exception_on_error
=True, env
=env
269 output_table
= self
._output
_to
_table
(output
=output
)
271 # find 'tiller' pod in all pods
272 already_initialized
= False
274 for row
in output_table
:
275 if row
[0].startswith("tiller-deploy"):
276 already_initialized
= True
282 n2vc_installed_sw
= False
283 if not already_initialized
:
285 "Initializing helm in client and server: {}".format(cluster_id
)
287 command
= "{} --kubeconfig={} --namespace kube-system create serviceaccount {}".format(
288 self
.kubectl_command
, paths
["kube_config"], self
.service_account
290 _
, _rc
= await self
._local
_async
_exec
(
291 command
=command
, raise_exception_on_error
=False, env
=env
295 "{} --kubeconfig={} create clusterrolebinding osm-tiller-cluster-rule "
296 "--clusterrole=cluster-admin --serviceaccount=kube-system:{}"
297 ).format(self
.kubectl_command
, paths
["kube_config"], self
.service_account
)
298 _
, _rc
= await self
._local
_async
_exec
(
299 command
=command
, raise_exception_on_error
=False, env
=env
303 "{} init --kubeconfig={} --tiller-namespace={} --home={} --service-account {} "
307 paths
["kube_config"],
310 self
.service_account
,
311 "--stable-repo-url {}".format(self
._stable
_repo
_url
)
312 if self
._stable
_repo
_url
315 _
, _rc
= await self
._local
_async
_exec
(
316 command
=command
, raise_exception_on_error
=True, env
=env
318 n2vc_installed_sw
= True
320 # check client helm installation
321 check_file
= paths
["helm_dir"] + "/repository/repositories.yaml"
322 if not self
._check
_file
_exists
(
323 filename
=check_file
, exception_if_not_exists
=False
325 self
.log
.info("Initializing helm in client: {}".format(cluster_id
))
327 "{} init --kubeconfig={} --tiller-namespace={} "
328 "--home={} --client-only {} "
331 paths
["kube_config"],
334 "--stable-repo-url {}".format(self
._stable
_repo
_url
)
335 if self
._stable
_repo
_url
338 output
, _rc
= await self
._local
_async
_exec
(
339 command
=command
, raise_exception_on_error
=True, env
=env
342 self
.log
.info("Helm client already initialized")
344 repo_list
= await self
.repo_list(cluster_id
)
345 for repo
in repo_list
:
346 if repo
["name"] == "stable" and repo
["url"] != self
._stable
_repo
_url
:
347 self
.log
.debug("Add new stable repo url: {}")
348 await self
.repo_remove(cluster_id
, "stable")
349 if self
._stable
_repo
_url
:
350 await self
.repo_add(cluster_id
, "stable", self
._stable
_repo
_url
)
353 return n2vc_installed_sw
355 async def _uninstall_sw(self
, cluster_id
: str, namespace
: str):
356 # uninstall Tiller if necessary
358 self
.log
.debug("Uninstalling tiller from cluster {}".format(cluster_id
))
361 paths
, env
= self
._init
_paths
_env
(
362 cluster_name
=cluster_id
, create_if_not_exist
=True
366 # find namespace for tiller pod
367 command
= "{} --kubeconfig={} get deployments --all-namespaces".format(
368 self
.kubectl_command
, paths
["kube_config"]
370 output
, _rc
= await self
._local
_async
_exec
(
371 command
=command
, raise_exception_on_error
=False, env
=env
373 output_table
= self
._output
_to
_table
(output
=output
)
375 for r
in output_table
:
377 if "tiller-deploy" in r
[1]:
383 msg
= "Tiller deployment not found in cluster {}".format(cluster_id
)
386 self
.log
.debug("namespace for tiller: {}".format(namespace
))
389 # uninstall tiller from cluster
390 self
.log
.debug("Uninstalling tiller from cluster {}".format(cluster_id
))
391 command
= "{} --kubeconfig={} --home={} reset".format(
392 self
._helm
_command
, paths
["kube_config"], paths
["helm_dir"]
394 self
.log
.debug("resetting: {}".format(command
))
395 output
, _rc
= await self
._local
_async
_exec
(
396 command
=command
, raise_exception_on_error
=True, env
=env
398 # Delete clusterrolebinding and serviceaccount.
399 # Ignore if errors for backward compatibility
401 "{} --kubeconfig={} delete clusterrolebinding.rbac.authorization.k8s."
402 "io/osm-tiller-cluster-rule"
403 ).format(self
.kubectl_command
, paths
["kube_config"])
404 output
, _rc
= await self
._local
_async
_exec
(
405 command
=command
, raise_exception_on_error
=False, env
=env
408 "{} --kubeconfig={} --namespace {} delete serviceaccount/{}".format(
409 self
.kubectl_command
,
410 paths
["kube_config"],
412 self
.service_account
,
415 output
, _rc
= await self
._local
_async
_exec
(
416 command
=command
, raise_exception_on_error
=False, env
=env
420 self
.log
.debug("namespace not found")
422 async def _instances_list(self
, cluster_id
):
424 paths
, env
= self
._init
_paths
_env
(
425 cluster_name
=cluster_id
, create_if_not_exist
=True
428 command
= "{} list --output yaml".format(self
._helm
_command
)
430 output
, _rc
= await self
._local
_async
_exec
(
431 command
=command
, raise_exception_on_error
=True, env
=env
434 if output
and len(output
) > 0:
435 # parse yaml and update keys to lower case to unify with helm3
436 instances
= yaml
.load(output
, Loader
=yaml
.SafeLoader
).get("Releases")
438 for instance
in instances
:
439 new_instance
= dict((k
.lower(), v
) for k
, v
in instance
.items())
440 new_instances
.append(new_instance
)
445 def _get_inspect_command(
446 self
, show_command
: str, kdu_model
: str, repo_str
: str, version
: str
448 inspect_command
= "{} inspect {} {}{} {}".format(
449 self
._helm
_command
, show_command
, kdu_model
, repo_str
, version
451 return inspect_command
453 def _get_get_command(
454 self
, get_command
: str, kdu_instance
: str, namespace
: str, kubeconfig
: str
456 get_command
= "env KUBECONFIG={} {} get {} {} --output yaml".format(
457 kubeconfig
, self
._helm
_command
, get_command
, kdu_instance
461 async def _status_kdu(
465 namespace
: str = None,
466 yaml_format
: bool = False,
467 show_error_log
: bool = False,
468 ) -> Union
[str, dict]:
470 "status of kdu_instance: {}, namespace: {} ".format(kdu_instance
, namespace
)
474 paths
, env
= self
._init
_paths
_env
(
475 cluster_name
=cluster_id
, create_if_not_exist
=True
477 command
= ("env KUBECONFIG={} {} status {} --output yaml").format(
478 paths
["kube_config"], self
._helm
_command
, kdu_instance
480 output
, rc
= await self
._local
_async
_exec
(
482 raise_exception_on_error
=True,
483 show_error_log
=show_error_log
,
493 data
= yaml
.load(output
, Loader
=yaml
.SafeLoader
)
495 # remove field 'notes'
497 del data
.get("info").get("status")["notes"]
501 # parse the manifest to a list of dictionaries
502 if "manifest" in data
:
503 manifest_str
= data
.get("manifest")
504 manifest_docs
= yaml
.load_all(manifest_str
, Loader
=yaml
.SafeLoader
)
506 data
["manifest"] = []
507 for doc
in manifest_docs
:
508 data
["manifest"].append(doc
)
510 # parse field 'resources'
512 resources
= str(data
.get("info").get("status").get("resources"))
513 resource_table
= self
._output
_to
_table
(resources
)
514 data
.get("info").get("status")["resources"] = resource_table
518 # set description to lowercase (unify with helm3)
520 data
.get("info")["description"] = data
.get("info").pop("Description")
526 def _get_helm_chart_repos_ids(self
, cluster_uuid
) -> list:
528 cluster_filter
= {"_admin.helm-chart.id": cluster_uuid
}
529 cluster
= self
.db
.get_one("k8sclusters", cluster_filter
)
531 repo_ids
= cluster
.get("_admin").get("helm_chart_repos") or []
535 "k8cluster with helm-id : {} not found".format(cluster_uuid
)
538 async def _is_install_completed(self
, cluster_id
: str, kdu_instance
: str) -> bool:
540 paths
, env
= self
._init
_paths
_env
(
541 cluster_name
=cluster_id
, create_if_not_exist
=True
544 status
= await self
._status
_kdu
(
545 cluster_id
=cluster_id
, kdu_instance
=kdu_instance
, yaml_format
=False
548 # extract info.status.resources-> str
551 # NAME READY UP-TO-DATE AVAILABLE AGE
552 # halting-horse-mongodb 0/1 1 0 0s
553 # halting-petit-mongodb 1/1 1 0 0s
555 resources
= K8sHelmBaseConnector
._get
_deep
(
556 status
, ("info", "status", "resources")
560 resources
= K8sHelmBaseConnector
._output
_to
_table
(resources
)
562 num_lines
= len(resources
)
565 while index
< num_lines
:
567 line1
= resources
[index
]
569 # find '==>' in column 0
570 if line1
[0] == "==>":
571 line2
= resources
[index
]
573 # find READY in column 1
574 if line2
[1] == "READY":
576 line3
= resources
[index
]
578 while len(line3
) > 1 and index
< num_lines
:
579 ready_value
= line3
[1]
580 parts
= ready_value
.split(sep
="/")
581 current
= int(parts
[0])
582 total
= int(parts
[1])
584 self
.log
.debug("NOT READY:\n {}".format(line3
))
586 line3
= resources
[index
]
594 def _get_install_command(
607 timeout_str
= "--timeout {}".format(timeout
)
612 atomic_str
= "--atomic"
616 namespace_str
= "--namespace {}".format(namespace
)
621 version_str
= "--version {}".format(version
)
624 "env KUBECONFIG={kubeconfig} {helm} install {atomic} --output yaml "
625 "{params} {timeout} --name={name} {ns} {model} {ver}".format(
626 kubeconfig
=kubeconfig
,
627 helm
=self
._helm
_command
,
639 def _get_upgrade_scale_command(
652 """Generates the command to scale a Helm Chart release
655 kdu_model (str): Kdu model name, corresponding to the Helm local location or repository
656 kdu_instance (str): KDU instance, corresponding to the Helm Chart release in question
657 namespace (str): Namespace where this KDU instance is deployed
658 scale (int): Scale count
659 version (str): Constraint with specific version of the Chart to use
660 atomic (bool): If set, upgrade process rolls back changes made in case of failed upgrade.
661 The --wait flag will be set automatically if --atomic is used
662 replica_str (str): The key under resource_name key where the scale count is stored
663 timeout (float): The time, in seconds, to wait
664 resource_name (str): The KDU's resource to scale
665 kubeconfig (str): Kubeconfig file path
668 str: command to scale a Helm Chart release
673 scale_dict
= {"{}.{}".format(resource_name
, replica_str
): scale
}
675 scale_dict
= {replica_str
: scale
}
677 scale_str
= self
._params
_to
_set
_option
(scale_dict
)
679 return self
._get
_upgrade
_command
(
681 kdu_instance
=kdu_instance
,
683 params_str
=scale_str
,
687 kubeconfig
=kubeconfig
,
690 def _get_upgrade_command(
701 """Generates the command to upgrade a Helm Chart release
704 kdu_model (str): Kdu model name, corresponding to the Helm local location or repository
705 kdu_instance (str): KDU instance, corresponding to the Helm Chart release in question
706 namespace (str): Namespace where this KDU instance is deployed
707 params_str (str): Params used to upgrade the Helm Chart release
708 version (str): Constraint with specific version of the Chart to use
709 atomic (bool): If set, upgrade process rolls back changes made in case of failed upgrade.
710 The --wait flag will be set automatically if --atomic is used
711 timeout (float): The time, in seconds, to wait
712 kubeconfig (str): Kubeconfig file path
715 str: command to upgrade a Helm Chart release
720 timeout_str
= "--timeout {}".format(timeout
)
725 atomic_str
= "--atomic"
730 version_str
= "--version {}".format(version
)
733 "env KUBECONFIG={kubeconfig} {helm} upgrade {atomic} --output yaml {params} {timeout} "
734 "--reuse-values {name} {model} {ver}"
736 kubeconfig
=kubeconfig
,
737 helm
=self
._helm
_command
,
747 def _get_rollback_command(
748 self
, kdu_instance
, namespace
, revision
, kubeconfig
750 return "env KUBECONFIG={} {} rollback {} {} --wait".format(
751 kubeconfig
, self
._helm
_command
, kdu_instance
, revision
754 def _get_uninstall_command(
755 self
, kdu_instance
: str, namespace
: str, kubeconfig
: str
757 return "env KUBECONFIG={} {} delete --purge {}".format(
758 kubeconfig
, self
._helm
_command
, kdu_instance