import binascii
import base64
+from n2vc.config import ModelConfig
from n2vc.exceptions import K8sException, N2VCBadArgumentsException
from n2vc.k8s_conn import K8sConnector
from n2vc.kubectl import Kubectl, CORE_CLIENT, RBAC_CLIENT
)
port = vca_config["port"] if "port" in vca_config else 17070
url = "{}:{}".format(vca_config["host"], port)
- enable_os_upgrade = vca_config.get("enable_os_upgrade", True)
- apt_mirror = vca_config.get("apt_mirror", None)
+ model_config = ModelConfig(vca_config)
username = vca_config["user"]
secret = vca_config["secret"]
ca_cert = base64_to_cacert(vca_config["ca_cert"])
self.libjuju = Libjuju(
endpoint=url,
api_proxy=None, # Not needed for k8s charms
- enable_os_upgrade=enable_os_upgrade,
- apt_mirror=apt_mirror,
+ model_config=model_config,
username=username,
password=secret,
cacert=ca_cert,
self,
cluster_uuid: str,
kdu_model: str,
+ kdu_instance: str,
atomic: bool = True,
timeout: float = 1800,
params: dict = None,
:param cluster_uuid str: The UUID of the cluster to install to
:param kdu_model str: The name or path of a bundle to install
+ :param kdu_instance: Kdu instance name
:param atomic bool: If set, waits until the model is active and resets
the cluster on failure.
:param timeout int: The time, in seconds, to wait for the install
os.chdir(new_workdir)
bundle = "local:{}".format(kdu_model)
- if kdu_name:
- kdu_instance = "{}-{}".format(kdu_name, db_dict["filter"]["_id"])
- else:
- kdu_instance = db_dict["filter"]["_id"]
-
self.log.debug("Checking for model named {}".format(kdu_instance))
# Create the new model
# await model.disconnect()
# await controller.disconnect()
os.chdir(previous_workdir)
-
- return kdu_instance
+ return True
async def instances_list(self, cluster_uuid: str) -> list:
"""
base64.b64decode(token).decode("utf-8"),
base64.b64decode(client_certificate_data).decode("utf-8"),
)
+
+ @staticmethod
+ def generate_kdu_instance_name(**kwargs):
+ db_dict = kwargs.get("db_dict")
+ kdu_name = kwargs.get("kdu_name", None)
+ if kdu_name:
+ kdu_instance = "{}-{}".format(kdu_name, db_dict["filter"]["_id"])
+ else:
+ kdu_instance = db_dict["filter"]["_id"]
+ return kdu_instance