5985066c9e7e3399e591be627ad12b6bd8c4490a
2 # Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3 # This file is part of OSM
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact with: nfvlabs@tid.es
24 from typing
import Union
32 from uuid
import uuid4
34 from n2vc
.config
import EnvironConfig
35 from n2vc
.exceptions
import K8sException
36 from n2vc
.k8s_conn
import K8sConnector
39 class K8sHelmBaseConnector(K8sConnector
):
42 ####################################################################################
43 ################################### P U B L I C ####################################
44 ####################################################################################
47 service_account
= "osm"
53 kubectl_command
: str = "/usr/bin/kubectl",
54 helm_command
: str = "/usr/bin/helm",
60 :param fs: file system for kubernetes and helm configuration
61 :param db: database object to write current operation status
62 :param kubectl_command: path to kubectl executable
63 :param helm_command: path to helm executable
65 :param on_update_db: callback called when k8s connector updates database
69 K8sConnector
.__init
__(self
, db
=db
, log
=log
, on_update_db
=on_update_db
)
71 self
.log
.info("Initializing K8S Helm connector")
73 self
.config
= EnvironConfig()
74 # random numbers for release name generation
75 random
.seed(time
.time())
80 # exception if kubectl is not installed
81 self
.kubectl_command
= kubectl_command
82 self
._check
_file
_exists
(filename
=kubectl_command
, exception_if_not_exists
=True)
84 # exception if helm is not installed
85 self
._helm
_command
= helm_command
86 self
._check
_file
_exists
(filename
=helm_command
, exception_if_not_exists
=True)
88 # obtain stable repo url from config or apply default
89 self
._stable
_repo
_url
= self
.config
.get("stablerepourl")
90 if self
._stable
_repo
_url
== "None":
91 self
._stable
_repo
_url
= None
93 # Lock to avoid concurrent execution of helm commands
94 self
.cmd_lock
= asyncio
.Lock()
def _get_namespace(self, cluster_uuid: str) -> str:
    """
    Obtain the namespace used by the cluster with the uuid passed by argument.

    The cluster record is read from the "k8sclusters" collection through
    ``self.db.get_one``. The lookup is issued with ``fail_on_empty=False``,
    so the call may hand back no record at all; in that case this method
    answers ``None`` instead of failing with an ``AttributeError`` while
    dereferencing the missing record.

    :param cluster_uuid: cluster's uuid
    :return: value stored under the "namespace" key of the cluster record,
        or None when no record (or no such key) is available
    """
    # first, obtain the cluster corresponding to the uuid passed by argument
    k8scluster = self.db.get_one(
        "k8sclusters", q_filter={"_id": cluster_uuid}, fail_on_empty=False
    )
    if not k8scluster:
        # nothing came back for this uuid: there is no namespace to report
        return None
    return k8scluster.get("namespace")
