2 # Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3 # This file is part of OSM
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact with: nfvlabs@tid.es
24 from typing
import Union
32 from uuid
import uuid4
34 from n2vc
.config
import EnvironConfig
35 from n2vc
.exceptions
import K8sException
36 from n2vc
.k8s_conn
import K8sConnector
39 class K8sHelmBaseConnector(K8sConnector
):
42 ####################################################################################
43 ################################### P U B L I C ####################################
44 ####################################################################################
47 service_account
= "osm"
53 kubectl_command
: str = "/usr/bin/kubectl",
54 helm_command
: str = "/usr/bin/helm",
60 :param fs: file system for kubernetes and helm configuration
61 :param db: database object to write current operation status
62 :param kubectl_command: path to kubectl executable
63 :param helm_command: path to helm executable
65 :param on_update_db: callback called when k8s connector updates database
69 K8sConnector
.__init
__(self
, db
=db
, log
=log
, on_update_db
=on_update_db
)
71 self
.log
.info("Initializing K8S Helm connector")
73 self
.config
= EnvironConfig()
74 # random numbers for release name generation
75 random
.seed(time
.time())
80 # exception if kubectl is not installed
81 self
.kubectl_command
= kubectl_command
82 self
._check
_file
_exists
(filename
=kubectl_command
, exception_if_not_exists
=True)
84 # exception if helm is not installed
85 self
._helm
_command
= helm_command
86 self
._check
_file
_exists
(filename
=helm_command
, exception_if_not_exists
=True)
88 # obtain stable repo url from config or apply default
89 self
._stable
_repo
_url
= self
.config
.get("stablerepourl")
90 if self
._stable
_repo
_url
== "None":
91 self
._stable
_repo
_url
= None
93 def _get_namespace(self
, cluster_uuid
: str) -> str:
95 Obtains the namespace used by the cluster with the uuid passed by argument
97 param: cluster_uuid: cluster's uuid
100 # first, obtain the cluster corresponding to the uuid passed by argument
101 k8scluster
= self
.db
.get_one(
102 "k8sclusters", q_filter
={"_id": cluster_uuid
}, fail_on_empty
=False
104 return k8scluster
.get("namespace")
109 namespace
: str = "kube-system",
110 reuse_cluster_uuid
=None,
114 It prepares a given K8s cluster environment to run Charts
116 :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid
118 :param namespace: optional namespace to be used for helm. By default,
119 'kube-system' will be used
120 :param reuse_cluster_uuid: existing cluster uuid for reuse
121 :param kwargs: Additional parameters (None yet)
122 :return: uuid of the K8s cluster and True if connector has installed some
123 software in the cluster
124 (on error, an exception will be raised)
127 if reuse_cluster_uuid
:
128 cluster_id
= reuse_cluster_uuid
130 cluster_id
= str(uuid4())
133 "Initializing K8S Cluster {}. namespace: {}".format(cluster_id
, namespace
)
136 paths
, env
= self
._init
_paths
_env
(
137 cluster_name
=cluster_id
, create_if_not_exist
=True
139 mode
= stat
.S_IRUSR | stat
.S_IWUSR
140 with
open(paths
["kube_config"], "w", mode
) as f
:
142 os
.chmod(paths
["kube_config"], 0o600)
144 # Code with initialization specific of helm version
145 n2vc_installed_sw
= await self
._cluster
_init
(cluster_id
, namespace
, paths
, env
)
147 # sync fs with local data
148 self
.fs
.reverse_sync(from_path
=cluster_id
)
150 self
.log
.info("Cluster {} initialized".format(cluster_id
))
152 return cluster_id
, n2vc_installed_sw
159 repo_type
: str = "chart",
162 password
: str = None,
165 "Cluster {}, adding {} repository {}. URL: {}".format(
166 cluster_uuid
, repo_type
, name
, url
171 paths
, env
= self
._init
_paths
_env
(
172 cluster_name
=cluster_uuid
, create_if_not_exist
=True
176 self
.fs
.sync(from_path
=cluster_uuid
)
178 # helm repo add name url
179 command
= ("env KUBECONFIG={} {} repo add {} {}").format(
180 paths
["kube_config"], self
._helm
_command
, name
, url
184 temp_cert_file
= os
.path
.join(
185 self
.fs
.path
, "{}/helmcerts/".format(cluster_uuid
), "temp.crt"
187 os
.makedirs(os
.path
.dirname(temp_cert_file
), exist_ok
=True)
188 with
open(temp_cert_file
, "w") as the_cert
:
190 command
+= " --ca-file {}".format(temp_cert_file
)
193 command
+= " --username={}".format(user
)
196 command
+= " --password={}".format(password
)
198 self
.log
.debug("adding repo: {}".format(command
))
199 await self
._local
_async
_exec
(
200 command
=command
, raise_exception_on_error
=True, env
=env
204 command
= "env KUBECONFIG={} {} repo update".format(
205 paths
["kube_config"], self
._helm
_command
207 self
.log
.debug("updating repo: {}".format(command
))
208 await self
._local
_async
_exec
(
209 command
=command
, raise_exception_on_error
=False, env
=env
213 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
async def repo_list(self, cluster_uuid: str) -> list:
    """
    Get the list of registered repositories

    :param cluster_uuid: uuid of the cluster whose helm repositories are listed
    :return: list of registered repositories, one dictionary per repository
        with lower-cased keys; an empty list when the command fails or prints
        nothing
    """
    self.log.debug("list repositories for cluster {}".format(cluster_uuid))

    # config filename
    paths, env = self._init_paths_env(
        cluster_name=cluster_uuid, create_if_not_exist=True
    )

    # sync local dir
    self.fs.sync(from_path=cluster_uuid)

    command = "env KUBECONFIG={} {} repo list --output yaml".format(
        paths["kube_config"], self._helm_command
    )

    # Set exception to false because if there are no repos just want an empty list
    output, _rc = await self._local_async_exec(
        command=command, raise_exception_on_error=False, env=env
    )

    # sync fs
    self.fs.reverse_sync(from_path=cluster_uuid)

    # A non-zero return code means the output may be the command's error text
    # rather than YAML, so it must not be parsed as a repository list.
    if _rc != 0 or not output:
        return []

    repos = yaml.load(output, Loader=yaml.SafeLoader)
    # unify format between helm2 and helm3 setting all keys lowercase
    return self._lower_keys_list(repos)
async def repo_remove(self, cluster_uuid: str, name: str):
    """
    Remove a repository from the helm configuration of a cluster.

    :param cluster_uuid: uuid of the cluster whose helm configuration is changed
    :param name: repository name handed to "helm repo remove"
    :return: None
    """
    self.log.debug(
        "remove {} repositories for cluster {}".format(name, cluster_uuid)
    )

    # config filename
    paths, env = self._init_paths_env(
        cluster_name=cluster_uuid, create_if_not_exist=True
    )

    # sync local dir
    self.fs.sync(from_path=cluster_uuid)

    command = "env KUBECONFIG={} {} repo remove {}".format(
        paths["kube_config"], self._helm_command, name
    )
    # errors are requested to be raised by the executor, not swallowed
    await self._local_async_exec(
        command=command, raise_exception_on_error=True, env=env
    )

    # sync fs
    self.fs.reverse_sync(from_path=cluster_uuid)
281 uninstall_sw
: bool = False,
286 Resets the Kubernetes cluster by removing the helm deployment that represents it.
288 :param cluster_uuid: The UUID of the cluster to reset
289 :param force: Boolean to force the reset
290 :param uninstall_sw: Boolean to force the reset
291 :param kwargs: Additional parameters (None yet)
292 :return: Returns True if successful or raises an exception.
294 namespace
= self
._get
_namespace
(cluster_uuid
=cluster_uuid
)
296 "Resetting K8s environment. cluster uuid: {} uninstall={}".format(
297 cluster_uuid
, uninstall_sw
302 self
.fs
.sync(from_path
=cluster_uuid
)
304 # uninstall releases if needed.
306 releases
= await self
.instances_list(cluster_uuid
=cluster_uuid
)
307 if len(releases
) > 0:
311 kdu_instance
= r
.get("name")
312 chart
= r
.get("chart")
314 "Uninstalling {} -> {}".format(chart
, kdu_instance
)
316 await self
.uninstall(
317 cluster_uuid
=cluster_uuid
, kdu_instance
=kdu_instance
319 except Exception as e
:
320 # will not raise exception as it was found
321 # that in some cases of previously installed helm releases it
324 "Error uninstalling release {}: {}".format(
330 "Cluster uuid: {} has releases and not force. Leaving K8s helm environment"
331 ).format(cluster_uuid
)
334 False # Allow to remove k8s cluster without removing Tiller
338 await self
._uninstall
_sw
(cluster_id
=cluster_uuid
, namespace
=namespace
)
340 # delete cluster directory
341 self
.log
.debug("Removing directory {}".format(cluster_uuid
))
342 self
.fs
.file_delete(cluster_uuid
, ignore_non_exist
=True)
# Also remove the local directory if it still exists
344 direct
= self
.fs
.path
+ "/" + cluster_uuid
345 shutil
.rmtree(direct
, ignore_errors
=True)
349 async def _install_impl(
357 timeout
: float = 300,
359 db_dict
: dict = None,
360 kdu_name
: str = None,
361 namespace
: str = None,
364 paths
, env
= self
._init
_paths
_env
(
365 cluster_name
=cluster_id
, create_if_not_exist
=True
369 params_str
, file_to_delete
= self
._params
_to
_file
_option
(
370 cluster_id
=cluster_id
, params
=params
374 kdu_model
, version
= self
._split
_version
(kdu_model
)
376 command
= self
._get
_install
_command
(
384 paths
["kube_config"],
387 self
.log
.debug("installing: {}".format(command
))
390 # exec helm in a task
391 exec_task
= asyncio
.ensure_future(
392 coro_or_future
=self
._local
_async
_exec
(
393 command
=command
, raise_exception_on_error
=False, env
=env
397 # write status in another task
398 status_task
= asyncio
.ensure_future(
399 coro_or_future
=self
._store
_status
(
400 cluster_id
=cluster_id
,
401 kdu_instance
=kdu_instance
,
409 # wait for execution task
410 await asyncio
.wait([exec_task
])
415 output
, rc
= exec_task
.result()
419 output
, rc
= await self
._local
_async
_exec
(
420 command
=command
, raise_exception_on_error
=False, env
=env
423 # remove temporal values yaml file
425 os
.remove(file_to_delete
)
428 await self
._store
_status
(
429 cluster_id
=cluster_id
,
430 kdu_instance
=kdu_instance
,
439 msg
= "Error executing command: {}\nOutput: {}".format(command
, output
)
441 raise K8sException(msg
)
447 kdu_model
: str = None,
449 timeout
: float = 300,
451 db_dict
: dict = None,
453 self
.log
.debug("upgrading {} in cluster {}".format(kdu_model
, cluster_uuid
))
456 self
.fs
.sync(from_path
=cluster_uuid
)
458 # look for instance to obtain namespace
459 instance_info
= await self
.get_instance_info(cluster_uuid
, kdu_instance
)
460 if not instance_info
:
461 raise K8sException("kdu_instance {} not found".format(kdu_instance
))
464 paths
, env
= self
._init
_paths
_env
(
465 cluster_name
=cluster_uuid
, create_if_not_exist
=True
469 self
.fs
.sync(from_path
=cluster_uuid
)
472 params_str
, file_to_delete
= self
._params
_to
_file
_option
(
473 cluster_id
=cluster_uuid
, params
=params
477 kdu_model
, version
= self
._split
_version
(kdu_model
)
479 command
= self
._get
_upgrade
_command
(
482 instance_info
["namespace"],
487 paths
["kube_config"],
490 self
.log
.debug("upgrading: {}".format(command
))
494 # exec helm in a task
495 exec_task
= asyncio
.ensure_future(
496 coro_or_future
=self
._local
_async
_exec
(
497 command
=command
, raise_exception_on_error
=False, env
=env
500 # write status in another task
501 status_task
= asyncio
.ensure_future(
502 coro_or_future
=self
._store
_status
(
503 cluster_id
=cluster_uuid
,
504 kdu_instance
=kdu_instance
,
505 namespace
=instance_info
["namespace"],
512 # wait for execution task
513 await asyncio
.wait([exec_task
])
517 output
, rc
= exec_task
.result()
521 output
, rc
= await self
._local
_async
_exec
(
522 command
=command
, raise_exception_on_error
=False, env
=env
525 # remove temporal values yaml file
527 os
.remove(file_to_delete
)
530 await self
._store
_status
(
531 cluster_id
=cluster_uuid
,
532 kdu_instance
=kdu_instance
,
533 namespace
=instance_info
["namespace"],
541 msg
= "Error executing command: {}\nOutput: {}".format(command
, output
)
543 raise K8sException(msg
)
546 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
548 # return new revision number
549 instance
= await self
.get_instance_info(
550 cluster_uuid
=cluster_uuid
, kdu_instance
=kdu_instance
553 revision
= int(instance
.get("revision"))
554 self
.log
.debug("New revision: {}".format(revision
))
564 total_timeout
: float = 1800,
565 cluster_uuid
: str = None,
566 kdu_model
: str = None,
568 db_dict
: dict = None,
571 """Scale a resource in a Helm Chart.
574 kdu_instance: KDU instance name
575 scale: Scale to which to set the resource
576 resource_name: Resource name
577 total_timeout: The time, in seconds, to wait
578 cluster_uuid: The UUID of the cluster
579 kdu_model: The chart reference
580 atomic: if set, upgrade process rolls back changes made in case of failed upgrade.
581 The --wait flag will be set automatically if --atomic is used
582 db_dict: Dictionary for any additional data
583 kwargs: Additional parameters
586 True if successful, False otherwise
589 debug_mgs
= "scaling {} in cluster {}".format(kdu_model
, cluster_uuid
)
591 debug_mgs
= "scaling resource {} in model {} (cluster {})".format(
592 resource_name
, kdu_model
, cluster_uuid
595 self
.log
.debug(debug_mgs
)
597 # look for instance to obtain namespace
598 # get_instance_info function calls the sync command
599 instance_info
= await self
.get_instance_info(cluster_uuid
, kdu_instance
)
600 if not instance_info
:
601 raise K8sException("kdu_instance {} not found".format(kdu_instance
))
604 paths
, env
= self
._init
_paths
_env
(
605 cluster_name
=cluster_uuid
, create_if_not_exist
=True
609 kdu_model
, version
= self
._split
_version
(kdu_model
)
611 repo_url
= await self
._find
_repo
(kdu_model
, cluster_uuid
)
614 "Repository not found for kdu_model {}".format(kdu_model
)
617 _
, replica_str
= await self
._get
_replica
_count
_url
(
618 kdu_model
, repo_url
, resource_name
621 command
= self
._get
_upgrade
_scale
_command
(
624 instance_info
["namespace"],
631 paths
["kube_config"],
634 self
.log
.debug("scaling: {}".format(command
))
637 # exec helm in a task
638 exec_task
= asyncio
.ensure_future(
639 coro_or_future
=self
._local
_async
_exec
(
640 command
=command
, raise_exception_on_error
=False, env
=env
643 # write status in another task
644 status_task
= asyncio
.ensure_future(
645 coro_or_future
=self
._store
_status
(
646 cluster_id
=cluster_uuid
,
647 kdu_instance
=kdu_instance
,
648 namespace
=instance_info
["namespace"],
655 # wait for execution task
656 await asyncio
.wait([exec_task
])
660 output
, rc
= exec_task
.result()
663 output
, rc
= await self
._local
_async
_exec
(
664 command
=command
, raise_exception_on_error
=False, env
=env
668 await self
._store
_status
(
669 cluster_id
=cluster_uuid
,
670 kdu_instance
=kdu_instance
,
671 namespace
=instance_info
["namespace"],
679 msg
= "Error executing command: {}\nOutput: {}".format(command
, output
)
681 raise K8sException(msg
)
684 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
688 async def get_scale_count(
696 """Get a resource scale count.
699 cluster_uuid: The UUID of the cluster
700 resource_name: Resource name
701 kdu_instance: KDU instance name
702 kdu_model: The name or path of a bundle
703 kwargs: Additional parameters
706 Resource instance count
710 "getting scale count for {} in cluster {}".format(kdu_model
, cluster_uuid
)
713 # look for instance to obtain namespace
714 instance_info
= await self
.get_instance_info(cluster_uuid
, kdu_instance
)
715 if not instance_info
:
716 raise K8sException("kdu_instance {} not found".format(kdu_instance
))
719 paths
, env
= self
._init
_paths
_env
(
720 cluster_name
=cluster_uuid
, create_if_not_exist
=True
723 replicas
= await self
._get
_replica
_count
_instance
(
724 kdu_instance
, instance_info
["namespace"], paths
["kube_config"]
727 # Get default value if scale count is not found from provided values
729 repo_url
= await self
._find
_repo
(kdu_model
, cluster_uuid
)
732 "Repository not found for kdu_model {}".format(kdu_model
)
735 replicas
, _
= await self
._get
_replica
_count
_url
(
736 kdu_model
, repo_url
, resource_name
740 msg
= "Replica count not found. Cannot be scaled"
742 raise K8sException(msg
)
747 self
, cluster_uuid
: str, kdu_instance
: str, revision
=0, db_dict
: dict = None
750 "rollback kdu_instance {} to revision {} from cluster {}".format(
751 kdu_instance
, revision
, cluster_uuid
756 self
.fs
.sync(from_path
=cluster_uuid
)
758 # look for instance to obtain namespace
759 instance_info
= await self
.get_instance_info(cluster_uuid
, kdu_instance
)
760 if not instance_info
:
761 raise K8sException("kdu_instance {} not found".format(kdu_instance
))
764 paths
, env
= self
._init
_paths
_env
(
765 cluster_name
=cluster_uuid
, create_if_not_exist
=True
769 self
.fs
.sync(from_path
=cluster_uuid
)
771 command
= self
._get
_rollback
_command
(
772 kdu_instance
, instance_info
["namespace"], revision
, paths
["kube_config"]
775 self
.log
.debug("rolling_back: {}".format(command
))
777 # exec helm in a task
778 exec_task
= asyncio
.ensure_future(
779 coro_or_future
=self
._local
_async
_exec
(
780 command
=command
, raise_exception_on_error
=False, env
=env
783 # write status in another task
784 status_task
= asyncio
.ensure_future(
785 coro_or_future
=self
._store
_status
(
786 cluster_id
=cluster_uuid
,
787 kdu_instance
=kdu_instance
,
788 namespace
=instance_info
["namespace"],
790 operation
="rollback",
795 # wait for execution task
796 await asyncio
.wait([exec_task
])
801 output
, rc
= exec_task
.result()
804 await self
._store
_status
(
805 cluster_id
=cluster_uuid
,
806 kdu_instance
=kdu_instance
,
807 namespace
=instance_info
["namespace"],
809 operation
="rollback",
815 msg
= "Error executing command: {}\nOutput: {}".format(command
, output
)
817 raise K8sException(msg
)
820 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
822 # return new revision number
823 instance
= await self
.get_instance_info(
824 cluster_uuid
=cluster_uuid
, kdu_instance
=kdu_instance
827 revision
= int(instance
.get("revision"))
828 self
.log
.debug("New revision: {}".format(revision
))
async def uninstall(self, cluster_uuid: str, kdu_instance: str, **kwargs):
    """
    Removes an existing KDU instance. It would implicitly use the `delete` or
    'uninstall' call (this call should happen after all
    _terminate-config-primitive_ of the VNF).

    :param cluster_uuid: UUID of a K8s cluster known by OSM, or namespace:cluster_id
    :param kdu_instance: unique name for the KDU instance to be deleted
    :param kwargs: Additional parameters (None yet)
    :return: True when the instance is not found (nothing to remove);
        otherwise the command output converted by _output_to_table
    """
    self.log.debug(
        "uninstall kdu_instance {} from cluster {}".format(
            kdu_instance, cluster_uuid
        )
    )

    # sync local dir
    self.fs.sync(from_path=cluster_uuid)

    # look for instance to obtain namespace
    instance_info = await self.get_instance_info(cluster_uuid, kdu_instance)
    if not instance_info:
        self.log.warning(("kdu_instance {} not found".format(kdu_instance)))
        # nothing to uninstall; going on would index into an empty result
        return True

    # init env, paths
    paths, env = self._init_paths_env(
        cluster_name=cluster_uuid, create_if_not_exist=True
    )

    # sync local dir
    self.fs.sync(from_path=cluster_uuid)

    command = self._get_uninstall_command(
        kdu_instance, instance_info["namespace"], paths["kube_config"]
    )
    output, _rc = await self._local_async_exec(
        command=command, raise_exception_on_error=True, env=env
    )

    # sync fs
    self.fs.reverse_sync(from_path=cluster_uuid)

    return self._output_to_table(output)
async def instances_list(self, cluster_uuid: str) -> list:
    """
    returns a list of deployed releases in a cluster

    :param cluster_uuid: the 'cluster' or 'namespace:cluster'
    :return: whatever the version-specific _instances_list produces for the
        cluster (a list, per this method's annotation)
    """
    self.log.debug("list releases for cluster {}".format(cluster_uuid))

    # sync local dir
    self.fs.sync(from_path=cluster_uuid)

    # execute internal command
    result = await self._instances_list(cluster_uuid)

    # sync fs
    self.fs.reverse_sync(from_path=cluster_uuid)

    return result
async def get_instance_info(self, cluster_uuid: str, kdu_instance: str):
    """
    Look up a deployed release of a cluster by its name.

    :param cluster_uuid: cluster identifier forwarded to instances_list
    :param kdu_instance: release name compared with each entry's "name"
    :return: the first matching entry of instances_list, or None if no entry
        matches (the miss is logged at debug level)
    """
    instances = await self.instances_list(cluster_uuid=cluster_uuid)
    for instance in instances:
        if instance.get("name") == kdu_instance:
            return instance
    self.log.debug("Instance {} not found".format(kdu_instance))
    return None
908 async def exec_primitive(
910 cluster_uuid
: str = None,
911 kdu_instance
: str = None,
912 primitive_name
: str = None,
913 timeout
: float = 300,
915 db_dict
: dict = None,
918 """Exec primitive (Juju action)
920 :param cluster_uuid: The UUID of the cluster or namespace:cluster
921 :param kdu_instance: The unique name of the KDU instance
922 :param primitive_name: Name of action that will be executed
923 :param timeout: Timeout for action execution
924 :param params: Dictionary of all the parameters needed for the action
925 :db_dict: Dictionary for any additional data
926 :param kwargs: Additional parameters (None yet)
928 :return: Returns the output of the action
931 "KDUs deployed with Helm don't support actions "
932 "different from rollback, upgrade and status"
async def get_services(
    self, cluster_uuid: str, kdu_instance: str, namespace: str
) -> list:
    """
    Returns a list of services defined for the specified kdu instance.

    :param cluster_uuid: UUID of a K8s cluster known by OSM
    :param kdu_instance: unique name for the KDU instance
    :param namespace: K8s namespace used by the KDU instance
    :return: list with one entry per service name, each entry being what
        _get_service returns for that name. Per the original notes an entry
        can carry `name`, `type`, `ports`, `cluster_ip` and `external_ip`
        (the exact shape is decided by _get_service, not visible here)
    """
    self.log.debug(
        "get_services: cluster_uuid: {}, kdu_instance: {}".format(
            cluster_uuid, kdu_instance
        )
    )

    # init env, paths
    paths, env = self._init_paths_env(
        cluster_name=cluster_uuid, create_if_not_exist=True
    )

    # sync local dir
    self.fs.sync(from_path=cluster_uuid)

    # get list of services names for kdu
    service_names = await self._get_services(
        cluster_uuid, kdu_instance, namespace, paths["kube_config"]
    )

    # details are fetched one service at a time, in name order
    service_list = [
        await self._get_service(cluster_uuid, service_name, namespace)
        for service_name in service_names
    ]

    # sync fs
    self.fs.reverse_sync(from_path=cluster_uuid)

    return service_list
async def get_service(
    self, cluster_uuid: str, service_name: str, namespace: str
):
    """
    Obtain the data of one service of a cluster.

    :param cluster_uuid: cluster identifier, also used as the sync path
    :param service_name: name of the service to look up
    :param namespace: namespace the service lives in
    :return: the result of _get_service for the given arguments
    """
    self.log.debug(
        "get service, service_name: {}, namespace: {}, cluster_uuid: {}".format(
            service_name, namespace, cluster_uuid
        )
    )

    # sync local dir
    self.fs.sync(from_path=cluster_uuid)

    service = await self._get_service(cluster_uuid, service_name, namespace)

    # sync fs
    self.fs.reverse_sync(from_path=cluster_uuid)

    return service
async def status_kdu(
    self, cluster_uuid: str, kdu_instance: str, yaml_format: bool = False, **kwargs
) -> Union[str, dict]:
    """
    Retrieve the current state of a given KDU instance, based on the `status`
    call: the composition (i.e. K8s objects) and the specific values of the
    configuration parameters applied to the instance.

    :param cluster_uuid: UUID of a K8s cluster known by OSM
    :param kdu_instance: unique name for the KDU instance
    :param kwargs: Additional parameters (None yet)
    :param yaml_format: if the return shall be returned as an YAML string or
        as a dictionary
    :return: the result of _status_kdu. Per the original notes it covers the
        K8s `namespace` where the KDU lives, the `state` of the instance, the
        list of `resources` (objects) of the release with their status, and
        the last `deployment_time`
    :raises K8sException: if no release named kdu_instance exists in the cluster
    """
    self.log.debug(
        "status_kdu: cluster_uuid: {}, kdu_instance: {}".format(
            cluster_uuid, kdu_instance
        )
    )

    # sync local dir
    self.fs.sync(from_path=cluster_uuid)

    # get instance: needed to obtain namespace
    instances = await self._instances_list(cluster_id=cluster_uuid)
    for instance in instances:
        if instance.get("name") == kdu_instance:
            break
    else:
        # instance does not exist
        raise K8sException(
            "Instance name: {} not found in cluster: {}".format(
                kdu_instance, cluster_uuid
            )
        )

    status = await self._status_kdu(
        cluster_id=cluster_uuid,
        kdu_instance=kdu_instance,
        namespace=instance["namespace"],
        yaml_format=yaml_format,
        show_error_log=True,
    )

    # sync fs
    self.fs.reverse_sync(from_path=cluster_uuid)

    return status
async def get_values_kdu(
    self, kdu_instance: str, namespace: str, kubeconfig: str
):
    """
    Obtain the values of a deployed KDU instance.

    :param kdu_instance: name of the KDU instance
    :param namespace: namespace of the KDU instance
    :param kubeconfig: kubeconfig path forwarded to the get command
    :return: the result of _exec_get_command for the "values" get command
    """
    self.log.debug("get kdu_instance values {}".format(kdu_instance))

    return await self._exec_get_command(
        get_command="values",
        kdu_instance=kdu_instance,
        namespace=namespace,
        kubeconfig=kubeconfig,
    )
async def values_kdu(self, kdu_model: str, repo_url: str = None) -> str:
    """
    Obtain the values of a KDU model (chart), optionally from a repository.

    :param kdu_model: chart reference to inspect
    :param repo_url: optional repository url forwarded to the inspect command
    :return: the result of _exec_inspect_command for "values"
    """
    self.log.debug(
        "inspect kdu_model values {} from (optional) repo: {}".format(
            kdu_model, repo_url
        )
    )

    return await self._exec_inspect_command(
        inspect_command="values", kdu_model=kdu_model, repo_url=repo_url
    )
async def help_kdu(self, kdu_model: str, repo_url: str = None) -> str:
    """
    Obtain the readme of a KDU model (chart), optionally from a repository.

    :param kdu_model: chart reference to inspect
    :param repo_url: optional repository url forwarded to the inspect command
    :return: the result of _exec_inspect_command for "readme"
    """
    self.log.debug(
        "inspect kdu_model {} readme.md from repo: {}".format(kdu_model, repo_url)
    )

    return await self._exec_inspect_command(
        inspect_command="readme", kdu_model=kdu_model, repo_url=repo_url
    )
1101 async def synchronize_repos(self
, cluster_uuid
: str):
1103 self
.log
.debug("synchronize repos for cluster helm-id: {}".format(cluster_uuid
))
1105 db_repo_ids
= self
._get
_helm
_chart
_repos
_ids
(cluster_uuid
)
1106 db_repo_dict
= self
._get
_db
_repos
_dict
(db_repo_ids
)
1108 local_repo_list
= await self
.repo_list(cluster_uuid
)
1109 local_repo_dict
= {repo
["name"]: repo
["url"] for repo
in local_repo_list
}
1111 deleted_repo_list
= []
1112 added_repo_dict
= {}
1114 # iterate over the list of repos in the database that should be
1115 # added if not present
1116 for repo_name
, db_repo
in db_repo_dict
.items():
1118 # check if it is already present
1119 curr_repo_url
= local_repo_dict
.get(db_repo
["name"])
1120 repo_id
= db_repo
.get("_id")
1121 if curr_repo_url
!= db_repo
["url"]:
1124 "repo {} url changed, delete and and again".format(
1128 await self
.repo_remove(cluster_uuid
, db_repo
["name"])
1129 deleted_repo_list
.append(repo_id
)
1132 self
.log
.debug("add repo {}".format(db_repo
["name"]))
1133 if "ca_cert" in db_repo
:
1134 await self
.repo_add(
1138 cert
=db_repo
["ca_cert"],
1141 await self
.repo_add(
1146 added_repo_dict
[repo_id
] = db_repo
["name"]
1147 except Exception as e
:
1149 "Error adding repo id: {}, err_msg: {} ".format(
1154 # Delete repos that are present but not in nbi_list
1155 for repo_name
in local_repo_dict
:
1156 if not db_repo_dict
.get(repo_name
) and repo_name
!= "stable":
1157 self
.log
.debug("delete repo {}".format(repo_name
))
1159 await self
.repo_remove(cluster_uuid
, repo_name
)
1160 deleted_repo_list
.append(repo_name
)
1161 except Exception as e
:
1163 "Error deleting repo, name: {}, err_msg: {}".format(
1168 return deleted_repo_list
, added_repo_dict
1170 except K8sException
:
1172 except Exception as e
:
1173 # Do not raise errors synchronizing repos
1174 self
.log
.error("Error synchronizing repos: {}".format(e
))
1175 raise Exception("Error synchronizing repos: {}".format(e
))
1177 def _get_db_repos_dict(self
, repo_ids
: list):
1179 for repo_id
in repo_ids
:
1180 db_repo
= self
.db
.get_one("k8srepos", {"_id": repo_id
})
1181 db_repos_dict
[db_repo
["name"]] = db_repo
1182 return db_repos_dict
1185 ####################################################################################
1186 ################################### TO BE IMPLEMENTED SUBCLASSES ###################
1187 ####################################################################################
1191 def _init_paths_env(self
, cluster_name
: str, create_if_not_exist
: bool = True):
1193 Creates and returns base cluster and kube dirs and returns them.
1194 Also created helm3 dirs according to new directory specification, paths are
1195 not returned but assigned to helm environment variables
1197 :param cluster_name: cluster_name
1198 :return: Dictionary with config_paths and dictionary with helm environment variables
1202 async def _cluster_init(self
, cluster_id
, namespace
, paths
, env
):
1204 Implements the helm version dependent cluster initialization
1208 async def _instances_list(self
, cluster_id
):
1210 Implements the helm version dependent helm instances list
1214 async def _get_services(self
, cluster_id
, kdu_instance
, namespace
, kubeconfig
):
1216 Implements the helm version dependent method to obtain services from a helm instance
1220 async def _status_kdu(
1224 namespace
: str = None,
1225 yaml_format
: bool = False,
1226 show_error_log
: bool = False,
1227 ) -> Union
[str, dict]:
1229 Implements the helm version dependent method to obtain status of a helm instance
1233 def _get_install_command(
1245 Obtain command to be executed to delete the indicated instance
1249 def _get_upgrade_scale_command(
1262 """Obtain command to be executed to upgrade the indicated instance."""
1265 def _get_upgrade_command(
1277 Obtain command to be executed to upgrade the indicated instance
1281 def _get_rollback_command(
1282 self
, kdu_instance
, namespace
, revision
, kubeconfig
1285 Obtain command to be executed to rollback the indicated instance
1289 def _get_uninstall_command(
1290 self
, kdu_instance
: str, namespace
: str, kubeconfig
: str
1293 Obtain command to be executed to delete the indicated instance
1297 def _get_inspect_command(
1298 self
, show_command
: str, kdu_model
: str, repo_str
: str, version
: str
1301 Obtain command to be executed to obtain information about the kdu
1305 def _get_get_command(
1306 self
, get_command
: str, kdu_instance
: str, namespace
: str, kubeconfig
: str
1308 """Obtain command to be executed to get information about the kdu instance."""
1311 async def _uninstall_sw(self
, cluster_id
: str, namespace
: str):
1313 Method call to uninstall cluster software for helm. This method is dependent
1315 For Helm v2 it will be called when Tiller must be uninstalled
1316 For Helm v3 it does nothing and does not need to be callled
1320 def _get_helm_chart_repos_ids(self
, cluster_uuid
) -> list:
1322 Obtains the cluster repos identifiers
1326 ####################################################################################
1327 ################################### P R I V A T E ##################################
1328 ####################################################################################
1332 def _check_file_exists(filename
: str, exception_if_not_exists
: bool = False):
1333 if os
.path
.exists(filename
):
1336 msg
= "File {} does not exist".format(filename
)
1337 if exception_if_not_exists
:
1338 raise K8sException(msg
)
1341 def _remove_multiple_spaces(strobj
):
1342 strobj
= strobj
.strip()
1343 while " " in strobj
:
1344 strobj
= strobj
.replace(" ", " ")
1348 def _output_to_lines(output
: str) -> list:
1349 output_lines
= list()
1350 lines
= output
.splitlines(keepends
=False)
1354 output_lines
.append(line
)
1358 def _output_to_table(output
: str) -> list:
1359 output_table
= list()
1360 lines
= output
.splitlines(keepends
=False)
1362 line
= line
.replace("\t", " ")
1364 output_table
.append(line_list
)
1365 cells
= line
.split(sep
=" ")
1369 line_list
.append(cell
)
1373 def _parse_services(output
: str) -> list:
1374 lines
= output
.splitlines(keepends
=False)
1377 line
= line
.replace("\t", " ")
1378 cells
= line
.split(sep
=" ")
1379 if len(cells
) > 0 and cells
[0].startswith("service/"):
1380 elems
= cells
[0].split(sep
="/")
1382 services
.append(elems
[1])
1386 def _get_deep(dictionary
: dict, members
: tuple):
1391 value
= target
.get(m
)
1400 # find key:value in several lines
1402 def _find_in_lines(p_lines
: list, p_key
: str) -> str:
1403 for line
in p_lines
:
1405 if line
.startswith(p_key
+ ":"):
1406 parts
= line
.split(":")
1407 the_value
= parts
[1].strip()
1415 def _lower_keys_list(input_list
: list):
1417 Transform the keys in a list of dictionaries to lower case and returns a new list
1422 for dictionary
in input_list
:
1423 new_dict
= dict((k
.lower(), v
) for k
, v
in dictionary
.items())
1424 new_list
.append(new_dict
)
async def _local_async_exec(
    self,
    command: str,
    raise_exception_on_error: bool = False,
    show_error_log: bool = True,
    encode_utf8: bool = False,
    env: dict = None,
) -> (str, int):
    """
    Execute a local command as a subprocess and collect its output.

    :param command: command line; it is tokenized with shlex and executed
        without a shell
    :param raise_exception_on_error: raise instead of returning when the command
        fails (non-zero return code or error launching it)
    :param show_error_log: log the output when the return code is not zero
    :param encode_utf8: return the string representation of the utf-8 encoded
        output instead of the decoded text
    :param env: extra environment variables, applied over a copy of os.environ
    :return: tuple (output, return_code). ("", -1) if the command could not be
        executed and raise_exception_on_error is False
    :raises K8sException: on failure, when raise_exception_on_error is True
    """
    command = K8sHelmBaseConnector._remove_multiple_spaces(command)
    self.log.debug(
        "Executing async local command: {}, env: {}".format(command, env)
    )

    # split command
    command = shlex.split(command)

    environ = os.environ.copy()
    if env:
        environ.update(env)

    # kept outside the try so the cancellation handler can always inspect it
    process = None
    try:
        process = await asyncio.create_subprocess_exec(
            *command,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
            env=environ,
        )

        # wait for command terminate
        stdout, stderr = await process.communicate()

        return_code = process.returncode

        output = ""
        if stdout:
            output = stdout.decode("utf-8").strip()
        if stderr:
            # anything written to stderr takes precedence over stdout
            output = stderr.decode("utf-8").strip()

        if return_code != 0 and show_error_log:
            self.log.debug(
                "Return code (FAIL): {}\nOutput:\n{}".format(return_code, output)
            )
        else:
            self.log.debug("Return code: {}".format(return_code))

        if raise_exception_on_error and return_code != 0:
            raise K8sException(output)

        if encode_utf8:
            output = output.encode("utf-8").strip()
            output = str(output).replace("\\n", "\n")

        return output, return_code

    except asyncio.CancelledError:
        # do not leave the child running once nobody is waiting for it
        if process is not None and process.returncode is None:
            try:
                process.kill()
            except ProcessLookupError:
                # the child finished in the meantime
                pass
        raise
    except K8sException:
        raise
    except Exception as e:
        msg = "Exception executing command: {} -> {}".format(command, e)
        self.log.error(msg)
        if raise_exception_on_error:
            raise K8sException(e) from e
        else:
            return "", -1
async def _local_async_exec_pipe(
    self,
    command1: str,
    command2: str,
    raise_exception_on_error: bool = True,
    show_error_log: bool = True,
    encode_utf8: bool = False,
    env: dict = None,
):
    """
    Execute "command1 | command2" locally and collect the output of command2.

    Both commands are tokenized with shlex and executed without a shell; the
    standard output of the first one is connected to the standard input of the
    second one through an OS pipe.

    :param command1: command line producing the data
    :param command2: command line consuming the data
    :param raise_exception_on_error: raise instead of returning when the second
        command fails (non-zero return code) or the commands cannot be launched
    :param show_error_log: log the output when the return code is not zero
    :param encode_utf8: return the string representation of the utf-8 encoded
        output instead of the decoded text
    :param env: extra environment variables, applied over a copy of os.environ
    :return: tuple (output, return_code) of the second command. ("", -1) if the
        commands could not be executed and raise_exception_on_error is False
    :raises K8sException: on failure, when raise_exception_on_error is True
    """
    command1 = K8sHelmBaseConnector._remove_multiple_spaces(command1)
    command2 = K8sHelmBaseConnector._remove_multiple_spaces(command2)
    command = "{} | {}".format(command1, command2)
    self.log.debug(
        "Executing async local command: {}, env: {}".format(command, env)
    )

    # split command
    command1 = shlex.split(command1)
    command2 = shlex.split(command2)

    environ = os.environ.copy()
    if env:
        environ.update(env)

    try:
        read, write = os.pipe()
        try:
            try:
                await asyncio.create_subprocess_exec(
                    *command1, stdout=write, env=environ
                )
            finally:
                # our copy of the write end must go away (also when the spawn
                # fails), otherwise command2 never sees end-of-file
                os.close(write)
            process_2 = await asyncio.create_subprocess_exec(
                *command2, stdin=read, stdout=asyncio.subprocess.PIPE, env=environ
            )
        finally:
            # released on success and on failure, so no descriptor is leaked
            os.close(read)
        stdout, stderr = await process_2.communicate()

        return_code = process_2.returncode

        output = ""
        if stdout:
            output = stdout.decode("utf-8").strip()
        if stderr:
            # stderr of command2 is not piped above, so this is not expected
            # to be set; kept for symmetry with _local_async_exec
            output = stderr.decode("utf-8").strip()

        if return_code != 0 and show_error_log:
            self.log.debug(
                "Return code (FAIL): {}\nOutput:\n{}".format(return_code, output)
            )
        else:
            self.log.debug("Return code: {}".format(return_code))

        if raise_exception_on_error and return_code != 0:
            raise K8sException(output)

        if encode_utf8:
            output = output.encode("utf-8").strip()
            output = str(output).replace("\\n", "\n")

        return output, return_code
    except asyncio.CancelledError:
        raise
    except K8sException:
        raise
    except Exception as e:
        msg = "Exception executing command: {} -> {}".format(command, e)
        self.log.error(msg)
        if raise_exception_on_error:
            raise K8sException(e) from e
        else:
            return "", -1
async def _get_service(self, cluster_id, service_name, namespace):
    """
    Obtains the data of the specified service in the k8cluster.

    :param cluster_id: id of a K8s cluster known by OSM
    :param service_name: name of the K8s service in the specified namespace
    :param namespace: K8s namespace used by the KDU instance
    :return: If successful, it will return a service with the following data:
    - `name` of the service
    - `type` type of service in the k8 cluster
    - `ports` List of ports offered by the service, for each port includes at least
    name, port, protocol
    - `cluster_ip` Internal ip to be used inside k8s cluster
    - `external_ip` List of external ips (in case they are available); only
    present for services of type LoadBalancer
    """

    # init config, env
    paths, env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )

    command = "{} --kubeconfig={} --namespace={} get service {} -o=yaml".format(
        self.kubectl_command, paths["kube_config"], namespace, service_name
    )

    output, _rc = await self._local_async_exec(
        command=command, raise_exception_on_error=True, env=env
    )

    data = yaml.load(output, Loader=yaml.SafeLoader)

    service = {
        "name": service_name,
        "type": self._get_deep(data, ("spec", "type")),
        "ports": self._get_deep(data, ("spec", "ports")),
        "cluster_ip": self._get_deep(data, ("spec", "clusterIP")),
    }
    if service["type"] == "LoadBalancer":
        # the ingress list is missing while the load balancer has no address
        # assigned yet, and an entry may not carry an "ip"
        ip_map_list = (
            self._get_deep(data, ("status", "loadBalancer", "ingress")) or []
        )
        ip_list = [elem["ip"] for elem in ip_map_list if "ip" in elem]
        service["external_ip"] = ip_list

    return service
1613 async def _exec_get_command(
1614 self
, get_command
: str, kdu_instance
: str, namespace
: str, kubeconfig
: str
1616 """Obtains information about the kdu instance."""
1618 full_command
= self
._get
_get
_command
(
1619 get_command
, kdu_instance
, namespace
, kubeconfig
1622 output
, _rc
= await self
._local
_async
_exec
(command
=full_command
)
1626 async def _exec_inspect_command(
1627 self
, inspect_command
: str, kdu_model
: str, repo_url
: str = None
1629 """Obtains information about a kdu, no cluster (no env)."""
1633 repo_str
= " --repo {}".format(repo_url
)
1635 idx
= kdu_model
.find("/")
1638 kdu_model
= kdu_model
[idx
:]
1640 kdu_model
, version
= self
._split
_version
(kdu_model
)
1642 version_str
= "--version {}".format(version
)
1646 full_command
= self
._get
_inspect
_command
(
1647 inspect_command
, kdu_model
, repo_str
, version_str
1650 output
, _rc
= await self
._local
_async
_exec
(command
=full_command
)
async def _get_replica_count_url(
    self,
    kdu_model: str,
    repo_url: str = None,
    resource_name: str = None,
):
    """Get the replica count value in the Helm Chart Values.

    Args:
        kdu_model: The name or path of a bundle
        repo_url: Helm Chart repository url
        resource_name: Resource name; when given, the replica key is looked up
            under this top-level key of the values

    Returns:
        A tuple (replicas, replica_str): the replica value found and the name
        of the key holding it ("replicaCount" or "replicas")

    Raises:
        K8sException: if the values (or the resource) are not found, if no
            replica key has a value, or if both keys are present at once
    """

    kdu_values = yaml.load(
        await self.values_kdu(kdu_model, repo_url), Loader=yaml.SafeLoader
    )

    if not kdu_values:
        raise K8sException(
            "kdu_values not found for kdu_model {}".format(kdu_model)
        )

    if resource_name:
        kdu_values = kdu_values.get(resource_name, None)

    if not kdu_values:
        msg = "resource {} not found in the values in model {}".format(
            resource_name, kdu_model
        )
        self.log.error(msg)
        raise K8sException(msg)

    if kdu_values.get("replicaCount", None):
        replica_str = "replicaCount"
        other_key = "replicas"
    elif kdu_values.get("replicas", None):
        replica_str = "replicas"
        other_key = "replicaCount"
    else:
        if resource_name:
            msg = (
                "replicaCount or replicas not found in the resource "
                "{} values in model {}. Cannot be scaled".format(
                    resource_name, kdu_model
                )
            )
        else:
            msg = (
                "replicaCount or replicas not found in the values "
                "in model {}. Cannot be scaled".format(kdu_model)
            )
        self.log.error(msg)
        raise K8sException(msg)

    # Control if replicas and replicaCount exists at the same time: the mere
    # presence of the other key (whatever its value) is rejected
    if other_key in kdu_values:
        msg = "replicaCount and replicas are exists at the same time"
        self.log.error(msg)
        raise K8sException(msg)

    return kdu_values[replica_str], replica_str
async def _get_replica_count_instance(
    self,
    kdu_instance: str,
    namespace: str,
    kubeconfig: str,
    resource_name: str = None,
):
    """Get the replica count value in the instance.

    Args:
        kdu_instance: The name of the KDU instance
        namespace: KDU instance namespace
        kubeconfig: passed through to get_values_kdu
        resource_name: Resource name

    Returns:
        The value of "replicaCount" (or, failing that, "replicas") taken from
        the values of the resource when it has any, otherwise from the top
        level of the instance values; None if nothing is found
    """

    kdu_values = yaml.load(
        await self.get_values_kdu(kdu_instance, namespace, kubeconfig),
        Loader=yaml.SafeLoader,
    )

    if not kdu_values:
        return None

    resource_values = None
    if resource_name:
        resource_values = kdu_values.get(resource_name, None)

    # the values of the resource win over the top level ones
    source = resource_values if resource_values else kdu_values
    return source.get("replicaCount", None) or source.get("replicas", None)
async def _store_status(
    self,
    cluster_id: str,
    operation: str,
    kdu_instance: str,
    namespace: str = None,
    check_every: float = 10,
    db_dict: dict = None,
    run_once: bool = False,
):
    """
    Read the status of a KDU instance and write it to the database, repeatedly.

    Every iteration sleeps check_every seconds, reads the status with
    self._status_kdu and stores it with self.write_app_status_to_db.

    :param cluster_id: passed to _status_kdu
    :param operation: passed to write_app_status_to_db
    :param kdu_instance: name of the KDU instance
    :param namespace: passed to _status_kdu
    :param check_every: seconds to sleep before each status read
    :param db_dict: passed to write_app_status_to_db
    :param run_once: when True, return after the first iteration whatever its
        outcome
    """
    while True:
        try:
            await asyncio.sleep(check_every)
            detailed_status = await self._status_kdu(
                cluster_id=cluster_id,
                kdu_instance=kdu_instance,
                yaml_format=False,
                namespace=namespace,
            )
            status = detailed_status.get("info").get("description")
            self.log.debug("KDU {} STATUS: {}.".format(kdu_instance, status))
            # write status to db
            result = await self.write_app_status_to_db(
                db_dict=db_dict,
                status=str(status),
                detailed_status=str(detailed_status),
                operation=operation,
            )
            if not result:
                self.log.info("Error writing in database. Task exiting...")
                return
        except asyncio.CancelledError:
            # cancellation ends the loop quietly; it is not propagated
            self.log.debug("Task cancelled")
            return
        except Exception as e:
            # any other failure is logged and the loop goes on
            self.log.debug(
                "_store_status exception: {}".format(str(e)), exc_info=True
            )
            pass
        finally:
            # runs after every iteration, including the failed ones
            if run_once:
                return
1818 # params for use in -f file
1819 # returns values file option and filename (in order to delete it at the end)
1820 def _params_to_file_option(self
, cluster_id
: str, params
: dict) -> (str, str):
1822 if params
and len(params
) > 0:
1823 self
._init
_paths
_env
(cluster_name
=cluster_id
, create_if_not_exist
=True)
1825 def get_random_number():
1826 r
= random
.randrange(start
=1, stop
=99999999)
1834 value
= params
.get(key
)
1835 if "!!yaml" in str(value
):
1836 value
= yaml
.load(value
[7:])
1837 params2
[key
] = value
1839 values_file
= get_random_number() + ".yaml"
1840 with
open(values_file
, "w") as stream
:
1841 yaml
.dump(params2
, stream
, indent
=4, default_flow_style
=False)
1843 return "-f {}".format(values_file
), values_file
1847 # params for use in --set option
1849 def _params_to_set_option(params
: dict) -> str:
1851 if params
and len(params
) > 0:
1854 value
= params
.get(key
, None)
1855 if value
is not None:
1857 params_str
+= "--set "
1861 params_str
+= "{}={}".format(key
, value
)
def generate_kdu_instance_name(**kwargs):
    """
    Generate a name for a KDU instance out of the name of its chart.

    The name is made of the chart name (last path component for embedded charts
    and URLs) with every non alphanumeric character replaced by "-", cut to 35
    characters and prefixed with "a" if it does not start with a letter,
    followed by "-" and a 10-digit random number. It is returned in lower case.

    :param kwargs: must include "kdu_model", the chart reference
    :return: generated instance name
    """
    chart_name = kwargs["kdu_model"]
    # embedded chart (file or dir) or URL: keep the last portion only
    if chart_name.startswith("/") or "://" in chart_name:
        chart_name = chart_name[chart_name.rfind("/") + 1 :]

    name = "".join(c if c.isalpha() or c.isnumeric() else "-" for c in chart_name)
    name = name[:35]

    # if empty (e.g. reference ending in "/") or not starting with an alpha
    # character, prefix 'a'
    if not name or not name[0].isalpha():
        name = "a" + name

    random_number = str(random.randrange(start=1, stop=99999999)).rjust(10, "0")
    return "{}-{}".format(name, random_number).lower()
1900 def _split_version(self
, kdu_model
: str) -> (str, str):
1902 if ":" in kdu_model
:
1903 parts
= kdu_model
.split(sep
=":")
1905 version
= str(parts
[1])
1906 kdu_model
= parts
[0]
1907 return kdu_model
, version
1909 async def _find_repo(self
, kdu_model
: str, cluster_uuid
: str) -> str:
1911 idx
= kdu_model
.find("/")
1913 repo_name
= kdu_model
[:idx
]
1914 # Find repository link
1915 local_repo_list
= await self
.repo_list(cluster_uuid
)
1916 for repo
in local_repo_list
:
1917 repo_url
= repo
["url"] if repo
["name"] == repo_name
else None