2 # Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3 # This file is part of OSM
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact with: nfvlabs@tid.es
import asyncio
import os

import yaml

from n2vc.exceptions import K8sException
from n2vc.k8s_helm_base_conn import K8sHelmBaseConnector
30 class K8sHelmConnector(K8sHelmBaseConnector
):
33 ####################################################################################
34 ################################### P U B L I C ####################################
35 ####################################################################################
42 kubectl_command
: str = "/usr/bin/kubectl",
43 helm_command
: str = "/usr/bin/helm",
46 vca_config
: dict = None,
49 Initializes helm connector for helm v2
51 :param fs: file system for kubernetes and helm configuration
52 :param db: database object to write current operation status
53 :param kubectl_command: path to kubectl executable
54 :param helm_command: path to helm executable
56 :param on_update_db: callback called when k8s connector updates database
60 K8sHelmBaseConnector
.__init
__(
65 kubectl_command
=kubectl_command
,
66 helm_command
=helm_command
,
67 on_update_db
=on_update_db
,
68 vca_config
=vca_config
,
71 self
.log
.info("Initializing K8S Helm2 connector")
73 # initialize helm client-only
74 self
.log
.debug("Initializing helm client-only...")
75 command
= "{} init --client-only --stable-repo-url {} ".format(
76 self
._helm
_command
, self
._stable
_repo
_url
79 asyncio
.ensure_future(
80 self
._local
_async
_exec
(command
=command
, raise_exception_on_error
=False)
82 # loop = asyncio.get_event_loop()
83 # loop.run_until_complete(self._local_async_exec(command=command,
84 # raise_exception_on_error=False))
85 except Exception as e
:
87 msg
="helm init failed (it was already initialized): {}".format(e
)
90 self
.log
.info("K8S Helm2 connector initialized")
100 db_dict
: dict = None,
101 kdu_name
: str = None,
102 namespace
: str = None,
106 Deploys of a new KDU instance. It would implicitly rely on the `install` call
107 to deploy the Chart/Bundle properly parametrized (in practice, this call would
108 happen before any _initial-config-primitive_of the VNF is called).
110 :param cluster_uuid: UUID of a K8s cluster known by OSM
111 :param kdu_model: chart/ reference (string), which can be either
113 - a name of chart available via the repos known by OSM
114 - a path to a packaged chart
115 - a path to an unpacked chart directory or a URL
116 :param kdu_instance: Kdu instance name
117 :param atomic: If set, installation process purges chart/bundle on fail, also
118 will wait until all the K8s objects are active
119 :param timeout: Time in seconds to wait for the install of the chart/bundle
120 (defaults to Helm default timeout: 300s)
121 :param params: dictionary of key-value pairs for instantiation parameters
122 (overriding default values)
123 :param dict db_dict: where to write into database when the status changes.
124 It contains a dict with {collection: <str>, filter: {},
126 e.g. {collection: "nsrs", filter:
127 {_id: <nsd-id>, path: "_admin.deployed.K8S.3"}
128 :param kdu_name: Name of the KDU instance to be installed
129 :param namespace: K8s namespace to use for the KDU instance
130 :param kwargs: Additional parameters (None yet)
131 :return: True if successful
133 _
, cluster_id
= self
._get
_namespace
_cluster
_id
(cluster_uuid
)
134 self
.log
.debug("installing {} in cluster {}".format(kdu_model
, cluster_id
))
137 self
.fs
.sync(from_path
=cluster_id
)
140 paths
, env
= self
._init
_paths
_env
(
141 cluster_name
=cluster_id
, create_if_not_exist
=True
144 await self
._install
_impl
(
159 self
.fs
.reverse_sync(from_path
=cluster_id
)
161 self
.log
.debug("Returning kdu_instance {}".format(kdu_instance
))
164 async def inspect_kdu(self
, kdu_model
: str, repo_url
: str = None) -> str:
167 "inspect kdu_model {} from (optional) repo: {}".format(kdu_model
, repo_url
)
170 return await self
._exec
_inspect
_comand
(
171 inspect_command
="", kdu_model
=kdu_model
, repo_url
=repo_url
175 ####################################################################################
176 ################################### P R I V A T E ##################################
177 ####################################################################################
180 def _init_paths_env(self
, cluster_name
: str, create_if_not_exist
: bool = True):
182 Creates and returns base cluster and kube dirs and returns them.
183 Also created helm3 dirs according to new directory specification, paths are
184 returned and also environment variables that must be provided to execute commands
186 Helm 2 directory specification uses helm_home dir:
188 The variables assigned for this paths are:
189 - Helm hone: $HELM_HOME
190 - helm kubeconfig: $KUBECONFIG
192 :param cluster_name: cluster_name
193 :return: Dictionary with config_paths and dictionary with helm environment variables
196 if base
.endswith("/") or base
.endswith("\\"):
199 # base dir for cluster
200 cluster_dir
= base
+ "/" + cluster_name
203 kube_dir
= cluster_dir
+ "/" + ".kube"
204 if create_if_not_exist
and not os
.path
.exists(kube_dir
):
205 self
.log
.debug("Creating dir {}".format(kube_dir
))
206 os
.makedirs(kube_dir
)
209 helm_dir
= cluster_dir
+ "/" + ".helm"
210 if create_if_not_exist
and not os
.path
.exists(helm_dir
):
211 self
.log
.debug("Creating dir {}".format(helm_dir
))
212 os
.makedirs(helm_dir
)
214 config_filename
= kube_dir
+ "/config"
216 # 2 - Prepare dictionary with paths
218 "kube_dir": kube_dir
,
219 "kube_config": config_filename
,
220 "cluster_dir": cluster_dir
,
221 "helm_dir": helm_dir
,
224 for file_name
, file in paths
.items():
225 if "dir" in file_name
and not os
.path
.exists(file):
226 err_msg
= "{} dir does not exist".format(file)
227 self
.log
.error(err_msg
)
228 raise K8sException(err_msg
)
230 # 3 - Prepare environment variables
231 env
= {"HELM_HOME": helm_dir
, "KUBECONFIG": config_filename
}
235 async def _get_services(self
, cluster_id
, kdu_instance
, namespace
):
238 paths
, env
= self
._init
_paths
_env
(
239 cluster_name
=cluster_id
, create_if_not_exist
=True
242 command1
= "{} get manifest {} ".format(self
._helm
_command
, kdu_instance
)
243 command2
= "{} get --namespace={} -f -".format(self
.kubectl_command
, namespace
)
244 output
, _rc
= await self
._local
_async
_exec
_pipe
(
245 command1
, command2
, env
=env
, raise_exception_on_error
=True
247 services
= self
._parse
_services
(output
)
251 async def _cluster_init(
252 self
, cluster_id
: str, namespace
: str, paths
: dict, env
: dict
255 Implements the helm version dependent cluster initialization:
256 For helm2 it initialized tiller environment if needed
259 # check if tiller pod is up in cluster
260 command
= "{} --kubeconfig={} --namespace={} get deployments".format(
261 self
.kubectl_command
, paths
["kube_config"], namespace
263 output
, _rc
= await self
._local
_async
_exec
(
264 command
=command
, raise_exception_on_error
=True, env
=env
267 output_table
= self
._output
_to
_table
(output
=output
)
269 # find 'tiller' pod in all pods
270 already_initialized
= False
272 for row
in output_table
:
273 if row
[0].startswith("tiller-deploy"):
274 already_initialized
= True
280 n2vc_installed_sw
= False
281 if not already_initialized
:
283 "Initializing helm in client and server: {}".format(cluster_id
)
285 command
= "{} --kubeconfig={} --namespace kube-system create serviceaccount {}".format(
286 self
.kubectl_command
, paths
["kube_config"], self
.service_account
288 _
, _rc
= await self
._local
_async
_exec
(
289 command
=command
, raise_exception_on_error
=False, env
=env
293 "{} --kubeconfig={} create clusterrolebinding osm-tiller-cluster-rule "
294 "--clusterrole=cluster-admin --serviceaccount=kube-system:{}"
295 ).format(self
.kubectl_command
, paths
["kube_config"], self
.service_account
)
296 _
, _rc
= await self
._local
_async
_exec
(
297 command
=command
, raise_exception_on_error
=False, env
=env
301 "{} --kubeconfig={} --tiller-namespace={} --home={} --service-account {} "
302 " --stable-repo-url {} init"
305 paths
["kube_config"],
308 self
.service_account
,
309 self
._stable
_repo
_url
,
311 _
, _rc
= await self
._local
_async
_exec
(
312 command
=command
, raise_exception_on_error
=True, env
=env
314 n2vc_installed_sw
= True
316 # check client helm installation
317 check_file
= paths
["helm_dir"] + "/repository/repositories.yaml"
318 if not self
._check
_file
_exists
(
319 filename
=check_file
, exception_if_not_exists
=False
321 self
.log
.info("Initializing helm in client: {}".format(cluster_id
))
323 "{} --kubeconfig={} --tiller-namespace={} "
324 "--home={} init --client-only --stable-repo-url {} "
327 paths
["kube_config"],
330 self
._stable
_repo
_url
,
332 output
, _rc
= await self
._local
_async
_exec
(
333 command
=command
, raise_exception_on_error
=True, env
=env
336 self
.log
.info("Helm client already initialized")
338 # remove old stable repo and add new one
339 cluster_uuid
= "{}:{}".format(namespace
, cluster_id
)
340 repo_list
= await self
.repo_list(cluster_uuid
)
341 for repo
in repo_list
:
342 if repo
["name"] == "stable" and repo
["url"] != self
._stable
_repo
_url
:
343 self
.log
.debug("Add new stable repo url: {}")
344 await self
.repo_remove(cluster_uuid
, "stable")
345 await self
.repo_add(cluster_uuid
, "stable", self
._stable
_repo
_url
)
348 return n2vc_installed_sw
350 async def _uninstall_sw(self
, cluster_id
: str, namespace
: str):
351 # uninstall Tiller if necessary
353 self
.log
.debug("Uninstalling tiller from cluster {}".format(cluster_id
))
356 paths
, env
= self
._init
_paths
_env
(
357 cluster_name
=cluster_id
, create_if_not_exist
=True
361 # find namespace for tiller pod
362 command
= "{} --kubeconfig={} get deployments --all-namespaces".format(
363 self
.kubectl_command
, paths
["kube_config"]
365 output
, _rc
= await self
._local
_async
_exec
(
366 command
=command
, raise_exception_on_error
=False, env
=env
368 output_table
= self
._output
_to
_table
(output
=output
)
370 for r
in output_table
:
372 if "tiller-deploy" in r
[1]:
378 msg
= "Tiller deployment not found in cluster {}".format(cluster_id
)
381 self
.log
.debug("namespace for tiller: {}".format(namespace
))
384 # uninstall tiller from cluster
385 self
.log
.debug("Uninstalling tiller from cluster {}".format(cluster_id
))
386 command
= "{} --kubeconfig={} --home={} reset".format(
387 self
._helm
_command
, paths
["kube_config"], paths
["helm_dir"]
389 self
.log
.debug("resetting: {}".format(command
))
390 output
, _rc
= await self
._local
_async
_exec
(
391 command
=command
, raise_exception_on_error
=True, env
=env
393 # Delete clusterrolebinding and serviceaccount.
394 # Ignore if errors for backward compatibility
396 "{} --kubeconfig={} delete clusterrolebinding.rbac.authorization.k8s."
397 "io/osm-tiller-cluster-rule"
398 ).format(self
.kubectl_command
, paths
["kube_config"])
399 output
, _rc
= await self
._local
_async
_exec
(
400 command
=command
, raise_exception_on_error
=False, env
=env
402 command
= "{} --kubeconfig={} --namespace kube-system delete serviceaccount/{}".format(
403 self
.kubectl_command
, paths
["kube_config"], self
.service_account
405 output
, _rc
= await self
._local
_async
_exec
(
406 command
=command
, raise_exception_on_error
=False, env
=env
410 self
.log
.debug("namespace not found")
412 async def _instances_list(self
, cluster_id
):
415 paths
, env
= self
._init
_paths
_env
(
416 cluster_name
=cluster_id
, create_if_not_exist
=True
419 command
= "{} list --output yaml".format(self
._helm
_command
)
421 output
, _rc
= await self
._local
_async
_exec
(
422 command
=command
, raise_exception_on_error
=True, env
=env
425 if output
and len(output
) > 0:
426 # parse yaml and update keys to lower case to unify with helm3
427 instances
= yaml
.load(output
, Loader
=yaml
.SafeLoader
).get("Releases")
429 for instance
in instances
:
430 new_instance
= dict((k
.lower(), v
) for k
, v
in instance
.items())
431 new_instances
.append(new_instance
)
436 def _get_inspect_command(
437 self
, show_command
: str, kdu_model
: str, repo_str
: str, version
: str
439 inspect_command
= "{} inspect {} {}{} {}".format(
440 self
._helm
_command
, show_command
, kdu_model
, repo_str
, version
442 return inspect_command
444 async def _status_kdu(
448 namespace
: str = None,
449 show_error_log
: bool = False,
450 return_text
: bool = False,
454 "status of kdu_instance: {}, namespace: {} ".format(kdu_instance
, namespace
)
458 paths
, env
= self
._init
_paths
_env
(
459 cluster_name
=cluster_id
, create_if_not_exist
=True
461 command
= "{} status {} --output yaml".format(self
._helm
_command
, kdu_instance
)
462 output
, rc
= await self
._local
_async
_exec
(
464 raise_exception_on_error
=True,
465 show_error_log
=show_error_log
,
475 data
= yaml
.load(output
, Loader
=yaml
.SafeLoader
)
477 # remove field 'notes'
479 del data
.get("info").get("status")["notes"]
483 # parse field 'resources'
485 resources
= str(data
.get("info").get("status").get("resources"))
486 resource_table
= self
._output
_to
_table
(resources
)
487 data
.get("info").get("status")["resources"] = resource_table
491 # set description to lowercase (unify with helm3)
493 data
.get("info")["description"] = data
.get("info").pop("Description")
499 def _get_helm_chart_repos_ids(self
, cluster_uuid
) -> list:
501 cluster_filter
= {"_admin.helm-chart.id": cluster_uuid
}
502 cluster
= self
.db
.get_one("k8sclusters", cluster_filter
)
504 repo_ids
= cluster
.get("_admin").get("helm_chart_repos") or []
508 "k8cluster with helm-id : {} not found".format(cluster_uuid
)
511 async def _is_install_completed(self
, cluster_id
: str, kdu_instance
: str) -> bool:
513 status
= await self
._status
_kdu
(
514 cluster_id
=cluster_id
, kdu_instance
=kdu_instance
, return_text
=False
517 # extract info.status.resources-> str
520 # NAME READY UP-TO-DATE AVAILABLE AGE
521 # halting-horse-mongodb 0/1 1 0 0s
522 # halting-petit-mongodb 1/1 1 0 0s
524 resources
= K8sHelmBaseConnector
._get
_deep
(
525 status
, ("info", "status", "resources")
529 resources
= K8sHelmBaseConnector
._output
_to
_table
(resources
)
531 num_lines
= len(resources
)
534 while index
< num_lines
:
536 line1
= resources
[index
]
538 # find '==>' in column 0
539 if line1
[0] == "==>":
540 line2
= resources
[index
]
542 # find READY in column 1
543 if line2
[1] == "READY":
545 line3
= resources
[index
]
547 while len(line3
) > 1 and index
< num_lines
:
548 ready_value
= line3
[1]
549 parts
= ready_value
.split(sep
="/")
550 current
= int(parts
[0])
551 total
= int(parts
[1])
553 self
.log
.debug("NOT READY:\n {}".format(line3
))
555 line3
= resources
[index
]
563 def _get_install_command(
564 self
, kdu_model
, kdu_instance
, namespace
, params_str
, version
, atomic
, timeout
569 timeout_str
= "--timeout {}".format(timeout
)
574 atomic_str
= "--atomic"
578 namespace_str
= "--namespace {}".format(namespace
)
583 version_str
= version_str
= "--version {}".format(version
)
586 "{helm} install {atomic} --output yaml "
587 "{params} {timeout} --name={name} {ns} {model} {ver}".format(
588 helm
=self
._helm
_command
,
600 def _get_upgrade_command(
601 self
, kdu_model
, kdu_instance
, namespace
, params_str
, version
, atomic
, timeout
606 timeout_str
= "--timeout {}".format(timeout
)
611 atomic_str
= "--atomic"
616 version_str
= "--version {}".format(version
)
618 command
= "{helm} upgrade {atomic} --output yaml {params} {timeout} {name} {model} {ver}".format(
619 helm
=self
._helm
_command
,
629 def _get_rollback_command(self
, kdu_instance
, namespace
, revision
) -> str:
630 return "{} rollback {} {} --wait".format(
631 self
._helm
_command
, kdu_instance
, revision
634 def _get_uninstall_command(self
, kdu_instance
: str, namespace
: str) -> str:
635 return "{} delete --purge {}".format(self
._helm
_command
, kdu_instance
)