2 # Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3 # This file is part of OSM
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact with: nfvlabs@tid.es
24 from typing
import Union
32 from uuid
import uuid4
34 from n2vc
.config
import EnvironConfig
35 from n2vc
.exceptions
import K8sException
36 from n2vc
.k8s_conn
import K8sConnector
39 class K8sHelmBaseConnector(K8sConnector
):
42 ####################################################################################
43 ################################### P U B L I C ####################################
44 ####################################################################################
47 service_account
= "osm"
53 kubectl_command
: str = "/usr/bin/kubectl",
54 helm_command
: str = "/usr/bin/helm",
60 :param fs: file system for kubernetes and helm configuration
61 :param db: database object to write current operation status
62 :param kubectl_command: path to kubectl executable
63 :param helm_command: path to helm executable
65 :param on_update_db: callback called when k8s connector updates database
69 K8sConnector
.__init
__(self
, db
=db
, log
=log
, on_update_db
=on_update_db
)
71 self
.log
.info("Initializing K8S Helm connector")
73 self
.config
= EnvironConfig()
74 # random numbers for release name generation
75 random
.seed(time
.time())
80 # exception if kubectl is not installed
81 self
.kubectl_command
= kubectl_command
82 self
._check
_file
_exists
(filename
=kubectl_command
, exception_if_not_exists
=True)
84 # exception if helm is not installed
85 self
._helm
_command
= helm_command
86 self
._check
_file
_exists
(filename
=helm_command
, exception_if_not_exists
=True)
88 # obtain stable repo url from config or apply default
89 self
._stable
_repo
_url
= self
.config
.get("stablerepourl")
90 if self
._stable
_repo
_url
== "None":
91 self
._stable
_repo
_url
= None
93 def _get_namespace(self
, cluster_uuid
: str) -> str:
95 Obtains the namespace used by the cluster with the uuid passed by argument
97 param: cluster_uuid: cluster's uuid
100 # first, obtain the cluster corresponding to the uuid passed by argument
101 k8scluster
= self
.db
.get_one(
102 "k8sclusters", q_filter
={"_id": cluster_uuid
}, fail_on_empty
=False
104 return k8scluster
.get("namespace")
109 namespace
: str = "kube-system",
110 reuse_cluster_uuid
=None,
114 It prepares a given K8s cluster environment to run Charts
116 :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid
118 :param namespace: optional namespace to be used for helm. By default,
119 'kube-system' will be used
120 :param reuse_cluster_uuid: existing cluster uuid for reuse
121 :param kwargs: Additional parameters (None yet)
122 :return: uuid of the K8s cluster and True if connector has installed some
123 software in the cluster
124 (on error, an exception will be raised)
127 if reuse_cluster_uuid
:
128 cluster_id
= reuse_cluster_uuid
130 cluster_id
= str(uuid4())
133 "Initializing K8S Cluster {}. namespace: {}".format(cluster_id
, namespace
)
136 paths
, env
= self
._init
_paths
_env
(
137 cluster_name
=cluster_id
, create_if_not_exist
=True
139 mode
= stat
.S_IRUSR | stat
.S_IWUSR
140 with
open(paths
["kube_config"], "w", mode
) as f
:
142 os
.chmod(paths
["kube_config"], 0o600)
144 # Code with initialization specific of helm version
145 n2vc_installed_sw
= await self
._cluster
_init
(cluster_id
, namespace
, paths
, env
)
147 # sync fs with local data
148 self
.fs
.reverse_sync(from_path
=cluster_id
)
150 self
.log
.info("Cluster {} initialized".format(cluster_id
))
152 return cluster_id
, n2vc_installed_sw
159 repo_type
: str = "chart",
162 password
: str = None,
165 "Cluster {}, adding {} repository {}. URL: {}".format(
166 cluster_uuid
, repo_type
, name
, url
171 paths
, env
= self
._init
_paths
_env
(
172 cluster_name
=cluster_uuid
, create_if_not_exist
=True
176 self
.fs
.sync(from_path
=cluster_uuid
)
178 # helm repo add name url
179 command
= ("env KUBECONFIG={} {} repo add {} {}").format(
180 paths
["kube_config"], self
._helm
_command
, name
, url
184 temp_cert_file
= os
.path
.join(
185 self
.fs
.path
, "{}/helmcerts/".format(cluster_uuid
), "temp.crt"
187 os
.makedirs(os
.path
.dirname(temp_cert_file
), exist_ok
=True)
188 with
open(temp_cert_file
, "w") as the_cert
:
190 command
+= " --ca-file {}".format(temp_cert_file
)
193 command
+= " --username={}".format(user
)
196 command
+= " --password={}".format(password
)
198 self
.log
.debug("adding repo: {}".format(command
))
199 await self
._local
_async
_exec
(
200 command
=command
, raise_exception_on_error
=True, env
=env
204 command
= "env KUBECONFIG={} {} repo update {}".format(
205 paths
["kube_config"], self
._helm
_command
, name
207 self
.log
.debug("updating repo: {}".format(command
))
208 await self
._local
_async
_exec
(
209 command
=command
, raise_exception_on_error
=False, env
=env
213 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
215 async def repo_update(self
, cluster_uuid
: str, name
: str, repo_type
: str = "chart"):
217 "Cluster {}, updating {} repository {}".format(
218 cluster_uuid
, repo_type
, name
223 paths
, env
= self
._init
_paths
_env
(
224 cluster_name
=cluster_uuid
, create_if_not_exist
=True
228 self
.fs
.sync(from_path
=cluster_uuid
)
231 command
= "{} repo update {}".format(self
._helm
_command
, name
)
232 self
.log
.debug("updating repo: {}".format(command
))
233 await self
._local
_async
_exec
(
234 command
=command
, raise_exception_on_error
=False, env
=env
238 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
240 async def repo_list(self
, cluster_uuid
: str) -> list:
242 Get the list of registered repositories
244 :return: list of registered repositories: [ (name, url) .... ]
247 self
.log
.debug("list repositories for cluster {}".format(cluster_uuid
))
250 paths
, env
= self
._init
_paths
_env
(
251 cluster_name
=cluster_uuid
, create_if_not_exist
=True
255 self
.fs
.sync(from_path
=cluster_uuid
)
257 command
= "env KUBECONFIG={} {} repo list --output yaml".format(
258 paths
["kube_config"], self
._helm
_command
261 # Set exception to false because if there are no repos just want an empty list
262 output
, _rc
= await self
._local
_async
_exec
(
263 command
=command
, raise_exception_on_error
=False, env
=env
267 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
270 if output
and len(output
) > 0:
271 repos
= yaml
.load(output
, Loader
=yaml
.SafeLoader
)
272 # unify format between helm2 and helm3 setting all keys lowercase
273 return self
._lower
_keys
_list
(repos
)
279 async def repo_remove(self
, cluster_uuid
: str, name
: str):
281 "remove {} repositories for cluster {}".format(name
, cluster_uuid
)
285 paths
, env
= self
._init
_paths
_env
(
286 cluster_name
=cluster_uuid
, create_if_not_exist
=True
290 self
.fs
.sync(from_path
=cluster_uuid
)
292 command
= "env KUBECONFIG={} {} repo remove {}".format(
293 paths
["kube_config"], self
._helm
_command
, name
295 await self
._local
_async
_exec
(
296 command
=command
, raise_exception_on_error
=True, env
=env
300 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
306 uninstall_sw
: bool = False,
311 Resets the Kubernetes cluster by removing the helm deployment that represents it.
313 :param cluster_uuid: The UUID of the cluster to reset
314 :param force: Boolean to force the reset
315 :param uninstall_sw: Boolean to force the reset
316 :param kwargs: Additional parameters (None yet)
317 :return: Returns True if successful or raises an exception.
319 namespace
= self
._get
_namespace
(cluster_uuid
=cluster_uuid
)
321 "Resetting K8s environment. cluster uuid: {} uninstall={}".format(
322 cluster_uuid
, uninstall_sw
327 self
.fs
.sync(from_path
=cluster_uuid
)
329 # uninstall releases if needed.
331 releases
= await self
.instances_list(cluster_uuid
=cluster_uuid
)
332 if len(releases
) > 0:
336 kdu_instance
= r
.get("name")
337 chart
= r
.get("chart")
339 "Uninstalling {} -> {}".format(chart
, kdu_instance
)
341 await self
.uninstall(
342 cluster_uuid
=cluster_uuid
, kdu_instance
=kdu_instance
344 except Exception as e
:
345 # will not raise exception as it was found
346 # that in some cases of previously installed helm releases it
349 "Error uninstalling release {}: {}".format(
355 "Cluster uuid: {} has releases and not force. Leaving K8s helm environment"
356 ).format(cluster_uuid
)
359 False # Allow to remove k8s cluster without removing Tiller
363 await self
._uninstall
_sw
(cluster_id
=cluster_uuid
, namespace
=namespace
)
365 # delete cluster directory
366 self
.log
.debug("Removing directory {}".format(cluster_uuid
))
367 self
.fs
.file_delete(cluster_uuid
, ignore_non_exist
=True)
368 # Remove also local directorio if still exist
369 direct
= self
.fs
.path
+ "/" + cluster_uuid
370 shutil
.rmtree(direct
, ignore_errors
=True)
374 async def _install_impl(
382 timeout
: float = 300,
384 db_dict
: dict = None,
385 kdu_name
: str = None,
386 namespace
: str = None,
389 paths
, env
= self
._init
_paths
_env
(
390 cluster_name
=cluster_id
, create_if_not_exist
=True
394 params_str
, file_to_delete
= self
._params
_to
_file
_option
(
395 cluster_id
=cluster_id
, params
=params
399 kdu_model
, version
= self
._split
_version
(kdu_model
)
401 repo
= self
._split
_repo
(kdu_model
)
403 self
.repo_update(cluster_id
, repo
)
405 command
= self
._get
_install
_command
(
413 paths
["kube_config"],
416 self
.log
.debug("installing: {}".format(command
))
419 # exec helm in a task
420 exec_task
= asyncio
.ensure_future(
421 coro_or_future
=self
._local
_async
_exec
(
422 command
=command
, raise_exception_on_error
=False, env
=env
426 # write status in another task
427 status_task
= asyncio
.ensure_future(
428 coro_or_future
=self
._store
_status
(
429 cluster_id
=cluster_id
,
430 kdu_instance
=kdu_instance
,
438 # wait for execution task
439 await asyncio
.wait([exec_task
])
444 output
, rc
= exec_task
.result()
448 output
, rc
= await self
._local
_async
_exec
(
449 command
=command
, raise_exception_on_error
=False, env
=env
452 # remove temporal values yaml file
454 os
.remove(file_to_delete
)
457 await self
._store
_status
(
458 cluster_id
=cluster_id
,
459 kdu_instance
=kdu_instance
,
468 msg
= "Error executing command: {}\nOutput: {}".format(command
, output
)
470 raise K8sException(msg
)
476 kdu_model
: str = None,
478 timeout
: float = 300,
480 db_dict
: dict = None,
482 self
.log
.debug("upgrading {} in cluster {}".format(kdu_model
, cluster_uuid
))
485 self
.fs
.sync(from_path
=cluster_uuid
)
487 # look for instance to obtain namespace
488 instance_info
= await self
.get_instance_info(cluster_uuid
, kdu_instance
)
489 if not instance_info
:
490 raise K8sException("kdu_instance {} not found".format(kdu_instance
))
493 paths
, env
= self
._init
_paths
_env
(
494 cluster_name
=cluster_uuid
, create_if_not_exist
=True
498 self
.fs
.sync(from_path
=cluster_uuid
)
501 params_str
, file_to_delete
= self
._params
_to
_file
_option
(
502 cluster_id
=cluster_uuid
, params
=params
506 kdu_model
, version
= self
._split
_version
(kdu_model
)
508 repo
= self
._split
_repo
(kdu_model
)
510 self
.repo_update(cluster_uuid
, repo
)
512 command
= self
._get
_upgrade
_command
(
515 instance_info
["namespace"],
520 paths
["kube_config"],
523 self
.log
.debug("upgrading: {}".format(command
))
527 # exec helm in a task
528 exec_task
= asyncio
.ensure_future(
529 coro_or_future
=self
._local
_async
_exec
(
530 command
=command
, raise_exception_on_error
=False, env
=env
533 # write status in another task
534 status_task
= asyncio
.ensure_future(
535 coro_or_future
=self
._store
_status
(
536 cluster_id
=cluster_uuid
,
537 kdu_instance
=kdu_instance
,
538 namespace
=instance_info
["namespace"],
545 # wait for execution task
546 await asyncio
.wait([exec_task
])
550 output
, rc
= exec_task
.result()
554 output
, rc
= await self
._local
_async
_exec
(
555 command
=command
, raise_exception_on_error
=False, env
=env
558 # remove temporal values yaml file
560 os
.remove(file_to_delete
)
563 await self
._store
_status
(
564 cluster_id
=cluster_uuid
,
565 kdu_instance
=kdu_instance
,
566 namespace
=instance_info
["namespace"],
574 msg
= "Error executing command: {}\nOutput: {}".format(command
, output
)
576 raise K8sException(msg
)
579 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
581 # return new revision number
582 instance
= await self
.get_instance_info(
583 cluster_uuid
=cluster_uuid
, kdu_instance
=kdu_instance
586 revision
= int(instance
.get("revision"))
587 self
.log
.debug("New revision: {}".format(revision
))
597 total_timeout
: float = 1800,
598 cluster_uuid
: str = None,
599 kdu_model
: str = None,
601 db_dict
: dict = None,
604 """Scale a resource in a Helm Chart.
607 kdu_instance: KDU instance name
608 scale: Scale to which to set the resource
609 resource_name: Resource name
610 total_timeout: The time, in seconds, to wait
611 cluster_uuid: The UUID of the cluster
612 kdu_model: The chart reference
613 atomic: if set, upgrade process rolls back changes made in case of failed upgrade.
614 The --wait flag will be set automatically if --atomic is used
615 db_dict: Dictionary for any additional data
616 kwargs: Additional parameters
619 True if successful, False otherwise
622 debug_mgs
= "scaling {} in cluster {}".format(kdu_model
, cluster_uuid
)
624 debug_mgs
= "scaling resource {} in model {} (cluster {})".format(
625 resource_name
, kdu_model
, cluster_uuid
628 self
.log
.debug(debug_mgs
)
630 # look for instance to obtain namespace
631 # get_instance_info function calls the sync command
632 instance_info
= await self
.get_instance_info(cluster_uuid
, kdu_instance
)
633 if not instance_info
:
634 raise K8sException("kdu_instance {} not found".format(kdu_instance
))
637 paths
, env
= self
._init
_paths
_env
(
638 cluster_name
=cluster_uuid
, create_if_not_exist
=True
642 kdu_model
, version
= self
._split
_version
(kdu_model
)
644 repo_url
= await self
._find
_repo
(kdu_model
, cluster_uuid
)
647 "Repository not found for kdu_model {}".format(kdu_model
)
650 _
, replica_str
= await self
._get
_replica
_count
_url
(
651 kdu_model
, repo_url
, resource_name
654 command
= self
._get
_upgrade
_scale
_command
(
657 instance_info
["namespace"],
664 paths
["kube_config"],
667 self
.log
.debug("scaling: {}".format(command
))
670 # exec helm in a task
671 exec_task
= asyncio
.ensure_future(
672 coro_or_future
=self
._local
_async
_exec
(
673 command
=command
, raise_exception_on_error
=False, env
=env
676 # write status in another task
677 status_task
= asyncio
.ensure_future(
678 coro_or_future
=self
._store
_status
(
679 cluster_id
=cluster_uuid
,
680 kdu_instance
=kdu_instance
,
681 namespace
=instance_info
["namespace"],
688 # wait for execution task
689 await asyncio
.wait([exec_task
])
693 output
, rc
= exec_task
.result()
696 output
, rc
= await self
._local
_async
_exec
(
697 command
=command
, raise_exception_on_error
=False, env
=env
701 await self
._store
_status
(
702 cluster_id
=cluster_uuid
,
703 kdu_instance
=kdu_instance
,
704 namespace
=instance_info
["namespace"],
712 msg
= "Error executing command: {}\nOutput: {}".format(command
, output
)
714 raise K8sException(msg
)
717 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
721 async def get_scale_count(
729 """Get a resource scale count.
732 cluster_uuid: The UUID of the cluster
733 resource_name: Resource name
734 kdu_instance: KDU instance name
735 kdu_model: The name or path of a bundle
736 kwargs: Additional parameters
739 Resource instance count
743 "getting scale count for {} in cluster {}".format(kdu_model
, cluster_uuid
)
746 # look for instance to obtain namespace
747 instance_info
= await self
.get_instance_info(cluster_uuid
, kdu_instance
)
748 if not instance_info
:
749 raise K8sException("kdu_instance {} not found".format(kdu_instance
))
752 paths
, env
= self
._init
_paths
_env
(
753 cluster_name
=cluster_uuid
, create_if_not_exist
=True
756 replicas
= await self
._get
_replica
_count
_instance
(
757 kdu_instance
, instance_info
["namespace"], paths
["kube_config"]
760 # Get default value if scale count is not found from provided values
762 repo_url
= await self
._find
_repo
(kdu_model
, cluster_uuid
)
765 "Repository not found for kdu_model {}".format(kdu_model
)
768 replicas
, _
= await self
._get
_replica
_count
_url
(
769 kdu_model
, repo_url
, resource_name
773 msg
= "Replica count not found. Cannot be scaled"
775 raise K8sException(msg
)
780 self
, cluster_uuid
: str, kdu_instance
: str, revision
=0, db_dict
: dict = None
783 "rollback kdu_instance {} to revision {} from cluster {}".format(
784 kdu_instance
, revision
, cluster_uuid
789 self
.fs
.sync(from_path
=cluster_uuid
)
791 # look for instance to obtain namespace
792 instance_info
= await self
.get_instance_info(cluster_uuid
, kdu_instance
)
793 if not instance_info
:
794 raise K8sException("kdu_instance {} not found".format(kdu_instance
))
797 paths
, env
= self
._init
_paths
_env
(
798 cluster_name
=cluster_uuid
, create_if_not_exist
=True
802 self
.fs
.sync(from_path
=cluster_uuid
)
804 command
= self
._get
_rollback
_command
(
805 kdu_instance
, instance_info
["namespace"], revision
, paths
["kube_config"]
808 self
.log
.debug("rolling_back: {}".format(command
))
810 # exec helm in a task
811 exec_task
= asyncio
.ensure_future(
812 coro_or_future
=self
._local
_async
_exec
(
813 command
=command
, raise_exception_on_error
=False, env
=env
816 # write status in another task
817 status_task
= asyncio
.ensure_future(
818 coro_or_future
=self
._store
_status
(
819 cluster_id
=cluster_uuid
,
820 kdu_instance
=kdu_instance
,
821 namespace
=instance_info
["namespace"],
823 operation
="rollback",
828 # wait for execution task
829 await asyncio
.wait([exec_task
])
834 output
, rc
= exec_task
.result()
837 await self
._store
_status
(
838 cluster_id
=cluster_uuid
,
839 kdu_instance
=kdu_instance
,
840 namespace
=instance_info
["namespace"],
842 operation
="rollback",
848 msg
= "Error executing command: {}\nOutput: {}".format(command
, output
)
850 raise K8sException(msg
)
853 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
855 # return new revision number
856 instance
= await self
.get_instance_info(
857 cluster_uuid
=cluster_uuid
, kdu_instance
=kdu_instance
860 revision
= int(instance
.get("revision"))
861 self
.log
.debug("New revision: {}".format(revision
))
866 async def uninstall(self
, cluster_uuid
: str, kdu_instance
: str, **kwargs
):
868 Removes an existing KDU instance. It would implicitly use the `delete` or 'uninstall' call
869 (this call should happen after all _terminate-config-primitive_ of the VNF
872 :param cluster_uuid: UUID of a K8s cluster known by OSM, or namespace:cluster_id
873 :param kdu_instance: unique name for the KDU instance to be deleted
874 :param kwargs: Additional parameters (None yet)
875 :return: True if successful
879 "uninstall kdu_instance {} from cluster {}".format(
880 kdu_instance
, cluster_uuid
885 self
.fs
.sync(from_path
=cluster_uuid
)
887 # look for instance to obtain namespace
888 instance_info
= await self
.get_instance_info(cluster_uuid
, kdu_instance
)
889 if not instance_info
:
890 self
.log
.warning(("kdu_instance {} not found".format(kdu_instance
)))
893 paths
, env
= self
._init
_paths
_env
(
894 cluster_name
=cluster_uuid
, create_if_not_exist
=True
898 self
.fs
.sync(from_path
=cluster_uuid
)
900 command
= self
._get
_uninstall
_command
(
901 kdu_instance
, instance_info
["namespace"], paths
["kube_config"]
903 output
, _rc
= await self
._local
_async
_exec
(
904 command
=command
, raise_exception_on_error
=True, env
=env
908 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
910 return self
._output
_to
_table
(output
)
912 async def instances_list(self
, cluster_uuid
: str) -> list:
914 returns a list of deployed releases in a cluster
916 :param cluster_uuid: the 'cluster' or 'namespace:cluster'
920 self
.log
.debug("list releases for cluster {}".format(cluster_uuid
))
923 self
.fs
.sync(from_path
=cluster_uuid
)
925 # execute internal command
926 result
= await self
._instances
_list
(cluster_uuid
)
929 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
933 async def get_instance_info(self
, cluster_uuid
: str, kdu_instance
: str):
934 instances
= await self
.instances_list(cluster_uuid
=cluster_uuid
)
935 for instance
in instances
:
936 if instance
.get("name") == kdu_instance
:
938 self
.log
.debug("Instance {} not found".format(kdu_instance
))
941 async def exec_primitive(
943 cluster_uuid
: str = None,
944 kdu_instance
: str = None,
945 primitive_name
: str = None,
946 timeout
: float = 300,
948 db_dict
: dict = None,
951 """Exec primitive (Juju action)
953 :param cluster_uuid: The UUID of the cluster or namespace:cluster
954 :param kdu_instance: The unique name of the KDU instance
955 :param primitive_name: Name of action that will be executed
956 :param timeout: Timeout for action execution
957 :param params: Dictionary of all the parameters needed for the action
958 :db_dict: Dictionary for any additional data
959 :param kwargs: Additional parameters (None yet)
961 :return: Returns the output of the action
964 "KDUs deployed with Helm don't support actions "
965 "different from rollback, upgrade and status"
968 async def get_services(
969 self
, cluster_uuid
: str, kdu_instance
: str, namespace
: str
972 Returns a list of services defined for the specified kdu instance.
974 :param cluster_uuid: UUID of a K8s cluster known by OSM
975 :param kdu_instance: unique name for the KDU instance
976 :param namespace: K8s namespace used by the KDU instance
977 :return: If successful, it will return a list of services, Each service
978 can have the following data:
979 - `name` of the service
980 - `type` type of service in the k8 cluster
981 - `ports` List of ports offered by the service, for each port includes at least
983 - `cluster_ip` Internal ip to be used inside k8s cluster
984 - `external_ip` List of external ips (in case they are available)
988 "get_services: cluster_uuid: {}, kdu_instance: {}".format(
989 cluster_uuid
, kdu_instance
994 paths
, env
= self
._init
_paths
_env
(
995 cluster_name
=cluster_uuid
, create_if_not_exist
=True
999 self
.fs
.sync(from_path
=cluster_uuid
)
1001 # get list of services names for kdu
1002 service_names
= await self
._get
_services
(
1003 cluster_uuid
, kdu_instance
, namespace
, paths
["kube_config"]
1007 for service
in service_names
:
1008 service
= await self
._get
_service
(cluster_uuid
, service
, namespace
)
1009 service_list
.append(service
)
1012 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
1016 async def get_service(
1017 self
, cluster_uuid
: str, service_name
: str, namespace
: str
1021 "get service, service_name: {}, namespace: {}, cluster_uuid: {}".format(
1022 service_name
, namespace
, cluster_uuid
1027 self
.fs
.sync(from_path
=cluster_uuid
)
1029 service
= await self
._get
_service
(cluster_uuid
, service_name
, namespace
)
1032 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
1036 async def status_kdu(
1037 self
, cluster_uuid
: str, kdu_instance
: str, yaml_format
: str = False, **kwargs
1038 ) -> Union
[str, dict]:
1040 This call would retrieve tha current state of a given KDU instance. It would be
1041 would allow to retrieve the _composition_ (i.e. K8s objects) and _specific
1042 values_ of the configuration parameters applied to a given instance. This call
1043 would be based on the `status` call.
1045 :param cluster_uuid: UUID of a K8s cluster known by OSM
1046 :param kdu_instance: unique name for the KDU instance
1047 :param kwargs: Additional parameters (None yet)
1048 :param yaml_format: if the return shall be returned as an YAML string or as a
1050 :return: If successful, it will return the following vector of arguments:
1051 - K8s `namespace` in the cluster where the KDU lives
1052 - `state` of the KDU instance. It can be:
1059 - List of `resources` (objects) that this release consists of, sorted by kind,
1060 and the status of those resources
1061 - Last `deployment_time`.
1065 "status_kdu: cluster_uuid: {}, kdu_instance: {}".format(
1066 cluster_uuid
, kdu_instance
1071 self
.fs
.sync(from_path
=cluster_uuid
)
1073 # get instance: needed to obtain namespace
1074 instances
= await self
._instances
_list
(cluster_id
=cluster_uuid
)
1075 for instance
in instances
:
1076 if instance
.get("name") == kdu_instance
:
1079 # instance does not exist
1081 "Instance name: {} not found in cluster: {}".format(
1082 kdu_instance
, cluster_uuid
1086 status
= await self
._status
_kdu
(
1087 cluster_id
=cluster_uuid
,
1088 kdu_instance
=kdu_instance
,
1089 namespace
=instance
["namespace"],
1090 yaml_format
=yaml_format
,
1091 show_error_log
=True,
1095 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
1099 async def get_values_kdu(
1100 self
, kdu_instance
: str, namespace
: str, kubeconfig
: str
1103 self
.log
.debug("get kdu_instance values {}".format(kdu_instance
))
1105 return await self
._exec
_get
_command
(
1106 get_command
="values",
1107 kdu_instance
=kdu_instance
,
1108 namespace
=namespace
,
1109 kubeconfig
=kubeconfig
,
1112 async def values_kdu(self
, kdu_model
: str, repo_url
: str = None) -> str:
1115 "inspect kdu_model values {} from (optional) repo: {}".format(
1120 return await self
._exec
_inspect
_command
(
1121 inspect_command
="values", kdu_model
=kdu_model
, repo_url
=repo_url
1124 async def help_kdu(self
, kdu_model
: str, repo_url
: str = None) -> str:
1127 "inspect kdu_model {} readme.md from repo: {}".format(kdu_model
, repo_url
)
1130 return await self
._exec
_inspect
_command
(
1131 inspect_command
="readme", kdu_model
=kdu_model
, repo_url
=repo_url
1134 async def synchronize_repos(self
, cluster_uuid
: str):
1136 self
.log
.debug("synchronize repos for cluster helm-id: {}".format(cluster_uuid
))
1138 db_repo_ids
= self
._get
_helm
_chart
_repos
_ids
(cluster_uuid
)
1139 db_repo_dict
= self
._get
_db
_repos
_dict
(db_repo_ids
)
1141 local_repo_list
= await self
.repo_list(cluster_uuid
)
1142 local_repo_dict
= {repo
["name"]: repo
["url"] for repo
in local_repo_list
}
1144 deleted_repo_list
= []
1145 added_repo_dict
= {}
1147 # iterate over the list of repos in the database that should be
1148 # added if not present
1149 for repo_name
, db_repo
in db_repo_dict
.items():
1151 # check if it is already present
1152 curr_repo_url
= local_repo_dict
.get(db_repo
["name"])
1153 repo_id
= db_repo
.get("_id")
1154 if curr_repo_url
!= db_repo
["url"]:
1157 "repo {} url changed, delete and and again".format(
1161 await self
.repo_remove(cluster_uuid
, db_repo
["name"])
1162 deleted_repo_list
.append(repo_id
)
1165 self
.log
.debug("add repo {}".format(db_repo
["name"]))
1166 if "ca_cert" in db_repo
:
1167 await self
.repo_add(
1171 cert
=db_repo
["ca_cert"],
1174 await self
.repo_add(
1179 added_repo_dict
[repo_id
] = db_repo
["name"]
1180 except Exception as e
:
1182 "Error adding repo id: {}, err_msg: {} ".format(
1187 # Delete repos that are present but not in nbi_list
1188 for repo_name
in local_repo_dict
:
1189 if not db_repo_dict
.get(repo_name
) and repo_name
!= "stable":
1190 self
.log
.debug("delete repo {}".format(repo_name
))
1192 await self
.repo_remove(cluster_uuid
, repo_name
)
1193 deleted_repo_list
.append(repo_name
)
1194 except Exception as e
:
1196 "Error deleting repo, name: {}, err_msg: {}".format(
1201 return deleted_repo_list
, added_repo_dict
1203 except K8sException
:
1205 except Exception as e
:
1206 # Do not raise errors synchronizing repos
1207 self
.log
.error("Error synchronizing repos: {}".format(e
))
1208 raise Exception("Error synchronizing repos: {}".format(e
))
1210 def _get_db_repos_dict(self
, repo_ids
: list):
1212 for repo_id
in repo_ids
:
1213 db_repo
= self
.db
.get_one("k8srepos", {"_id": repo_id
})
1214 db_repos_dict
[db_repo
["name"]] = db_repo
1215 return db_repos_dict
1218 ####################################################################################
1219 ################################### TO BE IMPLEMENTED SUBCLASSES ###################
1220 ####################################################################################
1224 def _init_paths_env(self
, cluster_name
: str, create_if_not_exist
: bool = True):
1226 Creates and returns base cluster and kube dirs and returns them.
1227 Also created helm3 dirs according to new directory specification, paths are
1228 not returned but assigned to helm environment variables
1230 :param cluster_name: cluster_name
1231 :return: Dictionary with config_paths and dictionary with helm environment variables
1235 async def _cluster_init(self
, cluster_id
, namespace
, paths
, env
):
1237 Implements the helm version dependent cluster initialization
1241 async def _instances_list(self
, cluster_id
):
1243 Implements the helm version dependent helm instances list
1247 async def _get_services(self
, cluster_id
, kdu_instance
, namespace
, kubeconfig
):
1249 Implements the helm version dependent method to obtain services from a helm instance
1253 async def _status_kdu(
1257 namespace
: str = None,
1258 yaml_format
: bool = False,
1259 show_error_log
: bool = False,
1260 ) -> Union
[str, dict]:
1262 Implements the helm version dependent method to obtain status of a helm instance
1266 def _get_install_command(
1278 Obtain command to be executed to delete the indicated instance
1282 def _get_upgrade_scale_command(
1295 """Obtain command to be executed to upgrade the indicated instance."""
1298 def _get_upgrade_command(
1310 Obtain command to be executed to upgrade the indicated instance
1314 def _get_rollback_command(
1315 self
, kdu_instance
, namespace
, revision
, kubeconfig
1318 Obtain command to be executed to rollback the indicated instance
1322 def _get_uninstall_command(
1323 self
, kdu_instance
: str, namespace
: str, kubeconfig
: str
1326 Obtain command to be executed to delete the indicated instance
1330 def _get_inspect_command(
1331 self
, show_command
: str, kdu_model
: str, repo_str
: str, version
: str
1334 Obtain command to be executed to obtain information about the kdu
1338 def _get_get_command(
1339 self
, get_command
: str, kdu_instance
: str, namespace
: str, kubeconfig
: str
1341 """Obtain command to be executed to get information about the kdu instance."""
1344 async def _uninstall_sw(self
, cluster_id
: str, namespace
: str):
1346 Method call to uninstall cluster software for helm. This method is dependent
1348 For Helm v2 it will be called when Tiller must be uninstalled
1349 For Helm v3 it does nothing and does not need to be callled
1353 def _get_helm_chart_repos_ids(self
, cluster_uuid
) -> list:
1355 Obtains the cluster repos identifiers
1359 ####################################################################################
1360 ################################### P R I V A T E ##################################
1361 ####################################################################################
1365 def _check_file_exists(filename
: str, exception_if_not_exists
: bool = False):
1366 if os
.path
.exists(filename
):
1369 msg
= "File {} does not exist".format(filename
)
1370 if exception_if_not_exists
:
1371 raise K8sException(msg
)
1374 def _remove_multiple_spaces(strobj
):
1375 strobj
= strobj
.strip()
1376 while " " in strobj
:
1377 strobj
= strobj
.replace(" ", " ")
1381 def _output_to_lines(output
: str) -> list:
1382 output_lines
= list()
1383 lines
= output
.splitlines(keepends
=False)
1387 output_lines
.append(line
)
1391 def _output_to_table(output
: str) -> list:
1392 output_table
= list()
1393 lines
= output
.splitlines(keepends
=False)
1395 line
= line
.replace("\t", " ")
1397 output_table
.append(line_list
)
1398 cells
= line
.split(sep
=" ")
1402 line_list
.append(cell
)
1406 def _parse_services(output
: str) -> list:
1407 lines
= output
.splitlines(keepends
=False)
1410 line
= line
.replace("\t", " ")
1411 cells
= line
.split(sep
=" ")
1412 if len(cells
) > 0 and cells
[0].startswith("service/"):
1413 elems
= cells
[0].split(sep
="/")
1415 services
.append(elems
[1])
1419 def _get_deep(dictionary
: dict, members
: tuple):
1424 value
= target
.get(m
)
1433 # find key:value in several lines
1435 def _find_in_lines(p_lines
: list, p_key
: str) -> str:
1436 for line
in p_lines
:
1438 if line
.startswith(p_key
+ ":"):
1439 parts
= line
.split(":")
1440 the_value
= parts
[1].strip()
1448 def _lower_keys_list(input_list
: list):
1450 Transform the keys in a list of dictionaries to lower case and returns a new list
1455 for dictionary
in input_list
:
1456 new_dict
= dict((k
.lower(), v
) for k
, v
in dictionary
.items())
1457 new_list
.append(new_dict
)
1460 async def _local_async_exec(
1463 raise_exception_on_error
: bool = False,
1464 show_error_log
: bool = True,
1465 encode_utf8
: bool = False,
1469 command
= K8sHelmBaseConnector
._remove
_multiple
_spaces
(command
)
1471 "Executing async local command: {}, env: {}".format(command
, env
)
1475 command
= shlex
.split(command
)
1477 environ
= os
.environ
.copy()
1482 process
= await asyncio
.create_subprocess_exec(
1484 stdout
=asyncio
.subprocess
.PIPE
,
1485 stderr
=asyncio
.subprocess
.PIPE
,
1489 # wait for command terminate
1490 stdout
, stderr
= await process
.communicate()
1492 return_code
= process
.returncode
1496 output
= stdout
.decode("utf-8").strip()
1497 # output = stdout.decode()
1499 output
= stderr
.decode("utf-8").strip()
1500 # output = stderr.decode()
1502 if return_code
!= 0 and show_error_log
:
1504 "Return code (FAIL): {}\nOutput:\n{}".format(return_code
, output
)
1507 self
.log
.debug("Return code: {}".format(return_code
))
1509 if raise_exception_on_error
and return_code
!= 0:
1510 raise K8sException(output
)
1513 output
= output
.encode("utf-8").strip()
1514 output
= str(output
).replace("\\n", "\n")
1516 return output
, return_code
1518 except asyncio
.CancelledError
:
1520 except K8sException
:
1522 except Exception as e
:
1523 msg
= "Exception executing command: {} -> {}".format(command
, e
)
1525 if raise_exception_on_error
:
1526 raise K8sException(e
) from e
1530 async def _local_async_exec_pipe(
1534 raise_exception_on_error
: bool = True,
1535 show_error_log
: bool = True,
1536 encode_utf8
: bool = False,
1540 command1
= K8sHelmBaseConnector
._remove
_multiple
_spaces
(command1
)
1541 command2
= K8sHelmBaseConnector
._remove
_multiple
_spaces
(command2
)
1542 command
= "{} | {}".format(command1
, command2
)
1544 "Executing async local command: {}, env: {}".format(command
, env
)
1548 command1
= shlex
.split(command1
)
1549 command2
= shlex
.split(command2
)
1551 environ
= os
.environ
.copy()
1556 read
, write
= os
.pipe()
1557 await asyncio
.create_subprocess_exec(*command1
, stdout
=write
, env
=environ
)
1559 process_2
= await asyncio
.create_subprocess_exec(
1560 *command2
, stdin
=read
, stdout
=asyncio
.subprocess
.PIPE
, env
=environ
1563 stdout
, stderr
= await process_2
.communicate()
1565 return_code
= process_2
.returncode
1569 output
= stdout
.decode("utf-8").strip()
1570 # output = stdout.decode()
1572 output
= stderr
.decode("utf-8").strip()
1573 # output = stderr.decode()
1575 if return_code
!= 0 and show_error_log
:
1577 "Return code (FAIL): {}\nOutput:\n{}".format(return_code
, output
)
1580 self
.log
.debug("Return code: {}".format(return_code
))
1582 if raise_exception_on_error
and return_code
!= 0:
1583 raise K8sException(output
)
1586 output
= output
.encode("utf-8").strip()
1587 output
= str(output
).replace("\\n", "\n")
1589 return output
, return_code
1590 except asyncio
.CancelledError
:
1592 except K8sException
:
1594 except Exception as e
:
1595 msg
= "Exception executing command: {} -> {}".format(command
, e
)
1597 if raise_exception_on_error
:
1598 raise K8sException(e
) from e
1602 async def _get_service(self
, cluster_id
, service_name
, namespace
):
1604 Obtains the data of the specified service in the k8cluster.
1606 :param cluster_id: id of a K8s cluster known by OSM
1607 :param service_name: name of the K8s service in the specified namespace
1608 :param namespace: K8s namespace used by the KDU instance
1609 :return: If successful, it will return a service with the following data:
1610 - `name` of the service
1611 - `type` type of service in the k8 cluster
1612 - `ports` List of ports offered by the service, for each port includes at least
1613 name, port, protocol
1614 - `cluster_ip` Internal ip to be used inside k8s cluster
1615 - `external_ip` List of external ips (in case they are available)
1619 paths
, env
= self
._init
_paths
_env
(
1620 cluster_name
=cluster_id
, create_if_not_exist
=True
1623 command
= "{} --kubeconfig={} --namespace={} get service {} -o=yaml".format(
1624 self
.kubectl_command
, paths
["kube_config"], namespace
, service_name
1627 output
, _rc
= await self
._local
_async
_exec
(
1628 command
=command
, raise_exception_on_error
=True, env
=env
1631 data
= yaml
.load(output
, Loader
=yaml
.SafeLoader
)
1634 "name": service_name
,
1635 "type": self
._get
_deep
(data
, ("spec", "type")),
1636 "ports": self
._get
_deep
(data
, ("spec", "ports")),
1637 "cluster_ip": self
._get
_deep
(data
, ("spec", "clusterIP")),
1639 if service
["type"] == "LoadBalancer":
1640 ip_map_list
= self
._get
_deep
(data
, ("status", "loadBalancer", "ingress"))
1641 ip_list
= [elem
["ip"] for elem
in ip_map_list
]
1642 service
["external_ip"] = ip_list
1646 async def _exec_get_command(
1647 self
, get_command
: str, kdu_instance
: str, namespace
: str, kubeconfig
: str
1649 """Obtains information about the kdu instance."""
1651 full_command
= self
._get
_get
_command
(
1652 get_command
, kdu_instance
, namespace
, kubeconfig
1655 output
, _rc
= await self
._local
_async
_exec
(command
=full_command
)
1659 async def _exec_inspect_command(
1660 self
, inspect_command
: str, kdu_model
: str, repo_url
: str = None
1662 """Obtains information about a kdu, no cluster (no env)."""
1666 repo_str
= " --repo {}".format(repo_url
)
1668 idx
= kdu_model
.find("/")
1671 kdu_model
= kdu_model
[idx
:]
1673 kdu_model
, version
= self
._split
_version
(kdu_model
)
1675 version_str
= "--version {}".format(version
)
1679 full_command
= self
._get
_inspect
_command
(
1680 inspect_command
, kdu_model
, repo_str
, version_str
1683 output
, _rc
= await self
._local
_async
_exec
(command
=full_command
)
1687 async def _get_replica_count_url(
1691 resource_name
: str = None,
1693 """Get the replica count value in the Helm Chart Values.
1696 kdu_model: The name or path of a bundle
1697 repo_url: Helm Chart repository url
1698 resource_name: Resource name
1701 True if replicas, False replicaCount
1704 kdu_values
= yaml
.load(
1705 await self
.values_kdu(kdu_model
, repo_url
), Loader
=yaml
.SafeLoader
1710 "kdu_values not found for kdu_model {}".format(kdu_model
)
1714 kdu_values
= kdu_values
.get(resource_name
, None)
1717 msg
= "resource {} not found in the values in model {}".format(
1718 resource_name
, kdu_model
1721 raise K8sException(msg
)
1723 duplicate_check
= False
1728 if kdu_values
.get("replicaCount", None):
1729 replicas
= kdu_values
["replicaCount"]
1730 replica_str
= "replicaCount"
1731 elif kdu_values
.get("replicas", None):
1732 duplicate_check
= True
1733 replicas
= kdu_values
["replicas"]
1734 replica_str
= "replicas"
1738 "replicaCount or replicas not found in the resource"
1739 "{} values in model {}. Cannot be scaled".format(
1740 resource_name
, kdu_model
1745 "replicaCount or replicas not found in the values"
1746 "in model {}. Cannot be scaled".format(kdu_model
)
1749 raise K8sException(msg
)
1751 # Control if replicas and replicaCount exists at the same time
1752 msg
= "replicaCount and replicas are exists at the same time"
1754 if "replicaCount" in kdu_values
:
1756 raise K8sException(msg
)
1758 if "replicas" in kdu_values
:
1760 raise K8sException(msg
)
1762 return replicas
, replica_str
1764 async def _get_replica_count_instance(
1769 resource_name
: str = None,
1771 """Get the replica count value in the instance.
1774 kdu_instance: The name of the KDU instance
1775 namespace: KDU instance namespace
1777 resource_name: Resource name
1780 True if replicas, False replicaCount
1783 kdu_values
= yaml
.load(
1784 await self
.get_values_kdu(kdu_instance
, namespace
, kubeconfig
),
1785 Loader
=yaml
.SafeLoader
,
1792 kdu_values
.get(resource_name
, None) if resource_name
else None
1796 resource_values
.get("replicaCount", None)
1797 or resource_values
.get("replicas", None)
1801 kdu_values
.get("replicaCount", None)
1802 or kdu_values
.get("replicas", None)
1808 async def _store_status(
1813 namespace
: str = None,
1814 check_every
: float = 10,
1815 db_dict
: dict = None,
1816 run_once
: bool = False,
1820 await asyncio
.sleep(check_every
)
1821 detailed_status
= await self
._status
_kdu
(
1822 cluster_id
=cluster_id
,
1823 kdu_instance
=kdu_instance
,
1825 namespace
=namespace
,
1827 status
= detailed_status
.get("info").get("description")
1828 self
.log
.debug("KDU {} STATUS: {}.".format(kdu_instance
, status
))
1829 # write status to db
1830 result
= await self
.write_app_status_to_db(
1833 detailed_status
=str(detailed_status
),
1834 operation
=operation
,
1837 self
.log
.info("Error writing in database. Task exiting...")
1839 except asyncio
.CancelledError
:
1840 self
.log
.debug("Task cancelled")
1842 except Exception as e
:
1844 "_store_status exception: {}".format(str(e
)), exc_info
=True
1851 # params for use in -f file
1852 # returns values file option and filename (in order to delete it at the end)
1853 def _params_to_file_option(self
, cluster_id
: str, params
: dict) -> (str, str):
1855 if params
and len(params
) > 0:
1856 self
._init
_paths
_env
(cluster_name
=cluster_id
, create_if_not_exist
=True)
1858 def get_random_number():
1859 r
= random
.randrange(start
=1, stop
=99999999)
1867 value
= params
.get(key
)
1868 if "!!yaml" in str(value
):
1869 value
= yaml
.load(value
[7:])
1870 params2
[key
] = value
1872 values_file
= get_random_number() + ".yaml"
1873 with
open(values_file
, "w") as stream
:
1874 yaml
.dump(params2
, stream
, indent
=4, default_flow_style
=False)
1876 return "-f {}".format(values_file
), values_file
1880 # params for use in --set option
1882 def _params_to_set_option(params
: dict) -> str:
1884 if params
and len(params
) > 0:
1887 value
= params
.get(key
, None)
1888 if value
is not None:
1890 params_str
+= "--set "
1894 params_str
+= "{}={}".format(key
, value
)
1898 def generate_kdu_instance_name(**kwargs
):
1899 chart_name
= kwargs
["kdu_model"]
1900 # check embeded chart (file or dir)
1901 if chart_name
.startswith("/"):
1902 # extract file or directory name
1903 chart_name
= chart_name
[chart_name
.rfind("/") + 1 :]
1905 elif "://" in chart_name
:
1906 # extract last portion of URL
1907 chart_name
= chart_name
[chart_name
.rfind("/") + 1 :]
1910 for c
in chart_name
:
1911 if c
.isalpha() or c
.isnumeric():
1918 # if does not start with alpha character, prefix 'a'
1919 if not name
[0].isalpha():
1924 def get_random_number():
1925 r
= random
.randrange(start
=1, stop
=99999999)
1927 s
= s
.rjust(10, "0")
1930 name
= name
+ get_random_number()
1933 def _split_version(self
, kdu_model
: str) -> (str, str):
1935 if ":" in kdu_model
:
1936 parts
= kdu_model
.split(sep
=":")
1938 version
= str(parts
[1])
1939 kdu_model
= parts
[0]
1940 return kdu_model
, version
1942 async def _split_repo(self
, kdu_model
: str) -> str:
1944 idx
= kdu_model
.find("/")
1946 repo_name
= kdu_model
[:idx
]
1949 async def _find_repo(self
, kdu_model
: str, cluster_uuid
: str) -> str:
1951 idx
= kdu_model
.find("/")
1953 repo_name
= kdu_model
[:idx
]
1954 # Find repository link
1955 local_repo_list
= await self
.repo_list(cluster_uuid
)
1956 for repo
in local_repo_list
:
1957 repo_url
= repo
["url"] if repo
["name"] == repo_name
else None