2 # Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3 # This file is part of OSM
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact with: nfvlabs@tid.es
32 from uuid
import uuid4
34 from n2vc
.exceptions
import K8sException
35 from n2vc
.k8s_conn
import K8sConnector
38 class K8sHelmBaseConnector(K8sConnector
):
41 ####################################################################################
42 ################################### P U B L I C ####################################
43 ####################################################################################
45 service_account
= "osm"
46 _STABLE_REPO_URL
= "https://charts.helm.sh/stable"
52 kubectl_command
: str = "/usr/bin/kubectl",
53 helm_command
: str = "/usr/bin/helm",
56 vca_config
: dict = None,
60 :param fs: file system for kubernetes and helm configuration
61 :param db: database object to write current operation status
62 :param kubectl_command: path to kubectl executable
63 :param helm_command: path to helm executable
65 :param on_update_db: callback called when k8s connector updates database
69 K8sConnector
.__init
__(self
, db
=db
, log
=log
, on_update_db
=on_update_db
)
71 self
.log
.info("Initializing K8S Helm connector")
73 # random numbers for release name generation
74 random
.seed(time
.time())
79 # exception if kubectl is not installed
80 self
.kubectl_command
= kubectl_command
81 self
._check
_file
_exists
(filename
=kubectl_command
, exception_if_not_exists
=True)
83 # exception if helm is not installed
84 self
._helm
_command
= helm_command
85 self
._check
_file
_exists
(filename
=helm_command
, exception_if_not_exists
=True)
87 # obtain stable repo url from config or apply default
88 if not vca_config
or not vca_config
.get("stablerepourl"):
89 self
._stable
_repo
_url
= self
._STABLE
_REPO
_URL
91 self
._stable
_repo
_url
= vca_config
.get("stablerepourl")
94 def _get_namespace_cluster_id(cluster_uuid
: str) -> (str, str):
96 Parses cluster_uuid stored at database that can be either 'namespace:cluster_id' or only
97 cluster_id for backward compatibility
99 namespace
, _
, cluster_id
= cluster_uuid
.rpartition(':')
100 return namespace
, cluster_id
103 self
, k8s_creds
: str, namespace
: str = "kube-system", reuse_cluster_uuid
=None
106 It prepares a given K8s cluster environment to run Charts
108 :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid
110 :param namespace: optional namespace to be used for helm. By default,
111 'kube-system' will be used
112 :param reuse_cluster_uuid: existing cluster uuid for reuse
113 :return: uuid of the K8s cluster and True if connector has installed some
114 software in the cluster
115 (on error, an exception will be raised)
118 if reuse_cluster_uuid
:
119 namespace_
, cluster_id
= self
._get
_namespace
_cluster
_id
(reuse_cluster_uuid
)
120 namespace
= namespace_
or namespace
122 cluster_id
= str(uuid4())
123 cluster_uuid
= "{}:{}".format(namespace
, cluster_id
)
125 self
.log
.debug("Initializing K8S Cluster {}. namespace: {}".format(cluster_id
, namespace
))
127 paths
, env
= self
._init
_paths
_env
(
128 cluster_name
=cluster_id
, create_if_not_exist
=True
130 mode
= stat
.S_IRUSR | stat
.S_IWUSR
131 with
open(paths
["kube_config"], "w", mode
) as f
:
133 os
.chmod(paths
["kube_config"], 0o600)
135 # Code with initialization specific of helm version
136 n2vc_installed_sw
= await self
._cluster
_init
(cluster_id
, namespace
, paths
, env
)
138 # sync fs with local data
139 self
.fs
.reverse_sync(from_path
=cluster_id
)
141 self
.log
.info("Cluster {} initialized".format(cluster_id
))
143 return cluster_uuid
, n2vc_installed_sw
146 self
, cluster_uuid
: str, name
: str, url
: str, repo_type
: str = "chart"
148 _
, cluster_id
= self
._get
_namespace
_cluster
_id
(cluster_uuid
)
149 self
.log
.debug("Cluster {}, adding {} repository {}. URL: {}".format(
150 cluster_id
, repo_type
, name
, url
))
153 self
.fs
.sync(from_path
=cluster_id
)
156 paths
, env
= self
._init
_paths
_env
(
157 cluster_name
=cluster_id
, create_if_not_exist
=True
161 command
= "{} repo update".format(
164 self
.log
.debug("updating repo: {}".format(command
))
165 await self
._local
_async
_exec
(command
=command
, raise_exception_on_error
=False, env
=env
)
167 # helm repo add name url
168 command
= "{} repo add {} {}".format(
169 self
._helm
_command
, name
, url
171 self
.log
.debug("adding repo: {}".format(command
))
172 await self
._local
_async
_exec
(command
=command
, raise_exception_on_error
=True, env
=env
)
175 self
.fs
.reverse_sync(from_path
=cluster_id
)
async def repo_list(self, cluster_uuid: str) -> list:
    """
    Get the list of repositories registered in helm for a cluster.

    :param cluster_uuid: cluster identifier, 'namespace:cluster_id' or 'cluster_id'
    :return: list of dictionaries, one per repository, as parsed from the
        YAML output of ``helm repo list`` with every key lower-cased.
        An empty list is returned when the command fails or its output is
        not a list of repositories.
    """

    _, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
    self.log.debug("list repositories for cluster {}".format(cluster_id))

    # sync local dir
    self.fs.sync(from_path=cluster_id)

    # init env, paths (only the environment is needed here)
    _paths, env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )

    command = "{} repo list --output yaml".format(self._helm_command)

    # Set exception to false because if there are no repos just want an empty list
    output, _rc = await self._local_async_exec(
        command=command, raise_exception_on_error=False, env=env
    )

    # sync fs
    self.fs.reverse_sync(from_path=cluster_id)

    # Errors are not raised above, so on failure 'output' holds whatever the
    # command printed (an error message, not a repository list). Parsing
    # that and lower-casing its "keys" would crash, so bail out early.
    if _rc != 0 or not output:
        return []

    repos = yaml.load(output, Loader=yaml.SafeLoader)
    if not isinstance(repos, list):
        # empty document or unexpected structure: nothing to report
        return []

    # unify format between helm2 and helm3 setting all keys lowercase
    return self._lower_keys_list(repos)
async def repo_remove(self, cluster_uuid: str, name: str):
    """
    Remove a repository from the helm configuration of a cluster.

    :param cluster_uuid: cluster identifier, 'namespace:cluster_id' or 'cluster_id'
    :param name: name of the repository to remove
    :return: None. The helm command is executed with
        raise_exception_on_error=True, so a failure propagates from
        ``_local_async_exec`` and the final file-system sync is skipped.
    """
    _namespace, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
    debug_msg = "remove {} repositories for cluster {}".format(name, cluster_id)
    self.log.debug(debug_msg)

    # bring the shared storage copy of the cluster directory to local disk
    self.fs.sync(from_path=cluster_id)

    # only the helm environment is needed; the paths are not used here
    _paths, helm_env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )

    remove_cmd = "{} repo remove {}".format(self._helm_command, name)
    await self._local_async_exec(
        command=remove_cmd, raise_exception_on_error=True, env=helm_env
    )

    # push the local changes back
    self.fs.reverse_sync(from_path=cluster_id)
239 self
, cluster_uuid
: str, force
: bool = False, uninstall_sw
: bool = False
242 namespace
, cluster_id
= self
._get
_namespace
_cluster
_id
(cluster_uuid
)
243 self
.log
.debug("Resetting K8s environment. cluster uuid: {} uninstall={}"
244 .format(cluster_id
, uninstall_sw
))
247 self
.fs
.sync(from_path
=cluster_id
)
249 # uninstall releases if needed.
251 releases
= await self
.instances_list(cluster_uuid
=cluster_uuid
)
252 if len(releases
) > 0:
256 kdu_instance
= r
.get("name")
257 chart
= r
.get("chart")
259 "Uninstalling {} -> {}".format(chart
, kdu_instance
)
261 await self
.uninstall(
262 cluster_uuid
=cluster_uuid
, kdu_instance
=kdu_instance
264 except Exception as e
:
265 # will not raise exception as it was found
266 # that in some cases of previously installed helm releases it
269 "Error uninstalling release {}: {}".format(kdu_instance
, e
)
273 "Cluster uuid: {} has releases and not force. Leaving K8s helm environment"
276 uninstall_sw
= False # Allow to remove k8s cluster without removing Tiller
279 await self
._uninstall
_sw
(cluster_id
, namespace
)
281 # delete cluster directory
282 self
.log
.debug("Removing directory {}".format(cluster_id
))
283 self
.fs
.file_delete(cluster_id
, ignore_non_exist
=True)
284 # Also remove the local directory if it still exists
285 direct
= self
.fs
.path
+ "/" + cluster_id
286 shutil
.rmtree(direct
, ignore_errors
=True)
290 async def _install_impl(
297 timeout
: float = 300,
299 db_dict
: dict = None,
300 kdu_name
: str = None,
301 namespace
: str = None,
304 params_str
, file_to_delete
= self
._params
_to
_file
_option
(
305 cluster_id
=cluster_id
, params
=params
311 parts
= kdu_model
.split(sep
=":")
313 version
= str(parts
[1])
316 # generate a name for the release. Then, check if already exists
318 while kdu_instance
is None:
319 kdu_instance
= self
._generate
_release
_name
(kdu_model
)
321 result
= await self
._status
_kdu
(
322 cluster_id
=cluster_id
,
323 kdu_instance
=kdu_instance
,
325 show_error_log
=False,
327 if result
is not None:
328 # instance already exists: generate a new one
333 command
= self
._get
_install
_command
(kdu_model
, kdu_instance
, namespace
,
334 params_str
, version
, atomic
, timeout
)
336 self
.log
.debug("installing: {}".format(command
))
339 # exec helm in a task
340 exec_task
= asyncio
.ensure_future(
341 coro_or_future
=self
._local
_async
_exec
(
342 command
=command
, raise_exception_on_error
=False, env
=env
346 # write status in another task
347 status_task
= asyncio
.ensure_future(
348 coro_or_future
=self
._store
_status
(
349 cluster_id
=cluster_id
,
350 kdu_instance
=kdu_instance
,
358 # wait for execution task
359 await asyncio
.wait([exec_task
])
364 output
, rc
= exec_task
.result()
368 output
, rc
= await self
._local
_async
_exec
(
369 command
=command
, raise_exception_on_error
=False, env
=env
372 # remove temporal values yaml file
374 os
.remove(file_to_delete
)
377 await self
._store
_status
(
378 cluster_id
=cluster_id
,
379 kdu_instance
=kdu_instance
,
388 msg
= "Error executing command: {}\nOutput: {}".format(command
, output
)
390 raise K8sException(msg
)
398 kdu_model
: str = None,
400 timeout
: float = 300,
402 db_dict
: dict = None,
404 _
, cluster_id
= self
._get
_namespace
_cluster
_id
(cluster_uuid
)
405 self
.log
.debug("upgrading {} in cluster {}".format(kdu_model
, cluster_id
))
408 self
.fs
.sync(from_path
=cluster_id
)
410 # look for instance to obtain namespace
411 instance_info
= await self
.get_instance_info(cluster_uuid
, kdu_instance
)
412 if not instance_info
:
413 raise K8sException("kdu_instance {} not found".format(kdu_instance
))
416 paths
, env
= self
._init
_paths
_env
(
417 cluster_name
=cluster_id
, create_if_not_exist
=True
421 params_str
, file_to_delete
= self
._params
_to
_file
_option
(
422 cluster_id
=cluster_id
, params
=params
428 parts
= kdu_model
.split(sep
=":")
430 version
= str(parts
[1])
433 command
= self
._get
_upgrade
_command
(kdu_model
, kdu_instance
, instance_info
["namespace"],
434 params_str
, version
, atomic
, timeout
)
436 self
.log
.debug("upgrading: {}".format(command
))
440 # exec helm in a task
441 exec_task
= asyncio
.ensure_future(
442 coro_or_future
=self
._local
_async
_exec
(
443 command
=command
, raise_exception_on_error
=False, env
=env
446 # write status in another task
447 status_task
= asyncio
.ensure_future(
448 coro_or_future
=self
._store
_status
(
449 cluster_id
=cluster_id
,
450 kdu_instance
=kdu_instance
,
451 namespace
=instance_info
["namespace"],
458 # wait for execution task
459 await asyncio
.wait([exec_task
])
463 output
, rc
= exec_task
.result()
467 output
, rc
= await self
._local
_async
_exec
(
468 command
=command
, raise_exception_on_error
=False, env
=env
471 # remove temporal values yaml file
473 os
.remove(file_to_delete
)
476 await self
._store
_status
(
477 cluster_id
=cluster_id
,
478 kdu_instance
=kdu_instance
,
479 namespace
=instance_info
["namespace"],
487 msg
= "Error executing command: {}\nOutput: {}".format(command
, output
)
489 raise K8sException(msg
)
492 self
.fs
.reverse_sync(from_path
=cluster_id
)
494 # return new revision number
495 instance
= await self
.get_instance_info(
496 cluster_uuid
=cluster_uuid
, kdu_instance
=kdu_instance
499 revision
= int(instance
.get("revision"))
500 self
.log
.debug("New revision: {}".format(revision
))
506 self
, cluster_uuid
: str, kdu_instance
: str, revision
=0, db_dict
: dict = None
509 _
, cluster_id
= self
._get
_namespace
_cluster
_id
(cluster_uuid
)
511 "rollback kdu_instance {} to revision {} from cluster {}".format(
512 kdu_instance
, revision
, cluster_id
517 self
.fs
.sync(from_path
=cluster_id
)
519 # look for instance to obtain namespace
520 instance_info
= await self
.get_instance_info(cluster_uuid
, kdu_instance
)
521 if not instance_info
:
522 raise K8sException("kdu_instance {} not found".format(kdu_instance
))
525 paths
, env
= self
._init
_paths
_env
(
526 cluster_name
=cluster_id
, create_if_not_exist
=True
529 command
= self
._get
_rollback
_command
(kdu_instance
, instance_info
["namespace"],
532 self
.log
.debug("rolling_back: {}".format(command
))
534 # exec helm in a task
535 exec_task
= asyncio
.ensure_future(
536 coro_or_future
=self
._local
_async
_exec
(
537 command
=command
, raise_exception_on_error
=False, env
=env
540 # write status in another task
541 status_task
= asyncio
.ensure_future(
542 coro_or_future
=self
._store
_status
(
543 cluster_id
=cluster_id
,
544 kdu_instance
=kdu_instance
,
545 namespace
=instance_info
["namespace"],
547 operation
="rollback",
552 # wait for execution task
553 await asyncio
.wait([exec_task
])
558 output
, rc
= exec_task
.result()
561 await self
._store
_status
(
562 cluster_id
=cluster_id
,
563 kdu_instance
=kdu_instance
,
564 namespace
=instance_info
["namespace"],
566 operation
="rollback",
572 msg
= "Error executing command: {}\nOutput: {}".format(command
, output
)
574 raise K8sException(msg
)
577 self
.fs
.reverse_sync(from_path
=cluster_id
)
579 # return new revision number
580 instance
= await self
.get_instance_info(
581 cluster_uuid
=cluster_uuid
, kdu_instance
=kdu_instance
584 revision
= int(instance
.get("revision"))
585 self
.log
.debug("New revision: {}".format(revision
))
590 async def uninstall(self
, cluster_uuid
: str, kdu_instance
: str):
592 Removes an existing KDU instance. It would implicitly use the `delete` or 'uninstall' call
593 (this call should happen after all _terminate-config-primitive_ of the VNF
596 :param cluster_uuid: UUID of a K8s cluster known by OSM, or namespace:cluster_id
597 :param kdu_instance: unique name for the KDU instance to be deleted
598 :return: True if successful
601 _
, cluster_id
= self
._get
_namespace
_cluster
_id
(cluster_uuid
)
603 "uninstall kdu_instance {} from cluster {}".format(
604 kdu_instance
, cluster_id
609 self
.fs
.sync(from_path
=cluster_id
)
611 # look for instance to obtain namespace
612 instance_info
= await self
.get_instance_info(cluster_uuid
, kdu_instance
)
613 if not instance_info
:
614 raise K8sException("kdu_instance {} not found".format(kdu_instance
))
617 paths
, env
= self
._init
_paths
_env
(
618 cluster_name
=cluster_id
, create_if_not_exist
=True
621 command
= self
._get
_uninstall
_command
(kdu_instance
, instance_info
["namespace"])
622 output
, _rc
= await self
._local
_async
_exec
(
623 command
=command
, raise_exception_on_error
=True, env
=env
627 self
.fs
.reverse_sync(from_path
=cluster_id
)
629 return self
._output
_to
_table
(output
)
async def instances_list(self, cluster_uuid: str) -> list:
    """
    Return the releases deployed in a cluster.

    :param cluster_uuid: the 'cluster' or 'namespace:cluster'
    :return: whatever the helm-version specific ``_instances_list``
        implementation returns for the cluster
    """
    _namespace, cluster_id = self._get_namespace_cluster_id(cluster_uuid)
    debug_msg = "list releases for cluster {}".format(cluster_id)
    self.log.debug(debug_msg)

    # make sure the local copy of the cluster directory is current
    self.fs.sync(from_path=cluster_id)

    # delegate to the helm-version specific implementation
    releases = await self._instances_list(cluster_id)

    # push any local changes back
    self.fs.reverse_sync(from_path=cluster_id)

    return releases
async def get_instance_info(self, cluster_uuid: str, kdu_instance: str):
    """
    Look up one release of a cluster by name.

    :param cluster_uuid: cluster identifier passed on to ``instances_list``
    :param kdu_instance: release name to look for
    :return: the first entry of ``instances_list`` whose "name" equals
        kdu_instance, or None (after a debug log) when there is none
    """
    releases = await self.instances_list(cluster_uuid=cluster_uuid)

    # first release with a matching name, if any
    found = next(
        (release for release in releases if release.get("name") == kdu_instance),
        None,
    )
    if found is None:
        self.log.debug("Instance {} not found".format(kdu_instance))
    return found
661 async def exec_primitive(
663 cluster_uuid
: str = None,
664 kdu_instance
: str = None,
665 primitive_name
: str = None,
666 timeout
: float = 300,
668 db_dict
: dict = None,
670 """Exec primitive (Juju action)
672 :param cluster_uuid: The UUID of the cluster or namespace:cluster
673 :param kdu_instance: The unique name of the KDU instance
674 :param primitive_name: Name of action that will be executed
675 :param timeout: Timeout for action execution
676 :param params: Dictionary of all the parameters needed for the action
677 :db_dict: Dictionary for any additional data
679 :return: Returns the output of the action
682 "KDUs deployed with Helm don't support actions "
683 "different from rollback, upgrade and status"
686 async def get_services(self
,
689 namespace
: str) -> list:
691 Returns a list of services defined for the specified kdu instance.
693 :param cluster_uuid: UUID of a K8s cluster known by OSM
694 :param kdu_instance: unique name for the KDU instance
695 :param namespace: K8s namespace used by the KDU instance
696 :return: If successful, it will return a list of services, Each service
697 can have the following data:
698 - `name` of the service
699 - `type` type of service in the k8 cluster
700 - `ports` List of ports offered by the service, for each port includes at least
702 - `cluster_ip` Internal ip to be used inside k8s cluster
703 - `external_ip` List of external ips (in case they are available)
706 _
, cluster_id
= self
._get
_namespace
_cluster
_id
(cluster_uuid
)
708 "get_services: cluster_uuid: {}, kdu_instance: {}".format(
709 cluster_uuid
, kdu_instance
714 self
.fs
.sync(from_path
=cluster_id
)
716 # get list of services names for kdu
717 service_names
= await self
._get
_services
(cluster_id
, kdu_instance
, namespace
)
720 for service
in service_names
:
721 service
= await self
._get
_service
(cluster_id
, service
, namespace
)
722 service_list
.append(service
)
725 self
.fs
.reverse_sync(from_path
=cluster_id
)
729 async def get_service(self
,
732 namespace
: str) -> object:
735 "get service, service_name: {}, namespace: {}, cluster_uuid: {}".format(
736 service_name
, namespace
, cluster_uuid
)
739 _
, cluster_id
= self
._get
_namespace
_cluster
_id
(cluster_uuid
)
742 self
.fs
.sync(from_path
=cluster_id
)
744 service
= await self
._get
_service
(cluster_id
, service_name
, namespace
)
747 self
.fs
.reverse_sync(from_path
=cluster_id
)
751 async def status_kdu(self
, cluster_uuid
: str, kdu_instance
: str) -> str:
754 "status_kdu: cluster_uuid: {}, kdu_instance: {}".format(
755 cluster_uuid
, kdu_instance
759 _
, cluster_id
= self
._get
_namespace
_cluster
_id
(cluster_uuid
)
762 self
.fs
.sync(from_path
=cluster_id
)
764 # get instance: needed to obtain namespace
765 instances
= await self
._instances
_list
(cluster_id
=cluster_id
)
766 for instance
in instances
:
767 if instance
.get("name") == kdu_instance
:
770 # instance does not exist
771 raise K8sException("Instance name: {} not found in cluster: {}".format(
772 kdu_instance
, cluster_id
))
774 status
= await self
._status
_kdu
(
775 cluster_id
=cluster_id
,
776 kdu_instance
=kdu_instance
,
777 namespace
=instance
["namespace"],
783 self
.fs
.reverse_sync(from_path
=cluster_id
)
787 async def values_kdu(self
, kdu_model
: str, repo_url
: str = None) -> str:
790 "inspect kdu_model values {} from (optional) repo: {}".format(
795 return await self
._exec
_inspect
_comand
(
796 inspect_command
="values", kdu_model
=kdu_model
, repo_url
=repo_url
799 async def help_kdu(self
, kdu_model
: str, repo_url
: str = None) -> str:
802 "inspect kdu_model {} readme.md from repo: {}".format(kdu_model
, repo_url
)
805 return await self
._exec
_inspect
_comand
(
806 inspect_command
="readme", kdu_model
=kdu_model
, repo_url
=repo_url
809 async def synchronize_repos(self
, cluster_uuid
: str):
811 self
.log
.debug("synchronize repos for cluster helm-id: {}".format(cluster_uuid
))
813 db_repo_ids
= self
._get
_helm
_chart
_repos
_ids
(cluster_uuid
)
814 db_repo_dict
= self
._get
_db
_repos
_dict
(db_repo_ids
)
816 local_repo_list
= await self
.repo_list(cluster_uuid
)
817 local_repo_dict
= {repo
["name"]: repo
["url"] for repo
in local_repo_list
}
819 deleted_repo_list
= []
822 # iterate over the list of repos in the database that should be
823 # added if not present
824 for repo_name
, db_repo
in db_repo_dict
.items():
826 # check if it is already present
827 curr_repo_url
= local_repo_dict
.get(db_repo
["name"])
828 repo_id
= db_repo
.get("_id")
829 if curr_repo_url
!= db_repo
["url"]:
831 self
.log
.debug("repo {} url changed, delete and and again".format(
833 await self
.repo_remove(cluster_uuid
, db_repo
["name"])
834 deleted_repo_list
.append(repo_id
)
837 self
.log
.debug("add repo {}".format(db_repo
["name"]))
838 await self
.repo_add(cluster_uuid
, db_repo
["name"], db_repo
["url"])
839 added_repo_dict
[repo_id
] = db_repo
["name"]
840 except Exception as e
:
842 "Error adding repo id: {}, err_msg: {} ".format(
847 # Delete repos that are present but not in nbi_list
848 for repo_name
in local_repo_dict
:
849 if not db_repo_dict
.get(repo_name
) and repo_name
!= "stable":
850 self
.log
.debug("delete repo {}".format(repo_name
))
852 await self
.repo_remove(cluster_uuid
, repo_name
)
853 deleted_repo_list
.append(repo_name
)
854 except Exception as e
:
856 "Error deleting repo, name: {}, err_msg: {}".format(
861 return deleted_repo_list
, added_repo_dict
865 except Exception as e
:
866 # Do not raise errors synchronizing repos
867 self
.log
.error("Error synchronizing repos: {}".format(e
))
868 raise Exception("Error synchronizing repos: {}".format(e
))
870 def _get_db_repos_dict(self
, repo_ids
: list):
872 for repo_id
in repo_ids
:
873 db_repo
= self
.db
.get_one("k8srepos", {"_id": repo_id
})
874 db_repos_dict
[db_repo
["name"]] = db_repo
878 ####################################################################################
879 ################################### TO BE IMPLEMENTED SUBCLASSES ###################
880 ####################################################################################
884 def _init_paths_env(self
, cluster_name
: str, create_if_not_exist
: bool = True):
886 Creates and returns base cluster and kube dirs and returns them.
887 Also creates helm3 dirs according to new directory specification, paths are
888 not returned but assigned to helm environment variables
890 :param cluster_name: cluster_name
891 :return: Dictionary with config_paths and dictionary with helm environment variables
895 async def _cluster_init(self
, cluster_id
, namespace
, paths
, env
):
897 Implements the helm version dependent cluster initialization
901 async def _instances_list(self
, cluster_id
):
903 Implements the helm version dependent helm instances list
907 async def _get_services(self
, cluster_id
, kdu_instance
, namespace
):
909 Implements the helm version dependent method to obtain services from a helm instance
913 async def _status_kdu(self
, cluster_id
: str, kdu_instance
: str, namespace
: str = None,
914 show_error_log
: bool = False, return_text
: bool = False):
916 Implements the helm version dependent method to obtain status of a helm instance
920 def _get_install_command(self
, kdu_model
, kdu_instance
, namespace
,
921 params_str
, version
, atomic
, timeout
) -> str:
923 Obtain command to be executed to install the indicated instance
927 def _get_upgrade_command(self
, kdu_model
, kdu_instance
, namespace
,
928 params_str
, version
, atomic
, timeout
) -> str:
930 Obtain command to be executed to upgrade the indicated instance
934 def _get_rollback_command(self
, kdu_instance
, namespace
, revision
) -> str:
936 Obtain command to be executed to rollback the indicated instance
940 def _get_uninstall_command(self
, kdu_instance
: str, namespace
: str) -> str:
942 Obtain command to be executed to delete the indicated instance
946 def _get_inspect_command(self
, show_command
: str, kdu_model
: str, repo_str
: str,
949 Obtain command to be executed to obtain information about the kdu
953 async def _uninstall_sw(self
, cluster_id
: str, namespace
: str):
955 Method call to uninstall cluster software for helm. This method is dependent
957 For Helm v2 it will be called when Tiller must be uninstalled
958 For Helm v3 it does nothing and does not need to be called
962 def _get_helm_chart_repos_ids(self
, cluster_uuid
) -> list:
964 Obtains the cluster repos identifiers
968 ####################################################################################
969 ################################### P R I V A T E ##################################
970 ####################################################################################
974 def _check_file_exists(filename
: str, exception_if_not_exists
: bool = False):
975 if os
.path
.exists(filename
):
978 msg
= "File {} does not exist".format(filename
)
979 if exception_if_not_exists
:
980 raise K8sException(msg
)
983 def _remove_multiple_spaces(strobj
):
984 strobj
= strobj
.strip()
986 strobj
= strobj
.replace(" ", " ")
990 def _output_to_lines(output
: str) -> list:
991 output_lines
= list()
992 lines
= output
.splitlines(keepends
=False)
996 output_lines
.append(line
)
1000 def _output_to_table(output
: str) -> list:
1001 output_table
= list()
1002 lines
= output
.splitlines(keepends
=False)
1004 line
= line
.replace("\t", " ")
1006 output_table
.append(line_list
)
1007 cells
= line
.split(sep
=" ")
1011 line_list
.append(cell
)
1015 def _parse_services(output
: str) -> list:
1016 lines
= output
.splitlines(keepends
=False)
1019 line
= line
.replace("\t", " ")
1020 cells
= line
.split(sep
=" ")
1021 if len(cells
) > 0 and cells
[0].startswith("service/"):
1022 elems
= cells
[0].split(sep
="/")
1024 services
.append(elems
[1])
1028 def _get_deep(dictionary
: dict, members
: tuple):
1033 value
= target
.get(m
)
1042 # find key:value in several lines
1044 def _find_in_lines(p_lines
: list, p_key
: str) -> str:
1045 for line
in p_lines
:
1047 if line
.startswith(p_key
+ ":"):
1048 parts
= line
.split(":")
1049 the_value
= parts
[1].strip()
1057 def _lower_keys_list(input_list
: list):
1059 Transform the keys in a list of dictionaries to lower case and returns a new list
1063 for dictionary
in input_list
:
1064 new_dict
= dict((k
.lower(), v
) for k
, v
in dictionary
.items())
1065 new_list
.append(new_dict
)
1068 def _local_exec(self
, command
: str) -> (str, int):
1069 command
= self
._remove
_multiple
_spaces
(command
)
1070 self
.log
.debug("Executing sync local command: {}".format(command
))
1071 # raise exception if fails
1074 output
= subprocess
.check_output(
1075 command
, shell
=True, universal_newlines
=True
1078 self
.log
.debug(output
)
1082 return output
, return_code
1084 async def _local_async_exec(
1087 raise_exception_on_error
: bool = False,
1088 show_error_log
: bool = True,
1089 encode_utf8
: bool = False,
1093 command
= K8sHelmBaseConnector
._remove
_multiple
_spaces
(command
)
1094 self
.log
.debug("Executing async local command: {}, env: {}".format(command
, env
))
1097 command
= shlex
.split(command
)
1099 environ
= os
.environ
.copy()
1104 process
= await asyncio
.create_subprocess_exec(
1105 *command
, stdout
=asyncio
.subprocess
.PIPE
, stderr
=asyncio
.subprocess
.PIPE
,
1109 # wait for command terminate
1110 stdout
, stderr
= await process
.communicate()
1112 return_code
= process
.returncode
1116 output
= stdout
.decode("utf-8").strip()
1117 # output = stdout.decode()
1119 output
= stderr
.decode("utf-8").strip()
1120 # output = stderr.decode()
1122 if return_code
!= 0 and show_error_log
:
1124 "Return code (FAIL): {}\nOutput:\n{}".format(return_code
, output
)
1127 self
.log
.debug("Return code: {}".format(return_code
))
1129 if raise_exception_on_error
and return_code
!= 0:
1130 raise K8sException(output
)
1133 output
= output
.encode("utf-8").strip()
1134 output
= str(output
).replace("\\n", "\n")
1136 return output
, return_code
1138 except asyncio
.CancelledError
:
1140 except K8sException
:
1142 except Exception as e
:
1143 msg
= "Exception executing command: {} -> {}".format(command
, e
)
1145 if raise_exception_on_error
:
1146 raise K8sException(e
) from e
1150 async def _local_async_exec_pipe(self
,
1153 raise_exception_on_error
: bool = True,
1154 show_error_log
: bool = True,
1155 encode_utf8
: bool = False,
1158 command1
= K8sHelmBaseConnector
._remove
_multiple
_spaces
(command1
)
1159 command2
= K8sHelmBaseConnector
._remove
_multiple
_spaces
(command2
)
1160 command
= "{} | {}".format(command1
, command2
)
1161 self
.log
.debug("Executing async local command: {}, env: {}".format(command
, env
))
1164 command1
= shlex
.split(command1
)
1165 command2
= shlex
.split(command2
)
1167 environ
= os
.environ
.copy()
1172 read
, write
= os
.pipe()
1173 await asyncio
.create_subprocess_exec(*command1
, stdout
=write
, env
=environ
)
1175 process_2
= await asyncio
.create_subprocess_exec(*command2
, stdin
=read
,
1176 stdout
=asyncio
.subprocess
.PIPE
,
1179 stdout
, stderr
= await process_2
.communicate()
1181 return_code
= process_2
.returncode
1185 output
= stdout
.decode("utf-8").strip()
1186 # output = stdout.decode()
1188 output
= stderr
.decode("utf-8").strip()
1189 # output = stderr.decode()
1191 if return_code
!= 0 and show_error_log
:
1193 "Return code (FAIL): {}\nOutput:\n{}".format(return_code
, output
)
1196 self
.log
.debug("Return code: {}".format(return_code
))
1198 if raise_exception_on_error
and return_code
!= 0:
1199 raise K8sException(output
)
1202 output
= output
.encode("utf-8").strip()
1203 output
= str(output
).replace("\\n", "\n")
1205 return output
, return_code
1206 except asyncio
.CancelledError
:
1208 except K8sException
:
1210 except Exception as e
:
1211 msg
= "Exception executing command: {} -> {}".format(command
, e
)
1213 if raise_exception_on_error
:
1214 raise K8sException(e
) from e
async def _get_service(self, cluster_id, service_name, namespace):
    """
    Obtains the data of the specified service in the k8cluster.

    The service is read with ``kubectl get service -o=yaml`` using the
    kube config of the cluster.

    :param cluster_id: id of a K8s cluster known by OSM
    :param service_name: name of the K8s service in the specified namespace
    :param namespace: K8s namespace used by the KDU instance
    :return: If successful, it will return a service with the following data:
    - `name` of the service
    - `type` type of service in the k8 cluster
    - `ports` List of ports offered by the service, for each port includes at least
    name, port, protocol
    - `cluster_ip` Internal ip to be used inside k8s cluster
    - `external_ip` List of external ips (in case they are available);
    only present for services of type LoadBalancer
    :raises: whatever ``_local_async_exec`` raises when the kubectl
        command fails (it is called with raise_exception_on_error=True)
    """

    # init config, env
    paths, env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )

    command = "{} --kubeconfig={} --namespace={} get service {} -o=yaml".format(
        self.kubectl_command, paths["kube_config"], namespace, service_name
    )

    output, _rc = await self._local_async_exec(
        command=command, raise_exception_on_error=True, env=env
    )

    data = yaml.load(output, Loader=yaml.SafeLoader)

    service = {
        "name": service_name,
        "type": self._get_deep(data, ("spec", "type")),
        "ports": self._get_deep(data, ("spec", "ports")),
        "cluster_ip": self._get_deep(data, ("spec", "clusterIP")),
    }
    if service["type"] == "LoadBalancer":
        # The ingress list may be missing (presumably while no external
        # address has been assigned yet) and an entry may carry no "ip"
        # key; neither case should make the whole query fail.
        ip_map_list = self._get_deep(data, ("status", "loadBalancer", "ingress"))
        service["external_ip"] = [
            elem["ip"] for elem in (ip_map_list or []) if "ip" in elem
        ]

    return service
1262 async def _exec_inspect_comand(
1263 self
, inspect_command
: str, kdu_model
: str, repo_url
: str = None
1266 Obtains information about a kdu, no cluster (no env)
1271 repo_str
= " --repo {}".format(repo_url
)
1273 idx
= kdu_model
.find("/")
1276 kdu_model
= kdu_model
[idx
:]
1279 if ":" in kdu_model
:
1280 parts
= kdu_model
.split(sep
=":")
1282 version
= "--version {}".format(str(parts
[1]))
1283 kdu_model
= parts
[0]
1285 full_command
= self
._get
_inspect
_command
(inspect_command
, kdu_model
, repo_str
, version
)
1286 output
, _rc
= await self
._local
_async
_exec
(
1287 command
=full_command
, encode_utf8
=True
1292 async def _store_status(
1297 namespace
: str = None,
1298 check_every
: float = 10,
1299 db_dict
: dict = None,
1300 run_once
: bool = False,
1304 await asyncio
.sleep(check_every
)
1305 detailed_status
= await self
._status
_kdu
(
1306 cluster_id
=cluster_id
, kdu_instance
=kdu_instance
, namespace
=namespace
,
1309 status
= detailed_status
.get("info").get("description")
1310 self
.log
.debug('KDU {} STATUS: {}.'.format(kdu_instance
, status
))
1311 # write status to db
1312 result
= await self
.write_app_status_to_db(
1315 detailed_status
=str(detailed_status
),
1316 operation
=operation
,
1319 self
.log
.info("Error writing in database. Task exiting...")
1321 except asyncio
.CancelledError
:
1322 self
.log
.debug("Task cancelled")
1324 except Exception as e
:
1325 self
.log
.debug("_store_status exception: {}".format(str(e
)), exc_info
=True)
1331 # params for use in -f file
1332 # returns values file option and filename (in order to delete it at the end)
1333 def _params_to_file_option(self
, cluster_id
: str, params
: dict) -> (str, str):
1335 if params
and len(params
) > 0:
1336 self
._init
_paths
_env
(
1337 cluster_name
=cluster_id
, create_if_not_exist
=True
1340 def get_random_number():
1341 r
= random
.randrange(start
=1, stop
=99999999)
1349 value
= params
.get(key
)
1350 if "!!yaml" in str(value
):
1351 value
= yaml
.load(value
[7:])
1352 params2
[key
] = value
1354 values_file
= get_random_number() + ".yaml"
1355 with
open(values_file
, "w") as stream
:
1356 yaml
.dump(params2
, stream
, indent
=4, default_flow_style
=False)
1358 return "-f {}".format(values_file
), values_file
1362 # params for use in --set option
1364 def _params_to_set_option(params
: dict) -> str:
1366 if params
and len(params
) > 0:
1369 value
= params
.get(key
, None)
1370 if value
is not None:
1372 params_str
+= "--set "
1376 params_str
+= "{}={}".format(key
, value
)
1380 def _generate_release_name(chart_name
: str):
1381 # check embedded chart (file or dir)
1382 if chart_name
.startswith("/"):
1383 # extract file or directory name
1384 chart_name
= chart_name
[chart_name
.rfind("/") + 1:]
1386 elif "://" in chart_name
:
1387 # extract last portion of URL
1388 chart_name
= chart_name
[chart_name
.rfind("/") + 1:]
1391 for c
in chart_name
:
1392 if c
.isalpha() or c
.isnumeric():
1399 # if does not start with alpha character, prefix 'a'
1400 if not name
[0].isalpha():
1405 def get_random_number():
1406 r
= random
.randrange(start
=1, stop
=99999999)
1408 s
= s
.rjust(10, "0")
1411 name
= name
+ get_random_number()