2 # Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3 # This file is part of OSM
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact with: nfvlabs@tid.es
23 from typing
import Union
24 from shlex
import quote
28 from n2vc
.k8s_helm_base_conn
import K8sHelmBaseConnector
29 from n2vc
.exceptions
import K8sException
32 class K8sHelmConnector(K8sHelmBaseConnector
):
35 ####################################################################################
36 ################################### P U B L I C ####################################
37 ####################################################################################
    def __init__(
        self,
        fs: object,
        db: object,
        kubectl_command: str = "/usr/bin/kubectl",
        helm_command: str = "/usr/bin/helm",
        log: object = None,
        on_update_db=None,
    ):
        """
        Initializes helm connector for helm v2

        :param fs: file system for kubernetes and helm configuration
        :param db: database object to write current operation status
        :param kubectl_command: path to kubectl executable
        :param helm_command: path to helm executable
        :param log: logger
        :param on_update_db: callback called when k8s connector updates database
        """

        # parent class constructor wires fs/db/log and the command paths
        K8sHelmBaseConnector.__init__(
            self,
            db=db,
            log=log,
            fs=fs,
            kubectl_command=kubectl_command,
            helm_command=helm_command,
            on_update_db=on_update_db,
        )

        self.log.info("Initializing K8S Helm2 connector")

        # initialize helm client-only
        self.log.debug("Initializing helm client-only...")
        # NOTE(review): when no stable repo url is configured the flag is assumed
        # to be "--skip-repos" — confirm against the deployed helm v2 version
        command = "{} init --client-only {} ".format(
            self._helm_command,
            "--stable-repo-url {}".format(quote(self._stable_repo_url))
            if self._stable_repo_url
            else "--skip-repos",
        )
        try:
            # fire-and-forget: the init result is deliberately not awaited here,
            # errors are tolerated (helm may already be initialized)
            asyncio.ensure_future(
                self._local_async_exec(command=command, raise_exception_on_error=False)
            )
            # loop = asyncio.get_event_loop()
            # loop.run_until_complete(self._local_async_exec(command=command,
            # raise_exception_on_error=False))
        except Exception as e:
            self.warning(
                msg="helm init failed (it was already initialized): {}".format(e)
            )

        self.log.info("K8S Helm2 connector initialized")
    async def install(
        self,
        cluster_uuid: str,
        kdu_model: str,
        kdu_instance: str,
        atomic: bool = True,
        timeout: float = 300,
        params: dict = None,
        db_dict: dict = None,
        kdu_name: str = None,
        namespace: str = None,
        **kwargs,
    ):
        """
        Deploys of a new KDU instance. It would implicitly rely on the `install` call
        to deploy the Chart/Bundle properly parametrized (in practice, this call would
        happen before any _initial-config-primitive_of the VNF is called).

        :param cluster_uuid: UUID of a K8s cluster known by OSM
        :param kdu_model: chart/reference (string), which can be either
            of these options:
            - a name of chart available via the repos known by OSM
              (e.g. stable/openldap, stable/openldap:1.2.4)
            - a path to a packaged chart (e.g. mychart.tgz)
            - a path to an unpacked chart directory or a URL (e.g. mychart)
        :param kdu_instance: Kdu instance name
        :param atomic: If set, installation process purges chart/bundle on fail, also
            will wait until all the K8s objects are active
        :param timeout: Time in seconds to wait for the install of the chart/bundle
            (defaults to Helm default timeout: 300s)
        :param params: dictionary of key-value pairs for instantiation parameters
            (overriding default values)
        :param dict db_dict: where to write into database when the status changes.
                        It contains a dict with {collection: <str>, filter: {},
                        path: <str>},
                            e.g. {collection: "nsrs", filter:
                            {_id: <nsd-id>, path: "_admin.deployed.K8S.3"}
        :param kdu_name: Name of the KDU instance to be installed
        :param namespace: K8s namespace to use for the KDU instance
        :param kwargs: Additional parameters (None yet)
        :return: True if successful
        """
        self.log.debug("installing {} in cluster {}".format(kdu_model, cluster_uuid))

        # sync local dir with the remote file-system copy of the cluster config
        self.fs.sync(from_path=cluster_uuid)

        # init env, paths (creates the cluster dirs on first use)
        paths, env = self._init_paths_env(
            cluster_name=cluster_uuid, create_if_not_exist=True
        )

        # delegate the actual helm install to the version-independent base impl
        await self._install_impl(
            cluster_uuid,
            kdu_model,
            paths,
            env,
            kdu_instance,
            atomic=atomic,
            timeout=timeout,
            params=params,
            db_dict=db_dict,
            kdu_name=kdu_name,
            namespace=namespace,
        )

        # sync fs back so other workers see the updated helm state
        self.fs.reverse_sync(from_path=cluster_uuid)

        self.log.debug("Returning kdu_instance {}".format(kdu_instance))
        return True
