JujuModelAlreadyExists,
JujuControllerFailedConnecting,
JujuApplicationExists,
+ JujuInvalidK8sConfiguration,
)
from n2vc.utils import DB_DATA
from osm_common.dbbase import DbException
+from kubernetes.client.configuration import Configuration
class Libjuju:
"""
await controller.disconnect()
- async def add_model(self, model_name: str, cloud_name: str):
+ async def add_model(self, model_name: str, cloud_name: str, credential_name=None):
"""
Create model
:param: model_name: Model name
:param: cloud_name: Cloud name
+ :param: credential_name: Credential name to use for adding the model
+ If not specified, same name as the cloud will be used.
"""
# Get controller
model_name,
config=self.model_config,
cloud_name=cloud_name,
- credential_name=cloud_name,
+ credential_name=credential_name or cloud_name,
)
self.models.add(model_name)
finally:
await self.disconnect_model(model)
await self.disconnect_controller(controller)
+ async def get_metrics(self, model_name: str, application_name: str) -> dict:
+ """Get the metrics collected by the VCA.
+
+ :param model_name The name or unique id of the network service
+ :param application_name The name of the application
+ """
+ if not model_name or not application_name:
+ raise Exception("model_name and application_name must be non-empty strings")
+ metrics = {}
+ controller = await self.get_controller()
+ model = await self.get_model(controller, model_name)
+ try:
+ application = self._get_application(model, application_name)
+ if application is not None:
+ metrics = await application.get_metrics()
+ finally:
+ self.disconnect_model(model)
+ self.disconnect_controller(controller)
+ return metrics
+
async def add_relation(
self, model_name: str, endpoint_1: str, endpoint_2: str,
):
finally:
await self.disconnect_controller(controller)
- async def add_k8s(self, name: str, auth_data: dict, storage_class: str):
+ async def add_k8s(
+ self,
+ name: str,
+ configuration: Configuration,
+ storage_class: str,
+ credential_name: str = None,
+ ):
"""
Add a Kubernetes cloud to the controller
Similar to the `juju add-k8s` command in the CLI
- :param: name: Name for the K8s cloud
- :param: auth_data: Dictionary with needed credentials. Format:
- {
- "server": "192.168.0.21:16443",
- "cacert": "-----BEGIN CERTIFI...",
- "token": "clhkRExRem5Xd1dCdnFEVXdvRGt...",
-
- }
- :param: storage_class: Storage Class to use in the cloud
+ :param: name: Name for the K8s cloud
+ :param: configuration: Kubernetes configuration object
+ :param: storage_class: Storage Class to use in the cloud
+ :param: credential_name: Storage Class to use in the cloud
"""
- required_auth_data_keys = ["server", "cacert", "token"]
- missing_keys = []
- for k in required_auth_data_keys:
- if k not in auth_data:
- missing_keys.append(k)
- if missing_keys:
- raise Exception(
- "missing keys in auth_data: {}".format(",".join(missing_keys))
- )
if not storage_class:
raise Exception("storage_class must be a non-empty string")
if not name:
raise Exception("name must be a non-empty string")
-
- endpoint = auth_data["server"]
- cacert = auth_data["cacert"]
- token = auth_data["token"]
- region_name = "{}-region".format(name)
-
+ if not configuration:
+ raise Exception("configuration must be provided")
+
+ endpoint = configuration.host
+ credential = self.get_k8s_cloud_credential(configuration)
+ ca_certificates = (
+ [credential.attrs["ClientCertificateData"]]
+ if "ClientCertificateData" in credential.attrs
+ else []
+ )
cloud = client.Cloud(
- auth_types=["certificate"],
- ca_certificates=[cacert],
+ type_="kubernetes",
+ auth_types=[credential.auth_type],
endpoint=endpoint,
+ ca_certificates=ca_certificates,
config={
"operator-storage": storage_class,
"workload-storage": storage_class,
},
- regions=[client.CloudRegion(endpoint=endpoint, name=region_name)],
- type_="kubernetes",
)
- cred = client.CloudCredential(
- auth_type="certificate",
- attrs={"ClientCertificateData": cacert, "Token": token},
+ return await self.add_cloud(
+ name, cloud, credential, credential_name=credential_name
)
- return await self.add_cloud(name, cloud, cred)
+
+ def get_k8s_cloud_credential(
+ self, configuration: Configuration,
+ ) -> client.CloudCredential:
+ attrs = {}
+ ca_cert = configuration.ssl_ca_cert or configuration.cert_file
+ key = configuration.key_file
+ api_key = configuration.api_key
+ token = None
+ username = configuration.username
+ password = configuration.password
+
+ if "authorization" in api_key:
+ authorization = api_key["authorization"]
+ if "Bearer " in authorization:
+ bearer_list = authorization.split(" ")
+ if len(bearer_list) == 2:
+ [_, token] = bearer_list
+ else:
+ raise JujuInvalidK8sConfiguration("unknown format of api_key")
+ else:
+ token = authorization
+ if ca_cert:
+ attrs["ClientCertificateData"] = open(ca_cert, "r").read()
+ if key:
+ attrs["ClientKeyData"] = open(key, "r").read()
+ if token:
+ if username or password:
+ raise JujuInvalidK8sConfiguration("Cannot set both token and user/pass")
+ attrs["Token"] = token
+
+ auth_type = None
+ if key:
+ auth_type = "oauth2"
+ if not token:
+ raise JujuInvalidK8sConfiguration(
+ "missing token for auth type {}".format(auth_type)
+ )
+ elif username:
+ if not password:
+ self.log.debug(
+ "credential for user {} has empty password".format(username)
+ )
+ attrs["username"] = username
+ attrs["password"] = password
+ if ca_cert:
+ auth_type = "userpasswithcert"
+ else:
+ auth_type = "userpass"
+ elif ca_cert and token:
+ auth_type = "certificate"
+ else:
+ raise JujuInvalidK8sConfiguration("authentication method not supported")
+ return client.CloudCredential(auth_type=auth_type, attrs=attrs,)
async def add_cloud(
- self, name: str, cloud: Cloud, credential: CloudCredential = None
+ self,
+ name: str,
+ cloud: Cloud,
+ credential: CloudCredential = None,
+ credential_name: str = None,
) -> Cloud:
"""
Add cloud to the controller
- :param: name: Name of the cloud to be added
- :param: cloud: Cloud object
- :param: credential: CloudCredentials object for the cloud
+ :param: name: Name of the cloud to be added
+ :param: cloud: Cloud object
+ :param: credential: CloudCredentials object for the cloud
+ :param: credential_name: Credential name.
+ If not defined, cloud of the name will be used.
"""
controller = await self.get_controller()
try:
_ = await controller.add_cloud(name, cloud)
if credential:
- await controller.add_credential(name, credential=credential, cloud=name)
+ await controller.add_credential(
+ credential_name or name, credential=credential, cloud=name
+ )
# Need to return the object returned by the controller.add_cloud() function
# I'm returning the original value now until this bug is fixed:
# https://github.com/juju/python-libjuju/issues/443