2 # Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3 # This file is part of OSM
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact with: nfvlabs@tid.es
23 from typing
import Union
27 from n2vc
.k8s_helm_base_conn
import K8sHelmBaseConnector
28 from n2vc
.exceptions
import K8sException
31 class K8sHelmConnector(K8sHelmBaseConnector
):
34 ####################################################################################
35 ################################### P U B L I C ####################################
36 ####################################################################################
43 kubectl_command
: str = "/usr/bin/kubectl",
44 helm_command
: str = "/usr/bin/helm",
49 Initializes helm connector for helm v2
51 :param fs: file system for kubernetes and helm configuration
52 :param db: database object to write current operation status
53 :param kubectl_command: path to kubectl executable
54 :param helm_command: path to helm executable
56 :param on_update_db: callback called when k8s connector updates database
60 K8sHelmBaseConnector
.__init
__(
65 kubectl_command
=kubectl_command
,
66 helm_command
=helm_command
,
67 on_update_db
=on_update_db
,
70 self
.log
.info("Initializing K8S Helm2 connector")
72 # initialize helm client-only
73 self
.log
.debug("Initializing helm client-only...")
74 command
= "{} init --client-only {} ".format(
76 "--stable-repo-url {}".format(self
._stable
_repo
_url
)
77 if self
._stable
_repo
_url
82 self
._local
_async
_exec
(command
=command
, raise_exception_on_error
=False)
84 except Exception as e
:
86 msg
="helm init failed (it was already initialized): {}".format(e
)
89 self
.log
.info("K8S Helm2 connector initialized")
100 kdu_name
: str = None,
101 namespace
: str = None,
105 Deploys of a new KDU instance. It would implicitly rely on the `install` call
106 to deploy the Chart/Bundle properly parametrized (in practice, this call would
107 happen before any _initial-config-primitive_of the VNF is called).
109 :param cluster_uuid: UUID of a K8s cluster known by OSM
110 :param kdu_model: chart/reference (string), which can be either
112 - a name of chart available via the repos known by OSM
113 (e.g. stable/openldap, stable/openldap:1.2.4)
114 - a path to a packaged chart (e.g. mychart.tgz)
115 - a path to an unpacked chart directory or a URL (e.g. mychart)
116 :param kdu_instance: Kdu instance name
117 :param atomic: If set, installation process purges chart/bundle on fail, also
118 will wait until all the K8s objects are active
119 :param timeout: Time in seconds to wait for the install of the chart/bundle
120 (defaults to Helm default timeout: 300s)
121 :param params: dictionary of key-value pairs for instantiation parameters
122 (overriding default values)
123 :param dict db_dict: where to write into database when the status changes.
124 It contains a dict with {collection: <str>, filter: {},
126 e.g. {collection: "nsrs", filter:
127 {_id: <nsd-id>, path: "_admin.deployed.K8S.3"}
128 :param kdu_name: Name of the KDU instance to be installed
129 :param namespace: K8s namespace to use for the KDU instance
130 :param kwargs: Additional parameters (None yet)
131 :return: True if successful
133 self
.log
.debug("installing {} in cluster {}".format(kdu_model
, cluster_uuid
))
136 self
.fs
.sync(from_path
=cluster_uuid
)
139 paths
, env
= self
._init
_paths
_env
(
140 cluster_name
=cluster_uuid
, create_if_not_exist
=True
143 await self
._install
_impl
(
158 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
160 self
.log
.debug("Returning kdu_instance {}".format(kdu_instance
))
163 async def inspect_kdu(self
, kdu_model
: str, repo_url
: str = None) -> str:
165 "inspect kdu_model {} from (optional) repo: {}".format(kdu_model
, repo_url
)
168 return await self
._exec
_inspect
_command
(
169 inspect_command
="", kdu_model
=kdu_model
, repo_url
=repo_url
173 ####################################################################################
174 ################################### P R I V A T E ##################################
175 ####################################################################################
178 def _init_paths_env(self
, cluster_name
: str, create_if_not_exist
: bool = True):
180 Creates and returns base cluster and kube dirs and returns them.
181 Also created helm3 dirs according to new directory specification, paths are
182 returned and also environment variables that must be provided to execute commands
184 Helm 2 directory specification uses helm_home dir:
186 The variables assigned for this paths are:
187 - Helm hone: $HELM_HOME
188 - helm kubeconfig: $KUBECONFIG
190 :param cluster_name: cluster_name
191 :return: Dictionary with config_paths and dictionary with helm environment variables
194 if base
.endswith("/") or base
.endswith("\\"):
197 # base dir for cluster
198 cluster_dir
= base
+ "/" + cluster_name
201 kube_dir
= cluster_dir
+ "/" + ".kube"
202 if create_if_not_exist
and not os
.path
.exists(kube_dir
):
203 self
.log
.debug("Creating dir {}".format(kube_dir
))
204 os
.makedirs(kube_dir
)
207 helm_dir
= cluster_dir
+ "/" + ".helm"
208 if create_if_not_exist
and not os
.path
.exists(helm_dir
):
209 self
.log
.debug("Creating dir {}".format(helm_dir
))
210 os
.makedirs(helm_dir
)
212 config_filename
= kube_dir
+ "/config"
214 # 2 - Prepare dictionary with paths
216 "kube_dir": kube_dir
,
217 "kube_config": config_filename
,
218 "cluster_dir": cluster_dir
,
219 "helm_dir": helm_dir
,
222 for file_name
, file in paths
.items():
223 if "dir" in file_name
and not os
.path
.exists(file):
224 err_msg
= "{} dir does not exist".format(file)
225 self
.log
.error(err_msg
)
226 raise K8sException(err_msg
)
228 # 3 - Prepare environment variables
229 env
= {"HELM_HOME": helm_dir
, "KUBECONFIG": config_filename
}
233 async def _get_services(self
, cluster_id
, kdu_instance
, namespace
, kubeconfig
):
235 paths
, env
= self
._init
_paths
_env
(
236 cluster_name
=cluster_id
, create_if_not_exist
=True
239 command1
= "env KUBECONFIG={} {} get manifest {} ".format(
240 kubeconfig
, self
._helm
_command
, kdu_instance
242 command2
= "{} get --namespace={} -f -".format(self
.kubectl_command
, namespace
)
243 output
, _rc
= await self
._local
_async
_exec
_pipe
(
244 command1
, command2
, env
=env
, raise_exception_on_error
=True
246 services
= self
._parse
_services
(output
)
250 async def _cluster_init(
251 self
, cluster_id
: str, namespace
: str, paths
: dict, env
: dict
254 Implements the helm version dependent cluster initialization:
255 For helm2 it initialized tiller environment if needed
258 # check if tiller pod is up in cluster
259 command
= "{} --kubeconfig={} --namespace={} get deployments".format(
260 self
.kubectl_command
, paths
["kube_config"], namespace
262 output
, _rc
= await self
._local
_async
_exec
(
263 command
=command
, raise_exception_on_error
=True, env
=env
266 output_table
= self
._output
_to
_table
(output
=output
)
268 # find 'tiller' pod in all pods
269 already_initialized
= False
271 for row
in output_table
:
272 if row
[0].startswith("tiller-deploy"):
273 already_initialized
= True
279 n2vc_installed_sw
= False
280 if not already_initialized
:
282 "Initializing helm in client and server: {}".format(cluster_id
)
284 command
= "{} --kubeconfig={} --namespace kube-system create serviceaccount {}".format(
285 self
.kubectl_command
, paths
["kube_config"], self
.service_account
287 _
, _rc
= await self
._local
_async
_exec
(
288 command
=command
, raise_exception_on_error
=False, env
=env
292 "{} --kubeconfig={} create clusterrolebinding osm-tiller-cluster-rule "
293 "--clusterrole=cluster-admin --serviceaccount=kube-system:{}"
294 ).format(self
.kubectl_command
, paths
["kube_config"], self
.service_account
)
295 _
, _rc
= await self
._local
_async
_exec
(
296 command
=command
, raise_exception_on_error
=False, env
=env
300 "{} --kubeconfig={} --tiller-namespace={} --home={} --service-account {} "
304 paths
["kube_config"],
307 self
.service_account
,
308 "--stable-repo-url {}".format(self
._stable
_repo
_url
)
309 if self
._stable
_repo
_url
312 _
, _rc
= await self
._local
_async
_exec
(
313 command
=command
, raise_exception_on_error
=True, env
=env
315 n2vc_installed_sw
= True
317 # check client helm installation
318 check_file
= paths
["helm_dir"] + "/repository/repositories.yaml"
319 if not self
._check
_file
_exists
(
320 filename
=check_file
, exception_if_not_exists
=False
322 self
.log
.info("Initializing helm in client: {}".format(cluster_id
))
324 "{} --kubeconfig={} --tiller-namespace={} "
325 "--home={} init --client-only {} "
328 paths
["kube_config"],
331 "--stable-repo-url {}".format(self
._stable
_repo
_url
)
332 if self
._stable
_repo
_url
335 output
, _rc
= await self
._local
_async
_exec
(
336 command
=command
, raise_exception_on_error
=True, env
=env
339 self
.log
.info("Helm client already initialized")
341 repo_list
= await self
.repo_list(cluster_id
)
342 for repo
in repo_list
:
343 if repo
["name"] == "stable" and repo
["url"] != self
._stable
_repo
_url
:
344 self
.log
.debug("Add new stable repo url: {}")
345 await self
.repo_remove(cluster_id
, "stable")
346 if self
._stable
_repo
_url
:
347 await self
.repo_add(cluster_id
, "stable", self
._stable
_repo
_url
)
350 return n2vc_installed_sw
352 async def _uninstall_sw(self
, cluster_id
: str, namespace
: str):
353 # uninstall Tiller if necessary
355 self
.log
.debug("Uninstalling tiller from cluster {}".format(cluster_id
))
358 paths
, env
= self
._init
_paths
_env
(
359 cluster_name
=cluster_id
, create_if_not_exist
=True
363 # find namespace for tiller pod
364 command
= "{} --kubeconfig={} get deployments --all-namespaces".format(
365 self
.kubectl_command
, paths
["kube_config"]
367 output
, _rc
= await self
._local
_async
_exec
(
368 command
=command
, raise_exception_on_error
=False, env
=env
370 output_table
= self
._output
_to
_table
(output
=output
)
372 for r
in output_table
:
374 if "tiller-deploy" in r
[1]:
380 msg
= "Tiller deployment not found in cluster {}".format(cluster_id
)
383 self
.log
.debug("namespace for tiller: {}".format(namespace
))
386 # uninstall tiller from cluster
387 self
.log
.debug("Uninstalling tiller from cluster {}".format(cluster_id
))
388 command
= "{} --kubeconfig={} --home={} reset".format(
389 self
._helm
_command
, paths
["kube_config"], paths
["helm_dir"]
391 self
.log
.debug("resetting: {}".format(command
))
392 output
, _rc
= await self
._local
_async
_exec
(
393 command
=command
, raise_exception_on_error
=True, env
=env
395 # Delete clusterrolebinding and serviceaccount.
396 # Ignore if errors for backward compatibility
398 "{} --kubeconfig={} delete clusterrolebinding.rbac.authorization.k8s."
399 "io/osm-tiller-cluster-rule"
400 ).format(self
.kubectl_command
, paths
["kube_config"])
401 output
, _rc
= await self
._local
_async
_exec
(
402 command
=command
, raise_exception_on_error
=False, env
=env
405 "{} --kubeconfig={} --namespace {} delete serviceaccount/{}".format(
406 self
.kubectl_command
,
407 paths
["kube_config"],
409 self
.service_account
,
412 output
, _rc
= await self
._local
_async
_exec
(
413 command
=command
, raise_exception_on_error
=False, env
=env
417 self
.log
.debug("namespace not found")
419 async def _instances_list(self
, cluster_id
):
421 paths
, env
= self
._init
_paths
_env
(
422 cluster_name
=cluster_id
, create_if_not_exist
=True
425 command
= "{} list --output yaml".format(self
._helm
_command
)
427 output
, _rc
= await self
._local
_async
_exec
(
428 command
=command
, raise_exception_on_error
=True, env
=env
431 if output
and len(output
) > 0:
432 # parse yaml and update keys to lower case to unify with helm3
433 instances
= yaml
.load(output
, Loader
=yaml
.SafeLoader
).get("Releases")
435 for instance
in instances
:
436 new_instance
= dict((k
.lower(), v
) for k
, v
in instance
.items())
437 new_instances
.append(new_instance
)
442 def _get_inspect_command(
443 self
, show_command
: str, kdu_model
: str, repo_str
: str, version
: str
445 inspect_command
= "{} inspect {} {}{} {}".format(
446 self
._helm
_command
, show_command
, kdu_model
, repo_str
, version
448 return inspect_command
450 def _get_get_command(
451 self
, get_command
: str, kdu_instance
: str, namespace
: str, kubeconfig
: str
453 get_command
= "env KUBECONFIG={} {} get {} {} --output yaml".format(
454 kubeconfig
, self
._helm
_command
, get_command
, kdu_instance
458 async def _status_kdu(
462 namespace
: str = None,
463 yaml_format
: bool = False,
464 show_error_log
: bool = False,
465 ) -> Union
[str, dict]:
467 "status of kdu_instance: {}, namespace: {} ".format(kdu_instance
, namespace
)
471 paths
, env
= self
._init
_paths
_env
(
472 cluster_name
=cluster_id
, create_if_not_exist
=True
474 command
= ("env KUBECONFIG={} {} status {} --output yaml").format(
475 paths
["kube_config"], self
._helm
_command
, kdu_instance
477 output
, rc
= await self
._local
_async
_exec
(
479 raise_exception_on_error
=True,
480 show_error_log
=show_error_log
,
490 data
= yaml
.load(output
, Loader
=yaml
.SafeLoader
)
492 # remove field 'notes'
494 del data
.get("info").get("status")["notes"]
498 # parse the manifest to a list of dictionaries
499 if "manifest" in data
:
500 manifest_str
= data
.get("manifest")
501 manifest_docs
= yaml
.load_all(manifest_str
, Loader
=yaml
.SafeLoader
)
503 data
["manifest"] = []
504 for doc
in manifest_docs
:
505 data
["manifest"].append(doc
)
507 # parse field 'resources'
509 resources
= str(data
.get("info").get("status").get("resources"))
510 resource_table
= self
._output
_to
_table
(resources
)
511 data
.get("info").get("status")["resources"] = resource_table
515 # set description to lowercase (unify with helm3)
517 data
.get("info")["description"] = data
.get("info").pop("Description")
523 def _get_helm_chart_repos_ids(self
, cluster_uuid
) -> list:
525 cluster_filter
= {"_admin.helm-chart.id": cluster_uuid
}
526 cluster
= self
.db
.get_one("k8sclusters", cluster_filter
)
528 repo_ids
= cluster
.get("_admin").get("helm_chart_repos") or []
532 "k8cluster with helm-id : {} not found".format(cluster_uuid
)
535 async def _is_install_completed(self
, cluster_id
: str, kdu_instance
: str) -> bool:
537 paths
, env
= self
._init
_paths
_env
(
538 cluster_name
=cluster_id
, create_if_not_exist
=True
541 status
= await self
._status
_kdu
(
542 cluster_id
=cluster_id
, kdu_instance
=kdu_instance
, yaml_format
=False
545 # extract info.status.resources-> str
548 # NAME READY UP-TO-DATE AVAILABLE AGE
549 # halting-horse-mongodb 0/1 1 0 0s
550 # halting-petit-mongodb 1/1 1 0 0s
552 resources
= K8sHelmBaseConnector
._get
_deep
(
553 status
, ("info", "status", "resources")
557 resources
= K8sHelmBaseConnector
._output
_to
_table
(resources
)
559 num_lines
= len(resources
)
562 while index
< num_lines
:
564 line1
= resources
[index
]
566 # find '==>' in column 0
567 if line1
[0] == "==>":
568 line2
= resources
[index
]
570 # find READY in column 1
571 if line2
[1] == "READY":
573 line3
= resources
[index
]
575 while len(line3
) > 1 and index
< num_lines
:
576 ready_value
= line3
[1]
577 parts
= ready_value
.split(sep
="/")
578 current
= int(parts
[0])
579 total
= int(parts
[1])
581 self
.log
.debug("NOT READY:\n {}".format(line3
))
583 line3
= resources
[index
]
591 def _get_install_command(
604 timeout_str
= "--timeout {}".format(timeout
)
609 atomic_str
= "--atomic"
613 namespace_str
= "--namespace {}".format(namespace
)
618 version_str
= "--version {}".format(version
)
621 "env KUBECONFIG={kubeconfig} {helm} install {atomic} --output yaml "
622 "{params} {timeout} --name={name} {ns} {model} {ver}".format(
623 kubeconfig
=kubeconfig
,
624 helm
=self
._helm
_command
,
636 def _get_upgrade_scale_command(
649 """Generates the command to scale a Helm Chart release
652 kdu_model (str): Kdu model name, corresponding to the Helm local location or repository
653 kdu_instance (str): KDU instance, corresponding to the Helm Chart release in question
654 namespace (str): Namespace where this KDU instance is deployed
655 scale (int): Scale count
656 version (str): Constraint with specific version of the Chart to use
657 atomic (bool): If set, upgrade process rolls back changes made in case of failed upgrade.
658 The --wait flag will be set automatically if --atomic is used
659 replica_str (str): The key under resource_name key where the scale count is stored
660 timeout (float): The time, in seconds, to wait
661 resource_name (str): The KDU's resource to scale
662 kubeconfig (str): Kubeconfig file path
665 str: command to scale a Helm Chart release
670 scale_dict
= {"{}.{}".format(resource_name
, replica_str
): scale
}
672 scale_dict
= {replica_str
: scale
}
674 scale_str
= self
._params
_to
_set
_option
(scale_dict
)
676 return self
._get
_upgrade
_command
(
678 kdu_instance
=kdu_instance
,
680 params_str
=scale_str
,
684 kubeconfig
=kubeconfig
,
687 def _get_upgrade_command(
699 """Generates the command to upgrade a Helm Chart release
702 kdu_model (str): Kdu model name, corresponding to the Helm local location or repository
703 kdu_instance (str): KDU instance, corresponding to the Helm Chart release in question
704 namespace (str): Namespace where this KDU instance is deployed
705 params_str (str): Params used to upgrade the Helm Chart release
706 version (str): Constraint with specific version of the Chart to use
707 atomic (bool): If set, upgrade process rolls back changes made in case of failed upgrade.
708 The --wait flag will be set automatically if --atomic is used
709 timeout (float): The time, in seconds, to wait
710 kubeconfig (str): Kubeconfig file path
711 force (bool): If set, helm forces resource updates through a replacement strategy. This may recreate pods.
713 str: command to upgrade a Helm Chart release
718 timeout_str
= "--timeout {}".format(timeout
)
723 atomic_str
= "--atomic"
728 force_str
= "--force "
733 version_str
= "--version {}".format(version
)
738 namespace_str
= "--namespace {}".format(namespace
)
741 "env KUBECONFIG={kubeconfig} {helm} upgrade {namespace} {atomic} --output yaml {params} {timeout} {force}"
742 "--reuse-values {name} {model} {ver}"
744 kubeconfig
=kubeconfig
,
745 helm
=self
._helm
_command
,
746 namespace
=namespace_str
,
757 def _get_rollback_command(
758 self
, kdu_instance
, namespace
, revision
, kubeconfig
760 return "env KUBECONFIG={} {} rollback {} {} --wait".format(
761 kubeconfig
, self
._helm
_command
, kdu_instance
, revision
764 def _get_uninstall_command(
765 self
, kdu_instance
: str, namespace
: str, kubeconfig
: str
767 return "env KUBECONFIG={} {} delete --purge {}".format(
768 kubeconfig
, self
._helm
_command
, kdu_instance