2 # Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3 # This file is part of OSM
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact with: nfvlabs@tid.es
31 from uuid
import uuid4
33 from n2vc
.config
import EnvironConfig
34 from n2vc
.exceptions
import K8sException
35 from n2vc
.k8s_conn
import K8sConnector
38 class K8sHelmBaseConnector(K8sConnector
):
41 ####################################################################################
42 ################################### P U B L I C ####################################
43 ####################################################################################
46 service_account
= "osm"
52 kubectl_command
: str = "/usr/bin/kubectl",
53 helm_command
: str = "/usr/bin/helm",
59 :param fs: file system for kubernetes and helm configuration
60 :param db: database object to write current operation status
61 :param kubectl_command: path to kubectl executable
62 :param helm_command: path to helm executable
64 :param on_update_db: callback called when k8s connector updates database
68 K8sConnector
.__init
__(self
, db
=db
, log
=log
, on_update_db
=on_update_db
)
70 self
.log
.info("Initializing K8S Helm connector")
72 self
.config
= EnvironConfig()
73 # random numbers for release name generation
74 random
.seed(time
.time())
79 # exception if kubectl is not installed
80 self
.kubectl_command
= kubectl_command
81 self
._check
_file
_exists
(filename
=kubectl_command
, exception_if_not_exists
=True)
83 # exception if helm is not installed
84 self
._helm
_command
= helm_command
85 self
._check
_file
_exists
(filename
=helm_command
, exception_if_not_exists
=True)
87 # obtain stable repo url from config or apply default
88 self
._stable
_repo
_url
= self
.config
.get("stablerepourl")
89 if self
._stable
_repo
_url
== "None":
90 self
._stable
_repo
_url
= None
92 def _get_namespace(self
, cluster_uuid
: str) -> str:
94 Obtains the namespace used by the cluster with the uuid passed by argument
96 param: cluster_uuid: cluster's uuid
99 # first, obtain the cluster corresponding to the uuid passed by argument
100 k8scluster
= self
.db
.get_one(
101 "k8sclusters", q_filter
={"_id": cluster_uuid
}, fail_on_empty
=False
103 return k8scluster
.get("namespace")
108 namespace
: str = "kube-system",
109 reuse_cluster_uuid
=None,
113 It prepares a given K8s cluster environment to run Charts
115 :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid
117 :param namespace: optional namespace to be used for helm. By default,
118 'kube-system' will be used
119 :param reuse_cluster_uuid: existing cluster uuid for reuse
120 :param kwargs: Additional parameters (None yet)
121 :return: uuid of the K8s cluster and True if connector has installed some
122 software in the cluster
123 (on error, an exception will be raised)
126 if reuse_cluster_uuid
:
127 cluster_id
= reuse_cluster_uuid
129 cluster_id
= str(uuid4())
132 "Initializing K8S Cluster {}. namespace: {}".format(cluster_id
, namespace
)
135 paths
, env
= self
._init
_paths
_env
(
136 cluster_name
=cluster_id
, create_if_not_exist
=True
138 mode
= stat
.S_IRUSR | stat
.S_IWUSR
139 with
open(paths
["kube_config"], "w", mode
) as f
:
141 os
.chmod(paths
["kube_config"], 0o600)
143 # Code with initialization specific of helm version
144 n2vc_installed_sw
= await self
._cluster
_init
(cluster_id
, namespace
, paths
, env
)
146 # sync fs with local data
147 self
.fs
.reverse_sync(from_path
=cluster_id
)
149 self
.log
.info("Cluster {} initialized".format(cluster_id
))
151 return cluster_id
, n2vc_installed_sw
158 repo_type
: str = "chart",
161 password
: str = None,
164 "Cluster {}, adding {} repository {}. URL: {}".format(
165 cluster_uuid
, repo_type
, name
, url
170 paths
, env
= self
._init
_paths
_env
(
171 cluster_name
=cluster_uuid
, create_if_not_exist
=True
175 self
.fs
.sync(from_path
=cluster_uuid
)
178 command
= "env KUBECONFIG={} {} repo update".format(
179 paths
["kube_config"], self
._helm
_command
181 self
.log
.debug("updating repo: {}".format(command
))
182 await self
._local
_async
_exec
(
183 command
=command
, raise_exception_on_error
=False, env
=env
186 # helm repo add name url
187 command
= ("env KUBECONFIG={} {} repo add {} {}").format(
188 paths
["kube_config"], self
._helm
_command
, name
, url
192 temp_cert_file
= os
.path
.join(
193 self
.fs
.path
, "{}/helmcerts/".format(cluster_id
), "temp.crt"
195 os
.makedirs(os
.path
.dirname(temp_cert_file
), exist_ok
=True)
196 with
open(temp_cert_file
, "w") as the_cert
:
198 command
+= " --ca-file {}".format(temp_cert_file
)
201 command
+= " --username={}".format(user
)
204 command
+= " --password={}".format(password
)
206 self
.log
.debug("adding repo: {}".format(command
))
207 await self
._local
_async
_exec
(
208 command
=command
, raise_exception_on_error
=True, env
=env
212 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
214 async def repo_list(self
, cluster_uuid
: str) -> list:
216 Get the list of registered repositories
218 :return: list of registered repositories: [ (name, url) .... ]
221 self
.log
.debug("list repositories for cluster {}".format(cluster_uuid
))
224 paths
, env
= self
._init
_paths
_env
(
225 cluster_name
=cluster_uuid
, create_if_not_exist
=True
229 self
.fs
.sync(from_path
=cluster_uuid
)
231 command
= "env KUBECONFIG={} {} repo list --output yaml".format(
232 paths
["kube_config"], self
._helm
_command
235 # Set exception to false because if there are no repos just want an empty list
236 output
, _rc
= await self
._local
_async
_exec
(
237 command
=command
, raise_exception_on_error
=False, env
=env
241 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
244 if output
and len(output
) > 0:
245 repos
= yaml
.load(output
, Loader
=yaml
.SafeLoader
)
246 # unify format between helm2 and helm3 setting all keys lowercase
247 return self
._lower
_keys
_list
(repos
)
async def repo_remove(self, cluster_uuid: str, name: str):
    """
    Remove a helm repository from the helm configuration of a cluster

    Runs "<helm> repo remove <name>" with KUBECONFIG pointing at the cluster's
    kube config, syncing the file system before and after the command.

    :param cluster_uuid: cluster identifier, used to locate paths/env and as the
        path handed to the file system sync calls
    :param name: name of the repository to remove
    :return: None
    """
    # local import so this block does not depend on the file-level import list
    import shlex

    self.log.debug(
        "remove {} repositories for cluster {}".format(name, cluster_uuid)
    )

    # init env, paths
    paths, env = self._init_paths_env(
        cluster_name=cluster_uuid, create_if_not_exist=True
    )

    # sync local dir
    self.fs.sync(from_path=cluster_uuid)

    # the command string is tokenised with shlex.split() by _local_async_exec, so
    # the repository name is quoted to always travel as exactly one argument
    # (quote() leaves plain names such as "bitnami" untouched)
    command = "env KUBECONFIG={} {} repo remove {}".format(
        paths["kube_config"], self._helm_command, shlex.quote(name)
    )
    # a failing helm command is reported to the caller as an exception
    await self._local_async_exec(
        command=command, raise_exception_on_error=True, env=env
    )

    # sync fs
    self.fs.reverse_sync(from_path=cluster_uuid)
280 uninstall_sw
: bool = False,
285 Resets the Kubernetes cluster by removing the helm deployment that represents it.
287 :param cluster_uuid: The UUID of the cluster to reset
288 :param force: Boolean to force the reset
289 :param uninstall_sw: Boolean to also uninstall the software installed in the cluster
290 :param kwargs: Additional parameters (None yet)
291 :return: Returns True if successful or raises an exception.
293 namespace
= self
._get
_namespace
(cluster_uuid
=cluster_uuid
)
295 "Resetting K8s environment. cluster uuid: {} uninstall={}".format(
296 cluster_uuid
, uninstall_sw
301 self
.fs
.sync(from_path
=cluster_uuid
)
303 # uninstall releases if needed.
305 releases
= await self
.instances_list(cluster_uuid
=cluster_uuid
)
306 if len(releases
) > 0:
310 kdu_instance
= r
.get("name")
311 chart
= r
.get("chart")
313 "Uninstalling {} -> {}".format(chart
, kdu_instance
)
315 await self
.uninstall(
316 cluster_uuid
=cluster_uuid
, kdu_instance
=kdu_instance
318 except Exception as e
:
319 # will not raise exception as it was found
320 # that in some cases of previously installed helm releases it
323 "Error uninstalling release {}: {}".format(
329 "Cluster uuid: {} has releases and not force. Leaving K8s helm environment"
330 ).format(cluster_uuid
)
333 False # Allow to remove k8s cluster without removing Tiller
337 await self
._uninstall
_sw
(cluster_id
=cluster_uuid
, namespace
=namespace
)
339 # delete cluster directory
340 self
.log
.debug("Removing directory {}".format(cluster_uuid
))
341 self
.fs
.file_delete(cluster_uuid
, ignore_non_exist
=True)
342 # Also remove the local directory if it still exists
343 direct
= self
.fs
.path
+ "/" + cluster_uuid
344 shutil
.rmtree(direct
, ignore_errors
=True)
348 async def _install_impl(
356 timeout
: float = 300,
358 db_dict
: dict = None,
359 kdu_name
: str = None,
360 namespace
: str = None,
363 paths
, env
= self
._init
_paths
_env
(
364 cluster_name
=cluster_id
, create_if_not_exist
=True
368 params_str
, file_to_delete
= self
._params
_to
_file
_option
(
369 cluster_id
=cluster_id
, params
=params
373 kdu_model
, version
= self
._split
_version
(kdu_model
)
375 command
= self
._get
_install
_command
(
383 paths
["kube_config"],
386 self
.log
.debug("installing: {}".format(command
))
389 # exec helm in a task
390 exec_task
= asyncio
.ensure_future(
391 coro_or_future
=self
._local
_async
_exec
(
392 command
=command
, raise_exception_on_error
=False, env
=env
396 # write status in another task
397 status_task
= asyncio
.ensure_future(
398 coro_or_future
=self
._store
_status
(
399 cluster_id
=cluster_id
,
400 kdu_instance
=kdu_instance
,
408 # wait for execution task
409 await asyncio
.wait([exec_task
])
414 output
, rc
= exec_task
.result()
418 output
, rc
= await self
._local
_async
_exec
(
419 command
=command
, raise_exception_on_error
=False, env
=env
422 # remove temporal values yaml file
424 os
.remove(file_to_delete
)
427 await self
._store
_status
(
428 cluster_id
=cluster_id
,
429 kdu_instance
=kdu_instance
,
438 msg
= "Error executing command: {}\nOutput: {}".format(command
, output
)
440 raise K8sException(msg
)
446 kdu_model
: str = None,
448 timeout
: float = 300,
450 db_dict
: dict = None,
452 self
.log
.debug("upgrading {} in cluster {}".format(kdu_model
, cluster_uuid
))
455 self
.fs
.sync(from_path
=cluster_uuid
)
457 # look for instance to obtain namespace
458 instance_info
= await self
.get_instance_info(cluster_uuid
, kdu_instance
)
459 if not instance_info
:
460 raise K8sException("kdu_instance {} not found".format(kdu_instance
))
463 paths
, env
= self
._init
_paths
_env
(
464 cluster_name
=cluster_uuid
, create_if_not_exist
=True
468 self
.fs
.sync(from_path
=cluster_uuid
)
471 params_str
, file_to_delete
= self
._params
_to
_file
_option
(
472 cluster_id
=cluster_uuid
, params
=params
476 kdu_model
, version
= self
._split
_version
(kdu_model
)
478 command
= self
._get
_upgrade
_command
(
481 instance_info
["namespace"],
486 paths
["kube_config"],
489 self
.log
.debug("upgrading: {}".format(command
))
493 # exec helm in a task
494 exec_task
= asyncio
.ensure_future(
495 coro_or_future
=self
._local
_async
_exec
(
496 command
=command
, raise_exception_on_error
=False, env
=env
499 # write status in another task
500 status_task
= asyncio
.ensure_future(
501 coro_or_future
=self
._store
_status
(
502 cluster_id
=cluster_uuid
,
503 kdu_instance
=kdu_instance
,
504 namespace
=instance_info
["namespace"],
511 # wait for execution task
512 await asyncio
.wait([exec_task
])
516 output
, rc
= exec_task
.result()
520 output
, rc
= await self
._local
_async
_exec
(
521 command
=command
, raise_exception_on_error
=False, env
=env
524 # remove temporal values yaml file
526 os
.remove(file_to_delete
)
529 await self
._store
_status
(
530 cluster_id
=cluster_uuid
,
531 kdu_instance
=kdu_instance
,
532 namespace
=instance_info
["namespace"],
540 msg
= "Error executing command: {}\nOutput: {}".format(command
, output
)
542 raise K8sException(msg
)
545 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
547 # return new revision number
548 instance
= await self
.get_instance_info(
549 cluster_uuid
=cluster_uuid
, kdu_instance
=kdu_instance
552 revision
= int(instance
.get("revision"))
553 self
.log
.debug("New revision: {}".format(revision
))
563 total_timeout
: float = 1800,
564 cluster_uuid
: str = None,
565 kdu_model
: str = None,
567 db_dict
: dict = None,
570 """Scale a resource in a Helm Chart.
573 kdu_instance: KDU instance name
574 scale: Scale to which to set the resource
575 resource_name: Resource name
576 total_timeout: The time, in seconds, to wait
577 cluster_uuid: The UUID of the cluster
578 kdu_model: The chart reference
579 atomic: if set, upgrade process rolls back changes made in case of failed upgrade.
580 The --wait flag will be set automatically if --atomic is used
581 db_dict: Dictionary for any additional data
582 kwargs: Additional parameters
585 True if successful, False otherwise
588 debug_mgs
= "scaling {} in cluster {}".format(kdu_model
, cluster_uuid
)
590 debug_mgs
= "scaling resource {} in model {} (cluster {})".format(
591 resource_name
, kdu_model
, cluster_uuid
594 self
.log
.debug(debug_mgs
)
596 # look for instance to obtain namespace
597 # get_instance_info function calls the sync command
598 instance_info
= await self
.get_instance_info(cluster_uuid
, kdu_instance
)
599 if not instance_info
:
600 raise K8sException("kdu_instance {} not found".format(kdu_instance
))
603 paths
, env
= self
._init
_paths
_env
(
604 cluster_name
=cluster_uuid
, create_if_not_exist
=True
608 kdu_model
, version
= self
._split
_version
(kdu_model
)
610 repo_url
= await self
._find
_repo
(kdu_model
, cluster_uuid
)
613 "Repository not found for kdu_model {}".format(kdu_model
)
616 _
, replica_str
= await self
._get
_replica
_count
_url
(
617 kdu_model
, repo_url
, resource_name
620 command
= self
._get
_upgrade
_scale
_command
(
623 instance_info
["namespace"],
630 paths
["kube_config"],
633 self
.log
.debug("scaling: {}".format(command
))
636 # exec helm in a task
637 exec_task
= asyncio
.ensure_future(
638 coro_or_future
=self
._local
_async
_exec
(
639 command
=command
, raise_exception_on_error
=False, env
=env
642 # write status in another task
643 status_task
= asyncio
.ensure_future(
644 coro_or_future
=self
._store
_status
(
645 cluster_id
=cluster_uuid
,
646 kdu_instance
=kdu_instance
,
647 namespace
=instance_info
["namespace"],
654 # wait for execution task
655 await asyncio
.wait([exec_task
])
659 output
, rc
= exec_task
.result()
662 output
, rc
= await self
._local
_async
_exec
(
663 command
=command
, raise_exception_on_error
=False, env
=env
667 await self
._store
_status
(
668 cluster_id
=cluster_uuid
,
669 kdu_instance
=kdu_instance
,
670 namespace
=instance_info
["namespace"],
678 msg
= "Error executing command: {}\nOutput: {}".format(command
, output
)
680 raise K8sException(msg
)
683 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
687 async def get_scale_count(
695 """Get a resource scale count.
698 cluster_uuid: The UUID of the cluster
699 resource_name: Resource name
700 kdu_instance: KDU instance name
701 kdu_model: The name or path of a bundle
702 kwargs: Additional parameters
705 Resource instance count
709 "getting scale count for {} in cluster {}".format(kdu_model
, cluster_uuid
)
712 # look for instance to obtain namespace
713 instance_info
= await self
.get_instance_info(cluster_uuid
, kdu_instance
)
714 if not instance_info
:
715 raise K8sException("kdu_instance {} not found".format(kdu_instance
))
718 paths
, env
= self
._init
_paths
_env
(
719 cluster_name
=cluster_uuid
, create_if_not_exist
=True
722 replicas
= await self
._get
_replica
_count
_instance
(
723 kdu_instance
, instance_info
["namespace"], paths
["kube_config"]
726 # Get default value if scale count is not found from provided values
728 repo_url
= await self
._find
_repo
(kdu_model
, cluster_uuid
)
731 "Repository not found for kdu_model {}".format(kdu_model
)
734 replicas
, _
= await self
._get
_replica
_count
_url
(
735 kdu_model
, repo_url
, resource_name
739 msg
= "Replica count not found. Cannot be scaled"
741 raise K8sException(msg
)
746 self
, cluster_uuid
: str, kdu_instance
: str, revision
=0, db_dict
: dict = None
749 "rollback kdu_instance {} to revision {} from cluster {}".format(
750 kdu_instance
, revision
, cluster_uuid
755 self
.fs
.sync(from_path
=cluster_uuid
)
757 # look for instance to obtain namespace
758 instance_info
= await self
.get_instance_info(cluster_uuid
, kdu_instance
)
759 if not instance_info
:
760 raise K8sException("kdu_instance {} not found".format(kdu_instance
))
763 paths
, env
= self
._init
_paths
_env
(
764 cluster_name
=cluster_uuid
, create_if_not_exist
=True
768 self
.fs
.sync(from_path
=cluster_uuid
)
770 command
= self
._get
_rollback
_command
(
771 kdu_instance
, instance_info
["namespace"], revision
, paths
["kube_config"]
774 self
.log
.debug("rolling_back: {}".format(command
))
776 # exec helm in a task
777 exec_task
= asyncio
.ensure_future(
778 coro_or_future
=self
._local
_async
_exec
(
779 command
=command
, raise_exception_on_error
=False, env
=env
782 # write status in another task
783 status_task
= asyncio
.ensure_future(
784 coro_or_future
=self
._store
_status
(
785 cluster_id
=cluster_uuid
,
786 kdu_instance
=kdu_instance
,
787 namespace
=instance_info
["namespace"],
789 operation
="rollback",
794 # wait for execution task
795 await asyncio
.wait([exec_task
])
800 output
, rc
= exec_task
.result()
803 await self
._store
_status
(
804 cluster_id
=cluster_uuid
,
805 kdu_instance
=kdu_instance
,
806 namespace
=instance_info
["namespace"],
808 operation
="rollback",
814 msg
= "Error executing command: {}\nOutput: {}".format(command
, output
)
816 raise K8sException(msg
)
819 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
821 # return new revision number
822 instance
= await self
.get_instance_info(
823 cluster_uuid
=cluster_uuid
, kdu_instance
=kdu_instance
826 revision
= int(instance
.get("revision"))
827 self
.log
.debug("New revision: {}".format(revision
))
832 async def uninstall(self
, cluster_uuid
: str, kdu_instance
: str, **kwargs
):
834 Removes an existing KDU instance. It would implicitly use the `delete` or 'uninstall' call
835 (this call should happen after all _terminate-config-primitive_ of the VNF
838 :param cluster_uuid: UUID of a K8s cluster known by OSM, or namespace:cluster_id
839 :param kdu_instance: unique name for the KDU instance to be deleted
840 :param kwargs: Additional parameters (None yet)
841 :return: True if successful
845 "uninstall kdu_instance {} from cluster {}".format(
846 kdu_instance
, cluster_uuid
851 self
.fs
.sync(from_path
=cluster_uuid
)
853 # look for instance to obtain namespace
854 instance_info
= await self
.get_instance_info(cluster_uuid
, kdu_instance
)
855 if not instance_info
:
856 self
.log
.warning(("kdu_instance {} not found".format(kdu_instance
)))
859 paths
, env
= self
._init
_paths
_env
(
860 cluster_name
=cluster_uuid
, create_if_not_exist
=True
864 self
.fs
.sync(from_path
=cluster_uuid
)
866 command
= self
._get
_uninstall
_command
(
867 kdu_instance
, instance_info
["namespace"], paths
["kube_config"]
869 output
, _rc
= await self
._local
_async
_exec
(
870 command
=command
, raise_exception_on_error
=True, env
=env
874 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
876 return self
._output
_to
_table
(output
)
async def instances_list(self, cluster_uuid: str) -> list:
    """
    returns a list of deployed releases in a cluster

    :param cluster_uuid: the 'cluster' or 'namespace:cluster'
    :return: whatever the helm-version specific _instances_list returns
    """

    self.log.debug("list releases for cluster {}".format(cluster_uuid))

    # sync local dir
    self.fs.sync(from_path=cluster_uuid)

    # execute internal command
    result = await self._instances_list(cluster_uuid)

    # sync fs
    self.fs.reverse_sync(from_path=cluster_uuid)

    return result
async def get_instance_info(self, cluster_uuid: str, kdu_instance: str):
    """
    Look up one release of a cluster by its name

    :param cluster_uuid: cluster identifier, forwarded to instances_list
    :param kdu_instance: release name to look for (compared with the "name" key)
    :return: the first matching entry of instances_list, or None when there is
        no match (a debug message is logged in that case)
    """
    instances = await self.instances_list(cluster_uuid=cluster_uuid)
    for instance in instances:
        if instance.get("name") == kdu_instance:
            return instance

    # callers test the result with "if not instance_info", so "not found" is
    # reported as an explicit None instead of an exception
    self.log.debug("Instance {} not found".format(kdu_instance))
    return None
907 async def exec_primitive(
909 cluster_uuid
: str = None,
910 kdu_instance
: str = None,
911 primitive_name
: str = None,
912 timeout
: float = 300,
914 db_dict
: dict = None,
917 """Exec primitive (Juju action)
919 :param cluster_uuid: The UUID of the cluster or namespace:cluster
920 :param kdu_instance: The unique name of the KDU instance
921 :param primitive_name: Name of action that will be executed
922 :param timeout: Timeout for action execution
923 :param params: Dictionary of all the parameters needed for the action
924 :db_dict: Dictionary for any additional data
925 :param kwargs: Additional parameters (None yet)
927 :return: Returns the output of the action
930 "KDUs deployed with Helm don't support actions "
931 "different from rollback, upgrade and status"
934 async def get_services(
935 self
, cluster_uuid
: str, kdu_instance
: str, namespace
: str
938 Returns a list of services defined for the specified kdu instance.
940 :param cluster_uuid: UUID of a K8s cluster known by OSM
941 :param kdu_instance: unique name for the KDU instance
942 :param namespace: K8s namespace used by the KDU instance
943 :return: If successful, it will return a list of services, Each service
944 can have the following data:
945 - `name` of the service
946 - `type` type of service in the k8 cluster
947 - `ports` List of ports offered by the service, for each port includes at least
949 - `cluster_ip` Internal ip to be used inside k8s cluster
950 - `external_ip` List of external ips (in case they are available)
954 "get_services: cluster_uuid: {}, kdu_instance: {}".format(
955 cluster_uuid
, kdu_instance
960 paths
, env
= self
._init
_paths
_env
(
961 cluster_name
=cluster_uuid
, create_if_not_exist
=True
965 self
.fs
.sync(from_path
=cluster_uuid
)
967 # get list of services names for kdu
968 service_names
= await self
._get
_services
(
969 cluster_uuid
, kdu_instance
, namespace
, paths
["kube_config"]
973 for service
in service_names
:
974 service
= await self
._get
_service
(cluster_uuid
, service
, namespace
)
975 service_list
.append(service
)
978 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
async def get_service(
    self, cluster_uuid: str, service_name: str, namespace: str
):
    """
    Obtain the data of one service of a cluster

    :param cluster_uuid: cluster identifier, also the path handed to the file
        system sync calls
    :param service_name: name of the service
    :param namespace: K8s namespace of the service
    :return: whatever the internal _get_service returns for that service
    """
    self.log.debug(
        "get service, service_name: {}, namespace: {}, cluster_uuid: {}".format(
            service_name, namespace, cluster_uuid
        )
    )

    # sync local dir
    self.fs.sync(from_path=cluster_uuid)

    service = await self._get_service(cluster_uuid, service_name, namespace)

    # sync fs
    self.fs.reverse_sync(from_path=cluster_uuid)

    return service
1002 async def status_kdu(self
, cluster_uuid
: str, kdu_instance
: str, **kwargs
) -> str:
1004 This call would retrieve the current state of a given KDU instance. It would be
1005 would allow to retrieve the _composition_ (i.e. K8s objects) and _specific
1006 values_ of the configuration parameters applied to a given instance. This call
1007 would be based on the `status` call.
1009 :param cluster_uuid: UUID of a K8s cluster known by OSM
1010 :param kdu_instance: unique name for the KDU instance
1011 :param kwargs: Additional parameters (None yet)
1012 :return: If successful, it will return the following vector of arguments:
1013 - K8s `namespace` in the cluster where the KDU lives
1014 - `state` of the KDU instance. It can be:
1021 - List of `resources` (objects) that this release consists of, sorted by kind,
1022 and the status of those resources
1023 - Last `deployment_time`.
1027 "status_kdu: cluster_uuid: {}, kdu_instance: {}".format(
1028 cluster_uuid
, kdu_instance
1033 self
.fs
.sync(from_path
=cluster_uuid
)
1035 # get instance: needed to obtain namespace
1036 instances
= await self
._instances
_list
(cluster_id
=cluster_uuid
)
1037 for instance
in instances
:
1038 if instance
.get("name") == kdu_instance
:
1041 # instance does not exist
1043 "Instance name: {} not found in cluster: {}".format(
1044 kdu_instance
, cluster_uuid
1048 status
= await self
._status
_kdu
(
1049 cluster_id
=cluster_uuid
,
1050 kdu_instance
=kdu_instance
,
1051 namespace
=instance
["namespace"],
1052 show_error_log
=True,
1057 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
async def get_values_kdu(
    self, kdu_instance: str, namespace: str, kubeconfig: str
) -> str:
    """
    Obtain the values of a deployed KDU instance

    Thin wrapper: delegates to _exec_get_command with get_command="values".

    :param kdu_instance: name of the KDU instance
    :param namespace: namespace forwarded to the get command
    :param kubeconfig: kube config path forwarded to the get command
    :return: the result of _exec_get_command
    """
    self.log.debug("get kdu_instance values {}".format(kdu_instance))

    return await self._exec_get_command(
        get_command="values",
        kdu_instance=kdu_instance,
        namespace=namespace,
        kubeconfig=kubeconfig,
    )
async def values_kdu(self, kdu_model: str, repo_url: str = None) -> str:
    """
    Obtain the values of a KDU model (chart)

    Thin wrapper: delegates to _exec_inspect_command with inspect_command="values".

    :param kdu_model: chart reference forwarded to the inspect command
    :param repo_url: optional repository url forwarded to the inspect command
    :return: the result of _exec_inspect_command
    """
    self.log.debug(
        "inspect kdu_model values {} from (optional) repo: {}".format(
            kdu_model, repo_url
        )
    )

    return await self._exec_inspect_command(
        inspect_command="values", kdu_model=kdu_model, repo_url=repo_url
    )
async def help_kdu(self, kdu_model: str, repo_url: str = None) -> str:
    """
    Obtain the readme of a KDU model (chart)

    Thin wrapper: delegates to _exec_inspect_command with inspect_command="readme".

    :param kdu_model: chart reference forwarded to the inspect command
    :param repo_url: optional repository url forwarded to the inspect command
    :return: the result of _exec_inspect_command
    """
    self.log.debug(
        "inspect kdu_model {} readme.md from repo: {}".format(kdu_model, repo_url)
    )

    return await self._exec_inspect_command(
        inspect_command="readme", kdu_model=kdu_model, repo_url=repo_url
    )
1096 async def synchronize_repos(self
, cluster_uuid
: str):
1098 self
.log
.debug("synchronize repos for cluster helm-id: {}".format(cluster_uuid
))
1100 db_repo_ids
= self
._get
_helm
_chart
_repos
_ids
(cluster_uuid
)
1101 db_repo_dict
= self
._get
_db
_repos
_dict
(db_repo_ids
)
1103 local_repo_list
= await self
.repo_list(cluster_uuid
)
1104 local_repo_dict
= {repo
["name"]: repo
["url"] for repo
in local_repo_list
}
1106 deleted_repo_list
= []
1107 added_repo_dict
= {}
1109 # iterate over the list of repos in the database that should be
1110 # added if not present
1111 for repo_name
, db_repo
in db_repo_dict
.items():
1113 # check if it is already present
1114 curr_repo_url
= local_repo_dict
.get(db_repo
["name"])
1115 repo_id
= db_repo
.get("_id")
1116 if curr_repo_url
!= db_repo
["url"]:
1119 "repo {} url changed, delete and and again".format(
1123 await self
.repo_remove(cluster_uuid
, db_repo
["name"])
1124 deleted_repo_list
.append(repo_id
)
1127 self
.log
.debug("add repo {}".format(db_repo
["name"]))
1128 if "ca_cert" in db_repo
:
1129 await self
.repo_add(
1133 cert
=db_repo
["ca_cert"],
1136 await self
.repo_add(
1141 added_repo_dict
[repo_id
] = db_repo
["name"]
1142 except Exception as e
:
1144 "Error adding repo id: {}, err_msg: {} ".format(
1149 # Delete repos that are present but not in nbi_list
1150 for repo_name
in local_repo_dict
:
1151 if not db_repo_dict
.get(repo_name
) and repo_name
!= "stable":
1152 self
.log
.debug("delete repo {}".format(repo_name
))
1154 await self
.repo_remove(cluster_uuid
, repo_name
)
1155 deleted_repo_list
.append(repo_name
)
1156 except Exception as e
:
1158 "Error deleting repo, name: {}, err_msg: {}".format(
1163 return deleted_repo_list
, added_repo_dict
1165 except K8sException
:
1167 except Exception as e
:
1168 # Do not raise errors synchronizing repos
1169 self
.log
.error("Error synchronizing repos: {}".format(e
))
1170 raise Exception("Error synchronizing repos: {}".format(e
))
1172 def _get_db_repos_dict(self
, repo_ids
: list):
1174 for repo_id
in repo_ids
:
1175 db_repo
= self
.db
.get_one("k8srepos", {"_id": repo_id
})
1176 db_repos_dict
[db_repo
["name"]] = db_repo
1177 return db_repos_dict
1180 ####################################################################################
1181 ################################### TO BE IMPLEMENTED SUBCLASSES ###################
1182 ####################################################################################
1186 def _init_paths_env(self
, cluster_name
: str, create_if_not_exist
: bool = True):
1188 Creates and returns base cluster and kube dirs and returns them.
1189 Also created helm3 dirs according to new directory specification, paths are
1190 not returned but assigned to helm environment variables
1192 :param cluster_name: cluster_name
1193 :return: Dictionary with config_paths and dictionary with helm environment variables
1197 async def _cluster_init(self
, cluster_id
, namespace
, paths
, env
):
1199 Implements the helm version dependent cluster initialization
1203 async def _instances_list(self
, cluster_id
):
1205 Implements the helm version dependent helm instances list
1209 async def _get_services(self
, cluster_id
, kdu_instance
, namespace
, kubeconfig
):
1211 Implements the helm version dependent method to obtain services from a helm instance
1215 async def _status_kdu(
1219 namespace
: str = None,
1220 show_error_log
: bool = False,
1221 return_text
: bool = False,
1224 Implements the helm version dependent method to obtain status of a helm instance
1228 def _get_install_command(
1240 Obtain command to be executed to delete the indicated instance
1244 def _get_upgrade_scale_command(
1257 """Obtain command to be executed to upgrade the indicated instance."""
1260 def _get_upgrade_command(
1272 Obtain command to be executed to upgrade the indicated instance
1276 def _get_rollback_command(
1277 self
, kdu_instance
, namespace
, revision
, kubeconfig
1280 Obtain command to be executed to rollback the indicated instance
1284 def _get_uninstall_command(
1285 self
, kdu_instance
: str, namespace
: str, kubeconfig
: str
1288 Obtain command to be executed to delete the indicated instance
1292 def _get_inspect_command(
1293 self
, show_command
: str, kdu_model
: str, repo_str
: str, version
: str
1296 Obtain command to be executed to obtain information about the kdu
1300 def _get_get_command(
1301 self
, get_command
: str, kdu_instance
: str, namespace
: str, kubeconfig
: str
1303 """Obtain command to be executed to get information about the kdu instance."""
1306 async def _uninstall_sw(self
, cluster_id
: str, namespace
: str):
1308 Method call to uninstall cluster software for helm. This method is dependent
1310 For Helm v2 it will be called when Tiller must be uninstalled
1311 For Helm v3 it does nothing and does not need to be called
1315 def _get_helm_chart_repos_ids(self
, cluster_uuid
) -> list:
1317 Obtains the cluster repos identifiers
1321 ####################################################################################
1322 ################################### P R I V A T E ##################################
1323 ####################################################################################
1327 def _check_file_exists(filename
: str, exception_if_not_exists
: bool = False):
1328 if os
.path
.exists(filename
):
1331 msg
= "File {} does not exist".format(filename
)
1332 if exception_if_not_exists
:
1333 raise K8sException(msg
)
1336 def _remove_multiple_spaces(strobj
):
1337 strobj
= strobj
.strip()
1338 while " " in strobj
:
1339 strobj
= strobj
.replace(" ", " ")
1343 def _output_to_lines(output
: str) -> list:
1344 output_lines
= list()
1345 lines
= output
.splitlines(keepends
=False)
1349 output_lines
.append(line
)
1353 def _output_to_table(output
: str) -> list:
1354 output_table
= list()
1355 lines
= output
.splitlines(keepends
=False)
1357 line
= line
.replace("\t", " ")
1359 output_table
.append(line_list
)
1360 cells
= line
.split(sep
=" ")
1364 line_list
.append(cell
)
1368 def _parse_services(output
: str) -> list:
1369 lines
= output
.splitlines(keepends
=False)
1372 line
= line
.replace("\t", " ")
1373 cells
= line
.split(sep
=" ")
1374 if len(cells
) > 0 and cells
[0].startswith("service/"):
1375 elems
= cells
[0].split(sep
="/")
1377 services
.append(elems
[1])
1381 def _get_deep(dictionary
: dict, members
: tuple):
1386 value
= target
.get(m
)
1395 # find key:value in several lines
1397 def _find_in_lines(p_lines
: list, p_key
: str) -> str:
1398 for line
in p_lines
:
1400 if line
.startswith(p_key
+ ":"):
1401 parts
= line
.split(":")
1402 the_value
= parts
[1].strip()
def _lower_keys_list(input_list: list):
    """
    Build a new list of dictionaries whose keys are lower-cased.

    The input dictionaries are not modified.

    :param input_list: list of dictionaries with string keys; may be empty
        or None
    :return: new list with one lower-cased-key dictionary per input entry
        (empty list when there is nothing to convert)
    """
    if not input_list:
        return []
    return [
        {key.lower(): value for key, value in entry.items()}
        for entry in input_list
    ]
1422 async def _local_async_exec(
1425 raise_exception_on_error
: bool = False,
1426 show_error_log
: bool = True,
1427 encode_utf8
: bool = False,
1431 command
= K8sHelmBaseConnector
._remove
_multiple
_spaces
(command
)
1433 "Executing async local command: {}, env: {}".format(command
, env
)
1437 command
= shlex
.split(command
)
1439 environ
= os
.environ
.copy()
1444 process
= await asyncio
.create_subprocess_exec(
1446 stdout
=asyncio
.subprocess
.PIPE
,
1447 stderr
=asyncio
.subprocess
.PIPE
,
1451 # wait for command terminate
1452 stdout
, stderr
= await process
.communicate()
1454 return_code
= process
.returncode
1458 output
= stdout
.decode("utf-8").strip()
1459 # output = stdout.decode()
1461 output
= stderr
.decode("utf-8").strip()
1462 # output = stderr.decode()
1464 if return_code
!= 0 and show_error_log
:
1466 "Return code (FAIL): {}\nOutput:\n{}".format(return_code
, output
)
1469 self
.log
.debug("Return code: {}".format(return_code
))
1471 if raise_exception_on_error
and return_code
!= 0:
1472 raise K8sException(output
)
1475 output
= output
.encode("utf-8").strip()
1476 output
= str(output
).replace("\\n", "\n")
1478 return output
, return_code
1480 except asyncio
.CancelledError
:
1482 except K8sException
:
1484 except Exception as e
:
1485 msg
= "Exception executing command: {} -> {}".format(command
, e
)
1487 if raise_exception_on_error
:
1488 raise K8sException(e
) from e
1492 async def _local_async_exec_pipe(
1496 raise_exception_on_error
: bool = True,
1497 show_error_log
: bool = True,
1498 encode_utf8
: bool = False,
1502 command1
= K8sHelmBaseConnector
._remove
_multiple
_spaces
(command1
)
1503 command2
= K8sHelmBaseConnector
._remove
_multiple
_spaces
(command2
)
1504 command
= "{} | {}".format(command1
, command2
)
1506 "Executing async local command: {}, env: {}".format(command
, env
)
1510 command1
= shlex
.split(command1
)
1511 command2
= shlex
.split(command2
)
1513 environ
= os
.environ
.copy()
1518 read
, write
= os
.pipe()
1519 await asyncio
.create_subprocess_exec(*command1
, stdout
=write
, env
=environ
)
1521 process_2
= await asyncio
.create_subprocess_exec(
1522 *command2
, stdin
=read
, stdout
=asyncio
.subprocess
.PIPE
, env
=environ
1525 stdout
, stderr
= await process_2
.communicate()
1527 return_code
= process_2
.returncode
1531 output
= stdout
.decode("utf-8").strip()
1532 # output = stdout.decode()
1534 output
= stderr
.decode("utf-8").strip()
1535 # output = stderr.decode()
1537 if return_code
!= 0 and show_error_log
:
1539 "Return code (FAIL): {}\nOutput:\n{}".format(return_code
, output
)
1542 self
.log
.debug("Return code: {}".format(return_code
))
1544 if raise_exception_on_error
and return_code
!= 0:
1545 raise K8sException(output
)
1548 output
= output
.encode("utf-8").strip()
1549 output
= str(output
).replace("\\n", "\n")
1551 return output
, return_code
1552 except asyncio
.CancelledError
:
1554 except K8sException
:
1556 except Exception as e
:
1557 msg
= "Exception executing command: {} -> {}".format(command
, e
)
1559 if raise_exception_on_error
:
1560 raise K8sException(e
) from e
async def _get_service(self, cluster_id, service_name, namespace):
    """
    Obtains the data of the specified service in the k8cluster.

    Runs "kubectl get service ... -o=yaml" for the service and extracts the
    relevant fields from the YAML output.

    :param cluster_id: id of a K8s cluster known by OSM
    :param service_name: name of the K8s service in the specified namespace
    :param namespace: K8s namespace used by the KDU instance
    :return: If successful, it will return a service with the following data:
    - `name` of the service
    - `type` type of service in the k8 cluster
    - `ports` List of ports offered by the service, for each port includes at least
    name, port, protocol
    - `cluster_ip` Internal ip to be used inside k8s cluster
    - `external_ip` List of external ips (in case they are available); only
    present for services of type LoadBalancer, and empty while no ip has
    been assigned
    :raises K8sException: if the kubectl command fails
    """

    # init config, env
    paths, env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )

    command = "{} --kubeconfig={} --namespace={} get service {} -o=yaml".format(
        self.kubectl_command, paths["kube_config"], namespace, service_name
    )

    output, _rc = await self._local_async_exec(
        command=command, raise_exception_on_error=True, env=env
    )

    data = yaml.load(output, Loader=yaml.SafeLoader)

    service = {
        "name": service_name,
        "type": self._get_deep(data, ("spec", "type")),
        "ports": self._get_deep(data, ("spec", "ports")),
        "cluster_ip": self._get_deep(data, ("spec", "clusterIP")),
    }
    if service["type"] == "LoadBalancer":
        # status.loadBalancer.ingress is absent until the load balancer is
        # provisioned, so fall back to an empty list instead of iterating None.
        ip_map_list = (
            self._get_deep(data, ("status", "loadBalancer", "ingress")) or []
        )
        # An ingress entry may carry only a hostname; keep the entries that
        # actually expose an ip instead of failing with KeyError.
        service["external_ip"] = [
            elem["ip"] for elem in ip_map_list if elem.get("ip")
        ]

    return service
1608 async def _exec_get_command(
1609 self
, get_command
: str, kdu_instance
: str, namespace
: str, kubeconfig
: str
1611 """Obtains information about the kdu instance."""
1613 full_command
= self
._get
_get
_command
(
1614 get_command
, kdu_instance
, namespace
, kubeconfig
1617 output
, _rc
= await self
._local
_async
_exec
(command
=full_command
)
1621 async def _exec_inspect_command(
1622 self
, inspect_command
: str, kdu_model
: str, repo_url
: str = None
1624 """Obtains information about a kdu, no cluster (no env)."""
1628 repo_str
= " --repo {}".format(repo_url
)
1630 idx
= kdu_model
.find("/")
1633 kdu_model
= kdu_model
[idx
:]
1635 kdu_model
, version
= self
._split
_version
(kdu_model
)
1637 version_str
= "--version {}".format(version
)
1641 full_command
= self
._get
_inspect
_command
(
1642 inspect_command
, kdu_model
, repo_str
, version_str
1645 output
, _rc
= await self
._local
_async
_exec
(command
=full_command
)
1649 async def _get_replica_count_url(
1653 resource_name
: str = None,
1655 """Get the replica count value in the Helm Chart Values.
1658 kdu_model: The name or path of a bundle
1659 repo_url: Helm Chart repository url
1660 resource_name: Resource name
1663 True if replicas, False replicaCount
1666 kdu_values
= yaml
.load(
1667 await self
.values_kdu(kdu_model
, repo_url
), Loader
=yaml
.SafeLoader
1672 "kdu_values not found for kdu_model {}".format(kdu_model
)
1676 kdu_values
= kdu_values
.get(resource_name
, None)
1679 msg
= "resource {} not found in the values in model {}".format(
1680 resource_name
, kdu_model
1683 raise K8sException(msg
)
1685 duplicate_check
= False
1690 if kdu_values
.get("replicaCount", None):
1691 replicas
= kdu_values
["replicaCount"]
1692 replica_str
= "replicaCount"
1693 elif kdu_values
.get("replicas", None):
1694 duplicate_check
= True
1695 replicas
= kdu_values
["replicas"]
1696 replica_str
= "replicas"
1700 "replicaCount or replicas not found in the resource"
1701 "{} values in model {}. Cannot be scaled".format(
1702 resource_name
, kdu_model
1707 "replicaCount or replicas not found in the values"
1708 "in model {}. Cannot be scaled".format(kdu_model
)
1711 raise K8sException(msg
)
1713 # Control if replicas and replicaCount exists at the same time
1714 msg
= "replicaCount and replicas are exists at the same time"
1716 if "replicaCount" in kdu_values
:
1718 raise K8sException(msg
)
1720 if "replicas" in kdu_values
:
1722 raise K8sException(msg
)
1724 return replicas
, replica_str
1726 async def _get_replica_count_instance(
1731 resource_name
: str = None,
1733 """Get the replica count value in the instance.
1736 kdu_instance: The name of the KDU instance
1737 namespace: KDU instance namespace
1739 resource_name: Resource name
1742 True if replicas, False replicaCount
1745 kdu_values
= yaml
.load(
1746 await self
.get_values_kdu(kdu_instance
, namespace
, kubeconfig
),
1747 Loader
=yaml
.SafeLoader
,
1754 kdu_values
.get(resource_name
, None) if resource_name
else None
1758 resource_values
.get("replicaCount", None)
1759 or resource_values
.get("replicas", None)
1763 kdu_values
.get("replicaCount", None)
1764 or kdu_values
.get("replicas", None)
1770 async def _store_status(
1775 namespace
: str = None,
1776 check_every
: float = 10,
1777 db_dict
: dict = None,
1778 run_once
: bool = False,
1782 await asyncio
.sleep(check_every
)
1783 detailed_status
= await self
._status
_kdu
(
1784 cluster_id
=cluster_id
,
1785 kdu_instance
=kdu_instance
,
1786 namespace
=namespace
,
1789 status
= detailed_status
.get("info").get("description")
1790 self
.log
.debug("KDU {} STATUS: {}.".format(kdu_instance
, status
))
1791 # write status to db
1792 result
= await self
.write_app_status_to_db(
1795 detailed_status
=str(detailed_status
),
1796 operation
=operation
,
1799 self
.log
.info("Error writing in database. Task exiting...")
1801 except asyncio
.CancelledError
:
1802 self
.log
.debug("Task cancelled")
1804 except Exception as e
:
1806 "_store_status exception: {}".format(str(e
)), exc_info
=True
1813 # params for use in -f file
1814 # returns values file option and filename (in order to delete it at the end)
def _params_to_file_option(self, cluster_id: str, params: dict) -> (str, str):
    """
    Dump the given parameters to a YAML values file for use with "-f".

    String values containing the "!!yaml" marker are parsed as YAML (the
    first 7 characters, i.e. the marker and its separator, are dropped) so
    that structured values end up as structures in the file.

    :param cluster_id: id of the K8s cluster; used to initialise its paths
    :param params: parameters to write; may be empty or None
    :return: tuple (option, filename): the "-f <file>" option and the name
        of the file written (so the caller can delete it at the end), or
        ("", None) when there are no parameters
    """
    if not params:
        return "", None

    self._init_paths_env(cluster_name=cluster_id, create_if_not_exist=True)

    values = {}
    for key, value in params.items():
        # Only text can carry the marker; slicing other types would fail
        if isinstance(value, str) and "!!yaml" in value:
            # Explicit safe loader, as everywhere else in this file: params
            # may come from outside, and yaml.load() without a Loader is
            # rejected by recent PyYAML versions.
            value = yaml.load(value[7:], Loader=yaml.SafeLoader)
        values[key] = value

    # zero-padded random number as file name
    random_number = str(random.randrange(start=1, stop=99999999)).rjust(10, "0")
    values_file = random_number + ".yaml"
    with open(values_file, "w") as stream:
        yaml.dump(values, stream, indent=4, default_flow_style=False)

    return "-f {}".format(values_file), values_file
1842 # params for use in --set option
def _params_to_set_option(params: dict) -> str:
    """
    Build the "--set" option from a dictionary of parameters.

    :param params: parameters to pass; entries whose value is None are
        ignored. May be empty or None.
    :return: "--set k1=v1,k2=v2,..." or an empty string when there is
        nothing to set
    """
    if not params:
        return ""

    assignments = [
        "{}={}".format(key, value)
        for key, value in params.items()
        if value is not None
    ]
    if not assignments:
        return ""
    return "--set " + ",".join(assignments)
def generate_kdu_instance_name(**kwargs):
    """
    Generate a name for a KDU instance from the chart name.

    The chart name is reduced to its last path segment when it is an
    embedded chart (absolute path) or a URL, every character that is not
    alphanumeric is replaced by "-", the result is truncated to 35
    characters and a zero-padded random number is appended.

    :param kwargs: must contain "kdu_model", the chart name, path or URL
    :return: the generated name, in lower case
    :raises KeyError: if "kdu_model" is not given
    """
    chart_name = kwargs["kdu_model"]
    # embedded chart (file or dir) or URL: keep only the last portion
    if chart_name.startswith("/") or "://" in chart_name:
        chart_name = chart_name[chart_name.rfind("/") + 1 :]

    name = "".join(
        c if c.isalpha() or c.isnumeric() else "-" for c in chart_name
    )
    name = name[:35]

    # if does not start with alpha character, prefix 'a'; this also covers
    # an empty name (e.g. a path or URL ending in "/"), which used to fail
    # with IndexError on name[0]
    if not name or not name[0].isalpha():
        name = "a" + name

    random_number = str(random.randrange(start=1, stop=99999999)).rjust(10, "0")
    return "{}-{}".format(name, random_number).lower()
def _split_version(self, kdu_model: str) -> (str, str):
    """
    Separate the version from a "model:version" chart reference.

    The reference is split only when it contains exactly one ":"; in any
    other case it is returned unchanged with no version.

    :param kdu_model: chart reference, optionally followed by ":version"
    :return: tuple (kdu_model, version); version is None when the reference
        carries no version
    """
    pieces = kdu_model.split(sep=":") if ":" in kdu_model else []
    if len(pieces) != 2:
        return kdu_model, None

    model, version = pieces
    return model, str(version)
1904 async def _find_repo(self
, kdu_model
: str, cluster_uuid
: str) -> str:
1906 idx
= kdu_model
.find("/")
1908 repo_name
= kdu_model
[:idx
]
1909 # Find repository link
1910 local_repo_list
= await self
.repo_list(cluster_uuid
)
1911 for repo
in local_repo_list
:
1912 repo_url
= repo
["url"] if repo
["name"] == repo_name
else None