2 # Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3 # This file is part of OSM
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact with: nfvlabs@tid.es
23 from typing
import Union
27 from n2vc
.k8s_helm_base_conn
import K8sHelmBaseConnector
28 from n2vc
.exceptions
import K8sException
31 class K8sHelmConnector(K8sHelmBaseConnector
):
34 ####################################################################################
35 ################################### P U B L I C ####################################
36 ####################################################################################
43 kubectl_command
: str = "/usr/bin/kubectl",
44 helm_command
: str = "/usr/bin/helm",
49 Initializes helm connector for helm v2
51 :param fs: file system for kubernetes and helm configuration
52 :param db: database object to write current operation status
53 :param kubectl_command: path to kubectl executable
54 :param helm_command: path to helm executable
56 :param on_update_db: callback called when k8s connector updates database
60 K8sHelmBaseConnector
.__init
__(
65 kubectl_command
=kubectl_command
,
66 helm_command
=helm_command
,
67 on_update_db
=on_update_db
,
70 self
.log
.info("Initializing K8S Helm2 connector")
72 # initialize helm client-only
73 self
.log
.debug("Initializing helm client-only...")
74 command
= "{} init --client-only {} ".format(
76 "--stable-repo-url {}".format(self
._stable
_repo
_url
)
77 if self
._stable
_repo
_url
81 asyncio
.ensure_future(
82 self
._local
_async
_exec
(command
=command
, raise_exception_on_error
=False)
84 # loop = asyncio.get_event_loop()
85 # loop.run_until_complete(self._local_async_exec(command=command,
86 # raise_exception_on_error=False))
87 except Exception as e
:
89 msg
="helm init failed (it was already initialized): {}".format(e
)
92 self
.log
.info("K8S Helm2 connector initialized")
100 timeout
: float = 300,
102 db_dict
: dict = None,
103 kdu_name
: str = None,
104 namespace
: str = None,
        Deploys a new KDU instance. It would implicitly rely on the `install` call
109 to deploy the Chart/Bundle properly parametrized (in practice, this call would
        happen before any _initial-config-primitive_ of the VNF is called).
112 :param cluster_uuid: UUID of a K8s cluster known by OSM
113 :param kdu_model: chart/ reference (string), which can be either
115 - a name of chart available via the repos known by OSM
116 - a path to a packaged chart
117 - a path to an unpacked chart directory or a URL
118 :param kdu_instance: Kdu instance name
119 :param atomic: If set, installation process purges chart/bundle on fail, also
120 will wait until all the K8s objects are active
121 :param timeout: Time in seconds to wait for the install of the chart/bundle
122 (defaults to Helm default timeout: 300s)
123 :param params: dictionary of key-value pairs for instantiation parameters
124 (overriding default values)
125 :param dict db_dict: where to write into database when the status changes.
126 It contains a dict with {collection: <str>, filter: {},
128 e.g. {collection: "nsrs", filter:
129 {_id: <nsd-id>, path: "_admin.deployed.K8S.3"}
130 :param kdu_name: Name of the KDU instance to be installed
131 :param namespace: K8s namespace to use for the KDU instance
132 :param kwargs: Additional parameters (None yet)
133 :return: True if successful
135 _
, cluster_id
= self
._get
_namespace
_cluster
_id
(cluster_uuid
)
136 self
.log
.debug("installing {} in cluster {}".format(kdu_model
, cluster_id
))
139 self
.fs
.sync(from_path
=cluster_id
)
142 paths
, env
= self
._init
_paths
_env
(
143 cluster_name
=cluster_id
, create_if_not_exist
=True
146 await self
._install
_impl
(
161 self
.fs
.reverse_sync(from_path
=cluster_id
)
163 self
.log
.debug("Returning kdu_instance {}".format(kdu_instance
))
async def inspect_kdu(self, kdu_model: str, repo_url: str = None) -> str:
    """
    Inspect a KDU model (helm chart), optionally looking it up in a repo.

    :param kdu_model: chart reference to inspect
    :param repo_url: optional repository URL in which the chart is looked up
    :return: the value returned by ``self._exec_inspect_comand`` when called
        with an empty inspect sub-command (presumably the complete chart
        information -- confirm against the base connector)
    """
    # Emit the request through the connector logger instead of building the
    # message and discarding it.
    self.log.debug(
        "inspect kdu_model {} from (optional) repo: {}".format(kdu_model, repo_url)
    )

    # The empty inspect_command is forwarded unchanged to the shared helper.
    return await self._exec_inspect_comand(
        inspect_command="", kdu_model=kdu_model, repo_url=repo_url
    )
177 ####################################################################################
178 ################################### P R I V A T E ##################################
179 ####################################################################################
182 def _init_paths_env(self
, cluster_name
: str, create_if_not_exist
: bool = True):
184 Creates and returns base cluster and kube dirs and returns them.
185 Also created helm3 dirs according to new directory specification, paths are
186 returned and also environment variables that must be provided to execute commands
188 Helm 2 directory specification uses helm_home dir:
190 The variables assigned for this paths are:
        - Helm home: $HELM_HOME
192 - helm kubeconfig: $KUBECONFIG
194 :param cluster_name: cluster_name
195 :return: Dictionary with config_paths and dictionary with helm environment variables
198 if base
.endswith("/") or base
.endswith("\\"):
201 # base dir for cluster
202 cluster_dir
= base
+ "/" + cluster_name
205 kube_dir
= cluster_dir
+ "/" + ".kube"
206 if create_if_not_exist
and not os
.path
.exists(kube_dir
):
207 self
.log
.debug("Creating dir {}".format(kube_dir
))
208 os
.makedirs(kube_dir
)
211 helm_dir
= cluster_dir
+ "/" + ".helm"
212 if create_if_not_exist
and not os
.path
.exists(helm_dir
):
213 self
.log
.debug("Creating dir {}".format(helm_dir
))
214 os
.makedirs(helm_dir
)
216 config_filename
= kube_dir
+ "/config"
218 # 2 - Prepare dictionary with paths
220 "kube_dir": kube_dir
,
221 "kube_config": config_filename
,
222 "cluster_dir": cluster_dir
,
223 "helm_dir": helm_dir
,
226 for file_name
, file in paths
.items():
227 if "dir" in file_name
and not os
.path
.exists(file):
228 err_msg
= "{} dir does not exist".format(file)
229 self
.log
.error(err_msg
)
230 raise K8sException(err_msg
)
232 # 3 - Prepare environment variables
233 env
= {"HELM_HOME": helm_dir
, "KUBECONFIG": config_filename
}
237 async def _get_services(self
, cluster_id
, kdu_instance
, namespace
, kubeconfig
):
240 paths
, env
= self
._init
_paths
_env
(
241 cluster_name
=cluster_id
, create_if_not_exist
=True
244 command1
= "env KUBECONFIG={} {} get manifest {} ".format(
245 kubeconfig
, self
._helm
_command
, kdu_instance
247 command2
= "{} get --namespace={} -f -".format(self
.kubectl_command
, namespace
)
248 output
, _rc
= await self
._local
_async
_exec
_pipe
(
249 command1
, command2
, env
=env
, raise_exception_on_error
=True
251 services
= self
._parse
_services
(output
)
255 async def _cluster_init(
256 self
, cluster_id
: str, namespace
: str, paths
: dict, env
: dict
259 Implements the helm version dependent cluster initialization:
260 For helm2 it initialized tiller environment if needed
263 # check if tiller pod is up in cluster
264 command
= "{} --kubeconfig={} --namespace={} get deployments".format(
265 self
.kubectl_command
, paths
["kube_config"], namespace
267 output
, _rc
= await self
._local
_async
_exec
(
268 command
=command
, raise_exception_on_error
=True, env
=env
271 output_table
= self
._output
_to
_table
(output
=output
)
273 # find 'tiller' pod in all pods
274 already_initialized
= False
276 for row
in output_table
:
277 if row
[0].startswith("tiller-deploy"):
278 already_initialized
= True
284 n2vc_installed_sw
= False
285 if not already_initialized
:
287 "Initializing helm in client and server: {}".format(cluster_id
)
289 command
= "{} --kubeconfig={} --namespace kube-system create serviceaccount {}".format(
290 self
.kubectl_command
, paths
["kube_config"], self
.service_account
292 _
, _rc
= await self
._local
_async
_exec
(
293 command
=command
, raise_exception_on_error
=False, env
=env
297 "{} --kubeconfig={} create clusterrolebinding osm-tiller-cluster-rule "
298 "--clusterrole=cluster-admin --serviceaccount=kube-system:{}"
299 ).format(self
.kubectl_command
, paths
["kube_config"], self
.service_account
)
300 _
, _rc
= await self
._local
_async
_exec
(
301 command
=command
, raise_exception_on_error
=False, env
=env
305 "{} --kubeconfig={} --tiller-namespace={} --home={} --service-account {} "
309 paths
["kube_config"],
312 self
.service_account
,
313 "--stable-repo-url {}".format(self
._stable
_repo
_url
)
314 if self
._stable
_repo
_url
317 _
, _rc
= await self
._local
_async
_exec
(
318 command
=command
, raise_exception_on_error
=True, env
=env
320 n2vc_installed_sw
= True
322 # check client helm installation
323 check_file
= paths
["helm_dir"] + "/repository/repositories.yaml"
324 if not self
._check
_file
_exists
(
325 filename
=check_file
, exception_if_not_exists
=False
327 self
.log
.info("Initializing helm in client: {}".format(cluster_id
))
329 "{} --kubeconfig={} --tiller-namespace={} "
330 "--home={} init --client-only {} "
333 paths
["kube_config"],
336 "--stable-repo-url {}".format(self
._stable
_repo
_url
)
337 if self
._stable
_repo
_url
340 output
, _rc
= await self
._local
_async
_exec
(
341 command
=command
, raise_exception_on_error
=True, env
=env
344 self
.log
.info("Helm client already initialized")
346 # remove old stable repo and add new one
347 cluster_uuid
= "{}:{}".format(namespace
, cluster_id
)
348 repo_list
= await self
.repo_list(cluster_uuid
)
349 for repo
in repo_list
:
350 if repo
["name"] == "stable" and repo
["url"] != self
._stable
_repo
_url
:
351 self
.log
.debug("Add new stable repo url: {}")
352 await self
.repo_remove(cluster_uuid
, "stable")
353 if self
._stable
_repo
_url
:
354 await self
.repo_add(cluster_uuid
, "stable", self
._stable
_repo
_url
)
357 return n2vc_installed_sw
359 async def _uninstall_sw(self
, cluster_id
: str, namespace
: str):
360 # uninstall Tiller if necessary
362 self
.log
.debug("Uninstalling tiller from cluster {}".format(cluster_id
))
365 paths
, env
= self
._init
_paths
_env
(
366 cluster_name
=cluster_id
, create_if_not_exist
=True
370 # find namespace for tiller pod
371 command
= "{} --kubeconfig={} get deployments --all-namespaces".format(
372 self
.kubectl_command
, paths
["kube_config"]
374 output
, _rc
= await self
._local
_async
_exec
(
375 command
=command
, raise_exception_on_error
=False, env
=env
377 output_table
= self
._output
_to
_table
(output
=output
)
379 for r
in output_table
:
381 if "tiller-deploy" in r
[1]:
387 msg
= "Tiller deployment not found in cluster {}".format(cluster_id
)
390 self
.log
.debug("namespace for tiller: {}".format(namespace
))
393 # uninstall tiller from cluster
394 self
.log
.debug("Uninstalling tiller from cluster {}".format(cluster_id
))
395 command
= "{} --kubeconfig={} --home={} reset".format(
396 self
._helm
_command
, paths
["kube_config"], paths
["helm_dir"]
398 self
.log
.debug("resetting: {}".format(command
))
399 output
, _rc
= await self
._local
_async
_exec
(
400 command
=command
, raise_exception_on_error
=True, env
=env
402 # Delete clusterrolebinding and serviceaccount.
403 # Ignore if errors for backward compatibility
405 "{} --kubeconfig={} delete clusterrolebinding.rbac.authorization.k8s."
406 "io/osm-tiller-cluster-rule"
407 ).format(self
.kubectl_command
, paths
["kube_config"])
408 output
, _rc
= await self
._local
_async
_exec
(
409 command
=command
, raise_exception_on_error
=False, env
=env
411 command
= "{} --kubeconfig={} --namespace kube-system delete serviceaccount/{}".format(
412 self
.kubectl_command
, paths
["kube_config"], self
.service_account
414 output
, _rc
= await self
._local
_async
_exec
(
415 command
=command
, raise_exception_on_error
=False, env
=env
419 self
.log
.debug("namespace not found")
421 async def _instances_list(self
, cluster_id
):
424 paths
, env
= self
._init
_paths
_env
(
425 cluster_name
=cluster_id
, create_if_not_exist
=True
428 command
= "{} list --output yaml".format(self
._helm
_command
)
430 output
, _rc
= await self
._local
_async
_exec
(
431 command
=command
, raise_exception_on_error
=True, env
=env
434 if output
and len(output
) > 0:
435 # parse yaml and update keys to lower case to unify with helm3
436 instances
= yaml
.load(output
, Loader
=yaml
.SafeLoader
).get("Releases")
438 for instance
in instances
:
439 new_instance
= dict((k
.lower(), v
) for k
, v
in instance
.items())
440 new_instances
.append(new_instance
)
445 def _get_inspect_command(
446 self
, show_command
: str, kdu_model
: str, repo_str
: str, version
: str
448 inspect_command
= "{} inspect {} {}{} {}".format(
449 self
._helm
_command
, show_command
, kdu_model
, repo_str
, version
451 return inspect_command
453 async def _status_kdu(
457 namespace
: str = None,
458 yaml_format
: bool = False,
459 show_error_log
: bool = False,
460 ) -> Union
[str, dict]:
463 "status of kdu_instance: {}, namespace: {} ".format(kdu_instance
, namespace
)
467 paths
, env
= self
._init
_paths
_env
(
468 cluster_name
=cluster_id
, create_if_not_exist
=True
470 command
= ("env KUBECONFIG={} {} status {} --output yaml").format(
471 paths
["kube_config"], self
._helm
_command
, kdu_instance
473 output
, rc
= await self
._local
_async
_exec
(
475 raise_exception_on_error
=True,
476 show_error_log
=show_error_log
,
486 data
= yaml
.load(output
, Loader
=yaml
.SafeLoader
)
488 # remove field 'notes'
490 del data
.get("info").get("status")["notes"]
494 # parse the manifest to a list of dictionaries
495 if "manifest" in data
:
496 manifest_str
= data
.get("manifest")
497 manifest_docs
= yaml
.load_all(manifest_str
, Loader
=yaml
.SafeLoader
)
499 data
["manifest"] = []
500 for doc
in manifest_docs
:
501 data
["manifest"].append(doc
)
503 # parse field 'resources'
505 resources
= str(data
.get("info").get("status").get("resources"))
506 resource_table
= self
._output
_to
_table
(resources
)
507 data
.get("info").get("status")["resources"] = resource_table
511 # set description to lowercase (unify with helm3)
513 data
.get("info")["description"] = data
.get("info").pop("Description")
519 def _get_helm_chart_repos_ids(self
, cluster_uuid
) -> list:
521 cluster_filter
= {"_admin.helm-chart.id": cluster_uuid
}
522 cluster
= self
.db
.get_one("k8sclusters", cluster_filter
)
524 repo_ids
= cluster
.get("_admin").get("helm_chart_repos") or []
528 "k8cluster with helm-id : {} not found".format(cluster_uuid
)
531 async def _is_install_completed(self
, cluster_id
: str, kdu_instance
: str) -> bool:
533 paths
, env
= self
._init
_paths
_env
(
534 cluster_name
=cluster_id
, create_if_not_exist
=True
537 status
= await self
._status
_kdu
(
538 cluster_id
=cluster_id
, kdu_instance
=kdu_instance
, yaml_format
=False
541 # extract info.status.resources-> str
544 # NAME READY UP-TO-DATE AVAILABLE AGE
545 # halting-horse-mongodb 0/1 1 0 0s
546 # halting-petit-mongodb 1/1 1 0 0s
548 resources
= K8sHelmBaseConnector
._get
_deep
(
549 status
, ("info", "status", "resources")
553 resources
= K8sHelmBaseConnector
._output
_to
_table
(resources
)
555 num_lines
= len(resources
)
558 while index
< num_lines
:
560 line1
= resources
[index
]
562 # find '==>' in column 0
563 if line1
[0] == "==>":
564 line2
= resources
[index
]
566 # find READY in column 1
567 if line2
[1] == "READY":
569 line3
= resources
[index
]
571 while len(line3
) > 1 and index
< num_lines
:
572 ready_value
= line3
[1]
573 parts
= ready_value
.split(sep
="/")
574 current
= int(parts
[0])
575 total
= int(parts
[1])
577 self
.log
.debug("NOT READY:\n {}".format(line3
))
579 line3
= resources
[index
]
587 def _get_install_command(
601 timeout_str
= "--timeout {}".format(timeout
)
606 atomic_str
= "--atomic"
610 namespace_str
= "--namespace {}".format(namespace
)
615 version_str
= version_str
= "--version {}".format(version
)
618 "env KUBECONFIG={kubeconfig} {helm} install {atomic} --output yaml "
619 "{params} {timeout} --name={name} {ns} {model} {ver}".format(
620 kubeconfig
=kubeconfig
,
621 helm
=self
._helm
_command
,
633 def _get_upgrade_command(
647 timeout_str
= "--timeout {}".format(timeout
)
652 atomic_str
= "--atomic"
657 version_str
= "--version {}".format(version
)
660 "env KUBECONFIG={kubeconfig} {helm} upgrade {atomic} --output yaml {params} {timeout} {name} {model} {ver}"
662 kubeconfig
=kubeconfig
,
663 helm
=self
._helm
_command
,
673 def _get_rollback_command(
674 self
, kdu_instance
, namespace
, revision
, kubeconfig
676 return "env KUBECONFIG={} {} rollback {} {} --wait".format(
677 kubeconfig
, self
._helm
_command
, kdu_instance
, revision
680 def _get_uninstall_command(
681 self
, kdu_instance
: str, namespace
: str, kubeconfig
: str
683 return "env KUBECONFIG={} {} delete --purge {}".format(
684 kubeconfig
, self
._helm
_command
, kdu_instance