2 # Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3 # This file is part of OSM
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact with: nfvlabs@tid.es
import asyncio
import os
from typing import Union

import yaml

from n2vc.exceptions import K8sException
from n2vc.k8s_helm_base_conn import K8sHelmBaseConnector
31 class K8sHelmConnector(K8sHelmBaseConnector
):
34 ####################################################################################
35 ################################### P U B L I C ####################################
36 ####################################################################################
43 kubectl_command
: str = "/usr/bin/kubectl",
44 helm_command
: str = "/usr/bin/helm",
49 Initializes helm connector for helm v2
51 :param fs: file system for kubernetes and helm configuration
52 :param db: database object to write current operation status
53 :param kubectl_command: path to kubectl executable
54 :param helm_command: path to helm executable
56 :param on_update_db: callback called when k8s connector updates database
60 K8sHelmBaseConnector
.__init
__(
65 kubectl_command
=kubectl_command
,
66 helm_command
=helm_command
,
67 on_update_db
=on_update_db
,
70 self
.log
.info("Initializing K8S Helm2 connector")
72 # initialize helm client-only
73 self
.log
.debug("Initializing helm client-only...")
74 command
= "{} init --client-only {} ".format(
76 "--stable-repo-url {}".format(self
._stable
_repo
_url
)
77 if self
._stable
_repo
_url
81 asyncio
.ensure_future(
82 self
._local
_async
_exec
(command
=command
, raise_exception_on_error
=False)
84 # loop = asyncio.get_event_loop()
85 # loop.run_until_complete(self._local_async_exec(command=command,
86 # raise_exception_on_error=False))
87 except Exception as e
:
89 msg
="helm init failed (it was already initialized): {}".format(e
)
92 self
.log
.info("K8S Helm2 connector initialized")
100 timeout
: float = 300,
102 db_dict
: dict = None,
103 kdu_name
: str = None,
104 namespace
: str = None,
108 Deploys of a new KDU instance. It would implicitly rely on the `install` call
109 to deploy the Chart/Bundle properly parametrized (in practice, this call would
110 happen before any _initial-config-primitive_of the VNF is called).
112 :param cluster_uuid: UUID of a K8s cluster known by OSM
113 :param kdu_model: chart/ reference (string), which can be either
115 - a name of chart available via the repos known by OSM
116 - a path to a packaged chart
117 - a path to an unpacked chart directory or a URL
118 :param kdu_instance: Kdu instance name
119 :param atomic: If set, installation process purges chart/bundle on fail, also
120 will wait until all the K8s objects are active
121 :param timeout: Time in seconds to wait for the install of the chart/bundle
122 (defaults to Helm default timeout: 300s)
123 :param params: dictionary of key-value pairs for instantiation parameters
124 (overriding default values)
125 :param dict db_dict: where to write into database when the status changes.
126 It contains a dict with {collection: <str>, filter: {},
128 e.g. {collection: "nsrs", filter:
129 {_id: <nsd-id>, path: "_admin.deployed.K8S.3"}
130 :param kdu_name: Name of the KDU instance to be installed
131 :param namespace: K8s namespace to use for the KDU instance
132 :param kwargs: Additional parameters (None yet)
133 :return: True if successful
135 _
, cluster_id
= self
._get
_namespace
_cluster
_id
(cluster_uuid
)
136 self
.log
.debug("installing {} in cluster {}".format(kdu_model
, cluster_id
))
139 self
.fs
.sync(from_path
=cluster_id
)
142 paths
, env
= self
._init
_paths
_env
(
143 cluster_name
=cluster_id
, create_if_not_exist
=True
146 await self
._install
_impl
(
161 self
.fs
.reverse_sync(from_path
=cluster_id
)
163 self
.log
.debug("Returning kdu_instance {}".format(kdu_instance
))
166 async def inspect_kdu(self
, kdu_model
: str, repo_url
: str = None) -> str:
168 "inspect kdu_model {} from (optional) repo: {}".format(kdu_model
, repo_url
)
171 return await self
._exec
_inspect
_comand
(
172 inspect_command
="", kdu_model
=kdu_model
, repo_url
=repo_url
176 ####################################################################################
177 ################################### P R I V A T E ##################################
178 ####################################################################################
181 def _init_paths_env(self
, cluster_name
: str, create_if_not_exist
: bool = True):
183 Creates and returns base cluster and kube dirs and returns them.
184 Also created helm3 dirs according to new directory specification, paths are
185 returned and also environment variables that must be provided to execute commands
187 Helm 2 directory specification uses helm_home dir:
189 The variables assigned for this paths are:
190 - Helm hone: $HELM_HOME
191 - helm kubeconfig: $KUBECONFIG
193 :param cluster_name: cluster_name
194 :return: Dictionary with config_paths and dictionary with helm environment variables
197 if base
.endswith("/") or base
.endswith("\\"):
200 # base dir for cluster
201 cluster_dir
= base
+ "/" + cluster_name
204 kube_dir
= cluster_dir
+ "/" + ".kube"
205 if create_if_not_exist
and not os
.path
.exists(kube_dir
):
206 self
.log
.debug("Creating dir {}".format(kube_dir
))
207 os
.makedirs(kube_dir
)
210 helm_dir
= cluster_dir
+ "/" + ".helm"
211 if create_if_not_exist
and not os
.path
.exists(helm_dir
):
212 self
.log
.debug("Creating dir {}".format(helm_dir
))
213 os
.makedirs(helm_dir
)
215 config_filename
= kube_dir
+ "/config"
217 # 2 - Prepare dictionary with paths
219 "kube_dir": kube_dir
,
220 "kube_config": config_filename
,
221 "cluster_dir": cluster_dir
,
222 "helm_dir": helm_dir
,
225 for file_name
, file in paths
.items():
226 if "dir" in file_name
and not os
.path
.exists(file):
227 err_msg
= "{} dir does not exist".format(file)
228 self
.log
.error(err_msg
)
229 raise K8sException(err_msg
)
231 # 3 - Prepare environment variables
232 env
= {"HELM_HOME": helm_dir
, "KUBECONFIG": config_filename
}
236 async def _get_services(self
, cluster_id
, kdu_instance
, namespace
, kubeconfig
):
238 paths
, env
= self
._init
_paths
_env
(
239 cluster_name
=cluster_id
, create_if_not_exist
=True
242 command1
= "env KUBECONFIG={} {} get manifest {} ".format(
243 kubeconfig
, self
._helm
_command
, kdu_instance
245 command2
= "{} get --namespace={} -f -".format(self
.kubectl_command
, namespace
)
246 output
, _rc
= await self
._local
_async
_exec
_pipe
(
247 command1
, command2
, env
=env
, raise_exception_on_error
=True
249 services
= self
._parse
_services
(output
)
253 async def _cluster_init(
254 self
, cluster_id
: str, namespace
: str, paths
: dict, env
: dict
257 Implements the helm version dependent cluster initialization:
258 For helm2 it initialized tiller environment if needed
261 # check if tiller pod is up in cluster
262 command
= "{} --kubeconfig={} --namespace={} get deployments".format(
263 self
.kubectl_command
, paths
["kube_config"], namespace
265 output
, _rc
= await self
._local
_async
_exec
(
266 command
=command
, raise_exception_on_error
=True, env
=env
269 output_table
= self
._output
_to
_table
(output
=output
)
271 # find 'tiller' pod in all pods
272 already_initialized
= False
274 for row
in output_table
:
275 if row
[0].startswith("tiller-deploy"):
276 already_initialized
= True
282 n2vc_installed_sw
= False
283 if not already_initialized
:
285 "Initializing helm in client and server: {}".format(cluster_id
)
287 command
= "{} --kubeconfig={} --namespace kube-system create serviceaccount {}".format(
288 self
.kubectl_command
, paths
["kube_config"], self
.service_account
290 _
, _rc
= await self
._local
_async
_exec
(
291 command
=command
, raise_exception_on_error
=False, env
=env
295 "{} --kubeconfig={} create clusterrolebinding osm-tiller-cluster-rule "
296 "--clusterrole=cluster-admin --serviceaccount=kube-system:{}"
297 ).format(self
.kubectl_command
, paths
["kube_config"], self
.service_account
)
298 _
, _rc
= await self
._local
_async
_exec
(
299 command
=command
, raise_exception_on_error
=False, env
=env
303 "{} init --kubeconfig={} --tiller-namespace={} --home={} --service-account {} "
307 paths
["kube_config"],
310 self
.service_account
,
311 "--stable-repo-url {}".format(self
._stable
_repo
_url
)
312 if self
._stable
_repo
_url
315 _
, _rc
= await self
._local
_async
_exec
(
316 command
=command
, raise_exception_on_error
=True, env
=env
318 n2vc_installed_sw
= True
320 # check client helm installation
321 check_file
= paths
["helm_dir"] + "/repository/repositories.yaml"
322 if not self
._check
_file
_exists
(
323 filename
=check_file
, exception_if_not_exists
=False
325 self
.log
.info("Initializing helm in client: {}".format(cluster_id
))
327 "{} init --kubeconfig={} --tiller-namespace={} "
328 "--home={} --client-only {} "
331 paths
["kube_config"],
334 "--stable-repo-url {}".format(self
._stable
_repo
_url
)
335 if self
._stable
_repo
_url
338 output
, _rc
= await self
._local
_async
_exec
(
339 command
=command
, raise_exception_on_error
=True, env
=env
342 self
.log
.info("Helm client already initialized")
344 # remove old stable repo and add new one
345 cluster_uuid
= "{}:{}".format(namespace
, cluster_id
)
346 repo_list
= await self
.repo_list(cluster_uuid
)
347 for repo
in repo_list
:
348 if repo
["name"] == "stable" and repo
["url"] != self
._stable
_repo
_url
:
349 self
.log
.debug("Add new stable repo url: {}")
350 await self
.repo_remove(cluster_uuid
, "stable")
351 if self
._stable
_repo
_url
:
352 await self
.repo_add(cluster_uuid
, "stable", self
._stable
_repo
_url
)
355 return n2vc_installed_sw
357 async def _uninstall_sw(self
, cluster_id
: str, namespace
: str):
358 # uninstall Tiller if necessary
360 self
.log
.debug("Uninstalling tiller from cluster {}".format(cluster_id
))
363 paths
, env
= self
._init
_paths
_env
(
364 cluster_name
=cluster_id
, create_if_not_exist
=True
368 # find namespace for tiller pod
369 command
= "{} --kubeconfig={} get deployments --all-namespaces".format(
370 self
.kubectl_command
, paths
["kube_config"]
372 output
, _rc
= await self
._local
_async
_exec
(
373 command
=command
, raise_exception_on_error
=False, env
=env
375 output_table
= self
._output
_to
_table
(output
=output
)
377 for r
in output_table
:
379 if "tiller-deploy" in r
[1]:
385 msg
= "Tiller deployment not found in cluster {}".format(cluster_id
)
388 self
.log
.debug("namespace for tiller: {}".format(namespace
))
391 # uninstall tiller from cluster
392 self
.log
.debug("Uninstalling tiller from cluster {}".format(cluster_id
))
393 command
= "{} --kubeconfig={} --home={} reset".format(
394 self
._helm
_command
, paths
["kube_config"], paths
["helm_dir"]
396 self
.log
.debug("resetting: {}".format(command
))
397 output
, _rc
= await self
._local
_async
_exec
(
398 command
=command
, raise_exception_on_error
=True, env
=env
400 # Delete clusterrolebinding and serviceaccount.
401 # Ignore if errors for backward compatibility
403 "{} --kubeconfig={} delete clusterrolebinding.rbac.authorization.k8s."
404 "io/osm-tiller-cluster-rule"
405 ).format(self
.kubectl_command
, paths
["kube_config"])
406 output
, _rc
= await self
._local
_async
_exec
(
407 command
=command
, raise_exception_on_error
=False, env
=env
409 command
= "{} --kubeconfig={} --namespace kube-system delete serviceaccount/{}".format(
410 self
.kubectl_command
, paths
["kube_config"], self
.service_account
412 output
, _rc
= await self
._local
_async
_exec
(
413 command
=command
, raise_exception_on_error
=False, env
=env
417 self
.log
.debug("namespace not found")
419 async def _instances_list(self
, cluster_id
):
421 paths
, env
= self
._init
_paths
_env
(
422 cluster_name
=cluster_id
, create_if_not_exist
=True
425 command
= "{} list --output yaml".format(self
._helm
_command
)
427 output
, _rc
= await self
._local
_async
_exec
(
428 command
=command
, raise_exception_on_error
=True, env
=env
431 if output
and len(output
) > 0:
432 # parse yaml and update keys to lower case to unify with helm3
433 instances
= yaml
.load(output
, Loader
=yaml
.SafeLoader
).get("Releases")
435 for instance
in instances
:
436 new_instance
= dict((k
.lower(), v
) for k
, v
in instance
.items())
437 new_instances
.append(new_instance
)
442 def _get_inspect_command(
443 self
, show_command
: str, kdu_model
: str, repo_str
: str, version
: str
445 inspect_command
= "{} inspect {} {}{} {}".format(
446 self
._helm
_command
, show_command
, kdu_model
, repo_str
, version
448 return inspect_command
450 async def _status_kdu(
454 namespace
: str = None,
455 yaml_format
: bool = False,
456 show_error_log
: bool = False,
457 ) -> Union
[str, dict]:
459 "status of kdu_instance: {}, namespace: {} ".format(kdu_instance
, namespace
)
463 paths
, env
= self
._init
_paths
_env
(
464 cluster_name
=cluster_id
, create_if_not_exist
=True
466 command
= ("env KUBECONFIG={} {} status {} --output yaml").format(
467 paths
["kube_config"], self
._helm
_command
, kdu_instance
469 output
, rc
= await self
._local
_async
_exec
(
471 raise_exception_on_error
=True,
472 show_error_log
=show_error_log
,
482 data
= yaml
.load(output
, Loader
=yaml
.SafeLoader
)
484 # remove field 'notes'
486 del data
.get("info").get("status")["notes"]
490 # parse the manifest to a list of dictionaries
491 if "manifest" in data
:
492 manifest_str
= data
.get("manifest")
493 manifest_docs
= yaml
.load_all(manifest_str
, Loader
=yaml
.SafeLoader
)
495 data
["manifest"] = []
496 for doc
in manifest_docs
:
497 data
["manifest"].append(doc
)
499 # parse field 'resources'
501 resources
= str(data
.get("info").get("status").get("resources"))
502 resource_table
= self
._output
_to
_table
(resources
)
503 data
.get("info").get("status")["resources"] = resource_table
507 # set description to lowercase (unify with helm3)
509 data
.get("info")["description"] = data
.get("info").pop("Description")
515 def _get_helm_chart_repos_ids(self
, cluster_uuid
) -> list:
517 cluster_filter
= {"_admin.helm-chart.id": cluster_uuid
}
518 cluster
= self
.db
.get_one("k8sclusters", cluster_filter
)
520 repo_ids
= cluster
.get("_admin").get("helm_chart_repos") or []
524 "k8cluster with helm-id : {} not found".format(cluster_uuid
)
527 async def _is_install_completed(self
, cluster_id
: str, kdu_instance
: str) -> bool:
529 paths
, env
= self
._init
_paths
_env
(
530 cluster_name
=cluster_id
, create_if_not_exist
=True
533 status
= await self
._status
_kdu
(
534 cluster_id
=cluster_id
, kdu_instance
=kdu_instance
, yaml_format
=False
537 # extract info.status.resources-> str
540 # NAME READY UP-TO-DATE AVAILABLE AGE
541 # halting-horse-mongodb 0/1 1 0 0s
542 # halting-petit-mongodb 1/1 1 0 0s
544 resources
= K8sHelmBaseConnector
._get
_deep
(
545 status
, ("info", "status", "resources")
549 resources
= K8sHelmBaseConnector
._output
_to
_table
(resources
)
551 num_lines
= len(resources
)
554 while index
< num_lines
:
556 line1
= resources
[index
]
558 # find '==>' in column 0
559 if line1
[0] == "==>":
560 line2
= resources
[index
]
562 # find READY in column 1
563 if line2
[1] == "READY":
565 line3
= resources
[index
]
567 while len(line3
) > 1 and index
< num_lines
:
568 ready_value
= line3
[1]
569 parts
= ready_value
.split(sep
="/")
570 current
= int(parts
[0])
571 total
= int(parts
[1])
573 self
.log
.debug("NOT READY:\n {}".format(line3
))
575 line3
= resources
[index
]
583 def _get_install_command(
596 timeout_str
= "--timeout {}".format(timeout
)
601 atomic_str
= "--atomic"
605 namespace_str
= "--namespace {}".format(namespace
)
610 version_str
= version_str
= "--version {}".format(version
)
613 "env KUBECONFIG={kubeconfig} {helm} install {atomic} --output yaml "
614 "{params} {timeout} --name={name} {ns} {model} {ver}".format(
615 kubeconfig
=kubeconfig
,
616 helm
=self
._helm
_command
,
628 def _get_upgrade_command(
641 timeout_str
= "--timeout {}".format(timeout
)
646 atomic_str
= "--atomic"
651 version_str
= "--version {}".format(version
)
654 "env KUBECONFIG={kubeconfig} {helm} upgrade {atomic} --output yaml {params} {timeout} {name} {model} {ver}"
656 kubeconfig
=kubeconfig
,
657 helm
=self
._helm
_command
,
667 def _get_rollback_command(
668 self
, kdu_instance
, namespace
, revision
, kubeconfig
670 return "env KUBECONFIG={} {} rollback {} {} --wait".format(
671 kubeconfig
, self
._helm
_command
, kdu_instance
, revision
674 def _get_uninstall_command(
675 self
, kdu_instance
: str, namespace
: str, kubeconfig
: str
677 return "env KUBECONFIG={} {} delete --purge {}".format(
678 kubeconfig
, self
._helm
_command
, kdu_instance