2 # Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3 # This file is part of OSM
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact with: nfvlabs@tid.es
24 from typing
import Union
25 from shlex
import quote
33 from uuid
import uuid4
35 from n2vc
.config
import EnvironConfig
36 from n2vc
.exceptions
import K8sException
37 from n2vc
.k8s_conn
import K8sConnector
40 class K8sHelmBaseConnector(K8sConnector
):
43 ####################################################################################
44 ################################### P U B L I C ####################################
45 ####################################################################################
48 service_account
= "osm"
54 kubectl_command
: str = "/usr/bin/kubectl",
55 helm_command
: str = "/usr/bin/helm",
61 :param fs: file system for kubernetes and helm configuration
62 :param db: database object to write current operation status
63 :param kubectl_command: path to kubectl executable
64 :param helm_command: path to helm executable
66 :param on_update_db: callback called when k8s connector updates database
70 K8sConnector
.__init
__(self
, db
=db
, log
=log
, on_update_db
=on_update_db
)
72 self
.log
.info("Initializing K8S Helm connector")
74 self
.config
= EnvironConfig()
75 # random numbers for release name generation
76 random
.seed(time
.time())
81 # exception if kubectl is not installed
82 self
.kubectl_command
= kubectl_command
83 self
._check
_file
_exists
(filename
=kubectl_command
, exception_if_not_exists
=True)
85 # exception if helm is not installed
86 self
._helm
_command
= helm_command
87 self
._check
_file
_exists
(filename
=helm_command
, exception_if_not_exists
=True)
89 # obtain stable repo url from config or apply default
90 self
._stable
_repo
_url
= self
.config
.get("stablerepourl")
91 if self
._stable
_repo
_url
== "None":
92 self
._stable
_repo
_url
= None
94 # Lock to avoid concurrent execution of helm commands
95 self
.cmd_lock
= asyncio
.Lock()
97 def _get_namespace(self
, cluster_uuid
: str) -> str:
99 Obtains the namespace used by the cluster with the uuid passed by argument
101 param: cluster_uuid: cluster's uuid
104 # first, obtain the cluster corresponding to the uuid passed by argument
105 k8scluster
= self
.db
.get_one(
106 "k8sclusters", q_filter
={"_id": cluster_uuid
}, fail_on_empty
=False
108 return k8scluster
.get("namespace")
113 namespace
: str = "kube-system",
114 reuse_cluster_uuid
=None,
118 It prepares a given K8s cluster environment to run Charts
120 :param k8s_creds: credentials to access a given K8s cluster, i.e. a valid
122 :param namespace: optional namespace to be used for helm. By default,
123 'kube-system' will be used
124 :param reuse_cluster_uuid: existing cluster uuid for reuse
125 :param kwargs: Additional parameters (None yet)
126 :return: uuid of the K8s cluster and True if connector has installed some
127 software in the cluster
128 (on error, an exception will be raised)
131 if reuse_cluster_uuid
:
132 cluster_id
= reuse_cluster_uuid
134 cluster_id
= str(uuid4())
137 "Initializing K8S Cluster {}. namespace: {}".format(cluster_id
, namespace
)
140 paths
, env
= self
._init
_paths
_env
(
141 cluster_name
=cluster_id
, create_if_not_exist
=True
143 mode
= stat
.S_IRUSR | stat
.S_IWUSR
144 with
open(paths
["kube_config"], "w", mode
) as f
:
146 os
.chmod(paths
["kube_config"], 0o600)
148 # Code with initialization specific of helm version
149 n2vc_installed_sw
= await self
._cluster
_init
(cluster_id
, namespace
, paths
, env
)
151 # sync fs with local data
152 self
.fs
.reverse_sync(from_path
=cluster_id
)
154 self
.log
.info("Cluster {} initialized".format(cluster_id
))
156 return cluster_id
, n2vc_installed_sw
163 repo_type
: str = "chart",
166 password
: str = None,
169 "Cluster {}, adding {} repository {}. URL: {}".format(
170 cluster_uuid
, repo_type
, name
, url
175 paths
, env
= self
._init
_paths
_env
(
176 cluster_name
=cluster_uuid
, create_if_not_exist
=True
180 self
.fs
.sync(from_path
=cluster_uuid
)
182 # helm repo add name url
183 command
= ("env KUBECONFIG={} {} repo add {} {}").format(
184 paths
["kube_config"], self
._helm
_command
, quote(name
), quote(url
)
188 temp_cert_file
= os
.path
.join(
189 self
.fs
.path
, "{}/helmcerts/".format(cluster_uuid
), "temp.crt"
191 os
.makedirs(os
.path
.dirname(temp_cert_file
), exist_ok
=True)
192 with
open(temp_cert_file
, "w") as the_cert
:
194 command
+= " --ca-file {}".format(quote(temp_cert_file
))
197 command
+= " --username={}".format(quote(user
))
200 command
+= " --password={}".format(quote(password
))
202 self
.log
.debug("adding repo: {}".format(command
))
203 await self
._local
_async
_exec
(
204 command
=command
, raise_exception_on_error
=True, env
=env
208 command
= "env KUBECONFIG={} {} repo update {}".format(
209 paths
["kube_config"], self
._helm
_command
, quote(name
)
211 self
.log
.debug("updating repo: {}".format(command
))
212 await self
._local
_async
_exec
(
213 command
=command
, raise_exception_on_error
=False, env
=env
217 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
219 async def repo_update(self
, cluster_uuid
: str, name
: str, repo_type
: str = "chart"):
221 "Cluster {}, updating {} repository {}".format(
222 cluster_uuid
, repo_type
, name
227 paths
, env
= self
._init
_paths
_env
(
228 cluster_name
=cluster_uuid
, create_if_not_exist
=True
232 self
.fs
.sync(from_path
=cluster_uuid
)
235 command
= "{} repo update {}".format(self
._helm
_command
, quote(name
))
236 self
.log
.debug("updating repo: {}".format(command
))
237 await self
._local
_async
_exec
(
238 command
=command
, raise_exception_on_error
=False, env
=env
242 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
async def repo_list(self, cluster_uuid: str) -> list:
    """
    Get the list of helm repositories registered for a cluster

    :param cluster_uuid: uuid of the cluster whose helm configuration is queried
    :return: list of registered repositories (keys lowercased), or an empty
        list when the helm command produced no output
    """

    self.log.debug("list repositories for cluster {}".format(cluster_uuid))

    # per-cluster config paths and helm environment
    paths, env = self._init_paths_env(
        cluster_name=cluster_uuid, create_if_not_exist=True
    )

    # sync before running helm
    self.fs.sync(from_path=cluster_uuid)

    kube_config = paths["kube_config"]
    command = "env KUBECONFIG={} {} repo list --output yaml".format(
        kube_config, self._helm_command
    )

    # Errors are tolerated on purpose: when there are no repos the caller
    # just wants an empty list
    output, _rc = await self._local_async_exec(
        command=command, raise_exception_on_error=False, env=env
    )

    # sync fs with local data
    self.fs.reverse_sync(from_path=cluster_uuid)

    if not output:
        return []

    parsed_repos = yaml.load(output, Loader=yaml.SafeLoader)
    # unify format between helm2 and helm3 setting all keys lowercase
    return self._lower_keys_list(parsed_repos)
283 async def repo_remove(self
, cluster_uuid
: str, name
: str):
285 "remove {} repositories for cluster {}".format(name
, cluster_uuid
)
289 paths
, env
= self
._init
_paths
_env
(
290 cluster_name
=cluster_uuid
, create_if_not_exist
=True
294 self
.fs
.sync(from_path
=cluster_uuid
)
296 command
= "env KUBECONFIG={} {} repo remove {}".format(
297 paths
["kube_config"], self
._helm
_command
, quote(name
)
299 await self
._local
_async
_exec
(
300 command
=command
, raise_exception_on_error
=True, env
=env
304 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
310 uninstall_sw
: bool = False,
315 Resets the Kubernetes cluster by removing the helm deployment that represents it.
317 :param cluster_uuid: The UUID of the cluster to reset
318 :param force: Boolean to force the reset
319 :param uninstall_sw: Boolean to also uninstall the software installed in the cluster by the connector
320 :param kwargs: Additional parameters (None yet)
321 :return: Returns True if successful or raises an exception.
323 namespace
= self
._get
_namespace
(cluster_uuid
=cluster_uuid
)
325 "Resetting K8s environment. cluster uuid: {} uninstall={}".format(
326 cluster_uuid
, uninstall_sw
331 self
.fs
.sync(from_path
=cluster_uuid
)
333 # uninstall releases if needed.
335 releases
= await self
.instances_list(cluster_uuid
=cluster_uuid
)
336 if len(releases
) > 0:
340 kdu_instance
= r
.get("name")
341 chart
= r
.get("chart")
343 "Uninstalling {} -> {}".format(chart
, kdu_instance
)
345 await self
.uninstall(
346 cluster_uuid
=cluster_uuid
, kdu_instance
=kdu_instance
348 except Exception as e
:
349 # will not raise exception as it was found
350 # that in some cases of previously installed helm releases it
353 "Error uninstalling release {}: {}".format(
359 "Cluster uuid: {} has releases and not force. Leaving K8s helm environment"
360 ).format(cluster_uuid
)
363 False # Allow to remove k8s cluster without removing Tiller
367 await self
._uninstall
_sw
(cluster_id
=cluster_uuid
, namespace
=namespace
)
369 # delete cluster directory
370 self
.log
.debug("Removing directory {}".format(cluster_uuid
))
371 self
.fs
.file_delete(cluster_uuid
, ignore_non_exist
=True)
372 # Also remove the local directory if it still exists
373 direct
= self
.fs
.path
+ "/" + cluster_uuid
374 shutil
.rmtree(direct
, ignore_errors
=True)
378 def _is_helm_chart_a_file(self
, chart_name
: str):
379 return chart_name
.count("/") > 1
381 async def _install_impl(
389 timeout
: float = 300,
391 db_dict
: dict = None,
392 kdu_name
: str = None,
393 namespace
: str = None,
396 paths
, env
= self
._init
_paths
_env
(
397 cluster_name
=cluster_id
, create_if_not_exist
=True
401 params_str
, file_to_delete
= self
._params
_to
_file
_option
(
402 cluster_id
=cluster_id
, params
=params
406 kdu_model
, version
= self
._split
_version
(kdu_model
)
408 _
, repo
= self
._split
_repo
(kdu_model
)
410 await self
.repo_update(cluster_id
, repo
)
412 command
= self
._get
_install
_command
(
420 paths
["kube_config"],
423 self
.log
.debug("installing: {}".format(command
))
426 # exec helm in a task
427 exec_task
= asyncio
.ensure_future(
428 coro_or_future
=self
._local
_async
_exec
(
429 command
=command
, raise_exception_on_error
=False, env
=env
433 # write status in another task
434 status_task
= asyncio
.ensure_future(
435 coro_or_future
=self
._store
_status
(
436 cluster_id
=cluster_id
,
437 kdu_instance
=kdu_instance
,
444 # wait for execution task
445 await asyncio
.wait([exec_task
])
450 output
, rc
= exec_task
.result()
453 output
, rc
= await self
._local
_async
_exec
(
454 command
=command
, raise_exception_on_error
=False, env
=env
457 # remove temporal values yaml file
459 os
.remove(file_to_delete
)
462 await self
._store
_status
(
463 cluster_id
=cluster_id
,
464 kdu_instance
=kdu_instance
,
471 msg
= "Error executing command: {}\nOutput: {}".format(command
, output
)
473 raise K8sException(msg
)
479 kdu_model
: str = None,
481 timeout
: float = 300,
483 db_dict
: dict = None,
485 self
.log
.debug("upgrading {} in cluster {}".format(kdu_model
, cluster_uuid
))
488 self
.fs
.sync(from_path
=cluster_uuid
)
490 # look for instance to obtain namespace
491 instance_info
= await self
.get_instance_info(cluster_uuid
, kdu_instance
)
492 if not instance_info
:
493 raise K8sException("kdu_instance {} not found".format(kdu_instance
))
496 paths
, env
= self
._init
_paths
_env
(
497 cluster_name
=cluster_uuid
, create_if_not_exist
=True
501 self
.fs
.sync(from_path
=cluster_uuid
)
504 params_str
, file_to_delete
= self
._params
_to
_file
_option
(
505 cluster_id
=cluster_uuid
, params
=params
509 kdu_model
, version
= self
._split
_version
(kdu_model
)
511 _
, repo
= self
._split
_repo
(kdu_model
)
513 await self
.repo_update(cluster_uuid
, repo
)
515 command
= self
._get
_upgrade
_command
(
518 instance_info
["namespace"],
523 paths
["kube_config"],
526 self
.log
.debug("upgrading: {}".format(command
))
529 # exec helm in a task
530 exec_task
= asyncio
.ensure_future(
531 coro_or_future
=self
._local
_async
_exec
(
532 command
=command
, raise_exception_on_error
=False, env
=env
535 # write status in another task
536 status_task
= asyncio
.ensure_future(
537 coro_or_future
=self
._store
_status
(
538 cluster_id
=cluster_uuid
,
539 kdu_instance
=kdu_instance
,
540 namespace
=instance_info
["namespace"],
546 # wait for execution task
547 await asyncio
.wait([exec_task
])
551 output
, rc
= exec_task
.result()
554 output
, rc
= await self
._local
_async
_exec
(
555 command
=command
, raise_exception_on_error
=False, env
=env
558 # remove temporal values yaml file
560 os
.remove(file_to_delete
)
563 await self
._store
_status
(
564 cluster_id
=cluster_uuid
,
565 kdu_instance
=kdu_instance
,
566 namespace
=instance_info
["namespace"],
572 msg
= "Error executing command: {}\nOutput: {}".format(command
, output
)
574 raise K8sException(msg
)
577 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
579 # return new revision number
580 instance
= await self
.get_instance_info(
581 cluster_uuid
=cluster_uuid
, kdu_instance
=kdu_instance
584 revision
= int(instance
.get("revision"))
585 self
.log
.debug("New revision: {}".format(revision
))
595 total_timeout
: float = 1800,
596 cluster_uuid
: str = None,
597 kdu_model
: str = None,
599 db_dict
: dict = None,
602 """Scale a resource in a Helm Chart.
605 kdu_instance: KDU instance name
606 scale: Scale to which to set the resource
607 resource_name: Resource name
608 total_timeout: The time, in seconds, to wait
609 cluster_uuid: The UUID of the cluster
610 kdu_model: The chart reference
611 atomic: if set, upgrade process rolls back changes made in case of failed upgrade.
612 The --wait flag will be set automatically if --atomic is used
613 db_dict: Dictionary for any additional data
614 kwargs: Additional parameters
617 True if successful, False otherwise
620 debug_mgs
= "scaling {} in cluster {}".format(kdu_model
, cluster_uuid
)
622 debug_mgs
= "scaling resource {} in model {} (cluster {})".format(
623 resource_name
, kdu_model
, cluster_uuid
626 self
.log
.debug(debug_mgs
)
628 # look for instance to obtain namespace
629 # get_instance_info function calls the sync command
630 instance_info
= await self
.get_instance_info(cluster_uuid
, kdu_instance
)
631 if not instance_info
:
632 raise K8sException("kdu_instance {} not found".format(kdu_instance
))
635 paths
, env
= self
._init
_paths
_env
(
636 cluster_name
=cluster_uuid
, create_if_not_exist
=True
640 kdu_model
, version
= self
._split
_version
(kdu_model
)
642 repo_url
= await self
._find
_repo
(kdu_model
, cluster_uuid
)
644 _
, replica_str
= await self
._get
_replica
_count
_url
(
645 kdu_model
, repo_url
, resource_name
648 command
= self
._get
_upgrade
_scale
_command
(
651 instance_info
["namespace"],
658 paths
["kube_config"],
661 self
.log
.debug("scaling: {}".format(command
))
664 # exec helm in a task
665 exec_task
= asyncio
.ensure_future(
666 coro_or_future
=self
._local
_async
_exec
(
667 command
=command
, raise_exception_on_error
=False, env
=env
670 # write status in another task
671 status_task
= asyncio
.ensure_future(
672 coro_or_future
=self
._store
_status
(
673 cluster_id
=cluster_uuid
,
674 kdu_instance
=kdu_instance
,
675 namespace
=instance_info
["namespace"],
681 # wait for execution task
682 await asyncio
.wait([exec_task
])
686 output
, rc
= exec_task
.result()
689 output
, rc
= await self
._local
_async
_exec
(
690 command
=command
, raise_exception_on_error
=False, env
=env
694 await self
._store
_status
(
695 cluster_id
=cluster_uuid
,
696 kdu_instance
=kdu_instance
,
697 namespace
=instance_info
["namespace"],
703 msg
= "Error executing command: {}\nOutput: {}".format(command
, output
)
705 raise K8sException(msg
)
708 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
712 async def get_scale_count(
720 """Get a resource scale count.
723 cluster_uuid: The UUID of the cluster
724 resource_name: Resource name
725 kdu_instance: KDU instance name
726 kdu_model: The name or path of an Helm Chart
727 kwargs: Additional parameters
730 Resource instance count
734 "getting scale count for {} in cluster {}".format(kdu_model
, cluster_uuid
)
737 # look for instance to obtain namespace
738 instance_info
= await self
.get_instance_info(cluster_uuid
, kdu_instance
)
739 if not instance_info
:
740 raise K8sException("kdu_instance {} not found".format(kdu_instance
))
743 paths
, _
= self
._init
_paths
_env
(
744 cluster_name
=cluster_uuid
, create_if_not_exist
=True
747 replicas
= await self
._get
_replica
_count
_instance
(
748 kdu_instance
=kdu_instance
,
749 namespace
=instance_info
["namespace"],
750 kubeconfig
=paths
["kube_config"],
751 resource_name
=resource_name
,
755 f
"Number of replicas of the KDU instance {kdu_instance} and resource {resource_name} obtained: {replicas}"
758 # Get default value if scale count is not found from provided values
759 # Important note: this piece of code shall only be executed in the first scaling operation,
760 # since it is expected that the _get_replica_count_instance is able to obtain the number of
761 # replicas when a scale operation was already conducted previously for this KDU/resource!
763 repo_url
= await self
._find
_repo
(
764 kdu_model
=kdu_model
, cluster_uuid
=cluster_uuid
766 replicas
, _
= await self
._get
_replica
_count
_url
(
767 kdu_model
=kdu_model
, repo_url
=repo_url
, resource_name
=resource_name
771 f
"Number of replicas of the Helm Chart package for KDU instance {kdu_instance} and resource "
772 f
"{resource_name} obtained: {replicas}"
776 msg
= "Replica count not found. Cannot be scaled"
778 raise K8sException(msg
)
783 self
, cluster_uuid
: str, kdu_instance
: str, revision
=0, db_dict
: dict = None
786 "rollback kdu_instance {} to revision {} from cluster {}".format(
787 kdu_instance
, revision
, cluster_uuid
792 self
.fs
.sync(from_path
=cluster_uuid
)
794 # look for instance to obtain namespace
795 instance_info
= await self
.get_instance_info(cluster_uuid
, kdu_instance
)
796 if not instance_info
:
797 raise K8sException("kdu_instance {} not found".format(kdu_instance
))
800 paths
, env
= self
._init
_paths
_env
(
801 cluster_name
=cluster_uuid
, create_if_not_exist
=True
805 self
.fs
.sync(from_path
=cluster_uuid
)
807 command
= self
._get
_rollback
_command
(
808 kdu_instance
, instance_info
["namespace"], revision
, paths
["kube_config"]
811 self
.log
.debug("rolling_back: {}".format(command
))
813 # exec helm in a task
814 exec_task
= asyncio
.ensure_future(
815 coro_or_future
=self
._local
_async
_exec
(
816 command
=command
, raise_exception_on_error
=False, env
=env
819 # write status in another task
820 status_task
= asyncio
.ensure_future(
821 coro_or_future
=self
._store
_status
(
822 cluster_id
=cluster_uuid
,
823 kdu_instance
=kdu_instance
,
824 namespace
=instance_info
["namespace"],
826 operation
="rollback",
830 # wait for execution task
831 await asyncio
.wait([exec_task
])
836 output
, rc
= exec_task
.result()
839 await self
._store
_status
(
840 cluster_id
=cluster_uuid
,
841 kdu_instance
=kdu_instance
,
842 namespace
=instance_info
["namespace"],
844 operation
="rollback",
848 msg
= "Error executing command: {}\nOutput: {}".format(command
, output
)
850 raise K8sException(msg
)
853 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
855 # return new revision number
856 instance
= await self
.get_instance_info(
857 cluster_uuid
=cluster_uuid
, kdu_instance
=kdu_instance
860 revision
= int(instance
.get("revision"))
861 self
.log
.debug("New revision: {}".format(revision
))
866 async def uninstall(self
, cluster_uuid
: str, kdu_instance
: str, **kwargs
):
868 Removes an existing KDU instance. It would implicitly use the `delete` or 'uninstall' call
869 (this call should happen after all _terminate-config-primitive_ of the VNF
872 :param cluster_uuid: UUID of a K8s cluster known by OSM, or namespace:cluster_id
873 :param kdu_instance: unique name for the KDU instance to be deleted
874 :param kwargs: Additional parameters (None yet)
875 :return: True if successful
879 "uninstall kdu_instance {} from cluster {}".format(
880 kdu_instance
, cluster_uuid
885 self
.fs
.sync(from_path
=cluster_uuid
)
887 # look for instance to obtain namespace
888 instance_info
= await self
.get_instance_info(cluster_uuid
, kdu_instance
)
889 if not instance_info
:
890 self
.log
.warning(("kdu_instance {} not found".format(kdu_instance
)))
893 paths
, env
= self
._init
_paths
_env
(
894 cluster_name
=cluster_uuid
, create_if_not_exist
=True
898 self
.fs
.sync(from_path
=cluster_uuid
)
900 command
= self
._get
_uninstall
_command
(
901 kdu_instance
, instance_info
["namespace"], paths
["kube_config"]
903 output
, _rc
= await self
._local
_async
_exec
(
904 command
=command
, raise_exception_on_error
=True, env
=env
908 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
910 return self
._output
_to
_table
(output
)
async def instances_list(self, cluster_uuid: str) -> list:
    """
    Return the releases deployed in a cluster

    The listing itself is delegated to the helm version dependent
    ``_instances_list``; this wrapper only brackets it with the fs
    synchronisation calls.

    :param cluster_uuid: the 'cluster' or 'namespace:cluster'
    :return: whatever ``_instances_list`` produced for that cluster
    """

    self.log.debug("list releases for cluster {}".format(cluster_uuid))

    # sync before querying
    self.fs.sync(from_path=cluster_uuid)

    # execute internal command
    releases = await self._instances_list(cluster_uuid)

    # sync fs with local data
    self.fs.reverse_sync(from_path=cluster_uuid)

    return releases
async def get_instance_info(self, cluster_uuid: str, kdu_instance: str):
    """
    Look up a deployed release by name

    :param cluster_uuid: cluster whose releases are listed through instances_list
    :param kdu_instance: release name to look for (compared with each entry's "name")
    :return: the first matching entry, or None (after a debug log) when no
        entry matches
    """
    releases = await self.instances_list(cluster_uuid=cluster_uuid)

    # first entry whose name matches, if any
    found = next(
        (entry for entry in releases if entry.get("name") == kdu_instance), None
    )
    if found is not None:
        return found

    self.log.debug("Instance {} not found".format(kdu_instance))
    return None
941 async def upgrade_charm(
945 charm_id
: str = None,
946 charm_type
: str = None,
947 timeout
: float = None,
949 """This method upgrade charms in VNFs
952 ee_id: Execution environment id
953 path: Local path to the charm
955 charm_type: Charm type can be lxc-proxy-charm, native-charm or k8s-proxy-charm
956 timeout: (Float) Timeout for the ns update operation
959 The output of the update operation if status equals to "completed"
961 raise K8sException("KDUs deployed with Helm do not support charm upgrade")
963 async def exec_primitive(
965 cluster_uuid
: str = None,
966 kdu_instance
: str = None,
967 primitive_name
: str = None,
968 timeout
: float = 300,
970 db_dict
: dict = None,
973 """Exec primitive (Juju action)
975 :param cluster_uuid: The UUID of the cluster or namespace:cluster
976 :param kdu_instance: The unique name of the KDU instance
977 :param primitive_name: Name of action that will be executed
978 :param timeout: Timeout for action execution
979 :param params: Dictionary of all the parameters needed for the action
980 :db_dict: Dictionary for any additional data
981 :param kwargs: Additional parameters (None yet)
983 :return: Returns the output of the action
986 "KDUs deployed with Helm don't support actions "
987 "different from rollback, upgrade and status"
990 async def get_services(
991 self
, cluster_uuid
: str, kdu_instance
: str, namespace
: str
994 Returns a list of services defined for the specified kdu instance.
996 :param cluster_uuid: UUID of a K8s cluster known by OSM
997 :param kdu_instance: unique name for the KDU instance
998 :param namespace: K8s namespace used by the KDU instance
999 :return: If successful, it will return a list of services, Each service
1000 can have the following data:
1001 - `name` of the service
1002 - `type` type of service in the k8 cluster
1003 - `ports` List of ports offered by the service, for each port includes at least
1004 name, port, protocol
1005 - `cluster_ip` Internal ip to be used inside k8s cluster
1006 - `external_ip` List of external ips (in case they are available)
1010 "get_services: cluster_uuid: {}, kdu_instance: {}".format(
1011 cluster_uuid
, kdu_instance
1016 paths
, env
= self
._init
_paths
_env
(
1017 cluster_name
=cluster_uuid
, create_if_not_exist
=True
1021 self
.fs
.sync(from_path
=cluster_uuid
)
1023 # get list of services names for kdu
1024 service_names
= await self
._get
_services
(
1025 cluster_uuid
, kdu_instance
, namespace
, paths
["kube_config"]
1029 for service
in service_names
:
1030 service
= await self
._get
_service
(cluster_uuid
, service
, namespace
)
1031 service_list
.append(service
)
1034 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
1038 async def get_service(
1039 self
, cluster_uuid
: str, service_name
: str, namespace
: str
1042 "get service, service_name: {}, namespace: {}, cluster_uuid: {}".format(
1043 service_name
, namespace
, cluster_uuid
1048 self
.fs
.sync(from_path
=cluster_uuid
)
1050 service
= await self
._get
_service
(cluster_uuid
, service_name
, namespace
)
1053 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
1057 async def status_kdu(
1058 self
, cluster_uuid
: str, kdu_instance
: str, yaml_format
: str = False, **kwargs
1059 ) -> Union
[str, dict]:
1061 This call would retrieve the current state of a given KDU instance. It
1062 would allow to retrieve the _composition_ (i.e. K8s objects) and _specific
1063 values_ of the configuration parameters applied to a given instance. This call
1064 would be based on the `status` call.
1066 :param cluster_uuid: UUID of a K8s cluster known by OSM
1067 :param kdu_instance: unique name for the KDU instance
1068 :param kwargs: Additional parameters (None yet)
1069 :param yaml_format: if the return shall be returned as an YAML string or as a
1071 :return: If successful, it will return the following vector of arguments:
1072 - K8s `namespace` in the cluster where the KDU lives
1073 - `state` of the KDU instance. It can be:
1080 - List of `resources` (objects) that this release consists of, sorted by kind,
1081 and the status of those resources
1082 - Last `deployment_time`.
1086 "status_kdu: cluster_uuid: {}, kdu_instance: {}".format(
1087 cluster_uuid
, kdu_instance
1092 self
.fs
.sync(from_path
=cluster_uuid
)
1094 # get instance: needed to obtain namespace
1095 instances
= await self
._instances
_list
(cluster_id
=cluster_uuid
)
1096 for instance
in instances
:
1097 if instance
.get("name") == kdu_instance
:
1100 # instance does not exist
1102 "Instance name: {} not found in cluster: {}".format(
1103 kdu_instance
, cluster_uuid
1107 status
= await self
._status
_kdu
(
1108 cluster_id
=cluster_uuid
,
1109 kdu_instance
=kdu_instance
,
1110 namespace
=instance
["namespace"],
1111 yaml_format
=yaml_format
,
1112 show_error_log
=True,
1116 self
.fs
.reverse_sync(from_path
=cluster_uuid
)
1120 async def get_values_kdu(
1121 self
, kdu_instance
: str, namespace
: str, kubeconfig
: str
1123 self
.log
.debug("get kdu_instance values {}".format(kdu_instance
))
1125 return await self
._exec
_get
_command
(
1126 get_command
="values",
1127 kdu_instance
=kdu_instance
,
1128 namespace
=namespace
,
1129 kubeconfig
=kubeconfig
,
1132 async def values_kdu(self
, kdu_model
: str, repo_url
: str = None) -> str:
1133 """Method to obtain the Helm Chart package's values
1136 kdu_model: The name or path of an Helm Chart
1137 repo_url: Helm Chart repository url
1140 str: the values of the Helm Chart package
1144 "inspect kdu_model values {} from (optional) repo: {}".format(
1149 return await self
._exec
_inspect
_command
(
1150 inspect_command
="values", kdu_model
=kdu_model
, repo_url
=repo_url
1153 async def help_kdu(self
, kdu_model
: str, repo_url
: str = None) -> str:
1155 "inspect kdu_model {} readme.md from repo: {}".format(kdu_model
, repo_url
)
1158 return await self
._exec
_inspect
_command
(
1159 inspect_command
="readme", kdu_model
=kdu_model
, repo_url
=repo_url
1162 async def synchronize_repos(self
, cluster_uuid
: str):
1163 self
.log
.debug("synchronize repos for cluster helm-id: {}".format(cluster_uuid
))
1165 db_repo_ids
= self
._get
_helm
_chart
_repos
_ids
(cluster_uuid
)
1166 db_repo_dict
= self
._get
_db
_repos
_dict
(db_repo_ids
)
1168 local_repo_list
= await self
.repo_list(cluster_uuid
)
1169 local_repo_dict
= {repo
["name"]: repo
["url"] for repo
in local_repo_list
}
1171 deleted_repo_list
= []
1172 added_repo_dict
= {}
1174 # iterate over the list of repos in the database that should be
1175 # added if not present
1176 for repo_name
, db_repo
in db_repo_dict
.items():
1178 # check if it is already present
1179 curr_repo_url
= local_repo_dict
.get(db_repo
["name"])
1180 repo_id
= db_repo
.get("_id")
1181 if curr_repo_url
!= db_repo
["url"]:
1184 "repo {} url changed, delete and and again".format(
1188 await self
.repo_remove(cluster_uuid
, db_repo
["name"])
1189 deleted_repo_list
.append(repo_id
)
1192 self
.log
.debug("add repo {}".format(db_repo
["name"]))
1193 if "ca_cert" in db_repo
:
1194 await self
.repo_add(
1198 cert
=db_repo
["ca_cert"],
1201 await self
.repo_add(
1206 added_repo_dict
[repo_id
] = db_repo
["name"]
1207 except Exception as e
:
1209 "Error adding repo id: {}, err_msg: {} ".format(
1214 # Delete repos that are present but not in nbi_list
1215 for repo_name
in local_repo_dict
:
1216 if not db_repo_dict
.get(repo_name
) and repo_name
!= "stable":
1217 self
.log
.debug("delete repo {}".format(repo_name
))
1219 await self
.repo_remove(cluster_uuid
, repo_name
)
1220 deleted_repo_list
.append(repo_name
)
1221 except Exception as e
:
1223 "Error deleting repo, name: {}, err_msg: {}".format(
1228 return deleted_repo_list
, added_repo_dict
1230 except K8sException
:
1232 except Exception as e
:
1233 # Unexpected errors are logged and re-raised as a generic Exception
1234 self
.log
.error("Error synchronizing repos: {}".format(e
))
1235 raise Exception("Error synchronizing repos: {}".format(e
))
1237 def _get_db_repos_dict(self
, repo_ids
: list):
1239 for repo_id
in repo_ids
:
1240 db_repo
= self
.db
.get_one("k8srepos", {"_id": repo_id
})
1241 db_repos_dict
[db_repo
["name"]] = db_repo
1242 return db_repos_dict
1245 ####################################################################################
1246 ################################### TO BE IMPLEMENTED SUBCLASSES ###################
1247 ####################################################################################
1251 def _init_paths_env(self
, cluster_name
: str, create_if_not_exist
: bool = True):
1253 Creates base cluster and kube dirs and returns them.
1254 Also creates helm3 dirs according to new directory specification, paths are
1255 not returned but assigned to helm environment variables
1257 :param cluster_name: cluster_name
1258 :return: Dictionary with config_paths and dictionary with helm environment variables
1262 async def _cluster_init(self
, cluster_id
, namespace
, paths
, env
):
1264 Implements the helm version dependent cluster initialization
1268 async def _instances_list(self
, cluster_id
):
1270 Implements the helm version dependent helm instances list
1274 async def _get_services(self
, cluster_id
, kdu_instance
, namespace
, kubeconfig
):
1276 Implements the helm version dependent method to obtain services from a helm instance
1280 async def _status_kdu(
1284 namespace
: str = None,
1285 yaml_format
: bool = False,
1286 show_error_log
: bool = False,
1287 ) -> Union
[str, dict]:
1289 Implements the helm version dependent method to obtain status of a helm instance
1293 def _get_install_command(
1305 Obtain command to be executed to install the indicated instance
1309 def _get_upgrade_scale_command(
1322 """Generates the command to scale a Helm Chart release
1325 kdu_model (str): Kdu model name, corresponding to the Helm local location or repository
1326 kdu_instance (str): KDU instance, corresponding to the Helm Chart release in question
1327 namespace (str): Namespace where this KDU instance is deployed
1328 scale (int): Scale count
1329 version (str): Constraint with specific version of the Chart to use
1330 atomic (bool): If set, upgrade process rolls back changes made in case of failed upgrade.
1331 The --wait flag will be set automatically if --atomic is used
1332 replica_str (str): The key under resource_name key where the scale count is stored
1333 timeout (float): The time, in seconds, to wait
1334 resource_name (str): The KDU's resource to scale
1335 kubeconfig (str): Kubeconfig file path
1338 str: command to scale a Helm Chart release
1342 def _get_upgrade_command(
1353 """Generates the command to upgrade a Helm Chart release
1356 kdu_model (str): Kdu model name, corresponding to the Helm local location or repository
1357 kdu_instance (str): KDU instance, corresponding to the Helm Chart release in question
1358 namespace (str): Namespace where this KDU instance is deployed
1359 params_str (str): Params used to upgrade the Helm Chart release
1360 version (str): Constraint with specific version of the Chart to use
1361 atomic (bool): If set, upgrade process rolls back changes made in case of failed upgrade.
1362 The --wait flag will be set automatically if --atomic is used
1363 timeout (float): The time, in seconds, to wait
1364 kubeconfig (str): Kubeconfig file path
1367 str: command to upgrade a Helm Chart release
1371 def _get_rollback_command(
1372 self
, kdu_instance
, namespace
, revision
, kubeconfig
1375 Obtain command to be executed to rollback the indicated instance
1379 def _get_uninstall_command(
1380 self
, kdu_instance
: str, namespace
: str, kubeconfig
: str
1383 Obtain command to be executed to delete the indicated instance
1387 def _get_inspect_command(
1388 self
, show_command
: str, kdu_model
: str, repo_str
: str, version
: str
1390 """Generates the command to obtain the information about an Helm Chart package
1391 (´helm show ...´ command)
1394 show_command: the second part of the command (`helm show <show_command>`)
1395 kdu_model: The name or path of an Helm Chart
1396 repo_str: Helm Chart repository option string (for instance ` --repo <url>`)
1397 version: constraint with specific version of the Chart to use
1400 str: the generated Helm Chart command
1404 def _get_get_command(
1405 self
, get_command
: str, kdu_instance
: str, namespace
: str, kubeconfig
: str
1407 """Obtain command to be executed to get information about the kdu instance."""
1410 async def _uninstall_sw(self
, cluster_id
: str, namespace
: str):
1412 Method call to uninstall cluster software for helm. This method is dependent
1414 For Helm v2 it will be called when Tiller must be uninstalled
1415 For Helm v3 it does nothing and does not need to be called
1419 def _get_helm_chart_repos_ids(self
, cluster_uuid
) -> list:
1421 Obtains the cluster repos identifiers
1425 ####################################################################################
1426 ################################### P R I V A T E ##################################
1427 ####################################################################################
@staticmethod
def _check_file_exists(filename: str, exception_if_not_exists: bool = False):
    """Check whether ``filename`` exists on the local file system.

    :param filename: path to check
    :param exception_if_not_exists: when True, a missing path raises instead of
        being reported through the return value
    :return: True if the path exists, False otherwise
    :raises K8sException: if the path does not exist and
        ``exception_if_not_exists`` is True
    """
    if os.path.exists(filename):
        return True
    if exception_if_not_exists:
        raise K8sException("File {} does not exist".format(filename))
    # Explicit falsy result instead of an implicit None, so callers get a bool.
    return False
@staticmethod
def _remove_multiple_spaces(strobj):
    """Strip ``strobj`` and collapse every run of spaces into a single space.

    Only the space character is collapsed; tabs and other whitespace inside
    the string are left untouched.

    :param strobj: string to normalise (typically a command line)
    :return: the normalised string
    """
    # Splitting on a single space yields empty chunks for each extra space in
    # a run; dropping them and re-joining collapses the run in one pass and
    # always terminates.
    return " ".join(chunk for chunk in strobj.strip().split(" ") if chunk)
@staticmethod
def _output_to_lines(output: str) -> list:
    """Split command output into its non-empty, stripped lines.

    :param output: raw text produced by a command (None is treated as empty)
    :return: list of lines with surrounding whitespace removed; blank lines
        are dropped
    """
    stripped = (raw.strip() for raw in (output or "").splitlines(keepends=False))
    return [text for text in stripped if text]
@staticmethod
def _output_to_table(output: str) -> list:
    """Parse whitespace-separated tabular command output into a list of rows.

    Tabs are treated as spaces, cells are stripped and empty cells are
    discarded. Every input line yields one row, so a blank line produces an
    empty row.

    :param output: raw text produced by a command (None is treated as empty)
    :return: list of rows, each row being a list of cell strings
    """
    table = []
    for raw in (output or "").splitlines(keepends=False):
        cells = (cell.strip() for cell in raw.replace("\t", " ").split(sep=" "))
        table.append([cell for cell in cells if cell])
    return table
@staticmethod
def _parse_services(output: str) -> list:
    """Extract service names from ``service/<name>`` entries in command output.

    Only the first whitespace-separated token of each line is inspected, so
    leading indentation does not hide an entry.

    :param output: raw text of a resource listing (None is treated as empty)
    :return: list with the ``<name>`` part of every ``service/<name>`` entry
    """
    services = []
    for raw in (output or "").splitlines(keepends=False):
        tokens = raw.split()
        if not tokens or not tokens[0].startswith("service/"):
            continue
        # The name is the segment right after the "service/" prefix.
        name = tokens[0].split(sep="/")[1]
        if name:
            services.append(name)
    return services
@staticmethod
def _get_deep(dictionary: dict, members: tuple):
    """Follow ``members`` as a chain of keys through nested dictionaries.

    :param dictionary: root mapping to inspect (a non-dict yields None)
    :param members: keys to follow, outermost first
    :return: the value found at the end of the chain, or None when the chain
        is empty, a key is missing, an intermediate value is not a dict, or
        a value along the path is falsy
    """
    current = dictionary
    value = None
    for member in members:
        # Check the container type explicitly rather than relying on
        # ``.get`` failing on a non-dict intermediate.
        if not isinstance(current, dict):
            return None
        value = current.get(member)
        if not value:
            return None
        current = value
    return value
1499 # find key:value in several lines
@staticmethod
def _find_in_lines(p_lines: list, p_key: str) -> str:
    """Return the value of the first ``<p_key>: <value>`` line in ``p_lines``.

    :param p_lines: lines to search; entries that are not strings are ignored
    :param p_key: key expected at the very start of a line, before the colon
    :return: the stripped text after the first colon of the matching line, or
        None when no line starts with ``p_key`` followed by a colon
    """
    prefix = "{}:".format(p_key)
    for line in p_lines:
        if isinstance(line, str) and line.startswith(prefix):
            # Take everything after the first colon so that values which
            # themselves contain ':' (URLs, timestamps) are not truncated.
            return line[len(prefix):].strip()
    return None
@staticmethod
def _lower_keys_list(input_list: list):
    """Return a new list of dictionaries whose keys are lower-cased.

    The input dictionaries are not modified.

    :param input_list: list of dictionaries with string keys; None or an
        empty list yields an empty list
    :return: new list with one re-keyed dictionary per input dictionary
    """
    if not input_list:
        return []
    return [
        {key.lower(): val for key, val in entry.items()} for entry in input_list
    ]
1526 async def _local_async_exec(
1529 raise_exception_on_error
: bool = False,
1530 show_error_log
: bool = True,
1531 encode_utf8
: bool = False,
1534 command
= K8sHelmBaseConnector
._remove
_multiple
_spaces
(command
)
1536 "Executing async local command: {}, env: {}".format(command
, env
)
1540 command
= shlex
.split(command
)
1542 environ
= os
.environ
.copy()
1547 async with self
.cmd_lock
:
1548 process
= await asyncio
.create_subprocess_exec(
1550 stdout
=asyncio
.subprocess
.PIPE
,
1551 stderr
=asyncio
.subprocess
.PIPE
,
1555 # wait for command terminate
1556 stdout
, stderr
= await process
.communicate()
1558 return_code
= process
.returncode
1562 output
= stdout
.decode("utf-8").strip()
1563 # output = stdout.decode()
1565 output
= stderr
.decode("utf-8").strip()
1566 # output = stderr.decode()
1568 if return_code
!= 0 and show_error_log
:
1570 "Return code (FAIL): {}\nOutput:\n{}".format(return_code
, output
)
1573 self
.log
.debug("Return code: {}".format(return_code
))
1575 if raise_exception_on_error
and return_code
!= 0:
1576 raise K8sException(output
)
1579 output
= output
.encode("utf-8").strip()
1580 output
= str(output
).replace("\\n", "\n")
1582 return output
, return_code
1584 except asyncio
.CancelledError
:
1586 except K8sException
:
1588 except Exception as e
:
1589 msg
= "Exception executing command: {} -> {}".format(command
, e
)
1591 if raise_exception_on_error
:
1592 raise K8sException(e
) from e
1596 async def _local_async_exec_pipe(
1600 raise_exception_on_error
: bool = True,
1601 show_error_log
: bool = True,
1602 encode_utf8
: bool = False,
1605 command1
= K8sHelmBaseConnector
._remove
_multiple
_spaces
(command1
)
1606 command2
= K8sHelmBaseConnector
._remove
_multiple
_spaces
(command2
)
1607 command
= "{} | {}".format(command1
, command2
)
1609 "Executing async local command: {}, env: {}".format(command
, env
)
1613 command1
= shlex
.split(command1
)
1614 command2
= shlex
.split(command2
)
1616 environ
= os
.environ
.copy()
1621 async with self
.cmd_lock
:
1622 read
, write
= os
.pipe()
1623 await asyncio
.create_subprocess_exec(
1624 *command1
, stdout
=write
, env
=environ
1627 process_2
= await asyncio
.create_subprocess_exec(
1628 *command2
, stdin
=read
, stdout
=asyncio
.subprocess
.PIPE
, env
=environ
1631 stdout
, stderr
= await process_2
.communicate()
1633 return_code
= process_2
.returncode
1637 output
= stdout
.decode("utf-8").strip()
1638 # output = stdout.decode()
1640 output
= stderr
.decode("utf-8").strip()
1641 # output = stderr.decode()
1643 if return_code
!= 0 and show_error_log
:
1645 "Return code (FAIL): {}\nOutput:\n{}".format(return_code
, output
)
1648 self
.log
.debug("Return code: {}".format(return_code
))
1650 if raise_exception_on_error
and return_code
!= 0:
1651 raise K8sException(output
)
1654 output
= output
.encode("utf-8").strip()
1655 output
= str(output
).replace("\\n", "\n")
1657 return output
, return_code
1658 except asyncio
.CancelledError
:
1660 except K8sException
:
1662 except Exception as e
:
1663 msg
= "Exception executing command: {} -> {}".format(command
, e
)
1665 if raise_exception_on_error
:
1666 raise K8sException(e
) from e
async def _get_service(self, cluster_id, service_name, namespace):
    """
    Obtains the data of the specified service in the k8cluster.

    :param cluster_id: id of a K8s cluster known by OSM
    :param service_name: name of the K8s service in the specified namespace
    :param namespace: K8s namespace used by the KDU instance
    :return: If successful, it will return a service with the following data:
    - `name` of the service
    - `type` type of service in the k8 cluster
    - `ports` List of ports offered by the service, for each port includes at least
    name, port, protocol
    - `cluster_ip` Internal ip to be used inside k8s cluster
    - `external_ip` List of external ips (in case they are available); only
    present for services of type LoadBalancer
    """

    # init config, env
    paths, env = self._init_paths_env(
        cluster_name=cluster_id, create_if_not_exist=True
    )

    command = "{} --kubeconfig={} --namespace={} get service {} -o=yaml".format(
        self.kubectl_command,
        paths["kube_config"],
        quote(namespace),
        quote(service_name),
    )

    output, _rc = await self._local_async_exec(
        command=command, raise_exception_on_error=True, env=env
    )

    data = yaml.load(output, Loader=yaml.SafeLoader)

    service = {
        "name": service_name,
        "type": self._get_deep(data, ("spec", "type")),
        "ports": self._get_deep(data, ("spec", "ports")),
        "cluster_ip": self._get_deep(data, ("spec", "clusterIP")),
    }
    if service["type"] == "LoadBalancer":
        # A LoadBalancer that has not been provisioned yet has no ingress
        # entries, and some providers publish only a hostname; neither case
        # should make the lookup fail.
        ingress = self._get_deep(data, ("status", "loadBalancer", "ingress")) or []
        service["external_ip"] = [
            entry["ip"]
            for entry in ingress
            if isinstance(entry, dict) and entry.get("ip")
        ]

    return service
1717 async def _exec_get_command(
1718 self
, get_command
: str, kdu_instance
: str, namespace
: str, kubeconfig
: str
1720 """Obtains information about the kdu instance."""
1722 full_command
= self
._get
_get
_command
(
1723 get_command
, kdu_instance
, namespace
, kubeconfig
1726 output
, _rc
= await self
._local
_async
_exec
(command
=full_command
)
1730 async def _exec_inspect_command(
1731 self
, inspect_command
: str, kdu_model
: str, repo_url
: str = None
1733 """Obtains information about an Helm Chart package (´helm show´ command)
1736 inspect_command: the Helm sub command (`helm show <inspect_command> ...`)
1737 kdu_model: The name or path of an Helm Chart
1738 repo_url: Helm Chart repository url
1741 str: the requested info about the Helm Chart package
1746 repo_str
= " --repo {}".format(quote(repo_url
))
1748 # Obtain the Chart's name and store it in the var kdu_model
1749 kdu_model
, _
= self
._split
_repo
(kdu_model
=kdu_model
)
1751 kdu_model
, version
= self
._split
_version
(kdu_model
)
1753 version_str
= "--version {}".format(quote(version
))
1757 full_command
= self
._get
_inspect
_command
(
1758 show_command
=inspect_command
,
1759 kdu_model
=quote(kdu_model
),
1761 version
=version_str
,
1764 output
, _
= await self
._local
_async
_exec
(command
=full_command
)
1768 async def _get_replica_count_url(
1771 repo_url
: str = None,
1772 resource_name
: str = None,
1774 """Get the replica count value in the Helm Chart Values.
1777 kdu_model: The name or path of an Helm Chart
1778 repo_url: Helm Chart repository url
1779 resource_name: Resource name
1783 - The number of replicas of the specific instance; if not found, returns None; and
1784 - The string corresponding to the replica count key in the Helm values
1787 kdu_values
= yaml
.load(
1788 await self
.values_kdu(kdu_model
=kdu_model
, repo_url
=repo_url
),
1789 Loader
=yaml
.SafeLoader
,
1792 self
.log
.debug(f
"Obtained the Helm package values for the KDU: {kdu_values}")
1796 "kdu_values not found for kdu_model {}".format(kdu_model
)
1800 kdu_values
= kdu_values
.get(resource_name
, None)
1803 msg
= "resource {} not found in the values in model {}".format(
1804 resource_name
, kdu_model
1807 raise K8sException(msg
)
1809 duplicate_check
= False
1814 if kdu_values
.get("replicaCount") is not None:
1815 replicas
= kdu_values
["replicaCount"]
1816 replica_str
= "replicaCount"
1817 elif kdu_values
.get("replicas") is not None:
1818 duplicate_check
= True
1819 replicas
= kdu_values
["replicas"]
1820 replica_str
= "replicas"
1824 "replicaCount or replicas not found in the resource"
1825 "{} values in model {}. Cannot be scaled".format(
1826 resource_name
, kdu_model
1831 "replicaCount or replicas not found in the values"
1832 "in model {}. Cannot be scaled".format(kdu_model
)
1835 raise K8sException(msg
)
1837 # Control if replicas and replicaCount exists at the same time
1838 msg
= "replicaCount and replicas are exists at the same time"
1840 if "replicaCount" in kdu_values
:
1842 raise K8sException(msg
)
1844 if "replicas" in kdu_values
:
1846 raise K8sException(msg
)
1848 return replicas
, replica_str
1850 async def _get_replica_count_instance(
1855 resource_name
: str = None,
1857 """Get the replica count value in the instance.
1860 kdu_instance: The name of the KDU instance
1861 namespace: KDU instance namespace
1863 resource_name: Resource name
1866 The number of replicas of the specific instance; if not found, returns None
1869 kdu_values
= yaml
.load(
1870 await self
.get_values_kdu(kdu_instance
, namespace
, kubeconfig
),
1871 Loader
=yaml
.SafeLoader
,
1874 self
.log
.debug(f
"Obtained the Helm values for the KDU instance: {kdu_values}")
1880 kdu_values
.get(resource_name
, None) if resource_name
else None
1883 for replica_str
in ("replicaCount", "replicas"):
1885 replicas
= resource_values
.get(replica_str
)
1887 replicas
= kdu_values
.get(replica_str
)
1889 if replicas
is not None:
1894 async def _store_status(
1899 namespace
: str = None,
1900 db_dict
: dict = None,
1903 Obtains the status of the KDU instance based on Helm Charts, and stores it in the database.
1905 :param cluster_id (str): the cluster where the KDU instance is deployed
1906 :param operation (str): The operation related to the status to be updated (for instance, "install" or "upgrade")
1907 :param kdu_instance (str): The KDU instance in relation to which the status is obtained
1908 :param namespace (str): The Kubernetes namespace where the KDU instance was deployed. Defaults to None
1909 :param db_dict (dict): A dictionary with the database necessary information. It shall contain the
1910 values for the keys:
1911 - "collection": The Mongo DB collection to write to
1912 - "filter": The query filter to use in the update process
1913 - "path": The dot separated keys which targets the object to be updated
1918 detailed_status
= await self
._status
_kdu
(
1919 cluster_id
=cluster_id
,
1920 kdu_instance
=kdu_instance
,
1922 namespace
=namespace
,
1925 status
= detailed_status
.get("info").get("description")
1926 self
.log
.debug(f
"Status for KDU {kdu_instance} obtained: {status}.")
1928 # write status to db
1929 result
= await self
.write_app_status_to_db(
1932 detailed_status
=str(detailed_status
),
1933 operation
=operation
,
1937 self
.log
.info("Error writing in database. Task exiting...")
1939 except asyncio
.CancelledError
as e
:
1941 f
"Exception in method {self._store_status.__name__} (task cancelled): {e}"
1943 except Exception as e
:
1944 self
.log
.warning(f
"Exception in method {self._store_status.__name__}: {e}")
1946 # params for use in -f file
1947 # returns values file option and filename (in order to delete it at the end)
1948 def _params_to_file_option(self
, cluster_id
: str, params
: dict) -> (str, str):
1949 if params
and len(params
) > 0:
1950 self
._init
_paths
_env
(cluster_name
=cluster_id
, create_if_not_exist
=True)
1952 def get_random_number():
1953 r
= random
.randrange(start
=1, stop
=99999999)
1961 value
= params
.get(key
)
1962 if "!!yaml" in str(value
):
1963 value
= yaml
.safe_load(value
[7:])
1964 params2
[key
] = value
1966 values_file
= get_random_number() + ".yaml"
1967 with
open(values_file
, "w") as stream
:
1968 yaml
.dump(params2
, stream
, indent
=4, default_flow_style
=False)
1970 return "-f {}".format(values_file
), values_file
1974 # params for use in --set option
@staticmethod
def _params_to_set_option(params: dict) -> str:
    """Build the ``--set key=value,...`` option from a parameters dictionary.

    Keys and values are converted to strings and shell-quoted. Entries whose
    value is None are skipped.

    :param params: parameters to pass on the command line; None or an empty
        dictionary yields an empty string
    :return: the ``--set`` option string, or "" when there is nothing to set
    """
    pairs = [
        f"{quote(str(key))}={quote(str(value))}"
        for key, value in (params or {}).items()
        if value is not None
    ]
    if not pairs:
        return ""
    return "--set " + ",".join(pairs)
@staticmethod
def generate_kdu_instance_name(**kwargs):
    """Generate a KDU instance (release) name from the chart reference.

    The name is built from ``kwargs["kdu_model"]``: for an absolute path or a
    URL only the last path segment is used, every non-alphanumeric character
    becomes "-", the result is cut to 35 characters and prefixed with "a" if
    it does not start with a letter. A "-" and a zero-padded 10-digit random
    number are appended and the whole name is lower-cased.

    :param kwargs: must contain ``kdu_model`` (chart name, path or URL)
    :return: the generated instance name
    """
    chart_name = kwargs["kdu_model"]
    # Embedded chart (file or dir) or URL: keep only the last path segment.
    if chart_name.startswith("/") or "://" in chart_name:
        chart_name = chart_name[chart_name.rfind("/") + 1 :]

    name = "".join(
        c if (c.isalpha() or c.isnumeric()) else "-" for c in chart_name
    )[0:35]

    # If it is empty or does not start with an alpha character, prefix 'a'.
    # The emptiness check avoids an IndexError for references that end in "/".
    if not name or not name[0].isalpha():
        name = "a" + name

    suffix = str(random.randrange(1, 99999999)).rjust(10, "0")
    return (name + "-" + suffix).lower()
2022 def _split_version(self
, kdu_model
: str) -> (str, str):
2024 if not self
._is
_helm
_chart
_a
_file
(kdu_model
) and ":" in kdu_model
:
2025 parts
= kdu_model
.split(sep
=":")
2027 version
= str(parts
[1])
2028 kdu_model
= parts
[0]
2029 return kdu_model
, version
2031 def _split_repo(self
, kdu_model
: str) -> (str, str):
2032 """Obtain the Helm Chart's repository and Chart's names from the KDU model
2035 kdu_model (str): Associated KDU model
2038 (str, str): Tuple with the Chart name in index 0, and the repo name
2039 in index 1; if there was a problem finding them, return None
2046 idx
= kdu_model
.find("/")
2048 chart_name
= kdu_model
[idx
+ 1 :]
2049 repo_name
= kdu_model
[:idx
]
2051 return chart_name
, repo_name
2053 async def _find_repo(self
, kdu_model
: str, cluster_uuid
: str) -> str:
2054 """Obtain the Helm repository for an Helm Chart
2057 kdu_model (str): the KDU model associated with the Helm Chart instantiation
2058 cluster_uuid (str): The cluster UUID associated with the Helm Chart instantiation
2061 str: the repository URL; if Helm Chart is a local one, the function returns None
2064 _
, repo_name
= self
._split
_repo
(kdu_model
=kdu_model
)
2068 # Find repository link
2069 local_repo_list
= await self
.repo_list(cluster_uuid
)
2070 for repo
in local_repo_list
:
2071 if repo
["name"] == repo_name
:
2072 repo_url
= repo
["url"]
2073 break # it is not necessary to continue the loop if the repo link was found...