2 # Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3 # This file is part of OSM
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact with: nfvlabs@tid.es
23 from typing
import Union
27 from n2vc
.k8s_helm_base_conn
import K8sHelmBaseConnector
28 from n2vc
.exceptions
import K8sException
class K8sHelmConnector(K8sHelmBaseConnector):
    """Helm v2 flavour of the OSM K8s connector (drives the `helm`/`kubectl` CLIs)."""

    ####################################################################################
    ################################### P U B L I C ####################################
    ####################################################################################

    def __init__(
        self,
        fs: object,
        db: object,
        kubectl_command: str = "/usr/bin/kubectl",
        helm_command: str = "/usr/bin/helm",
        log: object = None,
        on_update_db=None,
    ):
        """
        Initializes helm connector for helm v2

        :param fs: file system for kubernetes and helm configuration
        :param db: database object to write current operation status
        :param kubectl_command: path to kubectl executable
        :param helm_command: path to helm executable
        :param log: logger
        :param on_update_db: callback called when k8s connector updates database
        """

        # parent class constructor wires up db/fs/log and the CLI paths
        K8sHelmBaseConnector.__init__(
            self,
            db=db,
            log=log,
            fs=fs,
            kubectl_command=kubectl_command,
            helm_command=helm_command,
            on_update_db=on_update_db,
        )

        self.log.info("Initializing K8S Helm2 connector")

        # initialize helm client-only
        self.log.debug("Initializing helm client-only...")
        command = "{} init --client-only {} ".format(
            self._helm_command,
            # only pin the stable repo when one is configured
            # NOTE(review): the fallback flag was lost in this copy — TODO confirm
            "--stable-repo-url {}".format(self._stable_repo_url)
            if self._stable_repo_url
            else "--skip-repos",
        )
        try:
            # fire-and-forget: requires a running event loop at construction time;
            # errors from the subprocess are logged, not raised
            asyncio.ensure_future(
                self._local_async_exec(command=command, raise_exception_on_error=False)
            )
            # loop = asyncio.get_event_loop()
            # loop.run_until_complete(self._local_async_exec(command=command,
            # raise_exception_on_error=False))
        except Exception as e:
            # NOTE(review): `self.warning` does not look like a logger call —
            # `self.log.warning(...)` seems intended; confirm against the base class
            self.warning(
                msg="helm init failed (it was already initialized): {}".format(e)
            )

        self.log.info("K8S Helm2 connector initialized")
    async def install(
        self,
        cluster_uuid: str,
        kdu_model: str,
        kdu_instance: str,
        atomic: bool = True,
        timeout: float = 300,
        params: dict = None,
        db_dict: dict = None,
        kdu_name: str = None,
        namespace: str = None,
        **kwargs,
    ):
        """
        Deploys of a new KDU instance. It would implicitly rely on the `install` call
        to deploy the Chart/Bundle properly parametrized (in practice, this call would
        happen before any _initial-config-primitive_of the VNF is called).

        :param cluster_uuid: UUID of a K8s cluster known by OSM
        :param kdu_model: chart/reference (string), which can be either
            of these options:
            - a name of chart available via the repos known by OSM
              (e.g. stable/openldap, stable/openldap:1.2.4)
            - a path to a packaged chart (e.g. mychart.tgz)
            - a path to an unpacked chart directory or a URL (e.g. mychart)
        :param kdu_instance: Kdu instance name
        :param atomic: If set, installation process purges chart/bundle on fail, also
            will wait until all the K8s objects are active
        :param timeout: Time in seconds to wait for the install of the chart/bundle
            (defaults to Helm default timeout: 300s)
        :param params: dictionary of key-value pairs for instantiation parameters
            (overriding default values)
        :param dict db_dict: where to write into database when the status changes.
            It contains a dict with {collection: <str>, filter: {}, path: <str>},
            e.g. {collection: "nsrs", filter:
            {_id: <nsd-id>, path: "_admin.deployed.K8S.3"}
        :param kdu_name: Name of the KDU instance to be installed
        :param namespace: K8s namespace to use for the KDU instance
        :param kwargs: Additional parameters (None yet)
        :return: True if successful
        """
        self.log.debug("installing {} in cluster {}".format(kdu_model, cluster_uuid))

        # sync local dir: bring the cluster's kube/helm config from shared storage
        self.fs.sync(from_path=cluster_uuid)

        # init env, paths
        paths, env = self._init_paths_env(
            cluster_name=cluster_uuid, create_if_not_exist=True
        )

        # delegate the actual helm install to the version-agnostic base implementation
        # NOTE(review): argument list reconstructed — confirm against
        # K8sHelmBaseConnector._install_impl
        await self._install_impl(
            cluster_uuid,
            kdu_model,
            paths,
            env,
            kdu_instance,
            atomic=atomic,
            timeout=timeout,
            params=params,
            db_dict=db_dict,
            kdu_name=kdu_name,
            namespace=namespace,
        )

        # sync fs: push any updated helm/kube state back to shared storage
        self.fs.reverse_sync(from_path=cluster_uuid)

        self.log.debug("Returning kdu_instance {}".format(kdu_instance))
        return True
