2 # Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3 # This file is part of OSM
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact with: nfvlabs@tid.es
26 from n2vc
.k8s_helm_base_conn
import K8sHelmBaseConnector
27 from n2vc
.exceptions
import K8sException
30 class K8sHelmConnector(K8sHelmBaseConnector
):
33 ####################################################################################
34 ################################### P U B L I C ####################################
35 ####################################################################################
42 kubectl_command
: str = "/usr/bin/kubectl",
43 helm_command
: str = "/usr/bin/helm",
48 Initializes helm connector for helm v2
50 :param fs: file system for kubernetes and helm configuration
51 :param db: database object to write current operation status
52 :param kubectl_command: path to kubectl executable
53 :param helm_command: path to helm executable
55 :param on_update_db: callback called when k8s connector updates database
59 K8sHelmBaseConnector
.__init
__(
64 kubectl_command
=kubectl_command
,
65 helm_command
=helm_command
,
66 on_update_db
=on_update_db
,
69 self
.log
.info("Initializing K8S Helm2 connector")
71 # initialize helm client-only
72 self
.log
.debug("Initializing helm client-only...")
73 command
= "{} init --client-only {} ".format(
75 "--stable-repo-url {}".format(self
._stable
_repo
_url
)
76 if self
._stable
_repo
_url
80 asyncio
.ensure_future(
81 self
._local
_async
_exec
(command
=command
, raise_exception_on_error
=False)
83 # loop = asyncio.get_event_loop()
84 # loop.run_until_complete(self._local_async_exec(command=command,
85 # raise_exception_on_error=False))
86 except Exception as e
:
88 msg
="helm init failed (it was already initialized): {}".format(e
)
91 self
.log
.info("K8S Helm2 connector initialized")
101 db_dict
: dict = None,
102 kdu_name
: str = None,
103 namespace
: str = None,
107 Deploys of a new KDU instance. It would implicitly rely on the `install` call
108 to deploy the Chart/Bundle properly parametrized (in practice, this call would
109 happen before any _initial-config-primitive_of the VNF is called).
111 :param cluster_uuid: UUID of a K8s cluster known by OSM
112 :param kdu_model: chart/ reference (string), which can be either
114 - a name of chart available via the repos known by OSM
115 - a path to a packaged chart
116 - a path to an unpacked chart directory or a URL
117 :param kdu_instance: Kdu instance name
118 :param atomic: If set, installation process purges chart/bundle on fail, also
119 will wait until all the K8s objects are active
120 :param timeout: Time in seconds to wait for the install of the chart/bundle
121 (defaults to Helm default timeout: 300s)
122 :param params: dictionary of key-value pairs for instantiation parameters
123 (overriding default values)
124 :param dict db_dict: where to write into database when the status changes.
125 It contains a dict with {collection: <str>, filter: {},
127 e.g. {collection: "nsrs", filter:
128 {_id: <nsd-id>, path: "_admin.deployed.K8S.3"}
129 :param kdu_name: Name of the KDU instance to be installed
130 :param namespace: K8s namespace to use for the KDU instance
131 :param kwargs: Additional parameters (None yet)
132 :return: True if successful
134 self
.log
.debug("installing {} in cluster {}".format(kdu_model
, cluster_uuid
))
137 self
.fs
.sync(from_path
=cluster_uuid
)
140 paths
, env
= self
._init
_paths
_env
(
141 cluster_name
=cluster_uuid
, create_if_not_exist
=True
144 await self
._install
_impl
(
159 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
161 self
.log
.debug("Returning kdu_instance {}".format(kdu_instance
))
164 async def inspect_kdu(self
, kdu_model
: str, repo_url
: str = None) -> str:
167 "inspect kdu_model {} from (optional) repo: {}".format(kdu_model
, repo_url
)
170 return await self
._exec
_inspect
_command
(
171 inspect_command
="", kdu_model
=kdu_model
, repo_url
=repo_url
175 ####################################################################################
176 ################################### P R I V A T E ##################################
177 ####################################################################################
180 def _init_paths_env(self
, cluster_name
: str, create_if_not_exist
: bool = True):
182 Creates and returns base cluster and kube dirs and returns them.
183 Also created helm3 dirs according to new directory specification, paths are
184 returned and also environment variables that must be provided to execute commands
186 Helm 2 directory specification uses helm_home dir:
188 The variables assigned for this paths are:
189 - Helm hone: $HELM_HOME
190 - helm kubeconfig: $KUBECONFIG
192 :param cluster_name: cluster_name
193 :return: Dictionary with config_paths and dictionary with helm environment variables
196 if base
.endswith("/") or base
.endswith("\\"):
199 # base dir for cluster
200 cluster_dir
= base
+ "/" + cluster_name
203 kube_dir
= cluster_dir
+ "/" + ".kube"
204 if create_if_not_exist
and not os
.path
.exists(kube_dir
):
205 self
.log
.debug("Creating dir {}".format(kube_dir
))
206 os
.makedirs(kube_dir
)
209 helm_dir
= cluster_dir
+ "/" + ".helm"
210 if create_if_not_exist
and not os
.path
.exists(helm_dir
):
211 self
.log
.debug("Creating dir {}".format(helm_dir
))
212 os
.makedirs(helm_dir
)
214 config_filename
= kube_dir
+ "/config"
216 # 2 - Prepare dictionary with paths
218 "kube_dir": kube_dir
,
219 "kube_config": config_filename
,
220 "cluster_dir": cluster_dir
,
221 "helm_dir": helm_dir
,
224 for file_name
, file in paths
.items():
225 if "dir" in file_name
and not os
.path
.exists(file):
226 err_msg
= "{} dir does not exist".format(file)
227 self
.log
.error(err_msg
)
228 raise K8sException(err_msg
)
230 # 3 - Prepare environment variables
231 env
= {"HELM_HOME": helm_dir
, "KUBECONFIG": config_filename
}
235 async def _get_services(self
, cluster_id
, kdu_instance
, namespace
, kubeconfig
):
238 paths
, env
= self
._init
_paths
_env
(
239 cluster_name
=cluster_id
, create_if_not_exist
=True
242 command1
= "env KUBECONFIG={} {} get manifest {} ".format(
243 kubeconfig
, self
._helm
_command
, kdu_instance
245 command2
= "{} get --namespace={} -f -".format(self
.kubectl_command
, namespace
)
246 output
, _rc
= await self
._local
_async
_exec
_pipe
(
247 command1
, command2
, env
=env
, raise_exception_on_error
=True
249 services
= self
._parse
_services
(output
)
253 async def _cluster_init(
254 self
, cluster_id
: str, namespace
: str, paths
: dict, env
: dict
257 Implements the helm version dependent cluster initialization:
258 For helm2 it initialized tiller environment if needed
261 # check if tiller pod is up in cluster
262 command
= "{} --kubeconfig={} --namespace={} get deployments".format(
263 self
.kubectl_command
, paths
["kube_config"], namespace
265 output
, _rc
= await self
._local
_async
_exec
(
266 command
=command
, raise_exception_on_error
=True, env
=env
269 output_table
= self
._output
_to
_table
(output
=output
)
271 # find 'tiller' pod in all pods
272 already_initialized
= False
274 for row
in output_table
:
275 if row
[0].startswith("tiller-deploy"):
276 already_initialized
= True
282 n2vc_installed_sw
= False
283 if not already_initialized
:
285 "Initializing helm in client and server: {}".format(cluster_id
)
287 command
= "{} --kubeconfig={} --namespace kube-system create serviceaccount {}".format(
288 self
.kubectl_command
, paths
["kube_config"], self
.service_account
290 _
, _rc
= await self
._local
_async
_exec
(
291 command
=command
, raise_exception_on_error
=False, env
=env
295 "{} --kubeconfig={} create clusterrolebinding osm-tiller-cluster-rule "
296 "--clusterrole=cluster-admin --serviceaccount=kube-system:{}"
297 ).format(self
.kubectl_command
, paths
["kube_config"], self
.service_account
)
298 _
, _rc
= await self
._local
_async
_exec
(
299 command
=command
, raise_exception_on_error
=False, env
=env
303 "{} --kubeconfig={} --tiller-namespace={} --home={} --service-account {} "
307 paths
["kube_config"],
310 self
.service_account
,
311 "--stable-repo-url {}".format(self
._stable
_repo
_url
)
312 if self
._stable
_repo
_url
315 _
, _rc
= await self
._local
_async
_exec
(
316 command
=command
, raise_exception_on_error
=True, env
=env
318 n2vc_installed_sw
= True
320 # check client helm installation
321 check_file
= paths
["helm_dir"] + "/repository/repositories.yaml"
322 if not self
._check
_file
_exists
(
323 filename
=check_file
, exception_if_not_exists
=False
325 self
.log
.info("Initializing helm in client: {}".format(cluster_id
))
327 "{} --kubeconfig={} --tiller-namespace={} "
328 "--home={} init --client-only {} "
331 paths
["kube_config"],
334 "--stable-repo-url {}".format(self
._stable
_repo
_url
)
335 if self
._stable
_repo
_url
338 output
, _rc
= await self
._local
_async
_exec
(
339 command
=command
, raise_exception_on_error
=True, env
=env
342 self
.log
.info("Helm client already initialized")
344 repo_list
= await self
.repo_list(cluster_id
)
345 for repo
in repo_list
:
346 if repo
["name"] == "stable" and repo
["url"] != self
._stable
_repo
_url
:
347 self
.log
.debug("Add new stable repo url: {}")
348 await self
.repo_remove(cluster_id
, "stable")
349 if self
._stable
_repo
_url
:
350 await self
.repo_add(cluster_id
, "stable", self
._stable
_repo
_url
)
353 return n2vc_installed_sw
355 async def _uninstall_sw(self
, cluster_id
: str, namespace
: str):
356 # uninstall Tiller if necessary
358 self
.log
.debug("Uninstalling tiller from cluster {}".format(cluster_id
))
361 paths
, env
= self
._init
_paths
_env
(
362 cluster_name
=cluster_id
, create_if_not_exist
=True
366 # find namespace for tiller pod
367 command
= "{} --kubeconfig={} get deployments --all-namespaces".format(
368 self
.kubectl_command
, paths
["kube_config"]
370 output
, _rc
= await self
._local
_async
_exec
(
371 command
=command
, raise_exception_on_error
=False, env
=env
373 output_table
= self
._output
_to
_table
(output
=output
)
375 for r
in output_table
:
377 if "tiller-deploy" in r
[1]:
383 msg
= "Tiller deployment not found in cluster {}".format(cluster_id
)
386 self
.log
.debug("namespace for tiller: {}".format(namespace
))
389 # uninstall tiller from cluster
390 self
.log
.debug("Uninstalling tiller from cluster {}".format(cluster_id
))
391 command
= "{} --kubeconfig={} --home={} reset".format(
392 self
._helm
_command
, paths
["kube_config"], paths
["helm_dir"]
394 self
.log
.debug("resetting: {}".format(command
))
395 output
, _rc
= await self
._local
_async
_exec
(
396 command
=command
, raise_exception_on_error
=True, env
=env
398 # Delete clusterrolebinding and serviceaccount.
399 # Ignore if errors for backward compatibility
401 "{} --kubeconfig={} delete clusterrolebinding.rbac.authorization.k8s."
402 "io/osm-tiller-cluster-rule"
403 ).format(self
.kubectl_command
, paths
["kube_config"])
404 output
, _rc
= await self
._local
_async
_exec
(
405 command
=command
, raise_exception_on_error
=False, env
=env
408 "{} --kubeconfig={} --namespace {} delete serviceaccount/{}".format(
409 self
.kubectl_command
,
410 paths
["kube_config"],
412 self
.service_account
,
415 output
, _rc
= await self
._local
_async
_exec
(
416 command
=command
, raise_exception_on_error
=False, env
=env
420 self
.log
.debug("namespace not found")
422 async def _instances_list(self
, cluster_id
):
425 paths
, env
= self
._init
_paths
_env
(
426 cluster_name
=cluster_id
, create_if_not_exist
=True
429 command
= "{} list --output yaml".format(self
._helm
_command
)
431 output
, _rc
= await self
._local
_async
_exec
(
432 command
=command
, raise_exception_on_error
=True, env
=env
435 if output
and len(output
) > 0:
436 # parse yaml and update keys to lower case to unify with helm3
437 instances
= yaml
.load(output
, Loader
=yaml
.SafeLoader
).get("Releases")
439 for instance
in instances
:
440 new_instance
= dict((k
.lower(), v
) for k
, v
in instance
.items())
441 new_instances
.append(new_instance
)
446 def _get_inspect_command(
447 self
, show_command
: str, kdu_model
: str, repo_str
: str, version
: str
449 inspect_command
= "{} inspect {} {}{} {}".format(
450 self
._helm
_command
, show_command
, kdu_model
, repo_str
, version
452 return inspect_command
454 def _get_get_command(
455 self
, get_command
: str, kdu_instance
: str, namespace
: str, kubeconfig
: str
457 get_command
= "env KUBECONFIG={} {} get {} {} --output yaml".format(
458 kubeconfig
, self
._helm
_command
, get_command
, kdu_instance
462 async def _status_kdu(
466 namespace
: str = None,
467 show_error_log
: bool = False,
468 return_text
: bool = False,
472 "status of kdu_instance: {}, namespace: {} ".format(kdu_instance
, namespace
)
476 paths
, env
= self
._init
_paths
_env
(
477 cluster_name
=cluster_id
, create_if_not_exist
=True
479 command
= ("env KUBECONFIG={} {} status {} --output yaml").format(
480 paths
["kube_config"], self
._helm
_command
, kdu_instance
482 output
, rc
= await self
._local
_async
_exec
(
484 raise_exception_on_error
=True,
485 show_error_log
=show_error_log
,
495 data
= yaml
.load(output
, Loader
=yaml
.SafeLoader
)
497 # remove field 'notes'
499 del data
.get("info").get("status")["notes"]
503 # parse the manifest to a list of dictionaries
504 if "manifest" in data
:
505 manifest_str
= data
.get("manifest")
506 manifest_docs
= yaml
.load_all(manifest_str
, Loader
=yaml
.SafeLoader
)
508 data
["manifest"] = []
509 for doc
in manifest_docs
:
510 data
["manifest"].append(doc
)
512 # parse field 'resources'
514 resources
= str(data
.get("info").get("status").get("resources"))
515 resource_table
= self
._output
_to
_table
(resources
)
516 data
.get("info").get("status")["resources"] = resource_table
520 # set description to lowercase (unify with helm3)
522 data
.get("info")["description"] = data
.get("info").pop("Description")
528 def _get_helm_chart_repos_ids(self
, cluster_uuid
) -> list:
530 cluster_filter
= {"_admin.helm-chart.id": cluster_uuid
}
531 cluster
= self
.db
.get_one("k8sclusters", cluster_filter
)
533 repo_ids
= cluster
.get("_admin").get("helm_chart_repos") or []
537 "k8cluster with helm-id : {} not found".format(cluster_uuid
)
540 async def _is_install_completed(self
, cluster_id
: str, kdu_instance
: str) -> bool:
542 paths
, env
= self
._init
_paths
_env
(
543 cluster_name
=cluster_id
, create_if_not_exist
=True
546 status
= await self
._status
_kdu
(
547 cluster_id
=cluster_id
, kdu_instance
=kdu_instance
, return_text
=False
550 # extract info.status.resources-> str
553 # NAME READY UP-TO-DATE AVAILABLE AGE
554 # halting-horse-mongodb 0/1 1 0 0s
555 # halting-petit-mongodb 1/1 1 0 0s
557 resources
= K8sHelmBaseConnector
._get
_deep
(
558 status
, ("info", "status", "resources")
562 resources
= K8sHelmBaseConnector
._output
_to
_table
(resources
)
564 num_lines
= len(resources
)
567 while index
< num_lines
:
569 line1
= resources
[index
]
571 # find '==>' in column 0
572 if line1
[0] == "==>":
573 line2
= resources
[index
]
575 # find READY in column 1
576 if line2
[1] == "READY":
578 line3
= resources
[index
]
580 while len(line3
) > 1 and index
< num_lines
:
581 ready_value
= line3
[1]
582 parts
= ready_value
.split(sep
="/")
583 current
= int(parts
[0])
584 total
= int(parts
[1])
586 self
.log
.debug("NOT READY:\n {}".format(line3
))
588 line3
= resources
[index
]
596 def _get_install_command(
610 timeout_str
= "--timeout {}".format(timeout
)
615 atomic_str
= "--atomic"
619 namespace_str
= "--namespace {}".format(namespace
)
624 version_str
= "--version {}".format(version
)
627 "env KUBECONFIG={kubeconfig} {helm} install {atomic} --output yaml "
628 "{params} {timeout} --name={name} {ns} {model} {ver}".format(
629 kubeconfig
=kubeconfig
,
630 helm
=self
._helm
_command
,
642 def _get_upgrade_scale_command(
658 timeout_str
= "--timeout {}s".format(timeout
)
663 atomic_str
= "--atomic"
668 version_str
= "--version {}".format(version
)
672 scale_dict
= {"{}.{}".format(resource_name
, replica_str
): scale
}
674 scale_dict
= {replica_str
: scale
}
676 scale_str
= self
._params
_to
_set
_option
(scale_dict
)
679 "env KUBECONFIG={kubeconfig} {helm} upgrade {atomic} --output yaml {scale} {timeout} {name} {model} {ver}"
681 helm
=self
._helm
_command
,
688 kubeconfig
=kubeconfig
,
692 def _get_upgrade_command(
706 timeout_str
= "--timeout {}".format(timeout
)
711 atomic_str
= "--atomic"
716 version_str
= "--version {}".format(version
)
719 "env KUBECONFIG={kubeconfig} {helm} upgrade {atomic} --output yaml {params} {timeout} {name} {model} {ver}"
721 kubeconfig
=kubeconfig
,
722 helm
=self
._helm
_command
,
732 def _get_rollback_command(
733 self
, kdu_instance
, namespace
, revision
, kubeconfig
735 return "env KUBECONFIG={} {} rollback {} {} --wait".format(
736 kubeconfig
, self
._helm
_command
, kdu_instance
, revision
739 def _get_uninstall_command(
740 self
, kdu_instance
: str, namespace
: str, kubeconfig
: str
742 return "env KUBECONFIG={} {} delete --purge {}".format(
743 kubeconfig
, self
._helm
_command
, kdu_instance