167 async def inspect_kdu(self
, kdu_model
: str, repo_url
: str = None) -> str:
169 "inspect kdu_model {} from (optional) repo: {}".format(kdu_model
, repo_url
)
172 return await self
._exec
_inspect
_command
(
173 inspect_command
="", kdu_model
=kdu_model
, repo_url
=repo_url
177 ####################################################################################
178 ################################### P R I V A T E ##################################
179 ####################################################################################
    def _init_paths_env(self, cluster_name: str, create_if_not_exist: bool = True):
        """
        Creates and returns base cluster and kube dirs and returns them.
        Also created helm3 dirs according to new directory specification, paths are
        returned and also environment variables that must be provided to execute commands

        Helm 2 directory specification uses helm_home dir:

        The variables assigned for this paths are:
        - Helm hone: $HELM_HOME
        - helm kubeconfig: $KUBECONFIG

        :param cluster_name: cluster_name
        :return: Dictionary with config_paths and dictionary with helm environment variables
        """
        base = self.fs.path
        # strip a trailing path separator so joins below do not double it
        if base.endswith("/") or base.endswith("\\"):
            base = base[:-1]

        # base dir for cluster
        cluster_dir = base + "/" + cluster_name

        # kube dir
        kube_dir = cluster_dir + "/" + ".kube"
        if create_if_not_exist and not os.path.exists(kube_dir):
            self.log.debug("Creating dir {}".format(kube_dir))
            os.makedirs(kube_dir)

        # helm home dir
        helm_dir = cluster_dir + "/" + ".helm"
        if create_if_not_exist and not os.path.exists(helm_dir):
            self.log.debug("Creating dir {}".format(helm_dir))
            os.makedirs(helm_dir)

        config_filename = kube_dir + "/config"

        # 2 - Prepare dictionary with paths
        paths = {
            "kube_dir": kube_dir,
            "kube_config": config_filename,
            "cluster_dir": cluster_dir,
            "helm_dir": helm_dir,
        }

        # fail fast if any expected directory is still missing
        for file_name, file in paths.items():
            if "dir" in file_name and not os.path.exists(file):
                err_msg = "{} dir does not exist".format(file)
                self.log.error(err_msg)
                raise K8sException(err_msg)

        # 3 - Prepare environment variables
        env = {"HELM_HOME": helm_dir, "KUBECONFIG": config_filename}

        return paths, env
237 async def _get_services(self
, cluster_id
, kdu_instance
, namespace
, kubeconfig
):
239 paths
, env
= self
._init
_paths
_env
(
240 cluster_name
=cluster_id
, create_if_not_exist
=True
243 command1
= "env KUBECONFIG={} {} get manifest {} ".format(
244 kubeconfig
, self
._helm
_command
, quote(kdu_instance
)
246 command2
= "{} get --namespace={} -f -".format(
247 self
.kubectl_command
, quote(namespace
)
249 output
, _rc
= await self
._local
_async
_exec
_pipe
(
250 command1
, command2
, env
=env
, raise_exception_on_error
=True
252 services
= self
._parse
_services
(output
)
    async def _cluster_init(
        self, cluster_id: str, namespace: str, paths: dict, env: dict
    ):
        """
        Implements the helm version dependent cluster initialization:
        For helm2 it initialized tiller environment if needed

        :param cluster_id: cluster identifier (used for local dirs and logging)
        :param namespace: namespace where tiller is expected/installed
        :param paths: dict from _init_paths_env (kube_config, helm_dir, ...)
        :param env: environment variables for the helm/kubectl subprocesses
        :return: True if this call installed tiller (n2vc owns the sw), else False
        """

        # check if tiller pod is up in cluster
        command = "{} --kubeconfig={} --namespace={} get deployments".format(
            self.kubectl_command, paths["kube_config"], quote(namespace)
        )
        output, _rc = await self._local_async_exec(
            command=command, raise_exception_on_error=True, env=env
        )

        output_table = self._output_to_table(output=output)

        # find 'tiller' pod in all pods
        already_initialized = False
        try:
            for row in output_table:
                if row[0].startswith("tiller-deploy"):
                    already_initialized = True
                    break
        except Exception:
            # malformed table row — treat as not initialized
            pass

        # helm init
        n2vc_installed_sw = False
        if not already_initialized:
            self.log.info(
                "Initializing helm in client and server: {}".format(cluster_id)
            )
            # service account for tiller; errors ignored (it may already exist)
            command = "{} --kubeconfig={} --namespace kube-system create serviceaccount {}".format(
                self.kubectl_command, paths["kube_config"], quote(self.service_account)
            )
            _, _rc = await self._local_async_exec(
                command=command, raise_exception_on_error=False, env=env
            )

            # cluster-admin binding for the tiller service account
            command = (
                "{} --kubeconfig={} create clusterrolebinding osm-tiller-cluster-rule "
                "--clusterrole=cluster-admin --serviceaccount=kube-system:{}"
            ).format(
                self.kubectl_command, paths["kube_config"], quote(self.service_account)
            )
            _, _rc = await self._local_async_exec(
                command=command, raise_exception_on_error=False, env=env
            )

            # client+server init (installs tiller in the cluster)
            command = (
                "{} init --kubeconfig={} --tiller-namespace={} --home={} --service-account {} "
                "{}"
            ).format(
                self._helm_command,
                paths["kube_config"],
                quote(namespace),
                quote(paths["helm_dir"]),
                quote(self.service_account),
                "--stable-repo-url {}".format(quote(self._stable_repo_url))
                if self._stable_repo_url
                else "--skip-repos",
            )
            _, _rc = await self._local_async_exec(
                command=command, raise_exception_on_error=True, env=env
            )
            n2vc_installed_sw = True
        else:
            # check client helm installation
            check_file = paths["helm_dir"] + "/repository/repositories.yaml"
            if not self._check_file_exists(
                filename=check_file, exception_if_not_exists=False
            ):
                self.log.info("Initializing helm in client: {}".format(cluster_id))
                command = (
                    "{} init --kubeconfig={} --tiller-namespace={} "
                    "--home={} --client-only {} "
                ).format(
                    self._helm_command,
                    paths["kube_config"],
                    quote(namespace),
                    quote(paths["helm_dir"]),
                    "--stable-repo-url {}".format(quote(self._stable_repo_url))
                    if self._stable_repo_url
                    else "--skip-repos",
                )
                output, _rc = await self._local_async_exec(
                    command=command, raise_exception_on_error=True, env=env
                )
            else:
                self.log.info("Helm client already initialized")

        # replace a stale "stable" repo with the configured one, if they differ
        repo_list = await self.repo_list(cluster_id)
        for repo in repo_list:
            if repo["name"] == "stable" and repo["url"] != self._stable_repo_url:
                self.log.debug("Add new stable repo url: {}")
                await self.repo_remove(cluster_id, "stable")
                if self._stable_repo_url:
                    await self.repo_add(cluster_id, "stable", self._stable_repo_url)
                break

        return n2vc_installed_sw
    async def _uninstall_sw(self, cluster_id: str, namespace: str):
        # uninstall Tiller if necessary
        """Remove tiller plus its clusterrolebinding/serviceaccount from a cluster.

        :param cluster_id: cluster whose local config dirs are used
        :param namespace: tiller namespace; when falsy it is auto-discovered
        """

        self.log.debug("Uninstalling tiller from cluster {}".format(cluster_id))

        # init paths, env
        paths, env = self._init_paths_env(
            cluster_name=cluster_id, create_if_not_exist=True
        )

        if not namespace:
            # find namespace for tiller pod
            command = "{} --kubeconfig={} get deployments --all-namespaces".format(
                self.kubectl_command, quote(paths["kube_config"])
            )
            output, _rc = await self._local_async_exec(
                command=command, raise_exception_on_error=False, env=env
            )
            output_table = self._output_to_table(output=output)
            namespace = None
            for r in output_table:
                try:
                    # NAMESPACE is column 0, deployment NAME is column 1
                    if "tiller-deploy" in r[1]:
                        namespace = r[0]
                        break
                except Exception:
                    pass
            else:
                msg = "Tiller deployment not found in cluster {}".format(cluster_id)
                self.log.error(msg)

            self.log.debug("namespace for tiller: {}".format(namespace))

        if namespace:
            # uninstall tiller from cluster
            self.log.debug("Uninstalling tiller from cluster {}".format(cluster_id))
            command = "{} --kubeconfig={} --home={} reset".format(
                self._helm_command,
                quote(paths["kube_config"]),
                quote(paths["helm_dir"]),
            )
            self.log.debug("resetting: {}".format(command))
            output, _rc = await self._local_async_exec(
                command=command, raise_exception_on_error=True, env=env
            )

            # Delete clusterrolebinding and serviceaccount.
            # Ignore if errors for backward compatibility
            command = (
                "{} --kubeconfig={} delete clusterrolebinding.rbac.authorization.k8s."
                "io/osm-tiller-cluster-rule"
            ).format(self.kubectl_command, quote(paths["kube_config"]))
            output, _rc = await self._local_async_exec(
                command=command, raise_exception_on_error=False, env=env
            )

            command = (
                "{} --kubeconfig={} --namespace {} delete serviceaccount/{}".format(
                    self.kubectl_command,
                    quote(paths["kube_config"]),
                    quote(namespace),
                    quote(self.service_account),
                )
            )
            output, _rc = await self._local_async_exec(
                command=command, raise_exception_on_error=False, env=env
            )

        else:
            self.log.debug("namespace not found")
429 async def _instances_list(self
, cluster_id
):
431 paths
, env
= self
._init
_paths
_env
(
432 cluster_name
=cluster_id
, create_if_not_exist
=True
435 command
= "{} list --output yaml".format(self
._helm
_command
)
437 output
, _rc
= await self
._local
_async
_exec
(
438 command
=command
, raise_exception_on_error
=True, env
=env
441 if output
and len(output
) > 0:
442 # parse yaml and update keys to lower case to unify with helm3
443 instances
= yaml
.load(output
, Loader
=yaml
.SafeLoader
).get("Releases")
445 for instance
in instances
:
446 new_instance
= dict((k
.lower(), v
) for k
, v
in instance
.items())
447 new_instances
.append(new_instance
)
452 def _get_inspect_command(
453 self
, show_command
: str, kdu_model
: str, repo_str
: str, version
: str
455 inspect_command
= "{} inspect {} {}{} {}".format(
456 self
._helm
_command
, show_command
, quote(kdu_model
), repo_str
, version
458 return inspect_command
460 def _get_get_command(
461 self
, get_command
: str, kdu_instance
: str, namespace
: str, kubeconfig
: str
463 get_command
= "env KUBECONFIG={} {} get {} {} --output yaml".format(
464 kubeconfig
, self
._helm
_command
, get_command
, quote(kdu_instance
)
    async def _status_kdu(
        self,
        cluster_id: str,
        kdu_instance: str,
        namespace: str = None,
        yaml_format: bool = False,
        show_error_log: bool = False,
    ) -> Union[str, dict]:
        """Return the status of a release: raw yaml text or a normalized dict.

        :param cluster_id: cluster whose local config dirs are used
        :param kdu_instance: helm release name
        :param namespace: only used for logging in helm v2
        :param yaml_format: when True return the raw yaml string unchanged
        :param show_error_log: forward subprocess stderr to the log
        :return: yaml string, parsed/normalized dict, or None on non-zero rc
        """
        self.log.debug(
            "status of kdu_instance: {}, namespace: {} ".format(kdu_instance, namespace)
        )

        # init config, env
        paths, env = self._init_paths_env(
            cluster_name=cluster_id, create_if_not_exist=True
        )
        command = ("env KUBECONFIG={} {} status {} --output yaml").format(
            paths["kube_config"], self._helm_command, quote(kdu_instance)
        )
        output, rc = await self._local_async_exec(
            command=command,
            raise_exception_on_error=True,
            show_error_log=show_error_log,
            env=env,
        )

        if yaml_format:
            return str(output)

        if rc != 0:
            return None

        data = yaml.load(output, Loader=yaml.SafeLoader)

        # remove field 'notes'
        try:
            del data.get("info").get("status")["notes"]
        except KeyError:
            pass

        # parse the manifest to a list of dictionaries
        if "manifest" in data:
            manifest_str = data.get("manifest")
            manifest_docs = yaml.load_all(manifest_str, Loader=yaml.SafeLoader)

            data["manifest"] = []
            for doc in manifest_docs:
                data["manifest"].append(doc)

        # parse field 'resources'
        try:
            resources = str(data.get("info").get("status").get("resources"))
            resource_table = self._output_to_table(resources)
            data.get("info").get("status")["resources"] = resource_table
        except Exception:
            # resources section is optional/free-form — ignore parse failures
            pass

        # set description to lowercase (unify with helm3)
        try:
            data.get("info")["description"] = data.get("info").pop("Description")
        except KeyError:
            pass

        return data
    def _get_helm_chart_repos_ids(self, cluster_uuid) -> list:
        """Return the helm chart repo ids registered for a cluster.

        :param cluster_uuid: value matched against _admin.helm-chart.id
        :raises K8sException: when no cluster document matches
        """
        repo_ids = []
        cluster_filter = {"_admin.helm-chart.id": cluster_uuid}
        cluster = self.db.get_one("k8sclusters", cluster_filter)
        if cluster:
            repo_ids = cluster.get("_admin").get("helm_chart_repos") or []
            return repo_ids
        else:
            raise K8sException(
                "k8cluster with helm-id : {} not found".format(cluster_uuid)
            )
    async def _is_install_completed(self, cluster_id: str, kdu_instance: str) -> bool:
        """Check whether every resource of a release reports READY x/x.

        :param cluster_id: cluster whose local config dirs are used
        :param kdu_instance: helm release name
        :return: True when all READY counters are complete
        """
        # init config, env
        paths, env = self._init_paths_env(
            cluster_name=cluster_id, create_if_not_exist=True
        )

        status = await self._status_kdu(
            cluster_id=cluster_id, kdu_instance=kdu_instance, yaml_format=False
        )

        # extract info.status.resources-> str
        # format:
        #       ==> v1/Deployment
        #       NAME                    READY   UP-TO-DATE   AVAILABLE   AGE
        #       halting-horse-mongodb   0/1     1            0           0s
        #       halting-petit-mongodb   1/1     1            0           0s
        resources = K8sHelmBaseConnector._get_deep(
            status, ("info", "status", "resources")
        )

        # convert to table
        resources = K8sHelmBaseConnector._output_to_table(resources)

        num_lines = len(resources)
        index = 0
        ready = True
        while index < num_lines:
            try:
                line1 = resources[index]
                index += 1
                # find '==>' in column 0 (section header per resource kind)
                if line1[0] == "==>":
                    line2 = resources[index]
                    index += 1
                    # find READY in column 1 (table header row)
                    if line2[1] == "READY":
                        # read next lines until the section ends
                        line3 = resources[index]
                        index += 1
                        while len(line3) > 1 and index < num_lines:
                            ready_value = line3[1]
                            parts = ready_value.split(sep="/")
                            current = int(parts[0])
                            total = int(parts[1])
                            if current < total:
                                self.log.debug("NOT READY:\n    {}".format(line3))
                                ready = False
                            line3 = resources[index]
                            index += 1
            except Exception:
                # rows without the expected columns are simply skipped
                pass

        return ready
    def _get_install_command(
        self,
        kdu_model,
        kdu_instance,
        namespace,
        params_str,
        version,
        atomic,
        timeout,
        kubeconfig,
    ) -> str:
        """Build the helm v2 `install` command line for a release.

        Optional pieces (--timeout/--atomic/--namespace/--version) are included
        only when the corresponding argument is truthy.
        """
        timeout_str = ""
        if timeout:
            timeout_str = "--timeout {}".format(timeout)

        # atomic
        atomic_str = ""
        if atomic:
            atomic_str = "--atomic"
        # namespace
        namespace_str = ""
        if namespace:
            namespace_str = "--namespace {}".format(quote(namespace))

        # version
        version_str = ""
        if version:
            version_str = "--version {}".format(version)

        command = (
            "env KUBECONFIG={kubeconfig} {helm} install {atomic} --output yaml "
            "{params} {timeout} --name={name} {ns} {model} {ver}".format(
                kubeconfig=kubeconfig,
                helm=self._helm_command,
                atomic=atomic_str,
                params=params_str,
                timeout=timeout_str,
                name=quote(kdu_instance),
                ns=namespace_str,
                model=quote(kdu_model),
                ver=version_str,
            )
        )
        return command
    def _get_upgrade_scale_command(
        self,
        kdu_model: str,
        kdu_instance: str,
        namespace: str,
        scale: int,
        version: str,
        atomic: bool,
        replica_str: str,
        timeout: float,
        resource_name: str,
        kubeconfig: str,
    ) -> str:
        """Generates the command to scale a Helm Chart release

        Args:
            kdu_model (str): Kdu model name, corresponding to the Helm local location or repository
            kdu_instance (str): KDU instance, corresponding to the Helm Chart release in question
            namespace (str): Namespace where this KDU instance is deployed
            scale (int): Scale count
            version (str): Constraint with specific version of the Chart to use
            atomic (bool): If set, upgrade process rolls back changes made in case of failed upgrade.
                The --wait flag will be set automatically if --atomic is used
            replica_str (str): The key under resource_name key where the scale count is stored
            timeout (float): The time, in seconds, to wait
            resource_name (str): The KDU's resource to scale
            kubeconfig (str): Kubeconfig file path

        Returns:
            str: command to scale a Helm Chart release
        """

        # scale: the --set key is "<resource>.<replica_str>" when a resource
        # is named, otherwise just "<replica_str>"
        if resource_name:
            scale_dict = {"{}.{}".format(resource_name, replica_str): scale}
        else:
            scale_dict = {replica_str: scale}

        scale_str = self._params_to_set_option(scale_dict)

        # scaling is expressed as an upgrade with only the replica count changed
        return self._get_upgrade_command(
            kdu_model=kdu_model,
            kdu_instance=kdu_instance,
            namespace=namespace,
            params_str=scale_str,
            version=version,
            atomic=atomic,
            timeout=timeout,
            kubeconfig=kubeconfig,
        )
    def _get_upgrade_command(
        self,
        kdu_model,
        kdu_instance,
        namespace,
        params_str,
        version,
        atomic,
        timeout,
        kubeconfig,
    ) -> str:
        """Generates the command to upgrade a Helm Chart release

        Args:
            kdu_model (str): Kdu model name, corresponding to the Helm local location or repository
            kdu_instance (str): KDU instance, corresponding to the Helm Chart release in question
            namespace (str): Namespace where this KDU instance is deployed
            params_str (str): Params used to upgrade the Helm Chart release
            version (str): Constraint with specific version of the Chart to use
            atomic (bool): If set, upgrade process rolls back changes made in case of failed upgrade.
                The --wait flag will be set automatically if --atomic is used
            timeout (float): The time, in seconds, to wait
            kubeconfig (str): Kubeconfig file path

        Returns:
            str: command to upgrade a Helm Chart release
        """

        timeout_str = ""
        if timeout:
            timeout_str = "--timeout {}".format(timeout)

        # atomic
        atomic_str = ""
        if atomic:
            atomic_str = "--atomic"

        # version
        version_str = ""
        if version:
            version_str = "--version {}".format(quote(version))

        command = (
            "env KUBECONFIG={kubeconfig} {helm} upgrade {atomic} --output yaml {params} {timeout} "
            "--reuse-values {name} {model} {ver}"
        ).format(
            kubeconfig=kubeconfig,
            helm=self._helm_command,
            atomic=atomic_str,
            params=params_str,
            timeout=timeout_str,
            name=quote(kdu_instance),
            model=quote(kdu_model),
            ver=version_str,
        )
        return command
754 def _get_rollback_command(
755 self
, kdu_instance
, namespace
, revision
, kubeconfig
757 return "env KUBECONFIG={} {} rollback {} {} --wait".format(
758 kubeconfig
, self
._helm
_command
, quote(kdu_instance
), revision
761 def _get_uninstall_command(
762 self
, kdu_instance
: str, namespace
: str, kubeconfig
: str
764 return "env KUBECONFIG={} {} delete --purge {}".format(
765 kubeconfig
, self
._helm
_command
, quote(kdu_instance
)