2 # Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3 # This file is part of OSM
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact with: nfvlabs@tid.es
31 from uuid
import uuid4
33 from n2vc
.config
import EnvironConfig
34 from n2vc
.exceptions
import K8sException
35 from n2vc
.k8s_conn
import K8sConnector
38 class K8sHelmBaseConnector(K8sConnector
):
41 ####################################################################################
42 ################################### P U B L I C ####################################
43 ####################################################################################
46 service_account
= "osm"
52 kubectl_command
: str = "/usr/bin/kubectl",
53 helm_command
: str = "/usr/bin/helm",
59 :param fs: file system for kubernetes and helm configuration
60 :param db: database object to write current operation status
61 :param kubectl_command: path to kubectl executable
62 :param helm_command: path to helm executable
64 :param on_update_db: callback called when k8s connector updates database
68 K8sConnector
.__init
__(self
, db
=db
, log
=log
, on_update_db
=on_update_db
)
70 self
.log
.info("Initializing K8S Helm connector")
72 self
.config
= EnvironConfig()
73 # random numbers for release name generation
74 random
.seed(time
.time())
79 # exception if kubectl is not installed
80 self
.kubectl_command
= kubectl_command
81 self
._check
_file
_exists
(filename
=kubectl_command
, exception_if_not_exists
=True)
83 # exception if helm is not installed
84 self
._helm
_command
= helm_command
85 self
._check
_file
_exists
(filename
=helm_command
, exception_if_not_exists
=True)
87 # obtain stable repo url from config or apply default
88 self
._stable
_repo
_url
= self
.config
.get("stablerepourl")
89 if self
._stable
_repo
_url
== "None":
90 self
._stable
_repo
_url
= None
93 def _get_namespace_cluster_id(cluster_uuid
: str) -> (str, str):
95 Parses cluster_uuid stored at database that can be either 'namespace:cluster_id' or only
96 cluster_id for backward compatibility
98 namespace
, _
, cluster_id
= cluster_uuid
.rpartition(":")
99 return namespace
, cluster_id
104 namespace
: str = "kube-system",
105 reuse_cluster_uuid
=None,
109 It prepares a given K8s cluster environment to run Charts
111 :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid
113 :param namespace: optional namespace to be used for helm. By default,
114 'kube-system' will be used
115 :param reuse_cluster_uuid: existing cluster uuid for reuse
116 :param kwargs: Additional parameters (None yet)
117 :return: uuid of the K8s cluster and True if connector has installed some
118 software in the cluster
119 (on error, an exception will be raised)
122 if reuse_cluster_uuid
:
123 namespace_
, cluster_id
= self
._get
_namespace
_cluster
_id
(reuse_cluster_uuid
)
124 namespace
= namespace_
or namespace
126 cluster_id
= str(uuid4())
127 cluster_uuid
= "{}:{}".format(namespace
, cluster_id
)
130 "Initializing K8S Cluster {}. namespace: {}".format(cluster_id
, namespace
)
133 paths
, env
= self
._init
_paths
_env
(
134 cluster_name
=cluster_id
, create_if_not_exist
=True
136 mode
= stat
.S_IRUSR | stat
.S_IWUSR
137 with
open(paths
["kube_config"], "w", mode
) as f
:
139 os
.chmod(paths
["kube_config"], 0o600)
141 # Code with initialization specific of helm version
142 n2vc_installed_sw
= await self
._cluster
_init
(cluster_id
, namespace
, paths
, env
)
144 # sync fs with local data
145 self
.fs
.reverse_sync(from_path
=cluster_id
)
147 self
.log
.info("Cluster {} initialized".format(cluster_id
))
149 return cluster_uuid
, n2vc_installed_sw
152 self
, cluster_uuid
: str, name
: str, url
: str, repo_type
: str = "chart"
154 _
, cluster_id
= self
._get
_namespace
_cluster
_id
(cluster_uuid
)
156 "Cluster {}, adding {} repository {}. URL: {}".format(
157 cluster_id
, repo_type
, name
, url
162 paths
, env
= self
._init
_paths
_env
(
163 cluster_name
=cluster_id
, create_if_not_exist
=True
167 self
.fs
.sync(from_path
=cluster_id
)
170 command
= "env KUBECONFIG={} {} repo update".format(
171 paths
["kube_config"], self
._helm
_command
173 self
.log
.debug("updating repo: {}".format(command
))
174 await self
._local
_async
_exec
(
175 command
=command
, raise_exception_on_error
=False, env
=env
178 # helm repo add name url
179 command
= "env KUBECONFIG={} {} repo add {} {}".format(
180 paths
["kube_config"], self
._helm
_command
, name
, url
182 self
.log
.debug("adding repo: {}".format(command
))
183 await self
._local
_async
_exec
(
184 command
=command
, raise_exception_on_error
=True, env
=env
188 self
.fs
.reverse_sync(from_path
=cluster_id
)
190 async def repo_list(self
, cluster_uuid
: str) -> list:
192 Get the list of registered repositories
194 :return: list of registered repositories: [ (name, url) .... ]
197 _
, cluster_id
= self
._get
_namespace
_cluster
_id
(cluster_uuid
)
198 self
.log
.debug("list repositories for cluster {}".format(cluster_id
))
201 paths
, env
= self
._init
_paths
_env
(
202 cluster_name
=cluster_id
, create_if_not_exist
=True
206 self
.fs
.sync(from_path
=cluster_id
)
208 command
= "env KUBECONFIG={} {} repo list --output yaml".format(
209 paths
["kube_config"], self
._helm
_command
212 # Set exception to false because if there are no repos just want an empty list
213 output
, _rc
= await self
._local
_async
_exec
(
214 command
=command
, raise_exception_on_error
=False, env
=env
218 self
.fs
.reverse_sync(from_path
=cluster_id
)
221 if output
and len(output
) > 0:
222 repos
= yaml
.load(output
, Loader
=yaml
.SafeLoader
)
223 # unify format between helm2 and helm3 setting all keys lowercase
224 return self
._lower
_keys
_list
(repos
)
230 async def repo_remove(self
, cluster_uuid
: str, name
: str):
232 _
, cluster_id
= self
._get
_namespace
_cluster
_id
(cluster_uuid
)
233 self
.log
.debug("remove {} repositories for cluster {}".format(name
, cluster_id
))
236 paths
, env
= self
._init
_paths
_env
(
237 cluster_name
=cluster_id
, create_if_not_exist
=True
241 self
.fs
.sync(from_path
=cluster_id
)
243 command
= "env KUBECONFIG={} {} repo remove {}".format(
244 paths
["kube_config"], self
._helm
_command
, name
246 await self
._local
_async
_exec
(
247 command
=command
, raise_exception_on_error
=True, env
=env
251 self
.fs
.reverse_sync(from_path
=cluster_id
)
257 uninstall_sw
: bool = False,
262 Resets the Kubernetes cluster by removing the helm deployment that represents it.
264 :param cluster_uuid: The UUID of the cluster to reset
265 :param force: Boolean to force the reset
266 :param uninstall_sw: Boolean to force the reset
267 :param kwargs: Additional parameters (None yet)
268 :return: Returns True if successful or raises an exception.
270 namespace
, cluster_id
= self
._get
_namespace
_cluster
_id
(cluster_uuid
)
272 "Resetting K8s environment. cluster uuid: {} uninstall={}".format(
273 cluster_id
, uninstall_sw
278 self
.fs
.sync(from_path
=cluster_id
)
280 # uninstall releases if needed.
282 releases
= await self
.instances_list(cluster_uuid
=cluster_uuid
)
283 if len(releases
) > 0:
287 kdu_instance
= r
.get("name")
288 chart
= r
.get("chart")
290 "Uninstalling {} -> {}".format(chart
, kdu_instance
)
292 await self
.uninstall(
293 cluster_uuid
=cluster_uuid
, kdu_instance
=kdu_instance
295 except Exception as e
:
296 # will not raise exception as it was found
297 # that in some cases of previously installed helm releases it
300 "Error uninstalling release {}: {}".format(
306 "Cluster uuid: {} has releases and not force. Leaving K8s helm environment"
310 False # Allow to remove k8s cluster without removing Tiller
314 await self
._uninstall
_sw
(cluster_id
, namespace
)
316 # delete cluster directory
317 self
.log
.debug("Removing directory {}".format(cluster_id
))
318 self
.fs
.file_delete(cluster_id
, ignore_non_exist
=True)
319 # Also remove the local directory if it still exists
320 direct
= self
.fs
.path
+ "/" + cluster_id
321 shutil
.rmtree(direct
, ignore_errors
=True)
325 async def _install_impl(
333 timeout
: float = 300,
335 db_dict
: dict = None,
336 kdu_name
: str = None,
337 namespace
: str = None,
340 paths
, env
= self
._init
_paths
_env
(
341 cluster_name
=cluster_id
, create_if_not_exist
=True
345 params_str
, file_to_delete
= self
._params
_to
_file
_option
(
346 cluster_id
=cluster_id
, params
=params
352 parts
= kdu_model
.split(sep
=":")
354 version
= str(parts
[1])
357 command
= self
._get
_install
_command
(
365 paths
["kube_config"],
368 self
.log
.debug("installing: {}".format(command
))
371 # exec helm in a task
372 exec_task
= asyncio
.ensure_future(
373 coro_or_future
=self
._local
_async
_exec
(
374 command
=command
, raise_exception_on_error
=False, env
=env
378 # write status in another task
379 status_task
= asyncio
.ensure_future(
380 coro_or_future
=self
._store
_status
(
381 cluster_id
=cluster_id
,
382 kdu_instance
=kdu_instance
,
390 # wait for execution task
391 await asyncio
.wait([exec_task
])
396 output
, rc
= exec_task
.result()
400 output
, rc
= await self
._local
_async
_exec
(
401 command
=command
, raise_exception_on_error
=False, env
=env
404 # remove temporal values yaml file
406 os
.remove(file_to_delete
)
409 await self
._store
_status
(
410 cluster_id
=cluster_id
,
411 kdu_instance
=kdu_instance
,
420 msg
= "Error executing command: {}\nOutput: {}".format(command
, output
)
422 raise K8sException(msg
)
428 kdu_model
: str = None,
430 timeout
: float = 300,
432 db_dict
: dict = None,
434 _
, cluster_id
= self
._get
_namespace
_cluster
_id
(cluster_uuid
)
435 self
.log
.debug("upgrading {} in cluster {}".format(kdu_model
, cluster_id
))
438 self
.fs
.sync(from_path
=cluster_id
)
440 # look for instance to obtain namespace
441 instance_info
= await self
.get_instance_info(cluster_uuid
, kdu_instance
)
442 if not instance_info
:
443 raise K8sException("kdu_instance {} not found".format(kdu_instance
))
446 paths
, env
= self
._init
_paths
_env
(
447 cluster_name
=cluster_id
, create_if_not_exist
=True
451 self
.fs
.sync(from_path
=cluster_id
)
454 params_str
, file_to_delete
= self
._params
_to
_file
_option
(
455 cluster_id
=cluster_id
, params
=params
461 parts
= kdu_model
.split(sep
=":")
463 version
= str(parts
[1])
466 command
= self
._get
_upgrade
_command
(
469 instance_info
["namespace"],
474 paths
["kube_config"],
477 self
.log
.debug("upgrading: {}".format(command
))
481 # exec helm in a task
482 exec_task
= asyncio
.ensure_future(
483 coro_or_future
=self
._local
_async
_exec
(
484 command
=command
, raise_exception_on_error
=False, env
=env
487 # write status in another task
488 status_task
= asyncio
.ensure_future(
489 coro_or_future
=self
._store
_status
(
490 cluster_id
=cluster_id
,
491 kdu_instance
=kdu_instance
,
492 namespace
=instance_info
["namespace"],
499 # wait for execution task
500 await asyncio
.wait([exec_task
])
504 output
, rc
= exec_task
.result()
508 output
, rc
= await self
._local
_async
_exec
(
509 command
=command
, raise_exception_on_error
=False, env
=env
512 # remove temporal values yaml file
514 os
.remove(file_to_delete
)
517 await self
._store
_status
(
518 cluster_id
=cluster_id
,
519 kdu_instance
=kdu_instance
,
520 namespace
=instance_info
["namespace"],
528 msg
= "Error executing command: {}\nOutput: {}".format(command
, output
)
530 raise K8sException(msg
)
533 self
.fs
.reverse_sync(from_path
=cluster_id
)
535 # return new revision number
536 instance
= await self
.get_instance_info(
537 cluster_uuid
=cluster_uuid
, kdu_instance
=kdu_instance
540 revision
= int(instance
.get("revision"))
541 self
.log
.debug("New revision: {}".format(revision
))
551 total_timeout
: float = 1800,
554 raise NotImplementedError("Method not implemented")
556 async def get_scale_count(
562 raise NotImplementedError("Method not implemented")
565 self
, cluster_uuid
: str, kdu_instance
: str, revision
=0, db_dict
: dict = None
568 _
, cluster_id
= self
._get
_namespace
_cluster
_id
(cluster_uuid
)
570 "rollback kdu_instance {} to revision {} from cluster {}".format(
571 kdu_instance
, revision
, cluster_id
576 self
.fs
.sync(from_path
=cluster_id
)
578 # look for instance to obtain namespace
579 instance_info
= await self
.get_instance_info(cluster_uuid
, kdu_instance
)
580 if not instance_info
:
581 raise K8sException("kdu_instance {} not found".format(kdu_instance
))
584 paths
, env
= self
._init
_paths
_env
(
585 cluster_name
=cluster_id
, create_if_not_exist
=True
589 self
.fs
.sync(from_path
=cluster_id
)
591 command
= self
._get
_rollback
_command
(
592 kdu_instance
, instance_info
["namespace"], revision
, paths
["kube_config"]
595 self
.log
.debug("rolling_back: {}".format(command
))
597 # exec helm in a task
598 exec_task
= asyncio
.ensure_future(
599 coro_or_future
=self
._local
_async
_exec
(
600 command
=command
, raise_exception_on_error
=False, env
=env
603 # write status in another task
604 status_task
= asyncio
.ensure_future(
605 coro_or_future
=self
._store
_status
(
606 cluster_id
=cluster_id
,
607 kdu_instance
=kdu_instance
,
608 namespace
=instance_info
["namespace"],
610 operation
="rollback",
615 # wait for execution task
616 await asyncio
.wait([exec_task
])
621 output
, rc
= exec_task
.result()
624 await self
._store
_status
(
625 cluster_id
=cluster_id
,
626 kdu_instance
=kdu_instance
,
627 namespace
=instance_info
["namespace"],
629 operation
="rollback",
635 msg
= "Error executing command: {}\nOutput: {}".format(command
, output
)
637 raise K8sException(msg
)
640 self
.fs
.reverse_sync(from_path
=cluster_id
)
642 # return new revision number
643 instance
= await self
.get_instance_info(
644 cluster_uuid
=cluster_uuid
, kdu_instance
=kdu_instance
647 revision
= int(instance
.get("revision"))
648 self
.log
.debug("New revision: {}".format(revision
))
653 async def uninstall(self
, cluster_uuid
: str, kdu_instance
: str, **kwargs
):
655 Removes an existing KDU instance. It would implicitly use the `delete` or 'uninstall' call
656 (this call should happen after all _terminate-config-primitive_ of the VNF
659 :param cluster_uuid: UUID of a K8s cluster known by OSM, or namespace:cluster_id
660 :param kdu_instance: unique name for the KDU instance to be deleted
661 :param kwargs: Additional parameters (None yet)
662 :return: True if successful
665 _
, cluster_id
= self
._get
_namespace
_cluster
_id
(cluster_uuid
)
667 "uninstall kdu_instance {} from cluster {}".format(kdu_instance
, cluster_id
)
671 self
.fs
.sync(from_path
=cluster_id
)
673 # look for instance to obtain namespace
674 instance_info
= await self
.get_instance_info(cluster_uuid
, kdu_instance
)
675 if not instance_info
:
676 self
.log
.warning(("kdu_instance {} not found".format(kdu_instance
)))
679 paths
, env
= self
._init
_paths
_env
(
680 cluster_name
=cluster_id
, create_if_not_exist
=True
684 self
.fs
.sync(from_path
=cluster_id
)
686 command
= self
._get
_uninstall
_command
(
687 kdu_instance
, instance_info
["namespace"], paths
["kube_config"]
689 output
, _rc
= await self
._local
_async
_exec
(
690 command
=command
, raise_exception_on_error
=True, env
=env
694 self
.fs
.reverse_sync(from_path
=cluster_id
)
696 return self
._output
_to
_table
(output
)
698 async def instances_list(self
, cluster_uuid
: str) -> list:
700 returns a list of deployed releases in a cluster
702 :param cluster_uuid: the 'cluster' or 'namespace:cluster'
706 _
, cluster_id
= self
._get
_namespace
_cluster
_id
(cluster_uuid
)
707 self
.log
.debug("list releases for cluster {}".format(cluster_id
))
710 self
.fs
.sync(from_path
=cluster_id
)
712 # execute internal command
713 result
= await self
._instances
_list
(cluster_id
)
716 self
.fs
.reverse_sync(from_path
=cluster_id
)
async def get_instance_info(self, cluster_uuid: str, kdu_instance: str):
    """
    Look up a deployed release by name in the given cluster.

    :param cluster_uuid: the 'cluster' or 'namespace:cluster'
    :param kdu_instance: name of the release to look for
    :return: the first entry of instances_list() whose "name" equals
        kdu_instance, or None (after a debug log) when there is no match
    """
    releases = await self.instances_list(cluster_uuid=cluster_uuid)
    found = next(
        (release for release in releases if release.get("name") == kdu_instance),
        None,
    )
    if found is None:
        self.log.debug("Instance {} not found".format(kdu_instance))
    return found
728 async def exec_primitive(
730 cluster_uuid
: str = None,
731 kdu_instance
: str = None,
732 primitive_name
: str = None,
733 timeout
: float = 300,
735 db_dict
: dict = None,
738 """Exec primitive (Juju action)
740 :param cluster_uuid: The UUID of the cluster or namespace:cluster
741 :param kdu_instance: The unique name of the KDU instance
742 :param primitive_name: Name of action that will be executed
743 :param timeout: Timeout for action execution
744 :param params: Dictionary of all the parameters needed for the action
745 :db_dict: Dictionary for any additional data
746 :param kwargs: Additional parameters (None yet)
748 :return: Returns the output of the action
751 "KDUs deployed with Helm don't support actions "
752 "different from rollback, upgrade and status"
755 async def get_services(
756 self
, cluster_uuid
: str, kdu_instance
: str, namespace
: str
759 Returns a list of services defined for the specified kdu instance.
761 :param cluster_uuid: UUID of a K8s cluster known by OSM
762 :param kdu_instance: unique name for the KDU instance
763 :param namespace: K8s namespace used by the KDU instance
764 :return: If successful, it will return a list of services, Each service
765 can have the following data:
766 - `name` of the service
767 - `type` type of service in the k8 cluster
768 - `ports` List of ports offered by the service, for each port includes at least
770 - `cluster_ip` Internal ip to be used inside k8s cluster
771 - `external_ip` List of external ips (in case they are available)
774 _
, cluster_id
= self
._get
_namespace
_cluster
_id
(cluster_uuid
)
776 "get_services: cluster_uuid: {}, kdu_instance: {}".format(
777 cluster_uuid
, kdu_instance
782 paths
, env
= self
._init
_paths
_env
(
783 cluster_name
=cluster_id
, create_if_not_exist
=True
787 self
.fs
.sync(from_path
=cluster_id
)
789 # get list of services names for kdu
790 service_names
= await self
._get
_services
(
791 cluster_id
, kdu_instance
, namespace
, paths
["kube_config"]
795 for service
in service_names
:
796 service
= await self
._get
_service
(cluster_id
, service
, namespace
)
797 service_list
.append(service
)
800 self
.fs
.reverse_sync(from_path
=cluster_id
)
804 async def get_service(
805 self
, cluster_uuid
: str, service_name
: str, namespace
: str
809 "get service, service_name: {}, namespace: {}, cluster_uuid: {}".format(
810 service_name
, namespace
, cluster_uuid
814 _
, cluster_id
= self
._get
_namespace
_cluster
_id
(cluster_uuid
)
817 self
.fs
.sync(from_path
=cluster_id
)
819 service
= await self
._get
_service
(cluster_id
, service_name
, namespace
)
822 self
.fs
.reverse_sync(from_path
=cluster_id
)
826 async def status_kdu(self
, cluster_uuid
: str, kdu_instance
: str, **kwargs
) -> str:
828 This call would retrieve the current state of a given KDU instance. It
829 would allow to retrieve the _composition_ (i.e. K8s objects) and _specific
830 values_ of the configuration parameters applied to a given instance. This call
831 would be based on the `status` call.
833 :param cluster_uuid: UUID of a K8s cluster known by OSM
834 :param kdu_instance: unique name for the KDU instance
835 :param kwargs: Additional parameters (None yet)
836 :return: If successful, it will return the following vector of arguments:
837 - K8s `namespace` in the cluster where the KDU lives
838 - `state` of the KDU instance. It can be:
845 - List of `resources` (objects) that this release consists of, sorted by kind,
846 and the status of those resources
847 - Last `deployment_time`.
851 "status_kdu: cluster_uuid: {}, kdu_instance: {}".format(
852 cluster_uuid
, kdu_instance
856 _
, cluster_id
= self
._get
_namespace
_cluster
_id
(cluster_uuid
)
859 self
.fs
.sync(from_path
=cluster_id
)
861 # get instance: needed to obtain namespace
862 instances
= await self
._instances
_list
(cluster_id
=cluster_id
)
863 for instance
in instances
:
864 if instance
.get("name") == kdu_instance
:
867 # instance does not exist
869 "Instance name: {} not found in cluster: {}".format(
870 kdu_instance
, cluster_id
874 status
= await self
._status
_kdu
(
875 cluster_id
=cluster_id
,
876 kdu_instance
=kdu_instance
,
877 namespace
=instance
["namespace"],
883 self
.fs
.reverse_sync(from_path
=cluster_id
)
887 async def values_kdu(self
, kdu_model
: str, repo_url
: str = None) -> str:
890 "inspect kdu_model values {} from (optional) repo: {}".format(
895 return await self
._exec
_inspect
_comand
(
896 inspect_command
="values", kdu_model
=kdu_model
, repo_url
=repo_url
899 async def help_kdu(self
, kdu_model
: str, repo_url
: str = None) -> str:
902 "inspect kdu_model {} readme.md from repo: {}".format(kdu_model
, repo_url
)
905 return await self
._exec
_inspect
_comand
(
906 inspect_command
="readme", kdu_model
=kdu_model
, repo_url
=repo_url
909 async def synchronize_repos(self
, cluster_uuid
: str):
911 self
.log
.debug("synchronize repos for cluster helm-id: {}".format(cluster_uuid
))
913 db_repo_ids
= self
._get
_helm
_chart
_repos
_ids
(cluster_uuid
)
914 db_repo_dict
= self
._get
_db
_repos
_dict
(db_repo_ids
)
916 local_repo_list
= await self
.repo_list(cluster_uuid
)
917 local_repo_dict
= {repo
["name"]: repo
["url"] for repo
in local_repo_list
}
919 deleted_repo_list
= []
922 # iterate over the list of repos in the database that should be
923 # added if not present
924 for repo_name
, db_repo
in db_repo_dict
.items():
926 # check if it is already present
927 curr_repo_url
= local_repo_dict
.get(db_repo
["name"])
928 repo_id
= db_repo
.get("_id")
929 if curr_repo_url
!= db_repo
["url"]:
932 "repo {} url changed, delete and and again".format(
936 await self
.repo_remove(cluster_uuid
, db_repo
["name"])
937 deleted_repo_list
.append(repo_id
)
940 self
.log
.debug("add repo {}".format(db_repo
["name"]))
942 cluster_uuid
, db_repo
["name"], db_repo
["url"]
944 added_repo_dict
[repo_id
] = db_repo
["name"]
945 except Exception as e
:
947 "Error adding repo id: {}, err_msg: {} ".format(
952 # Delete repos that are present but not in nbi_list
953 for repo_name
in local_repo_dict
:
954 if not db_repo_dict
.get(repo_name
) and repo_name
!= "stable":
955 self
.log
.debug("delete repo {}".format(repo_name
))
957 await self
.repo_remove(cluster_uuid
, repo_name
)
958 deleted_repo_list
.append(repo_name
)
959 except Exception as e
:
961 "Error deleting repo, name: {}, err_msg: {}".format(
966 return deleted_repo_list
, added_repo_dict
970 except Exception as e
:
971 # Do not raise errors synchronizing repos
972 self
.log
.error("Error synchronizing repos: {}".format(e
))
973 raise Exception("Error synchronizing repos: {}".format(e
))
975 def _get_db_repos_dict(self
, repo_ids
: list):
977 for repo_id
in repo_ids
:
978 db_repo
= self
.db
.get_one("k8srepos", {"_id": repo_id
})
979 db_repos_dict
[db_repo
["name"]] = db_repo
983 ####################################################################################
984 ################################### TO BE IMPLEMENTED SUBCLASSES ###################
985 ####################################################################################
989 def _init_paths_env(self
, cluster_name
: str, create_if_not_exist
: bool = True):
991 Creates and returns base cluster and kube dirs and returns them.
992 Also created helm3 dirs according to new directory specification, paths are
993 not returned but assigned to helm environment variables
995 :param cluster_name: cluster_name
996 :return: Dictionary with config_paths and dictionary with helm environment variables
1000 async def _cluster_init(self
, cluster_id
, namespace
, paths
, env
):
1002 Implements the helm version dependent cluster initialization
1006 async def _instances_list(self
, cluster_id
):
1008 Implements the helm version dependent helm instances list
1012 async def _get_services(self
, cluster_id
, kdu_instance
, namespace
, kubeconfig
):
1014 Implements the helm version dependent method to obtain services from a helm instance
1018 async def _status_kdu(
1022 namespace
: str = None,
1023 show_error_log
: bool = False,
1024 return_text
: bool = False,
1027 Implements the helm version dependent method to obtain status of a helm instance
1031 def _get_install_command(
1043 Obtain command to be executed to install the indicated instance
1047 def _get_upgrade_command(
1059 Obtain command to be executed to upgrade the indicated instance
1063 def _get_rollback_command(
1064 self
, kdu_instance
, namespace
, revision
, kubeconfig
1067 Obtain command to be executed to rollback the indicated instance
1071 def _get_uninstall_command(
1072 self
, kdu_instance
: str, namespace
: str, kubeconfig
: str
1075 Obtain command to be executed to delete the indicated instance
1079 def _get_inspect_command(
1080 self
, show_command
: str, kdu_model
: str, repo_str
: str, version
: str
1083 Obtain command to be executed to obtain information about the kdu
1087 async def _uninstall_sw(self
, cluster_id
: str, namespace
: str):
1089 Method call to uninstall cluster software for helm. This method is dependent
1091 For Helm v2 it will be called when Tiller must be uninstalled
1092 For Helm v3 it does nothing and does not need to be called
1096 def _get_helm_chart_repos_ids(self
, cluster_uuid
) -> list:
1098 Obtains the cluster repos identifiers
1102 ####################################################################################
1103 ################################### P R I V A T E ##################################
1104 ####################################################################################
1108 def _check_file_exists(filename
: str, exception_if_not_exists
: bool = False):
1109 if os
.path
.exists(filename
):
1112 msg
= "File {} does not exist".format(filename
)
1113 if exception_if_not_exists
:
1114 raise K8sException(msg
)
1117 def _remove_multiple_spaces(strobj
):
1118 strobj
= strobj
.strip()
1119 while " " in strobj
:
1120 strobj
= strobj
.replace(" ", " ")
1124 def _output_to_lines(output
: str) -> list:
1125 output_lines
= list()
1126 lines
= output
.splitlines(keepends
=False)
1130 output_lines
.append(line
)
1134 def _output_to_table(output
: str) -> list:
1135 output_table
= list()
1136 lines
= output
.splitlines(keepends
=False)
1138 line
= line
.replace("\t", " ")
1140 output_table
.append(line_list
)
1141 cells
= line
.split(sep
=" ")
1145 line_list
.append(cell
)
1149 def _parse_services(output
: str) -> list:
1150 lines
= output
.splitlines(keepends
=False)
1153 line
= line
.replace("\t", " ")
1154 cells
= line
.split(sep
=" ")
1155 if len(cells
) > 0 and cells
[0].startswith("service/"):
1156 elems
= cells
[0].split(sep
="/")
1158 services
.append(elems
[1])
1162 def _get_deep(dictionary
: dict, members
: tuple):
1167 value
= target
.get(m
)
1176 # find key:value in several lines
1178 def _find_in_lines(p_lines
: list, p_key
: str) -> str:
1179 for line
in p_lines
:
1181 if line
.startswith(p_key
+ ":"):
1182 parts
= line
.split(":")
1183 the_value
= parts
[1].strip()
1191 def _lower_keys_list(input_list
: list):
1193 Transform the keys in a list of dictionaries to lower case and returns a new list
1198 for dictionary
in input_list
:
1199 new_dict
= dict((k
.lower(), v
) for k
, v
in dictionary
.items())
1200 new_list
.append(new_dict
)
1203 async def _local_async_exec(
1206 raise_exception_on_error
: bool = False,
1207 show_error_log
: bool = True,
1208 encode_utf8
: bool = False,
1212 command
= K8sHelmBaseConnector
._remove
_multiple
_spaces
(command
)
1214 "Executing async local command: {}, env: {}".format(command
, env
)
1218 command
= shlex
.split(command
)
1220 environ
= os
.environ
.copy()
1225 process
= await asyncio
.create_subprocess_exec(
1227 stdout
=asyncio
.subprocess
.PIPE
,
1228 stderr
=asyncio
.subprocess
.PIPE
,
1232 # wait for command terminate
1233 stdout
, stderr
= await process
.communicate()
1235 return_code
= process
.returncode
1239 output
= stdout
.decode("utf-8").strip()
1240 # output = stdout.decode()
1242 output
= stderr
.decode("utf-8").strip()
1243 # output = stderr.decode()
1245 if return_code
!= 0 and show_error_log
:
1247 "Return code (FAIL): {}\nOutput:\n{}".format(return_code
, output
)
1250 self
.log
.debug("Return code: {}".format(return_code
))
1252 if raise_exception_on_error
and return_code
!= 0:
1253 raise K8sException(output
)
1256 output
= output
.encode("utf-8").strip()
1257 output
= str(output
).replace("\\n", "\n")
1259 return output
, return_code
1261 except asyncio
.CancelledError
:
1263 except K8sException
:
1265 except Exception as e
:
1266 msg
= "Exception executing command: {} -> {}".format(command
, e
)
1268 if raise_exception_on_error
:
1269 raise K8sException(e
) from e
1273 async def _local_async_exec_pipe(
1277 raise_exception_on_error
: bool = True,
1278 show_error_log
: bool = True,
1279 encode_utf8
: bool = False,
1283 command1
= K8sHelmBaseConnector
._remove
_multiple
_spaces
(command1
)
1284 command2
= K8sHelmBaseConnector
._remove
_multiple
_spaces
(command2
)
1285 command
= "{} | {}".format(command1
, command2
)
1287 "Executing async local command: {}, env: {}".format(command
, env
)
1291 command1
= shlex
.split(command1
)
1292 command2
= shlex
.split(command2
)
1294 environ
= os
.environ
.copy()
1299 read
, write
= os
.pipe()
1300 await asyncio
.create_subprocess_exec(*command1
, stdout
=write
, env
=environ
)
1302 process_2
= await asyncio
.create_subprocess_exec(
1303 *command2
, stdin
=read
, stdout
=asyncio
.subprocess
.PIPE
, env
=environ
1306 stdout
, stderr
= await process_2
.communicate()
1308 return_code
= process_2
.returncode
1312 output
= stdout
.decode("utf-8").strip()
1313 # output = stdout.decode()
1315 output
= stderr
.decode("utf-8").strip()
1316 # output = stderr.decode()
1318 if return_code
!= 0 and show_error_log
:
1320 "Return code (FAIL): {}\nOutput:\n{}".format(return_code
, output
)
1323 self
.log
.debug("Return code: {}".format(return_code
))
1325 if raise_exception_on_error
and return_code
!= 0:
1326 raise K8sException(output
)
1329 output
= output
.encode("utf-8").strip()
1330 output
= str(output
).replace("\\n", "\n")
1332 return output
, return_code
1333 except asyncio
.CancelledError
:
1335 except K8sException
:
1337 except Exception as e
:
1338 msg
= "Exception executing command: {} -> {}".format(command
, e
)
1340 if raise_exception_on_error
:
1341 raise K8sException(e
) from e
1345 async def _get_service(self
, cluster_id
, service_name
, namespace
):
1347 Obtains the data of the specified service in the k8cluster.
1349 :param cluster_id: id of a K8s cluster known by OSM
1350 :param service_name: name of the K8s service in the specified namespace
1351 :param namespace: K8s namespace used by the KDU instance
1352 :return: If successful, it will return a service with the following data:
1353 - `name` of the service
1354 - `type` type of service in the k8 cluster
1355 - `ports` List of ports offered by the service, for each port includes at least
1356 name, port, protocol
1357 - `cluster_ip` Internal ip to be used inside k8s cluster
1358 - `external_ip` List of external ips (in case they are available)
1362 paths
, env
= self
._init
_paths
_env
(
1363 cluster_name
=cluster_id
, create_if_not_exist
=True
1366 command
= "{} --kubeconfig={} --namespace={} get service {} -o=yaml".format(
1367 self
.kubectl_command
, paths
["kube_config"], namespace
, service_name
1370 output
, _rc
= await self
._local
_async
_exec
(
1371 command
=command
, raise_exception_on_error
=True, env
=env
1374 data
= yaml
.load(output
, Loader
=yaml
.SafeLoader
)
1377 "name": service_name
,
1378 "type": self
._get
_deep
(data
, ("spec", "type")),
1379 "ports": self
._get
_deep
(data
, ("spec", "ports")),
1380 "cluster_ip": self
._get
_deep
(data
, ("spec", "clusterIP")),
1382 if service
["type"] == "LoadBalancer":
1383 ip_map_list
= self
._get
_deep
(data
, ("status", "loadBalancer", "ingress"))
1384 ip_list
= [elem
["ip"] for elem
in ip_map_list
]
1385 service
["external_ip"] = ip_list
1389 async def _exec_inspect_comand(
1390 self
, inspect_command
: str, kdu_model
: str, repo_url
: str = None
1393 Obtains information about a kdu, no cluster (no env)
1398 repo_str
= " --repo {}".format(repo_url
)
1400 idx
= kdu_model
.find("/")
1403 kdu_model
= kdu_model
[idx
:]
1406 if ":" in kdu_model
:
1407 parts
= kdu_model
.split(sep
=":")
1409 version
= "--version {}".format(str(parts
[1]))
1410 kdu_model
= parts
[0]
1412 full_command
= self
._get
_inspect
_command
(
1413 inspect_command
, kdu_model
, repo_str
, version
1415 output
, _rc
= await self
._local
_async
_exec
(
1416 command
=full_command
, encode_utf8
=True
1421 async def _store_status(
1426 namespace
: str = None,
1427 check_every
: float = 10,
1428 db_dict
: dict = None,
1429 run_once
: bool = False,
1433 await asyncio
.sleep(check_every
)
1434 detailed_status
= await self
._status
_kdu
(
1435 cluster_id
=cluster_id
,
1436 kdu_instance
=kdu_instance
,
1437 namespace
=namespace
,
1440 status
= detailed_status
.get("info").get("description")
1441 self
.log
.debug("KDU {} STATUS: {}.".format(kdu_instance
, status
))
1442 # write status to db
1443 result
= await self
.write_app_status_to_db(
1446 detailed_status
=str(detailed_status
),
1447 operation
=operation
,
1450 self
.log
.info("Error writing in database. Task exiting...")
1452 except asyncio
.CancelledError
:
1453 self
.log
.debug("Task cancelled")
1455 except Exception as e
:
1457 "_store_status exception: {}".format(str(e
)), exc_info
=True
1464 # params for use in -f file
1465 # returns values file option and filename (in order to delete it at the end)
1466 def _params_to_file_option(self
, cluster_id
: str, params
: dict) -> (str, str):
1468 if params
and len(params
) > 0:
1469 self
._init
_paths
_env
(cluster_name
=cluster_id
, create_if_not_exist
=True)
1471 def get_random_number():
1472 r
= random
.randrange(start
=1, stop
=99999999)
1480 value
= params
.get(key
)
1481 if "!!yaml" in str(value
):
1482 value
= yaml
.load(value
[7:])
1483 params2
[key
] = value
1485 values_file
= get_random_number() + ".yaml"
1486 with
open(values_file
, "w") as stream
:
1487 yaml
.dump(params2
, stream
, indent
=4, default_flow_style
=False)
1489 return "-f {}".format(values_file
), values_file
1493 # params for use in --set option
1495 def _params_to_set_option(params
: dict) -> str:
1497 if params
and len(params
) > 0:
1500 value
= params
.get(key
, None)
1501 if value
is not None:
1503 params_str
+= "--set "
1507 params_str
+= "{}={}".format(key
, value
)
1511 def generate_kdu_instance_name(**kwargs
):
1512 chart_name
= kwargs
["kdu_model"]
1513 # check embeded chart (file or dir)
1514 if chart_name
.startswith("/"):
1515 # extract file or directory name
1516 chart_name
= chart_name
[chart_name
.rfind("/") + 1 :]
1518 elif "://" in chart_name
:
1519 # extract last portion of URL
1520 chart_name
= chart_name
[chart_name
.rfind("/") + 1 :]
1523 for c
in chart_name
:
1524 if c
.isalpha() or c
.isnumeric():
1531 # if does not start with alpha character, prefix 'a'
1532 if not name
[0].isalpha():
1537 def get_random_number():
1538 r
= random
.randrange(start
=1, stop
=99999999)
1540 s
= s
.rjust(10, "0")
1543 name
= name
+ get_random_number()