2 # Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3 # This file is part of OSM
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact with: nfvlabs@tid.es
import os

import yaml

from shlex import quote
from typing import Union

from n2vc.exceptions import K8sException
from n2vc.k8s_helm_base_conn import K8sHelmBaseConnector
32 class K8sHelmConnector(K8sHelmBaseConnector
):
35 ####################################################################################
36 ################################### P U B L I C ####################################
37 ####################################################################################
44 kubectl_command
: str = "/usr/bin/kubectl",
45 helm_command
: str = "/usr/bin/helm",
50 Initializes helm connector for helm v2
52 :param fs: file system for kubernetes and helm configuration
53 :param db: database object to write current operation status
54 :param kubectl_command: path to kubectl executable
55 :param helm_command: path to helm executable
57 :param on_update_db: callback called when k8s connector updates database
61 K8sHelmBaseConnector
.__init
__(
66 kubectl_command
=kubectl_command
,
67 helm_command
=helm_command
,
68 on_update_db
=on_update_db
,
71 self
.log
.info("Initializing K8S Helm2 connector")
73 # initialize helm client-only
74 self
.log
.debug("Initializing helm client-only...")
75 command
= "{} init --client-only {} ".format(
77 "--stable-repo-url {}".format(quote(self
._stable
_repo
_url
))
78 if self
._stable
_repo
_url
83 self
._local
_async
_exec
(command
=command
, raise_exception_on_error
=False)
85 except Exception as e
:
87 msg
="helm init failed (it was already initialized): {}".format(e
)
90 self
.log
.info("K8S Helm2 connector initialized")
100 db_dict
: dict = None,
101 kdu_name
: str = None,
102 namespace
: str = None,
106 Deploys of a new KDU instance. It would implicitly rely on the `install` call
107 to deploy the Chart/Bundle properly parametrized (in practice, this call would
108 happen before any _initial-config-primitive_of the VNF is called).
110 :param cluster_uuid: UUID of a K8s cluster known by OSM
111 :param kdu_model: chart/reference (string), which can be either
113 - a name of chart available via the repos known by OSM
114 (e.g. stable/openldap, stable/openldap:1.2.4)
115 - a path to a packaged chart (e.g. mychart.tgz)
116 - a path to an unpacked chart directory or a URL (e.g. mychart)
117 :param kdu_instance: Kdu instance name
118 :param atomic: If set, installation process purges chart/bundle on fail, also
119 will wait until all the K8s objects are active
120 :param timeout: Time in seconds to wait for the install of the chart/bundle
121 (defaults to Helm default timeout: 300s)
122 :param params: dictionary of key-value pairs for instantiation parameters
123 (overriding default values)
124 :param dict db_dict: where to write into database when the status changes.
125 It contains a dict with {collection: <str>, filter: {},
127 e.g. {collection: "nsrs", filter:
128 {_id: <nsd-id>, path: "_admin.deployed.K8S.3"}
129 :param kdu_name: Name of the KDU instance to be installed
130 :param namespace: K8s namespace to use for the KDU instance
131 :param kwargs: Additional parameters (None yet)
132 :return: True if successful
134 self
.log
.debug("installing {} in cluster {}".format(kdu_model
, cluster_uuid
))
137 self
.fs
.sync(from_path
=cluster_uuid
)
140 paths
, env
= self
._init
_paths
_env
(
141 cluster_name
=cluster_uuid
, create_if_not_exist
=True
144 await self
._install
_impl
(
159 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
161 self
.log
.debug("Returning kdu_instance {}".format(kdu_instance
))
164 async def inspect_kdu(self
, kdu_model
: str, repo_url
: str = None) -> str:
166 "inspect kdu_model {} from (optional) repo: {}".format(kdu_model
, repo_url
)
169 return await self
._exec
_inspect
_command
(
170 inspect_command
="", kdu_model
=kdu_model
, repo_url
=repo_url
174 ####################################################################################
175 ################################### P R I V A T E ##################################
176 ####################################################################################
179 def _init_paths_env(self
, cluster_name
: str, create_if_not_exist
: bool = True):
181 Creates and returns base cluster and kube dirs and returns them.
182 Also created helm3 dirs according to new directory specification, paths are
183 returned and also environment variables that must be provided to execute commands
185 Helm 2 directory specification uses helm_home dir:
187 The variables assigned for this paths are:
188 - Helm hone: $HELM_HOME
189 - helm kubeconfig: $KUBECONFIG
191 :param cluster_name: cluster_name
192 :return: Dictionary with config_paths and dictionary with helm environment variables
195 if base
.endswith("/") or base
.endswith("\\"):
198 # base dir for cluster
199 cluster_dir
= base
+ "/" + cluster_name
202 kube_dir
= cluster_dir
+ "/" + ".kube"
203 if create_if_not_exist
and not os
.path
.exists(kube_dir
):
204 self
.log
.debug("Creating dir {}".format(kube_dir
))
205 os
.makedirs(kube_dir
)
208 helm_dir
= cluster_dir
+ "/" + ".helm"
209 if create_if_not_exist
and not os
.path
.exists(helm_dir
):
210 self
.log
.debug("Creating dir {}".format(helm_dir
))
211 os
.makedirs(helm_dir
)
213 config_filename
= kube_dir
+ "/config"
215 # 2 - Prepare dictionary with paths
217 "kube_dir": kube_dir
,
218 "kube_config": config_filename
,
219 "cluster_dir": cluster_dir
,
220 "helm_dir": helm_dir
,
223 for file_name
, file in paths
.items():
224 if "dir" in file_name
and not os
.path
.exists(file):
225 err_msg
= "{} dir does not exist".format(file)
226 self
.log
.error(err_msg
)
227 raise K8sException(err_msg
)
229 # 3 - Prepare environment variables
230 env
= {"HELM_HOME": helm_dir
, "KUBECONFIG": config_filename
}
234 async def _get_services(self
, cluster_id
, kdu_instance
, namespace
, kubeconfig
):
236 paths
, env
= self
._init
_paths
_env
(
237 cluster_name
=cluster_id
, create_if_not_exist
=True
240 command1
= "env KUBECONFIG={} {} get manifest {} ".format(
241 kubeconfig
, self
._helm
_command
, quote(kdu_instance
)
243 command2
= "{} get --namespace={} -f -".format(
244 self
.kubectl_command
, quote(namespace
)
246 output
, _rc
= await self
._local
_async
_exec
_pipe
(
247 command1
, command2
, env
=env
, raise_exception_on_error
=True
249 services
= self
._parse
_services
(output
)
253 async def _cluster_init(
254 self
, cluster_id
: str, namespace
: str, paths
: dict, env
: dict
257 Implements the helm version dependent cluster initialization:
258 For helm2 it initialized tiller environment if needed
261 # check if tiller pod is up in cluster
262 command
= "{} --kubeconfig={} --namespace={} get deployments".format(
263 self
.kubectl_command
, paths
["kube_config"], quote(namespace
)
265 output
, _rc
= await self
._local
_async
_exec
(
266 command
=command
, raise_exception_on_error
=True, env
=env
269 output_table
= self
._output
_to
_table
(output
=output
)
271 # find 'tiller' pod in all pods
272 already_initialized
= False
274 for row
in output_table
:
275 if row
[0].startswith("tiller-deploy"):
276 already_initialized
= True
282 n2vc_installed_sw
= False
283 if not already_initialized
:
285 "Initializing helm in client and server: {}".format(cluster_id
)
287 command
= "{} --kubeconfig={} --namespace kube-system create serviceaccount {}".format(
288 self
.kubectl_command
, paths
["kube_config"], quote(self
.service_account
)
290 _
, _rc
= await self
._local
_async
_exec
(
291 command
=command
, raise_exception_on_error
=False, env
=env
295 "{} --kubeconfig={} create clusterrolebinding osm-tiller-cluster-rule "
296 "--clusterrole=cluster-admin --serviceaccount=kube-system:{}"
298 self
.kubectl_command
, paths
["kube_config"], quote(self
.service_account
)
300 _
, _rc
= await self
._local
_async
_exec
(
301 command
=command
, raise_exception_on_error
=False, env
=env
305 "{} --kubeconfig={} --tiller-namespace={} --home={} --service-account {} "
309 paths
["kube_config"],
311 quote(paths
["helm_dir"]),
312 quote(self
.service_account
),
313 "--stable-repo-url {}".format(quote(self
._stable
_repo
_url
))
314 if self
._stable
_repo
_url
317 _
, _rc
= await self
._local
_async
_exec
(
318 command
=command
, raise_exception_on_error
=True, env
=env
320 n2vc_installed_sw
= True
322 # check client helm installation
323 check_file
= paths
["helm_dir"] + "/repository/repositories.yaml"
324 if not self
._check
_file
_exists
(
325 filename
=check_file
, exception_if_not_exists
=False
327 self
.log
.info("Initializing helm in client: {}".format(cluster_id
))
329 "{} --kubeconfig={} --tiller-namespace={} "
330 "--home={} init --client-only {} "
333 paths
["kube_config"],
335 quote(paths
["helm_dir"]),
336 "--stable-repo-url {}".format(quote(self
._stable
_repo
_url
))
337 if self
._stable
_repo
_url
340 output
, _rc
= await self
._local
_async
_exec
(
341 command
=command
, raise_exception_on_error
=True, env
=env
344 self
.log
.info("Helm client already initialized")
346 repo_list
= await self
.repo_list(cluster_id
)
347 for repo
in repo_list
:
348 if repo
["name"] == "stable" and repo
["url"] != self
._stable
_repo
_url
:
349 self
.log
.debug("Add new stable repo url: {}")
350 await self
.repo_remove(cluster_id
, "stable")
351 if self
._stable
_repo
_url
:
352 await self
.repo_add(cluster_id
, "stable", self
._stable
_repo
_url
)
355 return n2vc_installed_sw
357 async def _uninstall_sw(self
, cluster_id
: str, namespace
: str):
358 # uninstall Tiller if necessary
360 self
.log
.debug("Uninstalling tiller from cluster {}".format(cluster_id
))
363 paths
, env
= self
._init
_paths
_env
(
364 cluster_name
=cluster_id
, create_if_not_exist
=True
368 # find namespace for tiller pod
369 command
= "{} --kubeconfig={} get deployments --all-namespaces".format(
370 self
.kubectl_command
, quote(paths
["kube_config"])
372 output
, _rc
= await self
._local
_async
_exec
(
373 command
=command
, raise_exception_on_error
=False, env
=env
375 output_table
= self
._output
_to
_table
(output
=output
)
377 for r
in output_table
:
379 if "tiller-deploy" in r
[1]:
385 msg
= "Tiller deployment not found in cluster {}".format(cluster_id
)
388 self
.log
.debug("namespace for tiller: {}".format(namespace
))
391 # uninstall tiller from cluster
392 self
.log
.debug("Uninstalling tiller from cluster {}".format(cluster_id
))
393 command
= "{} --kubeconfig={} --home={} reset".format(
395 quote(paths
["kube_config"]),
396 quote(paths
["helm_dir"]),
398 self
.log
.debug("resetting: {}".format(command
))
399 output
, _rc
= await self
._local
_async
_exec
(
400 command
=command
, raise_exception_on_error
=True, env
=env
402 # Delete clusterrolebinding and serviceaccount.
403 # Ignore if errors for backward compatibility
405 "{} --kubeconfig={} delete clusterrolebinding.rbac.authorization.k8s."
406 "io/osm-tiller-cluster-rule"
407 ).format(self
.kubectl_command
, quote(paths
["kube_config"]))
408 output
, _rc
= await self
._local
_async
_exec
(
409 command
=command
, raise_exception_on_error
=False, env
=env
412 "{} --kubeconfig={} --namespace {} delete serviceaccount/{}".format(
413 self
.kubectl_command
,
414 quote(paths
["kube_config"]),
416 quote(self
.service_account
),
419 output
, _rc
= await self
._local
_async
_exec
(
420 command
=command
, raise_exception_on_error
=False, env
=env
424 self
.log
.debug("namespace not found")
426 async def _instances_list(self
, cluster_id
):
428 paths
, env
= self
._init
_paths
_env
(
429 cluster_name
=cluster_id
, create_if_not_exist
=True
432 command
= "{} list --output yaml".format(self
._helm
_command
)
434 output
, _rc
= await self
._local
_async
_exec
(
435 command
=command
, raise_exception_on_error
=True, env
=env
438 if output
and len(output
) > 0:
439 # parse yaml and update keys to lower case to unify with helm3
440 instances
= yaml
.load(output
, Loader
=yaml
.SafeLoader
).get("Releases")
442 for instance
in instances
:
443 new_instance
= dict((k
.lower(), v
) for k
, v
in instance
.items())
444 new_instances
.append(new_instance
)
449 def _get_inspect_command(
450 self
, show_command
: str, kdu_model
: str, repo_str
: str, version
: str
452 inspect_command
= "{} inspect {} {}{} {}".format(
453 self
._helm
_command
, show_command
, quote(kdu_model
), repo_str
, version
455 return inspect_command
457 def _get_get_command(
458 self
, get_command
: str, kdu_instance
: str, namespace
: str, kubeconfig
: str
460 get_command
= "env KUBECONFIG={} {} get {} {} --output yaml".format(
461 kubeconfig
, self
._helm
_command
, get_command
, quote(kdu_instance
)
465 async def _status_kdu(
469 namespace
: str = None,
470 yaml_format
: bool = False,
471 show_error_log
: bool = False,
472 ) -> Union
[str, dict]:
474 "status of kdu_instance: {}, namespace: {} ".format(kdu_instance
, namespace
)
478 paths
, env
= self
._init
_paths
_env
(
479 cluster_name
=cluster_id
, create_if_not_exist
=True
481 command
= ("env KUBECONFIG={} {} status {} --output yaml").format(
482 paths
["kube_config"], self
._helm
_command
, quote(kdu_instance
)
484 output
, rc
= await self
._local
_async
_exec
(
486 raise_exception_on_error
=True,
487 show_error_log
=show_error_log
,
497 data
= yaml
.load(output
, Loader
=yaml
.SafeLoader
)
499 # remove field 'notes'
501 del data
.get("info").get("status")["notes"]
505 # parse the manifest to a list of dictionaries
506 if "manifest" in data
:
507 manifest_str
= data
.get("manifest")
508 manifest_docs
= yaml
.load_all(manifest_str
, Loader
=yaml
.SafeLoader
)
510 data
["manifest"] = []
511 for doc
in manifest_docs
:
512 data
["manifest"].append(doc
)
514 # parse field 'resources'
516 resources
= str(data
.get("info").get("status").get("resources"))
517 resource_table
= self
._output
_to
_table
(resources
)
518 data
.get("info").get("status")["resources"] = resource_table
522 # set description to lowercase (unify with helm3)
524 data
.get("info")["description"] = data
.get("info").pop("Description")
530 def _get_helm_chart_repos_ids(self
, cluster_uuid
) -> list:
532 cluster_filter
= {"_admin.helm-chart.id": cluster_uuid
}
533 cluster
= self
.db
.get_one("k8sclusters", cluster_filter
)
535 repo_ids
= cluster
.get("_admin").get("helm_chart_repos") or []
539 "k8cluster with helm-id : {} not found".format(cluster_uuid
)
542 async def _is_install_completed(self
, cluster_id
: str, kdu_instance
: str) -> bool:
544 paths
, env
= self
._init
_paths
_env
(
545 cluster_name
=cluster_id
, create_if_not_exist
=True
548 status
= await self
._status
_kdu
(
549 cluster_id
=cluster_id
, kdu_instance
=kdu_instance
, yaml_format
=False
552 # extract info.status.resources-> str
555 # NAME READY UP-TO-DATE AVAILABLE AGE
556 # halting-horse-mongodb 0/1 1 0 0s
557 # halting-petit-mongodb 1/1 1 0 0s
559 resources
= K8sHelmBaseConnector
._get
_deep
(
560 status
, ("info", "status", "resources")
564 resources
= K8sHelmBaseConnector
._output
_to
_table
(resources
)
566 num_lines
= len(resources
)
569 while index
< num_lines
:
571 line1
= resources
[index
]
573 # find '==>' in column 0
574 if line1
[0] == "==>":
575 line2
= resources
[index
]
577 # find READY in column 1
578 if line2
[1] == "READY":
580 line3
= resources
[index
]
582 while len(line3
) > 1 and index
< num_lines
:
583 ready_value
= line3
[1]
584 parts
= ready_value
.split(sep
="/")
585 current
= int(parts
[0])
586 total
= int(parts
[1])
588 self
.log
.debug("NOT READY:\n {}".format(line3
))
590 line3
= resources
[index
]
598 def _get_install_command(
611 timeout_str
= "--timeout {}".format(timeout
)
616 atomic_str
= "--atomic"
620 namespace_str
= "--namespace {}".format(quote(namespace
))
625 version_str
= "--version {}".format(version
)
628 "env KUBECONFIG={kubeconfig} {helm} install {atomic} --output yaml "
629 "{params} {timeout} --name={name} {ns} {model} {ver}".format(
630 kubeconfig
=kubeconfig
,
631 helm
=self
._helm
_command
,
635 name
=quote(kdu_instance
),
637 model
=quote(kdu_model
),
643 def _get_upgrade_scale_command(
656 """Generates the command to scale a Helm Chart release
659 kdu_model (str): Kdu model name, corresponding to the Helm local location or repository
660 kdu_instance (str): KDU instance, corresponding to the Helm Chart release in question
661 namespace (str): Namespace where this KDU instance is deployed
662 scale (int): Scale count
663 version (str): Constraint with specific version of the Chart to use
664 atomic (bool): If set, upgrade process rolls back changes made in case of failed upgrade.
665 The --wait flag will be set automatically if --atomic is used
666 replica_str (str): The key under resource_name key where the scale count is stored
667 timeout (float): The time, in seconds, to wait
668 resource_name (str): The KDU's resource to scale
669 kubeconfig (str): Kubeconfig file path
672 str: command to scale a Helm Chart release
677 scale_dict
= {"{}.{}".format(resource_name
, replica_str
): scale
}
679 scale_dict
= {replica_str
: scale
}
681 scale_str
= self
._params
_to
_set
_option
(scale_dict
)
683 return self
._get
_upgrade
_command
(
685 kdu_instance
=kdu_instance
,
687 params_str
=scale_str
,
691 kubeconfig
=kubeconfig
,
694 def _get_upgrade_command(
706 """Generates the command to upgrade a Helm Chart release
709 kdu_model (str): Kdu model name, corresponding to the Helm local location or repository
710 kdu_instance (str): KDU instance, corresponding to the Helm Chart release in question
711 namespace (str): Namespace where this KDU instance is deployed
712 params_str (str): Params used to upgrade the Helm Chart release
713 version (str): Constraint with specific version of the Chart to use
714 atomic (bool): If set, upgrade process rolls back changes made in case of failed upgrade.
715 The --wait flag will be set automatically if --atomic is used
716 timeout (float): The time, in seconds, to wait
717 kubeconfig (str): Kubeconfig file path
718 force (bool): If set, helm forces resource updates through a replacement strategy. This may recreate pods.
720 str: command to upgrade a Helm Chart release
725 timeout_str
= "--timeout {}".format(timeout
)
730 atomic_str
= "--atomic"
735 force_str
= "--force "
740 version_str
= "--version {}".format(quote(version
))
745 namespace_str
= "--namespace {}".format(quote(namespace
))
748 "env KUBECONFIG={kubeconfig} {helm} upgrade {namespace} {atomic} --output yaml {params} {timeout} {force}"
749 "--reuse-values {name} {model} {ver}"
751 kubeconfig
=kubeconfig
,
752 helm
=self
._helm
_command
,
753 namespace
=namespace_str
,
758 name
=quote(kdu_instance
),
759 model
=quote(kdu_model
),
764 def _get_rollback_command(
765 self
, kdu_instance
, namespace
, revision
, kubeconfig
767 return "env KUBECONFIG={} {} rollback {} {} --wait".format(
768 kubeconfig
, self
._helm
_command
, quote(kdu_instance
), revision
771 def _get_uninstall_command(
772 self
, kdu_instance
: str, namespace
: str, kubeconfig
: str
774 return "env KUBECONFIG={} {} delete --purge {}".format(
775 kubeconfig
, self
._helm
_command
, quote(kdu_instance
)