166 async def inspect_kdu(self
, kdu_model
: str, repo_url
: str = None) -> str:
169 "inspect kdu_model {} from (optional) repo: {}".format(kdu_model
, repo_url
)
172 return await self
._exec
_inspect
_command
(
173 inspect_command
="", kdu_model
=kdu_model
, repo_url
=repo_url
177 ####################################################################################
178 ################################### P R I V A T E ##################################
179 ####################################################################################
    def _init_paths_env(self, cluster_name: str, create_if_not_exist: bool = True):
        """
        Creates and returns base cluster and kube dirs and returns them.
        Also created helm3 dirs according to new directory specification, paths are
        returned and also environment variables that must be provided to execute commands

        Helm 2 directory specification uses helm_home dir:

        The variables assigned for this paths are:
        - Helm home: $HELM_HOME
        - helm kubeconfig: $KUBECONFIG

        :param cluster_name: cluster_name
        :param create_if_not_exist: create the kube/helm dirs when missing
        :return: Dictionary with config_paths and dictionary with helm environment variables
        :raises K8sException: if a required directory does not exist and was not created
        """
        base = self.fs.path
        if base.endswith("/") or base.endswith("\\"):
            base = base[:-1]

        # base dir for cluster
        cluster_dir = base + "/" + cluster_name

        # kube dir
        kube_dir = cluster_dir + "/" + ".kube"
        if create_if_not_exist and not os.path.exists(kube_dir):
            self.log.debug("Creating dir {}".format(kube_dir))
            os.makedirs(kube_dir)

        # helm home dir
        helm_dir = cluster_dir + "/" + ".helm"
        if create_if_not_exist and not os.path.exists(helm_dir):
            self.log.debug("Creating dir {}".format(helm_dir))
            os.makedirs(helm_dir)

        config_filename = kube_dir + "/config"

        # 2 - Prepare dictionary with paths
        paths = {
            "kube_dir": kube_dir,
            "kube_config": config_filename,
            "cluster_dir": cluster_dir,
            "helm_dir": helm_dir,
        }

        # validate that every *dir* entry actually exists on disk
        for file_name, file in paths.items():
            if "dir" in file_name and not os.path.exists(file):
                err_msg = "{} dir does not exist".format(file)
                self.log.error(err_msg)
                raise K8sException(err_msg)

        # 3 - Prepare environment variables
        env = {"HELM_HOME": helm_dir, "KUBECONFIG": config_filename}

        return paths, env
    async def _get_services(self, cluster_id, kdu_instance, namespace, kubeconfig):
        """Return the k8s services deployed by a helm2 release.

        Pipes `helm get manifest` into `kubectl get -f -` and parses the result.

        :param cluster_id: cluster identifier (used to locate local config paths)
        :param kdu_instance: helm release name
        :param namespace: k8s namespace the release lives in
        :param kubeconfig: kubeconfig file path passed to helm via env
        :return: list of parsed service descriptions
        """
        # init config, env
        paths, env = self._init_paths_env(
            cluster_name=cluster_id, create_if_not_exist=True
        )

        # first command dumps the rendered manifests of the release
        command1 = "env KUBECONFIG={} {} get manifest {} ".format(
            kubeconfig, self._helm_command, kdu_instance
        )
        # second command resolves the live objects for those manifests
        command2 = "{} get --namespace={} -f -".format(self.kubectl_command, namespace)
        output, _rc = await self._local_async_exec_pipe(
            command1, command2, env=env, raise_exception_on_error=True
        )
        services = self._parse_services(output)

        return services
    async def _cluster_init(
        self, cluster_id: str, namespace: str, paths: dict, env: dict
    ):
        """
        Implements the helm version dependent cluster initialization:
        For helm2 it initialized tiller environment if needed

        :param cluster_id: cluster identifier
        :param namespace: namespace where tiller is (to be) deployed
        :param paths: path dictionary from _init_paths_env
        :param env: environment variables for CLI execution
        :return: True if this call installed tiller (n2vc owns the sw), False otherwise
        """

        # check if tiller pod is up in cluster
        command = "{} --kubeconfig={} --namespace={} get deployments".format(
            self.kubectl_command, paths["kube_config"], namespace
        )
        output, _rc = await self._local_async_exec(
            command=command, raise_exception_on_error=True, env=env
        )

        output_table = self._output_to_table(output=output)

        # find 'tiller' pod in all pods
        already_initialized = False
        try:
            for row in output_table:
                if row[0].startswith("tiller-deploy"):
                    already_initialized = True
                    break
        except Exception:
            # malformed table rows are treated as "not initialized"
            pass

        # helm init
        n2vc_installed_sw = False
        if not already_initialized:
            self.log.info(
                "Initializing helm in client and server: {}".format(cluster_id)
            )
            # create the service account tiller will run under (best-effort)
            command = "{} --kubeconfig={} --namespace kube-system create serviceaccount {}".format(
                self.kubectl_command, paths["kube_config"], self.service_account
            )
            _, _rc = await self._local_async_exec(
                command=command, raise_exception_on_error=False, env=env
            )

            # bind cluster-admin to that service account (best-effort)
            command = (
                "{} --kubeconfig={} create clusterrolebinding osm-tiller-cluster-rule "
                "--clusterrole=cluster-admin --serviceaccount=kube-system:{}"
            ).format(self.kubectl_command, paths["kube_config"], self.service_account)
            _, _rc = await self._local_async_exec(
                command=command, raise_exception_on_error=False, env=env
            )

            # install tiller server-side + init client home dir
            command = (
                "{} --kubeconfig={} --tiller-namespace={} --home={} --service-account {} "
                " {} init"
            ).format(
                self._helm_command,
                paths["kube_config"],
                namespace,
                paths["helm_dir"],
                self.service_account,
                "--stable-repo-url {}".format(self._stable_repo_url)
                if self._stable_repo_url
                else "--skip-repos",
            )
            _, _rc = await self._local_async_exec(
                command=command, raise_exception_on_error=True, env=env
            )
            n2vc_installed_sw = True
        else:
            # check client helm installation
            check_file = paths["helm_dir"] + "/repository/repositories.yaml"
            if not self._check_file_exists(
                filename=check_file, exception_if_not_exists=False
            ):
                self.log.info("Initializing helm in client: {}".format(cluster_id))
                command = (
                    "{} --kubeconfig={} --tiller-namespace={} "
                    "--home={} init --client-only {} "
                ).format(
                    self._helm_command,
                    paths["kube_config"],
                    namespace,
                    paths["helm_dir"],
                    "--stable-repo-url {}".format(self._stable_repo_url)
                    if self._stable_repo_url
                    else "--skip-repos",
                )
                output, _rc = await self._local_async_exec(
                    command=command, raise_exception_on_error=True, env=env
                )
            else:
                self.log.info("Helm client already initialized")

        # replace the stable repo when its URL differs from the configured one
        repo_list = await self.repo_list(cluster_id)
        for repo in repo_list:
            if repo["name"] == "stable" and repo["url"] != self._stable_repo_url:
                # NOTE(review): "{}" placeholder has no .format() argument — the
                # new URL is never interpolated; self._stable_repo_url seems intended
                self.log.debug("Add new stable repo url: {}")
                await self.repo_remove(cluster_id, "stable")
                if self._stable_repo_url:
                    await self.repo_add(cluster_id, "stable", self._stable_repo_url)
                break

        return n2vc_installed_sw
    async def _uninstall_sw(self, cluster_id: str, namespace: str):
        """Uninstall tiller (and its RBAC objects) from the cluster, if present.

        :param cluster_id: cluster identifier
        :param namespace: tiller namespace; auto-discovered when falsy
        """
        # uninstall Tiller if necessary

        self.log.debug("Uninstalling tiller from cluster {}".format(cluster_id))

        # init paths, env
        paths, env = self._init_paths_env(
            cluster_name=cluster_id, create_if_not_exist=True
        )

        if not namespace:
            # find namespace for tiller pod
            command = "{} --kubeconfig={} get deployments --all-namespaces".format(
                self.kubectl_command, paths["kube_config"]
            )
            output, _rc = await self._local_async_exec(
                command=command, raise_exception_on_error=False, env=env
            )
            output_table = self._output_to_table(output=output)
            namespace = None
            for r in output_table:
                try:
                    if "tiller-deploy" in r[1]:
                        namespace = r[0]
                        break
                except Exception:
                    pass
            else:
                # loop completed without break: no tiller deployment anywhere
                msg = "Tiller deployment not found in cluster {}".format(cluster_id)
                self.log.error(msg)

            self.log.debug("namespace for tiller: {}".format(namespace))

        if namespace:
            # uninstall tiller from cluster
            self.log.debug("Uninstalling tiller from cluster {}".format(cluster_id))
            command = "{} --kubeconfig={} --home={} reset".format(
                self._helm_command, paths["kube_config"], paths["helm_dir"]
            )
            self.log.debug("resetting: {}".format(command))
            output, _rc = await self._local_async_exec(
                command=command, raise_exception_on_error=True, env=env
            )

            # Delete clusterrolebinding and serviceaccount.
            # Ignore if errors for backward compatibility
            command = (
                "{} --kubeconfig={} delete clusterrolebinding.rbac.authorization.k8s."
                "io/osm-tiller-cluster-rule"
            ).format(self.kubectl_command, paths["kube_config"])
            output, _rc = await self._local_async_exec(
                command=command, raise_exception_on_error=False, env=env
            )

            command = (
                "{} --kubeconfig={} --namespace {} delete serviceaccount/{}".format(
                    self.kubectl_command,
                    paths["kube_config"],
                    namespace,
                    self.service_account,
                )
            )
            output, _rc = await self._local_async_exec(
                command=command, raise_exception_on_error=False, env=env
            )
        else:
            self.log.debug("namespace not found")
    async def _instances_list(self, cluster_id):
        """List helm2 releases in the cluster as dicts with lower-cased keys.

        :param cluster_id: cluster identifier
        :return: list of release dicts (keys lower-cased to unify with helm3), or []
        """
        # init paths, env
        paths, env = self._init_paths_env(
            cluster_name=cluster_id, create_if_not_exist=True
        )

        command = "{} list --output yaml".format(self._helm_command)

        output, _rc = await self._local_async_exec(
            command=command, raise_exception_on_error=True, env=env
        )

        if output and len(output) > 0:
            # parse yaml and update keys to lower case to unify with helm3
            instances = yaml.load(output, Loader=yaml.SafeLoader).get("Releases")
            new_instances = []
            for instance in instances:
                new_instance = dict((k.lower(), v) for k, v in instance.items())
                new_instances.append(new_instance)
            return new_instances
        else:
            return []
448 def _get_inspect_command(
449 self
, show_command
: str, kdu_model
: str, repo_str
: str, version
: str
451 inspect_command
= "{} inspect {} {}{} {}".format(
452 self
._helm
_command
, show_command
, kdu_model
, repo_str
, version
454 return inspect_command
    def _get_get_command(
        self, get_command: str, kdu_instance: str, namespace: str, kubeconfig: str
    ):
        """Build the helm2 `get` command line.

        :param get_command: get sub-command (e.g. "values", "manifest")
        :param kdu_instance: helm release name
        :param namespace: unused by helm2 `get` (kept for interface parity with helm3)
        :param kubeconfig: kubeconfig path exported via env
        :return: complete shell command as a string
        """
        get_command = "env KUBECONFIG={} {} get {} {} --output yaml".format(
            kubeconfig, self._helm_command, get_command, kdu_instance
        )
        return get_command
    async def _status_kdu(
        self,
        cluster_id: str,
        kdu_instance: str,
        namespace: str = None,
        yaml_format: bool = False,
        show_error_log: bool = False,
    ) -> Union[str, dict]:
        """Return `helm status` for a release, raw or parsed.

        :param cluster_id: cluster identifier
        :param kdu_instance: helm release name
        :param namespace: only used for logging in helm2
        :param yaml_format: when True return the raw yaml string instead of a dict
        :param show_error_log: forward CLI errors to the log
        :return: raw yaml string or parsed/normalized status dict
        """
        self.log.debug(
            "status of kdu_instance: {}, namespace: {} ".format(kdu_instance, namespace)
        )

        # init config, env
        paths, env = self._init_paths_env(
            cluster_name=cluster_id, create_if_not_exist=True
        )
        command = ("env KUBECONFIG={} {} status {} --output yaml").format(
            paths["kube_config"], self._helm_command, kdu_instance
        )
        output, rc = await self._local_async_exec(
            command=command,
            raise_exception_on_error=True,
            show_error_log=show_error_log,
            env=env,
        )

        if yaml_format:
            return str(output)

        if rc != 0:
            return None

        data = yaml.load(output, Loader=yaml.SafeLoader)

        # remove field 'notes'
        try:
            del data.get("info").get("status")["notes"]
        except KeyError:
            pass

        # parse the manifest to a list of dictionaries
        if "manifest" in data:
            manifest_str = data.get("manifest")
            manifest_docs = yaml.load_all(manifest_str, Loader=yaml.SafeLoader)

            data["manifest"] = []
            for doc in manifest_docs:
                data["manifest"].append(doc)

        # parse field 'resources'
        try:
            resources = str(data.get("info").get("status").get("resources"))
            resource_table = self._output_to_table(resources)
            data.get("info").get("status")["resources"] = resource_table
        except Exception:
            # resources section is optional; leave as-is on any parse failure
            pass

        # set description to lowercase (unify with helm3)
        try:
            data.get("info")["description"] = data.get("info").pop("Description")
        except KeyError:
            pass

        return data
    def _get_helm_chart_repos_ids(self, cluster_uuid) -> list:
        """Return the helm chart repo ids registered for a cluster.

        :param cluster_uuid: value matched against `_admin.helm-chart.id` in db
        :return: list of repo ids (possibly empty)
        :raises K8sException: when no k8scluster matches the given helm id
        """
        repo_ids = []
        cluster_filter = {"_admin.helm-chart.id": cluster_uuid}
        cluster = self.db.get_one("k8sclusters", cluster_filter)
        if cluster:
            repo_ids = cluster.get("_admin").get("helm_chart_repos") or []
            return repo_ids
        else:
            raise K8sException(
                "k8cluster with helm-id : {} not found".format(cluster_uuid)
            )
    async def _is_install_completed(self, cluster_id: str, kdu_instance: str) -> bool:
        """Check whether every deployed resource of a release reports READY.

        Parses the table under info.status.resources of `helm status`.

        :param cluster_id: cluster identifier
        :param kdu_instance: helm release name
        :return: True when all READY counters are complete (current == total)
        """
        # init config, env
        paths, env = self._init_paths_env(
            cluster_name=cluster_id, create_if_not_exist=True
        )

        status = await self._status_kdu(
            cluster_id=cluster_id, kdu_instance=kdu_instance, yaml_format=False
        )

        # extract info.status.resources-> str
        # format:
        #       ==> v1/Deployment
        #       NAME                    READY   UP-TO-DATE   AVAILABLE   AGE
        #       halting-horse-mongodb   0/1     1            0           0s
        #       halting-petit-mongodb   1/1     1            0           0s
        # => list of lines, clean non relevant lines
        resources = K8sHelmBaseConnector._get_deep(
            status, ("info", "status", "resources")
        )

        # convert to table
        resources = K8sHelmBaseConnector._output_to_table(resources)

        num_lines = len(resources)
        index = 0
        ready = True
        while index < num_lines:
            try:
                line1 = resources[index]
                index += 1
                # find '==>' in column 0
                if line1[0] == "==>":
                    line2 = resources[index]
                    index += 1
                    # find READY in column 1
                    if line2[1] == "READY":
                        # read next lines
                        line3 = resources[index]
                        index += 1
                        while len(line3) > 1 and index < num_lines:
                            ready_value = line3[1]
                            parts = ready_value.split(sep="/")
                            current = int(parts[0])
                            total = int(parts[1])
                            if current < total:
                                self.log.debug("NOT READY:\n    {}".format(line3))
                                ready = False
                            line3 = resources[index]
                            index += 1
            except Exception:
                # short/ragged rows simply end the current section
                pass

        return ready
    def _get_install_command(
        self,
        kdu_model,
        kdu_instance,
        namespace,
        params_str,
        version,
        atomic,
        timeout,
        kubeconfig,
    ) -> str:
        """Build the helm2 install command line.

        All optional flags collapse to "" when their value is falsy.

        :return: complete shell command as a string
        """
        timeout_str = ""
        if timeout:
            timeout_str = "--timeout {}".format(timeout)

        # atomic
        atomic_str = ""
        if atomic:
            atomic_str = "--atomic"
        # namespace
        namespace_str = ""
        if namespace:
            namespace_str = "--namespace {}".format(namespace)

        # version
        version_str = ""
        if version:
            version_str = "--version {}".format(version)

        command = (
            "env KUBECONFIG={kubeconfig} {helm} install {atomic} --output yaml "
            "{params} {timeout} --name={name} {ns} {model} {ver}".format(
                kubeconfig=kubeconfig,
                helm=self._helm_command,
                atomic=atomic_str,
                params=params_str,
                timeout=timeout_str,
                name=kdu_instance,
                ns=namespace_str,
                model=kdu_model,
                ver=version_str,
            )
        )
        return command
    def _get_upgrade_scale_command(
        self,
        kdu_model: str,
        kdu_instance: str,
        namespace: str,
        scale: int,
        version: str,
        atomic: bool,
        replica_str: str,
        timeout: float,
        resource_name: str,
        kubeconfig: str,
    ) -> str:
        """Generates the command to scale a Helm Chart release

        Args:
            kdu_model (str): Kdu model name, corresponding to the Helm local location or repository
            kdu_instance (str): KDU instance, corresponding to the Helm Chart release in question
            namespace (str): Namespace where this KDU instance is deployed
            scale (int): Scale count
            version (str): Constraint with specific version of the Chart to use
            atomic (bool): If set, upgrade process rolls back changes made in case of failed upgrade.
                The --wait flag will be set automatically if --atomic is used
            replica_str (str): The key under resource_name key where the scale count is stored
            timeout (float): The time, in seconds, to wait
            resource_name (str): The KDU's resource to scale
            kubeconfig (str): Kubeconfig file path

        Returns:
            str: command to scale a Helm Chart release
        """

        # scale dict: nest the replica key under the resource name when given
        if resource_name:
            scale_dict = {"{}.{}".format(resource_name, replica_str): scale}
        else:
            scale_dict = {replica_str: scale}

        scale_str = self._params_to_set_option(scale_dict)

        # scaling is just an upgrade with --set replica counters
        return self._get_upgrade_command(
            kdu_model=kdu_model,
            kdu_instance=kdu_instance,
            namespace=namespace,
            params_str=scale_str,
            version=version,
            atomic=atomic,
            timeout=timeout,
            kubeconfig=kubeconfig,
        )
    def _get_upgrade_command(
        self,
        kdu_model: str,
        kdu_instance: str,
        namespace: str,
        params_str: str,
        version: str,
        atomic: bool,
        timeout: float,
        kubeconfig: str,
        force: bool = False,
    ) -> str:
        """Generates the command to upgrade a Helm Chart release

        Args:
            kdu_model (str): Kdu model name, corresponding to the Helm local location or repository
            kdu_instance (str): KDU instance, corresponding to the Helm Chart release in question
            namespace (str): Namespace where this KDU instance is deployed
            params_str (str): Params used to upgrade the Helm Chart release
            version (str): Constraint with specific version of the Chart to use
            atomic (bool): If set, upgrade process rolls back changes made in case of failed upgrade.
                The --wait flag will be set automatically if --atomic is used
            timeout (float): The time, in seconds, to wait
            kubeconfig (str): Kubeconfig file path
            force (bool): If set, helm forces resource updates through a replacement strategy. This may recreate pods.

        Returns:
            str: command to upgrade a Helm Chart release
        """
        timeout_str = ""
        if timeout:
            timeout_str = "--timeout {}".format(timeout)

        # atomic
        atomic_str = ""
        if atomic:
            atomic_str = "--atomic"

        # force: note the trailing space — it separates "{force}" from
        # the directly concatenated "--reuse-values" that follows
        force_str = ""
        if force:
            force_str = "--force "

        # version
        version_str = ""
        if version:
            version_str = "--version {}".format(version)

        # namespace
        namespace_str = ""
        if namespace:
            namespace_str = "--namespace {}".format(namespace)

        command = (
            "env KUBECONFIG={kubeconfig} {helm} upgrade {namespace} {atomic} --output yaml {params} {timeout} {force}"
            "--reuse-values {name} {model} {ver}"
        ).format(
            kubeconfig=kubeconfig,
            helm=self._helm_command,
            namespace=namespace_str,
            atomic=atomic_str,
            force=force_str,
            params=params_str,
            timeout=timeout_str,
            name=kdu_instance,
            model=kdu_model,
            ver=version_str,
        )
        return command
765 def _get_rollback_command(
766 self
, kdu_instance
, namespace
, revision
, kubeconfig
768 return "env KUBECONFIG={} {} rollback {} {} --wait".format(
769 kubeconfig
, self
._helm
_command
, kdu_instance
, revision
772 def _get_uninstall_command(
773 self
, kdu_instance
: str, namespace
: str, kubeconfig
: str
775 return "env KUBECONFIG={} {} delete --purge {}".format(
776 kubeconfig
, self
._helm
_command
, kdu_instance