2 # Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3 # This file is part of OSM
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact with: nfvlabs@tid.es
31 from uuid
import uuid4
33 from n2vc
.config
import EnvironConfig
34 from n2vc
.exceptions
import K8sException
35 from n2vc
.k8s_conn
import K8sConnector
38 class K8sHelmBaseConnector(K8sConnector
):
41 ####################################################################################
42 ################################### P U B L I C ####################################
43 ####################################################################################
46 service_account
= "osm"
52 kubectl_command
: str = "/usr/bin/kubectl",
53 helm_command
: str = "/usr/bin/helm",
59 :param fs: file system for kubernetes and helm configuration
60 :param db: database object to write current operation status
61 :param kubectl_command: path to kubectl executable
62 :param helm_command: path to helm executable
64 :param on_update_db: callback called when k8s connector updates database
68 K8sConnector
.__init
__(self
, db
=db
, log
=log
, on_update_db
=on_update_db
)
70 self
.log
.info("Initializing K8S Helm connector")
72 self
.config
= EnvironConfig()
73 # random numbers for release name generation
74 random
.seed(time
.time())
79 # exception if kubectl is not installed
80 self
.kubectl_command
= kubectl_command
81 self
._check
_file
_exists
(filename
=kubectl_command
, exception_if_not_exists
=True)
83 # exception if helm is not installed
84 self
._helm
_command
= helm_command
85 self
._check
_file
_exists
(filename
=helm_command
, exception_if_not_exists
=True)
87 # obtain stable repo url from config or apply default
88 self
._stable
_repo
_url
= self
.config
.get("stablerepourl")
89 if self
._stable
_repo
_url
== "None":
90 self
._stable
_repo
_url
= None
92 def _get_namespace(self
, cluster_uuid
: str) -> str:
94 Obtains the namespace used by the cluster with the uuid passed by argument
96 param: cluster_uuid: cluster's uuid
99 # first, obtain the cluster corresponding to the uuid passed by argument
100 k8scluster
= self
.db
.get_one(
101 "k8sclusters", q_filter
={"_id": cluster_uuid
}, fail_on_empty
=False
103 return k8scluster
.get("namespace")
108 namespace
: str = "kube-system",
109 reuse_cluster_uuid
=None,
113 It prepares a given K8s cluster environment to run Charts
115 :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid
117 :param namespace: optional namespace to be used for helm. By default,
118 'kube-system' will be used
119 :param reuse_cluster_uuid: existing cluster uuid for reuse
120 :param kwargs: Additional parameters (None yet)
121 :return: uuid of the K8s cluster and True if connector has installed some
122 software in the cluster
123 (on error, an exception will be raised)
126 if reuse_cluster_uuid
:
127 cluster_id
= reuse_cluster_uuid
129 cluster_id
= str(uuid4())
132 "Initializing K8S Cluster {}. namespace: {}".format(cluster_id
, namespace
)
135 paths
, env
= self
._init
_paths
_env
(
136 cluster_name
=cluster_id
, create_if_not_exist
=True
138 mode
= stat
.S_IRUSR | stat
.S_IWUSR
139 with
open(paths
["kube_config"], "w", mode
) as f
:
141 os
.chmod(paths
["kube_config"], 0o600)
143 # Code with initialization specific of helm version
144 n2vc_installed_sw
= await self
._cluster
_init
(cluster_id
, namespace
, paths
, env
)
146 # sync fs with local data
147 self
.fs
.reverse_sync(from_path
=cluster_id
)
149 self
.log
.info("Cluster {} initialized".format(cluster_id
))
151 return cluster_id
, n2vc_installed_sw
154 self
, cluster_uuid
: str, name
: str, url
: str, repo_type
: str = "chart"
157 "Cluster {}, adding {} repository {}. URL: {}".format(
158 cluster_uuid
, repo_type
, name
, url
163 paths
, env
= self
._init
_paths
_env
(
164 cluster_name
=cluster_uuid
, create_if_not_exist
=True
168 self
.fs
.sync(from_path
=cluster_uuid
)
171 command
= "env KUBECONFIG={} {} repo update".format(
172 paths
["kube_config"], self
._helm
_command
174 self
.log
.debug("updating repo: {}".format(command
))
175 await self
._local
_async
_exec
(
176 command
=command
, raise_exception_on_error
=False, env
=env
179 # helm repo add name url
180 command
= "env KUBECONFIG={} {} repo add {} {}".format(
181 paths
["kube_config"], self
._helm
_command
, name
, url
183 self
.log
.debug("adding repo: {}".format(command
))
184 await self
._local
_async
_exec
(
185 command
=command
, raise_exception_on_error
=True, env
=env
189 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
191 async def repo_list(self
, cluster_uuid
: str) -> list:
193 Get the list of registered repositories
195 :return: list of registered repositories: [ (name, url) .... ]
198 self
.log
.debug("list repositories for cluster {}".format(cluster_uuid
))
201 paths
, env
= self
._init
_paths
_env
(
202 cluster_name
=cluster_uuid
, create_if_not_exist
=True
206 self
.fs
.sync(from_path
=cluster_uuid
)
208 command
= "env KUBECONFIG={} {} repo list --output yaml".format(
209 paths
["kube_config"], self
._helm
_command
212 # Set exception to false because if there are no repos just want an empty list
213 output
, _rc
= await self
._local
_async
_exec
(
214 command
=command
, raise_exception_on_error
=False, env
=env
218 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
221 if output
and len(output
) > 0:
222 repos
= yaml
.load(output
, Loader
=yaml
.SafeLoader
)
223 # unify format between helm2 and helm3 setting all keys lowercase
224 return self
._lower
_keys
_list
(repos
)
230 async def repo_remove(self
, cluster_uuid
: str, name
: str):
232 "remove {} repositories for cluster {}".format(name
, cluster_uuid
)
236 paths
, env
= self
._init
_paths
_env
(
237 cluster_name
=cluster_uuid
, create_if_not_exist
=True
241 self
.fs
.sync(from_path
=cluster_uuid
)
243 command
= "env KUBECONFIG={} {} repo remove {}".format(
244 paths
["kube_config"], self
._helm
_command
, name
246 await self
._local
_async
_exec
(
247 command
=command
, raise_exception_on_error
=True, env
=env
251 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
257 uninstall_sw
: bool = False,
262 Resets the Kubernetes cluster by removing the helm deployment that represents it.
264 :param cluster_uuid: The UUID of the cluster to reset
265 :param force: Boolean to force the reset
266 :param uninstall_sw: Boolean to force the reset
267 :param kwargs: Additional parameters (None yet)
268 :return: Returns True if successful or raises an exception.
270 namespace
= self
._get
_namespace
(cluster_uuid
=cluster_uuid
)
272 "Resetting K8s environment. cluster uuid: {} uninstall={}".format(
273 cluster_uuid
, uninstall_sw
278 self
.fs
.sync(from_path
=cluster_uuid
)
280 # uninstall releases if needed.
282 releases
= await self
.instances_list(cluster_uuid
=cluster_uuid
)
283 if len(releases
) > 0:
287 kdu_instance
= r
.get("name")
288 chart
= r
.get("chart")
290 "Uninstalling {} -> {}".format(chart
, kdu_instance
)
292 await self
.uninstall(
293 cluster_uuid
=cluster_uuid
, kdu_instance
=kdu_instance
295 except Exception as e
:
296 # will not raise exception as it was found
297 # that in some cases of previously installed helm releases it
300 "Error uninstalling release {}: {}".format(
306 "Cluster uuid: {} has releases and not force. Leaving K8s helm environment"
307 ).format(cluster_uuid
)
310 False # Allow to remove k8s cluster without removing Tiller
314 await self
._uninstall
_sw
(cluster_id
=cluster_uuid
, namespace
=namespace
)
316 # delete cluster directory
317 self
.log
.debug("Removing directory {}".format(cluster_uuid
))
318 self
.fs
.file_delete(cluster_uuid
, ignore_non_exist
=True)
319 # Remove also local directorio if still exist
320 direct
= self
.fs
.path
+ "/" + cluster_uuid
321 shutil
.rmtree(direct
, ignore_errors
=True)
325 async def _install_impl(
333 timeout
: float = 300,
335 db_dict
: dict = None,
336 kdu_name
: str = None,
337 namespace
: str = None,
340 paths
, env
= self
._init
_paths
_env
(
341 cluster_name
=cluster_id
, create_if_not_exist
=True
345 params_str
, file_to_delete
= self
._params
_to
_file
_option
(
346 cluster_id
=cluster_id
, params
=params
352 parts
= kdu_model
.split(sep
=":")
354 version
= str(parts
[1])
357 command
= self
._get
_install
_command
(
365 paths
["kube_config"],
368 self
.log
.debug("installing: {}".format(command
))
371 # exec helm in a task
372 exec_task
= asyncio
.ensure_future(
373 coro_or_future
=self
._local
_async
_exec
(
374 command
=command
, raise_exception_on_error
=False, env
=env
378 # write status in another task
379 status_task
= asyncio
.ensure_future(
380 coro_or_future
=self
._store
_status
(
381 cluster_id
=cluster_id
,
382 kdu_instance
=kdu_instance
,
390 # wait for execution task
391 await asyncio
.wait([exec_task
])
396 output
, rc
= exec_task
.result()
400 output
, rc
= await self
._local
_async
_exec
(
401 command
=command
, raise_exception_on_error
=False, env
=env
404 # remove temporal values yaml file
406 os
.remove(file_to_delete
)
409 await self
._store
_status
(
410 cluster_id
=cluster_id
,
411 kdu_instance
=kdu_instance
,
420 msg
= "Error executing command: {}\nOutput: {}".format(command
, output
)
422 raise K8sException(msg
)
428 kdu_model
: str = None,
430 timeout
: float = 300,
432 db_dict
: dict = None,
434 self
.log
.debug("upgrading {} in cluster {}".format(kdu_model
, cluster_uuid
))
437 self
.fs
.sync(from_path
=cluster_uuid
)
439 # look for instance to obtain namespace
440 instance_info
= await self
.get_instance_info(cluster_uuid
, kdu_instance
)
441 if not instance_info
:
442 raise K8sException("kdu_instance {} not found".format(kdu_instance
))
445 paths
, env
= self
._init
_paths
_env
(
446 cluster_name
=cluster_uuid
, create_if_not_exist
=True
450 self
.fs
.sync(from_path
=cluster_uuid
)
453 params_str
, file_to_delete
= self
._params
_to
_file
_option
(
454 cluster_id
=cluster_uuid
, params
=params
460 parts
= kdu_model
.split(sep
=":")
462 version
= str(parts
[1])
465 command
= self
._get
_upgrade
_command
(
468 instance_info
["namespace"],
473 paths
["kube_config"],
476 self
.log
.debug("upgrading: {}".format(command
))
480 # exec helm in a task
481 exec_task
= asyncio
.ensure_future(
482 coro_or_future
=self
._local
_async
_exec
(
483 command
=command
, raise_exception_on_error
=False, env
=env
486 # write status in another task
487 status_task
= asyncio
.ensure_future(
488 coro_or_future
=self
._store
_status
(
489 cluster_id
=cluster_uuid
,
490 kdu_instance
=kdu_instance
,
491 namespace
=instance_info
["namespace"],
498 # wait for execution task
499 await asyncio
.wait([exec_task
])
503 output
, rc
= exec_task
.result()
507 output
, rc
= await self
._local
_async
_exec
(
508 command
=command
, raise_exception_on_error
=False, env
=env
511 # remove temporal values yaml file
513 os
.remove(file_to_delete
)
516 await self
._store
_status
(
517 cluster_id
=cluster_uuid
,
518 kdu_instance
=kdu_instance
,
519 namespace
=instance_info
["namespace"],
527 msg
= "Error executing command: {}\nOutput: {}".format(command
, output
)
529 raise K8sException(msg
)
532 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
534 # return new revision number
535 instance
= await self
.get_instance_info(
536 cluster_uuid
=cluster_uuid
, kdu_instance
=kdu_instance
539 revision
= int(instance
.get("revision"))
540 self
.log
.debug("New revision: {}".format(revision
))
550 total_timeout
: float = 1800,
553 raise NotImplementedError("Method not implemented")
555 async def get_scale_count(
561 raise NotImplementedError("Method not implemented")
564 self
, cluster_uuid
: str, kdu_instance
: str, revision
=0, db_dict
: dict = None
567 "rollback kdu_instance {} to revision {} from cluster {}".format(
568 kdu_instance
, revision
, cluster_uuid
573 self
.fs
.sync(from_path
=cluster_uuid
)
575 # look for instance to obtain namespace
576 instance_info
= await self
.get_instance_info(cluster_uuid
, kdu_instance
)
577 if not instance_info
:
578 raise K8sException("kdu_instance {} not found".format(kdu_instance
))
581 paths
, env
= self
._init
_paths
_env
(
582 cluster_name
=cluster_uuid
, create_if_not_exist
=True
586 self
.fs
.sync(from_path
=cluster_uuid
)
588 command
= self
._get
_rollback
_command
(
589 kdu_instance
, instance_info
["namespace"], revision
, paths
["kube_config"]
592 self
.log
.debug("rolling_back: {}".format(command
))
594 # exec helm in a task
595 exec_task
= asyncio
.ensure_future(
596 coro_or_future
=self
._local
_async
_exec
(
597 command
=command
, raise_exception_on_error
=False, env
=env
600 # write status in another task
601 status_task
= asyncio
.ensure_future(
602 coro_or_future
=self
._store
_status
(
603 cluster_id
=cluster_uuid
,
604 kdu_instance
=kdu_instance
,
605 namespace
=instance_info
["namespace"],
607 operation
="rollback",
612 # wait for execution task
613 await asyncio
.wait([exec_task
])
618 output
, rc
= exec_task
.result()
621 await self
._store
_status
(
622 cluster_id
=cluster_uuid
,
623 kdu_instance
=kdu_instance
,
624 namespace
=instance_info
["namespace"],
626 operation
="rollback",
632 msg
= "Error executing command: {}\nOutput: {}".format(command
, output
)
634 raise K8sException(msg
)
637 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
639 # return new revision number
640 instance
= await self
.get_instance_info(
641 cluster_uuid
=cluster_uuid
, kdu_instance
=kdu_instance
644 revision
= int(instance
.get("revision"))
645 self
.log
.debug("New revision: {}".format(revision
))
650 async def uninstall(self
, cluster_uuid
: str, kdu_instance
: str, **kwargs
):
652 Removes an existing KDU instance. It would implicitly use the `delete` or 'uninstall' call
653 (this call should happen after all _terminate-config-primitive_ of the VNF
656 :param cluster_uuid: UUID of a K8s cluster known by OSM, or namespace:cluster_id
657 :param kdu_instance: unique name for the KDU instance to be deleted
658 :param kwargs: Additional parameters (None yet)
659 :return: True if successful
663 "uninstall kdu_instance {} from cluster {}".format(
664 kdu_instance
, cluster_uuid
669 self
.fs
.sync(from_path
=cluster_uuid
)
671 # look for instance to obtain namespace
672 instance_info
= await self
.get_instance_info(cluster_uuid
, kdu_instance
)
673 if not instance_info
:
674 self
.log
.warning(("kdu_instance {} not found".format(kdu_instance
)))
677 paths
, env
= self
._init
_paths
_env
(
678 cluster_name
=cluster_uuid
, create_if_not_exist
=True
682 self
.fs
.sync(from_path
=cluster_uuid
)
684 command
= self
._get
_uninstall
_command
(
685 kdu_instance
, instance_info
["namespace"], paths
["kube_config"]
687 output
, _rc
= await self
._local
_async
_exec
(
688 command
=command
, raise_exception_on_error
=True, env
=env
692 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
694 return self
._output
_to
_table
(output
)
696 async def instances_list(self
, cluster_uuid
: str) -> list:
698 returns a list of deployed releases in a cluster
700 :param cluster_uuid: the 'cluster' or 'namespace:cluster'
704 self
.log
.debug("list releases for cluster {}".format(cluster_uuid
))
707 self
.fs
.sync(from_path
=cluster_uuid
)
709 # execute internal command
710 result
= await self
._instances
_list
(cluster_uuid
)
713 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
717 async def get_instance_info(self
, cluster_uuid
: str, kdu_instance
: str):
718 instances
= await self
.instances_list(cluster_uuid
=cluster_uuid
)
719 for instance
in instances
:
720 if instance
.get("name") == kdu_instance
:
722 self
.log
.debug("Instance {} not found".format(kdu_instance
))
725 async def exec_primitive(
727 cluster_uuid
: str = None,
728 kdu_instance
: str = None,
729 primitive_name
: str = None,
730 timeout
: float = 300,
732 db_dict
: dict = None,
735 """Exec primitive (Juju action)
737 :param cluster_uuid: The UUID of the cluster or namespace:cluster
738 :param kdu_instance: The unique name of the KDU instance
739 :param primitive_name: Name of action that will be executed
740 :param timeout: Timeout for action execution
741 :param params: Dictionary of all the parameters needed for the action
742 :db_dict: Dictionary for any additional data
743 :param kwargs: Additional parameters (None yet)
745 :return: Returns the output of the action
748 "KDUs deployed with Helm don't support actions "
749 "different from rollback, upgrade and status"
752 async def get_services(
753 self
, cluster_uuid
: str, kdu_instance
: str, namespace
: str
756 Returns a list of services defined for the specified kdu instance.
758 :param cluster_uuid: UUID of a K8s cluster known by OSM
759 :param kdu_instance: unique name for the KDU instance
760 :param namespace: K8s namespace used by the KDU instance
761 :return: If successful, it will return a list of services, Each service
762 can have the following data:
763 - `name` of the service
764 - `type` type of service in the k8 cluster
765 - `ports` List of ports offered by the service, for each port includes at least
767 - `cluster_ip` Internal ip to be used inside k8s cluster
768 - `external_ip` List of external ips (in case they are available)
772 "get_services: cluster_uuid: {}, kdu_instance: {}".format(
773 cluster_uuid
, kdu_instance
778 paths
, env
= self
._init
_paths
_env
(
779 cluster_name
=cluster_uuid
, create_if_not_exist
=True
783 self
.fs
.sync(from_path
=cluster_uuid
)
785 # get list of services names for kdu
786 service_names
= await self
._get
_services
(
787 cluster_uuid
, kdu_instance
, namespace
, paths
["kube_config"]
791 for service
in service_names
:
792 service
= await self
._get
_service
(cluster_uuid
, service
, namespace
)
793 service_list
.append(service
)
796 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
800 async def get_service(
801 self
, cluster_uuid
: str, service_name
: str, namespace
: str
805 "get service, service_name: {}, namespace: {}, cluster_uuid: {}".format(
806 service_name
, namespace
, cluster_uuid
811 self
.fs
.sync(from_path
=cluster_uuid
)
813 service
= await self
._get
_service
(cluster_uuid
, service_name
, namespace
)
816 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
820 async def status_kdu(self
, cluster_uuid
: str, kdu_instance
: str, **kwargs
) -> str:
822 This call would retrieve tha current state of a given KDU instance. It would be
823 would allow to retrieve the _composition_ (i.e. K8s objects) and _specific
824 values_ of the configuration parameters applied to a given instance. This call
825 would be based on the `status` call.
827 :param cluster_uuid: UUID of a K8s cluster known by OSM
828 :param kdu_instance: unique name for the KDU instance
829 :param kwargs: Additional parameters (None yet)
830 :return: If successful, it will return the following vector of arguments:
831 - K8s `namespace` in the cluster where the KDU lives
832 - `state` of the KDU instance. It can be:
839 - List of `resources` (objects) that this release consists of, sorted by kind,
840 and the status of those resources
841 - Last `deployment_time`.
845 "status_kdu: cluster_uuid: {}, kdu_instance: {}".format(
846 cluster_uuid
, kdu_instance
851 self
.fs
.sync(from_path
=cluster_uuid
)
853 # get instance: needed to obtain namespace
854 instances
= await self
._instances
_list
(cluster_id
=cluster_uuid
)
855 for instance
in instances
:
856 if instance
.get("name") == kdu_instance
:
859 # instance does not exist
861 "Instance name: {} not found in cluster: {}".format(
862 kdu_instance
, cluster_uuid
866 status
= await self
._status
_kdu
(
867 cluster_id
=cluster_uuid
,
868 kdu_instance
=kdu_instance
,
869 namespace
=instance
["namespace"],
875 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
879 async def values_kdu(self
, kdu_model
: str, repo_url
: str = None) -> str:
882 "inspect kdu_model values {} from (optional) repo: {}".format(
887 return await self
._exec
_inspect
_comand
(
888 inspect_command
="values", kdu_model
=kdu_model
, repo_url
=repo_url
891 async def help_kdu(self
, kdu_model
: str, repo_url
: str = None) -> str:
894 "inspect kdu_model {} readme.md from repo: {}".format(kdu_model
, repo_url
)
897 return await self
._exec
_inspect
_comand
(
898 inspect_command
="readme", kdu_model
=kdu_model
, repo_url
=repo_url
901 async def synchronize_repos(self
, cluster_uuid
: str):
903 self
.log
.debug("synchronize repos for cluster helm-id: {}".format(cluster_uuid
))
905 db_repo_ids
= self
._get
_helm
_chart
_repos
_ids
(cluster_uuid
)
906 db_repo_dict
= self
._get
_db
_repos
_dict
(db_repo_ids
)
908 local_repo_list
= await self
.repo_list(cluster_uuid
)
909 local_repo_dict
= {repo
["name"]: repo
["url"] for repo
in local_repo_list
}
911 deleted_repo_list
= []
914 # iterate over the list of repos in the database that should be
915 # added if not present
916 for repo_name
, db_repo
in db_repo_dict
.items():
918 # check if it is already present
919 curr_repo_url
= local_repo_dict
.get(db_repo
["name"])
920 repo_id
= db_repo
.get("_id")
921 if curr_repo_url
!= db_repo
["url"]:
924 "repo {} url changed, delete and and again".format(
928 await self
.repo_remove(cluster_uuid
, db_repo
["name"])
929 deleted_repo_list
.append(repo_id
)
932 self
.log
.debug("add repo {}".format(db_repo
["name"]))
934 cluster_uuid
, db_repo
["name"], db_repo
["url"]
936 added_repo_dict
[repo_id
] = db_repo
["name"]
937 except Exception as e
:
939 "Error adding repo id: {}, err_msg: {} ".format(
944 # Delete repos that are present but not in nbi_list
945 for repo_name
in local_repo_dict
:
946 if not db_repo_dict
.get(repo_name
) and repo_name
!= "stable":
947 self
.log
.debug("delete repo {}".format(repo_name
))
949 await self
.repo_remove(cluster_uuid
, repo_name
)
950 deleted_repo_list
.append(repo_name
)
951 except Exception as e
:
953 "Error deleting repo, name: {}, err_msg: {}".format(
958 return deleted_repo_list
, added_repo_dict
962 except Exception as e
:
963 # Do not raise errors synchronizing repos
964 self
.log
.error("Error synchronizing repos: {}".format(e
))
965 raise Exception("Error synchronizing repos: {}".format(e
))
967 def _get_db_repos_dict(self
, repo_ids
: list):
969 for repo_id
in repo_ids
:
970 db_repo
= self
.db
.get_one("k8srepos", {"_id": repo_id
})
971 db_repos_dict
[db_repo
["name"]] = db_repo
975 ####################################################################################
976 ################################### TO BE IMPLEMENTED SUBCLASSES ###################
977 ####################################################################################
981 def _init_paths_env(self
, cluster_name
: str, create_if_not_exist
: bool = True):
983 Creates and returns base cluster and kube dirs and returns them.
984 Also created helm3 dirs according to new directory specification, paths are
985 not returned but assigned to helm environment variables
987 :param cluster_name: cluster_name
988 :return: Dictionary with config_paths and dictionary with helm environment variables
992 async def _cluster_init(self
, cluster_id
, namespace
, paths
, env
):
994 Implements the helm version dependent cluster initialization
998 async def _instances_list(self
, cluster_id
):
1000 Implements the helm version dependent helm instances list
1004 async def _get_services(self
, cluster_id
, kdu_instance
, namespace
, kubeconfig
):
1006 Implements the helm version dependent method to obtain services from a helm instance
1010 async def _status_kdu(
1014 namespace
: str = None,
1015 show_error_log
: bool = False,
1016 return_text
: bool = False,
1019 Implements the helm version dependent method to obtain status of a helm instance
1023 def _get_install_command(
1035 Obtain command to be executed to delete the indicated instance
1039 def _get_upgrade_command(
1051 Obtain command to be executed to upgrade the indicated instance
1055 def _get_rollback_command(
1056 self
, kdu_instance
, namespace
, revision
, kubeconfig
1059 Obtain command to be executed to rollback the indicated instance
1063 def _get_uninstall_command(
1064 self
, kdu_instance
: str, namespace
: str, kubeconfig
: str
1067 Obtain command to be executed to delete the indicated instance
1071 def _get_inspect_command(
1072 self
, show_command
: str, kdu_model
: str, repo_str
: str, version
: str
1075 Obtain command to be executed to obtain information about the kdu
1079 async def _uninstall_sw(self
, cluster_id
: str, namespace
: str):
1081 Method call to uninstall cluster software for helm. This method is dependent
1083 For Helm v2 it will be called when Tiller must be uninstalled
1084 For Helm v3 it does nothing and does not need to be callled
1088 def _get_helm_chart_repos_ids(self
, cluster_uuid
) -> list:
1090 Obtains the cluster repos identifiers
1094 ####################################################################################
1095 ################################### P R I V A T E ##################################
1096 ####################################################################################
1100 def _check_file_exists(filename
: str, exception_if_not_exists
: bool = False):
1101 if os
.path
.exists(filename
):
1104 msg
= "File {} does not exist".format(filename
)
1105 if exception_if_not_exists
:
1106 raise K8sException(msg
)
1109 def _remove_multiple_spaces(strobj
):
1110 strobj
= strobj
.strip()
1111 while " " in strobj
:
1112 strobj
= strobj
.replace(" ", " ")
1116 def _output_to_lines(output
: str) -> list:
1117 output_lines
= list()
1118 lines
= output
.splitlines(keepends
=False)
1122 output_lines
.append(line
)
1126 def _output_to_table(output
: str) -> list:
1127 output_table
= list()
1128 lines
= output
.splitlines(keepends
=False)
1130 line
= line
.replace("\t", " ")
1132 output_table
.append(line_list
)
1133 cells
= line
.split(sep
=" ")
1137 line_list
.append(cell
)
1141 def _parse_services(output
: str) -> list:
1142 lines
= output
.splitlines(keepends
=False)
1145 line
= line
.replace("\t", " ")
1146 cells
= line
.split(sep
=" ")
1147 if len(cells
) > 0 and cells
[0].startswith("service/"):
1148 elems
= cells
[0].split(sep
="/")
1150 services
.append(elems
[1])
1154 def _get_deep(dictionary
: dict, members
: tuple):
1159 value
= target
.get(m
)
1168 # find key:value in several lines
1170 def _find_in_lines(p_lines
: list, p_key
: str) -> str:
1171 for line
in p_lines
:
1173 if line
.startswith(p_key
+ ":"):
1174 parts
= line
.split(":")
1175 the_value
= parts
[1].strip()
1183 def _lower_keys_list(input_list
: list):
1185 Transform the keys in a list of dictionaries to lower case and returns a new list
1190 for dictionary
in input_list
:
1191 new_dict
= dict((k
.lower(), v
) for k
, v
in dictionary
.items())
1192 new_list
.append(new_dict
)
1195 async def _local_async_exec(
1198 raise_exception_on_error
: bool = False,
1199 show_error_log
: bool = True,
1200 encode_utf8
: bool = False,
1204 command
= K8sHelmBaseConnector
._remove
_multiple
_spaces
(command
)
1206 "Executing async local command: {}, env: {}".format(command
, env
)
1210 command
= shlex
.split(command
)
1212 environ
= os
.environ
.copy()
1217 process
= await asyncio
.create_subprocess_exec(
1219 stdout
=asyncio
.subprocess
.PIPE
,
1220 stderr
=asyncio
.subprocess
.PIPE
,
1224 # wait for command terminate
1225 stdout
, stderr
= await process
.communicate()
1227 return_code
= process
.returncode
1231 output
= stdout
.decode("utf-8").strip()
1232 # output = stdout.decode()
1234 output
= stderr
.decode("utf-8").strip()
1235 # output = stderr.decode()
1237 if return_code
!= 0 and show_error_log
:
1239 "Return code (FAIL): {}\nOutput:\n{}".format(return_code
, output
)
1242 self
.log
.debug("Return code: {}".format(return_code
))
1244 if raise_exception_on_error
and return_code
!= 0:
1245 raise K8sException(output
)
1248 output
= output
.encode("utf-8").strip()
1249 output
= str(output
).replace("\\n", "\n")
1251 return output
, return_code
1253 except asyncio
.CancelledError
:
1255 except K8sException
:
1257 except Exception as e
:
1258 msg
= "Exception executing command: {} -> {}".format(command
, e
)
1260 if raise_exception_on_error
:
1261 raise K8sException(e
) from e
1265 async def _local_async_exec_pipe(
1269 raise_exception_on_error
: bool = True,
1270 show_error_log
: bool = True,
1271 encode_utf8
: bool = False,
1275 command1
= K8sHelmBaseConnector
._remove
_multiple
_spaces
(command1
)
1276 command2
= K8sHelmBaseConnector
._remove
_multiple
_spaces
(command2
)
1277 command
= "{} | {}".format(command1
, command2
)
1279 "Executing async local command: {}, env: {}".format(command
, env
)
1283 command1
= shlex
.split(command1
)
1284 command2
= shlex
.split(command2
)
1286 environ
= os
.environ
.copy()
1291 read
, write
= os
.pipe()
1292 await asyncio
.create_subprocess_exec(*command1
, stdout
=write
, env
=environ
)
1294 process_2
= await asyncio
.create_subprocess_exec(
1295 *command2
, stdin
=read
, stdout
=asyncio
.subprocess
.PIPE
, env
=environ
1298 stdout
, stderr
= await process_2
.communicate()
1300 return_code
= process_2
.returncode
1304 output
= stdout
.decode("utf-8").strip()
1305 # output = stdout.decode()
1307 output
= stderr
.decode("utf-8").strip()
1308 # output = stderr.decode()
1310 if return_code
!= 0 and show_error_log
:
1312 "Return code (FAIL): {}\nOutput:\n{}".format(return_code
, output
)
1315 self
.log
.debug("Return code: {}".format(return_code
))
1317 if raise_exception_on_error
and return_code
!= 0:
1318 raise K8sException(output
)
1321 output
= output
.encode("utf-8").strip()
1322 output
= str(output
).replace("\\n", "\n")
1324 return output
, return_code
1325 except asyncio
.CancelledError
:
1327 except K8sException
:
1329 except Exception as e
:
1330 msg
= "Exception executing command: {} -> {}".format(command
, e
)
1332 if raise_exception_on_error
:
1333 raise K8sException(e
) from e
1337 async def _get_service(self
, cluster_id
, service_name
, namespace
):
1339 Obtains the data of the specified service in the k8cluster.
1341 :param cluster_id: id of a K8s cluster known by OSM
1342 :param service_name: name of the K8s service in the specified namespace
1343 :param namespace: K8s namespace used by the KDU instance
1344 :return: If successful, it will return a service with the following data:
1345 - `name` of the service
1346 - `type` type of service in the k8 cluster
1347 - `ports` List of ports offered by the service, for each port includes at least
1348 name, port, protocol
1349 - `cluster_ip` Internal ip to be used inside k8s cluster
1350 - `external_ip` List of external ips (in case they are available)
1354 paths
, env
= self
._init
_paths
_env
(
1355 cluster_name
=cluster_id
, create_if_not_exist
=True
1358 command
= "{} --kubeconfig={} --namespace={} get service {} -o=yaml".format(
1359 self
.kubectl_command
, paths
["kube_config"], namespace
, service_name
1362 output
, _rc
= await self
._local
_async
_exec
(
1363 command
=command
, raise_exception_on_error
=True, env
=env
1366 data
= yaml
.load(output
, Loader
=yaml
.SafeLoader
)
1369 "name": service_name
,
1370 "type": self
._get
_deep
(data
, ("spec", "type")),
1371 "ports": self
._get
_deep
(data
, ("spec", "ports")),
1372 "cluster_ip": self
._get
_deep
(data
, ("spec", "clusterIP")),
1374 if service
["type"] == "LoadBalancer":
1375 ip_map_list
= self
._get
_deep
(data
, ("status", "loadBalancer", "ingress"))
1376 ip_list
= [elem
["ip"] for elem
in ip_map_list
]
1377 service
["external_ip"] = ip_list
1381 async def _exec_inspect_comand(
1382 self
, inspect_command
: str, kdu_model
: str, repo_url
: str = None
1385 Obtains information about a kdu, no cluster (no env)
1390 repo_str
= " --repo {}".format(repo_url
)
1392 idx
= kdu_model
.find("/")
1395 kdu_model
= kdu_model
[idx
:]
1398 if ":" in kdu_model
:
1399 parts
= kdu_model
.split(sep
=":")
1401 version
= "--version {}".format(str(parts
[1]))
1402 kdu_model
= parts
[0]
1404 full_command
= self
._get
_inspect
_command
(
1405 inspect_command
, kdu_model
, repo_str
, version
1407 output
, _rc
= await self
._local
_async
_exec
(
1408 command
=full_command
, encode_utf8
=True
1413 async def _store_status(
1418 namespace
: str = None,
1419 check_every
: float = 10,
1420 db_dict
: dict = None,
1421 run_once
: bool = False,
1425 await asyncio
.sleep(check_every
)
1426 detailed_status
= await self
._status
_kdu
(
1427 cluster_id
=cluster_id
,
1428 kdu_instance
=kdu_instance
,
1429 namespace
=namespace
,
1432 status
= detailed_status
.get("info").get("description")
1433 self
.log
.debug("KDU {} STATUS: {}.".format(kdu_instance
, status
))
1434 # write status to db
1435 result
= await self
.write_app_status_to_db(
1438 detailed_status
=str(detailed_status
),
1439 operation
=operation
,
1442 self
.log
.info("Error writing in database. Task exiting...")
1444 except asyncio
.CancelledError
:
1445 self
.log
.debug("Task cancelled")
1447 except Exception as e
:
1449 "_store_status exception: {}".format(str(e
)), exc_info
=True
1456 # params for use in -f file
1457 # returns values file option and filename (in order to delete it at the end)
1458 def _params_to_file_option(self
, cluster_id
: str, params
: dict) -> (str, str):
1460 if params
and len(params
) > 0:
1461 self
._init
_paths
_env
(cluster_name
=cluster_id
, create_if_not_exist
=True)
1463 def get_random_number():
1464 r
= random
.randrange(start
=1, stop
=99999999)
1472 value
= params
.get(key
)
1473 if "!!yaml" in str(value
):
1474 value
= yaml
.load(value
[7:])
1475 params2
[key
] = value
1477 values_file
= get_random_number() + ".yaml"
1478 with
open(values_file
, "w") as stream
:
1479 yaml
.dump(params2
, stream
, indent
=4, default_flow_style
=False)
1481 return "-f {}".format(values_file
), values_file
1485 # params for use in --set option
1487 def _params_to_set_option(params
: dict) -> str:
1489 if params
and len(params
) > 0:
1492 value
= params
.get(key
, None)
1493 if value
is not None:
1495 params_str
+= "--set "
1499 params_str
+= "{}={}".format(key
, value
)
1503 def generate_kdu_instance_name(**kwargs
):
1504 chart_name
= kwargs
["kdu_model"]
1505 # check embeded chart (file or dir)
1506 if chart_name
.startswith("/"):
1507 # extract file or directory name
1508 chart_name
= chart_name
[chart_name
.rfind("/") + 1 :]
1510 elif "://" in chart_name
:
1511 # extract last portion of URL
1512 chart_name
= chart_name
[chart_name
.rfind("/") + 1 :]
1515 for c
in chart_name
:
1516 if c
.isalpha() or c
.isnumeric():
1523 # if does not start with alpha character, prefix 'a'
1524 if not name
[0].isalpha():
1529 def get_random_number():
1530 r
= random
.randrange(start
=1, stop
=99999999)
1532 s
= s
.rjust(10, "0")
1535 name
= name
+ get_random_number()