2 # Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3 # This file is part of OSM
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact with: nfvlabs@tid.es
24 from typing
import Union
32 from uuid
import uuid4
34 from n2vc
.config
import EnvironConfig
35 from n2vc
.exceptions
import K8sException
36 from n2vc
.k8s_conn
import K8sConnector
37 from n2vc
.kubectl
import Kubectl
40 class K8sHelmBaseConnector(K8sConnector
):
43 ####################################################################################
44 ################################### P U B L I C ####################################
45 ####################################################################################
48 service_account
= "osm"
54 kubectl_command
: str = "/usr/bin/kubectl",
55 helm_command
: str = "/usr/bin/helm",
61 :param fs: file system for kubernetes and helm configuration
62 :param db: database object to write current operation status
63 :param kubectl_command: path to kubectl executable
64 :param helm_command: path to helm executable
66 :param on_update_db: callback called when k8s connector updates database
70 K8sConnector
.__init
__(self
, db
=db
, log
=log
, on_update_db
=on_update_db
)
72 self
.log
.info("Initializing K8S Helm connector")
74 self
.config
= EnvironConfig()
75 # random numbers for release name generation
76 random
.seed(time
.time())
81 # exception if kubectl is not installed
82 self
.kubectl_command
= kubectl_command
83 self
._check
_file
_exists
(filename
=kubectl_command
, exception_if_not_exists
=True)
85 # exception if helm is not installed
86 self
._helm
_command
= helm_command
87 self
._check
_file
_exists
(filename
=helm_command
, exception_if_not_exists
=True)
89 # obtain stable repo url from config or apply default
90 self
._stable
_repo
_url
= self
.config
.get("stablerepourl")
91 if self
._stable
_repo
_url
== "None":
92 self
._stable
_repo
_url
= None
94 # Lock to avoid concurrent execution of helm commands
95 self
.cmd_lock
= asyncio
.Lock()
97 def _get_namespace(self
, cluster_uuid
: str) -> str:
99 Obtains the namespace used by the cluster with the uuid passed by argument
101 param: cluster_uuid: cluster's uuid
104 # first, obtain the cluster corresponding to the uuid passed by argument
105 k8scluster
= self
.db
.get_one(
106 "k8sclusters", q_filter
={"_id": cluster_uuid
}, fail_on_empty
=False
108 return k8scluster
.get("namespace")
113 namespace
: str = "kube-system",
114 reuse_cluster_uuid
=None,
118 It prepares a given K8s cluster environment to run Charts
120 :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid
122 :param namespace: optional namespace to be used for helm. By default,
123 'kube-system' will be used
124 :param reuse_cluster_uuid: existing cluster uuid for reuse
125 :param kwargs: Additional parameters (None yet)
126 :return: uuid of the K8s cluster and True if connector has installed some
127 software in the cluster
128 (on error, an exception will be raised)
131 if reuse_cluster_uuid
:
132 cluster_id
= reuse_cluster_uuid
134 cluster_id
= str(uuid4())
137 "Initializing K8S Cluster {}. namespace: {}".format(cluster_id
, namespace
)
140 paths
, env
= self
._init
_paths
_env
(
141 cluster_name
=cluster_id
, create_if_not_exist
=True
143 mode
= stat
.S_IRUSR | stat
.S_IWUSR
144 with
open(paths
["kube_config"], "w", mode
) as f
:
146 os
.chmod(paths
["kube_config"], 0o600)
148 # Code with initialization specific of helm version
149 n2vc_installed_sw
= await self
._cluster
_init
(cluster_id
, namespace
, paths
, env
)
151 # sync fs with local data
152 self
.fs
.reverse_sync(from_path
=cluster_id
)
154 self
.log
.info("Cluster {} initialized".format(cluster_id
))
156 return cluster_id
, n2vc_installed_sw
163 repo_type
: str = "chart",
166 password
: str = None,
169 "Cluster {}, adding {} repository {}. URL: {}".format(
170 cluster_uuid
, repo_type
, name
, url
175 paths
, env
= self
._init
_paths
_env
(
176 cluster_name
=cluster_uuid
, create_if_not_exist
=True
180 self
.fs
.sync(from_path
=cluster_uuid
)
182 # helm repo add name url
183 command
= ("env KUBECONFIG={} {} repo add {} {}").format(
184 paths
["kube_config"], self
._helm
_command
, name
, url
188 temp_cert_file
= os
.path
.join(
189 self
.fs
.path
, "{}/helmcerts/".format(cluster_uuid
), "temp.crt"
191 os
.makedirs(os
.path
.dirname(temp_cert_file
), exist_ok
=True)
192 with
open(temp_cert_file
, "w") as the_cert
:
194 command
+= " --ca-file {}".format(temp_cert_file
)
197 command
+= " --username={}".format(user
)
200 command
+= " --password={}".format(password
)
202 self
.log
.debug("adding repo: {}".format(command
))
203 await self
._local
_async
_exec
(
204 command
=command
, raise_exception_on_error
=True, env
=env
208 command
= "env KUBECONFIG={} {} repo update {}".format(
209 paths
["kube_config"], self
._helm
_command
, name
211 self
.log
.debug("updating repo: {}".format(command
))
212 await self
._local
_async
_exec
(
213 command
=command
, raise_exception_on_error
=False, env
=env
217 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
219 async def repo_update(self
, cluster_uuid
: str, name
: str, repo_type
: str = "chart"):
221 "Cluster {}, updating {} repository {}".format(
222 cluster_uuid
, repo_type
, name
227 paths
, env
= self
._init
_paths
_env
(
228 cluster_name
=cluster_uuid
, create_if_not_exist
=True
232 self
.fs
.sync(from_path
=cluster_uuid
)
235 command
= "{} repo update {}".format(self
._helm
_command
, name
)
236 self
.log
.debug("updating repo: {}".format(command
))
237 await self
._local
_async
_exec
(
238 command
=command
, raise_exception_on_error
=False, env
=env
242 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
244 async def repo_list(self
, cluster_uuid
: str) -> list:
246 Get the list of registered repositories
248 :return: list of registered repositories: [ (name, url) .... ]
251 self
.log
.debug("list repositories for cluster {}".format(cluster_uuid
))
254 paths
, env
= self
._init
_paths
_env
(
255 cluster_name
=cluster_uuid
, create_if_not_exist
=True
259 self
.fs
.sync(from_path
=cluster_uuid
)
261 command
= "env KUBECONFIG={} {} repo list --output yaml".format(
262 paths
["kube_config"], self
._helm
_command
265 # Set exception to false because if there are no repos just want an empty list
266 output
, _rc
= await self
._local
_async
_exec
(
267 command
=command
, raise_exception_on_error
=False, env
=env
271 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
274 if output
and len(output
) > 0:
275 repos
= yaml
.load(output
, Loader
=yaml
.SafeLoader
)
276 # unify format between helm2 and helm3 setting all keys lowercase
277 return self
._lower
_keys
_list
(repos
)
283 async def repo_remove(self
, cluster_uuid
: str, name
: str):
285 "remove {} repositories for cluster {}".format(name
, cluster_uuid
)
289 paths
, env
= self
._init
_paths
_env
(
290 cluster_name
=cluster_uuid
, create_if_not_exist
=True
294 self
.fs
.sync(from_path
=cluster_uuid
)
296 command
= "env KUBECONFIG={} {} repo remove {}".format(
297 paths
["kube_config"], self
._helm
_command
, name
299 await self
._local
_async
_exec
(
300 command
=command
, raise_exception_on_error
=True, env
=env
304 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
310 uninstall_sw
: bool = False,
315 Resets the Kubernetes cluster by removing the helm deployment that represents it.
317 :param cluster_uuid: The UUID of the cluster to reset
318 :param force: Boolean to force the reset
319 :param uninstall_sw: Boolean to force the reset
320 :param kwargs: Additional parameters (None yet)
321 :return: Returns True if successful or raises an exception.
323 namespace
= self
._get
_namespace
(cluster_uuid
=cluster_uuid
)
325 "Resetting K8s environment. cluster uuid: {} uninstall={}".format(
326 cluster_uuid
, uninstall_sw
331 self
.fs
.sync(from_path
=cluster_uuid
)
333 # uninstall releases if needed.
335 releases
= await self
.instances_list(cluster_uuid
=cluster_uuid
)
336 if len(releases
) > 0:
340 kdu_instance
= r
.get("name")
341 chart
= r
.get("chart")
343 "Uninstalling {} -> {}".format(chart
, kdu_instance
)
345 await self
.uninstall(
346 cluster_uuid
=cluster_uuid
, kdu_instance
=kdu_instance
348 except Exception as e
:
349 # will not raise exception as it was found
350 # that in some cases of previously installed helm releases it
353 "Error uninstalling release {}: {}".format(
359 "Cluster uuid: {} has releases and not force. Leaving K8s helm environment"
360 ).format(cluster_uuid
)
363 False # Allow to remove k8s cluster without removing Tiller
367 await self
._uninstall
_sw
(cluster_id
=cluster_uuid
, namespace
=namespace
)
369 # delete cluster directory
370 self
.log
.debug("Removing directory {}".format(cluster_uuid
))
371 self
.fs
.file_delete(cluster_uuid
, ignore_non_exist
=True)
372 # Remove also local directorio if still exist
373 direct
= self
.fs
.path
+ "/" + cluster_uuid
374 shutil
.rmtree(direct
, ignore_errors
=True)
378 def _is_helm_chart_a_file(self
, chart_name
: str):
379 return chart_name
.count("/") > 1
381 async def _install_impl(
389 timeout
: float = 300,
391 db_dict
: dict = None,
392 kdu_name
: str = None,
393 namespace
: str = None,
396 paths
, env
= self
._init
_paths
_env
(
397 cluster_name
=cluster_id
, create_if_not_exist
=True
401 params_str
, file_to_delete
= self
._params
_to
_file
_option
(
402 cluster_id
=cluster_id
, params
=params
406 kdu_model
, version
= self
._split
_version
(kdu_model
)
408 _
, repo
= self
._split
_repo
(kdu_model
)
410 await self
.repo_update(cluster_id
, repo
)
412 command
= self
._get
_install
_command
(
420 paths
["kube_config"],
423 self
.log
.debug("installing: {}".format(command
))
426 # exec helm in a task
427 exec_task
= asyncio
.ensure_future(
428 coro_or_future
=self
._local
_async
_exec
(
429 command
=command
, raise_exception_on_error
=False, env
=env
433 # write status in another task
434 status_task
= asyncio
.ensure_future(
435 coro_or_future
=self
._store
_status
(
436 cluster_id
=cluster_id
,
437 kdu_instance
=kdu_instance
,
444 # wait for execution task
445 await asyncio
.wait([exec_task
])
450 output
, rc
= exec_task
.result()
454 output
, rc
= await self
._local
_async
_exec
(
455 command
=command
, raise_exception_on_error
=False, env
=env
458 # remove temporal values yaml file
460 os
.remove(file_to_delete
)
463 await self
._store
_status
(
464 cluster_id
=cluster_id
,
465 kdu_instance
=kdu_instance
,
472 msg
= "Error executing command: {}\nOutput: {}".format(command
, output
)
474 raise K8sException(msg
)
480 kdu_model
: str = None,
482 timeout
: float = 300,
484 db_dict
: dict = None,
485 namespace
: str = None,
488 self
.log
.debug("upgrading {} in cluster {}".format(kdu_model
, cluster_uuid
))
491 self
.fs
.sync(from_path
=cluster_uuid
)
493 # look for instance to obtain namespace
497 instance_info
= await self
.get_instance_info(cluster_uuid
, kdu_instance
)
498 if not instance_info
:
499 raise K8sException("kdu_instance {} not found".format(kdu_instance
))
500 namespace
= instance_info
["namespace"]
503 paths
, env
= self
._init
_paths
_env
(
504 cluster_name
=cluster_uuid
, create_if_not_exist
=True
508 self
.fs
.sync(from_path
=cluster_uuid
)
511 params_str
, file_to_delete
= self
._params
_to
_file
_option
(
512 cluster_id
=cluster_uuid
, params
=params
516 kdu_model
, version
= self
._split
_version
(kdu_model
)
518 _
, repo
= self
._split
_repo
(kdu_model
)
520 await self
.repo_update(cluster_uuid
, repo
)
522 command
= self
._get
_upgrade
_command
(
530 paths
["kube_config"],
534 self
.log
.debug("upgrading: {}".format(command
))
538 # exec helm in a task
539 exec_task
= asyncio
.ensure_future(
540 coro_or_future
=self
._local
_async
_exec
(
541 command
=command
, raise_exception_on_error
=False, env
=env
544 # write status in another task
545 status_task
= asyncio
.ensure_future(
546 coro_or_future
=self
._store
_status
(
547 cluster_id
=cluster_uuid
,
548 kdu_instance
=kdu_instance
,
555 # wait for execution task
556 await asyncio
.wait([exec_task
])
560 output
, rc
= exec_task
.result()
564 output
, rc
= await self
._local
_async
_exec
(
565 command
=command
, raise_exception_on_error
=False, env
=env
568 # remove temporal values yaml file
570 os
.remove(file_to_delete
)
573 await self
._store
_status
(
574 cluster_id
=cluster_uuid
,
575 kdu_instance
=kdu_instance
,
582 msg
= "Error executing command: {}\nOutput: {}".format(command
, output
)
584 raise K8sException(msg
)
587 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
589 # return new revision number
590 instance
= await self
.get_instance_info(
591 cluster_uuid
=cluster_uuid
, kdu_instance
=kdu_instance
594 revision
= int(instance
.get("revision"))
595 self
.log
.debug("New revision: {}".format(revision
))
605 total_timeout
: float = 1800,
606 cluster_uuid
: str = None,
607 kdu_model
: str = None,
609 db_dict
: dict = None,
612 """Scale a resource in a Helm Chart.
615 kdu_instance: KDU instance name
616 scale: Scale to which to set the resource
617 resource_name: Resource name
618 total_timeout: The time, in seconds, to wait
619 cluster_uuid: The UUID of the cluster
620 kdu_model: The chart reference
621 atomic: if set, upgrade process rolls back changes made in case of failed upgrade.
622 The --wait flag will be set automatically if --atomic is used
623 db_dict: Dictionary for any additional data
624 kwargs: Additional parameters
627 True if successful, False otherwise
630 debug_mgs
= "scaling {} in cluster {}".format(kdu_model
, cluster_uuid
)
632 debug_mgs
= "scaling resource {} in model {} (cluster {})".format(
633 resource_name
, kdu_model
, cluster_uuid
636 self
.log
.debug(debug_mgs
)
638 # look for instance to obtain namespace
639 # get_instance_info function calls the sync command
640 instance_info
= await self
.get_instance_info(cluster_uuid
, kdu_instance
)
641 if not instance_info
:
642 raise K8sException("kdu_instance {} not found".format(kdu_instance
))
645 paths
, env
= self
._init
_paths
_env
(
646 cluster_name
=cluster_uuid
, create_if_not_exist
=True
650 kdu_model
, version
= self
._split
_version
(kdu_model
)
652 repo_url
= await self
._find
_repo
(kdu_model
, cluster_uuid
)
654 _
, replica_str
= await self
._get
_replica
_count
_url
(
655 kdu_model
, repo_url
, resource_name
658 command
= self
._get
_upgrade
_scale
_command
(
661 instance_info
["namespace"],
668 paths
["kube_config"],
671 self
.log
.debug("scaling: {}".format(command
))
674 # exec helm in a task
675 exec_task
= asyncio
.ensure_future(
676 coro_or_future
=self
._local
_async
_exec
(
677 command
=command
, raise_exception_on_error
=False, env
=env
680 # write status in another task
681 status_task
= asyncio
.ensure_future(
682 coro_or_future
=self
._store
_status
(
683 cluster_id
=cluster_uuid
,
684 kdu_instance
=kdu_instance
,
685 namespace
=instance_info
["namespace"],
691 # wait for execution task
692 await asyncio
.wait([exec_task
])
696 output
, rc
= exec_task
.result()
699 output
, rc
= await self
._local
_async
_exec
(
700 command
=command
, raise_exception_on_error
=False, env
=env
704 await self
._store
_status
(
705 cluster_id
=cluster_uuid
,
706 kdu_instance
=kdu_instance
,
707 namespace
=instance_info
["namespace"],
713 msg
= "Error executing command: {}\nOutput: {}".format(command
, output
)
715 raise K8sException(msg
)
718 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
722 async def get_scale_count(
730 """Get a resource scale count.
733 cluster_uuid: The UUID of the cluster
734 resource_name: Resource name
735 kdu_instance: KDU instance name
736 kdu_model: The name or path of an Helm Chart
737 kwargs: Additional parameters
740 Resource instance count
744 "getting scale count for {} in cluster {}".format(kdu_model
, cluster_uuid
)
747 # look for instance to obtain namespace
748 instance_info
= await self
.get_instance_info(cluster_uuid
, kdu_instance
)
749 if not instance_info
:
750 raise K8sException("kdu_instance {} not found".format(kdu_instance
))
753 paths
, _
= self
._init
_paths
_env
(
754 cluster_name
=cluster_uuid
, create_if_not_exist
=True
757 replicas
= await self
._get
_replica
_count
_instance
(
758 kdu_instance
=kdu_instance
,
759 namespace
=instance_info
["namespace"],
760 kubeconfig
=paths
["kube_config"],
761 resource_name
=resource_name
,
765 f
"Number of replicas of the KDU instance {kdu_instance} and resource {resource_name} obtained: {replicas}"
768 # Get default value if scale count is not found from provided values
769 # Important note: this piece of code shall only be executed in the first scaling operation,
770 # since it is expected that the _get_replica_count_instance is able to obtain the number of
771 # replicas when a scale operation was already conducted previously for this KDU/resource!
773 repo_url
= await self
._find
_repo
(
774 kdu_model
=kdu_model
, cluster_uuid
=cluster_uuid
776 replicas
, _
= await self
._get
_replica
_count
_url
(
777 kdu_model
=kdu_model
, repo_url
=repo_url
, resource_name
=resource_name
781 f
"Number of replicas of the Helm Chart package for KDU instance {kdu_instance} and resource "
782 f
"{resource_name} obtained: {replicas}"
786 msg
= "Replica count not found. Cannot be scaled"
788 raise K8sException(msg
)
793 self
, cluster_uuid
: str, kdu_instance
: str, revision
=0, db_dict
: dict = None
796 "rollback kdu_instance {} to revision {} from cluster {}".format(
797 kdu_instance
, revision
, cluster_uuid
802 self
.fs
.sync(from_path
=cluster_uuid
)
804 # look for instance to obtain namespace
805 instance_info
= await self
.get_instance_info(cluster_uuid
, kdu_instance
)
806 if not instance_info
:
807 raise K8sException("kdu_instance {} not found".format(kdu_instance
))
810 paths
, env
= self
._init
_paths
_env
(
811 cluster_name
=cluster_uuid
, create_if_not_exist
=True
815 self
.fs
.sync(from_path
=cluster_uuid
)
817 command
= self
._get
_rollback
_command
(
818 kdu_instance
, instance_info
["namespace"], revision
, paths
["kube_config"]
821 self
.log
.debug("rolling_back: {}".format(command
))
823 # exec helm in a task
824 exec_task
= asyncio
.ensure_future(
825 coro_or_future
=self
._local
_async
_exec
(
826 command
=command
, raise_exception_on_error
=False, env
=env
829 # write status in another task
830 status_task
= asyncio
.ensure_future(
831 coro_or_future
=self
._store
_status
(
832 cluster_id
=cluster_uuid
,
833 kdu_instance
=kdu_instance
,
834 namespace
=instance_info
["namespace"],
836 operation
="rollback",
840 # wait for execution task
841 await asyncio
.wait([exec_task
])
846 output
, rc
= exec_task
.result()
849 await self
._store
_status
(
850 cluster_id
=cluster_uuid
,
851 kdu_instance
=kdu_instance
,
852 namespace
=instance_info
["namespace"],
854 operation
="rollback",
858 msg
= "Error executing command: {}\nOutput: {}".format(command
, output
)
860 raise K8sException(msg
)
863 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
865 # return new revision number
866 instance
= await self
.get_instance_info(
867 cluster_uuid
=cluster_uuid
, kdu_instance
=kdu_instance
870 revision
= int(instance
.get("revision"))
871 self
.log
.debug("New revision: {}".format(revision
))
876 async def uninstall(self
, cluster_uuid
: str, kdu_instance
: str, **kwargs
):
878 Removes an existing KDU instance. It would implicitly use the `delete` or 'uninstall' call
879 (this call should happen after all _terminate-config-primitive_ of the VNF
882 :param cluster_uuid: UUID of a K8s cluster known by OSM, or namespace:cluster_id
883 :param kdu_instance: unique name for the KDU instance to be deleted
884 :param kwargs: Additional parameters (None yet)
885 :return: True if successful
889 "uninstall kdu_instance {} from cluster {}".format(
890 kdu_instance
, cluster_uuid
895 self
.fs
.sync(from_path
=cluster_uuid
)
897 # look for instance to obtain namespace
898 instance_info
= await self
.get_instance_info(cluster_uuid
, kdu_instance
)
899 if not instance_info
:
900 self
.log
.warning(("kdu_instance {} not found".format(kdu_instance
)))
903 paths
, env
= self
._init
_paths
_env
(
904 cluster_name
=cluster_uuid
, create_if_not_exist
=True
908 self
.fs
.sync(from_path
=cluster_uuid
)
910 command
= self
._get
_uninstall
_command
(
911 kdu_instance
, instance_info
["namespace"], paths
["kube_config"]
913 output
, _rc
= await self
._local
_async
_exec
(
914 command
=command
, raise_exception_on_error
=True, env
=env
918 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
920 return self
._output
_to
_table
(output
)
922 async def instances_list(self
, cluster_uuid
: str) -> list:
924 returns a list of deployed releases in a cluster
926 :param cluster_uuid: the 'cluster' or 'namespace:cluster'
930 self
.log
.debug("list releases for cluster {}".format(cluster_uuid
))
933 self
.fs
.sync(from_path
=cluster_uuid
)
935 # execute internal command
936 result
= await self
._instances
_list
(cluster_uuid
)
939 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
943 async def get_instance_info(self
, cluster_uuid
: str, kdu_instance
: str):
944 instances
= await self
.instances_list(cluster_uuid
=cluster_uuid
)
945 for instance
in instances
:
946 if instance
.get("name") == kdu_instance
:
948 self
.log
.debug("Instance {} not found".format(kdu_instance
))
951 async def upgrade_charm(
955 charm_id
: str = None,
956 charm_type
: str = None,
957 timeout
: float = None,
959 """This method upgrade charms in VNFs
962 ee_id: Execution environment id
963 path: Local path to the charm
965 charm_type: Charm type can be lxc-proxy-charm, native-charm or k8s-proxy-charm
966 timeout: (Float) Timeout for the ns update operation
969 The output of the update operation if status equals to "completed"
971 raise K8sException("KDUs deployed with Helm do not support charm upgrade")
973 async def exec_primitive(
975 cluster_uuid
: str = None,
976 kdu_instance
: str = None,
977 primitive_name
: str = None,
978 timeout
: float = 300,
980 db_dict
: dict = None,
983 """Exec primitive (Juju action)
985 :param cluster_uuid: The UUID of the cluster or namespace:cluster
986 :param kdu_instance: The unique name of the KDU instance
987 :param primitive_name: Name of action that will be executed
988 :param timeout: Timeout for action execution
989 :param params: Dictionary of all the parameters needed for the action
990 :db_dict: Dictionary for any additional data
991 :param kwargs: Additional parameters (None yet)
993 :return: Returns the output of the action
996 "KDUs deployed with Helm don't support actions "
997 "different from rollback, upgrade and status"
1000 async def get_services(
1001 self
, cluster_uuid
: str, kdu_instance
: str, namespace
: str
1004 Returns a list of services defined for the specified kdu instance.
1006 :param cluster_uuid: UUID of a K8s cluster known by OSM
1007 :param kdu_instance: unique name for the KDU instance
1008 :param namespace: K8s namespace used by the KDU instance
1009 :return: If successful, it will return a list of services, Each service
1010 can have the following data:
1011 - `name` of the service
1012 - `type` type of service in the k8 cluster
1013 - `ports` List of ports offered by the service, for each port includes at least
1014 name, port, protocol
1015 - `cluster_ip` Internal ip to be used inside k8s cluster
1016 - `external_ip` List of external ips (in case they are available)
1020 "get_services: cluster_uuid: {}, kdu_instance: {}".format(
1021 cluster_uuid
, kdu_instance
1026 paths
, env
= self
._init
_paths
_env
(
1027 cluster_name
=cluster_uuid
, create_if_not_exist
=True
1031 self
.fs
.sync(from_path
=cluster_uuid
)
1033 # get list of services names for kdu
1034 service_names
= await self
._get
_services
(
1035 cluster_uuid
, kdu_instance
, namespace
, paths
["kube_config"]
1039 for service
in service_names
:
1040 service
= await self
._get
_service
(cluster_uuid
, service
, namespace
)
1041 service_list
.append(service
)
1044 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
1048 async def get_service(
1049 self
, cluster_uuid
: str, service_name
: str, namespace
: str
1053 "get service, service_name: {}, namespace: {}, cluster_uuid: {}".format(
1054 service_name
, namespace
, cluster_uuid
1059 self
.fs
.sync(from_path
=cluster_uuid
)
1061 service
= await self
._get
_service
(cluster_uuid
, service_name
, namespace
)
1064 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
1068 async def status_kdu(
1069 self
, cluster_uuid
: str, kdu_instance
: str, yaml_format
: str = False, **kwargs
1070 ) -> Union
[str, dict]:
1072 This call would retrieve tha current state of a given KDU instance. It would be
1073 would allow to retrieve the _composition_ (i.e. K8s objects) and _specific
1074 values_ of the configuration parameters applied to a given instance. This call
1075 would be based on the `status` call.
1077 :param cluster_uuid: UUID of a K8s cluster known by OSM
1078 :param kdu_instance: unique name for the KDU instance
1079 :param kwargs: Additional parameters (None yet)
1080 :param yaml_format: if the return shall be returned as an YAML string or as a
1082 :return: If successful, it will return the following vector of arguments:
1083 - K8s `namespace` in the cluster where the KDU lives
1084 - `state` of the KDU instance. It can be:
1091 - List of `resources` (objects) that this release consists of, sorted by kind,
1092 and the status of those resources
1093 - Last `deployment_time`.
1097 "status_kdu: cluster_uuid: {}, kdu_instance: {}".format(
1098 cluster_uuid
, kdu_instance
1103 self
.fs
.sync(from_path
=cluster_uuid
)
1105 # get instance: needed to obtain namespace
1106 instances
= await self
._instances
_list
(cluster_id
=cluster_uuid
)
1107 for instance
in instances
:
1108 if instance
.get("name") == kdu_instance
:
1111 # instance does not exist
1113 "Instance name: {} not found in cluster: {}".format(
1114 kdu_instance
, cluster_uuid
1118 status
= await self
._status
_kdu
(
1119 cluster_id
=cluster_uuid
,
1120 kdu_instance
=kdu_instance
,
1121 namespace
=instance
["namespace"],
1122 yaml_format
=yaml_format
,
1123 show_error_log
=True,
1127 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
1131 async def get_values_kdu(
1132 self
, kdu_instance
: str, namespace
: str, kubeconfig
: str
1135 self
.log
.debug("get kdu_instance values {}".format(kdu_instance
))
1137 return await self
._exec
_get
_command
(
1138 get_command
="values",
1139 kdu_instance
=kdu_instance
,
1140 namespace
=namespace
,
1141 kubeconfig
=kubeconfig
,
1144 async def values_kdu(self
, kdu_model
: str, repo_url
: str = None) -> str:
1145 """Method to obtain the Helm Chart package's values
1148 kdu_model: The name or path of an Helm Chart
1149 repo_url: Helm Chart repository url
1152 str: the values of the Helm Chart package
1156 "inspect kdu_model values {} from (optional) repo: {}".format(
1161 return await self
._exec
_inspect
_command
(
1162 inspect_command
="values", kdu_model
=kdu_model
, repo_url
=repo_url
1165 async def help_kdu(self
, kdu_model
: str, repo_url
: str = None) -> str:
1168 "inspect kdu_model {} readme.md from repo: {}".format(kdu_model
, repo_url
)
1171 return await self
._exec
_inspect
_command
(
1172 inspect_command
="readme", kdu_model
=kdu_model
, repo_url
=repo_url
1175 async def synchronize_repos(self
, cluster_uuid
: str):
1177 self
.log
.debug("synchronize repos for cluster helm-id: {}".format(cluster_uuid
))
1179 db_repo_ids
= self
._get
_helm
_chart
_repos
_ids
(cluster_uuid
)
1180 db_repo_dict
= self
._get
_db
_repos
_dict
(db_repo_ids
)
1182 local_repo_list
= await self
.repo_list(cluster_uuid
)
1183 local_repo_dict
= {repo
["name"]: repo
["url"] for repo
in local_repo_list
}
1185 deleted_repo_list
= []
1186 added_repo_dict
= {}
1188 # iterate over the list of repos in the database that should be
1189 # added if not present
1190 for repo_name
, db_repo
in db_repo_dict
.items():
1192 # check if it is already present
1193 curr_repo_url
= local_repo_dict
.get(db_repo
["name"])
1194 repo_id
= db_repo
.get("_id")
1195 if curr_repo_url
!= db_repo
["url"]:
1198 "repo {} url changed, delete and and again".format(
1202 await self
.repo_remove(cluster_uuid
, db_repo
["name"])
1203 deleted_repo_list
.append(repo_id
)
1206 self
.log
.debug("add repo {}".format(db_repo
["name"]))
1207 if "ca_cert" in db_repo
:
1208 await self
.repo_add(
1212 cert
=db_repo
["ca_cert"],
1215 await self
.repo_add(
1220 added_repo_dict
[repo_id
] = db_repo
["name"]
1221 except Exception as e
:
1223 "Error adding repo id: {}, err_msg: {} ".format(
1228 # Delete repos that are present but not in nbi_list
1229 for repo_name
in local_repo_dict
:
1230 if not db_repo_dict
.get(repo_name
) and repo_name
!= "stable":
1231 self
.log
.debug("delete repo {}".format(repo_name
))
1233 await self
.repo_remove(cluster_uuid
, repo_name
)
1234 deleted_repo_list
.append(repo_name
)
1235 except Exception as e
:
1237 "Error deleting repo, name: {}, err_msg: {}".format(
1242 return deleted_repo_list
, added_repo_dict
1244 except K8sException
:
1246 except Exception as e
:
1247 # Do not raise errors synchronizing repos
1248 self
.log
.error("Error synchronizing repos: {}".format(e
))
1249 raise Exception("Error synchronizing repos: {}".format(e
))
1251 def _get_db_repos_dict(self
, repo_ids
: list):
1253 for repo_id
in repo_ids
:
1254 db_repo
= self
.db
.get_one("k8srepos", {"_id": repo_id
})
1255 db_repos_dict
[db_repo
["name"]] = db_repo
1256 return db_repos_dict
1259 ####################################################################################
1260 ################################### TO BE IMPLEMENTED SUBCLASSES ###################
1261 ####################################################################################
1265 def _init_paths_env(self
, cluster_name
: str, create_if_not_exist
: bool = True):
1267 Creates and returns base cluster and kube dirs and returns them.
1268 Also created helm3 dirs according to new directory specification, paths are
1269 not returned but assigned to helm environment variables
1271 :param cluster_name: cluster_name
1272 :return: Dictionary with config_paths and dictionary with helm environment variables
1276 async def _cluster_init(self
, cluster_id
, namespace
, paths
, env
):
1278 Implements the helm version dependent cluster initialization
1282 async def _instances_list(self
, cluster_id
):
1284 Implements the helm version dependent helm instances list
1288 async def _get_services(self
, cluster_id
, kdu_instance
, namespace
, kubeconfig
):
1290 Implements the helm version dependent method to obtain services from a helm instance
1294 async def _status_kdu(
1298 namespace
: str = None,
1299 yaml_format
: bool = False,
1300 show_error_log
: bool = False,
1301 ) -> Union
[str, dict]:
1303 Implements the helm version dependent method to obtain status of a helm instance
1307 def _get_install_command(
1319 Obtain command to be executed to delete the indicated instance
1323 def _get_upgrade_scale_command(
1336 """Generates the command to scale a Helm Chart release
1339 kdu_model (str): Kdu model name, corresponding to the Helm local location or repository
1340 kdu_instance (str): KDU instance, corresponding to the Helm Chart release in question
1341 namespace (str): Namespace where this KDU instance is deployed
1342 scale (int): Scale count
1343 version (str): Constraint with specific version of the Chart to use
1344 atomic (bool): If set, upgrade process rolls back changes made in case of failed upgrade.
1345 The --wait flag will be set automatically if --atomic is used
1346 replica_str (str): The key under resource_name key where the scale count is stored
1347 timeout (float): The time, in seconds, to wait
1348 resource_name (str): The KDU's resource to scale
1349 kubeconfig (str): Kubeconfig file path
1352 str: command to scale a Helm Chart release
1356 def _get_upgrade_command(
1368 """Generates the command to upgrade a Helm Chart release
1371 kdu_model (str): Kdu model name, corresponding to the Helm local location or repository
1372 kdu_instance (str): KDU instance, corresponding to the Helm Chart release in question
1373 namespace (str): Namespace where this KDU instance is deployed
1374 params_str (str): Params used to upgrade the Helm Chart release
1375 version (str): Constraint with specific version of the Chart to use
1376 atomic (bool): If set, upgrade process rolls back changes made in case of failed upgrade.
1377 The --wait flag will be set automatically if --atomic is used
1378 timeout (float): The time, in seconds, to wait
1379 kubeconfig (str): Kubeconfig file path
1380 force (bool): If set, helm forces resource updates through a replacement strategy. This may recreate pods.
1382 str: command to upgrade a Helm Chart release
    def _get_rollback_command(
        self, kdu_instance, namespace, revision, kubeconfig
    ) -> str:
        """Obtain the command to be executed to rollback the indicated instance."""
    def _get_uninstall_command(
        self, kdu_instance: str, namespace: str, kubeconfig: str
    ) -> str:
        """Obtain the command to be executed to delete the indicated instance."""
    def _get_inspect_command(
        self, show_command: str, kdu_model: str, repo_str: str, version: str
    ):
        """Generate the command to obtain information about a Helm Chart package
        (`helm show ...` command).

        Args:
            show_command: the second part of the command (`helm show <show_command>`)
            kdu_model: the name or path of a Helm Chart
            repo_str: Helm Chart repository url option
            version: constraint with the specific version of the Chart to use

        Returns:
            str: the generated Helm Chart command
        """
    def _get_get_command(
        self, get_command: str, kdu_instance: str, namespace: str, kubeconfig: str
    ):
        """Obtain command to be executed to get information about the kdu instance."""
    async def _uninstall_sw(self, cluster_id: str, namespace: str):
        """Uninstall the cluster software for helm; Helm-version dependent.

        For Helm v2 it is called when Tiller must be uninstalled.
        For Helm v3 it does nothing and does not need to be called.
        """
    def _get_helm_chart_repos_ids(self, cluster_uuid) -> list:
        """Obtain the cluster repos identifiers."""
1440 ####################################################################################
1441 ################################### P R I V A T E ##################################
1442 ####################################################################################
1446 def _check_file_exists(filename
: str, exception_if_not_exists
: bool = False):
1447 if os
.path
.exists(filename
):
1450 msg
= "File {} does not exist".format(filename
)
1451 if exception_if_not_exists
:
1452 raise K8sException(msg
)
1455 def _remove_multiple_spaces(strobj
):
1456 strobj
= strobj
.strip()
1457 while " " in strobj
:
1458 strobj
= strobj
.replace(" ", " ")
1462 def _output_to_lines(output
: str) -> list:
1463 output_lines
= list()
1464 lines
= output
.splitlines(keepends
=False)
1468 output_lines
.append(line
)
1472 def _output_to_table(output
: str) -> list:
1473 output_table
= list()
1474 lines
= output
.splitlines(keepends
=False)
1476 line
= line
.replace("\t", " ")
1478 output_table
.append(line_list
)
1479 cells
= line
.split(sep
=" ")
1483 line_list
.append(cell
)
1487 def _parse_services(output
: str) -> list:
1488 lines
= output
.splitlines(keepends
=False)
1491 line
= line
.replace("\t", " ")
1492 cells
= line
.split(sep
=" ")
1493 if len(cells
) > 0 and cells
[0].startswith("service/"):
1494 elems
= cells
[0].split(sep
="/")
1496 services
.append(elems
[1])
1500 def _get_deep(dictionary
: dict, members
: tuple):
1505 value
= target
.get(m
)
1514 # find key:value in several lines
1516 def _find_in_lines(p_lines
: list, p_key
: str) -> str:
1517 for line
in p_lines
:
1519 if line
.startswith(p_key
+ ":"):
1520 parts
= line
.split(":")
1521 the_value
= parts
[1].strip()
1529 def _lower_keys_list(input_list
: list):
1531 Transform the keys in a list of dictionaries to lower case and returns a new list
1536 for dictionary
in input_list
:
1537 new_dict
= dict((k
.lower(), v
) for k
, v
in dictionary
.items())
1538 new_list
.append(new_dict
)
1541 async def _local_async_exec(
1544 raise_exception_on_error
: bool = False,
1545 show_error_log
: bool = True,
1546 encode_utf8
: bool = False,
1550 command
= K8sHelmBaseConnector
._remove
_multiple
_spaces
(command
)
1552 "Executing async local command: {}, env: {}".format(command
, env
)
1556 command
= shlex
.split(command
)
1558 environ
= os
.environ
.copy()
1563 async with self
.cmd_lock
:
1564 process
= await asyncio
.create_subprocess_exec(
1566 stdout
=asyncio
.subprocess
.PIPE
,
1567 stderr
=asyncio
.subprocess
.PIPE
,
1571 # wait for command terminate
1572 stdout
, stderr
= await process
.communicate()
1574 return_code
= process
.returncode
1578 output
= stdout
.decode("utf-8").strip()
1579 # output = stdout.decode()
1581 output
= stderr
.decode("utf-8").strip()
1582 # output = stderr.decode()
1584 if return_code
!= 0 and show_error_log
:
1586 "Return code (FAIL): {}\nOutput:\n{}".format(return_code
, output
)
1589 self
.log
.debug("Return code: {}".format(return_code
))
1591 if raise_exception_on_error
and return_code
!= 0:
1592 raise K8sException(output
)
1595 output
= output
.encode("utf-8").strip()
1596 output
= str(output
).replace("\\n", "\n")
1598 return output
, return_code
1600 except asyncio
.CancelledError
:
1601 # first, kill the process if it is still running
1602 if process
.returncode
is None:
1605 except K8sException
:
1607 except Exception as e
:
1608 msg
= "Exception executing command: {} -> {}".format(command
, e
)
1610 if raise_exception_on_error
:
1611 raise K8sException(e
) from e
1615 async def _local_async_exec_pipe(
1619 raise_exception_on_error
: bool = True,
1620 show_error_log
: bool = True,
1621 encode_utf8
: bool = False,
1625 command1
= K8sHelmBaseConnector
._remove
_multiple
_spaces
(command1
)
1626 command2
= K8sHelmBaseConnector
._remove
_multiple
_spaces
(command2
)
1627 command
= "{} | {}".format(command1
, command2
)
1629 "Executing async local command: {}, env: {}".format(command
, env
)
1633 command1
= shlex
.split(command1
)
1634 command2
= shlex
.split(command2
)
1636 environ
= os
.environ
.copy()
1641 async with self
.cmd_lock
:
1642 read
, write
= os
.pipe()
1643 process_1
= await asyncio
.create_subprocess_exec(
1644 *command1
, stdout
=write
, env
=environ
1647 process_2
= await asyncio
.create_subprocess_exec(
1648 *command2
, stdin
=read
, stdout
=asyncio
.subprocess
.PIPE
, env
=environ
1651 stdout
, stderr
= await process_2
.communicate()
1653 return_code
= process_2
.returncode
1657 output
= stdout
.decode("utf-8").strip()
1658 # output = stdout.decode()
1660 output
= stderr
.decode("utf-8").strip()
1661 # output = stderr.decode()
1663 if return_code
!= 0 and show_error_log
:
1665 "Return code (FAIL): {}\nOutput:\n{}".format(return_code
, output
)
1668 self
.log
.debug("Return code: {}".format(return_code
))
1670 if raise_exception_on_error
and return_code
!= 0:
1671 raise K8sException(output
)
1674 output
= output
.encode("utf-8").strip()
1675 output
= str(output
).replace("\\n", "\n")
1677 return output
, return_code
1678 except asyncio
.CancelledError
:
1679 # first, kill the processes if they are still running
1680 for process
in (process_1
, process_2
):
1681 if process
.returncode
is None:
1684 except K8sException
:
1686 except Exception as e
:
1687 msg
= "Exception executing command: {} -> {}".format(command
, e
)
1689 if raise_exception_on_error
:
1690 raise K8sException(e
) from e
1694 async def _get_service(self
, cluster_id
, service_name
, namespace
):
1696 Obtains the data of the specified service in the k8cluster.
1698 :param cluster_id: id of a K8s cluster known by OSM
1699 :param service_name: name of the K8s service in the specified namespace
1700 :param namespace: K8s namespace used by the KDU instance
1701 :return: If successful, it will return a service with the following data:
1702 - `name` of the service
1703 - `type` type of service in the k8 cluster
1704 - `ports` List of ports offered by the service, for each port includes at least
1705 name, port, protocol
1706 - `cluster_ip` Internal ip to be used inside k8s cluster
1707 - `external_ip` List of external ips (in case they are available)
1711 paths
, env
= self
._init
_paths
_env
(
1712 cluster_name
=cluster_id
, create_if_not_exist
=True
1715 command
= "{} --kubeconfig={} --namespace={} get service {} -o=yaml".format(
1716 self
.kubectl_command
, paths
["kube_config"], namespace
, service_name
1719 output
, _rc
= await self
._local
_async
_exec
(
1720 command
=command
, raise_exception_on_error
=True, env
=env
1723 data
= yaml
.load(output
, Loader
=yaml
.SafeLoader
)
1726 "name": service_name
,
1727 "type": self
._get
_deep
(data
, ("spec", "type")),
1728 "ports": self
._get
_deep
(data
, ("spec", "ports")),
1729 "cluster_ip": self
._get
_deep
(data
, ("spec", "clusterIP")),
1731 if service
["type"] == "LoadBalancer":
1732 ip_map_list
= self
._get
_deep
(data
, ("status", "loadBalancer", "ingress"))
1733 ip_list
= [elem
["ip"] for elem
in ip_map_list
]
1734 service
["external_ip"] = ip_list
1738 async def _exec_get_command(
1739 self
, get_command
: str, kdu_instance
: str, namespace
: str, kubeconfig
: str
1741 """Obtains information about the kdu instance."""
1743 full_command
= self
._get
_get
_command
(
1744 get_command
, kdu_instance
, namespace
, kubeconfig
1747 output
, _rc
= await self
._local
_async
_exec
(command
=full_command
)
1751 async def _exec_inspect_command(
1752 self
, inspect_command
: str, kdu_model
: str, repo_url
: str = None
1754 """Obtains information about an Helm Chart package (´helm show´ command)
1757 inspect_command: the Helm sub command (`helm show <inspect_command> ...`)
1758 kdu_model: The name or path of an Helm Chart
1759 repo_url: Helm Chart repository url
1762 str: the requested info about the Helm Chart package
1767 repo_str
= " --repo {}".format(repo_url
)
1769 # Obtain the Chart's name and store it in the var kdu_model
1770 kdu_model
, _
= self
._split
_repo
(kdu_model
=kdu_model
)
1772 kdu_model
, version
= self
._split
_version
(kdu_model
)
1774 version_str
= "--version {}".format(version
)
1778 full_command
= self
._get
_inspect
_command
(
1779 show_command
=inspect_command
,
1780 kdu_model
=kdu_model
,
1782 version
=version_str
,
1785 output
, _
= await self
._local
_async
_exec
(command
=full_command
)
1789 async def _get_replica_count_url(
1792 repo_url
: str = None,
1793 resource_name
: str = None,
1795 """Get the replica count value in the Helm Chart Values.
1798 kdu_model: The name or path of an Helm Chart
1799 repo_url: Helm Chart repository url
1800 resource_name: Resource name
1804 - The number of replicas of the specific instance; if not found, returns None; and
1805 - The string corresponding to the replica count key in the Helm values
1808 kdu_values
= yaml
.load(
1809 await self
.values_kdu(kdu_model
=kdu_model
, repo_url
=repo_url
),
1810 Loader
=yaml
.SafeLoader
,
1813 self
.log
.debug(f
"Obtained the Helm package values for the KDU: {kdu_values}")
1817 "kdu_values not found for kdu_model {}".format(kdu_model
)
1821 kdu_values
= kdu_values
.get(resource_name
, None)
1824 msg
= "resource {} not found in the values in model {}".format(
1825 resource_name
, kdu_model
1828 raise K8sException(msg
)
1830 duplicate_check
= False
1835 if kdu_values
.get("replicaCount") is not None:
1836 replicas
= kdu_values
["replicaCount"]
1837 replica_str
= "replicaCount"
1838 elif kdu_values
.get("replicas") is not None:
1839 duplicate_check
= True
1840 replicas
= kdu_values
["replicas"]
1841 replica_str
= "replicas"
1845 "replicaCount or replicas not found in the resource"
1846 "{} values in model {}. Cannot be scaled".format(
1847 resource_name
, kdu_model
1852 "replicaCount or replicas not found in the values"
1853 "in model {}. Cannot be scaled".format(kdu_model
)
1856 raise K8sException(msg
)
1858 # Control if replicas and replicaCount exists at the same time
1859 msg
= "replicaCount and replicas are exists at the same time"
1861 if "replicaCount" in kdu_values
:
1863 raise K8sException(msg
)
1865 if "replicas" in kdu_values
:
1867 raise K8sException(msg
)
1869 return replicas
, replica_str
1871 async def _get_replica_count_instance(
1876 resource_name
: str = None,
1878 """Get the replica count value in the instance.
1881 kdu_instance: The name of the KDU instance
1882 namespace: KDU instance namespace
1884 resource_name: Resource name
1887 The number of replicas of the specific instance; if not found, returns None
1890 kdu_values
= yaml
.load(
1891 await self
.get_values_kdu(kdu_instance
, namespace
, kubeconfig
),
1892 Loader
=yaml
.SafeLoader
,
1895 self
.log
.debug(f
"Obtained the Helm values for the KDU instance: {kdu_values}")
1901 kdu_values
.get(resource_name
, None) if resource_name
else None
1904 for replica_str
in ("replicaCount", "replicas"):
1906 replicas
= resource_values
.get(replica_str
)
1908 replicas
= kdu_values
.get(replica_str
)
1910 if replicas
is not None:
1915 async def _store_status(
1920 namespace
: str = None,
1921 db_dict
: dict = None,
1924 Obtains the status of the KDU instance based on Helm Charts, and stores it in the database.
1926 :param cluster_id (str): the cluster where the KDU instance is deployed
1927 :param operation (str): The operation related to the status to be updated (for instance, "install" or "upgrade")
1928 :param kdu_instance (str): The KDU instance in relation to which the status is obtained
1929 :param namespace (str): The Kubernetes namespace where the KDU instance was deployed. Defaults to None
1930 :param db_dict (dict): A dictionary with the database necessary information. It shall contain the
1931 values for the keys:
1932 - "collection": The Mongo DB collection to write to
1933 - "filter": The query filter to use in the update process
1934 - "path": The dot separated keys which targets the object to be updated
1939 detailed_status
= await self
._status
_kdu
(
1940 cluster_id
=cluster_id
,
1941 kdu_instance
=kdu_instance
,
1943 namespace
=namespace
,
1946 status
= detailed_status
.get("info").get("description")
1947 self
.log
.debug(f
"Status for KDU {kdu_instance} obtained: {status}.")
1949 # write status to db
1950 result
= await self
.write_app_status_to_db(
1953 detailed_status
=str(detailed_status
),
1954 operation
=operation
,
1958 self
.log
.info("Error writing in database. Task exiting...")
1960 except asyncio
.CancelledError
as e
:
1962 f
"Exception in method {self._store_status.__name__} (task cancelled): {e}"
1964 except Exception as e
:
1965 self
.log
.warning(f
"Exception in method {self._store_status.__name__}: {e}")
1967 # params for use in -f file
1968 # returns values file option and filename (in order to delete it at the end)
1969 def _params_to_file_option(self
, cluster_id
: str, params
: dict) -> (str, str):
1971 if params
and len(params
) > 0:
1972 self
._init
_paths
_env
(cluster_name
=cluster_id
, create_if_not_exist
=True)
1974 def get_random_number():
1975 r
= random
.randrange(start
=1, stop
=99999999)
1983 value
= params
.get(key
)
1984 if "!!yaml" in str(value
):
1985 value
= yaml
.safe_load(value
[7:])
1986 params2
[key
] = value
1988 values_file
= get_random_number() + ".yaml"
1989 with
open(values_file
, "w") as stream
:
1990 yaml
.dump(params2
, stream
, indent
=4, default_flow_style
=False)
1992 return "-f {}".format(values_file
), values_file
1996 # params for use in --set option
1998 def _params_to_set_option(params
: dict) -> str:
2000 if params
and len(params
) > 0:
2003 value
= params
.get(key
, None)
2004 if value
is not None:
2006 params_str
+= "--set "
2010 params_str
+= "{}={}".format(key
, value
)
2014 def generate_kdu_instance_name(**kwargs
):
2015 chart_name
= kwargs
["kdu_model"]
2016 # check embeded chart (file or dir)
2017 if chart_name
.startswith("/"):
2018 # extract file or directory name
2019 chart_name
= chart_name
[chart_name
.rfind("/") + 1 :]
2021 elif "://" in chart_name
:
2022 # extract last portion of URL
2023 chart_name
= chart_name
[chart_name
.rfind("/") + 1 :]
2026 for c
in chart_name
:
2027 if c
.isalpha() or c
.isnumeric():
2034 # if does not start with alpha character, prefix 'a'
2035 if not name
[0].isalpha():
2040 def get_random_number():
2041 r
= random
.randrange(start
=1, stop
=99999999)
2043 s
= s
.rjust(10, "0")
2046 name
= name
+ get_random_number()
2049 def _split_version(self
, kdu_model
: str) -> (str, str):
2051 if not self
._is
_helm
_chart
_a
_file
(kdu_model
) and ":" in kdu_model
:
2052 parts
= kdu_model
.split(sep
=":")
2054 version
= str(parts
[1])
2055 kdu_model
= parts
[0]
2056 return kdu_model
, version
2058 def _split_repo(self
, kdu_model
: str) -> (str, str):
2059 """Obtain the Helm Chart's repository and Chart's names from the KDU model
2062 kdu_model (str): Associated KDU model
2065 (str, str): Tuple with the Chart name in index 0, and the repo name
2066 in index 2; if there was a problem finding them, return None
2073 idx
= kdu_model
.find("/")
2075 chart_name
= kdu_model
[idx
+ 1 :]
2076 repo_name
= kdu_model
[:idx
]
2078 return chart_name
, repo_name
2080 async def _find_repo(self
, kdu_model
: str, cluster_uuid
: str) -> str:
2081 """Obtain the Helm repository for an Helm Chart
2084 kdu_model (str): the KDU model associated with the Helm Chart instantiation
2085 cluster_uuid (str): The cluster UUID associated with the Helm Chart instantiation
2088 str: the repository URL; if Helm Chart is a local one, the function returns None
2091 _
, repo_name
= self
._split
_repo
(kdu_model
=kdu_model
)
2095 # Find repository link
2096 local_repo_list
= await self
.repo_list(cluster_uuid
)
2097 for repo
in local_repo_list
:
2098 if repo
["name"] == repo_name
:
2099 repo_url
= repo
["url"]
2100 break # it is not necessary to continue the loop if the repo link was found...
2104 async def create_certificate(
2105 self
, cluster_uuid
, namespace
, dns_prefix
, name
, secret_name
, usage
2107 paths
, env
= self
._init
_paths
_env
(
2108 cluster_name
=cluster_uuid
, create_if_not_exist
=True
2110 kubectl
= Kubectl(config_file
=paths
["kube_config"])
2111 await kubectl
.create_certificate(
2112 namespace
=namespace
,
2114 dns_prefix
=dns_prefix
,
2115 secret_name
=secret_name
,
2117 issuer_name
="ca-issuer",
2120 async def delete_certificate(self
, cluster_uuid
, namespace
, certificate_name
):
2121 paths
, env
= self
._init
_paths
_env
(
2122 cluster_name
=cluster_uuid
, create_if_not_exist
=True
2124 kubectl
= Kubectl(config_file
=paths
["kube_config"])
2125 await kubectl
.delete_certificate(namespace
, certificate_name
)