2 # Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3 # This file is part of OSM
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact with: nfvlabs@tid.es
26 from n2vc
.k8s_helm_base_conn
import K8sHelmBaseConnector
27 from n2vc
.exceptions
import K8sException
# NOTE(review): this block is extraction-garbled — original indentation and
# several source lines (including the `def __init__(...)` signature, the fs/db
# parameters and the try/except around the async helm init) are missing from
# this view; all tokens below are kept verbatim. Restore from the upstream
# file before editing further.
30 class K8sHelmConnector(K8sHelmBaseConnector
):
33 ####################################################################################
34 ################################### P U B L I C ####################################
35 ####################################################################################
# constructor parameter fragments: default paths for the kubectl/helm binaries
42 kubectl_command
: str = "/usr/bin/kubectl",
43 helm_command
: str = "/usr/bin/helm",
# constructor docstring fragment
48 Initializes helm connector for helm v2
50 :param fs: file system for kubernetes and helm configuration
51 :param db: database object to write current operation status
52 :param kubectl_command: path to kubectl executable
53 :param helm_command: path to helm executable
55 :param on_update_db: callback called when k8s connector updates database
# delegate common setup to the helm-version-independent base class
59 K8sHelmBaseConnector
.__init
__(
64 kubectl_command
=kubectl_command
,
65 helm_command
=helm_command
,
66 on_update_db
=on_update_db
,
69 self
.log
.info("Initializing K8S Helm2 connector")
71 # initialize helm client-only
72 self
.log
.debug("Initializing helm client-only...")
# build `helm init --client-only` command; the --stable-repo-url option is
# only included when a stable repo url is configured
73 command
= "{} init --client-only {} ".format(
75 "--stable-repo-url {}".format(self
._stable
_repo
_url
)
76 if self
._stable
_repo
_url
# fire-and-forget: schedule the init coroutine without awaiting it
80 asyncio
.ensure_future(
81 self
._local
_async
_exec
(command
=command
, raise_exception_on_error
=False)
83 # loop = asyncio.get_event_loop()
84 # loop.run_until_complete(self._local_async_exec(command=command,
85 # raise_exception_on_error=False))
# a failure here presumably means helm was already initialized — logged, not
# re-raised (TODO confirm against upstream file)
86 except Exception as e
:
88 msg
="helm init failed (it was already initialized): {}".format(e
)
91 self
.log
.info("K8S Helm2 connector initialized")
# NOTE(review): extraction-garbled fragment of the public `install` coroutine —
# the `async def install(...)` signature line and several parameters
# (cluster_uuid, kdu_model, kdu_instance, atomic, timeout, params, kwargs) are
# missing from this view; all tokens below are kept verbatim.
101 db_dict
: dict = None,
102 kdu_name
: str = None,
103 namespace
: str = None,
# docstring fragment describing the install contract
107 Deploys of a new KDU instance. It would implicitly rely on the `install` call
108 to deploy the Chart/Bundle properly parametrized (in practice, this call would
109 happen before any _initial-config-primitive_of the VNF is called).
111 :param cluster_uuid: UUID of a K8s cluster known by OSM
112 :param kdu_model: chart/ reference (string), which can be either
114 - a name of chart available via the repos known by OSM
115 - a path to a packaged chart
116 - a path to an unpacked chart directory or a URL
117 :param kdu_instance: Kdu instance name
118 :param atomic: If set, installation process purges chart/bundle on fail, also
119 will wait until all the K8s objects are active
120 :param timeout: Time in seconds to wait for the install of the chart/bundle
121 (defaults to Helm default timeout: 300s)
122 :param params: dictionary of key-value pairs for instantiation parameters
123 (overriding default values)
124 :param dict db_dict: where to write into database when the status changes.
125 It contains a dict with {collection: <str>, filter: {},
127 e.g. {collection: "nsrs", filter:
128 {_id: <nsd-id>, path: "_admin.deployed.K8S.3"}
129 :param kdu_name: Name of the KDU instance to be installed
130 :param namespace: K8s namespace to use for the KDU instance
131 :param kwargs: Additional parameters (None yet)
132 :return: True if successful
# resolve the plain cluster id from the namespaced cluster uuid
134 _
, cluster_id
= self
._get
_namespace
_cluster
_id
(cluster_uuid
)
135 self
.log
.debug("installing {} in cluster {}".format(kdu_model
, cluster_id
))
# bring cluster config files from the shared file system to local disk
138 self
.fs
.sync(from_path
=cluster_id
)
# ensure per-cluster dirs/env exist before running helm
141 paths
, env
= self
._init
_paths
_env
(
142 cluster_name
=cluster_id
, create_if_not_exist
=True
# delegate the actual chart installation to the base-class implementation
145 await self
._install
_impl
(
# push any locally modified config back to the shared file system
160 self
.fs
.reverse_sync(from_path
=cluster_id
)
162 self
.log
.debug("Returning kdu_instance {}".format(kdu_instance
))
async def inspect_kdu(self, kdu_model: str, repo_url: str = None) -> str:
    """Inspect (describe) a KDU chart model via `helm inspect`.

    NOTE(review): reconstructed from a garbled extraction (only a log-call
    wrapper and closing parentheses were missing) — verify against upstream.

    :param kdu_model: chart reference (repo chart name, packaged chart path
        or URL)
    :param repo_url: optional repository url to resolve the chart from
    :return: raw text output of the helm inspect command
    """
    self.log.debug(
        "inspect kdu_model {} from (optional) repo: {}".format(kdu_model, repo_url)
    )

    # an empty inspect sub-command yields a plain `helm inspect`
    return await self._exec_inspect_command(
        inspect_command="", kdu_model=kdu_model, repo_url=repo_url
    )
####################################################################################
################################### P R I V A T E ##################################
####################################################################################
def _init_paths_env(self, cluster_name: str, create_if_not_exist: bool = True):
    """
    Create (if needed) and return the per-cluster config dirs plus the helm
    environment variables required to run helm v2 commands.

    Helm 2 layout under the file-system base path:
        <base>/<cluster_name>/.kube  -> kube dir, holds the kubeconfig file
        <base>/<cluster_name>/.helm  -> helm home dir

    The environment variables assigned for these paths are:
        - Helm home: $HELM_HOME
        - helm kubeconfig: $KUBECONFIG

    NOTE(review): reconstructed from a garbled extraction — verify the
    `base = self.fs.path` initialization against the upstream file.

    :param cluster_name: cluster name, used as the per-cluster directory name
    :param create_if_not_exist: create missing dirs instead of failing
    :return: tuple (paths dict, helm environment-variables dict)
    :raises K8sException: when a required dir is missing and was not created
    """
    base = self.fs.path
    if base.endswith("/") or base.endswith("\\"):
        base = base[:-1]

    # base dir for cluster
    cluster_dir = base + "/" + cluster_name

    # kube dir
    kube_dir = cluster_dir + "/" + ".kube"
    if create_if_not_exist and not os.path.exists(kube_dir):
        self.log.debug("Creating dir {}".format(kube_dir))
        os.makedirs(kube_dir)

    # helm home dir
    helm_dir = cluster_dir + "/" + ".helm"
    if create_if_not_exist and not os.path.exists(helm_dir):
        self.log.debug("Creating dir {}".format(helm_dir))
        os.makedirs(helm_dir)

    config_filename = kube_dir + "/config"

    # 2 - Prepare dictionary with paths
    paths = {
        "kube_dir": kube_dir,
        "kube_config": config_filename,
        "cluster_dir": cluster_dir,
        "helm_dir": helm_dir,
    }

    # every *_dir entry must exist at this point
    for file_name, file in paths.items():
        if "dir" in file_name and not os.path.exists(file):
            err_msg = "{} dir does not exist".format(file)
            self.log.error(err_msg)
            raise K8sException(err_msg)

    # 3 - Prepare environment variables
    env = {"HELM_HOME": helm_dir, "KUBECONFIG": config_filename}

    return paths, env
async def _get_services(self, cluster_id, kdu_instance, namespace, kubeconfig):
    """Return the k8s services deployed by a helm release.

    Pipes `helm get manifest <release>` into `kubectl get -f -` and parses
    the resulting output.

    NOTE(review): reconstructed from a garbled extraction — verify against
    the upstream file.

    :param cluster_id: cluster identifier used to locate local config dirs
    :param kdu_instance: helm release name
    :param namespace: k8s namespace passed to kubectl
    :param kubeconfig: path to the kubeconfig file for the helm command
    :return: parsed services (as produced by self._parse_services)
    """
    # init config, env
    paths, env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )

    command1 = "env KUBECONFIG={} {} get manifest {} ".format(
        kubeconfig, self._helm_command, kdu_instance
    )
    command2 = "{} get --namespace={} -f -".format(self.kubectl_command, namespace)
    output, _rc = await self._local_async_exec_pipe(
        command1, command2, env=env, raise_exception_on_error=True
    )
    services = self._parse_services(output)

    return services
# NOTE(review): extraction-garbled — indentation, the try/except structure and
# several lines (the assembled helm init command formatting, loop breaks and
# if-guards) are missing from this view; all tokens below are kept verbatim.
254 async def _cluster_init(
255 self
, cluster_id
: str, namespace
: str, paths
: dict, env
: dict
# docstring fragment
258 Implements the helm version dependent cluster initialization:
259 For helm2 it initialized tiller environment if needed
262 # check if tiller pod is up in cluster
263 command
= "{} --kubeconfig={} --namespace={} get deployments".format(
264 self
.kubectl_command
, paths
["kube_config"], namespace
266 output
, _rc
= await self
._local
_async
_exec
(
267 command
=command
, raise_exception_on_error
=True, env
=env
270 output_table
= self
._output
_to
_table
(output
=output
)
272 # find 'tiller' pod in all pods
273 already_initialized
= False
275 for row
in output_table
:
276 if row
[0].startswith("tiller-deploy"):
277 already_initialized
= True
# when tiller is absent: create service account + clusterrolebinding, then
# run server-side `helm init`; RBAC step errors are tolerated
# (raise_exception_on_error=False)
283 n2vc_installed_sw
= False
284 if not already_initialized
:
286 "Initializing helm in client and server: {}".format(cluster_id
)
288 command
= "{} --kubeconfig={} --namespace kube-system create serviceaccount {}".format(
289 self
.kubectl_command
, paths
["kube_config"], self
.service_account
291 _
, _rc
= await self
._local
_async
_exec
(
292 command
=command
, raise_exception_on_error
=False, env
=env
296 "{} --kubeconfig={} create clusterrolebinding osm-tiller-cluster-rule "
297 "--clusterrole=cluster-admin --serviceaccount=kube-system:{}"
298 ).format(self
.kubectl_command
, paths
["kube_config"], self
.service_account
)
299 _
, _rc
= await self
._local
_async
_exec
(
300 command
=command
, raise_exception_on_error
=False, env
=env
# server-side `helm init` command fragments (format args partially missing)
304 "{} --kubeconfig={} --tiller-namespace={} --home={} --service-account {} "
308 paths
["kube_config"],
311 self
.service_account
,
312 "--stable-repo-url {}".format(self
._stable
_repo
_url
)
313 if self
._stable
_repo
_url
316 _
, _rc
= await self
._local
_async
_exec
(
317 command
=command
, raise_exception_on_error
=True, env
=env
# record that this connector installed the software (used by uninstall)
319 n2vc_installed_sw
= True
321 # check client helm installation
322 check_file
= paths
["helm_dir"] + "/repository/repositories.yaml"
323 if not self
._check
_file
_exists
(
324 filename
=check_file
, exception_if_not_exists
=False
326 self
.log
.info("Initializing helm in client: {}".format(cluster_id
))
# client-only `helm init` command fragments
328 "{} --kubeconfig={} --tiller-namespace={} "
329 "--home={} init --client-only {} "
332 paths
["kube_config"],
335 "--stable-repo-url {}".format(self
._stable
_repo
_url
)
336 if self
._stable
_repo
_url
339 output
, _rc
= await self
._local
_async
_exec
(
340 command
=command
, raise_exception_on_error
=True, env
=env
343 self
.log
.info("Helm client already initialized")
345 # remove old stable repo and add new one
346 cluster_uuid
= "{}:{}".format(namespace
, cluster_id
)
347 repo_list
= await self
.repo_list(cluster_uuid
)
348 for repo
in repo_list
:
349 if repo
["name"] == "stable" and repo
["url"] != self
._stable
_repo
_url
:
350 self
.log
.debug("Add new stable repo url: {}")
351 await self
.repo_remove(cluster_uuid
, "stable")
352 if self
._stable
_repo
_url
:
353 await self
.repo_add(cluster_uuid
, "stable", self
._stable
_repo
_url
)
356 return n2vc_installed_sw
# NOTE(review): extraction-garbled — the if/else skeleton around the namespace
# lookup, the loop body that extracts the tiller namespace (and its break) and
# the surrounding guards are missing from this view; tokens kept verbatim.
358 async def _uninstall_sw(self
, cluster_id
: str, namespace
: str):
359 # uninstall Tiller if necessary
361 self
.log
.debug("Uninstalling tiller from cluster {}".format(cluster_id
))
# init per-cluster paths and helm environment
364 paths
, env
= self
._init
_paths
_env
(
365 cluster_name
=cluster_id
, create_if_not_exist
=True
369 # find namespace for tiller pod
370 command
= "{} --kubeconfig={} get deployments --all-namespaces".format(
371 self
.kubectl_command
, paths
["kube_config"]
373 output
, _rc
= await self
._local
_async
_exec
(
374 command
=command
, raise_exception_on_error
=False, env
=env
376 output_table
= self
._output
_to
_table
(output
=output
)
# scan the deployments table for the tiller deployment
378 for r
in output_table
:
380 if "tiller-deploy" in r
[1]:
# presumably logged when tiller is not deployed — confirm upstream
386 msg
= "Tiller deployment not found in cluster {}".format(cluster_id
)
389 self
.log
.debug("namespace for tiller: {}".format(namespace
))
392 # uninstall tiller from cluster
393 self
.log
.debug("Uninstalling tiller from cluster {}".format(cluster_id
))
394 command
= "{} --kubeconfig={} --home={} reset".format(
395 self
._helm
_command
, paths
["kube_config"], paths
["helm_dir"]
397 self
.log
.debug("resetting: {}".format(command
))
398 output
, _rc
= await self
._local
_async
_exec
(
399 command
=command
, raise_exception_on_error
=True, env
=env
401 # Delete clusterrolebinding and serviceaccount.
402 # Ignore if errors for backward compatibility
404 "{} --kubeconfig={} delete clusterrolebinding.rbac.authorization.k8s."
405 "io/osm-tiller-cluster-rule"
406 ).format(self
.kubectl_command
, paths
["kube_config"])
407 output
, _rc
= await self
._local
_async
_exec
(
408 command
=command
, raise_exception_on_error
=False, env
=env
410 command
= "{} --kubeconfig={} --namespace kube-system delete serviceaccount/{}".format(
411 self
.kubectl_command
, paths
["kube_config"], self
.service_account
413 output
, _rc
= await self
._local
_async
_exec
(
414 command
=command
, raise_exception_on_error
=False, env
=env
418 self
.log
.debug("namespace not found")
async def _instances_list(self, cluster_id):
    """List the helm releases deployed in a cluster.

    NOTE(review): reconstructed from a garbled extraction (accumulator
    initialization and return statements were missing) — verify upstream.

    :param cluster_id: cluster identifier used to locate kubeconfig/helm home
    :return: list of release dicts with lower-cased keys (unified with helm3),
        or an empty list when `helm list` produced no output
    """
    # init paths, env
    paths, env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )

    command = "{} list --output yaml".format(self._helm_command)

    output, _rc = await self._local_async_exec(
        command=command, raise_exception_on_error=True, env=env
    )

    if output and len(output) > 0:
        # parse yaml and update keys to lower case to unify with helm3
        instances = yaml.load(output, Loader=yaml.SafeLoader).get("Releases")
        new_instances = []
        for instance in instances:
            new_instance = dict((k.lower(), v) for k, v in instance.items())
            new_instances.append(new_instance)
        return new_instances
    else:
        return []
def _get_inspect_command(
    self, show_command: str, kdu_model: str, repo_str: str, version: str
):
    """Build the helm v2 command line to inspect a chart.

    :param show_command: inspect sub-command (e.g. "chart", "values", or ""
        for a plain `helm inspect`)
    :param kdu_model: chart reference
    :param repo_str: pre-formatted repo option text, concatenated directly
        after kdu_model (may be empty)
    :param version: pre-formatted version option text (may be empty)
    :return: full helm inspect command string
    """
    inspect_command = "{} inspect {} {}{} {}".format(
        self._helm_command, show_command, kdu_model, repo_str, version
    )
    return inspect_command
def _get_get_command(
    self, get_command: str, kdu_instance: str, namespace: str, kubeconfig: str
):
    """Build the helm v2 `helm get <sub-command>` command line.

    :param get_command: helm get sub-command (e.g. "values", "manifest")
    :param kdu_instance: helm release name
    :param namespace: accepted for interface compatibility with helm3 but not
        used by helm v2 `get` (not present in the command string)
    :param kubeconfig: kubeconfig path exported as KUBECONFIG
    :return: full helm get command string
    """
    get_command = "env KUBECONFIG={} {} get {} {} --output yaml".format(
        kubeconfig, self._helm_command, get_command, kdu_instance
    )
    return get_command
# NOTE(review): extraction-garbled — the signature's leading parameters
# (presumably cluster_id and kdu_instance), the command/env kwargs of the exec
# call, the try/except guards around the yaml post-processing and the return
# statements are missing from this view; all tokens below are kept verbatim.
460 async def _status_kdu(
464 namespace
: str = None,
465 show_error_log
: bool = False,
466 return_text
: bool = False,
# log-message fragment: reports which release/namespace is being queried
470 "status of kdu_instance: {}, namespace: {} ".format(kdu_instance
, namespace
)
474 paths
, env
= self
._init
_paths
_env
(
475 cluster_name
=cluster_id
, create_if_not_exist
=True
477 command
= ("env KUBECONFIG={} {} status {} --output yaml").format(
478 paths
["kube_config"], self
._helm
_command
, kdu_instance
480 output
, rc
= await self
._local
_async
_exec
(
482 raise_exception_on_error
=True,
483 show_error_log
=show_error_log
,
# parse the helm yaml output into a dict (SafeLoader for untrusted text)
493 data
= yaml
.load(output
, Loader
=yaml
.SafeLoader
)
495 # remove field 'notes'
497 del data
.get("info").get("status")["notes"]
501 # parse field 'resources'
503 resources
= str(data
.get("info").get("status").get("resources"))
504 resource_table
= self
._output
_to
_table
(resources
)
505 data
.get("info").get("status")["resources"] = resource_table
509 # set description to lowercase (unify with helm3)
511 data
.get("info")["description"] = data
.get("info").pop("Description")
def _get_helm_chart_repos_ids(self, cluster_uuid) -> list:
    """Return the helm chart repo ids registered for a cluster in the DB.

    NOTE(review): reconstructed from a garbled extraction (the if/else
    skeleton was missing) — verify against the upstream file.

    :param cluster_uuid: value matched against `_admin.helm-chart.id` in the
        "k8sclusters" collection
    :return: list of repo ids (possibly empty)
    :raises K8sException: when no cluster matches the given id
    """
    repo_ids = []
    cluster_filter = {"_admin.helm-chart.id": cluster_uuid}
    cluster = self.db.get_one("k8sclusters", cluster_filter)
    if cluster:
        repo_ids = cluster.get("_admin").get("helm_chart_repos") or []
        return repo_ids
    else:
        raise K8sException(
            "k8cluster with helm-id : {} not found".format(cluster_uuid)
        )
# NOTE(review): extraction-garbled — index initialization/increments, the
# readiness comparison (current vs total) and the return statements are
# missing from this view; all tokens below are kept verbatim.
529 async def _is_install_completed(self
, cluster_id
: str, kdu_instance
: str) -> bool:
531 paths
, env
= self
._init
_paths
_env
(
532 cluster_name
=cluster_id
, create_if_not_exist
=True
# query helm status as a parsed dict (return_text=False)
535 status
= await self
._status
_kdu
(
536 cluster_id
=cluster_id
, kdu_instance
=kdu_instance
, return_text
=False
539 # extract info.status.resources-> str
542 # NAME READY UP-TO-DATE AVAILABLE AGE
543 # halting-horse-mongodb 0/1 1 0 0s
544 # halting-petit-mongodb 1/1 1 0 0s
546 resources
= K8sHelmBaseConnector
._get
_deep
(
547 status
, ("info", "status", "resources")
551 resources
= K8sHelmBaseConnector
._output
_to
_table
(resources
)
553 num_lines
= len(resources
)
# walk the resources table: find '==>' section headers, then a READY header
# row, then parse each data row's "current/total" ready counter
556 while index
< num_lines
:
558 line1
= resources
[index
]
560 # find '==>' in column 0
561 if line1
[0] == "==>":
562 line2
= resources
[index
]
564 # find READY in column 1
565 if line2
[1] == "READY":
567 line3
= resources
[index
]
569 while len(line3
) > 1 and index
< num_lines
:
570 ready_value
= line3
[1]
571 parts
= ready_value
.split(sep
="/")
572 current
= int(parts
[0])
573 total
= int(parts
[1])
575 self
.log
.debug("NOT READY:\n {}".format(line3
))
577 line3
= resources
[index
]
# NOTE(review): extraction-garbled — the parameter list, the option guards
# (presumably `if timeout` / `if atomic` / `if namespace` / `if version`) and
# most of the final format() arguments are missing from this view; all tokens
# below are kept verbatim.
585 def _get_install_command(
# each *_str fragment below is presumably set to an empty string when its
# option is not requested — TODO confirm against the upstream file
599 timeout_str
= "--timeout {}".format(timeout
)
604 atomic_str
= "--atomic"
608 namespace_str
= "--namespace {}".format(namespace
)
613 version_str
= "--version {}".format(version
)
# final helm v2 install command template
616 "env KUBECONFIG={kubeconfig} {helm} install {atomic} --output yaml "
617 "{params} {timeout} --name={name} {ns} {model} {ver}".format(
618 kubeconfig
=kubeconfig
,
619 helm
=self
._helm
_command
,
# NOTE(review): extraction-garbled — the parameter list, the option guards and
# most of the final format() arguments are missing from this view; all tokens
# below are kept verbatim.
631 def _get_upgrade_scale_command(
# note the "s" suffix here ("--timeout {}s"), unlike the other builders
647 timeout_str
= "--timeout {}s".format(timeout
)
652 atomic_str
= "--atomic"
657 version_str
= "--version {}".format(version
)
# the scale value goes through a --set option: either keyed as
# "<resource_name>.<replica_str>" or plain "<replica_str>" (two alternative
# assignments below — presumably an if/else on resource_name, confirm upstream)
661 scale_dict
= {"{}.{}".format(resource_name
, replica_str
): scale
}
663 scale_dict
= {replica_str
: scale
}
665 scale_str
= self
._params
_to
_set
_option
(scale_dict
)
# final helm v2 upgrade (scale) command template
668 "env KUBECONFIG={kubeconfig} {helm} upgrade {atomic} --output yaml {scale} {timeout} {name} {model} {ver}"
670 helm
=self
._helm
_command
,
677 kubeconfig
=kubeconfig
,
# NOTE(review): extraction-garbled — the parameter list, the option guards and
# most of the final format() arguments are missing from this view; all tokens
# below are kept verbatim.
681 def _get_upgrade_command(
# each *_str fragment is presumably "" when its option is not requested —
# TODO confirm against the upstream file
695 timeout_str
= "--timeout {}".format(timeout
)
700 atomic_str
= "--atomic"
705 version_str
= "--version {}".format(version
)
# final helm v2 upgrade command template
708 "env KUBECONFIG={kubeconfig} {helm} upgrade {atomic} --output yaml {params} {timeout} {name} {model} {ver}"
710 kubeconfig
=kubeconfig
,
711 helm
=self
._helm
_command
,
def _get_rollback_command(
    self, kdu_instance, namespace, revision, kubeconfig
) -> str:
    """Build the helm v2 rollback command (waits for completion via --wait).

    :param kdu_instance: helm release name
    :param namespace: accepted for interface compatibility with helm3 but not
        used by helm v2 rollback (not present in the command string)
    :param revision: release revision to roll back to
    :param kubeconfig: kubeconfig path exported as KUBECONFIG
    :return: full helm rollback command string
    """
    return "env KUBECONFIG={} {} rollback {} {} --wait".format(
        kubeconfig, self._helm_command, kdu_instance, revision
    )
def _get_uninstall_command(
    self, kdu_instance: str, namespace: str, kubeconfig: str
) -> str:
    """Build the helm v2 delete (uninstall) command.

    --purge removes the release record so the name can be reused (helm v2
    semantics); namespace is accepted for interface compatibility with helm3
    but not used (not present in the command string).

    :param kdu_instance: helm release name
    :param namespace: unused for helm v2 delete
    :param kubeconfig: kubeconfig path exported as KUBECONFIG
    :return: full helm delete command string
    """
    return "env KUBECONFIG={} {} delete --purge {}".format(
        kubeconfig, self._helm_command, kdu_instance
    )