2 # Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3 # This file is part of OSM
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact with: nfvlabs@tid.es
31 from uuid
import uuid4
33 from n2vc
.config
import EnvironConfig
34 from n2vc
.exceptions
import K8sException
35 from n2vc
.k8s_conn
import K8sConnector
38 class K8sHelmBaseConnector(K8sConnector
):
41 ####################################################################################
42 ################################### P U B L I C ####################################
43 ####################################################################################
46 service_account
= "osm"
52 kubectl_command
: str = "/usr/bin/kubectl",
53 helm_command
: str = "/usr/bin/helm",
59 :param fs: file system for kubernetes and helm configuration
60 :param db: database object to write current operation status
61 :param kubectl_command: path to kubectl executable
62 :param helm_command: path to helm executable
64 :param on_update_db: callback called when k8s connector updates database
68 K8sConnector
.__init
__(self
, db
=db
, log
=log
, on_update_db
=on_update_db
)
70 self
.log
.info("Initializing K8S Helm connector")
72 self
.config
= EnvironConfig()
73 # random numbers for release name generation
74 random
.seed(time
.time())
79 # exception if kubectl is not installed
80 self
.kubectl_command
= kubectl_command
81 self
._check
_file
_exists
(filename
=kubectl_command
, exception_if_not_exists
=True)
83 # exception if helm is not installed
84 self
._helm
_command
= helm_command
85 self
._check
_file
_exists
(filename
=helm_command
, exception_if_not_exists
=True)
87 # obtain stable repo url from config or apply default
88 self
._stable
_repo
_url
= self
.config
.get("stablerepourl")
89 if self
._stable
_repo
_url
== "None":
90 self
._stable
_repo
_url
= None
93 def _get_namespace_cluster_id(cluster_uuid
: str) -> (str, str):
95 Parses cluster_uuid stored at database that can be either 'namespace:cluster_id' or only
96 cluster_id for backward compatibility
98 namespace
, _
, cluster_id
= cluster_uuid
.rpartition(":")
99 return namespace
, cluster_id
104 namespace
: str = "kube-system",
105 reuse_cluster_uuid
=None,
109 It prepares a given K8s cluster environment to run Charts
111 :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid
113 :param namespace: optional namespace to be used for helm. By default,
114 'kube-system' will be used
115 :param reuse_cluster_uuid: existing cluster uuid for reuse
116 :param kwargs: Additional parameters (None yet)
117 :return: uuid of the K8s cluster and True if connector has installed some
118 software in the cluster
119 (on error, an exception will be raised)
122 if reuse_cluster_uuid
:
123 namespace_
, cluster_id
= self
._get
_namespace
_cluster
_id
(reuse_cluster_uuid
)
124 namespace
= namespace_
or namespace
126 cluster_id
= str(uuid4())
127 cluster_uuid
= "{}:{}".format(namespace
, cluster_id
)
130 "Initializing K8S Cluster {}. namespace: {}".format(cluster_id
, namespace
)
133 paths
, env
= self
._init
_paths
_env
(
134 cluster_name
=cluster_id
, create_if_not_exist
=True
136 mode
= stat
.S_IRUSR | stat
.S_IWUSR
137 with
open(paths
["kube_config"], "w", mode
) as f
:
139 os
.chmod(paths
["kube_config"], 0o600)
141 # Code with initialization specific of helm version
142 n2vc_installed_sw
= await self
._cluster
_init
(cluster_id
, namespace
, paths
, env
)
144 # sync fs with local data
145 self
.fs
.reverse_sync(from_path
=cluster_id
)
147 self
.log
.info("Cluster {} initialized".format(cluster_id
))
149 return cluster_uuid
, n2vc_installed_sw
152 self
, cluster_uuid
: str, name
: str, url
: str, repo_type
: str = "chart"
154 _
, cluster_id
= self
._get
_namespace
_cluster
_id
(cluster_uuid
)
156 "Cluster {}, adding {} repository {}. URL: {}".format(
157 cluster_id
, repo_type
, name
, url
162 paths
, env
= self
._init
_paths
_env
(
163 cluster_name
=cluster_id
, create_if_not_exist
=True
167 self
.fs
.sync(from_path
=cluster_id
)
170 command
= "env KUBECONFIG={} {} repo update".format(
171 paths
["kube_config"], self
._helm
_command
173 self
.log
.debug("updating repo: {}".format(command
))
174 await self
._local
_async
_exec
(
175 command
=command
, raise_exception_on_error
=False, env
=env
178 # helm repo add name url
179 command
= "env KUBECONFIG={} {} repo add {} {}".format(
180 paths
["kube_config"], self
._helm
_command
, name
, url
182 self
.log
.debug("adding repo: {}".format(command
))
183 await self
._local
_async
_exec
(
184 command
=command
, raise_exception_on_error
=True, env
=env
188 self
.fs
.reverse_sync(from_path
=cluster_id
)
190 async def repo_list(self
, cluster_uuid
: str) -> list:
192 Get the list of registered repositories
194 :return: list of registered repositories: [ (name, url) .... ]
197 _
, cluster_id
= self
._get
_namespace
_cluster
_id
(cluster_uuid
)
198 self
.log
.debug("list repositories for cluster {}".format(cluster_id
))
201 paths
, env
= self
._init
_paths
_env
(
202 cluster_name
=cluster_id
, create_if_not_exist
=True
206 self
.fs
.sync(from_path
=cluster_id
)
208 command
= "env KUBECONFIG={} {} repo list --output yaml".format(
209 paths
["kube_config"], self
._helm
_command
212 # Set exception to false because if there are no repos just want an empty list
213 output
, _rc
= await self
._local
_async
_exec
(
214 command
=command
, raise_exception_on_error
=False, env
=env
218 self
.fs
.reverse_sync(from_path
=cluster_id
)
221 if output
and len(output
) > 0:
222 repos
= yaml
.load(output
, Loader
=yaml
.SafeLoader
)
223 # unify format between helm2 and helm3 setting all keys lowercase
224 return self
._lower
_keys
_list
(repos
)
230 async def repo_remove(self
, cluster_uuid
: str, name
: str):
232 _
, cluster_id
= self
._get
_namespace
_cluster
_id
(cluster_uuid
)
233 self
.log
.debug("remove {} repositories for cluster {}".format(name
, cluster_id
))
236 paths
, env
= self
._init
_paths
_env
(
237 cluster_name
=cluster_id
, create_if_not_exist
=True
241 self
.fs
.sync(from_path
=cluster_id
)
243 command
= "env KUBECONFIG={} {} repo remove {}".format(
244 paths
["kube_config"], self
._helm
_command
, name
246 await self
._local
_async
_exec
(
247 command
=command
, raise_exception_on_error
=True, env
=env
251 self
.fs
.reverse_sync(from_path
=cluster_id
)
257 uninstall_sw
: bool = False,
262 Resets the Kubernetes cluster by removing the helm deployment that represents it.
264 :param cluster_uuid: The UUID of the cluster to reset
265 :param force: Boolean to force the reset
266 :param uninstall_sw: Boolean to force the reset
267 :param kwargs: Additional parameters (None yet)
268 :return: Returns True if successful or raises an exception.
270 namespace
, cluster_id
= self
._get
_namespace
_cluster
_id
(cluster_uuid
)
272 "Resetting K8s environment. cluster uuid: {} uninstall={}".format(
273 cluster_id
, uninstall_sw
278 self
.fs
.sync(from_path
=cluster_id
)
280 # uninstall releases if needed.
282 releases
= await self
.instances_list(cluster_uuid
=cluster_uuid
)
283 if len(releases
) > 0:
287 kdu_instance
= r
.get("name")
288 chart
= r
.get("chart")
290 "Uninstalling {} -> {}".format(chart
, kdu_instance
)
292 await self
.uninstall(
293 cluster_uuid
=cluster_uuid
, kdu_instance
=kdu_instance
295 except Exception as e
:
296 # will not raise exception as it was found
297 # that in some cases of previously installed helm releases it
300 "Error uninstalling release {}: {}".format(
306 "Cluster uuid: {} has releases and not force. Leaving K8s helm environment"
310 False # Allow to remove k8s cluster without removing Tiller
314 await self
._uninstall
_sw
(cluster_id
, namespace
)
316 # delete cluster directory
317 self
.log
.debug("Removing directory {}".format(cluster_id
))
318 self
.fs
.file_delete(cluster_id
, ignore_non_exist
=True)
319 # Remove also local directorio if still exist
320 direct
= self
.fs
.path
+ "/" + cluster_id
321 shutil
.rmtree(direct
, ignore_errors
=True)
325 async def _install_impl(
333 timeout
: float = 300,
335 db_dict
: dict = None,
336 kdu_name
: str = None,
337 namespace
: str = None,
340 paths
, env
= self
._init
_paths
_env
(
341 cluster_name
=cluster_id
, create_if_not_exist
=True
345 params_str
, file_to_delete
= self
._params
_to
_file
_option
(
346 cluster_id
=cluster_id
, params
=params
350 kdu_model
, version
= self
._split
_version
(kdu_model
)
352 command
= self
._get
_install
_command
(
360 paths
["kube_config"],
363 self
.log
.debug("installing: {}".format(command
))
366 # exec helm in a task
367 exec_task
= asyncio
.ensure_future(
368 coro_or_future
=self
._local
_async
_exec
(
369 command
=command
, raise_exception_on_error
=False, env
=env
373 # write status in another task
374 status_task
= asyncio
.ensure_future(
375 coro_or_future
=self
._store
_status
(
376 cluster_id
=cluster_id
,
377 kdu_instance
=kdu_instance
,
385 # wait for execution task
386 await asyncio
.wait([exec_task
])
391 output
, rc
= exec_task
.result()
395 output
, rc
= await self
._local
_async
_exec
(
396 command
=command
, raise_exception_on_error
=False, env
=env
399 # remove temporal values yaml file
401 os
.remove(file_to_delete
)
404 await self
._store
_status
(
405 cluster_id
=cluster_id
,
406 kdu_instance
=kdu_instance
,
415 msg
= "Error executing command: {}\nOutput: {}".format(command
, output
)
417 raise K8sException(msg
)
423 kdu_model
: str = None,
425 timeout
: float = 300,
427 db_dict
: dict = None,
429 _
, cluster_id
= self
._get
_namespace
_cluster
_id
(cluster_uuid
)
430 self
.log
.debug("upgrading {} in cluster {}".format(kdu_model
, cluster_id
))
433 self
.fs
.sync(from_path
=cluster_id
)
435 # look for instance to obtain namespace
436 instance_info
= await self
.get_instance_info(cluster_uuid
, kdu_instance
)
437 if not instance_info
:
438 raise K8sException("kdu_instance {} not found".format(kdu_instance
))
441 paths
, env
= self
._init
_paths
_env
(
442 cluster_name
=cluster_id
, create_if_not_exist
=True
446 self
.fs
.sync(from_path
=cluster_id
)
449 params_str
, file_to_delete
= self
._params
_to
_file
_option
(
450 cluster_id
=cluster_id
, params
=params
454 kdu_model
, version
= self
._split
_version
(kdu_model
)
456 command
= self
._get
_upgrade
_command
(
459 instance_info
["namespace"],
464 paths
["kube_config"],
467 self
.log
.debug("upgrading: {}".format(command
))
471 # exec helm in a task
472 exec_task
= asyncio
.ensure_future(
473 coro_or_future
=self
._local
_async
_exec
(
474 command
=command
, raise_exception_on_error
=False, env
=env
477 # write status in another task
478 status_task
= asyncio
.ensure_future(
479 coro_or_future
=self
._store
_status
(
480 cluster_id
=cluster_id
,
481 kdu_instance
=kdu_instance
,
482 namespace
=instance_info
["namespace"],
489 # wait for execution task
490 await asyncio
.wait([exec_task
])
494 output
, rc
= exec_task
.result()
498 output
, rc
= await self
._local
_async
_exec
(
499 command
=command
, raise_exception_on_error
=False, env
=env
502 # remove temporal values yaml file
504 os
.remove(file_to_delete
)
507 await self
._store
_status
(
508 cluster_id
=cluster_id
,
509 kdu_instance
=kdu_instance
,
510 namespace
=instance_info
["namespace"],
518 msg
= "Error executing command: {}\nOutput: {}".format(command
, output
)
520 raise K8sException(msg
)
523 self
.fs
.reverse_sync(from_path
=cluster_id
)
525 # return new revision number
526 instance
= await self
.get_instance_info(
527 cluster_uuid
=cluster_uuid
, kdu_instance
=kdu_instance
530 revision
= int(instance
.get("revision"))
531 self
.log
.debug("New revision: {}".format(revision
))
541 total_timeout
: float = 1800,
542 cluster_uuid
: str = None,
543 kdu_model
: str = None,
545 db_dict
: dict = None,
548 """Scale a resource in a Helm Chart.
551 kdu_instance: KDU instance name
552 scale: Scale to which to set the resource
553 resource_name: Resource name
554 total_timeout: The time, in seconds, to wait
555 cluster_uuid: The UUID of the cluster
556 kdu_model: The chart reference
557 atomic: if set, upgrade process rolls back changes made in case of failed upgrade.
558 The --wait flag will be set automatically if --atomic is used
559 db_dict: Dictionary for any additional data
560 kwargs: Additional parameters
563 True if successful, False otherwise
566 _
, cluster_id
= self
._get
_namespace
_cluster
_id
(cluster_uuid
)
568 debug_mgs
= "scaling {} in cluster {}".format(kdu_model
, cluster_id
)
570 debug_mgs
= "scaling resource {} in model {} (cluster {})".format(
571 resource_name
, kdu_model
, cluster_id
574 self
.log
.debug(debug_mgs
)
576 # look for instance to obtain namespace
577 # get_instance_info function calls the sync command
578 instance_info
= await self
.get_instance_info(cluster_uuid
, kdu_instance
)
579 if not instance_info
:
580 raise K8sException("kdu_instance {} not found".format(kdu_instance
))
583 paths
, env
= self
._init
_paths
_env
(
584 cluster_name
=cluster_id
, create_if_not_exist
=True
588 kdu_model
, version
= self
._split
_version
(kdu_model
)
590 repo_url
= await self
._find
_repo
(kdu_model
, cluster_uuid
)
593 "Repository not found for kdu_model {}".format(kdu_model
)
596 _
, replica_str
= await self
._get
_replica
_count
_url
(
597 kdu_model
, repo_url
, resource_name
600 command
= self
._get
_upgrade
_scale
_command
(
603 instance_info
["namespace"],
610 paths
["kube_config"],
613 self
.log
.debug("scaling: {}".format(command
))
616 # exec helm in a task
617 exec_task
= asyncio
.ensure_future(
618 coro_or_future
=self
._local
_async
_exec
(
619 command
=command
, raise_exception_on_error
=False, env
=env
622 # write status in another task
623 status_task
= asyncio
.ensure_future(
624 coro_or_future
=self
._store
_status
(
625 cluster_id
=cluster_id
,
626 kdu_instance
=kdu_instance
,
627 namespace
=instance_info
["namespace"],
634 # wait for execution task
635 await asyncio
.wait([exec_task
])
639 output
, rc
= exec_task
.result()
642 output
, rc
= await self
._local
_async
_exec
(
643 command
=command
, raise_exception_on_error
=False, env
=env
647 await self
._store
_status
(
648 cluster_id
=cluster_id
,
649 kdu_instance
=kdu_instance
,
650 namespace
=instance_info
["namespace"],
658 msg
= "Error executing command: {}\nOutput: {}".format(command
, output
)
660 raise K8sException(msg
)
663 self
.fs
.reverse_sync(from_path
=cluster_id
)
667 async def get_scale_count(
675 """Get a resource scale count.
678 cluster_uuid: The UUID of the cluster
679 resource_name: Resource name
680 kdu_instance: KDU instance name
681 kdu_model: The name or path of a bundle
682 kwargs: Additional parameters
685 Resource instance count
688 _
, cluster_id
= self
._get
_namespace
_cluster
_id
(cluster_uuid
)
690 "getting scale count for {} in cluster {}".format(kdu_model
, cluster_id
)
693 # look for instance to obtain namespace
694 instance_info
= await self
.get_instance_info(cluster_uuid
, kdu_instance
)
695 if not instance_info
:
696 raise K8sException("kdu_instance {} not found".format(kdu_instance
))
699 paths
, env
= self
._init
_paths
_env
(
700 cluster_name
=cluster_id
, create_if_not_exist
=True
703 replicas
= await self
._get
_replica
_count
_instance
(
704 kdu_instance
, instance_info
["namespace"], paths
["kube_config"]
707 # Get default value if scale count is not found from provided values
709 repo_url
= await self
._find
_repo
(kdu_model
, cluster_uuid
)
712 "Repository not found for kdu_model {}".format(kdu_model
)
715 replicas
, _
= await self
._get
_replica
_count
_url
(
716 kdu_model
, repo_url
, resource_name
720 msg
= "Replica count not found. Cannot be scaled"
722 raise K8sException(msg
)
727 self
, cluster_uuid
: str, kdu_instance
: str, revision
=0, db_dict
: dict = None
730 _
, cluster_id
= self
._get
_namespace
_cluster
_id
(cluster_uuid
)
732 "rollback kdu_instance {} to revision {} from cluster {}".format(
733 kdu_instance
, revision
, cluster_id
738 self
.fs
.sync(from_path
=cluster_id
)
740 # look for instance to obtain namespace
741 instance_info
= await self
.get_instance_info(cluster_uuid
, kdu_instance
)
742 if not instance_info
:
743 raise K8sException("kdu_instance {} not found".format(kdu_instance
))
746 paths
, env
= self
._init
_paths
_env
(
747 cluster_name
=cluster_id
, create_if_not_exist
=True
751 self
.fs
.sync(from_path
=cluster_id
)
753 command
= self
._get
_rollback
_command
(
754 kdu_instance
, instance_info
["namespace"], revision
, paths
["kube_config"]
757 self
.log
.debug("rolling_back: {}".format(command
))
759 # exec helm in a task
760 exec_task
= asyncio
.ensure_future(
761 coro_or_future
=self
._local
_async
_exec
(
762 command
=command
, raise_exception_on_error
=False, env
=env
765 # write status in another task
766 status_task
= asyncio
.ensure_future(
767 coro_or_future
=self
._store
_status
(
768 cluster_id
=cluster_id
,
769 kdu_instance
=kdu_instance
,
770 namespace
=instance_info
["namespace"],
772 operation
="rollback",
777 # wait for execution task
778 await asyncio
.wait([exec_task
])
783 output
, rc
= exec_task
.result()
786 await self
._store
_status
(
787 cluster_id
=cluster_id
,
788 kdu_instance
=kdu_instance
,
789 namespace
=instance_info
["namespace"],
791 operation
="rollback",
797 msg
= "Error executing command: {}\nOutput: {}".format(command
, output
)
799 raise K8sException(msg
)
802 self
.fs
.reverse_sync(from_path
=cluster_id
)
804 # return new revision number
805 instance
= await self
.get_instance_info(
806 cluster_uuid
=cluster_uuid
, kdu_instance
=kdu_instance
809 revision
= int(instance
.get("revision"))
810 self
.log
.debug("New revision: {}".format(revision
))
815 async def uninstall(self
, cluster_uuid
: str, kdu_instance
: str, **kwargs
):
817 Removes an existing KDU instance. It would implicitly use the `delete` or 'uninstall' call
818 (this call should happen after all _terminate-config-primitive_ of the VNF
821 :param cluster_uuid: UUID of a K8s cluster known by OSM, or namespace:cluster_id
822 :param kdu_instance: unique name for the KDU instance to be deleted
823 :param kwargs: Additional parameters (None yet)
824 :return: True if successful
827 _
, cluster_id
= self
._get
_namespace
_cluster
_id
(cluster_uuid
)
829 "uninstall kdu_instance {} from cluster {}".format(kdu_instance
, cluster_id
)
833 self
.fs
.sync(from_path
=cluster_id
)
835 # look for instance to obtain namespace
836 instance_info
= await self
.get_instance_info(cluster_uuid
, kdu_instance
)
837 if not instance_info
:
838 self
.log
.warning(("kdu_instance {} not found".format(kdu_instance
)))
841 paths
, env
= self
._init
_paths
_env
(
842 cluster_name
=cluster_id
, create_if_not_exist
=True
846 self
.fs
.sync(from_path
=cluster_id
)
848 command
= self
._get
_uninstall
_command
(
849 kdu_instance
, instance_info
["namespace"], paths
["kube_config"]
851 output
, _rc
= await self
._local
_async
_exec
(
852 command
=command
, raise_exception_on_error
=True, env
=env
856 self
.fs
.reverse_sync(from_path
=cluster_id
)
858 return self
._output
_to
_table
(output
)
860 async def instances_list(self
, cluster_uuid
: str) -> list:
862 returns a list of deployed releases in a cluster
864 :param cluster_uuid: the 'cluster' or 'namespace:cluster'
868 _
, cluster_id
= self
._get
_namespace
_cluster
_id
(cluster_uuid
)
869 self
.log
.debug("list releases for cluster {}".format(cluster_id
))
872 self
.fs
.sync(from_path
=cluster_id
)
874 # execute internal command
875 result
= await self
._instances
_list
(cluster_id
)
878 self
.fs
.reverse_sync(from_path
=cluster_id
)
882 async def get_instance_info(self
, cluster_uuid
: str, kdu_instance
: str):
883 instances
= await self
.instances_list(cluster_uuid
=cluster_uuid
)
884 for instance
in instances
:
885 if instance
.get("name") == kdu_instance
:
887 self
.log
.debug("Instance {} not found".format(kdu_instance
))
890 async def exec_primitive(
892 cluster_uuid
: str = None,
893 kdu_instance
: str = None,
894 primitive_name
: str = None,
895 timeout
: float = 300,
897 db_dict
: dict = None,
900 """Exec primitive (Juju action)
902 :param cluster_uuid: The UUID of the cluster or namespace:cluster
903 :param kdu_instance: The unique name of the KDU instance
904 :param primitive_name: Name of action that will be executed
905 :param timeout: Timeout for action execution
906 :param params: Dictionary of all the parameters needed for the action
907 :db_dict: Dictionary for any additional data
908 :param kwargs: Additional parameters (None yet)
910 :return: Returns the output of the action
913 "KDUs deployed with Helm don't support actions "
914 "different from rollback, upgrade and status"
917 async def get_services(
918 self
, cluster_uuid
: str, kdu_instance
: str, namespace
: str
921 Returns a list of services defined for the specified kdu instance.
923 :param cluster_uuid: UUID of a K8s cluster known by OSM
924 :param kdu_instance: unique name for the KDU instance
925 :param namespace: K8s namespace used by the KDU instance
926 :return: If successful, it will return a list of services, Each service
927 can have the following data:
928 - `name` of the service
929 - `type` type of service in the k8 cluster
930 - `ports` List of ports offered by the service, for each port includes at least
932 - `cluster_ip` Internal ip to be used inside k8s cluster
933 - `external_ip` List of external ips (in case they are available)
936 _
, cluster_id
= self
._get
_namespace
_cluster
_id
(cluster_uuid
)
938 "get_services: cluster_uuid: {}, kdu_instance: {}".format(
939 cluster_uuid
, kdu_instance
944 paths
, env
= self
._init
_paths
_env
(
945 cluster_name
=cluster_id
, create_if_not_exist
=True
949 self
.fs
.sync(from_path
=cluster_id
)
951 # get list of services names for kdu
952 service_names
= await self
._get
_services
(
953 cluster_id
, kdu_instance
, namespace
, paths
["kube_config"]
957 for service
in service_names
:
958 service
= await self
._get
_service
(cluster_id
, service
, namespace
)
959 service_list
.append(service
)
962 self
.fs
.reverse_sync(from_path
=cluster_id
)
966 async def get_service(
967 self
, cluster_uuid
: str, service_name
: str, namespace
: str
971 "get service, service_name: {}, namespace: {}, cluster_uuid: {}".format(
972 service_name
, namespace
, cluster_uuid
976 _
, cluster_id
= self
._get
_namespace
_cluster
_id
(cluster_uuid
)
979 self
.fs
.sync(from_path
=cluster_id
)
981 service
= await self
._get
_service
(cluster_id
, service_name
, namespace
)
984 self
.fs
.reverse_sync(from_path
=cluster_id
)
988 async def status_kdu(self
, cluster_uuid
: str, kdu_instance
: str, **kwargs
) -> str:
990 This call would retrieve tha current state of a given KDU instance. It would be
991 would allow to retrieve the _composition_ (i.e. K8s objects) and _specific
992 values_ of the configuration parameters applied to a given instance. This call
993 would be based on the `status` call.
995 :param cluster_uuid: UUID of a K8s cluster known by OSM
996 :param kdu_instance: unique name for the KDU instance
997 :param kwargs: Additional parameters (None yet)
998 :return: If successful, it will return the following vector of arguments:
999 - K8s `namespace` in the cluster where the KDU lives
1000 - `state` of the KDU instance. It can be:
1007 - List of `resources` (objects) that this release consists of, sorted by kind,
1008 and the status of those resources
1009 - Last `deployment_time`.
1013 "status_kdu: cluster_uuid: {}, kdu_instance: {}".format(
1014 cluster_uuid
, kdu_instance
1018 _
, cluster_id
= self
._get
_namespace
_cluster
_id
(cluster_uuid
)
1021 self
.fs
.sync(from_path
=cluster_id
)
1023 # get instance: needed to obtain namespace
1024 instances
= await self
._instances
_list
(cluster_id
=cluster_id
)
1025 for instance
in instances
:
1026 if instance
.get("name") == kdu_instance
:
1029 # instance does not exist
1031 "Instance name: {} not found in cluster: {}".format(
1032 kdu_instance
, cluster_id
1036 status
= await self
._status
_kdu
(
1037 cluster_id
=cluster_id
,
1038 kdu_instance
=kdu_instance
,
1039 namespace
=instance
["namespace"],
1040 show_error_log
=True,
1045 self
.fs
.reverse_sync(from_path
=cluster_id
)
1049 async def get_values_kdu(
1050 self
, kdu_instance
: str, namespace
: str, kubeconfig
: str
1053 self
.log
.debug("get kdu_instance values {}".format(kdu_instance
))
1055 return await self
._exec
_get
_command
(
1056 get_command
="values",
1057 kdu_instance
=kdu_instance
,
1058 namespace
=namespace
,
1059 kubeconfig
=kubeconfig
,
1062 async def values_kdu(self
, kdu_model
: str, repo_url
: str = None) -> str:
1065 "inspect kdu_model values {} from (optional) repo: {}".format(
1070 return await self
._exec
_inspect
_command
(
1071 inspect_command
="values", kdu_model
=kdu_model
, repo_url
=repo_url
1074 async def help_kdu(self
, kdu_model
: str, repo_url
: str = None) -> str:
1077 "inspect kdu_model {} readme.md from repo: {}".format(kdu_model
, repo_url
)
1080 return await self
._exec
_inspect
_command
(
1081 inspect_command
="readme", kdu_model
=kdu_model
, repo_url
=repo_url
1084 async def synchronize_repos(self
, cluster_uuid
: str):
1086 self
.log
.debug("synchronize repos for cluster helm-id: {}".format(cluster_uuid
))
1088 db_repo_ids
= self
._get
_helm
_chart
_repos
_ids
(cluster_uuid
)
1089 db_repo_dict
= self
._get
_db
_repos
_dict
(db_repo_ids
)
1091 local_repo_list
= await self
.repo_list(cluster_uuid
)
1092 local_repo_dict
= {repo
["name"]: repo
["url"] for repo
in local_repo_list
}
1094 deleted_repo_list
= []
1095 added_repo_dict
= {}
1097 # iterate over the list of repos in the database that should be
1098 # added if not present
1099 for repo_name
, db_repo
in db_repo_dict
.items():
1101 # check if it is already present
1102 curr_repo_url
= local_repo_dict
.get(db_repo
["name"])
1103 repo_id
= db_repo
.get("_id")
1104 if curr_repo_url
!= db_repo
["url"]:
1107 "repo {} url changed, delete and and again".format(
1111 await self
.repo_remove(cluster_uuid
, db_repo
["name"])
1112 deleted_repo_list
.append(repo_id
)
1115 self
.log
.debug("add repo {}".format(db_repo
["name"]))
1116 await self
.repo_add(
1117 cluster_uuid
, db_repo
["name"], db_repo
["url"]
1119 added_repo_dict
[repo_id
] = db_repo
["name"]
1120 except Exception as e
:
1122 "Error adding repo id: {}, err_msg: {} ".format(
1127 # Delete repos that are present but not in nbi_list
1128 for repo_name
in local_repo_dict
:
1129 if not db_repo_dict
.get(repo_name
) and repo_name
!= "stable":
1130 self
.log
.debug("delete repo {}".format(repo_name
))
1132 await self
.repo_remove(cluster_uuid
, repo_name
)
1133 deleted_repo_list
.append(repo_name
)
1134 except Exception as e
:
1136 "Error deleting repo, name: {}, err_msg: {}".format(
1141 return deleted_repo_list
, added_repo_dict
1143 except K8sException
:
1145 except Exception as e
:
1146 # Do not raise errors synchronizing repos
1147 self
.log
.error("Error synchronizing repos: {}".format(e
))
1148 raise Exception("Error synchronizing repos: {}".format(e
))
1150 def _get_db_repos_dict(self
, repo_ids
: list):
1152 for repo_id
in repo_ids
:
1153 db_repo
= self
.db
.get_one("k8srepos", {"_id": repo_id
})
1154 db_repos_dict
[db_repo
["name"]] = db_repo
1155 return db_repos_dict
1158 ####################################################################################
1159 ################################### TO BE IMPLEMENTED SUBCLASSES ###################
1160 ####################################################################################
1164 def _init_paths_env(self
, cluster_name
: str, create_if_not_exist
: bool = True):
1166 Creates and returns base cluster and kube dirs and returns them.
1167 Also created helm3 dirs according to new directory specification, paths are
1168 not returned but assigned to helm environment variables
1170 :param cluster_name: cluster_name
1171 :return: Dictionary with config_paths and dictionary with helm environment variables
1175 async def _cluster_init(self
, cluster_id
, namespace
, paths
, env
):
1177 Implements the helm version dependent cluster initialization
1181 async def _instances_list(self
, cluster_id
):
1183 Implements the helm version dependent helm instances list
1187 async def _get_services(self
, cluster_id
, kdu_instance
, namespace
, kubeconfig
):
1189 Implements the helm version dependent method to obtain services from a helm instance
1193 async def _status_kdu(
1197 namespace
: str = None,
1198 show_error_log
: bool = False,
1199 return_text
: bool = False,
1202 Implements the helm version dependent method to obtain status of a helm instance
1206 def _get_install_command(
1218 Obtain command to be executed to delete the indicated instance
1222 def _get_upgrade_scale_command(
1235 """Obtain command to be executed to upgrade the indicated instance."""
1238 def _get_upgrade_command(
1250 Obtain command to be executed to upgrade the indicated instance
1254 def _get_rollback_command(
1255 self
, kdu_instance
, namespace
, revision
, kubeconfig
1258 Obtain command to be executed to rollback the indicated instance
1262 def _get_uninstall_command(
1263 self
, kdu_instance
: str, namespace
: str, kubeconfig
: str
1266 Obtain command to be executed to delete the indicated instance
1270 def _get_inspect_command(
1271 self
, show_command
: str, kdu_model
: str, repo_str
: str, version
: str
1274 Obtain command to be executed to obtain information about the kdu
1278 def _get_get_command(
1279 self
, get_command
: str, kdu_instance
: str, namespace
: str, kubeconfig
: str
1281 """Obtain command to be executed to get information about the kdu instance."""
1284 async def _uninstall_sw(self
, cluster_id
: str, namespace
: str):
1286 Method call to uninstall cluster software for helm. This method is dependent
1288 For Helm v2 it will be called when Tiller must be uninstalled
1289 For Helm v3 it does nothing and does not need to be callled
1293 def _get_helm_chart_repos_ids(self
, cluster_uuid
) -> list:
1295 Obtains the cluster repos identifiers
1299 ####################################################################################
1300 ################################### P R I V A T E ##################################
1301 ####################################################################################
1305 def _check_file_exists(filename
: str, exception_if_not_exists
: bool = False):
1306 if os
.path
.exists(filename
):
1309 msg
= "File {} does not exist".format(filename
)
1310 if exception_if_not_exists
:
1311 raise K8sException(msg
)
1314 def _remove_multiple_spaces(strobj
):
1315 strobj
= strobj
.strip()
1316 while " " in strobj
:
1317 strobj
= strobj
.replace(" ", " ")
1321 def _output_to_lines(output
: str) -> list:
1322 output_lines
= list()
1323 lines
= output
.splitlines(keepends
=False)
1327 output_lines
.append(line
)
1331 def _output_to_table(output
: str) -> list:
1332 output_table
= list()
1333 lines
= output
.splitlines(keepends
=False)
1335 line
= line
.replace("\t", " ")
1337 output_table
.append(line_list
)
1338 cells
= line
.split(sep
=" ")
1342 line_list
.append(cell
)
1346 def _parse_services(output
: str) -> list:
1347 lines
= output
.splitlines(keepends
=False)
1350 line
= line
.replace("\t", " ")
1351 cells
= line
.split(sep
=" ")
1352 if len(cells
) > 0 and cells
[0].startswith("service/"):
1353 elems
= cells
[0].split(sep
="/")
1355 services
.append(elems
[1])
1359 def _get_deep(dictionary
: dict, members
: tuple):
1364 value
= target
.get(m
)
1373 # find key:value in several lines
1375 def _find_in_lines(p_lines
: list, p_key
: str) -> str:
1376 for line
in p_lines
:
1378 if line
.startswith(p_key
+ ":"):
1379 parts
= line
.split(":")
1380 the_value
= parts
[1].strip()
1388 def _lower_keys_list(input_list
: list):
1390 Transform the keys in a list of dictionaries to lower case and returns a new list
1395 for dictionary
in input_list
:
1396 new_dict
= dict((k
.lower(), v
) for k
, v
in dictionary
.items())
1397 new_list
.append(new_dict
)
1400 async def _local_async_exec(
1403 raise_exception_on_error
: bool = False,
1404 show_error_log
: bool = True,
1405 encode_utf8
: bool = False,
1409 command
= K8sHelmBaseConnector
._remove
_multiple
_spaces
(command
)
1411 "Executing async local command: {}, env: {}".format(command
, env
)
1415 command
= shlex
.split(command
)
1417 environ
= os
.environ
.copy()
1422 process
= await asyncio
.create_subprocess_exec(
1424 stdout
=asyncio
.subprocess
.PIPE
,
1425 stderr
=asyncio
.subprocess
.PIPE
,
1429 # wait for command terminate
1430 stdout
, stderr
= await process
.communicate()
1432 return_code
= process
.returncode
1436 output
= stdout
.decode("utf-8").strip()
1437 # output = stdout.decode()
1439 output
= stderr
.decode("utf-8").strip()
1440 # output = stderr.decode()
1442 if return_code
!= 0 and show_error_log
:
1444 "Return code (FAIL): {}\nOutput:\n{}".format(return_code
, output
)
1447 self
.log
.debug("Return code: {}".format(return_code
))
1449 if raise_exception_on_error
and return_code
!= 0:
1450 raise K8sException(output
)
1453 output
= output
.encode("utf-8").strip()
1454 output
= str(output
).replace("\\n", "\n")
1456 return output
, return_code
1458 except asyncio
.CancelledError
:
1460 except K8sException
:
1462 except Exception as e
:
1463 msg
= "Exception executing command: {} -> {}".format(command
, e
)
1465 if raise_exception_on_error
:
1466 raise K8sException(e
) from e
1470 async def _local_async_exec_pipe(
1474 raise_exception_on_error
: bool = True,
1475 show_error_log
: bool = True,
1476 encode_utf8
: bool = False,
1480 command1
= K8sHelmBaseConnector
._remove
_multiple
_spaces
(command1
)
1481 command2
= K8sHelmBaseConnector
._remove
_multiple
_spaces
(command2
)
1482 command
= "{} | {}".format(command1
, command2
)
1484 "Executing async local command: {}, env: {}".format(command
, env
)
1488 command1
= shlex
.split(command1
)
1489 command2
= shlex
.split(command2
)
1491 environ
= os
.environ
.copy()
1496 read
, write
= os
.pipe()
1497 await asyncio
.create_subprocess_exec(*command1
, stdout
=write
, env
=environ
)
1499 process_2
= await asyncio
.create_subprocess_exec(
1500 *command2
, stdin
=read
, stdout
=asyncio
.subprocess
.PIPE
, env
=environ
1503 stdout
, stderr
= await process_2
.communicate()
1505 return_code
= process_2
.returncode
1509 output
= stdout
.decode("utf-8").strip()
1510 # output = stdout.decode()
1512 output
= stderr
.decode("utf-8").strip()
1513 # output = stderr.decode()
1515 if return_code
!= 0 and show_error_log
:
1517 "Return code (FAIL): {}\nOutput:\n{}".format(return_code
, output
)
1520 self
.log
.debug("Return code: {}".format(return_code
))
1522 if raise_exception_on_error
and return_code
!= 0:
1523 raise K8sException(output
)
1526 output
= output
.encode("utf-8").strip()
1527 output
= str(output
).replace("\\n", "\n")
1529 return output
, return_code
1530 except asyncio
.CancelledError
:
1532 except K8sException
:
1534 except Exception as e
:
1535 msg
= "Exception executing command: {} -> {}".format(command
, e
)
1537 if raise_exception_on_error
:
1538 raise K8sException(e
) from e
1542 async def _get_service(self
, cluster_id
, service_name
, namespace
):
1544 Obtains the data of the specified service in the k8cluster.
1546 :param cluster_id: id of a K8s cluster known by OSM
1547 :param service_name: name of the K8s service in the specified namespace
1548 :param namespace: K8s namespace used by the KDU instance
1549 :return: If successful, it will return a service with the following data:
1550 - `name` of the service
1551 - `type` type of service in the k8 cluster
1552 - `ports` List of ports offered by the service, for each port includes at least
1553 name, port, protocol
1554 - `cluster_ip` Internal ip to be used inside k8s cluster
1555 - `external_ip` List of external ips (in case they are available)
1559 paths
, env
= self
._init
_paths
_env
(
1560 cluster_name
=cluster_id
, create_if_not_exist
=True
1563 command
= "{} --kubeconfig={} --namespace={} get service {} -o=yaml".format(
1564 self
.kubectl_command
, paths
["kube_config"], namespace
, service_name
1567 output
, _rc
= await self
._local
_async
_exec
(
1568 command
=command
, raise_exception_on_error
=True, env
=env
1571 data
= yaml
.load(output
, Loader
=yaml
.SafeLoader
)
1574 "name": service_name
,
1575 "type": self
._get
_deep
(data
, ("spec", "type")),
1576 "ports": self
._get
_deep
(data
, ("spec", "ports")),
1577 "cluster_ip": self
._get
_deep
(data
, ("spec", "clusterIP")),
1579 if service
["type"] == "LoadBalancer":
1580 ip_map_list
= self
._get
_deep
(data
, ("status", "loadBalancer", "ingress"))
1581 ip_list
= [elem
["ip"] for elem
in ip_map_list
]
1582 service
["external_ip"] = ip_list
1586 async def _exec_get_command(
1587 self
, get_command
: str, kdu_instance
: str, namespace
: str, kubeconfig
: str
1589 """Obtains information about the kdu instance."""
1591 full_command
= self
._get
_get
_command
(
1592 get_command
, kdu_instance
, namespace
, kubeconfig
1595 output
, _rc
= await self
._local
_async
_exec
(command
=full_command
)
1599 async def _exec_inspect_command(
1600 self
, inspect_command
: str, kdu_model
: str, repo_url
: str = None
1602 """Obtains information about a kdu, no cluster (no env)."""
1606 repo_str
= " --repo {}".format(repo_url
)
1608 idx
= kdu_model
.find("/")
1611 kdu_model
= kdu_model
[idx
:]
1613 kdu_model
, version
= self
._split
_version
(kdu_model
)
1615 version_str
= "--version {}".format(version
)
1619 full_command
= self
._get
_inspect
_command
(
1620 inspect_command
, kdu_model
, repo_str
, version_str
1623 output
, _rc
= await self
._local
_async
_exec
(command
=full_command
)
1627 async def _get_replica_count_url(
1631 resource_name
: str = None,
1633 """Get the replica count value in the Helm Chart Values.
1636 kdu_model: The name or path of a bundle
1637 repo_url: Helm Chart repository url
1638 resource_name: Resource name
1641 True if replicas, False replicaCount
1644 kdu_values
= yaml
.load(
1645 await self
.values_kdu(kdu_model
, repo_url
), Loader
=yaml
.SafeLoader
1650 "kdu_values not found for kdu_model {}".format(kdu_model
)
1654 kdu_values
= kdu_values
.get(resource_name
, None)
1657 msg
= "resource {} not found in the values in model {}".format(
1658 resource_name
, kdu_model
1661 raise K8sException(msg
)
1663 duplicate_check
= False
1668 if kdu_values
.get("replicaCount", None):
1669 replicas
= kdu_values
["replicaCount"]
1670 replica_str
= "replicaCount"
1671 elif kdu_values
.get("replicas", None):
1672 duplicate_check
= True
1673 replicas
= kdu_values
["replicas"]
1674 replica_str
= "replicas"
1678 "replicaCount or replicas not found in the resource"
1679 "{} values in model {}. Cannot be scaled".format(
1680 resource_name
, kdu_model
1685 "replicaCount or replicas not found in the values"
1686 "in model {}. Cannot be scaled".format(kdu_model
)
1689 raise K8sException(msg
)
1691 # Control if replicas and replicaCount exists at the same time
1692 msg
= "replicaCount and replicas are exists at the same time"
1694 if "replicaCount" in kdu_values
:
1696 raise K8sException(msg
)
1698 if "replicas" in kdu_values
:
1700 raise K8sException(msg
)
1702 return replicas
, replica_str
1704 async def _get_replica_count_instance(
1709 resource_name
: str = None,
1711 """Get the replica count value in the instance.
1714 kdu_instance: The name of the KDU instance
1715 namespace: KDU instance namespace
1717 resource_name: Resource name
1720 True if replicas, False replicaCount
1723 kdu_values
= yaml
.load(
1724 await self
.get_values_kdu(kdu_instance
, namespace
, kubeconfig
),
1725 Loader
=yaml
.SafeLoader
,
1732 kdu_values
.get(resource_name
, None) if resource_name
else None
1736 resource_values
.get("replicaCount", None)
1737 or resource_values
.get("replicas", None)
1741 kdu_values
.get("replicaCount", None)
1742 or kdu_values
.get("replicas", None)
1748 async def _store_status(
1753 namespace
: str = None,
1754 check_every
: float = 10,
1755 db_dict
: dict = None,
1756 run_once
: bool = False,
1760 await asyncio
.sleep(check_every
)
1761 detailed_status
= await self
._status
_kdu
(
1762 cluster_id
=cluster_id
,
1763 kdu_instance
=kdu_instance
,
1764 namespace
=namespace
,
1767 status
= detailed_status
.get("info").get("description")
1768 self
.log
.debug("KDU {} STATUS: {}.".format(kdu_instance
, status
))
1769 # write status to db
1770 result
= await self
.write_app_status_to_db(
1773 detailed_status
=str(detailed_status
),
1774 operation
=operation
,
1777 self
.log
.info("Error writing in database. Task exiting...")
1779 except asyncio
.CancelledError
:
1780 self
.log
.debug("Task cancelled")
1782 except Exception as e
:
1784 "_store_status exception: {}".format(str(e
)), exc_info
=True
1791 # params for use in -f file
1792 # returns values file option and filename (in order to delete it at the end)
1793 def _params_to_file_option(self
, cluster_id
: str, params
: dict) -> (str, str):
1795 if params
and len(params
) > 0:
1796 self
._init
_paths
_env
(cluster_name
=cluster_id
, create_if_not_exist
=True)
1798 def get_random_number():
1799 r
= random
.randrange(start
=1, stop
=99999999)
1807 value
= params
.get(key
)
1808 if "!!yaml" in str(value
):
1809 value
= yaml
.load(value
[7:])
1810 params2
[key
] = value
1812 values_file
= get_random_number() + ".yaml"
1813 with
open(values_file
, "w") as stream
:
1814 yaml
.dump(params2
, stream
, indent
=4, default_flow_style
=False)
1816 return "-f {}".format(values_file
), values_file
1820 # params for use in --set option
1822 def _params_to_set_option(params
: dict) -> str:
1824 if params
and len(params
) > 0:
1827 value
= params
.get(key
, None)
1828 if value
is not None:
1830 params_str
+= "--set "
1834 params_str
+= "{}={}".format(key
, value
)
1838 def generate_kdu_instance_name(**kwargs
):
1839 chart_name
= kwargs
["kdu_model"]
1840 # check embeded chart (file or dir)
1841 if chart_name
.startswith("/"):
1842 # extract file or directory name
1843 chart_name
= chart_name
[chart_name
.rfind("/") + 1 :]
1845 elif "://" in chart_name
:
1846 # extract last portion of URL
1847 chart_name
= chart_name
[chart_name
.rfind("/") + 1 :]
1850 for c
in chart_name
:
1851 if c
.isalpha() or c
.isnumeric():
1858 # if does not start with alpha character, prefix 'a'
1859 if not name
[0].isalpha():
1864 def get_random_number():
1865 r
= random
.randrange(start
=1, stop
=99999999)
1867 s
= s
.rjust(10, "0")
1870 name
= name
+ get_random_number()
1873 def _split_version(self
, kdu_model
: str) -> (str, str):
1875 if ":" in kdu_model
:
1876 parts
= kdu_model
.split(sep
=":")
1878 version
= str(parts
[1])
1879 kdu_model
= parts
[0]
1880 return kdu_model
, version
1882 async def _find_repo(self
, kdu_model
: str, cluster_uuid
: str) -> str:
1884 idx
= kdu_model
.find("/")
1886 repo_name
= kdu_model
[:idx
]
1887 # Find repository link
1888 local_repo_list
= await self
.repo_list(cluster_uuid
)
1889 for repo
in local_repo_list
:
1890 repo_url
= repo
["url"] if repo
["name"] == repo_name
else None