112 namespace
: str = "kube-system",
113 reuse_cluster_uuid
=None,
117 It prepares a given K8s cluster environment to run Charts
119 :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid
121 :param namespace: optional namespace to be used for helm. By default,
122 'kube-system' will be used
123 :param reuse_cluster_uuid: existing cluster uuid for reuse
124 :param kwargs: Additional parameters (None yet)
125 :return: uuid of the K8s cluster and True if connector has installed some
126 software in the cluster
127 (on error, an exception will be raised)
130 if reuse_cluster_uuid
:
131 cluster_id
= reuse_cluster_uuid
133 cluster_id
= str(uuid4())
136 "Initializing K8S Cluster {}. namespace: {}".format(cluster_id
, namespace
)
139 paths
, env
= self
._init
_paths
_env
(
140 cluster_name
=cluster_id
, create_if_not_exist
=True
142 mode
= stat
.S_IRUSR | stat
.S_IWUSR
143 with
open(paths
["kube_config"], "w", mode
) as f
:
145 os
.chmod(paths
["kube_config"], 0o600)
147 # Code with initialization specific of helm version
148 n2vc_installed_sw
= await self
._cluster
_init
(cluster_id
, namespace
, paths
, env
)
150 # sync fs with local data
151 self
.fs
.reverse_sync(from_path
=cluster_id
)
153 self
.log
.info("Cluster {} initialized".format(cluster_id
))
155 return cluster_id
, n2vc_installed_sw
162 repo_type
: str = "chart",
165 password
: str = None,
168 "Cluster {}, adding {} repository {}. URL: {}".format(
169 cluster_uuid
, repo_type
, name
, url
174 paths
, env
= self
._init
_paths
_env
(
175 cluster_name
=cluster_uuid
, create_if_not_exist
=True
179 self
.fs
.sync(from_path
=cluster_uuid
)
181 # helm repo add name url
182 command
= ("env KUBECONFIG={} {} repo add {} {}").format(
183 paths
["kube_config"], self
._helm
_command
, name
, url
187 temp_cert_file
= os
.path
.join(
188 self
.fs
.path
, "{}/helmcerts/".format(cluster_uuid
), "temp.crt"
190 os
.makedirs(os
.path
.dirname(temp_cert_file
), exist_ok
=True)
191 with
open(temp_cert_file
, "w") as the_cert
:
193 command
+= " --ca-file {}".format(temp_cert_file
)
196 command
+= " --username={}".format(user
)
199 command
+= " --password={}".format(password
)
201 self
.log
.debug("adding repo: {}".format(command
))
202 await self
._local
_async
_exec
(
203 command
=command
, raise_exception_on_error
=True, env
=env
207 command
= "env KUBECONFIG={} {} repo update {}".format(
208 paths
["kube_config"], self
._helm
_command
, name
210 self
.log
.debug("updating repo: {}".format(command
))
211 await self
._local
_async
_exec
(
212 command
=command
, raise_exception_on_error
=False, env
=env
216 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
218 async def repo_update(self
, cluster_uuid
: str, name
: str, repo_type
: str = "chart"):
220 "Cluster {}, updating {} repository {}".format(
221 cluster_uuid
, repo_type
, name
226 paths
, env
= self
._init
_paths
_env
(
227 cluster_name
=cluster_uuid
, create_if_not_exist
=True
231 self
.fs
.sync(from_path
=cluster_uuid
)
234 command
= "{} repo update {}".format(self
._helm
_command
, name
)
235 self
.log
.debug("updating repo: {}".format(command
))
236 await self
._local
_async
_exec
(
237 command
=command
, raise_exception_on_error
=False, env
=env
241 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
243 async def repo_list(self
, cluster_uuid
: str) -> list:
245 Get the list of registered repositories
247 :return: list of registered repositories: [ (name, url) .... ]
250 self
.log
.debug("list repositories for cluster {}".format(cluster_uuid
))
253 paths
, env
= self
._init
_paths
_env
(
254 cluster_name
=cluster_uuid
, create_if_not_exist
=True
258 self
.fs
.sync(from_path
=cluster_uuid
)
260 command
= "env KUBECONFIG={} {} repo list --output yaml".format(
261 paths
["kube_config"], self
._helm
_command
264 # Set exception to false because if there are no repos just want an empty list
265 output
, _rc
= await self
._local
_async
_exec
(
266 command
=command
, raise_exception_on_error
=False, env
=env
270 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
273 if output
and len(output
) > 0:
274 repos
= yaml
.load(output
, Loader
=yaml
.SafeLoader
)
275 # unify format between helm2 and helm3 setting all keys lowercase
276 return self
._lower
_keys
_list
(repos
)
282 async def repo_remove(self
, cluster_uuid
: str, name
: str):
284 "remove {} repositories for cluster {}".format(name
, cluster_uuid
)
288 paths
, env
= self
._init
_paths
_env
(
289 cluster_name
=cluster_uuid
, create_if_not_exist
=True
293 self
.fs
.sync(from_path
=cluster_uuid
)
295 command
= "env KUBECONFIG={} {} repo remove {}".format(
296 paths
["kube_config"], self
._helm
_command
, name
298 await self
._local
_async
_exec
(
299 command
=command
, raise_exception_on_error
=True, env
=env
303 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
309 uninstall_sw
: bool = False,
314 Resets the Kubernetes cluster by removing the helm deployment that represents it.
316 :param cluster_uuid: The UUID of the cluster to reset
317 :param force: Boolean to force the reset
318 :param uninstall_sw: Boolean to force the reset
319 :param kwargs: Additional parameters (None yet)
320 :return: Returns True if successful or raises an exception.
322 namespace
= self
._get
_namespace
(cluster_uuid
=cluster_uuid
)
324 "Resetting K8s environment. cluster uuid: {} uninstall={}".format(
325 cluster_uuid
, uninstall_sw
330 self
.fs
.sync(from_path
=cluster_uuid
)
332 # uninstall releases if needed.
334 releases
= await self
.instances_list(cluster_uuid
=cluster_uuid
)
335 if len(releases
) > 0:
339 kdu_instance
= r
.get("name")
340 chart
= r
.get("chart")
342 "Uninstalling {} -> {}".format(chart
, kdu_instance
)
344 await self
.uninstall(
345 cluster_uuid
=cluster_uuid
, kdu_instance
=kdu_instance
347 except Exception as e
:
348 # will not raise exception as it was found
349 # that in some cases of previously installed helm releases it
352 "Error uninstalling release {}: {}".format(
358 "Cluster uuid: {} has releases and not force. Leaving K8s helm environment"
359 ).format(cluster_uuid
)
362 False # Allow to remove k8s cluster without removing Tiller
366 await self
._uninstall
_sw
(cluster_id
=cluster_uuid
, namespace
=namespace
)
368 # delete cluster directory
369 self
.log
.debug("Removing directory {}".format(cluster_uuid
))
370 self
.fs
.file_delete(cluster_uuid
, ignore_non_exist
=True)
371 # Also remove the local directory if it still exists
372 direct
= self
.fs
.path
+ "/" + cluster_uuid
373 shutil
.rmtree(direct
, ignore_errors
=True)
def _is_helm_chart_a_file(self, chart_name: str):
    """
    Tell whether a chart reference should be regarded as a file.

    The check is purely textual: the answer is True when the reference
    contains two or more "/" characters, and False otherwise.

    :param chart_name: chart reference to inspect
    :return: True when chart_name holds at least two "/" characters
    """
    slash_total = chart_name.count("/")
    return slash_total >= 2
380 async def _install_impl(
388 timeout
: float = 300,
390 db_dict
: dict = None,
391 kdu_name
: str = None,
392 namespace
: str = None,
395 paths
, env
= self
._init
_paths
_env
(
396 cluster_name
=cluster_id
, create_if_not_exist
=True
400 params_str
, file_to_delete
= self
._params
_to
_file
_option
(
401 cluster_id
=cluster_id
, params
=params
405 kdu_model
, version
= self
._split
_version
(kdu_model
)
407 _
, repo
= self
._split
_repo
(kdu_model
)
409 await self
.repo_update(cluster_id
, repo
)
411 command
= self
._get
_install
_command
(
419 paths
["kube_config"],
422 self
.log
.debug("installing: {}".format(command
))
425 # exec helm in a task
426 exec_task
= asyncio
.ensure_future(
427 coro_or_future
=self
._local
_async
_exec
(
428 command
=command
, raise_exception_on_error
=False, env
=env
432 # write status in another task
433 status_task
= asyncio
.ensure_future(
434 coro_or_future
=self
._store
_status
(
435 cluster_id
=cluster_id
,
436 kdu_instance
=kdu_instance
,
443 # wait for execution task
444 await asyncio
.wait([exec_task
])
449 output
, rc
= exec_task
.result()
452 output
, rc
= await self
._local
_async
_exec
(
453 command
=command
, raise_exception_on_error
=False, env
=env
456 # remove temporal values yaml file
458 os
.remove(file_to_delete
)
461 await self
._store
_status
(
462 cluster_id
=cluster_id
,
463 kdu_instance
=kdu_instance
,
470 msg
= "Error executing command: {}\nOutput: {}".format(command
, output
)
472 raise K8sException(msg
)
478 kdu_model
: str = None,
480 timeout
: float = 300,
482 db_dict
: dict = None,
484 self
.log
.debug("upgrading {} in cluster {}".format(kdu_model
, cluster_uuid
))
487 self
.fs
.sync(from_path
=cluster_uuid
)
489 # look for instance to obtain namespace
490 instance_info
= await self
.get_instance_info(cluster_uuid
, kdu_instance
)
491 if not instance_info
:
492 raise K8sException("kdu_instance {} not found".format(kdu_instance
))
495 paths
, env
= self
._init
_paths
_env
(
496 cluster_name
=cluster_uuid
, create_if_not_exist
=True
500 self
.fs
.sync(from_path
=cluster_uuid
)
503 params_str
, file_to_delete
= self
._params
_to
_file
_option
(
504 cluster_id
=cluster_uuid
, params
=params
508 kdu_model
, version
= self
._split
_version
(kdu_model
)
510 _
, repo
= self
._split
_repo
(kdu_model
)
512 await self
.repo_update(cluster_uuid
, repo
)
514 command
= self
._get
_upgrade
_command
(
517 instance_info
["namespace"],
522 paths
["kube_config"],
525 self
.log
.debug("upgrading: {}".format(command
))
528 # exec helm in a task
529 exec_task
= asyncio
.ensure_future(
530 coro_or_future
=self
._local
_async
_exec
(
531 command
=command
, raise_exception_on_error
=False, env
=env
534 # write status in another task
535 status_task
= asyncio
.ensure_future(
536 coro_or_future
=self
._store
_status
(
537 cluster_id
=cluster_uuid
,
538 kdu_instance
=kdu_instance
,
539 namespace
=instance_info
["namespace"],
545 # wait for execution task
546 await asyncio
.wait([exec_task
])
550 output
, rc
= exec_task
.result()
553 output
, rc
= await self
._local
_async
_exec
(
554 command
=command
, raise_exception_on_error
=False, env
=env
557 # remove temporal values yaml file
559 os
.remove(file_to_delete
)
562 await self
._store
_status
(
563 cluster_id
=cluster_uuid
,
564 kdu_instance
=kdu_instance
,
565 namespace
=instance_info
["namespace"],
571 msg
= "Error executing command: {}\nOutput: {}".format(command
, output
)
573 raise K8sException(msg
)
576 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
578 # return new revision number
579 instance
= await self
.get_instance_info(
580 cluster_uuid
=cluster_uuid
, kdu_instance
=kdu_instance
583 revision
= int(instance
.get("revision"))
584 self
.log
.debug("New revision: {}".format(revision
))
594 total_timeout
: float = 1800,
595 cluster_uuid
: str = None,
596 kdu_model
: str = None,
598 db_dict
: dict = None,
601 """Scale a resource in a Helm Chart.
604 kdu_instance: KDU instance name
605 scale: Scale to which to set the resource
606 resource_name: Resource name
607 total_timeout: The time, in seconds, to wait
608 cluster_uuid: The UUID of the cluster
609 kdu_model: The chart reference
610 atomic: if set, upgrade process rolls back changes made in case of failed upgrade.
611 The --wait flag will be set automatically if --atomic is used
612 db_dict: Dictionary for any additional data
613 kwargs: Additional parameters
616 True if successful, False otherwise
619 debug_mgs
= "scaling {} in cluster {}".format(kdu_model
, cluster_uuid
)
621 debug_mgs
= "scaling resource {} in model {} (cluster {})".format(
622 resource_name
, kdu_model
, cluster_uuid
625 self
.log
.debug(debug_mgs
)
627 # look for instance to obtain namespace
628 # get_instance_info function calls the sync command
629 instance_info
= await self
.get_instance_info(cluster_uuid
, kdu_instance
)
630 if not instance_info
:
631 raise K8sException("kdu_instance {} not found".format(kdu_instance
))
634 paths
, env
= self
._init
_paths
_env
(
635 cluster_name
=cluster_uuid
, create_if_not_exist
=True
639 kdu_model
, version
= self
._split
_version
(kdu_model
)
641 repo_url
= await self
._find
_repo
(kdu_model
, cluster_uuid
)
643 _
, replica_str
= await self
._get
_replica
_count
_url
(
644 kdu_model
, repo_url
, resource_name
647 command
= self
._get
_upgrade
_scale
_command
(
650 instance_info
["namespace"],
657 paths
["kube_config"],
660 self
.log
.debug("scaling: {}".format(command
))
663 # exec helm in a task
664 exec_task
= asyncio
.ensure_future(
665 coro_or_future
=self
._local
_async
_exec
(
666 command
=command
, raise_exception_on_error
=False, env
=env
669 # write status in another task
670 status_task
= asyncio
.ensure_future(
671 coro_or_future
=self
._store
_status
(
672 cluster_id
=cluster_uuid
,
673 kdu_instance
=kdu_instance
,
674 namespace
=instance_info
["namespace"],
680 # wait for execution task
681 await asyncio
.wait([exec_task
])
685 output
, rc
= exec_task
.result()
688 output
, rc
= await self
._local
_async
_exec
(
689 command
=command
, raise_exception_on_error
=False, env
=env
693 await self
._store
_status
(
694 cluster_id
=cluster_uuid
,
695 kdu_instance
=kdu_instance
,
696 namespace
=instance_info
["namespace"],
702 msg
= "Error executing command: {}\nOutput: {}".format(command
, output
)
704 raise K8sException(msg
)
707 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
711 async def get_scale_count(
719 """Get a resource scale count.
722 cluster_uuid: The UUID of the cluster
723 resource_name: Resource name
724 kdu_instance: KDU instance name
725 kdu_model: The name or path of an Helm Chart
726 kwargs: Additional parameters
729 Resource instance count
733 "getting scale count for {} in cluster {}".format(kdu_model
, cluster_uuid
)
736 # look for instance to obtain namespace
737 instance_info
= await self
.get_instance_info(cluster_uuid
, kdu_instance
)
738 if not instance_info
:
739 raise K8sException("kdu_instance {} not found".format(kdu_instance
))
742 paths
, _
= self
._init
_paths
_env
(
743 cluster_name
=cluster_uuid
, create_if_not_exist
=True
746 replicas
= await self
._get
_replica
_count
_instance
(
747 kdu_instance
=kdu_instance
,
748 namespace
=instance_info
["namespace"],
749 kubeconfig
=paths
["kube_config"],
750 resource_name
=resource_name
,
754 f
"Number of replicas of the KDU instance {kdu_instance} and resource {resource_name} obtained: {replicas}"
757 # Get default value if scale count is not found from provided values
758 # Important note: this piece of code shall only be executed in the first scaling operation,
759 # since it is expected that the _get_replica_count_instance is able to obtain the number of
760 # replicas when a scale operation was already conducted previously for this KDU/resource!
762 repo_url
= await self
._find
_repo
(
763 kdu_model
=kdu_model
, cluster_uuid
=cluster_uuid
765 replicas
, _
= await self
._get
_replica
_count
_url
(
766 kdu_model
=kdu_model
, repo_url
=repo_url
, resource_name
=resource_name
770 f
"Number of replicas of the Helm Chart package for KDU instance {kdu_instance} and resource "
771 f
"{resource_name} obtained: {replicas}"
775 msg
= "Replica count not found. Cannot be scaled"
777 raise K8sException(msg
)
782 self
, cluster_uuid
: str, kdu_instance
: str, revision
=0, db_dict
: dict = None
785 "rollback kdu_instance {} to revision {} from cluster {}".format(
786 kdu_instance
, revision
, cluster_uuid
791 self
.fs
.sync(from_path
=cluster_uuid
)
793 # look for instance to obtain namespace
794 instance_info
= await self
.get_instance_info(cluster_uuid
, kdu_instance
)
795 if not instance_info
:
796 raise K8sException("kdu_instance {} not found".format(kdu_instance
))
799 paths
, env
= self
._init
_paths
_env
(
800 cluster_name
=cluster_uuid
, create_if_not_exist
=True
804 self
.fs
.sync(from_path
=cluster_uuid
)
806 command
= self
._get
_rollback
_command
(
807 kdu_instance
, instance_info
["namespace"], revision
, paths
["kube_config"]
810 self
.log
.debug("rolling_back: {}".format(command
))
812 # exec helm in a task
813 exec_task
= asyncio
.ensure_future(
814 coro_or_future
=self
._local
_async
_exec
(
815 command
=command
, raise_exception_on_error
=False, env
=env
818 # write status in another task
819 status_task
= asyncio
.ensure_future(
820 coro_or_future
=self
._store
_status
(
821 cluster_id
=cluster_uuid
,
822 kdu_instance
=kdu_instance
,
823 namespace
=instance_info
["namespace"],
825 operation
="rollback",
829 # wait for execution task
830 await asyncio
.wait([exec_task
])
835 output
, rc
= exec_task
.result()
838 await self
._store
_status
(
839 cluster_id
=cluster_uuid
,
840 kdu_instance
=kdu_instance
,
841 namespace
=instance_info
["namespace"],
843 operation
="rollback",
847 msg
= "Error executing command: {}\nOutput: {}".format(command
, output
)
849 raise K8sException(msg
)
852 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
854 # return new revision number
855 instance
= await self
.get_instance_info(
856 cluster_uuid
=cluster_uuid
, kdu_instance
=kdu_instance
859 revision
= int(instance
.get("revision"))
860 self
.log
.debug("New revision: {}".format(revision
))
865 async def uninstall(self
, cluster_uuid
: str, kdu_instance
: str, **kwargs
):
867 Removes an existing KDU instance. It would implicitly use the `delete` or 'uninstall' call
868 (this call should happen after all _terminate-config-primitive_ of the VNF
871 :param cluster_uuid: UUID of a K8s cluster known by OSM, or namespace:cluster_id
872 :param kdu_instance: unique name for the KDU instance to be deleted
873 :param kwargs: Additional parameters (None yet)
874 :return: True if successful
878 "uninstall kdu_instance {} from cluster {}".format(
879 kdu_instance
, cluster_uuid
884 self
.fs
.sync(from_path
=cluster_uuid
)
886 # look for instance to obtain namespace
887 instance_info
= await self
.get_instance_info(cluster_uuid
, kdu_instance
)
888 if not instance_info
:
889 self
.log
.warning(("kdu_instance {} not found".format(kdu_instance
)))
892 paths
, env
= self
._init
_paths
_env
(
893 cluster_name
=cluster_uuid
, create_if_not_exist
=True
897 self
.fs
.sync(from_path
=cluster_uuid
)
899 command
= self
._get
_uninstall
_command
(
900 kdu_instance
, instance_info
["namespace"], paths
["kube_config"]
902 output
, _rc
= await self
._local
_async
_exec
(
903 command
=command
, raise_exception_on_error
=True, env
=env
907 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
909 return self
._output
_to
_table
(output
)
911 async def instances_list(self
, cluster_uuid
: str) -> list:
913 returns a list of deployed releases in a cluster
915 :param cluster_uuid: the 'cluster' or 'namespace:cluster'
919 self
.log
.debug("list releases for cluster {}".format(cluster_uuid
))
922 self
.fs
.sync(from_path
=cluster_uuid
)
924 # execute internal command
925 result
= await self
._instances
_list
(cluster_uuid
)
928 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
932 async def get_instance_info(self
, cluster_uuid
: str, kdu_instance
: str):
933 instances
= await self
.instances_list(cluster_uuid
=cluster_uuid
)
934 for instance
in instances
:
935 if instance
.get("name") == kdu_instance
:
937 self
.log
.debug("Instance {} not found".format(kdu_instance
))
940 async def upgrade_charm(
944 charm_id
: str = None,
945 charm_type
: str = None,
946 timeout
: float = None,
948 """This method upgrade charms in VNFs
951 ee_id: Execution environment id
952 path: Local path to the charm
954 charm_type: Charm type can be lxc-proxy-charm, native-charm or k8s-proxy-charm
955 timeout: (Float) Timeout for the ns update operation
958 The output of the update operation if status equals to "completed"
960 raise K8sException("KDUs deployed with Helm do not support charm upgrade")
962 async def exec_primitive(
964 cluster_uuid
: str = None,
965 kdu_instance
: str = None,
966 primitive_name
: str = None,
967 timeout
: float = 300,
969 db_dict
: dict = None,
972 """Exec primitive (Juju action)
974 :param cluster_uuid: The UUID of the cluster or namespace:cluster
975 :param kdu_instance: The unique name of the KDU instance
976 :param primitive_name: Name of action that will be executed
977 :param timeout: Timeout for action execution
978 :param params: Dictionary of all the parameters needed for the action
979 :db_dict: Dictionary for any additional data
980 :param kwargs: Additional parameters (None yet)
982 :return: Returns the output of the action
985 "KDUs deployed with Helm don't support actions "
986 "different from rollback, upgrade and status"
989 async def get_services(
990 self
, cluster_uuid
: str, kdu_instance
: str, namespace
: str
993 Returns a list of services defined for the specified kdu instance.
995 :param cluster_uuid: UUID of a K8s cluster known by OSM
996 :param kdu_instance: unique name for the KDU instance
997 :param namespace: K8s namespace used by the KDU instance
998 :return: If successful, it will return a list of services, Each service
999 can have the following data:
1000 - `name` of the service
1001 - `type` type of service in the k8 cluster
1002 - `ports` List of ports offered by the service, for each port includes at least
1003 name, port, protocol
1004 - `cluster_ip` Internal ip to be used inside k8s cluster
1005 - `external_ip` List of external ips (in case they are available)
1009 "get_services: cluster_uuid: {}, kdu_instance: {}".format(
1010 cluster_uuid
, kdu_instance
1015 paths
, env
= self
._init
_paths
_env
(
1016 cluster_name
=cluster_uuid
, create_if_not_exist
=True
1020 self
.fs
.sync(from_path
=cluster_uuid
)
1022 # get list of services names for kdu
1023 service_names
= await self
._get
_services
(
1024 cluster_uuid
, kdu_instance
, namespace
, paths
["kube_config"]
1028 for service
in service_names
:
1029 service
= await self
._get
_service
(cluster_uuid
, service
, namespace
)
1030 service_list
.append(service
)
1033 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
1037 async def get_service(
1038 self
, cluster_uuid
: str, service_name
: str, namespace
: str
1041 "get service, service_name: {}, namespace: {}, cluster_uuid: {}".format(
1042 service_name
, namespace
, cluster_uuid
1047 self
.fs
.sync(from_path
=cluster_uuid
)
1049 service
= await self
._get
_service
(cluster_uuid
, service_name
, namespace
)
1052 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
1056 async def status_kdu(
1057 self
, cluster_uuid
: str, kdu_instance
: str, yaml_format
: str = False, **kwargs
1058 ) -> Union
[str, dict]:
1060 This call would retrieve the current state of a given KDU instance. It
1061 would allow to retrieve the _composition_ (i.e. K8s objects) and _specific
1062 values_ of the configuration parameters applied to a given instance. This call
1063 would be based on the `status` call.
1065 :param cluster_uuid: UUID of a K8s cluster known by OSM
1066 :param kdu_instance: unique name for the KDU instance
1067 :param kwargs: Additional parameters (None yet)
1068 :param yaml_format: if the return shall be returned as an YAML string or as a
1070 :return: If successful, it will return the following vector of arguments:
1071 - K8s `namespace` in the cluster where the KDU lives
1072 - `state` of the KDU instance. It can be:
1079 - List of `resources` (objects) that this release consists of, sorted by kind,
1080 and the status of those resources
1081 - Last `deployment_time`.
1085 "status_kdu: cluster_uuid: {}, kdu_instance: {}".format(
1086 cluster_uuid
, kdu_instance
1091 self
.fs
.sync(from_path
=cluster_uuid
)
1093 # get instance: needed to obtain namespace
1094 instances
= await self
._instances
_list
(cluster_id
=cluster_uuid
)
1095 for instance
in instances
:
1096 if instance
.get("name") == kdu_instance
:
1099 # instance does not exist
1101 "Instance name: {} not found in cluster: {}".format(
1102 kdu_instance
, cluster_uuid
1106 status
= await self
._status
_kdu
(
1107 cluster_id
=cluster_uuid
,
1108 kdu_instance
=kdu_instance
,
1109 namespace
=instance
["namespace"],
1110 yaml_format
=yaml_format
,
1111 show_error_log
=True,
1115 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
1119 async def get_values_kdu(
1120 self
, kdu_instance
: str, namespace
: str, kubeconfig
: str
1122 self
.log
.debug("get kdu_instance values {}".format(kdu_instance
))
1124 return await self
._exec
_get
_command
(
1125 get_command
="values",
1126 kdu_instance
=kdu_instance
,
1127 namespace
=namespace
,
1128 kubeconfig
=kubeconfig
,
1131 async def values_kdu(self
, kdu_model
: str, repo_url
: str = None) -> str:
1132 """Method to obtain the Helm Chart package's values
1135 kdu_model: The name or path of an Helm Chart
1136 repo_url: Helm Chart repository url
1139 str: the values of the Helm Chart package
1143 "inspect kdu_model values {} from (optional) repo: {}".format(
1148 return await self
._exec
_inspect
_command
(
1149 inspect_command
="values", kdu_model
=kdu_model
, repo_url
=repo_url
1152 async def help_kdu(self
, kdu_model
: str, repo_url
: str = None) -> str:
1154 "inspect kdu_model {} readme.md from repo: {}".format(kdu_model
, repo_url
)
1157 return await self
._exec
_inspect
_command
(
1158 inspect_command
="readme", kdu_model
=kdu_model
, repo_url
=repo_url
1161 async def synchronize_repos(self
, cluster_uuid
: str):
1162 self
.log
.debug("synchronize repos for cluster helm-id: {}".format(cluster_uuid
))
1164 db_repo_ids
= self
._get
_helm
_chart
_repos
_ids
(cluster_uuid
)
1165 db_repo_dict
= self
._get
_db
_repos
_dict
(db_repo_ids
)
1167 local_repo_list
= await self
.repo_list(cluster_uuid
)
1168 local_repo_dict
= {repo
["name"]: repo
["url"] for repo
in local_repo_list
}
1170 deleted_repo_list
= []
1171 added_repo_dict
= {}
1173 # iterate over the list of repos in the database that should be
1174 # added if not present
1175 for repo_name
, db_repo
in db_repo_dict
.items():
1177 # check if it is already present
1178 curr_repo_url
= local_repo_dict
.get(db_repo
["name"])
1179 repo_id
= db_repo
.get("_id")
1180 if curr_repo_url
!= db_repo
["url"]:
1183 "repo {} url changed, delete and and again".format(
1187 await self
.repo_remove(cluster_uuid
, db_repo
["name"])
1188 deleted_repo_list
.append(repo_id
)
1191 self
.log
.debug("add repo {}".format(db_repo
["name"]))
1192 if "ca_cert" in db_repo
:
1193 await self
.repo_add(
1197 cert
=db_repo
["ca_cert"],
1200 await self
.repo_add(
1205 added_repo_dict
[repo_id
] = db_repo
["name"]
1206 except Exception as e
:
1208 "Error adding repo id: {}, err_msg: {} ".format(
1213 # Delete repos that are present but not in nbi_list
1214 for repo_name
in local_repo_dict
:
1215 if not db_repo_dict
.get(repo_name
) and repo_name
!= "stable":
1216 self
.log
.debug("delete repo {}".format(repo_name
))
1218 await self
.repo_remove(cluster_uuid
, repo_name
)
1219 deleted_repo_list
.append(repo_name
)
1220 except Exception as e
:
1222 "Error deleting repo, name: {}, err_msg: {}".format(
1227 return deleted_repo_list
, added_repo_dict
1229 except K8sException
:
1231 except Exception as e
:
1232 # Do not raise errors synchronizing repos
1233 self
.log
.error("Error synchronizing repos: {}".format(e
))
1234 raise Exception("Error synchronizing repos: {}".format(e
))
def _get_db_repos_dict(self, repo_ids: list):
    """
    Build a dictionary of repository records indexed by repository name.

    Each id is looked up, in order, in the "k8srepos" collection through
    ``self.db.get_one``; the record obtained is stored under the value of
    its own "name" key. A later record with the same name replaces an
    earlier one.

    :param repo_ids: list of repository ids to look up
    :return: dictionary mapping each record's "name" to the record itself
    """
    # lazily fetch one record per id so lookups happen in the given order
    fetched = (self.db.get_one("k8srepos", {"_id": rid}) for rid in repo_ids)
    return {record["name"]: record for record in fetched}
1244 ####################################################################################
1245 ################################### TO BE IMPLEMENTED SUBCLASSES ###################
1246 ####################################################################################
1250 def _init_paths_env(self
, cluster_name
: str, create_if_not_exist
: bool = True):
1252 Creates and returns base cluster and kube dirs and returns them.
1253 Also created helm3 dirs according to new directory specification, paths are
1254 not returned but assigned to helm environment variables
1256 :param cluster_name: cluster_name
1257 :return: Dictionary with config_paths and dictionary with helm environment variables
1261 async def _cluster_init(self
, cluster_id
, namespace
, paths
, env
):
1263 Implements the helm version dependent cluster initialization
1267 async def _instances_list(self
, cluster_id
):
1269 Implements the helm version dependent helm instances list
1273 async def _get_services(self
, cluster_id
, kdu_instance
, namespace
, kubeconfig
):
1275 Implements the helm version dependent method to obtain services from a helm instance
1279 async def _status_kdu(
1283 namespace
: str = None,
1284 yaml_format
: bool = False,
1285 show_error_log
: bool = False,
1286 ) -> Union
[str, dict]:
1288 Implements the helm version dependent method to obtain status of a helm instance
1292 def _get_install_command(
1304 Obtain command to be executed to delete the indicated instance
1308 def _get_upgrade_scale_command(
1321 """Generates the command to scale a Helm Chart release
1324 kdu_model (str): Kdu model name, corresponding to the Helm local location or repository
1325 kdu_instance (str): KDU instance, corresponding to the Helm Chart release in question
1326 namespace (str): Namespace where this KDU instance is deployed
1327 scale (int): Scale count
1328 version (str): Constraint with specific version of the Chart to use
1329 atomic (bool): If set, upgrade process rolls back changes made in case of failed upgrade.
1330 The --wait flag will be set automatically if --atomic is used
1331 replica_str (str): The key under resource_name key where the scale count is stored
1332 timeout (float): The time, in seconds, to wait
1333 resource_name (str): The KDU's resource to scale
1334 kubeconfig (str): Kubeconfig file path
1337 str: command to scale a Helm Chart release
1341 def _get_upgrade_command(
1352 """Generates the command to upgrade a Helm Chart release
1355 kdu_model (str): Kdu model name, corresponding to the Helm local location or repository
1356 kdu_instance (str): KDU instance, corresponding to the Helm Chart release in question
1357 namespace (str): Namespace where this KDU instance is deployed
1358 params_str (str): Params used to upgrade the Helm Chart release
1359 version (str): Constraint with specific version of the Chart to use
1360 atomic (bool): If set, upgrade process rolls back changes made in case of failed upgrade.
1361 The --wait flag will be set automatically if --atomic is used
1362 timeout (float): The time, in seconds, to wait
1363 kubeconfig (str): Kubeconfig file path
1366 str: command to upgrade a Helm Chart release
1370 def _get_rollback_command(
1371 self
, kdu_instance
, namespace
, revision
, kubeconfig
1374 Obtain command to be executed to rollback the indicated instance
1378 def _get_uninstall_command(
1379 self
, kdu_instance
: str, namespace
: str, kubeconfig
: str
1382 Obtain command to be executed to delete the indicated instance
1386 def _get_inspect_command(
1387 self
, show_command
: str, kdu_model
: str, repo_str
: str, version
: str
1389 """Generates the command to obtain the information about an Helm Chart package
1390 (´helm show ...´ command)
1393 show_command: the second part of the command (`helm show <show_command>`)
1394 kdu_model: The name or path of an Helm Chart
1395 repo_url: Helm Chart repository url
1396 version: constraint with specific version of the Chart to use
1399 str: the generated Helm Chart command
1403 def _get_get_command(
1404 self
, get_command
: str, kdu_instance
: str, namespace
: str, kubeconfig
: str
1406 """Obtain command to be executed to get information about the kdu instance."""
1409 async def _uninstall_sw(self
, cluster_id
: str, namespace
: str):
1411 Method call to uninstall cluster software for helm. This method is dependent
1413 For Helm v2 it will be called when Tiller must be uninstalled
1414 For Helm v3 it does nothing and does not need to be callled
1418 def _get_helm_chart_repos_ids(self
, cluster_uuid
) -> list:
1420 Obtains the cluster repos identifiers
1424 ####################################################################################
1425 ################################### P R I V A T E ##################################
1426 ####################################################################################
1430 def _check_file_exists(filename
: str, exception_if_not_exists
: bool = False):
1431 if os
.path
.exists(filename
):
1434 msg
= "File {} does not exist".format(filename
)
1435 if exception_if_not_exists
:
1436 raise K8sException(msg
)
1439 def _remove_multiple_spaces(strobj
):
1440 strobj
= strobj
.strip()
1441 while " " in strobj
:
1442 strobj
= strobj
.replace(" ", " ")
1446 def _output_to_lines(output
: str) -> list:
1447 output_lines
= list()
1448 lines
= output
.splitlines(keepends
=False)
1452 output_lines
.append(line
)
1456 def _output_to_table(output
: str) -> list:
1457 output_table
= list()
1458 lines
= output
.splitlines(keepends
=False)
1460 line
= line
.replace("\t", " ")
1462 output_table
.append(line_list
)
1463 cells
= line
.split(sep
=" ")
1467 line_list
.append(cell
)
1471 def _parse_services(output
: str) -> list:
1472 lines
= output
.splitlines(keepends
=False)
1475 line
= line
.replace("\t", " ")
1476 cells
= line
.split(sep
=" ")
1477 if len(cells
) > 0 and cells
[0].startswith("service/"):
1478 elems
= cells
[0].split(sep
="/")
1480 services
.append(elems
[1])
1484 def _get_deep(dictionary
: dict, members
: tuple):
1489 value
= target
.get(m
)
1498 # find key:value in several lines
1500 def _find_in_lines(p_lines
: list, p_key
: str) -> str:
1501 for line
in p_lines
:
1503 if line
.startswith(p_key
+ ":"):
1504 parts
= line
.split(":")
1505 the_value
= parts
[1].strip()
1513 def _lower_keys_list(input_list
: list):
1515 Transform the keys in a list of dictionaries to lower case and returns a new list
1520 for dictionary
in input_list
:
1521 new_dict
= dict((k
.lower(), v
) for k
, v
in dictionary
.items())
1522 new_list
.append(new_dict
)
async def _local_async_exec(
    self,
    command: str,
    raise_exception_on_error: bool = False,
    show_error_log: bool = True,
    encode_utf8: bool = False,
    env: dict = None,
) -> (str, int):
    """
    Run a local command asynchronously and collect its output.

    The command line is normalised, split with ``shlex`` and executed without a
    shell while holding ``self.cmd_lock``.

    :param command: full command line
    :param raise_exception_on_error: raise instead of returning when the
        command fails or cannot be executed
    :param show_error_log: when the return code is not 0, log the output too
    :param encode_utf8: post-process the output by encoding it to UTF-8 and
        rendering the resulting bytes object as text (see the note below)
    :param env: extra environment variables, applied on top of ``os.environ``
    :return: tuple (output, return_code). When the command cannot be executed
        and no exception was requested the result is ``("", -1)``
    :raises K8sException: the command failed or could not be executed and
        ``raise_exception_on_error`` is True
    """
    command = K8sHelmBaseConnector._remove_multiple_spaces(command)
    self.log.debug(
        "Executing async local command: {}, env: {}".format(command, env)
    )

    # split command
    command = shlex.split(command)

    environ = os.environ.copy()
    if env:
        environ.update(env)

    # kept outside the try block so the cancellation handler can reach it
    process = None
    try:
        async with self.cmd_lock:
            process = await asyncio.create_subprocess_exec(
                *command,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE,
                env=environ,
            )

            # wait for command terminate
            stdout, stderr = await process.communicate()

            return_code = process.returncode

        # NOTE: when the command writes to both streams, stderr replaces stdout
        output = ""
        if stdout:
            output = stdout.decode("utf-8").strip()
        if stderr:
            output = stderr.decode("utf-8").strip()

        if return_code != 0 and show_error_log:
            self.log.debug(
                "Return code (FAIL): {}\nOutput:\n{}".format(return_code, output)
            )
        else:
            self.log.debug("Return code: {}".format(return_code))

        if raise_exception_on_error and return_code != 0:
            raise K8sException(output)

        if encode_utf8:
            # NOTE: str() of a bytes object yields its repr ("b'...'"); kept
            # as is because callers may rely on this exact rendering
            output = output.encode("utf-8").strip()
            output = str(output).replace("\\n", "\n")

        return output, return_code

    except asyncio.CancelledError:
        # do not leave the command running once nobody waits for it
        if process is not None and process.returncode is None:
            process.kill()
        raise
    except K8sException:
        raise
    except Exception as e:
        msg = "Exception executing command: {} -> {}".format(command, e)
        self.log.error(msg)
        if raise_exception_on_error:
            raise K8sException(e) from e
        else:
            return "", -1
async def _local_async_exec_pipe(
    self,
    command1: str,
    command2: str,
    raise_exception_on_error: bool = True,
    show_error_log: bool = True,
    encode_utf8: bool = False,
    env: dict = None,
) -> (str, int):
    """
    Run ``command1 | command2`` asynchronously and collect the output of the
    second command.

    Both command lines are normalised, split with ``shlex`` and executed without
    a shell while holding ``self.cmd_lock``; an OS pipe connects the standard
    output of the first one to the standard input of the second one.

    :param command1: command line that produces the data
    :param command2: command line that consumes the data
    :param raise_exception_on_error: raise instead of returning when the second
        command fails or the commands cannot be executed
    :param show_error_log: when the return code is not 0, log the output too
    :param encode_utf8: post-process the output by encoding it to UTF-8 and
        rendering the resulting bytes object as text
    :param env: extra environment variables, applied on top of ``os.environ``
    :return: tuple (output, return_code) of the second command. When the
        commands cannot be executed and no exception was requested the result
        is ``("", -1)``
    :raises K8sException: failure and ``raise_exception_on_error`` is True
    """
    command1 = K8sHelmBaseConnector._remove_multiple_spaces(command1)
    command2 = K8sHelmBaseConnector._remove_multiple_spaces(command2)
    command = "{} | {}".format(command1, command2)
    self.log.debug(
        "Executing async local command: {}, env: {}".format(command, env)
    )

    # split command
    command1 = shlex.split(command1)
    command2 = shlex.split(command2)

    environ = os.environ.copy()
    if env:
        environ.update(env)

    # kept outside the try block so the cancellation handler can reach them
    process_1 = None
    process_2 = None
    try:
        async with self.cmd_lock:
            read, write = os.pipe()
            try:
                try:
                    process_1 = await asyncio.create_subprocess_exec(
                        *command1, stdout=write, env=environ
                    )
                finally:
                    # the parent must drop its copy of the write end, otherwise
                    # the second command never sees the end of the stream
                    os.close(write)
                process_2 = await asyncio.create_subprocess_exec(
                    *command2, stdin=read, stdout=asyncio.subprocess.PIPE, env=environ
                )
            finally:
                # released even when spawning fails, so no descriptor leaks
                os.close(read)
            # stderr is not piped, so the second element is always None
            stdout, stderr = await process_2.communicate()

            return_code = process_2.returncode

        output = ""
        if stdout:
            output = stdout.decode("utf-8").strip()
        if stderr:
            output = stderr.decode("utf-8").strip()

        if return_code != 0 and show_error_log:
            self.log.debug(
                "Return code (FAIL): {}\nOutput:\n{}".format(return_code, output)
            )
        else:
            self.log.debug("Return code: {}".format(return_code))

        if raise_exception_on_error and return_code != 0:
            raise K8sException(output)

        if encode_utf8:
            output = output.encode("utf-8").strip()
            output = str(output).replace("\\n", "\n")

        return output, return_code
    except asyncio.CancelledError:
        # do not leave the commands running once nobody waits for them
        for child in (process_1, process_2):
            if child is not None and child.returncode is None:
                child.kill()
        raise
    except K8sException:
        raise
    except Exception as e:
        msg = "Exception executing command: {} -> {}".format(command, e)
        self.log.error(msg)
        if raise_exception_on_error:
            raise K8sException(e) from e
        else:
            return "", -1
async def _get_service(self, cluster_id, service_name, namespace):
    """
    Obtains the data of the specified service in the k8cluster.

    The service is read with ``kubectl get service ... -o=yaml`` using the
    kubeconfig of the cluster.

    :param cluster_id: id of a K8s cluster known by OSM
    :param service_name: name of the K8s service in the specified namespace
    :param namespace: K8s namespace used by the KDU instance
    :return: If successful, it will return a service with the following data:
    - `name` of the service
    - `type` type of service in the k8 cluster
    - `ports` List of ports offered by the service, for each port includes at least
    name, port, protocol
    - `cluster_ip` Internal ip to be used inside k8s cluster
    - `external_ip` List of external ips (in case they are available); only
    present for services of type LoadBalancer, and empty while the load
    balancer has not been assigned an address yet
    :raises K8sException: the kubectl command failed
    """

    # init config, env
    paths, env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )

    command = "{} --kubeconfig={} --namespace={} get service {} -o=yaml".format(
        self.kubectl_command, paths["kube_config"], namespace, service_name
    )

    output, _rc = await self._local_async_exec(
        command=command, raise_exception_on_error=True, env=env
    )

    data = yaml.load(output, Loader=yaml.SafeLoader)

    service = {
        "name": service_name,
        "type": self._get_deep(data, ("spec", "type")),
        "ports": self._get_deep(data, ("spec", "ports")),
        "cluster_ip": self._get_deep(data, ("spec", "clusterIP")),
    }
    if service["type"] == "LoadBalancer":
        # the ingress list is missing until an address has been assigned, and
        # an entry may carry only a hostname: neither case may break the query
        ip_map_list = (
            self._get_deep(data, ("status", "loadBalancer", "ingress")) or []
        )
        service["external_ip"] = [
            elem["ip"] for elem in ip_map_list if elem.get("ip")
        ]

    return service
async def _exec_get_command(
    self, get_command: str, kdu_instance: str, namespace: str, kubeconfig: str
):
    """Obtains information about the kdu instance.

    The command line is produced by the helm-version-specific
    `_get_get_command` and executed locally.

    Args:
        get_command: forwarded to `_get_get_command`
        kdu_instance: forwarded to `_get_get_command`
        namespace: forwarded to `_get_get_command`
        kubeconfig: forwarded to `_get_get_command`

    Returns:
        str: the output of the executed command; the return code is ignored
    """
    command_line = self._get_get_command(
        get_command, kdu_instance, namespace, kubeconfig
    )
    result = await self._local_async_exec(command=command_line)
    output, _rc = result
    return output
async def _exec_inspect_command(
    self, inspect_command: str, kdu_model: str, repo_url: str = None
):
    """Obtains information about an Helm Chart package (´helm show´ command)

    Args:
        inspect_command: the Helm sub command (`helm show <inspect_command> ...`)
        kdu_model: The name or path of an Helm Chart
        repo_url: Helm Chart repository url

    Returns:
        str: the requested info about the Helm Chart package
    """

    repo_str = ""
    if repo_url:
        repo_str = " --repo {}".format(repo_url)

        # Obtain the Chart's name and store it in the var kdu_model
        chart_name, _ = self._split_repo(kdu_model=kdu_model)
        # a model without a "<repo>/" prefix already is the chart name;
        # replacing it with None would break the calls below
        if chart_name is not None:
            kdu_model = chart_name

    kdu_model, version = self._split_version(kdu_model)
    version_str = "--version {}".format(version) if version else ""

    full_command = self._get_inspect_command(
        show_command=inspect_command,
        kdu_model=kdu_model,
        repo_str=repo_str,
        version=version_str,
    )

    output, _ = await self._local_async_exec(command=full_command)

    return output
async def _get_replica_count_url(
    self,
    kdu_model: str,
    repo_url: str = None,
    resource_name: str = None,
) -> (int, str):
    """Get the replica count value in the Helm Chart Values.

    Args:
        kdu_model: The name or path of an Helm Chart
        repo_url: Helm Chart repository url
        resource_name: Resource name; when given, the replica count is looked
            up under this key of the values instead of at the top level

    Returns:
        A tuple with:
        - The number of replicas found in the values; and
        - The string corresponding to the replica count key in the Helm values
          (`replicaCount` or `replicas`)

    Raises:
        K8sException: the values (or the resource inside them) are empty, no
            replica key is defined, or both replica keys are present
    """

    kdu_values = yaml.load(
        await self.values_kdu(kdu_model=kdu_model, repo_url=repo_url),
        Loader=yaml.SafeLoader,
    )

    self.log.debug(f"Obtained the Helm package values for the KDU: {kdu_values}")

    if not kdu_values:
        raise K8sException(
            "kdu_values not found for kdu_model {}".format(kdu_model)
        )

    if resource_name:
        kdu_values = kdu_values.get(resource_name, None)

    if not kdu_values:
        msg = "resource {} not found in the values in model {}".format(
            resource_name, kdu_model
        )
        self.log.error(msg)
        raise K8sException(msg)

    if kdu_values.get("replicaCount") is not None:
        replicas = kdu_values["replicaCount"]
        replica_str = "replicaCount"
        conflicting_key = "replicas"
    elif kdu_values.get("replicas") is not None:
        replicas = kdu_values["replicas"]
        replica_str = "replicas"
        conflicting_key = "replicaCount"
    else:
        # the adjacent literals need an explicit separating space, otherwise
        # the words run together in the final message
        if resource_name:
            msg = (
                "replicaCount or replicas not found in the resource "
                "{} values in model {}. Cannot be scaled".format(
                    resource_name, kdu_model
                )
            )
        else:
            msg = (
                "replicaCount or replicas not found in the values "
                "in model {}. Cannot be scaled".format(kdu_model)
            )
        self.log.error(msg)
        raise K8sException(msg)

    # Control if replicas and replicaCount exists at the same time: the key
    # that was not selected must not be present at all, not even set to null
    if conflicting_key in kdu_values:
        msg = "replicaCount and replicas are exists at the same time"
        self.log.error(msg)
        raise K8sException(msg)

    return replicas, replica_str
async def _get_replica_count_instance(
    self,
    kdu_instance: str,
    namespace: str,
    kubeconfig: str,
    resource_name: str = None,
) -> int:
    """Get the replica count value in the instance.

    The values of the instance are read through `get_values_kdu`; the keys
    `replicaCount` and `replicas` are tried in that order.

    Args:
        kdu_instance: The name of the KDU instance
        namespace: KDU instance namespace
        kubeconfig: forwarded to `get_values_kdu`
        resource_name: Resource name; when the values hold a non-empty entry
            under this key, the replica count is searched inside it, otherwise
            at the top level of the values

    Returns:
        The number of replicas of the specific instance; if not found, returns None
    """

    raw_values = await self.get_values_kdu(kdu_instance, namespace, kubeconfig)
    kdu_values = yaml.load(raw_values, Loader=yaml.SafeLoader)

    self.log.debug(f"Obtained the Helm values for the KDU instance: {kdu_values}")

    if not kdu_values:
        return None

    resource_values = kdu_values.get(resource_name, None) if resource_name else None
    # the search scope is fixed once: the resource section when usable,
    # the top-level values otherwise
    scope = resource_values if resource_values else kdu_values

    for replica_str in ("replicaCount", "replicas"):
        found = scope.get(replica_str)
        if found is not None:
            return found

    return None
async def _store_status(
    self,
    cluster_id: str,
    operation: str,
    kdu_instance: str,
    namespace: str = None,
    db_dict: dict = None,
) -> None:
    """
    Obtains the status of the KDU instance based on Helm Charts, and stores it in the database.

    Failures (including the cancellation of the task) are logged as warnings
    and never propagated to the caller.

    :param cluster_id (str): the cluster where the KDU instance is deployed
    :param operation (str): The operation related to the status to be updated (for instance, "install" or "upgrade")
    :param kdu_instance (str): The KDU instance in relation to which the status is obtained
    :param namespace (str): The Kubernetes namespace where the KDU instance was deployed. Defaults to None
    :param db_dict (dict): A dictionary with the database necessary information. It shall contain the
    values for the keys:
        - "collection": The Mongo DB collection to write to
        - "filter": The query filter to use in the update process
        - "path": The dot separated keys which targets the object to be updated
    Defaults to None.
    """

    try:
        detailed_status = await self._status_kdu(
            cluster_id=cluster_id,
            kdu_instance=kdu_instance,
            yaml_format=False,
            namespace=namespace,
        )

        info = detailed_status.get("info")
        status = info.get("description")
        self.log.debug(f"Status for KDU {kdu_instance} obtained: {status}.")

        # write status to db
        written = await self.write_app_status_to_db(
            db_dict=db_dict,
            status=str(status),
            detailed_status=str(detailed_status),
            operation=operation,
        )

        if not written:
            self.log.info("Error writing in database. Task exiting...")

    except asyncio.CancelledError as e:
        self.log.warning(
            f"Exception in method {self._store_status.__name__} (task cancelled): {e}"
        )
    except Exception as e:
        self.log.warning(f"Exception in method {self._store_status.__name__}: {e}")
1942 # params for use in -f file
1943 # returns values file option and filename (in order to delete it at the end)
def _params_to_file_option(self, cluster_id: str, params: dict) -> (str, str):
    """
    Dump the parameters into a YAML values file and build the matching ``-f`` option.

    String values that contain the ``!!yaml`` marker are parsed as YAML after
    dropping their first 7 characters (assumes the marker plus one separator
    sits at the very start of the value -- TODO confirm). The file is created
    relative to the current working directory with a random, zero-padded,
    10-digit name.

    :param cluster_id: cluster whose paths/environment are initialised first
    :param params: parameters to dump
    :return: tuple (option, file name): ``("-f <file>", "<file>")``, or
        ``("", None)`` when there are no parameters. The file name is returned
        so the caller can delete the file at the end
    """
    if not (params and len(params) > 0):
        return "", None

    self._init_paths_env(cluster_name=cluster_id, create_if_not_exist=True)

    def get_random_number():
        # random number rendered with at least 10 digits, zero-padded on the left
        return str(random.randrange(start=1, stop=99999999)).zfill(10)

    resolved = {}
    for key in params:
        value = params.get(key)
        if "!!yaml" in str(value):
            value = yaml.safe_load(value[7:])
        resolved[key] = value

    values_file = get_random_number() + ".yaml"
    with open(values_file, "w") as stream:
        yaml.dump(resolved, stream, indent=4, default_flow_style=False)

    return "-f {}".format(values_file), values_file
1970 # params for use in --set option
1972 def _params_to_set_option(params
: dict) -> str:
1974 if params
and len(params
) > 0:
1977 value
= params
.get(key
, None)
1978 if value
is not None:
1980 params_str
+= "--set "
1984 params_str
+= "{}={}".format(key
, value
)
def generate_kdu_instance_name(**kwargs):
    """
    Generate a KDU instance name out of the chart reference.

    The name is built from the chart name (the last path component for embedded
    charts and URLs), with every non-alphanumeric character replaced by ``-``,
    truncated to 35 characters, prefixed with ``a`` when it does not start with
    a letter, suffixed with ``-`` plus a zero-padded 10-digit random number, and
    lower-cased.

    :param kwargs: must contain ``kdu_model``, the chart reference
    :return: the generated instance name
    :raises KeyError: ``kdu_model`` was not provided
    """
    chart_name = kwargs["kdu_model"]
    # embedded chart (file or dir) or URL: keep only the last component
    if chart_name.startswith("/") or "://" in chart_name:
        chart_name = chart_name[chart_name.rfind("/") + 1 :]

    name = "".join(
        c if c.isalpha() or c.isnumeric() else "-" for c in chart_name
    )[:35]

    # if does not start with alpha character, prefix 'a'; this also covers an
    # empty chart name (for instance a path ending in "/"), which used to
    # fail with IndexError
    if not name or not name[0].isalpha():
        name = "a" + name

    random_suffix = str(random.randrange(start=1, stop=99999999)).rjust(10, "0")
    return "{}-{}".format(name, random_suffix).lower()
2023 def _split_version(self
, kdu_model
: str) -> (str, str):
2025 if not self
._is
_helm
_chart
_a
_file
(kdu_model
) and ":" in kdu_model
:
2026 parts
= kdu_model
.split(sep
=":")
2028 version
= str(parts
[1])
2029 kdu_model
= parts
[0]
2030 return kdu_model
, version
2032 def _split_repo(self
, kdu_model
: str) -> (str, str):
2033 """Obtain the Helm Chart's repository and Chart's names from the KDU model
2036 kdu_model (str): Associated KDU model
2039 (str, str): Tuple with the Chart name in index 0, and the repo name
2040 in index 2; if there was a problem finding them, return None
2047 idx
= kdu_model
.find("/")
2049 chart_name
= kdu_model
[idx
+ 1 :]
2050 repo_name
= kdu_model
[:idx
]
2052 return chart_name
, repo_name
2054 async def _find_repo(self
, kdu_model
: str, cluster_uuid
: str) -> str:
2055 """Obtain the Helm repository for an Helm Chart
2058 kdu_model (str): the KDU model associated with the Helm Chart instantiation
2059 cluster_uuid (str): The cluster UUID associated with the Helm Chart instantiation
2062 str: the repository URL; if Helm Chart is a local one, the function returns None
2065 _
, repo_name
= self
._split
_repo
(kdu_model
=kdu_model
)
2069 # Find repository link
2070 local_repo_list
= await self
.repo_list(cluster_uuid
)
2071 for repo
in local_repo_list
:
2072 if repo
["name"] == repo_name
:
2073 repo_url
= repo
["url"]
2074 break # it is not necessary to continue the loop if the repo link was found...