# limitations under the License.
import asyncio
+from typing import Union
import os
import uuid
import yaml
import tempfile
import binascii
-import base64
from n2vc.config import EnvironConfig
+from n2vc.definitions import RelationEndpoint
from n2vc.exceptions import K8sException
from n2vc.k8s_conn import K8sConnector
-from n2vc.kubectl import Kubectl, CORE_CLIENT, RBAC_CLIENT
+from n2vc.kubectl import Kubectl
from .exceptions import MethodNotImplemented
from n2vc.libjuju import Libjuju
from n2vc.utils import obj_to_dict, obj_to_yaml
from n2vc.store import MotorStore
from n2vc.vca.cloud import Cloud
from n2vc.vca.connection import get_connection
-from kubernetes.client.models import (
- V1ClusterRole,
- V1ObjectMeta,
- V1PolicyRule,
- V1ServiceAccount,
- V1ClusterRoleBinding,
- V1RoleRef,
- V1Subject,
-)
-
-from typing import Dict
-
-SERVICE_ACCOUNT_TOKEN_KEY = "token"
-SERVICE_ACCOUNT_ROOT_CA_KEY = "ca.crt"
-RBAC_LABEL_KEY_NAME = "rbac-id"
-ADMIN_NAMESPACE = "kube-system"
+
+RBAC_LABEL_KEY_NAME = "rbac-id"
RBAC_STACK_PREFIX = "juju-credential"
db_uri = EnvironConfig(prefixes=["OSMLCM_", "OSMMON_"]).get("database_uri")
self._store = MotorStore(db_uri)
self.loading_libjuju = asyncio.Lock(loop=self.loop)
+ self.uninstall_locks = {}
self.log.debug("K8S Juju connector initialized")
# TODO: Remove these commented lines:
libjuju = await self._get_libjuju(kwargs.get("vca_id"))
cluster_uuid = reuse_cluster_uuid or str(uuid.uuid4())
-
- kubecfg = tempfile.NamedTemporaryFile()
- with open(kubecfg.name, "w") as kubecfg_file:
- kubecfg_file.write(k8s_creds)
- kubectl = Kubectl(config_file=kubecfg.name)
+ kubectl = self._get_kubectl(k8s_creds)
# CREATING RESOURCES IN K8S
rbac_id = generate_rbac_id()
# if it fails in the middle of the process
cleanup_data = []
try:
- self._create_cluster_role(
- kubectl,
+ self.log.debug("Initializing K8s cluster for juju")
+ kubectl.create_cluster_role(
name=metadata_name,
labels=labels,
)
+ self.log.debug("Cluster role created")
cleanup_data.append(
{
- "delete": self._delete_cluster_role,
- "args": (kubectl, metadata_name),
+ "delete": kubectl.delete_cluster_role,
+ "args": (metadata_name,),
}
)
- self._create_service_account(
- kubectl,
+ kubectl.create_service_account(
name=metadata_name,
labels=labels,
)
+ self.log.debug("Service account created")
cleanup_data.append(
{
- "delete": self._delete_service_account,
- "args": (kubectl, metadata_name),
+ "delete": kubectl.delete_service_account,
+ "args": (metadata_name,),
}
)
- self._create_cluster_role_binding(
- kubectl,
+ kubectl.create_cluster_role_binding(
name=metadata_name,
labels=labels,
)
+ self.log.debug("Role binding created")
cleanup_data.append(
{
- "delete": self._delete_service_account,
- "args": (kubectl, metadata_name),
+ "delete": kubectl.delete_service_account,
+ "args": (metadata_name,),
}
)
- token, client_cert_data = await self._get_secret_data(
- kubectl,
+ token, client_cert_data = await kubectl.get_secret_data(
metadata_name,
)
default_storage_class = kubectl.get_default_storage_class()
+ self.log.debug("Default storage class: {}".format(default_storage_class))
await libjuju.add_k8s(
name=cluster_uuid,
rbac_id=rbac_id,
storage_class=default_storage_class,
credential_name=self._get_credential_name(cluster_uuid),
)
+ self.log.debug("K8s cluster added to juju controller")
return cluster_uuid, True
except Exception as e:
- self.log.error("Error initializing k8scluster: {}".format(e))
+ self.log.error("Error initializing k8scluster: {}".format(e), exc_info=True)
if len(cleanup_data) > 0:
self.log.debug("Cleaning up created resources in k8s cluster...")
for item in cleanup_data:
name: str,
url: str,
_type: str = "charm",
+ cert: str = None,
+ user: str = None,
+ password: str = None,
):
raise MethodNotImplemented()
await libjuju.remove_cloud(cluster_uuid)
- kubecfg = self.get_credentials(cluster_uuid=cluster_uuid)
+ credentials = self.get_credentials(cluster_uuid=cluster_uuid)
- kubecfg_file = tempfile.NamedTemporaryFile()
- with open(kubecfg_file.name, "w") as f:
- f.write(kubecfg)
- kubectl = Kubectl(config_file=kubecfg_file.name)
+ kubectl = self._get_kubectl(credentials)
delete_functions = [
- self._delete_cluster_role_binding,
- self._delete_service_account,
- self._delete_cluster_role,
+ kubectl.delete_cluster_role_binding,
+ kubectl.delete_service_account,
+ kubectl.delete_cluster_role,
]
credential_attrs = cloud_creds[0].result["attrs"]
if RBAC_LABEL_KEY_NAME in credential_attrs:
rbac_id = credential_attrs[RBAC_LABEL_KEY_NAME]
metadata_name = "{}-{}".format(RBAC_STACK_PREFIX, rbac_id)
- delete_args = (kubectl, metadata_name)
for delete_func in delete_functions:
try:
- delete_func(*delete_args)
+ delete_func(metadata_name)
except Exception as e:
self.log.warning("Cannot remove resource in K8s {}".format(e))
raise K8sException("bundle must be set")
if bundle.startswith("cs:"):
+ # For Juju Bundles provided by the Charm Store
+ pass
+ elif bundle.startswith("ch:"):
+ # For Juju Bundles provided by the Charm Hub (this only works for juju version >= 2.9)
pass
elif bundle.startswith("http"):
# Download the file
"""Scale an application in a model
:param: kdu_instance str: KDU instance name
- :param: scale int: Scale to which to set this application
- :param: resource_name str: Resource name (Application name)
+ :param: scale int: Scale to which to set the application
+ :param: resource_name str: The application name in the Juju Bundle
:param: timeout float: The time, in seconds, to wait for the install
to finish
:param kwargs: Additional parameters
) -> int:
"""Get an application scale count
- :param: resource_name str: Resource name (Application name)
+ :param: resource_name str: The application name in the Juju Bundle
:param: kdu_instance str: KDU instance name
:param kwargs: Additional parameters
vca_id (str): VCA ID
:return: Return application instance count
"""
+
try:
libjuju = await self._get_libjuju(kwargs.get("vca_id"))
status = await libjuju.get_model_status(kdu_instance)
"""
self.log.debug("[uninstall] Destroying model")
- libjuju = await self._get_libjuju(kwargs.get("vca_id"))
- await libjuju.destroy_model(kdu_instance, total_timeout=3600)
+ will_not_delete = False
+ if kdu_instance not in self.uninstall_locks:
+ self.uninstall_locks[kdu_instance] = asyncio.Lock(loop=self.loop)
+ delete_lock = self.uninstall_locks[kdu_instance]
+
+ while delete_lock.locked():
+ will_not_delete = True
+ await asyncio.sleep(0.1)
+
+ if will_not_delete:
+ self.log.info("Model {} deleted by another worker.".format(kdu_instance))
+ return True
- # self.log.debug("[uninstall] Model destroyed and disconnecting")
- # await controller.disconnect()
+ try:
+ async with delete_lock:
+ libjuju = await self._get_libjuju(kwargs.get("vca_id"))
+
+ await libjuju.destroy_model(kdu_instance, total_timeout=3600)
+ finally:
+ self.uninstall_locks.pop(kdu_instance)
+ self.log.debug(f"[uninstall] Model {kdu_instance} destroyed")
return True
- # TODO: Remove these commented lines
- # if not self.authenticated:
- # self.log.debug("[uninstall] Connecting to controller")
- # await self.login(cluster_uuid)
+
    async def upgrade_charm(
        self,
        ee_id: str = None,
        path: str = None,
        charm_id: str = None,
        charm_type: str = None,
        timeout: float = None,
    ) -> str:
        """Upgrade charms in VNFs.

        Charm upgrade is not supported for KDUs deployed as Juju bundles,
        so this method always raises and never returns a value.

        Args:
            ee_id: Execution environment id
            path: Local path to the charm
            charm_id: charm-id
            charm_type: Charm type can be lxc-proxy-charm, native-charm or k8s-proxy-charm
            timeout: (Float) Timeout for the ns update operation

        Raises:
            K8sException: always — KDUs deployed with a Juju bundle do not
                support charm upgrade.
        """
        raise K8sException(
            "KDUs deployed with Juju Bundle do not support charm upgrade"
        )
async def exec_primitive(
self,
complete_status: bool = False,
yaml_format: bool = False,
**kwargs,
- ) -> dict:
+ ) -> Union[str, dict]:
"""Get the status of the KDU
Get the current status of the KDU instance.
return status
+ async def add_relation(
+ self,
+ provider: RelationEndpoint,
+ requirer: RelationEndpoint,
+ ):
+ """
+ Add relation between two charmed endpoints
+
+ :param: provider: Provider relation endpoint
+ :param: requirer: Requirer relation endpoint
+ """
+ self.log.debug(f"adding new relation between {provider} and {requirer}")
+ cross_model_relation = (
+ provider.model_name != requirer.model_name
+ or requirer.vca_id != requirer.vca_id
+ )
+ try:
+ if cross_model_relation:
+ # Cross-model relation
+ provider_libjuju = await self._get_libjuju(provider.vca_id)
+ requirer_libjuju = await self._get_libjuju(requirer.vca_id)
+ offer = await provider_libjuju.offer(provider)
+ if offer:
+ saas_name = await requirer_libjuju.consume(
+ requirer.model_name, offer, provider_libjuju
+ )
+ await requirer_libjuju.add_relation(
+ requirer.model_name,
+ requirer.endpoint,
+ saas_name,
+ )
+ else:
+ # Standard relation
+ vca_id = provider.vca_id
+ model = provider.model_name
+ libjuju = await self._get_libjuju(vca_id)
+ # add juju relations between two applications
+ await libjuju.add_relation(
+ model_name=model,
+ endpoint_1=provider.endpoint,
+ endpoint_2=requirer.endpoint,
+ )
+ except Exception as e:
+ message = f"Error adding relation between {provider} and {requirer}: {e}"
+ self.log.error(message)
+ raise Exception(message=message)
+
async def update_vca_status(self, vcastatus: dict, kdu_instance: str, **kwargs):
"""
Add all configs, actions, executed actions of all applications in a model to vcastatus dict
for application in vcastatus[model_name]["applications"]:
# Adding application actions
- vcastatus[model_name]["applications"][application][
- "actions"
- ] = await libjuju.get_actions(application, kdu_instance)
+ vcastatus[model_name]["applications"][application]["actions"] = {}
# Adding application configs
vcastatus[model_name]["applications"][application][
"configs"
"""Return a list of services of a kdu_instance"""
credentials = self.get_credentials(cluster_uuid=cluster_uuid)
-
- kubecfg = tempfile.NamedTemporaryFile()
- with open(kubecfg.name, "w") as kubecfg_file:
- kubecfg_file.write(credentials)
- kubectl = Kubectl(config_file=kubecfg.name)
-
+ kubectl = self._get_kubectl(credentials)
return kubectl.get_services(
field_selector="metadata.namespace={}".format(kdu_instance)
)
"""Return data for a specific service inside a namespace"""
credentials = self.get_credentials(cluster_uuid=cluster_uuid)
-
- kubecfg = tempfile.NamedTemporaryFile()
- with open(kubecfg.name, "w") as kubecfg_file:
- kubecfg_file.write(credentials)
- kubectl = Kubectl(config_file=kubecfg.name)
-
+ kubectl = self._get_kubectl(credentials)
return kubectl.get_services(
field_selector="metadata.name={},metadata.namespace={}".format(
service_name, namespace
"""
pass
- def _create_cluster_role(
- self,
- kubectl: Kubectl,
- name: str,
- labels: Dict[str, str],
- ):
- cluster_roles = kubectl.clients[RBAC_CLIENT].list_cluster_role(
- field_selector="metadata.name={}".format(name)
- )
-
- if len(cluster_roles.items) > 0:
- raise Exception(
- "Cluster role with metadata.name={} already exists".format(name)
- )
-
- metadata = V1ObjectMeta(name=name, labels=labels, namespace=ADMIN_NAMESPACE)
- # Cluster role
- cluster_role = V1ClusterRole(
- metadata=metadata,
- rules=[
- V1PolicyRule(api_groups=["*"], resources=["*"], verbs=["*"]),
- V1PolicyRule(non_resource_ur_ls=["*"], verbs=["*"]),
- ],
- )
-
- kubectl.clients[RBAC_CLIENT].create_cluster_role(cluster_role)
-
- def _delete_cluster_role(self, kubectl: Kubectl, name: str):
- kubectl.clients[RBAC_CLIENT].delete_cluster_role(name)
-
- def _create_service_account(
- self,
- kubectl: Kubectl,
- name: str,
- labels: Dict[str, str],
- ):
- service_accounts = kubectl.clients[CORE_CLIENT].list_namespaced_service_account(
- ADMIN_NAMESPACE, field_selector="metadata.name={}".format(name)
- )
- if len(service_accounts.items) > 0:
- raise Exception(
- "Service account with metadata.name={} already exists".format(name)
- )
-
- metadata = V1ObjectMeta(name=name, labels=labels, namespace=ADMIN_NAMESPACE)
- service_account = V1ServiceAccount(metadata=metadata)
-
- kubectl.clients[CORE_CLIENT].create_namespaced_service_account(
- ADMIN_NAMESPACE, service_account
- )
-
- def _delete_service_account(self, kubectl: Kubectl, name: str):
- kubectl.clients[CORE_CLIENT].delete_namespaced_service_account(
- name, ADMIN_NAMESPACE
- )
-
- def _create_cluster_role_binding(
- self,
- kubectl: Kubectl,
- name: str,
- labels: Dict[str, str],
- ):
- role_bindings = kubectl.clients[RBAC_CLIENT].list_cluster_role_binding(
- field_selector="metadata.name={}".format(name)
- )
- if len(role_bindings.items) > 0:
- raise Exception("Generated rbac id already exists")
-
- role_binding = V1ClusterRoleBinding(
- metadata=V1ObjectMeta(name=name, labels=labels),
- role_ref=V1RoleRef(kind="ClusterRole", name=name, api_group=""),
- subjects=[
- V1Subject(kind="ServiceAccount", name=name, namespace=ADMIN_NAMESPACE)
- ],
- )
- kubectl.clients[RBAC_CLIENT].create_cluster_role_binding(role_binding)
-
- def _delete_cluster_role_binding(self, kubectl: Kubectl, name: str):
- kubectl.clients[RBAC_CLIENT].delete_cluster_role_binding(name)
-
- async def _get_secret_data(self, kubectl: Kubectl, name: str) -> (str, str):
- v1_core = kubectl.clients[CORE_CLIENT]
-
- retries_limit = 10
- secret_name = None
- while True:
- retries_limit -= 1
- service_accounts = v1_core.list_namespaced_service_account(
- ADMIN_NAMESPACE, field_selector="metadata.name={}".format(name)
- )
- if len(service_accounts.items) == 0:
- raise Exception(
- "Service account not found with metadata.name={}".format(name)
- )
- service_account = service_accounts.items[0]
- if service_account.secrets and len(service_account.secrets) > 0:
- secret_name = service_account.secrets[0].name
- if secret_name is not None or not retries_limit:
- break
- if not secret_name:
- raise Exception(
- "Failed getting the secret from service account {}".format(name)
- )
- secret = v1_core.list_namespaced_secret(
- ADMIN_NAMESPACE,
- field_selector="metadata.name={}".format(secret_name),
- ).items[0]
-
- token = secret.data[SERVICE_ACCOUNT_TOKEN_KEY]
- client_certificate_data = secret.data[SERVICE_ACCOUNT_ROOT_CA_KEY]
-
- return (
- base64.b64decode(token).decode("utf-8"),
- base64.b64decode(client_certificate_data).decode("utf-8"),
- )
-
@staticmethod
def generate_kdu_instance_name(**kwargs):
db_dict = kwargs.get("db_dict")
log=self.log,
n2vc=self,
)
+
+ def _get_kubectl(self, credentials: str) -> Kubectl:
+ """
+ Get Kubectl object
+
+ :param: kubeconfig_credentials: Kubeconfig credentials
+ """
+ kubecfg = tempfile.NamedTemporaryFile()
+ with open(kubecfg.name, "w") as kubecfg_file:
+ kubecfg_file.write(credentials)
+ return Kubectl(config_file=kubecfg.name)