##
import asyncio
-import base64
-import binascii
import logging
import os
-import re
+from n2vc.config import ModelConfig
from n2vc.exceptions import (
N2VCBadArgumentsException,
N2VCException,
N2VCConnectionException,
N2VCExecutionException,
- N2VCInvalidCertificate,
# N2VCNotFound,
MethodNotImplemented,
JujuK8sProxycharmNotSupported,
from n2vc.n2vc_conn import N2VCConnector
from n2vc.n2vc_conn import obj_to_dict, obj_to_yaml
from n2vc.libjuju import Libjuju
+from n2vc.utils import base64_to_cacert
class N2VCJujuConnector(N2VCConnector):
# TODO: Verify ca_cert is valid before using. VCA will crash
# if the ca_cert isn't formatted correctly.
- def base64_to_cacert(b64string):
- """Convert the base64-encoded string containing the VCA CACERT.
-
- The input string....
-
- """
- try:
- cacert = base64.b64decode(b64string).decode("utf-8")
-
- cacert = re.sub(r"\\n", r"\n", cacert,)
- except binascii.Error as e:
- self.log.debug("Caught binascii.Error: {}".format(e))
- raise N2VCInvalidCertificate(message="Invalid CA Certificate")
-
- return cacert
self.ca_cert = vca_config.get("ca_cert")
if self.ca_cert:
)
self.api_proxy = None
- if "enable_os_upgrade" in vca_config:
- self.enable_os_upgrade = vca_config["enable_os_upgrade"]
- else:
- self.enable_os_upgrade = True
-
- if "apt_mirror" in vca_config:
- self.apt_mirror = vca_config["apt_mirror"]
- else:
- self.apt_mirror = None
+ model_config = ModelConfig(vca_config)
self.cloud = vca_config.get('cloud')
self.k8s_cloud = None
self.libjuju = Libjuju(
endpoint=self.url,
api_proxy=self.api_proxy,
- enable_os_upgrade=self.enable_os_upgrade,
- apt_mirror=self.apt_mirror,
username=self.username,
password=self.secret,
cacert=self.ca_cert,
log=self.log,
db=self.db,
n2vc=self,
+ model_config=model_config,
)
# create juju pub key file in lcm container at
reuse_ee_id: str = None,
progress_timeout: float = None,
total_timeout: float = None,
+ cloud_name: str = None,
+ credential_name: str = None,
) -> (str, dict):
self.log.info(
# create or reuse a new juju machine
try:
if not await self.libjuju.model_exists(model_name):
- await self.libjuju.add_model(model_name, cloud_name=self.cloud)
+ cloud = cloud_name or self.cloud
+ credential = credential_name or cloud_name if cloud_name else self.cloud
+ await self.libjuju.add_model(
+ model_name,
+ cloud_name=cloud,
+ credential_name=credential
+ )
machine, new = await self.libjuju.create_machine(
model_name=model_name,
machine_id=machine_id,
db_dict: dict,
progress_timeout: float = None,
total_timeout: float = None,
+ cloud_name: str = None,
+ credential_name: str = None,
) -> str:
self.log.info(
# register machine on juju
try:
if not await self.libjuju.model_exists(model_name):
- await self.libjuju.add_model(model_name, cloud_name=self.cloud)
+ cloud = cloud_name or self.cloud
+ credential = credential_name or cloud_name if cloud_name else self.cloud
+ await self.libjuju.add_model(
+ model_name,
+ cloud_name=cloud,
+ credential_name=credential
+ )
machine_id = await self.libjuju.provision_machine(
model_name=model_name,
hostname=hostname,
progress_timeout: float = None,
total_timeout: float = None,
config: dict = None,
+ cloud_name: str = None,
+ credential_name: str = None,
) -> str:
"""
Install a k8s proxy charm
:param float progress_timeout:
:param float total_timeout:
:param config: Dictionary with additional configuration
+ :param cloud_name: Cloud Name in which the charms will be deployed
+ :param credential_name: Credential Name to use in the cloud_name.
+ If not set, cloud_name will be used as credential_name
:returns ee_id: execution environment id.
"""
_, ns_id, _, _, _ = self._get_namespace_components(namespace=namespace)
model_name = '{}-k8s'.format(ns_id)
-
- await self.libjuju.add_model(model_name, self.k8s_cloud)
+ if not await self.libjuju.model_exists(model_name):
+ cloud = cloud_name or self.k8s_cloud
+ credential = credential_name or cloud_name if cloud_name else self.k8s_cloud
+ await self.libjuju.add_model(
+ model_name,
+ cloud_name=cloud,
+ credential_name=credential
+ )
application_name = self._get_application_name(namespace)
try:
# return public key if exists
return output["pubkey"] if "pubkey" in output else output
+ async def get_metrics(self, model_name: str, application_name: str) -> dict:
+ return await self.libjuju.get_metrics(model_name, application_name)
+
async def add_relation(
self, ee_id_1: str, ee_id_2: str, endpoint_1: str, endpoint_2: str
):
self.log.info("Namespace {} deleted".format(namespace))
async def delete_execution_environment(
- self, ee_id: str, db_dict: dict = None, total_timeout: float = None
+ self, ee_id: str, db_dict: dict = None, total_timeout: float = None,
+ scaling_in: bool = False
):
self.log.info("Deleting execution environment ee_id={}".format(ee_id))
model_name, application_name, _machine_id = self._get_ee_id_components(
ee_id=ee_id
)
-
- # destroy the application
try:
- await self.libjuju.destroy_model(
- model_name=model_name, total_timeout=total_timeout
- )
+ if not scaling_in:
+ # destroy the model
+ # TODO: should this be removed?
+ await self.libjuju.destroy_model(
+ model_name=model_name, total_timeout=total_timeout
+ )
+ else:
+ # get juju model and observer
+ controller = await self.libjuju.get_controller()
+ model = await self.libjuju.get_model(controller, model_name)
+ # destroy the application
+ await self.libjuju.destroy_application(
+ model=model, application_name=application_name)
except Exception as e:
raise N2VCException(
message=(