2 # Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3 # This file is part of OSM
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact with: nfvlabs@tid.es
24 from typing
import Union
32 from uuid
import uuid4
34 from n2vc
.config
import EnvironConfig
35 from n2vc
.exceptions
import K8sException
36 from n2vc
.k8s_conn
import K8sConnector
37 from n2vc
.kubectl
import Kubectl
40 class K8sHelmBaseConnector(K8sConnector
):
43 ####################################################################################
44 ################################### P U B L I C ####################################
45 ####################################################################################
48 service_account
= "osm"
54 kubectl_command
: str = "/usr/bin/kubectl",
55 helm_command
: str = "/usr/bin/helm",
61 :param fs: file system for kubernetes and helm configuration
62 :param db: database object to write current operation status
63 :param kubectl_command: path to kubectl executable
64 :param helm_command: path to helm executable
66 :param on_update_db: callback called when k8s connector updates database
70 K8sConnector
.__init
__(self
, db
=db
, log
=log
, on_update_db
=on_update_db
)
72 self
.log
.info("Initializing K8S Helm connector")
74 self
.config
= EnvironConfig()
75 # random numbers for release name generation
76 random
.seed(time
.time())
81 # exception if kubectl is not installed
82 self
.kubectl_command
= kubectl_command
83 self
._check
_file
_exists
(filename
=kubectl_command
, exception_if_not_exists
=True)
85 # exception if helm is not installed
86 self
._helm
_command
= helm_command
87 self
._check
_file
_exists
(filename
=helm_command
, exception_if_not_exists
=True)
89 # obtain stable repo url from config or apply default
90 self
._stable
_repo
_url
= self
.config
.get("stablerepourl")
91 if self
._stable
_repo
_url
== "None":
92 self
._stable
_repo
_url
= None
94 # Lock to avoid concurrent execution of helm commands
95 self
.cmd_lock
= asyncio
.Lock()
97 def _get_namespace(self
, cluster_uuid
: str) -> str:
99 Obtains the namespace used by the cluster with the uuid passed by argument
101 param: cluster_uuid: cluster's uuid
104 # first, obtain the cluster corresponding to the uuid passed by argument
105 k8scluster
= self
.db
.get_one(
106 "k8sclusters", q_filter
={"_id": cluster_uuid
}, fail_on_empty
=False
108 return k8scluster
.get("namespace")
113 namespace
: str = "kube-system",
114 reuse_cluster_uuid
=None,
118 It prepares a given K8s cluster environment to run Charts
120 :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid
122 :param namespace: optional namespace to be used for helm. By default,
123 'kube-system' will be used
124 :param reuse_cluster_uuid: existing cluster uuid for reuse
125 :param kwargs: Additional parameters (None yet)
126 :return: uuid of the K8s cluster and True if connector has installed some
127 software in the cluster
128 (on error, an exception will be raised)
131 if reuse_cluster_uuid
:
132 cluster_id
= reuse_cluster_uuid
134 cluster_id
= str(uuid4())
137 "Initializing K8S Cluster {}. namespace: {}".format(cluster_id
, namespace
)
140 paths
, env
= self
._init
_paths
_env
(
141 cluster_name
=cluster_id
, create_if_not_exist
=True
143 mode
= stat
.S_IRUSR | stat
.S_IWUSR
144 with
open(paths
["kube_config"], "w", mode
) as f
:
146 os
.chmod(paths
["kube_config"], 0o600)
148 # Code with initialization specific of helm version
149 n2vc_installed_sw
= await self
._cluster
_init
(cluster_id
, namespace
, paths
, env
)
151 # sync fs with local data
152 self
.fs
.reverse_sync(from_path
=cluster_id
)
154 self
.log
.info("Cluster {} initialized".format(cluster_id
))
156 return cluster_id
, n2vc_installed_sw
163 repo_type
: str = "chart",
166 password
: str = None,
169 "Cluster {}, adding {} repository {}. URL: {}".format(
170 cluster_uuid
, repo_type
, name
, url
175 paths
, env
= self
._init
_paths
_env
(
176 cluster_name
=cluster_uuid
, create_if_not_exist
=True
180 self
.fs
.sync(from_path
=cluster_uuid
)
182 # helm repo add name url
183 command
= ("env KUBECONFIG={} {} repo add {} {}").format(
184 paths
["kube_config"], self
._helm
_command
, name
, url
188 temp_cert_file
= os
.path
.join(
189 self
.fs
.path
, "{}/helmcerts/".format(cluster_uuid
), "temp.crt"
191 os
.makedirs(os
.path
.dirname(temp_cert_file
), exist_ok
=True)
192 with
open(temp_cert_file
, "w") as the_cert
:
194 command
+= " --ca-file {}".format(temp_cert_file
)
197 command
+= " --username={}".format(user
)
200 command
+= " --password={}".format(password
)
202 self
.log
.debug("adding repo: {}".format(command
))
203 await self
._local
_async
_exec
(
204 command
=command
, raise_exception_on_error
=True, env
=env
208 command
= "env KUBECONFIG={} {} repo update {}".format(
209 paths
["kube_config"], self
._helm
_command
, name
211 self
.log
.debug("updating repo: {}".format(command
))
212 await self
._local
_async
_exec
(
213 command
=command
, raise_exception_on_error
=False, env
=env
217 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
async def repo_update(self, cluster_uuid: str, name: str, repo_type: str = "chart"):
    """Refresh the index of an already-registered helm repository.

    :param cluster_uuid: UUID of the target cluster
    :param name: name the repository was registered under
    :param repo_type: kind of repository (informational only, default "chart")
    """
    self.log.debug(
        "Cluster {}, updating {} repository {}".format(
            cluster_uuid, repo_type, name
        )
    )

    # per-cluster config paths plus the helm environment to run with
    cfg_paths, helm_env = self._init_paths_env(
        cluster_name=cluster_uuid, create_if_not_exist=True
    )

    # refresh the local working copy before invoking helm
    self.fs.sync(from_path=cluster_uuid)

    # helm repo update <name>
    cmd = "{} repo update {}".format(self._helm_command, name)
    self.log.debug("updating repo: {}".format(cmd))
    # best effort: a failed index refresh is not fatal, so no exception
    await self._local_async_exec(
        command=cmd, raise_exception_on_error=False, env=helm_env
    )

    # persist any local changes back to shared storage
    self.fs.reverse_sync(from_path=cluster_uuid)
async def repo_list(self, cluster_uuid: str) -> list:
    """
    Get the list of registered repositories

    :param cluster_uuid: UUID of the target cluster
    :return: list of registered repositories: [ (name, url) .... ]
    """
    self.log.debug("list repositories for cluster {}".format(cluster_uuid))

    # config filename
    paths, env = self._init_paths_env(
        cluster_name=cluster_uuid, create_if_not_exist=True
    )

    # sync local dir
    self.fs.sync(from_path=cluster_uuid)

    command = "env KUBECONFIG={} {} repo list --output yaml".format(
        paths["kube_config"], self._helm_command
    )

    # Set exception to false because if there are no repos just want an empty list
    output, _rc = await self._local_async_exec(
        command=command, raise_exception_on_error=False, env=env
    )

    # sync fs
    self.fs.reverse_sync(from_path=cluster_uuid)

    if output and len(output) > 0:
        repos = yaml.load(output, Loader=yaml.SafeLoader)
        # unify format between helm2 and helm3 setting all keys lowercase
        return self._lower_keys_list(repos)
    # no repositories registered: honour the declared `list` return type
    # instead of falling off the end and implicitly returning None
    return []
async def repo_remove(self, cluster_uuid: str, name: str):
    """Unregister a helm repository from the given cluster environment.

    :param cluster_uuid: UUID of the target cluster
    :param name: repository name to drop
    """
    self.log.debug(
        "remove {} repositories for cluster {}".format(name, cluster_uuid)
    )

    # per-cluster config paths plus the helm environment to run with
    cfg_paths, helm_env = self._init_paths_env(
        cluster_name=cluster_uuid, create_if_not_exist=True
    )

    # refresh the local working copy before touching helm state
    self.fs.sync(from_path=cluster_uuid)

    cmd = "env KUBECONFIG={} {} repo remove {}".format(
        cfg_paths["kube_config"], self._helm_command, name
    )
    # removal failure is a real error: propagate it to the caller
    await self._local_async_exec(
        command=cmd, raise_exception_on_error=True, env=helm_env
    )

    # persist the updated helm config back to shared storage
    self.fs.reverse_sync(from_path=cluster_uuid)
310 uninstall_sw
: bool = False,
315 Resets the Kubernetes cluster by removing the helm deployment that represents it.
317 :param cluster_uuid: The UUID of the cluster to reset
318 :param force: Boolean to force the reset
319 :param uninstall_sw: Boolean to force the reset
320 :param kwargs: Additional parameters (None yet)
321 :return: Returns True if successful or raises an exception.
323 namespace
= self
._get
_namespace
(cluster_uuid
=cluster_uuid
)
325 "Resetting K8s environment. cluster uuid: {} uninstall={}".format(
326 cluster_uuid
, uninstall_sw
331 self
.fs
.sync(from_path
=cluster_uuid
)
333 # uninstall releases if needed.
335 releases
= await self
.instances_list(cluster_uuid
=cluster_uuid
)
336 if len(releases
) > 0:
340 kdu_instance
= r
.get("name")
341 chart
= r
.get("chart")
343 "Uninstalling {} -> {}".format(chart
, kdu_instance
)
345 await self
.uninstall(
346 cluster_uuid
=cluster_uuid
, kdu_instance
=kdu_instance
348 except Exception as e
:
349 # will not raise exception as it was found
350 # that in some cases of previously installed helm releases it
353 "Error uninstalling release {}: {}".format(
359 "Cluster uuid: {} has releases and not force. Leaving K8s helm environment"
360 ).format(cluster_uuid
)
363 False # Allow to remove k8s cluster without removing Tiller
367 await self
._uninstall
_sw
(cluster_id
=cluster_uuid
, namespace
=namespace
)
369 # delete cluster directory
370 self
.log
.debug("Removing directory {}".format(cluster_uuid
))
371 self
.fs
.file_delete(cluster_uuid
, ignore_non_exist
=True)
372 # Remove also local directorio if still exist
373 direct
= self
.fs
.path
+ "/" + cluster_uuid
374 shutil
.rmtree(direct
, ignore_errors
=True)
378 def _is_helm_chart_a_file(self
, chart_name
: str):
379 return chart_name
.count("/") > 1
381 async def _install_impl(
389 timeout
: float = 300,
391 db_dict
: dict = None,
392 kdu_name
: str = None,
393 namespace
: str = None,
396 paths
, env
= self
._init
_paths
_env
(
397 cluster_name
=cluster_id
, create_if_not_exist
=True
401 params_str
, file_to_delete
= self
._params
_to
_file
_option
(
402 cluster_id
=cluster_id
, params
=params
406 kdu_model
, version
= self
._split
_version
(kdu_model
)
408 _
, repo
= self
._split
_repo
(kdu_model
)
410 await self
.repo_update(cluster_id
, repo
)
412 command
= self
._get
_install
_command
(
420 paths
["kube_config"],
423 self
.log
.debug("installing: {}".format(command
))
426 # exec helm in a task
427 exec_task
= asyncio
.ensure_future(
428 coro_or_future
=self
._local
_async
_exec
(
429 command
=command
, raise_exception_on_error
=False, env
=env
433 # write status in another task
434 status_task
= asyncio
.ensure_future(
435 coro_or_future
=self
._store
_status
(
436 cluster_id
=cluster_id
,
437 kdu_instance
=kdu_instance
,
444 # wait for execution task
445 await asyncio
.wait([exec_task
])
450 output
, rc
= exec_task
.result()
453 output
, rc
= await self
._local
_async
_exec
(
454 command
=command
, raise_exception_on_error
=False, env
=env
457 # remove temporal values yaml file
459 os
.remove(file_to_delete
)
462 await self
._store
_status
(
463 cluster_id
=cluster_id
,
464 kdu_instance
=kdu_instance
,
471 msg
= "Error executing command: {}\nOutput: {}".format(command
, output
)
473 raise K8sException(msg
)
479 kdu_model
: str = None,
481 timeout
: float = 300,
483 db_dict
: dict = None,
484 namespace
: str = None,
487 self
.log
.debug("upgrading {} in cluster {}".format(kdu_model
, cluster_uuid
))
490 self
.fs
.sync(from_path
=cluster_uuid
)
492 # look for instance to obtain namespace
496 instance_info
= await self
.get_instance_info(cluster_uuid
, kdu_instance
)
497 if not instance_info
:
498 raise K8sException("kdu_instance {} not found".format(kdu_instance
))
499 namespace
= instance_info
["namespace"]
502 paths
, env
= self
._init
_paths
_env
(
503 cluster_name
=cluster_uuid
, create_if_not_exist
=True
507 self
.fs
.sync(from_path
=cluster_uuid
)
510 params_str
, file_to_delete
= self
._params
_to
_file
_option
(
511 cluster_id
=cluster_uuid
, params
=params
515 kdu_model
, version
= self
._split
_version
(kdu_model
)
517 _
, repo
= self
._split
_repo
(kdu_model
)
519 await self
.repo_update(cluster_uuid
, repo
)
521 command
= self
._get
_upgrade
_command
(
529 paths
["kube_config"],
533 self
.log
.debug("upgrading: {}".format(command
))
536 # exec helm in a task
537 exec_task
= asyncio
.ensure_future(
538 coro_or_future
=self
._local
_async
_exec
(
539 command
=command
, raise_exception_on_error
=False, env
=env
542 # write status in another task
543 status_task
= asyncio
.ensure_future(
544 coro_or_future
=self
._store
_status
(
545 cluster_id
=cluster_uuid
,
546 kdu_instance
=kdu_instance
,
553 # wait for execution task
554 await asyncio
.wait([exec_task
])
558 output
, rc
= exec_task
.result()
561 output
, rc
= await self
._local
_async
_exec
(
562 command
=command
, raise_exception_on_error
=False, env
=env
565 # remove temporal values yaml file
567 os
.remove(file_to_delete
)
570 await self
._store
_status
(
571 cluster_id
=cluster_uuid
,
572 kdu_instance
=kdu_instance
,
579 msg
= "Error executing command: {}\nOutput: {}".format(command
, output
)
581 raise K8sException(msg
)
584 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
586 # return new revision number
587 instance
= await self
.get_instance_info(
588 cluster_uuid
=cluster_uuid
, kdu_instance
=kdu_instance
591 revision
= int(instance
.get("revision"))
592 self
.log
.debug("New revision: {}".format(revision
))
602 total_timeout
: float = 1800,
603 cluster_uuid
: str = None,
604 kdu_model
: str = None,
606 db_dict
: dict = None,
609 """Scale a resource in a Helm Chart.
612 kdu_instance: KDU instance name
613 scale: Scale to which to set the resource
614 resource_name: Resource name
615 total_timeout: The time, in seconds, to wait
616 cluster_uuid: The UUID of the cluster
617 kdu_model: The chart reference
618 atomic: if set, upgrade process rolls back changes made in case of failed upgrade.
619 The --wait flag will be set automatically if --atomic is used
620 db_dict: Dictionary for any additional data
621 kwargs: Additional parameters
624 True if successful, False otherwise
627 debug_mgs
= "scaling {} in cluster {}".format(kdu_model
, cluster_uuid
)
629 debug_mgs
= "scaling resource {} in model {} (cluster {})".format(
630 resource_name
, kdu_model
, cluster_uuid
633 self
.log
.debug(debug_mgs
)
635 # look for instance to obtain namespace
636 # get_instance_info function calls the sync command
637 instance_info
= await self
.get_instance_info(cluster_uuid
, kdu_instance
)
638 if not instance_info
:
639 raise K8sException("kdu_instance {} not found".format(kdu_instance
))
642 paths
, env
= self
._init
_paths
_env
(
643 cluster_name
=cluster_uuid
, create_if_not_exist
=True
647 kdu_model
, version
= self
._split
_version
(kdu_model
)
649 repo_url
= await self
._find
_repo
(kdu_model
, cluster_uuid
)
651 _
, replica_str
= await self
._get
_replica
_count
_url
(
652 kdu_model
, repo_url
, resource_name
655 command
= self
._get
_upgrade
_scale
_command
(
658 instance_info
["namespace"],
665 paths
["kube_config"],
668 self
.log
.debug("scaling: {}".format(command
))
671 # exec helm in a task
672 exec_task
= asyncio
.ensure_future(
673 coro_or_future
=self
._local
_async
_exec
(
674 command
=command
, raise_exception_on_error
=False, env
=env
677 # write status in another task
678 status_task
= asyncio
.ensure_future(
679 coro_or_future
=self
._store
_status
(
680 cluster_id
=cluster_uuid
,
681 kdu_instance
=kdu_instance
,
682 namespace
=instance_info
["namespace"],
688 # wait for execution task
689 await asyncio
.wait([exec_task
])
693 output
, rc
= exec_task
.result()
696 output
, rc
= await self
._local
_async
_exec
(
697 command
=command
, raise_exception_on_error
=False, env
=env
701 await self
._store
_status
(
702 cluster_id
=cluster_uuid
,
703 kdu_instance
=kdu_instance
,
704 namespace
=instance_info
["namespace"],
710 msg
= "Error executing command: {}\nOutput: {}".format(command
, output
)
712 raise K8sException(msg
)
715 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
719 async def get_scale_count(
727 """Get a resource scale count.
730 cluster_uuid: The UUID of the cluster
731 resource_name: Resource name
732 kdu_instance: KDU instance name
733 kdu_model: The name or path of an Helm Chart
734 kwargs: Additional parameters
737 Resource instance count
741 "getting scale count for {} in cluster {}".format(kdu_model
, cluster_uuid
)
744 # look for instance to obtain namespace
745 instance_info
= await self
.get_instance_info(cluster_uuid
, kdu_instance
)
746 if not instance_info
:
747 raise K8sException("kdu_instance {} not found".format(kdu_instance
))
750 paths
, _
= self
._init
_paths
_env
(
751 cluster_name
=cluster_uuid
, create_if_not_exist
=True
754 replicas
= await self
._get
_replica
_count
_instance
(
755 kdu_instance
=kdu_instance
,
756 namespace
=instance_info
["namespace"],
757 kubeconfig
=paths
["kube_config"],
758 resource_name
=resource_name
,
762 f
"Number of replicas of the KDU instance {kdu_instance} and resource {resource_name} obtained: {replicas}"
765 # Get default value if scale count is not found from provided values
766 # Important note: this piece of code shall only be executed in the first scaling operation,
767 # since it is expected that the _get_replica_count_instance is able to obtain the number of
768 # replicas when a scale operation was already conducted previously for this KDU/resource!
770 repo_url
= await self
._find
_repo
(
771 kdu_model
=kdu_model
, cluster_uuid
=cluster_uuid
773 replicas
, _
= await self
._get
_replica
_count
_url
(
774 kdu_model
=kdu_model
, repo_url
=repo_url
, resource_name
=resource_name
778 f
"Number of replicas of the Helm Chart package for KDU instance {kdu_instance} and resource "
779 f
"{resource_name} obtained: {replicas}"
783 msg
= "Replica count not found. Cannot be scaled"
785 raise K8sException(msg
)
790 self
, cluster_uuid
: str, kdu_instance
: str, revision
=0, db_dict
: dict = None
793 "rollback kdu_instance {} to revision {} from cluster {}".format(
794 kdu_instance
, revision
, cluster_uuid
799 self
.fs
.sync(from_path
=cluster_uuid
)
801 # look for instance to obtain namespace
802 instance_info
= await self
.get_instance_info(cluster_uuid
, kdu_instance
)
803 if not instance_info
:
804 raise K8sException("kdu_instance {} not found".format(kdu_instance
))
807 paths
, env
= self
._init
_paths
_env
(
808 cluster_name
=cluster_uuid
, create_if_not_exist
=True
812 self
.fs
.sync(from_path
=cluster_uuid
)
814 command
= self
._get
_rollback
_command
(
815 kdu_instance
, instance_info
["namespace"], revision
, paths
["kube_config"]
818 self
.log
.debug("rolling_back: {}".format(command
))
820 # exec helm in a task
821 exec_task
= asyncio
.ensure_future(
822 coro_or_future
=self
._local
_async
_exec
(
823 command
=command
, raise_exception_on_error
=False, env
=env
826 # write status in another task
827 status_task
= asyncio
.ensure_future(
828 coro_or_future
=self
._store
_status
(
829 cluster_id
=cluster_uuid
,
830 kdu_instance
=kdu_instance
,
831 namespace
=instance_info
["namespace"],
833 operation
="rollback",
837 # wait for execution task
838 await asyncio
.wait([exec_task
])
843 output
, rc
= exec_task
.result()
846 await self
._store
_status
(
847 cluster_id
=cluster_uuid
,
848 kdu_instance
=kdu_instance
,
849 namespace
=instance_info
["namespace"],
851 operation
="rollback",
855 msg
= "Error executing command: {}\nOutput: {}".format(command
, output
)
857 raise K8sException(msg
)
860 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
862 # return new revision number
863 instance
= await self
.get_instance_info(
864 cluster_uuid
=cluster_uuid
, kdu_instance
=kdu_instance
867 revision
= int(instance
.get("revision"))
868 self
.log
.debug("New revision: {}".format(revision
))
873 async def uninstall(self
, cluster_uuid
: str, kdu_instance
: str, **kwargs
):
875 Removes an existing KDU instance. It would implicitly use the `delete` or 'uninstall' call
876 (this call should happen after all _terminate-config-primitive_ of the VNF
879 :param cluster_uuid: UUID of a K8s cluster known by OSM, or namespace:cluster_id
880 :param kdu_instance: unique name for the KDU instance to be deleted
881 :param kwargs: Additional parameters (None yet)
882 :return: True if successful
886 "uninstall kdu_instance {} from cluster {}".format(
887 kdu_instance
, cluster_uuid
892 self
.fs
.sync(from_path
=cluster_uuid
)
894 # look for instance to obtain namespace
895 instance_info
= await self
.get_instance_info(cluster_uuid
, kdu_instance
)
896 if not instance_info
:
897 self
.log
.warning(("kdu_instance {} not found".format(kdu_instance
)))
900 paths
, env
= self
._init
_paths
_env
(
901 cluster_name
=cluster_uuid
, create_if_not_exist
=True
905 self
.fs
.sync(from_path
=cluster_uuid
)
907 command
= self
._get
_uninstall
_command
(
908 kdu_instance
, instance_info
["namespace"], paths
["kube_config"]
910 output
, _rc
= await self
._local
_async
_exec
(
911 command
=command
, raise_exception_on_error
=True, env
=env
915 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
917 return self
._output
_to
_table
(output
)
async def instances_list(self, cluster_uuid: str) -> list:
    """
    returns a list of deployed releases in a cluster

    :param cluster_uuid: the 'cluster' or 'namespace:cluster'
    :return: list of deployed releases (one dict per release)
    """
    self.log.debug("list releases for cluster {}".format(cluster_uuid))

    # sync local dir
    self.fs.sync(from_path=cluster_uuid)

    # execute internal command
    result = await self._instances_list(cluster_uuid)

    # sync fs
    self.fs.reverse_sync(from_path=cluster_uuid)

    # return the collected releases; without this the coroutine would
    # implicitly return None, breaking callers that iterate the result
    return result
async def get_instance_info(self, cluster_uuid: str, kdu_instance: str):
    """Look up a deployed release by name.

    :param cluster_uuid: the 'cluster' or 'namespace:cluster'
    :param kdu_instance: release name to search for
    :return: the matching instance dict, or None when not deployed
    """
    instances = await self.instances_list(cluster_uuid=cluster_uuid)
    for instance in instances:
        if instance.get("name") == kdu_instance:
            # found it: hand the release entry back to the caller
            return instance
    self.log.debug("Instance {} not found".format(kdu_instance))
    # explicit None so callers can distinguish "not found"
    return None
948 async def upgrade_charm(
952 charm_id
: str = None,
953 charm_type
: str = None,
954 timeout
: float = None,
956 """This method upgrade charms in VNFs
959 ee_id: Execution environment id
960 path: Local path to the charm
962 charm_type: Charm type can be lxc-proxy-charm, native-charm or k8s-proxy-charm
963 timeout: (Float) Timeout for the ns update operation
966 The output of the update operation if status equals to "completed"
968 raise K8sException("KDUs deployed with Helm do not support charm upgrade")
970 async def exec_primitive(
972 cluster_uuid
: str = None,
973 kdu_instance
: str = None,
974 primitive_name
: str = None,
975 timeout
: float = 300,
977 db_dict
: dict = None,
980 """Exec primitive (Juju action)
982 :param cluster_uuid: The UUID of the cluster or namespace:cluster
983 :param kdu_instance: The unique name of the KDU instance
984 :param primitive_name: Name of action that will be executed
985 :param timeout: Timeout for action execution
986 :param params: Dictionary of all the parameters needed for the action
987 :db_dict: Dictionary for any additional data
988 :param kwargs: Additional parameters (None yet)
990 :return: Returns the output of the action
993 "KDUs deployed with Helm don't support actions "
994 "different from rollback, upgrade and status"
async def get_services(
    self, cluster_uuid: str, kdu_instance: str, namespace: str
) -> list:
    """
    Returns a list of services defined for the specified kdu instance.

    :param cluster_uuid: UUID of a K8s cluster known by OSM
    :param kdu_instance: unique name for the KDU instance
    :param namespace: K8s namespace used by the KDU instance
    :return: If successful, it will return a list of services, Each service
    can have the following data:
    - `name` of the service
    - `type` type of service in the k8 cluster
    - `ports` List of ports offered by the service, for each port includes at least
    name, port, protocol
    - `cluster_ip` Internal ip to be used inside k8s cluster
    - `external_ip` List of external ips (in case they are available)
    """
    self.log.debug(
        "get_services: cluster_uuid: {}, kdu_instance: {}".format(
            cluster_uuid, kdu_instance
        )
    )

    # config filename
    paths, env = self._init_paths_env(
        cluster_name=cluster_uuid, create_if_not_exist=True
    )

    # sync local dir
    self.fs.sync(from_path=cluster_uuid)

    # get list of services names for kdu
    service_names = await self._get_services(
        cluster_uuid, kdu_instance, namespace, paths["kube_config"]
    )

    # resolve each name into its full service data; the accumulator must
    # be initialized before the loop and returned at the end
    service_list = []
    for service in service_names:
        service = await self._get_service(cluster_uuid, service, namespace)
        service_list.append(service)

    # sync fs
    self.fs.reverse_sync(from_path=cluster_uuid)

    return service_list
async def get_service(
    self, cluster_uuid: str, service_name: str, namespace: str
):
    """
    Obtains the data of a single service deployed for a kdu instance.

    :param cluster_uuid: UUID of a K8s cluster known by OSM
    :param service_name: name of the service to look up
    :param namespace: K8s namespace used by the KDU instance
    :return: the service data as returned by the helm-version-specific helper
    """
    self.log.debug(
        "get service, service_name: {}, namespace: {}, cluster_uuid: {}".format(
            service_name, namespace, cluster_uuid
        )
    )

    # sync local dir
    self.fs.sync(from_path=cluster_uuid)

    service = await self._get_service(cluster_uuid, service_name, namespace)

    # sync fs
    self.fs.reverse_sync(from_path=cluster_uuid)

    # hand the fetched service back (it was silently dropped otherwise)
    return service
1064 async def status_kdu(
1065 self
, cluster_uuid
: str, kdu_instance
: str, yaml_format
: str = False, **kwargs
1066 ) -> Union
[str, dict]:
1068 This call would retrieve tha current state of a given KDU instance. It would be
1069 would allow to retrieve the _composition_ (i.e. K8s objects) and _specific
1070 values_ of the configuration parameters applied to a given instance. This call
1071 would be based on the `status` call.
1073 :param cluster_uuid: UUID of a K8s cluster known by OSM
1074 :param kdu_instance: unique name for the KDU instance
1075 :param kwargs: Additional parameters (None yet)
1076 :param yaml_format: if the return shall be returned as an YAML string or as a
1078 :return: If successful, it will return the following vector of arguments:
1079 - K8s `namespace` in the cluster where the KDU lives
1080 - `state` of the KDU instance. It can be:
1087 - List of `resources` (objects) that this release consists of, sorted by kind,
1088 and the status of those resources
1089 - Last `deployment_time`.
1093 "status_kdu: cluster_uuid: {}, kdu_instance: {}".format(
1094 cluster_uuid
, kdu_instance
1099 self
.fs
.sync(from_path
=cluster_uuid
)
1101 # get instance: needed to obtain namespace
1102 instances
= await self
._instances
_list
(cluster_id
=cluster_uuid
)
1103 for instance
in instances
:
1104 if instance
.get("name") == kdu_instance
:
1107 # instance does not exist
1109 "Instance name: {} not found in cluster: {}".format(
1110 kdu_instance
, cluster_uuid
1114 status
= await self
._status
_kdu
(
1115 cluster_id
=cluster_uuid
,
1116 kdu_instance
=kdu_instance
,
1117 namespace
=instance
["namespace"],
1118 yaml_format
=yaml_format
,
1119 show_error_log
=True,
1123 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
async def get_values_kdu(
    self, kdu_instance: str, namespace: str, kubeconfig: str
) -> str:
    """Return the values configured for a deployed KDU instance.

    :param kdu_instance: unique name of the KDU instance
    :param namespace: K8s namespace where the instance lives
    :param kubeconfig: path to the kubeconfig file to use
    :return: the instance's values as text
    """
    self.log.debug("get kdu_instance values {}".format(kdu_instance))

    # delegate to the helm-version-specific "get values" implementation
    query = {
        "get_command": "values",
        "kdu_instance": kdu_instance,
        "namespace": namespace,
        "kubeconfig": kubeconfig,
    }
    return await self._exec_get_command(**query)
async def values_kdu(self, kdu_model: str, repo_url: str = None) -> str:
    """Method to obtain the Helm Chart package's values

    Args:
        kdu_model: The name or path of an Helm Chart
        repo_url: Helm Chart repository url

    Returns:
        str: the values of the Helm Chart package
    """
    msg = "inspect kdu_model values {} from (optional) repo: {}".format(
        kdu_model, repo_url
    )
    self.log.debug(msg)

    # delegate to the helm-version-specific "show values" implementation
    return await self._exec_inspect_command(
        inspect_command="values", kdu_model=kdu_model, repo_url=repo_url
    )
async def help_kdu(self, kdu_model: str, repo_url: str = None) -> str:
    """Fetch the readme of a Helm Chart package.

    :param kdu_model: The name or path of an Helm Chart
    :param repo_url: optional Helm Chart repository url
    :return: the chart's readme.md content
    """
    msg = "inspect kdu_model {} readme.md from repo: {}".format(kdu_model, repo_url)
    self.log.debug(msg)

    # delegate to the helm-version-specific "show readme" implementation
    return await self._exec_inspect_command(
        inspect_command="readme", kdu_model=kdu_model, repo_url=repo_url
    )
1169 async def synchronize_repos(self
, cluster_uuid
: str):
1170 self
.log
.debug("synchronize repos for cluster helm-id: {}".format(cluster_uuid
))
1172 db_repo_ids
= self
._get
_helm
_chart
_repos
_ids
(cluster_uuid
)
1173 db_repo_dict
= self
._get
_db
_repos
_dict
(db_repo_ids
)
1175 local_repo_list
= await self
.repo_list(cluster_uuid
)
1176 local_repo_dict
= {repo
["name"]: repo
["url"] for repo
in local_repo_list
}
1178 deleted_repo_list
= []
1179 added_repo_dict
= {}
1181 # iterate over the list of repos in the database that should be
1182 # added if not present
1183 for repo_name
, db_repo
in db_repo_dict
.items():
1185 # check if it is already present
1186 curr_repo_url
= local_repo_dict
.get(db_repo
["name"])
1187 repo_id
= db_repo
.get("_id")
1188 if curr_repo_url
!= db_repo
["url"]:
1191 "repo {} url changed, delete and and again".format(
1195 await self
.repo_remove(cluster_uuid
, db_repo
["name"])
1196 deleted_repo_list
.append(repo_id
)
1199 self
.log
.debug("add repo {}".format(db_repo
["name"]))
1200 if "ca_cert" in db_repo
:
1201 await self
.repo_add(
1205 cert
=db_repo
["ca_cert"],
1208 await self
.repo_add(
1213 added_repo_dict
[repo_id
] = db_repo
["name"]
1214 except Exception as e
:
1216 "Error adding repo id: {}, err_msg: {} ".format(
1221 # Delete repos that are present but not in nbi_list
1222 for repo_name
in local_repo_dict
:
1223 if not db_repo_dict
.get(repo_name
) and repo_name
!= "stable":
1224 self
.log
.debug("delete repo {}".format(repo_name
))
1226 await self
.repo_remove(cluster_uuid
, repo_name
)
1227 deleted_repo_list
.append(repo_name
)
1228 except Exception as e
:
1230 "Error deleting repo, name: {}, err_msg: {}".format(
1235 return deleted_repo_list
, added_repo_dict
1237 except K8sException
:
1239 except Exception as e
:
1240 # Do not raise errors synchronizing repos
1241 self
.log
.error("Error synchronizing repos: {}".format(e
))
1242 raise Exception("Error synchronizing repos: {}".format(e
))
1244 def _get_db_repos_dict(self
, repo_ids
: list):
1246 for repo_id
in repo_ids
:
1247 db_repo
= self
.db
.get_one("k8srepos", {"_id": repo_id
})
1248 db_repos_dict
[db_repo
["name"]] = db_repo
1249 return db_repos_dict
1252 ####################################################################################
1253 ################################### TO BE IMPLEMENTED SUBCLASSES ###################
1254 ####################################################################################
1258 def _init_paths_env(self
, cluster_name
: str, create_if_not_exist
: bool = True):
1260 Creates and returns base cluster and kube dirs and returns them.
1261 Also created helm3 dirs according to new directory specification, paths are
1262 not returned but assigned to helm environment variables
1264 :param cluster_name: cluster_name
1265 :return: Dictionary with config_paths and dictionary with helm environment variables
1269 async def _cluster_init(self
, cluster_id
, namespace
, paths
, env
):
1271 Implements the helm version dependent cluster initialization
1275 async def _instances_list(self
, cluster_id
):
1277 Implements the helm version dependent helm instances list
1281 async def _get_services(self
, cluster_id
, kdu_instance
, namespace
, kubeconfig
):
1283 Implements the helm version dependent method to obtain services from a helm instance
1287 async def _status_kdu(
1291 namespace
: str = None,
1292 yaml_format
: bool = False,
1293 show_error_log
: bool = False,
1294 ) -> Union
[str, dict]:
1296 Implements the helm version dependent method to obtain status of a helm instance
1300 def _get_install_command(
1312 Obtain command to be executed to delete the indicated instance
    @abstractmethod
    def _get_upgrade_scale_command(
        self,
        kdu_model,
        kdu_instance,
        namespace,
        scale,
        version,
        atomic,
        replica_str,
        timeout,
        resource_name,
        kubeconfig,
    ) -> str:
        """Generates the command to scale a Helm Chart release

        Args:
            kdu_model (str): Kdu model name, corresponding to the Helm local location or repository
            kdu_instance (str): KDU instance, corresponding to the Helm Chart release in question
            namespace (str): Namespace where this KDU instance is deployed
            scale (int): Scale count
            version (str): Constraint with specific version of the Chart to use
            atomic (bool): If set, upgrade process rolls back changes made in case of failed upgrade.
                The --wait flag will be set automatically if --atomic is used
            replica_str (str): The key under resource_name key where the scale count is stored
            timeout (float): The time, in seconds, to wait
            resource_name (str): The KDU's resource to scale
            kubeconfig (str): Kubeconfig file path

        Returns:
            str: command to scale a Helm Chart release
        """
    @abstractmethod
    def _get_upgrade_command(
        self,
        kdu_model,
        kdu_instance,
        namespace,
        params_str,
        version,
        atomic,
        timeout,
        kubeconfig,
        force,
    ) -> str:
        """Generates the command to upgrade a Helm Chart release

        Args:
            kdu_model (str): Kdu model name, corresponding to the Helm local location or repository
            kdu_instance (str): KDU instance, corresponding to the Helm Chart release in question
            namespace (str): Namespace where this KDU instance is deployed
            params_str (str): Params used to upgrade the Helm Chart release
            version (str): Constraint with specific version of the Chart to use
            atomic (bool): If set, upgrade process rolls back changes made in case of failed upgrade.
                The --wait flag will be set automatically if --atomic is used
            timeout (float): The time, in seconds, to wait
            kubeconfig (str): Kubeconfig file path
            force (bool): If set, helm forces resource updates through a replacement strategy. This may recreate pods.

        Returns:
            str: command to upgrade a Helm Chart release
        """
    @abstractmethod
    def _get_rollback_command(
        self, kdu_instance, namespace, revision, kubeconfig
    ) -> str:
        """
        Obtain command to be executed to rollback the indicated instance.

        :param kdu_instance: KDU instance (helm release) name
        :param namespace: K8s namespace where the instance is deployed
        :param revision: helm release revision to roll back to
        :param kubeconfig: kubeconfig file path
        """
    @abstractmethod
    def _get_uninstall_command(
        self, kdu_instance: str, namespace: str, kubeconfig: str
    ) -> str:
        """
        Obtain command to be executed to delete the indicated instance.

        :param kdu_instance: KDU instance (helm release) name
        :param namespace: K8s namespace where the instance is deployed
        :param kubeconfig: kubeconfig file path
        """
    @abstractmethod
    def _get_inspect_command(
        self, show_command: str, kdu_model: str, repo_str: str, version: str
    ):
        """Generates the command to obtain the information about an Helm Chart package
        (´helm show ...´ command)

        Args:
            show_command: the second part of the command (`helm show <show_command>`)
            kdu_model: The name or path of an Helm Chart
            repo_str: Helm Chart repository option string (built from the repo url)
            version: constraint with specific version of the Chart to use

        Returns:
            str: the generated Helm Chart command
        """
        # NOTE(review): docstring previously documented a "repo_url" argument
        # that does not exist; the parameter is the pre-built repo option
        # string "repo_str" — aligned above.
    @abstractmethod
    def _get_get_command(
        self, get_command: str, kdu_instance: str, namespace: str, kubeconfig: str
    ):
        """Obtain command to be executed to get information about the kdu instance."""
    @abstractmethod
    async def _uninstall_sw(self, cluster_id: str, namespace: str):
        """
        Method called to uninstall cluster software for helm. This method is dependent
        on the helm version.

        For Helm v2 it will be called when Tiller must be uninstalled.
        For Helm v3 it does nothing and does not need to be called.
        """
    @abstractmethod
    def _get_helm_chart_repos_ids(self, cluster_uuid) -> list:
        """
        Obtains the cluster repos identifiers.

        :param cluster_uuid: cluster identifier
        :return: list of repo identifiers configured in the cluster
        """
1433 ####################################################################################
1434 ################################### P R I V A T E ##################################
1435 ####################################################################################
1439 def _check_file_exists(filename
: str, exception_if_not_exists
: bool = False):
1440 if os
.path
.exists(filename
):
1443 msg
= "File {} does not exist".format(filename
)
1444 if exception_if_not_exists
:
1445 raise K8sException(msg
)
1448 def _remove_multiple_spaces(strobj
):
1449 strobj
= strobj
.strip()
1450 while " " in strobj
:
1451 strobj
= strobj
.replace(" ", " ")
1455 def _output_to_lines(output
: str) -> list:
1456 output_lines
= list()
1457 lines
= output
.splitlines(keepends
=False)
1461 output_lines
.append(line
)
1465 def _output_to_table(output
: str) -> list:
1466 output_table
= list()
1467 lines
= output
.splitlines(keepends
=False)
1469 line
= line
.replace("\t", " ")
1471 output_table
.append(line_list
)
1472 cells
= line
.split(sep
=" ")
1476 line_list
.append(cell
)
1480 def _parse_services(output
: str) -> list:
1481 lines
= output
.splitlines(keepends
=False)
1484 line
= line
.replace("\t", " ")
1485 cells
= line
.split(sep
=" ")
1486 if len(cells
) > 0 and cells
[0].startswith("service/"):
1487 elems
= cells
[0].split(sep
="/")
1489 services
.append(elems
[1])
1493 def _get_deep(dictionary
: dict, members
: tuple):
1498 value
= target
.get(m
)
1507 # find key:value in several lines
1509 def _find_in_lines(p_lines
: list, p_key
: str) -> str:
1510 for line
in p_lines
:
1512 if line
.startswith(p_key
+ ":"):
1513 parts
= line
.split(":")
1514 the_value
= parts
[1].strip()
1522 def _lower_keys_list(input_list
: list):
1524 Transform the keys in a list of dictionaries to lower case and returns a new list
1529 for dictionary
in input_list
:
1530 new_dict
= dict((k
.lower(), v
) for k
, v
in dictionary
.items())
1531 new_list
.append(new_dict
)
    async def _local_async_exec(
        self,
        command: str,
        raise_exception_on_error: bool = False,
        show_error_log: bool = True,
        encode_utf8: bool = False,
        env: dict = None,
    ) -> (str, int):
        """Run a local shell command asynchronously and return (output, return_code).

        :param command: command line to execute (normalized, then shlex-split)
        :param raise_exception_on_error: raise K8sException on non-zero return code
        :param show_error_log: log failing command output at debug level
        :param encode_utf8: post-process output through utf-8 encode/str round-trip
        :param env: extra environment variables overlaid on os.environ
        :raises K8sException: on command failure (when requested) or unexpected errors
        """
        # NOTE(review): several lines of this body were reconstructed from a
        # partially visible source — confirm against the upstream file.
        command = K8sHelmBaseConnector._remove_multiple_spaces(command)
        self.log.debug(
            "Executing async local command: {}, env: {}".format(command, env)
        )

        # split command into argv list (no shell involved)
        command = shlex.split(command)

        environ = os.environ.copy()
        if env:
            environ.update(env)

        try:
            # serialize subprocess execution; presumably protects shared helm state
            async with self.cmd_lock:
                process = await asyncio.create_subprocess_exec(
                    *command,
                    stdout=asyncio.subprocess.PIPE,
                    stderr=asyncio.subprocess.PIPE,
                    env=environ,
                )

                # wait for command terminate
                stdout, stderr = await process.communicate()

                return_code = process.returncode

            output = ""
            if stdout:
                output = stdout.decode("utf-8").strip()
                # output = stdout.decode()
            if stderr:
                # stderr (when present) wins over stdout as the reported output
                output = stderr.decode("utf-8").strip()
                # output = stderr.decode()

            if return_code != 0 and show_error_log:
                self.log.debug(
                    "Return code (FAIL): {}\nOutput:\n{}".format(return_code, output)
                )
            else:
                self.log.debug("Return code: {}".format(return_code))

            if raise_exception_on_error and return_code != 0:
                raise K8sException(output)

            if encode_utf8:
                output = output.encode("utf-8").strip()
                output = str(output).replace("\\n", "\n")

            return output, return_code

        except asyncio.CancelledError:
            # first, kill the process if it is still running
            if process.returncode is None:
                process.kill()
            raise
        except K8sException:
            raise
        except Exception as e:
            msg = "Exception executing command: {} -> {}".format(command, e)
            self.log.error(msg)
            if raise_exception_on_error:
                raise K8sException(e) from e
            else:
                return "", -1
    async def _local_async_exec_pipe(
        self,
        command1: str,
        command2: str,
        raise_exception_on_error: bool = True,
        show_error_log: bool = True,
        encode_utf8: bool = False,
        env: dict = None,
    ):
        """Run two local commands connected by a pipe (`command1 | command2`)
        and return (output, return_code) of the second command.

        :param command1: producer command line
        :param command2: consumer command line (its stdout is captured)
        :param raise_exception_on_error: raise K8sException on non-zero return code
        :param show_error_log: log failing command output at debug level
        :param encode_utf8: post-process output through utf-8 encode/str round-trip
        :param env: extra environment variables overlaid on os.environ
        :raises K8sException: on command failure (when requested) or unexpected errors
        """
        # NOTE(review): several lines of this body were reconstructed from a
        # partially visible source — confirm against the upstream file.
        command1 = K8sHelmBaseConnector._remove_multiple_spaces(command1)
        command2 = K8sHelmBaseConnector._remove_multiple_spaces(command2)
        command = "{} | {}".format(command1, command2)
        self.log.debug(
            "Executing async local command: {}, env: {}".format(command, env)
        )

        # split commands into argv lists (no shell involved)
        command1 = shlex.split(command1)
        command2 = shlex.split(command2)

        environ = os.environ.copy()
        if env:
            environ.update(env)

        try:
            async with self.cmd_lock:
                # OS-level pipe wires process_1 stdout to process_2 stdin
                read, write = os.pipe()
                process_1 = await asyncio.create_subprocess_exec(
                    *command1, stdout=write, env=environ
                )
                os.close(write)
                process_2 = await asyncio.create_subprocess_exec(
                    *command2, stdin=read, stdout=asyncio.subprocess.PIPE, env=environ
                )
                os.close(read)

                stdout, stderr = await process_2.communicate()

                return_code = process_2.returncode

            output = ""
            if stdout:
                output = stdout.decode("utf-8").strip()
                # output = stdout.decode()
            if stderr:
                # stderr (when present) wins over stdout as the reported output
                output = stderr.decode("utf-8").strip()
                # output = stderr.decode()

            if return_code != 0 and show_error_log:
                self.log.debug(
                    "Return code (FAIL): {}\nOutput:\n{}".format(return_code, output)
                )
            else:
                self.log.debug("Return code: {}".format(return_code))

            if raise_exception_on_error and return_code != 0:
                raise K8sException(output)

            if encode_utf8:
                output = output.encode("utf-8").strip()
                output = str(output).replace("\\n", "\n")

            return output, return_code
        except asyncio.CancelledError:
            # first, kill the processes if they are still running
            for process in (process_1, process_2):
                if process.returncode is None:
                    process.kill()
            raise
        except K8sException:
            raise
        except Exception as e:
            msg = "Exception executing command: {} -> {}".format(command, e)
            self.log.error(msg)
            if raise_exception_on_error:
                raise K8sException(e) from e
            else:
                return "", -1
    async def _get_service(self, cluster_id, service_name, namespace):
        """
        Obtains the data of the specified service in the k8cluster.

        :param cluster_id: id of a K8s cluster known by OSM
        :param service_name: name of the K8s service in the specified namespace
        :param namespace: K8s namespace used by the KDU instance
        :return: If successful, it will return a service with the following data:
        - `name` of the service
        - `type` type of service in the k8 cluster
        - `ports` List of ports offered by the service, for each port includes at least
        name, port, protocol
        - `cluster_ip` Internal ip to be used inside k8s cluster
        - `external_ip` List of external ips (in case they are available)
        """
        # init config, env
        paths, env = self._init_paths_env(
            cluster_name=cluster_id, create_if_not_exist=True
        )

        command = "{} --kubeconfig={} --namespace={} get service {} -o=yaml".format(
            self.kubectl_command, paths["kube_config"], namespace, service_name
        )

        output, _rc = await self._local_async_exec(
            command=command, raise_exception_on_error=True, env=env
        )

        data = yaml.load(output, Loader=yaml.SafeLoader)

        service = {
            "name": service_name,
            "type": self._get_deep(data, ("spec", "type")),
            "ports": self._get_deep(data, ("spec", "ports")),
            "cluster_ip": self._get_deep(data, ("spec", "clusterIP")),
        }
        # external_ip is only meaningful for LoadBalancer services
        if service["type"] == "LoadBalancer":
            ip_map_list = self._get_deep(data, ("status", "loadBalancer", "ingress"))
            ip_list = [elem["ip"] for elem in ip_map_list]
            service["external_ip"] = ip_list

        return service
    async def _exec_get_command(
        self, get_command: str, kdu_instance: str, namespace: str, kubeconfig: str
    ):
        """Obtains information about the kdu instance.

        :param get_command: helm `get` sub-command (e.g. values, manifest)
        :param kdu_instance: KDU instance (helm release) name
        :param namespace: K8s namespace where the instance is deployed
        :param kubeconfig: kubeconfig file path
        :return: raw command output
        """
        full_command = self._get_get_command(
            get_command, kdu_instance, namespace, kubeconfig
        )
        output, _rc = await self._local_async_exec(command=full_command)
        return output
    async def _exec_inspect_command(
        self, inspect_command: str, kdu_model: str, repo_url: str = None
    ):
        """Obtains information about an Helm Chart package (´helm show´ command)

        Args:
            inspect_command: the Helm sub command (`helm show <inspect_command> ...`)
            kdu_model: The name or path of an Helm Chart
            repo_url: Helm Chart repository url

        Returns:
            str: the requested info about the Helm Chart package
        """
        repo_str = ""
        if repo_url:
            repo_str = " --repo {}".format(repo_url)

            # Obtain the Chart's name and store it in the var kdu_model
            kdu_model, _ = self._split_repo(kdu_model=kdu_model)

        kdu_model, version = self._split_version(kdu_model)
        if version:
            version_str = "--version {}".format(version)
        else:
            version_str = ""

        full_command = self._get_inspect_command(
            show_command=inspect_command,
            kdu_model=kdu_model,
            repo_str=repo_str,
            version=version_str,
        )

        output, _ = await self._local_async_exec(command=full_command)

        return output
    async def _get_replica_count_url(
        self,
        kdu_model: str,
        repo_url: str = None,
        resource_name: str = None,
    ) -> (int, str):
        """Get the replica count value in the Helm Chart Values.

        Args:
            kdu_model: The name or path of an Helm Chart
            repo_url: Helm Chart repository url
            resource_name: Resource name

        Returns:
            A tuple with:
            - The number of replicas of the specific instance; if not found, returns None; and
            - The string corresponding to the replica count key in the Helm values
        """
        kdu_values = yaml.load(
            await self.values_kdu(kdu_model=kdu_model, repo_url=repo_url),
            Loader=yaml.SafeLoader,
        )

        self.log.debug(f"Obtained the Helm package values for the KDU: {kdu_values}")

        if not kdu_values:
            raise K8sException(
                "kdu_values not found for kdu_model {}".format(kdu_model)
            )

        if resource_name:
            # narrow the lookup to the requested resource's sub-values
            kdu_values = kdu_values.get(resource_name, None)

        if not kdu_values:
            msg = "resource {} not found in the values in model {}".format(
                resource_name, kdu_model
            )
            self.log.error(msg)
            raise K8sException(msg)

        replicas = None
        replica_str = None
        duplicate_check = False

        # "replicaCount" takes precedence over "replicas"
        if kdu_values.get("replicaCount") is not None:
            replicas = kdu_values["replicaCount"]
            replica_str = "replicaCount"
        elif kdu_values.get("replicas") is not None:
            duplicate_check = True
            replicas = kdu_values["replicas"]
            replica_str = "replicas"
        else:
            if resource_name:
                msg = (
                    "replicaCount or replicas not found in the resource"
                    "{} values in model {}. Cannot be scaled".format(
                        resource_name, kdu_model
                    )
                )
            else:
                msg = (
                    "replicaCount or replicas not found in the values"
                    "in model {}. Cannot be scaled".format(kdu_model)
                )
            self.log.error(msg)
            raise K8sException(msg)

        # Control if replicas and replicaCount exists at the same time
        msg = "replicaCount and replicas are exists at the same time"
        if duplicate_check:
            if "replicaCount" in kdu_values:
                self.log.error(msg)
                raise K8sException(msg)
        else:
            if "replicas" in kdu_values:
                self.log.error(msg)
                raise K8sException(msg)

        return replicas, replica_str
    async def _get_replica_count_instance(
        self,
        kdu_instance: str,
        namespace: str,
        kubeconfig: str,
        resource_name: str = None,
    ) -> int:
        """Get the replica count value in the instance.

        Args:
            kdu_instance: The name of the KDU instance
            namespace: KDU instance namespace
            kubeconfig: kubeconfig file path
            resource_name: Resource name

        Returns:
            The number of replicas of the specific instance; if not found, returns None
        """
        kdu_values = yaml.load(
            await self.get_values_kdu(kdu_instance, namespace, kubeconfig),
            Loader=yaml.SafeLoader,
        )

        self.log.debug(f"Obtained the Helm values for the KDU instance: {kdu_values}")

        replicas = None
        if kdu_values:
            # when a resource name is given, look inside its sub-values first
            resource_values = (
                kdu_values.get(resource_name, None) if resource_name else None
            )
            # "replicaCount" takes precedence over "replicas"
            for replica_str in ("replicaCount", "replicas"):
                if resource_values:
                    replicas = resource_values.get(replica_str)
                else:
                    replicas = kdu_values.get(replica_str)

                if replicas is not None:
                    break

        return replicas
    async def _store_status(
        self,
        cluster_id: str,
        operation: str,
        kdu_instance: str,
        namespace: str = None,
        db_dict: dict = None,
    ) -> None:
        """
        Obtains the status of the KDU instance based on Helm Charts, and stores it in the database.

        :param cluster_id (str): the cluster where the KDU instance is deployed
        :param operation (str): The operation related to the status to be updated (for instance, "install" or "upgrade")
        :param kdu_instance (str): The KDU instance in relation to which the status is obtained
        :param namespace (str): The Kubernetes namespace where the KDU instance was deployed. Defaults to None
        :param db_dict (dict): A dictionary with the database necessary information. It shall contain the
        values for the keys:
        - "collection": The Mongo DB collection to write to
        - "filter": The query filter to use in the update process
        - "path": The dot separated keys which targets the object to be updated
        Defaults to None.
        """
        try:
            detailed_status = await self._status_kdu(
                cluster_id=cluster_id,
                kdu_instance=kdu_instance,
                yaml_format=False,
                namespace=namespace,
            )

            status = detailed_status.get("info").get("description")
            self.log.debug(f"Status for KDU {kdu_instance} obtained: {status}.")

            # write status to db
            result = await self.write_app_status_to_db(
                db_dict=db_dict,
                status=str(status),
                detailed_status=str(detailed_status),
                operation=operation,
            )

            if not result:
                self.log.info("Error writing in database. Task exiting...")

        except asyncio.CancelledError as e:
            self.log.warning(
                f"Exception in method {self._store_status.__name__} (task cancelled): {e}"
            )
        except Exception as e:
            self.log.warning(f"Exception in method {self._store_status.__name__}: {e}")
    # params for use in -f file
    # returns values file option and filename (in order to delete it at the end)
    def _params_to_file_option(self, cluster_id: str, params: dict) -> (str, str):
        """Dump *params* to a temporary yaml file and return the helm
        "-f <file>" option string plus the filename (so the caller can delete it).

        Values prefixed with "!!yaml" are parsed as yaml before dumping.
        Returns ("", None) when there are no params.
        """
        if params and len(params) > 0:
            self._init_paths_env(cluster_name=cluster_id, create_if_not_exist=True)

            def get_random_number():
                # zero-padded random number used as the temp file basename
                r = random.randrange(start=1, stop=99999999)
                s = str(r)
                s = s.rjust(10, "0")
                return s

            params2 = dict()
            for key in params:
                value = params.get(key)
                # "!!yaml" marker: remainder of the string is embedded yaml
                if "!!yaml" in str(value):
                    value = yaml.safe_load(value[7:])
                params2[key] = value

            values_file = get_random_number() + ".yaml"
            with open(values_file, "w") as stream:
                yaml.dump(params2, stream, indent=4, default_flow_style=False)

            return "-f {}".format(values_file), values_file

        return "", None
    # params for use in --set option
    @staticmethod
    def _params_to_set_option(params: dict) -> str:
        """Build a helm "--set k1=v1,k2=v2" option string from *params*.

        Keys with a None value are skipped; returns "" when nothing remains.
        NOTE(review): the comma-separator handling between entries was
        reconstructed from a partially visible body — confirm upstream.
        """
        params_str = ""
        if params and len(params) > 0:
            start = True
            for key in params:
                value = params.get(key, None)
                if value is not None:
                    if start:
                        params_str += "--set "
                        start = False
                    else:
                        params_str += ","
                    params_str += "{}={}".format(key, value)
        return params_str
    @staticmethod
    def generate_kdu_instance_name(**kwargs):
        """Generate a unique, DNS-friendly helm release name from kwargs["kdu_model"].

        Non-alphanumeric characters are replaced with '-', the name is
        truncated, prefixed with 'a' if it does not start with a letter, and a
        zero-padded random suffix is appended.
        """
        chart_name = kwargs["kdu_model"]
        # check embeded chart (file or dir)
        if chart_name.startswith("/"):
            # extract file or directory name
            chart_name = chart_name[chart_name.rfind("/") + 1 :]
        # check URL
        elif "://" in chart_name:
            # extract last portion of URL
            chart_name = chart_name[chart_name.rfind("/") + 1 :]

        name = ""
        for c in chart_name:
            if c.isalpha() or c.isnumeric():
                name += c
            else:
                name += "-"
        # NOTE(review): truncation length reconstructed — confirm upstream
        if len(name) > 35:
            name = name[0:35]

        # if does not start with alpha character, prefix 'a'
        if not name[0].isalpha():
            name = "a" + name

        name += "-"

        def get_random_number():
            # zero-padded random suffix keeps release names unique
            r = random.randrange(start=1, stop=99999999)
            s = str(r)
            s = s.rjust(10, "0")
            return s

        name = name + get_random_number()
        return name.lower()
2039 def _split_version(self
, kdu_model
: str) -> (str, str):
2041 if not self
._is
_helm
_chart
_a
_file
(kdu_model
) and ":" in kdu_model
:
2042 parts
= kdu_model
.split(sep
=":")
2044 version
= str(parts
[1])
2045 kdu_model
= parts
[0]
2046 return kdu_model
, version
2048 def _split_repo(self
, kdu_model
: str) -> (str, str):
2049 """Obtain the Helm Chart's repository and Chart's names from the KDU model
2052 kdu_model (str): Associated KDU model
2055 (str, str): Tuple with the Chart name in index 0, and the repo name
2056 in index 2; if there was a problem finding them, return None
2063 idx
= kdu_model
.find("/")
2065 chart_name
= kdu_model
[idx
+ 1 :]
2066 repo_name
= kdu_model
[:idx
]
2068 return chart_name
, repo_name
    async def _find_repo(self, kdu_model: str, cluster_uuid: str) -> str:
        """Obtain the Helm repository for an Helm Chart

        Args:
            kdu_model (str): the KDU model associated with the Helm Chart instantiation
            cluster_uuid (str): The cluster UUID associated with the Helm Chart instantiation

        Returns:
            str: the repository URL; if Helm Chart is a local one, the function returns None
        """
        _, repo_name = self._split_repo(kdu_model=kdu_model)

        repo_url = None
        if repo_name:
            # Find repository link
            local_repo_list = await self.repo_list(cluster_uuid)
            for repo in local_repo_list:
                if repo["name"] == repo_name:
                    repo_url = repo["url"]
                    break  # it is not necessary to continue the loop if the repo link was found...

        return repo_url
    async def create_certificate(
        self, cluster_uuid, namespace, dns_prefix, name, secret_name, usage
    ):
        """Create a cert-manager certificate in the cluster via kubectl.

        :param cluster_uuid: cluster identifier (used to locate the kubeconfig)
        :param namespace: K8s namespace where the certificate is created
        :param dns_prefix: DNS prefix for the certificate
        :param name: certificate name
        :param secret_name: name of the secret holding the certificate
        :param usage: certificate usage
        """
        paths, env = self._init_paths_env(
            cluster_name=cluster_uuid, create_if_not_exist=True
        )
        kubectl = Kubectl(config_file=paths["kube_config"])
        await kubectl.create_certificate(
            namespace=namespace,
            name=name,
            dns_prefix=dns_prefix,
            secret_name=secret_name,
            usages=[usage],  # NOTE(review): wrapping in a list reconstructed — confirm
            issuer_name="ca-issuer",
        )
    async def delete_certificate(self, cluster_uuid, namespace, certificate_name):
        """Delete a cert-manager certificate from the cluster via kubectl.

        :param cluster_uuid: cluster identifier (used to locate the kubeconfig)
        :param namespace: K8s namespace holding the certificate
        :param certificate_name: name of the certificate to delete
        """
        paths, env = self._init_paths_env(
            cluster_name=cluster_uuid, create_if_not_exist=True
        )
        kubectl = Kubectl(config_file=paths["kube_config"])
        await kubectl.delete_certificate(namespace, certificate_name)