a6cb11a4b4ae848d8b467b312dc373dfe835532f
2 # Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3 # This file is part of OSM
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact with: nfvlabs@tid.es
24 from typing
import Union
32 from uuid
import uuid4
34 from n2vc
.config
import EnvironConfig
35 from n2vc
.exceptions
import K8sException
36 from n2vc
.k8s_conn
import K8sConnector
39 class K8sHelmBaseConnector(K8sConnector
):
42 ####################################################################################
43 ################################### P U B L I C ####################################
44 ####################################################################################
47 service_account
= "osm"
53 kubectl_command
: str = "/usr/bin/kubectl",
54 helm_command
: str = "/usr/bin/helm",
60 :param fs: file system for kubernetes and helm configuration
61 :param db: database object to write current operation status
62 :param kubectl_command: path to kubectl executable
63 :param helm_command: path to helm executable
65 :param on_update_db: callback called when k8s connector updates database
69 K8sConnector
.__init
__(self
, db
=db
, log
=log
, on_update_db
=on_update_db
)
71 self
.log
.info("Initializing K8S Helm connector")
73 self
.config
= EnvironConfig()
74 # random numbers for release name generation
75 random
.seed(time
.time())
80 # exception if kubectl is not installed
81 self
.kubectl_command
= kubectl_command
82 self
._check
_file
_exists
(filename
=kubectl_command
, exception_if_not_exists
=True)
84 # exception if helm is not installed
85 self
._helm
_command
= helm_command
86 self
._check
_file
_exists
(filename
=helm_command
, exception_if_not_exists
=True)
88 # obtain stable repo url from config or apply default
89 self
._stable
_repo
_url
= self
.config
.get("stablerepourl")
90 if self
._stable
_repo
_url
== "None":
91 self
._stable
_repo
_url
= None
93 # Lock to avoid concurrent execution of helm commands
94 self
.cmd_lock
= asyncio
.Lock()
96 def _get_namespace(self
, cluster_uuid
: str) -> str:
98 Obtains the namespace used by the cluster with the uuid passed by argument
100 param: cluster_uuid: cluster's uuid
103 # first, obtain the cluster corresponding to the uuid passed by argument
104 k8scluster
= self
.db
.get_one(
105 "k8sclusters", q_filter
={"_id": cluster_uuid
}, fail_on_empty
=False
107 return k8scluster
.get("namespace")
112 namespace
: str = "kube-system",
113 reuse_cluster_uuid
=None,
117 It prepares a given K8s cluster environment to run Charts
119 :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid
121 :param namespace: optional namespace to be used for helm. By default,
122 'kube-system' will be used
123 :param reuse_cluster_uuid: existing cluster uuid for reuse
124 :param kwargs: Additional parameters (None yet)
125 :return: uuid of the K8s cluster and True if connector has installed some
126 software in the cluster
127 (on error, an exception will be raised)
130 if reuse_cluster_uuid
:
131 cluster_id
= reuse_cluster_uuid
133 cluster_id
= str(uuid4())
136 "Initializing K8S Cluster {}. namespace: {}".format(cluster_id
, namespace
)
139 paths
, env
= self
._init
_paths
_env
(
140 cluster_name
=cluster_id
, create_if_not_exist
=True
142 mode
= stat
.S_IRUSR | stat
.S_IWUSR
143 with
open(paths
["kube_config"], "w", mode
) as f
:
145 os
.chmod(paths
["kube_config"], 0o600)
147 # Code with initialization specific of helm version
148 n2vc_installed_sw
= await self
._cluster
_init
(cluster_id
, namespace
, paths
, env
)
150 # sync fs with local data
151 self
.fs
.reverse_sync(from_path
=cluster_id
)
153 self
.log
.info("Cluster {} initialized".format(cluster_id
))
155 return cluster_id
, n2vc_installed_sw
162 repo_type
: str = "chart",
165 password
: str = None,
168 "Cluster {}, adding {} repository {}. URL: {}".format(
169 cluster_uuid
, repo_type
, name
, url
174 paths
, env
= self
._init
_paths
_env
(
175 cluster_name
=cluster_uuid
, create_if_not_exist
=True
179 self
.fs
.sync(from_path
=cluster_uuid
)
181 # helm repo add name url
182 command
= ("env KUBECONFIG={} {} repo add {} {}").format(
183 paths
["kube_config"], self
._helm
_command
, name
, url
187 temp_cert_file
= os
.path
.join(
188 self
.fs
.path
, "{}/helmcerts/".format(cluster_uuid
), "temp.crt"
190 os
.makedirs(os
.path
.dirname(temp_cert_file
), exist_ok
=True)
191 with
open(temp_cert_file
, "w") as the_cert
:
193 command
+= " --ca-file {}".format(temp_cert_file
)
196 command
+= " --username={}".format(user
)
199 command
+= " --password={}".format(password
)
201 self
.log
.debug("adding repo: {}".format(command
))
202 await self
._local
_async
_exec
(
203 command
=command
, raise_exception_on_error
=True, env
=env
207 command
= "env KUBECONFIG={} {} repo update {}".format(
208 paths
["kube_config"], self
._helm
_command
, name
210 self
.log
.debug("updating repo: {}".format(command
))
211 await self
._local
_async
_exec
(
212 command
=command
, raise_exception_on_error
=False, env
=env
216 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
218 async def repo_update(self
, cluster_uuid
: str, name
: str, repo_type
: str = "chart"):
220 "Cluster {}, updating {} repository {}".format(
221 cluster_uuid
, repo_type
, name
226 paths
, env
= self
._init
_paths
_env
(
227 cluster_name
=cluster_uuid
, create_if_not_exist
=True
231 self
.fs
.sync(from_path
=cluster_uuid
)
234 command
= "{} repo update {}".format(self
._helm
_command
, name
)
235 self
.log
.debug("updating repo: {}".format(command
))
236 await self
._local
_async
_exec
(
237 command
=command
, raise_exception_on_error
=False, env
=env
241 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
243 async def repo_list(self
, cluster_uuid
: str) -> list:
245 Get the list of registered repositories
247 :return: list of registered repositories: [ (name, url) .... ]
250 self
.log
.debug("list repositories for cluster {}".format(cluster_uuid
))
253 paths
, env
= self
._init
_paths
_env
(
254 cluster_name
=cluster_uuid
, create_if_not_exist
=True
258 self
.fs
.sync(from_path
=cluster_uuid
)
260 command
= "env KUBECONFIG={} {} repo list --output yaml".format(
261 paths
["kube_config"], self
._helm
_command
264 # Set exception to false because if there are no repos just want an empty list
265 output
, _rc
= await self
._local
_async
_exec
(
266 command
=command
, raise_exception_on_error
=False, env
=env
270 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
273 if output
and len(output
) > 0:
274 repos
= yaml
.load(output
, Loader
=yaml
.SafeLoader
)
275 # unify format between helm2 and helm3 setting all keys lowercase
276 return self
._lower
_keys
_list
(repos
)
282 async def repo_remove(self
, cluster_uuid
: str, name
: str):
284 "remove {} repositories for cluster {}".format(name
, cluster_uuid
)
288 paths
, env
= self
._init
_paths
_env
(
289 cluster_name
=cluster_uuid
, create_if_not_exist
=True
293 self
.fs
.sync(from_path
=cluster_uuid
)
295 command
= "env KUBECONFIG={} {} repo remove {}".format(
296 paths
["kube_config"], self
._helm
_command
, name
298 await self
._local
_async
_exec
(
299 command
=command
, raise_exception_on_error
=True, env
=env
303 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
309 uninstall_sw
: bool = False,
314 Resets the Kubernetes cluster by removing the helm deployment that represents it.
316 :param cluster_uuid: The UUID of the cluster to reset
317 :param force: Boolean to force the reset
318 :param uninstall_sw: Boolean to force the reset
319 :param kwargs: Additional parameters (None yet)
320 :return: Returns True if successful or raises an exception.
322 namespace
= self
._get
_namespace
(cluster_uuid
=cluster_uuid
)
324 "Resetting K8s environment. cluster uuid: {} uninstall={}".format(
325 cluster_uuid
, uninstall_sw
330 self
.fs
.sync(from_path
=cluster_uuid
)
332 # uninstall releases if needed.
334 releases
= await self
.instances_list(cluster_uuid
=cluster_uuid
)
335 if len(releases
) > 0:
339 kdu_instance
= r
.get("name")
340 chart
= r
.get("chart")
342 "Uninstalling {} -> {}".format(chart
, kdu_instance
)
344 await self
.uninstall(
345 cluster_uuid
=cluster_uuid
, kdu_instance
=kdu_instance
347 except Exception as e
:
348 # will not raise exception as it was found
349 # that in some cases of previously installed helm releases it
352 "Error uninstalling release {}: {}".format(
358 "Cluster uuid: {} has releases and not force. Leaving K8s helm environment"
359 ).format(cluster_uuid
)
362 False # Allow to remove k8s cluster without removing Tiller
366 await self
._uninstall
_sw
(cluster_id
=cluster_uuid
, namespace
=namespace
)
368 # delete cluster directory
369 self
.log
.debug("Removing directory {}".format(cluster_uuid
))
370 self
.fs
.file_delete(cluster_uuid
, ignore_non_exist
=True)
371 # Remove also local directorio if still exist
372 direct
= self
.fs
.path
+ "/" + cluster_uuid
373 shutil
.rmtree(direct
, ignore_errors
=True)
377 def _is_helm_chart_a_file(self
, chart_name
: str):
378 return chart_name
.count("/") > 1
380 async def _install_impl(
388 timeout
: float = 300,
390 db_dict
: dict = None,
391 kdu_name
: str = None,
392 namespace
: str = None,
395 paths
, env
= self
._init
_paths
_env
(
396 cluster_name
=cluster_id
, create_if_not_exist
=True
400 params_str
, file_to_delete
= self
._params
_to
_file
_option
(
401 cluster_id
=cluster_id
, params
=params
405 kdu_model
, version
= self
._split
_version
(kdu_model
)
407 _
, repo
= self
._split
_repo
(kdu_model
)
409 await self
.repo_update(cluster_id
, repo
)
411 command
= self
._get
_install
_command
(
419 paths
["kube_config"],
422 self
.log
.debug("installing: {}".format(command
))
425 # exec helm in a task
426 exec_task
= asyncio
.ensure_future(
427 coro_or_future
=self
._local
_async
_exec
(
428 command
=command
, raise_exception_on_error
=False, env
=env
432 # write status in another task
433 status_task
= asyncio
.ensure_future(
434 coro_or_future
=self
._store
_status
(
435 cluster_id
=cluster_id
,
436 kdu_instance
=kdu_instance
,
443 # wait for execution task
444 await asyncio
.wait([exec_task
])
449 output
, rc
= exec_task
.result()
453 output
, rc
= await self
._local
_async
_exec
(
454 command
=command
, raise_exception_on_error
=False, env
=env
457 # remove temporal values yaml file
459 os
.remove(file_to_delete
)
462 await self
._store
_status
(
463 cluster_id
=cluster_id
,
464 kdu_instance
=kdu_instance
,
471 msg
= "Error executing command: {}\nOutput: {}".format(command
, output
)
473 raise K8sException(msg
)
479 kdu_model
: str = None,
481 timeout
: float = 300,
483 db_dict
: dict = None,
484 namespace
: str = None,
487 self
.log
.debug("upgrading {} in cluster {}".format(kdu_model
, cluster_uuid
))
490 self
.fs
.sync(from_path
=cluster_uuid
)
492 # look for instance to obtain namespace
496 instance_info
= await self
.get_instance_info(cluster_uuid
, kdu_instance
)
497 if not instance_info
:
498 raise K8sException("kdu_instance {} not found".format(kdu_instance
))
499 namespace
= instance_info
["namespace"]
502 paths
, env
= self
._init
_paths
_env
(
503 cluster_name
=cluster_uuid
, create_if_not_exist
=True
507 self
.fs
.sync(from_path
=cluster_uuid
)
510 params_str
, file_to_delete
= self
._params
_to
_file
_option
(
511 cluster_id
=cluster_uuid
, params
=params
515 kdu_model
, version
= self
._split
_version
(kdu_model
)
517 _
, repo
= self
._split
_repo
(kdu_model
)
519 await self
.repo_update(cluster_uuid
, repo
)
521 command
= self
._get
_upgrade
_command
(
529 paths
["kube_config"],
533 self
.log
.debug("upgrading: {}".format(command
))
537 # exec helm in a task
538 exec_task
= asyncio
.ensure_future(
539 coro_or_future
=self
._local
_async
_exec
(
540 command
=command
, raise_exception_on_error
=False, env
=env
543 # write status in another task
544 status_task
= asyncio
.ensure_future(
545 coro_or_future
=self
._store
_status
(
546 cluster_id
=cluster_uuid
,
547 kdu_instance
=kdu_instance
,
554 # wait for execution task
555 await asyncio
.wait([exec_task
])
559 output
, rc
= exec_task
.result()
563 output
, rc
= await self
._local
_async
_exec
(
564 command
=command
, raise_exception_on_error
=False, env
=env
567 # remove temporal values yaml file
569 os
.remove(file_to_delete
)
572 await self
._store
_status
(
573 cluster_id
=cluster_uuid
,
574 kdu_instance
=kdu_instance
,
581 msg
= "Error executing command: {}\nOutput: {}".format(command
, output
)
583 raise K8sException(msg
)
586 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
588 # return new revision number
589 instance
= await self
.get_instance_info(
590 cluster_uuid
=cluster_uuid
, kdu_instance
=kdu_instance
593 revision
= int(instance
.get("revision"))
594 self
.log
.debug("New revision: {}".format(revision
))
604 total_timeout
: float = 1800,
605 cluster_uuid
: str = None,
606 kdu_model
: str = None,
608 db_dict
: dict = None,
611 """Scale a resource in a Helm Chart.
614 kdu_instance: KDU instance name
615 scale: Scale to which to set the resource
616 resource_name: Resource name
617 total_timeout: The time, in seconds, to wait
618 cluster_uuid: The UUID of the cluster
619 kdu_model: The chart reference
620 atomic: if set, upgrade process rolls back changes made in case of failed upgrade.
621 The --wait flag will be set automatically if --atomic is used
622 db_dict: Dictionary for any additional data
623 kwargs: Additional parameters
626 True if successful, False otherwise
629 debug_mgs
= "scaling {} in cluster {}".format(kdu_model
, cluster_uuid
)
631 debug_mgs
= "scaling resource {} in model {} (cluster {})".format(
632 resource_name
, kdu_model
, cluster_uuid
635 self
.log
.debug(debug_mgs
)
637 # look for instance to obtain namespace
638 # get_instance_info function calls the sync command
639 instance_info
= await self
.get_instance_info(cluster_uuid
, kdu_instance
)
640 if not instance_info
:
641 raise K8sException("kdu_instance {} not found".format(kdu_instance
))
644 paths
, env
= self
._init
_paths
_env
(
645 cluster_name
=cluster_uuid
, create_if_not_exist
=True
649 kdu_model
, version
= self
._split
_version
(kdu_model
)
651 repo_url
= await self
._find
_repo
(kdu_model
, cluster_uuid
)
653 _
, replica_str
= await self
._get
_replica
_count
_url
(
654 kdu_model
, repo_url
, resource_name
657 command
= self
._get
_upgrade
_scale
_command
(
660 instance_info
["namespace"],
667 paths
["kube_config"],
670 self
.log
.debug("scaling: {}".format(command
))
673 # exec helm in a task
674 exec_task
= asyncio
.ensure_future(
675 coro_or_future
=self
._local
_async
_exec
(
676 command
=command
, raise_exception_on_error
=False, env
=env
679 # write status in another task
680 status_task
= asyncio
.ensure_future(
681 coro_or_future
=self
._store
_status
(
682 cluster_id
=cluster_uuid
,
683 kdu_instance
=kdu_instance
,
684 namespace
=instance_info
["namespace"],
690 # wait for execution task
691 await asyncio
.wait([exec_task
])
695 output
, rc
= exec_task
.result()
698 output
, rc
= await self
._local
_async
_exec
(
699 command
=command
, raise_exception_on_error
=False, env
=env
703 await self
._store
_status
(
704 cluster_id
=cluster_uuid
,
705 kdu_instance
=kdu_instance
,
706 namespace
=instance_info
["namespace"],
712 msg
= "Error executing command: {}\nOutput: {}".format(command
, output
)
714 raise K8sException(msg
)
717 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
721 async def get_scale_count(
729 """Get a resource scale count.
732 cluster_uuid: The UUID of the cluster
733 resource_name: Resource name
734 kdu_instance: KDU instance name
735 kdu_model: The name or path of an Helm Chart
736 kwargs: Additional parameters
739 Resource instance count
743 "getting scale count for {} in cluster {}".format(kdu_model
, cluster_uuid
)
746 # look for instance to obtain namespace
747 instance_info
= await self
.get_instance_info(cluster_uuid
, kdu_instance
)
748 if not instance_info
:
749 raise K8sException("kdu_instance {} not found".format(kdu_instance
))
752 paths
, _
= self
._init
_paths
_env
(
753 cluster_name
=cluster_uuid
, create_if_not_exist
=True
756 replicas
= await self
._get
_replica
_count
_instance
(
757 kdu_instance
=kdu_instance
,
758 namespace
=instance_info
["namespace"],
759 kubeconfig
=paths
["kube_config"],
760 resource_name
=resource_name
,
764 f
"Number of replicas of the KDU instance {kdu_instance} and resource {resource_name} obtained: {replicas}"
767 # Get default value if scale count is not found from provided values
768 # Important note: this piece of code shall only be executed in the first scaling operation,
769 # since it is expected that the _get_replica_count_instance is able to obtain the number of
770 # replicas when a scale operation was already conducted previously for this KDU/resource!
772 repo_url
= await self
._find
_repo
(
773 kdu_model
=kdu_model
, cluster_uuid
=cluster_uuid
775 replicas
, _
= await self
._get
_replica
_count
_url
(
776 kdu_model
=kdu_model
, repo_url
=repo_url
, resource_name
=resource_name
780 f
"Number of replicas of the Helm Chart package for KDU instance {kdu_instance} and resource "
781 f
"{resource_name} obtained: {replicas}"
785 msg
= "Replica count not found. Cannot be scaled"
787 raise K8sException(msg
)
792 self
, cluster_uuid
: str, kdu_instance
: str, revision
=0, db_dict
: dict = None
795 "rollback kdu_instance {} to revision {} from cluster {}".format(
796 kdu_instance
, revision
, cluster_uuid
801 self
.fs
.sync(from_path
=cluster_uuid
)
803 # look for instance to obtain namespace
804 instance_info
= await self
.get_instance_info(cluster_uuid
, kdu_instance
)
805 if not instance_info
:
806 raise K8sException("kdu_instance {} not found".format(kdu_instance
))
809 paths
, env
= self
._init
_paths
_env
(
810 cluster_name
=cluster_uuid
, create_if_not_exist
=True
814 self
.fs
.sync(from_path
=cluster_uuid
)
816 command
= self
._get
_rollback
_command
(
817 kdu_instance
, instance_info
["namespace"], revision
, paths
["kube_config"]
820 self
.log
.debug("rolling_back: {}".format(command
))
822 # exec helm in a task
823 exec_task
= asyncio
.ensure_future(
824 coro_or_future
=self
._local
_async
_exec
(
825 command
=command
, raise_exception_on_error
=False, env
=env
828 # write status in another task
829 status_task
= asyncio
.ensure_future(
830 coro_or_future
=self
._store
_status
(
831 cluster_id
=cluster_uuid
,
832 kdu_instance
=kdu_instance
,
833 namespace
=instance_info
["namespace"],
835 operation
="rollback",
839 # wait for execution task
840 await asyncio
.wait([exec_task
])
845 output
, rc
= exec_task
.result()
848 await self
._store
_status
(
849 cluster_id
=cluster_uuid
,
850 kdu_instance
=kdu_instance
,
851 namespace
=instance_info
["namespace"],
853 operation
="rollback",
857 msg
= "Error executing command: {}\nOutput: {}".format(command
, output
)
859 raise K8sException(msg
)
862 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
864 # return new revision number
865 instance
= await self
.get_instance_info(
866 cluster_uuid
=cluster_uuid
, kdu_instance
=kdu_instance
869 revision
= int(instance
.get("revision"))
870 self
.log
.debug("New revision: {}".format(revision
))
875 async def uninstall(self
, cluster_uuid
: str, kdu_instance
: str, **kwargs
):
877 Removes an existing KDU instance. It would implicitly use the `delete` or 'uninstall' call
878 (this call should happen after all _terminate-config-primitive_ of the VNF
881 :param cluster_uuid: UUID of a K8s cluster known by OSM, or namespace:cluster_id
882 :param kdu_instance: unique name for the KDU instance to be deleted
883 :param kwargs: Additional parameters (None yet)
884 :return: True if successful
888 "uninstall kdu_instance {} from cluster {}".format(
889 kdu_instance
, cluster_uuid
894 self
.fs
.sync(from_path
=cluster_uuid
)
896 # look for instance to obtain namespace
897 instance_info
= await self
.get_instance_info(cluster_uuid
, kdu_instance
)
898 if not instance_info
:
899 self
.log
.warning(("kdu_instance {} not found".format(kdu_instance
)))
902 paths
, env
= self
._init
_paths
_env
(
903 cluster_name
=cluster_uuid
, create_if_not_exist
=True
907 self
.fs
.sync(from_path
=cluster_uuid
)
909 command
= self
._get
_uninstall
_command
(
910 kdu_instance
, instance_info
["namespace"], paths
["kube_config"]
912 output
, _rc
= await self
._local
_async
_exec
(
913 command
=command
, raise_exception_on_error
=True, env
=env
917 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
919 return self
._output
_to
_table
(output
)
921 async def instances_list(self
, cluster_uuid
: str) -> list:
923 returns a list of deployed releases in a cluster
925 :param cluster_uuid: the 'cluster' or 'namespace:cluster'
929 self
.log
.debug("list releases for cluster {}".format(cluster_uuid
))
932 self
.fs
.sync(from_path
=cluster_uuid
)
934 # execute internal command
935 result
= await self
._instances
_list
(cluster_uuid
)
938 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
942 async def get_instance_info(self
, cluster_uuid
: str, kdu_instance
: str):
943 instances
= await self
.instances_list(cluster_uuid
=cluster_uuid
)
944 for instance
in instances
:
945 if instance
.get("name") == kdu_instance
:
947 self
.log
.debug("Instance {} not found".format(kdu_instance
))
950 async def upgrade_charm(
954 charm_id
: str = None,
955 charm_type
: str = None,
956 timeout
: float = None,
958 """This method upgrade charms in VNFs
961 ee_id: Execution environment id
962 path: Local path to the charm
964 charm_type: Charm type can be lxc-proxy-charm, native-charm or k8s-proxy-charm
965 timeout: (Float) Timeout for the ns update operation
968 The output of the update operation if status equals to "completed"
970 raise K8sException("KDUs deployed with Helm do not support charm upgrade")
972 async def exec_primitive(
974 cluster_uuid
: str = None,
975 kdu_instance
: str = None,
976 primitive_name
: str = None,
977 timeout
: float = 300,
979 db_dict
: dict = None,
982 """Exec primitive (Juju action)
984 :param cluster_uuid: The UUID of the cluster or namespace:cluster
985 :param kdu_instance: The unique name of the KDU instance
986 :param primitive_name: Name of action that will be executed
987 :param timeout: Timeout for action execution
988 :param params: Dictionary of all the parameters needed for the action
989 :db_dict: Dictionary for any additional data
990 :param kwargs: Additional parameters (None yet)
992 :return: Returns the output of the action
995 "KDUs deployed with Helm don't support actions "
996 "different from rollback, upgrade and status"
999 async def get_services(
1000 self
, cluster_uuid
: str, kdu_instance
: str, namespace
: str
1003 Returns a list of services defined for the specified kdu instance.
1005 :param cluster_uuid: UUID of a K8s cluster known by OSM
1006 :param kdu_instance: unique name for the KDU instance
1007 :param namespace: K8s namespace used by the KDU instance
1008 :return: If successful, it will return a list of services, Each service
1009 can have the following data:
1010 - `name` of the service
1011 - `type` type of service in the k8 cluster
1012 - `ports` List of ports offered by the service, for each port includes at least
1013 name, port, protocol
1014 - `cluster_ip` Internal ip to be used inside k8s cluster
1015 - `external_ip` List of external ips (in case they are available)
1019 "get_services: cluster_uuid: {}, kdu_instance: {}".format(
1020 cluster_uuid
, kdu_instance
1025 paths
, env
= self
._init
_paths
_env
(
1026 cluster_name
=cluster_uuid
, create_if_not_exist
=True
1030 self
.fs
.sync(from_path
=cluster_uuid
)
1032 # get list of services names for kdu
1033 service_names
= await self
._get
_services
(
1034 cluster_uuid
, kdu_instance
, namespace
, paths
["kube_config"]
1038 for service
in service_names
:
1039 service
= await self
._get
_service
(cluster_uuid
, service
, namespace
)
1040 service_list
.append(service
)
1043 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
1047 async def get_service(
1048 self
, cluster_uuid
: str, service_name
: str, namespace
: str
1052 "get service, service_name: {}, namespace: {}, cluster_uuid: {}".format(
1053 service_name
, namespace
, cluster_uuid
1058 self
.fs
.sync(from_path
=cluster_uuid
)
1060 service
= await self
._get
_service
(cluster_uuid
, service_name
, namespace
)
1063 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
1067 async def status_kdu(
1068 self
, cluster_uuid
: str, kdu_instance
: str, yaml_format
: str = False, **kwargs
1069 ) -> Union
[str, dict]:
1071 This call would retrieve tha current state of a given KDU instance. It would be
1072 would allow to retrieve the _composition_ (i.e. K8s objects) and _specific
1073 values_ of the configuration parameters applied to a given instance. This call
1074 would be based on the `status` call.
1076 :param cluster_uuid: UUID of a K8s cluster known by OSM
1077 :param kdu_instance: unique name for the KDU instance
1078 :param kwargs: Additional parameters (None yet)
1079 :param yaml_format: if the return shall be returned as an YAML string or as a
1081 :return: If successful, it will return the following vector of arguments:
1082 - K8s `namespace` in the cluster where the KDU lives
1083 - `state` of the KDU instance. It can be:
1090 - List of `resources` (objects) that this release consists of, sorted by kind,
1091 and the status of those resources
1092 - Last `deployment_time`.
1096 "status_kdu: cluster_uuid: {}, kdu_instance: {}".format(
1097 cluster_uuid
, kdu_instance
1102 self
.fs
.sync(from_path
=cluster_uuid
)
1104 # get instance: needed to obtain namespace
1105 instances
= await self
._instances
_list
(cluster_id
=cluster_uuid
)
1106 for instance
in instances
:
1107 if instance
.get("name") == kdu_instance
:
1110 # instance does not exist
1112 "Instance name: {} not found in cluster: {}".format(
1113 kdu_instance
, cluster_uuid
1117 status
= await self
._status
_kdu
(
1118 cluster_id
=cluster_uuid
,
1119 kdu_instance
=kdu_instance
,
1120 namespace
=instance
["namespace"],
1121 yaml_format
=yaml_format
,
1122 show_error_log
=True,
1126 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
1130 async def get_values_kdu(
1131 self
, kdu_instance
: str, namespace
: str, kubeconfig
: str
1134 self
.log
.debug("get kdu_instance values {}".format(kdu_instance
))
1136 return await self
._exec
_get
_command
(
1137 get_command
="values",
1138 kdu_instance
=kdu_instance
,
1139 namespace
=namespace
,
1140 kubeconfig
=kubeconfig
,
1143 async def values_kdu(self
, kdu_model
: str, repo_url
: str = None) -> str:
1144 """Method to obtain the Helm Chart package's values
1147 kdu_model: The name or path of an Helm Chart
1148 repo_url: Helm Chart repository url
1151 str: the values of the Helm Chart package
1155 "inspect kdu_model values {} from (optional) repo: {}".format(
1160 return await self
._exec
_inspect
_command
(
1161 inspect_command
="values", kdu_model
=kdu_model
, repo_url
=repo_url
1164 async def help_kdu(self
, kdu_model
: str, repo_url
: str = None) -> str:
1167 "inspect kdu_model {} readme.md from repo: {}".format(kdu_model
, repo_url
)
1170 return await self
._exec
_inspect
_command
(
1171 inspect_command
="readme", kdu_model
=kdu_model
, repo_url
=repo_url
1174 async def synchronize_repos(self
, cluster_uuid
: str):
1176 self
.log
.debug("synchronize repos for cluster helm-id: {}".format(cluster_uuid
))
1178 db_repo_ids
= self
._get
_helm
_chart
_repos
_ids
(cluster_uuid
)
1179 db_repo_dict
= self
._get
_db
_repos
_dict
(db_repo_ids
)
1181 local_repo_list
= await self
.repo_list(cluster_uuid
)
1182 local_repo_dict
= {repo
["name"]: repo
["url"] for repo
in local_repo_list
}
1184 deleted_repo_list
= []
1185 added_repo_dict
= {}
1187 # iterate over the list of repos in the database that should be
1188 # added if not present
1189 for repo_name
, db_repo
in db_repo_dict
.items():
1191 # check if it is already present
1192 curr_repo_url
= local_repo_dict
.get(db_repo
["name"])
1193 repo_id
= db_repo
.get("_id")
1194 if curr_repo_url
!= db_repo
["url"]:
1197 "repo {} url changed, delete and and again".format(
1201 await self
.repo_remove(cluster_uuid
, db_repo
["name"])
1202 deleted_repo_list
.append(repo_id
)
1205 self
.log
.debug("add repo {}".format(db_repo
["name"]))
1206 if "ca_cert" in db_repo
:
1207 await self
.repo_add(
1211 cert
=db_repo
["ca_cert"],
1214 await self
.repo_add(
1219 added_repo_dict
[repo_id
] = db_repo
["name"]
1220 except Exception as e
:
1222 "Error adding repo id: {}, err_msg: {} ".format(
1227 # Delete repos that are present but not in nbi_list
1228 for repo_name
in local_repo_dict
:
1229 if not db_repo_dict
.get(repo_name
) and repo_name
!= "stable":
1230 self
.log
.debug("delete repo {}".format(repo_name
))
1232 await self
.repo_remove(cluster_uuid
, repo_name
)
1233 deleted_repo_list
.append(repo_name
)
1234 except Exception as e
:
1236 "Error deleting repo, name: {}, err_msg: {}".format(
1241 return deleted_repo_list
, added_repo_dict
1243 except K8sException
:
1245 except Exception as e
:
1246 # Do not raise errors synchronizing repos
1247 self
.log
.error("Error synchronizing repos: {}".format(e
))
1248 raise Exception("Error synchronizing repos: {}".format(e
))
1250 def _get_db_repos_dict(self
, repo_ids
: list):
1252 for repo_id
in repo_ids
:
1253 db_repo
= self
.db
.get_one("k8srepos", {"_id": repo_id
})
1254 db_repos_dict
[db_repo
["name"]] = db_repo
1255 return db_repos_dict
1258 ####################################################################################
1259 ################################### TO BE IMPLEMENTED SUBCLASSES ###################
1260 ####################################################################################
1264 def _init_paths_env(self
, cluster_name
: str, create_if_not_exist
: bool = True):
1266 Creates and returns base cluster and kube dirs and returns them.
1267 Also created helm3 dirs according to new directory specification, paths are
1268 not returned but assigned to helm environment variables
1270 :param cluster_name: cluster_name
1271 :return: Dictionary with config_paths and dictionary with helm environment variables
1275 async def _cluster_init(self
, cluster_id
, namespace
, paths
, env
):
1277 Implements the helm version dependent cluster initialization
1281 async def _instances_list(self
, cluster_id
):
1283 Implements the helm version dependent helm instances list
1287 async def _get_services(self
, cluster_id
, kdu_instance
, namespace
, kubeconfig
):
1289 Implements the helm version dependent method to obtain services from a helm instance
1293 async def _status_kdu(
1297 namespace
: str = None,
1298 yaml_format
: bool = False,
1299 show_error_log
: bool = False,
1300 ) -> Union
[str, dict]:
1302 Implements the helm version dependent method to obtain status of a helm instance
1306 def _get_install_command(
1318 Obtain command to be executed to delete the indicated instance
1322 def _get_upgrade_scale_command(
1335 """Generates the command to scale a Helm Chart release
1338 kdu_model (str): Kdu model name, corresponding to the Helm local location or repository
1339 kdu_instance (str): KDU instance, corresponding to the Helm Chart release in question
1340 namespace (str): Namespace where this KDU instance is deployed
1341 scale (int): Scale count
1342 version (str): Constraint with specific version of the Chart to use
1343 atomic (bool): If set, upgrade process rolls back changes made in case of failed upgrade.
1344 The --wait flag will be set automatically if --atomic is used
1345 replica_str (str): The key under resource_name key where the scale count is stored
1346 timeout (float): The time, in seconds, to wait
1347 resource_name (str): The KDU's resource to scale
1348 kubeconfig (str): Kubeconfig file path
1351 str: command to scale a Helm Chart release
1355 def _get_upgrade_command(
1367 """Generates the command to upgrade a Helm Chart release
1370 kdu_model (str): Kdu model name, corresponding to the Helm local location or repository
1371 kdu_instance (str): KDU instance, corresponding to the Helm Chart release in question
1372 namespace (str): Namespace where this KDU instance is deployed
1373 params_str (str): Params used to upgrade the Helm Chart release
1374 version (str): Constraint with specific version of the Chart to use
1375 atomic (bool): If set, upgrade process rolls back changes made in case of failed upgrade.
1376 The --wait flag will be set automatically if --atomic is used
1377 timeout (float): The time, in seconds, to wait
1378 kubeconfig (str): Kubeconfig file path
1379 force (bool): If set, helm forces resource updates through a replacement strategy. This may recreate pods.
1381 str: command to upgrade a Helm Chart release
1385 def _get_rollback_command(
1386 self
, kdu_instance
, namespace
, revision
, kubeconfig
1389 Obtain command to be executed to rollback the indicated instance
1393 def _get_uninstall_command(
1394 self
, kdu_instance
: str, namespace
: str, kubeconfig
: str
1397 Obtain command to be executed to delete the indicated instance
1401 def _get_inspect_command(
1402 self
, show_command
: str, kdu_model
: str, repo_str
: str, version
: str
1404 """Generates the command to obtain the information about an Helm Chart package
1405 (´helm show ...´ command)
1408 show_command: the second part of the command (`helm show <show_command>`)
1409 kdu_model: The name or path of an Helm Chart
1410 repo_url: Helm Chart repository url
1411 version: constraint with specific version of the Chart to use
1414 str: the generated Helm Chart command
1418 def _get_get_command(
1419 self
, get_command
: str, kdu_instance
: str, namespace
: str, kubeconfig
: str
1421 """Obtain command to be executed to get information about the kdu instance."""
1424 async def _uninstall_sw(self
, cluster_id
: str, namespace
: str):
1426 Method call to uninstall cluster software for helm. This method is dependent
1428 For Helm v2 it will be called when Tiller must be uninstalled
1429 For Helm v3 it does nothing and does not need to be callled
1433 def _get_helm_chart_repos_ids(self
, cluster_uuid
) -> list:
1435 Obtains the cluster repos identifiers
1439 ####################################################################################
1440 ################################### P R I V A T E ##################################
1441 ####################################################################################
1445 def _check_file_exists(filename
: str, exception_if_not_exists
: bool = False):
1446 if os
.path
.exists(filename
):
1449 msg
= "File {} does not exist".format(filename
)
1450 if exception_if_not_exists
:
1451 raise K8sException(msg
)
1454 def _remove_multiple_spaces(strobj
):
1455 strobj
= strobj
.strip()
1456 while " " in strobj
:
1457 strobj
= strobj
.replace(" ", " ")
1461 def _output_to_lines(output
: str) -> list:
1462 output_lines
= list()
1463 lines
= output
.splitlines(keepends
=False)
1467 output_lines
.append(line
)
1471 def _output_to_table(output
: str) -> list:
1472 output_table
= list()
1473 lines
= output
.splitlines(keepends
=False)
1475 line
= line
.replace("\t", " ")
1477 output_table
.append(line_list
)
1478 cells
= line
.split(sep
=" ")
1482 line_list
.append(cell
)
1486 def _parse_services(output
: str) -> list:
1487 lines
= output
.splitlines(keepends
=False)
1490 line
= line
.replace("\t", " ")
1491 cells
= line
.split(sep
=" ")
1492 if len(cells
) > 0 and cells
[0].startswith("service/"):
1493 elems
= cells
[0].split(sep
="/")
1495 services
.append(elems
[1])
1499 def _get_deep(dictionary
: dict, members
: tuple):
1504 value
= target
.get(m
)
1513 # find key:value in several lines
1515 def _find_in_lines(p_lines
: list, p_key
: str) -> str:
1516 for line
in p_lines
:
1518 if line
.startswith(p_key
+ ":"):
1519 parts
= line
.split(":")
1520 the_value
= parts
[1].strip()
1528 def _lower_keys_list(input_list
: list):
1530 Transform the keys in a list of dictionaries to lower case and returns a new list
1535 for dictionary
in input_list
:
1536 new_dict
= dict((k
.lower(), v
) for k
, v
in dictionary
.items())
1537 new_list
.append(new_dict
)
1540 async def _local_async_exec(
1543 raise_exception_on_error
: bool = False,
1544 show_error_log
: bool = True,
1545 encode_utf8
: bool = False,
1549 command
= K8sHelmBaseConnector
._remove
_multiple
_spaces
(command
)
1551 "Executing async local command: {}, env: {}".format(command
, env
)
1555 command
= shlex
.split(command
)
1557 environ
= os
.environ
.copy()
1562 async with self
.cmd_lock
:
1563 process
= await asyncio
.create_subprocess_exec(
1565 stdout
=asyncio
.subprocess
.PIPE
,
1566 stderr
=asyncio
.subprocess
.PIPE
,
1570 # wait for command terminate
1571 stdout
, stderr
= await process
.communicate()
1573 return_code
= process
.returncode
1577 output
= stdout
.decode("utf-8").strip()
1578 # output = stdout.decode()
1580 output
= stderr
.decode("utf-8").strip()
1581 # output = stderr.decode()
1583 if return_code
!= 0 and show_error_log
:
1585 "Return code (FAIL): {}\nOutput:\n{}".format(return_code
, output
)
1588 self
.log
.debug("Return code: {}".format(return_code
))
1590 if raise_exception_on_error
and return_code
!= 0:
1591 raise K8sException(output
)
1594 output
= output
.encode("utf-8").strip()
1595 output
= str(output
).replace("\\n", "\n")
1597 return output
, return_code
1599 except asyncio
.CancelledError
:
1600 # first, kill the process if it is still running
1601 if process
.returncode
is None:
1604 except K8sException
:
1606 except Exception as e
:
1607 msg
= "Exception executing command: {} -> {}".format(command
, e
)
1609 if raise_exception_on_error
:
1610 raise K8sException(e
) from e
1614 async def _local_async_exec_pipe(
1618 raise_exception_on_error
: bool = True,
1619 show_error_log
: bool = True,
1620 encode_utf8
: bool = False,
1624 command1
= K8sHelmBaseConnector
._remove
_multiple
_spaces
(command1
)
1625 command2
= K8sHelmBaseConnector
._remove
_multiple
_spaces
(command2
)
1626 command
= "{} | {}".format(command1
, command2
)
1628 "Executing async local command: {}, env: {}".format(command
, env
)
1632 command1
= shlex
.split(command1
)
1633 command2
= shlex
.split(command2
)
1635 environ
= os
.environ
.copy()
1640 async with self
.cmd_lock
:
1641 read
, write
= os
.pipe()
1642 process_1
= await asyncio
.create_subprocess_exec(
1643 *command1
, stdout
=write
, env
=environ
1646 process_2
= await asyncio
.create_subprocess_exec(
1647 *command2
, stdin
=read
, stdout
=asyncio
.subprocess
.PIPE
, env
=environ
1650 stdout
, stderr
= await process_2
.communicate()
1652 return_code
= process_2
.returncode
1656 output
= stdout
.decode("utf-8").strip()
1657 # output = stdout.decode()
1659 output
= stderr
.decode("utf-8").strip()
1660 # output = stderr.decode()
1662 if return_code
!= 0 and show_error_log
:
1664 "Return code (FAIL): {}\nOutput:\n{}".format(return_code
, output
)
1667 self
.log
.debug("Return code: {}".format(return_code
))
1669 if raise_exception_on_error
and return_code
!= 0:
1670 raise K8sException(output
)
1673 output
= output
.encode("utf-8").strip()
1674 output
= str(output
).replace("\\n", "\n")
1676 return output
, return_code
1677 except asyncio
.CancelledError
:
1678 # first, kill the processes if they are still running
1679 for process
in (process_1
, process_2
):
1680 if process
.returncode
is None:
1683 except K8sException
:
1685 except Exception as e
:
1686 msg
= "Exception executing command: {} -> {}".format(command
, e
)
1688 if raise_exception_on_error
:
1689 raise K8sException(e
) from e
1693 async def _get_service(self
, cluster_id
, service_name
, namespace
):
1695 Obtains the data of the specified service in the k8cluster.
1697 :param cluster_id: id of a K8s cluster known by OSM
1698 :param service_name: name of the K8s service in the specified namespace
1699 :param namespace: K8s namespace used by the KDU instance
1700 :return: If successful, it will return a service with the following data:
1701 - `name` of the service
1702 - `type` type of service in the k8 cluster
1703 - `ports` List of ports offered by the service, for each port includes at least
1704 name, port, protocol
1705 - `cluster_ip` Internal ip to be used inside k8s cluster
1706 - `external_ip` List of external ips (in case they are available)
1710 paths
, env
= self
._init
_paths
_env
(
1711 cluster_name
=cluster_id
, create_if_not_exist
=True
1714 command
= "{} --kubeconfig={} --namespace={} get service {} -o=yaml".format(
1715 self
.kubectl_command
, paths
["kube_config"], namespace
, service_name
1718 output
, _rc
= await self
._local
_async
_exec
(
1719 command
=command
, raise_exception_on_error
=True, env
=env
1722 data
= yaml
.load(output
, Loader
=yaml
.SafeLoader
)
1725 "name": service_name
,
1726 "type": self
._get
_deep
(data
, ("spec", "type")),
1727 "ports": self
._get
_deep
(data
, ("spec", "ports")),
1728 "cluster_ip": self
._get
_deep
(data
, ("spec", "clusterIP")),
1730 if service
["type"] == "LoadBalancer":
1731 ip_map_list
= self
._get
_deep
(data
, ("status", "loadBalancer", "ingress"))
1732 ip_list
= [elem
["ip"] for elem
in ip_map_list
]
1733 service
["external_ip"] = ip_list
1737 async def _exec_get_command(
1738 self
, get_command
: str, kdu_instance
: str, namespace
: str, kubeconfig
: str
1740 """Obtains information about the kdu instance."""
1742 full_command
= self
._get
_get
_command
(
1743 get_command
, kdu_instance
, namespace
, kubeconfig
1746 output
, _rc
= await self
._local
_async
_exec
(command
=full_command
)
1750 async def _exec_inspect_command(
1751 self
, inspect_command
: str, kdu_model
: str, repo_url
: str = None
1753 """Obtains information about an Helm Chart package (´helm show´ command)
1756 inspect_command: the Helm sub command (`helm show <inspect_command> ...`)
1757 kdu_model: The name or path of an Helm Chart
1758 repo_url: Helm Chart repository url
1761 str: the requested info about the Helm Chart package
1766 repo_str
= " --repo {}".format(repo_url
)
1768 # Obtain the Chart's name and store it in the var kdu_model
1769 kdu_model
, _
= self
._split
_repo
(kdu_model
=kdu_model
)
1771 kdu_model
, version
= self
._split
_version
(kdu_model
)
1773 version_str
= "--version {}".format(version
)
1777 full_command
= self
._get
_inspect
_command
(
1778 show_command
=inspect_command
,
1779 kdu_model
=kdu_model
,
1781 version
=version_str
,
1784 output
, _
= await self
._local
_async
_exec
(command
=full_command
)
1788 async def _get_replica_count_url(
1791 repo_url
: str = None,
1792 resource_name
: str = None,
1794 """Get the replica count value in the Helm Chart Values.
1797 kdu_model: The name or path of an Helm Chart
1798 repo_url: Helm Chart repository url
1799 resource_name: Resource name
1803 - The number of replicas of the specific instance; if not found, returns None; and
1804 - The string corresponding to the replica count key in the Helm values
1807 kdu_values
= yaml
.load(
1808 await self
.values_kdu(kdu_model
=kdu_model
, repo_url
=repo_url
),
1809 Loader
=yaml
.SafeLoader
,
1812 self
.log
.debug(f
"Obtained the Helm package values for the KDU: {kdu_values}")
1816 "kdu_values not found for kdu_model {}".format(kdu_model
)
1820 kdu_values
= kdu_values
.get(resource_name
, None)
1823 msg
= "resource {} not found in the values in model {}".format(
1824 resource_name
, kdu_model
1827 raise K8sException(msg
)
1829 duplicate_check
= False
1834 if kdu_values
.get("replicaCount") is not None:
1835 replicas
= kdu_values
["replicaCount"]
1836 replica_str
= "replicaCount"
1837 elif kdu_values
.get("replicas") is not None:
1838 duplicate_check
= True
1839 replicas
= kdu_values
["replicas"]
1840 replica_str
= "replicas"
1844 "replicaCount or replicas not found in the resource"
1845 "{} values in model {}. Cannot be scaled".format(
1846 resource_name
, kdu_model
1851 "replicaCount or replicas not found in the values"
1852 "in model {}. Cannot be scaled".format(kdu_model
)
1855 raise K8sException(msg
)
1857 # Control if replicas and replicaCount exists at the same time
1858 msg
= "replicaCount and replicas are exists at the same time"
1860 if "replicaCount" in kdu_values
:
1862 raise K8sException(msg
)
1864 if "replicas" in kdu_values
:
1866 raise K8sException(msg
)
1868 return replicas
, replica_str
1870 async def _get_replica_count_instance(
1875 resource_name
: str = None,
1877 """Get the replica count value in the instance.
1880 kdu_instance: The name of the KDU instance
1881 namespace: KDU instance namespace
1883 resource_name: Resource name
1886 The number of replicas of the specific instance; if not found, returns None
1889 kdu_values
= yaml
.load(
1890 await self
.get_values_kdu(kdu_instance
, namespace
, kubeconfig
),
1891 Loader
=yaml
.SafeLoader
,
1894 self
.log
.debug(f
"Obtained the Helm values for the KDU instance: {kdu_values}")
1900 kdu_values
.get(resource_name
, None) if resource_name
else None
1903 for replica_str
in ("replicaCount", "replicas"):
1905 replicas
= resource_values
.get(replica_str
)
1907 replicas
= kdu_values
.get(replica_str
)
1909 if replicas
is not None:
1914 async def _store_status(
1919 namespace
: str = None,
1920 db_dict
: dict = None,
1923 Obtains the status of the KDU instance based on Helm Charts, and stores it in the database.
1925 :param cluster_id (str): the cluster where the KDU instance is deployed
1926 :param operation (str): The operation related to the status to be updated (for instance, "install" or "upgrade")
1927 :param kdu_instance (str): The KDU instance in relation to which the status is obtained
1928 :param namespace (str): The Kubernetes namespace where the KDU instance was deployed. Defaults to None
1929 :param db_dict (dict): A dictionary with the database necessary information. It shall contain the
1930 values for the keys:
1931 - "collection": The Mongo DB collection to write to
1932 - "filter": The query filter to use in the update process
1933 - "path": The dot separated keys which targets the object to be updated
1938 detailed_status
= await self
._status
_kdu
(
1939 cluster_id
=cluster_id
,
1940 kdu_instance
=kdu_instance
,
1942 namespace
=namespace
,
1945 status
= detailed_status
.get("info").get("description")
1946 self
.log
.debug(f
"Status for KDU {kdu_instance} obtained: {status}.")
1948 # write status to db
1949 result
= await self
.write_app_status_to_db(
1952 detailed_status
=str(detailed_status
),
1953 operation
=operation
,
1957 self
.log
.info("Error writing in database. Task exiting...")
1959 except asyncio
.CancelledError
as e
:
1961 f
"Exception in method {self._store_status.__name__} (task cancelled): {e}"
1963 except Exception as e
:
1964 self
.log
.warning(f
"Exception in method {self._store_status.__name__}: {e}")
1966 # params for use in -f file
1967 # returns values file option and filename (in order to delete it at the end)
1968 def _params_to_file_option(self
, cluster_id
: str, params
: dict) -> (str, str):
1970 if params
and len(params
) > 0:
1971 self
._init
_paths
_env
(cluster_name
=cluster_id
, create_if_not_exist
=True)
1973 def get_random_number():
1974 r
= random
.randrange(start
=1, stop
=99999999)
1982 value
= params
.get(key
)
1983 if "!!yaml" in str(value
):
1984 value
= yaml
.safe_load(value
[7:])
1985 params2
[key
] = value
1987 values_file
= get_random_number() + ".yaml"
1988 with
open(values_file
, "w") as stream
:
1989 yaml
.dump(params2
, stream
, indent
=4, default_flow_style
=False)
1991 return "-f {}".format(values_file
), values_file
1995 # params for use in --set option
1997 def _params_to_set_option(params
: dict) -> str:
1999 if params
and len(params
) > 0:
2002 value
= params
.get(key
, None)
2003 if value
is not None:
2005 params_str
+= "--set "
2009 params_str
+= "{}={}".format(key
, value
)
2013 def generate_kdu_instance_name(**kwargs
):
2014 chart_name
= kwargs
["kdu_model"]
2015 # check embeded chart (file or dir)
2016 if chart_name
.startswith("/"):
2017 # extract file or directory name
2018 chart_name
= chart_name
[chart_name
.rfind("/") + 1 :]
2020 elif "://" in chart_name
:
2021 # extract last portion of URL
2022 chart_name
= chart_name
[chart_name
.rfind("/") + 1 :]
2025 for c
in chart_name
:
2026 if c
.isalpha() or c
.isnumeric():
2033 # if does not start with alpha character, prefix 'a'
2034 if not name
[0].isalpha():
2039 def get_random_number():
2040 r
= random
.randrange(start
=1, stop
=99999999)
2042 s
= s
.rjust(10, "0")
2045 name
= name
+ get_random_number()
2048 def _split_version(self
, kdu_model
: str) -> (str, str):
2050 if not self
._is
_helm
_chart
_a
_file
(kdu_model
) and ":" in kdu_model
:
2051 parts
= kdu_model
.split(sep
=":")
2053 version
= str(parts
[1])
2054 kdu_model
= parts
[0]
2055 return kdu_model
, version
2057 def _split_repo(self
, kdu_model
: str) -> (str, str):
2058 """Obtain the Helm Chart's repository and Chart's names from the KDU model
2061 kdu_model (str): Associated KDU model
2064 (str, str): Tuple with the Chart name in index 0, and the repo name
2065 in index 2; if there was a problem finding them, return None
2072 idx
= kdu_model
.find("/")
2074 chart_name
= kdu_model
[idx
+ 1 :]
2075 repo_name
= kdu_model
[:idx
]
2077 return chart_name
, repo_name
2079 async def _find_repo(self
, kdu_model
: str, cluster_uuid
: str) -> str:
2080 """Obtain the Helm repository for an Helm Chart
2083 kdu_model (str): the KDU model associated with the Helm Chart instantiation
2084 cluster_uuid (str): The cluster UUID associated with the Helm Chart instantiation
2087 str: the repository URL; if Helm Chart is a local one, the function returns None
2090 _
, repo_name
= self
._split
_repo
(kdu_model
=kdu_model
)
2094 # Find repository link
2095 local_repo_list
= await self
.repo_list(cluster_uuid
)
2096 for repo
in local_repo_list
:
2097 if repo
["name"] == repo_name
:
2098 repo_url
= repo
["url"]
2099 break # it is not necessary to continue the loop if the repo link was found...