# devops-stages/stage-build.sh
#
-FROM ubuntu:20.04
+FROM ubuntu:22.04
ARG APT_PROXY
RUN if [ ! -z $APT_PROXY ] ; then \
python3 \
python3-all \
python3-dev \
- python3-setuptools
+ python3-setuptools \
+ python3-pip \
+ tox
-RUN python3 -m easy_install pip==21.3.1
-RUN pip install tox==3.24.5
+ENV LC_ALL C.UTF-8
+ENV LANG C.UTF-8
rm -rf dists
mkdir -p pool/$MDG
mv deb_dist/*.deb pool/$MDG/
-mkdir -p dists/unstable/$MDG/binary-amd64/
-apt-ftparchive packages pool/$MDG > dists/unstable/$MDG/binary-amd64/Packages
-gzip -9fk dists/unstable/$MDG/binary-amd64/Packages
-echo "dists/**,pool/$MDG/*.deb"
+
self.log.debug("status={}".format(status))
try:
-
the_table = db_dict["collection"]
the_filter = db_dict["filter"]
the_path = db_dict["path"]
# contact with: nfvlabs@tid.es
##
from typing import Union
+from shlex import quote
import os
import yaml
if namespace and namespace != "kube-system":
if not await self._namespace_exists(cluster_uuid, namespace):
try:
+ # TODO: refactor to use kubernetes API client
await self._create_namespace(cluster_uuid, namespace)
except Exception as e:
if not await self._namespace_exists(cluster_uuid, namespace):
return True
async def inspect_kdu(self, kdu_model: str, repo_url: str = None) -> str:
-
self.log.debug(
"inspect kdu_model {} from (optional) repo: {}".format(kdu_model, repo_url)
)
return namespace in namespaces if namespaces else False
async def _get_namespaces(self, cluster_id: str):
-
self.log.debug("get namespaces cluster_id {}".format(cluster_id))
# init config, env
)
command = "{} --kubeconfig={} get namespaces -o=yaml".format(
- self.kubectl_command, paths["kube_config"]
+ self.kubectl_command, quote(paths["kube_config"])
)
output, _rc = await self._local_async_exec(
command=command, raise_exception_on_error=True, env=env
return namespaces
async def _create_namespace(self, cluster_id: str, namespace: str):
-
self.log.debug(f"create namespace: {cluster_id} for cluster_id: {namespace}")
# init config, env
)
command = "{} --kubeconfig={} create namespace {}".format(
- self.kubectl_command, paths["kube_config"], namespace
+ self.kubectl_command, quote(paths["kube_config"]), quote(namespace)
)
_, _rc = await self._local_async_exec(
command=command, raise_exception_on_error=True, env=env
async def _get_services(
self, cluster_id: str, kdu_instance: str, namespace: str, kubeconfig: str
):
-
# init config, env
paths, env = self._init_paths_env(
cluster_name=cluster_id, create_if_not_exist=True
)
command1 = "env KUBECONFIG={} {} get manifest {} --namespace={}".format(
- kubeconfig, self._helm_command, kdu_instance, namespace
+ kubeconfig, self._helm_command, quote(kdu_instance), quote(namespace)
+ )
+ command2 = "{} get --namespace={} -f -".format(
+ self.kubectl_command, quote(namespace)
)
- command2 = "{} get --namespace={} -f -".format(self.kubectl_command, namespace)
output, _rc = await self._local_async_exec_pipe(
command1, command2, env=env, raise_exception_on_error=True
)
if namespace != "kube-system":
namespaces = await self._get_namespaces(cluster_id)
if namespace not in namespaces:
+ # TODO: refactor to use kubernetes API client
await self._create_namespace(cluster_id, namespace)
repo_list = await self.repo_list(cluster_id)
pass
async def _instances_list(self, cluster_id: str):
-
# init paths, env
paths, env = self._init_paths_env(
cluster_name=cluster_id, create_if_not_exist=True
Args:
show_command: the second part of the command (`helm show <show_command>`)
- kdu_model: The name or path of an Helm Chart
- repo_url: Helm Chart repository url
+ kdu_model: The name or path of a Helm Chart
+ repo_str: Helm Chart repository url
version: constraint with specific version of the Chart to use
Returns:
"""
inspect_command = "{} show {} {}{} {}".format(
- self._helm_command, show_command, kdu_model, repo_str, version
+ self._helm_command, show_command, quote(kdu_model), repo_str, version
)
return inspect_command
):
get_command = (
"env KUBECONFIG={} {} get {} {} --namespace={} --output yaml".format(
- kubeconfig, self._helm_command, get_command, kdu_instance, namespace
+ kubeconfig,
+ self._helm_command,
+ get_command,
+ quote(kdu_instance),
+ quote(namespace),
)
)
return get_command
yaml_format: bool = False,
show_error_log: bool = False,
) -> Union[str, dict]:
-
self.log.debug(
"status of kdu_instance: {}, namespace: {} ".format(kdu_instance, namespace)
)
cluster_name=cluster_id, create_if_not_exist=True
)
command = "env KUBECONFIG={} {} status {} --namespace={} --output yaml".format(
- paths["kube_config"], self._helm_command, kdu_instance, namespace
+ paths["kube_config"],
+ self._helm_command,
+ quote(kdu_instance),
+ quote(namespace),
)
output, rc = await self._local_async_exec(
timeout: float,
kubeconfig: str,
) -> str:
-
timeout_str = ""
if timeout:
timeout_str = "--timeout {}s".format(timeout)
# namespace
namespace_str = ""
if namespace:
- namespace_str = "--namespace {}".format(namespace)
+ namespace_str = "--namespace {}".format(quote(namespace))
# version
version_str = ""
"{params} {timeout} {ns} {model} {ver}".format(
kubeconfig=kubeconfig,
helm=self._helm_command,
- name=kdu_instance,
+ name=quote(kdu_instance),
atomic=atomic_str,
params=params_str,
timeout=timeout_str,
ns=namespace_str,
- model=kdu_model,
+ model=quote(kdu_model),
ver=version_str,
)
)
# version
version_str = ""
if version:
- version_str = "--version {}".format(version)
+ version_str = "--version {}".format(quote(version))
# namespace
namespace_str = ""
if namespace:
- namespace_str = "--namespace {}".format(namespace)
+ namespace_str = "--namespace {}".format(quote(namespace))
command = (
"env KUBECONFIG={kubeconfig} {helm} upgrade {name} {model} {namespace} {atomic} {force}"
).format(
kubeconfig=kubeconfig,
helm=self._helm_command,
- name=kdu_instance,
+ name=quote(kdu_instance),
namespace=namespace_str,
atomic=atomic_str,
force=force_str,
params=params_str,
timeout=timeout_str,
- model=kdu_model,
+ model=quote(kdu_model),
ver=version_str,
)
return command
self, kdu_instance: str, namespace: str, revision: float, kubeconfig: str
) -> str:
return "env KUBECONFIG={} {} rollback {} {} --namespace={} --wait".format(
- kubeconfig, self._helm_command, kdu_instance, revision, namespace
+ kubeconfig,
+ self._helm_command,
+ quote(kdu_instance),
+ revision,
+ quote(namespace),
)
def _get_uninstall_command(
self, kdu_instance: str, namespace: str, kubeconfig: str
) -> str:
-
return "env KUBECONFIG={} {} uninstall {} --namespace={}".format(
- kubeconfig, self._helm_command, kdu_instance, namespace
+ kubeconfig, self._helm_command, quote(kdu_instance), quote(namespace)
)
def _get_helm_chart_repos_ids(self, cluster_uuid) -> list:
import abc
import asyncio
from typing import Union
+from shlex import quote
import random
import time
import shlex
import os
import yaml
from uuid import uuid4
+from urllib.parse import urlparse
from n2vc.config import EnvironConfig
from n2vc.exceptions import K8sException
from n2vc.k8s_conn import K8sConnector
+from n2vc.kubectl import Kubectl
class K8sHelmBaseConnector(K8sConnector):
namespace: str = "kube-system",
reuse_cluster_uuid=None,
**kwargs,
- ) -> (str, bool):
+ ) -> tuple[str, bool]:
"""
It prepares a given K8s cluster environment to run Charts
cert: str = None,
user: str = None,
password: str = None,
+ oci: bool = False,
):
self.log.debug(
"Cluster {}, adding {} repository {}. URL: {}".format(
# sync local dir
self.fs.sync(from_path=cluster_uuid)
- # helm repo add name url
- command = ("env KUBECONFIG={} {} repo add {} {}").format(
- paths["kube_config"], self._helm_command, name, url
- )
+ if oci:
+ if user and password:
+ host_port = urlparse(url).netloc if url.startswith("oci://") else url
+ # helm registry login url
+ command = "env KUBECONFIG={} {} registry login {}".format(
+ paths["kube_config"], self._helm_command, quote(host_port)
+ )
+ else:
+ self.log.debug(
+ "OCI registry login is not needed for repo: {}".format(name)
+ )
+ return
+ else:
+ # helm repo add name url
+ command = "env KUBECONFIG={} {} repo add {} {}".format(
+ paths["kube_config"], self._helm_command, quote(name), quote(url)
+ )
if cert:
temp_cert_file = os.path.join(
os.makedirs(os.path.dirname(temp_cert_file), exist_ok=True)
with open(temp_cert_file, "w") as the_cert:
the_cert.write(cert)
- command += " --ca-file {}".format(temp_cert_file)
+ command += " --ca-file {}".format(quote(temp_cert_file))
if user:
- command += " --username={}".format(user)
+ command += " --username={}".format(quote(user))
if password:
- command += " --password={}".format(password)
+ command += " --password={}".format(quote(password))
self.log.debug("adding repo: {}".format(command))
await self._local_async_exec(
command=command, raise_exception_on_error=True, env=env
)
- # helm repo update
- command = "env KUBECONFIG={} {} repo update {}".format(
- paths["kube_config"], self._helm_command, name
- )
- self.log.debug("updating repo: {}".format(command))
- await self._local_async_exec(
- command=command, raise_exception_on_error=False, env=env
- )
+ if not oci:
+ # helm repo update
+ command = "env KUBECONFIG={} {} repo update {}".format(
+ paths["kube_config"], self._helm_command, quote(name)
+ )
+ self.log.debug("updating repo: {}".format(command))
+ await self._local_async_exec(
+ command=command, raise_exception_on_error=False, env=env
+ )
# sync fs
self.fs.reverse_sync(from_path=cluster_uuid)
self.fs.sync(from_path=cluster_uuid)
# helm repo update
- command = "{} repo update {}".format(self._helm_command, name)
+ command = "{} repo update {}".format(self._helm_command, quote(name))
self.log.debug("updating repo: {}".format(command))
await self._local_async_exec(
command=command, raise_exception_on_error=False, env=env
self.fs.sync(from_path=cluster_uuid)
command = "env KUBECONFIG={} {} repo remove {}".format(
- paths["kube_config"], self._helm_command, name
+ paths["kube_config"], self._helm_command, quote(name)
)
await self._local_async_exec(
command=command, raise_exception_on_error=True, env=env
def _is_helm_chart_a_file(self, chart_name: str):
return chart_name.count("/") > 1
+ @staticmethod
+ def _is_helm_chart_a_url(chart_name: str):
+ result = urlparse(chart_name)
+ return all([result.scheme, result.netloc])
+
async def _install_impl(
self,
cluster_id: str,
cluster_id=cluster_id, params=params
)
- # version
- kdu_model, version = self._split_version(kdu_model)
-
- _, repo = self._split_repo(kdu_model)
- if repo:
- await self.repo_update(cluster_id, repo)
+ kdu_model, version = await self._prepare_helm_chart(kdu_model, cluster_id)
command = self._get_install_command(
kdu_model,
output, rc = exec_task.result()
else:
-
output, rc = await self._local_async_exec(
command=command, raise_exception_on_error=False, env=env
)
cluster_id=cluster_uuid, params=params
)
- # version
- kdu_model, version = self._split_version(kdu_model)
-
- _, repo = self._split_repo(kdu_model)
- if repo:
- await self.repo_update(cluster_uuid, repo)
+ kdu_model, version = await self._prepare_helm_chart(kdu_model, cluster_uuid)
command = self._get_upgrade_command(
kdu_model,
self.log.debug("upgrading: {}".format(command))
if atomic:
-
# exec helm in a task
exec_task = asyncio.ensure_future(
coro_or_future=self._local_async_exec(
output, rc = exec_task.result()
else:
-
output, rc = await self._local_async_exec(
command=command, raise_exception_on_error=False, env=env
)
)
# version
- kdu_model, version = self._split_version(kdu_model)
+ kdu_model, version = await self._prepare_helm_chart(kdu_model, cluster_uuid)
repo_url = await self._find_repo(kdu_model, cluster_uuid)
async def get_service(
self, cluster_uuid: str, service_name: str, namespace: str
) -> object:
-
self.log.debug(
"get service, service_name: {}, namespace: {}, cluster_uuid: {}".format(
service_name, namespace, cluster_uuid
async def get_values_kdu(
self, kdu_instance: str, namespace: str, kubeconfig: str
) -> str:
-
self.log.debug("get kdu_instance values {}".format(kdu_instance))
return await self._exec_get_command(
)
async def help_kdu(self, kdu_model: str, repo_url: str = None) -> str:
-
self.log.debug(
"inspect kdu_model {} readme.md from repo: {}".format(kdu_model, repo_url)
)
)
async def synchronize_repos(self, cluster_uuid: str):
-
self.log.debug("synchronize repos for cluster helm-id: {}".format(cluster_uuid))
try:
db_repo_ids = self._get_helm_chart_repos_ids(cluster_uuid)
# add repo
self.log.debug("add repo {}".format(db_repo["name"]))
- if "ca_cert" in db_repo:
- await self.repo_add(
- cluster_uuid,
- db_repo["name"],
- db_repo["url"],
- cert=db_repo["ca_cert"],
- )
- else:
- await self.repo_add(
- cluster_uuid,
- db_repo["name"],
- db_repo["url"],
- )
+ await self.repo_add(
+ cluster_uuid,
+ db_repo["name"],
+ db_repo["url"],
+ cert=db_repo.get("ca_cert"),
+ user=db_repo.get("user"),
+ password=db_repo.get("password"),
+ oci=db_repo.get("oci", False),
+ )
added_repo_dict[repo_id] = db_repo["name"]
except Exception as e:
raise K8sException(
show_error_log: bool = True,
encode_utf8: bool = False,
env: dict = None,
- ) -> (str, int):
-
+ ) -> tuple[str, int]:
command = K8sHelmBaseConnector._remove_multiple_spaces(command)
self.log.debug(
"Executing async local command: {}, env: {}".format(command, env)
encode_utf8: bool = False,
env: dict = None,
):
-
command1 = K8sHelmBaseConnector._remove_multiple_spaces(command1)
command2 = K8sHelmBaseConnector._remove_multiple_spaces(command2)
command = "{} | {}".format(command1, command2)
)
command = "{} --kubeconfig={} --namespace={} get service {} -o=yaml".format(
- self.kubectl_command, paths["kube_config"], namespace, service_name
+ self.kubectl_command,
+ paths["kube_config"],
+ quote(namespace),
+ quote(service_name),
)
output, _rc = await self._local_async_exec(
repo_str = ""
if repo_url:
- repo_str = " --repo {}".format(repo_url)
+ repo_str = " --repo {}".format(quote(repo_url))
# Obtain the Chart's name and store it in the var kdu_model
kdu_model, _ = self._split_repo(kdu_model=kdu_model)
kdu_model, version = self._split_version(kdu_model)
if version:
- version_str = "--version {}".format(version)
+ version_str = "--version {}".format(quote(version))
else:
version_str = ""
full_command = self._get_inspect_command(
show_command=inspect_command,
- kdu_model=kdu_model,
+ kdu_model=quote(kdu_model),
repo_str=repo_str,
version=version_str,
)
kdu_model: str,
repo_url: str = None,
resource_name: str = None,
- ) -> (int, str):
+ ) -> tuple[int, str]:
"""Get the replica count value in the Helm Chart Values.
Args:
# params for use in -f file
# returns values file option and filename (in order to delete it at the end)
- def _params_to_file_option(self, cluster_id: str, params: dict) -> (str, str):
-
+ def _params_to_file_option(self, cluster_id: str, params: dict) -> tuple[str, str]:
if params and len(params) > 0:
self._init_paths_env(cluster_name=cluster_id, create_if_not_exist=True)
def get_random_number():
- r = random.randrange(start=1, stop=99999999)
+ r = random.SystemRandom().randint(1, 99999999)
s = str(r)
while len(s) < 10:
s = "0" + s
# params for use in --set option
@staticmethod
def _params_to_set_option(params: dict) -> str:
- params_str = ""
- if params and len(params) > 0:
- start = True
- for key in params:
- value = params.get(key, None)
- if value is not None:
- if start:
- params_str += "--set "
- start = False
- else:
- params_str += ","
- params_str += "{}={}".format(key, value)
- return params_str
+ pairs = [
+ f"{quote(str(key))}={quote(str(value))}"
+ for key, value in params.items()
+ if value is not None
+ ]
+ if not pairs:
+ return ""
+ return "--set " + ",".join(pairs)
@staticmethod
def generate_kdu_instance_name(**kwargs):
name += "-"
def get_random_number():
- r = random.randrange(start=1, stop=99999999)
+ r = random.SystemRandom().randint(1, 99999999)
s = str(r)
s = s.rjust(10, "0")
return s
name = name + get_random_number()
return name.lower()
- def _split_version(self, kdu_model: str) -> (str, str):
+ def _split_version(self, kdu_model: str) -> tuple[str, str]:
version = None
- if not self._is_helm_chart_a_file(kdu_model) and ":" in kdu_model:
+ if (
+ not (
+ self._is_helm_chart_a_file(kdu_model)
+ or self._is_helm_chart_a_url(kdu_model)
+ )
+ and ":" in kdu_model
+ ):
parts = kdu_model.split(sep=":")
if len(parts) == 2:
version = str(parts[1])
kdu_model = parts[0]
return kdu_model, version
- def _split_repo(self, kdu_model: str) -> (str, str):
+ def _split_repo(self, kdu_model: str) -> tuple[str, str]:
"""Obtain the Helm Chart's repository and Chart's names from the KDU model
Args:
repo_name = None
idx = kdu_model.find("/")
- if idx >= 0:
+ if not self._is_helm_chart_a_url(kdu_model) and idx >= 0:
chart_name = kdu_model[idx + 1 :]
repo_name = kdu_model[:idx]
break # it is not necessary to continue the loop if the repo link was found...
return repo_url
+
+ def _repo_to_oci_url(self, repo):
+ db_repo = self.db.get_one("k8srepos", {"name": repo}, fail_on_empty=False)
+ if db_repo and "oci" in db_repo:
+ return db_repo.get("url")
+
+ async def _prepare_helm_chart(self, kdu_model, cluster_id):
+ # e.g.: "stable/openldap", "1.0"
+ kdu_model, version = self._split_version(kdu_model)
+ # e.g.: "openldap, stable"
+ chart_name, repo = self._split_repo(kdu_model)
+ if repo and chart_name: # repo/chart case
+ oci_url = self._repo_to_oci_url(repo)
+ if oci_url: # oci does not require helm repo update
+ kdu_model = f"{oci_url.rstrip('/')}/{chart_name.lstrip('/')}" # urljoin doesn't work for oci schema
+ else:
+ await self.repo_update(cluster_id, repo)
+ return kdu_model, version
+
+ async def create_certificate(
+ self, cluster_uuid, namespace, dns_prefix, name, secret_name, usage
+ ):
+ paths, env = self._init_paths_env(
+ cluster_name=cluster_uuid, create_if_not_exist=True
+ )
+ kubectl = Kubectl(config_file=paths["kube_config"])
+ await kubectl.create_certificate(
+ namespace=namespace,
+ name=name,
+ dns_prefix=dns_prefix,
+ secret_name=secret_name,
+ usages=[usage],
+ issuer_name="ca-issuer",
+ )
+
+ async def delete_certificate(self, cluster_uuid, namespace, certificate_name):
+ paths, env = self._init_paths_env(
+ cluster_name=cluster_uuid, create_if_not_exist=True
+ )
+ kubectl = Kubectl(config_file=paths["kube_config"])
+ await kubectl.delete_certificate(namespace, certificate_name)
+
+ async def create_namespace(
+ self,
+ namespace,
+ cluster_uuid,
+ labels,
+ ):
+ """
+ Create a namespace in a specific cluster
+
+ :param namespace: Namespace to be created
+ :param cluster_uuid: K8s cluster uuid used to retrieve kubeconfig
+ :param labels: Dictionary with labels for the new namespace
+ :returns: None
+ """
+ paths, env = self._init_paths_env(
+ cluster_name=cluster_uuid, create_if_not_exist=True
+ )
+ kubectl = Kubectl(config_file=paths["kube_config"])
+ await kubectl.create_namespace(
+ name=namespace,
+ labels=labels,
+ )
+
+ async def delete_namespace(
+ self,
+ namespace,
+ cluster_uuid,
+ ):
+ """
+ Delete a namespace in a specific cluster
+
+ :param namespace: namespace to be deleted
+ :param cluster_uuid: K8s cluster uuid used to retrieve kubeconfig
+ :returns: None
+ """
+ paths, env = self._init_paths_env(
+ cluster_name=cluster_uuid, create_if_not_exist=True
+ )
+ kubectl = Kubectl(config_file=paths["kube_config"])
+ await kubectl.delete_namespace(
+ name=namespace,
+ )
+
+ async def copy_secret_data(
+ self,
+ src_secret: str,
+ dst_secret: str,
+ cluster_uuid: str,
+ data_key: str,
+ src_namespace: str = "osm",
+ dst_namespace: str = "osm",
+ ):
+ """
+ Copy a single key and value from an existing secret to a new one
+
+ :param src_secret: name of the existing secret
+ :param dst_secret: name of the new secret
+ :param cluster_uuid: K8s cluster uuid used to retrieve kubeconfig
+ :param data_key: key of the existing secret to be copied
+ :param src_namespace: Namespace of the existing secret
+ :param dst_namespace: Namespace of the new secret
+ :returns: None
+ """
+ paths, env = self._init_paths_env(
+ cluster_name=cluster_uuid, create_if_not_exist=True
+ )
+ kubectl = Kubectl(config_file=paths["kube_config"])
+ secret_data = await kubectl.get_secret_content(
+ name=src_secret,
+ namespace=src_namespace,
+ )
+ # Only the corresponding data_key value needs to be copy
+ data = {data_key: secret_data.get(data_key)}
+ await kubectl.create_secret(
+ name=dst_secret,
+ data=data,
+ namespace=dst_namespace,
+ secret_type="Opaque",
+ )
+
+ async def setup_default_rbac(
+ self,
+ name,
+ namespace,
+ cluster_uuid,
+ api_groups,
+ resources,
+ verbs,
+ service_account,
+ ):
+ """
+ Create a basic RBAC for a new namespace.
+
+ :param name: name of both Role and Role Binding
+ :param namespace: K8s namespace
+ :param cluster_uuid: K8s cluster uuid used to retrieve kubeconfig
+ :param api_groups: Api groups to be allowed in Policy Rule
+ :param resources: Resources to be allowed in Policy Rule
+ :param verbs: Verbs to be allowed in Policy Rule
+ :param service_account: Service Account name used to bind the Role
+ :returns: None
+ """
+ paths, env = self._init_paths_env(
+ cluster_name=cluster_uuid, create_if_not_exist=True
+ )
+ kubectl = Kubectl(config_file=paths["kube_config"])
+ await kubectl.create_role(
+ name=name,
+ labels={},
+ namespace=namespace,
+ api_groups=api_groups,
+ resources=resources,
+ verbs=verbs,
+ )
+ await kubectl.create_role_binding(
+ name=name,
+ labels={},
+ namespace=namespace,
+ role_name=name,
+ sa_name=service_account,
+ )
+++ /dev/null
-##
-# Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
-# This file is part of OSM
-# All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
-# implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-# For those usages not covered by the Apache License, Version 2.0 please
-# contact with: nfvlabs@tid.es
-##
-import asyncio
-from typing import Union
-import os
-import yaml
-
-from n2vc.k8s_helm_base_conn import K8sHelmBaseConnector
-from n2vc.exceptions import K8sException
-
-
-class K8sHelmConnector(K8sHelmBaseConnector):
-
- """
- ####################################################################################
- ################################### P U B L I C ####################################
- ####################################################################################
- """
-
- def __init__(
- self,
- fs: object,
- db: object,
- kubectl_command: str = "/usr/bin/kubectl",
- helm_command: str = "/usr/bin/helm",
- log: object = None,
- on_update_db=None,
- ):
- """
- Initializes helm connector for helm v2
-
- :param fs: file system for kubernetes and helm configuration
- :param db: database object to write current operation status
- :param kubectl_command: path to kubectl executable
- :param helm_command: path to helm executable
- :param log: logger
- :param on_update_db: callback called when k8s connector updates database
- """
-
- # parent class
- K8sHelmBaseConnector.__init__(
- self,
- db=db,
- log=log,
- fs=fs,
- kubectl_command=kubectl_command,
- helm_command=helm_command,
- on_update_db=on_update_db,
- )
-
- self.log.info("Initializing K8S Helm2 connector")
-
- # initialize helm client-only
- self.log.debug("Initializing helm client-only...")
- command = "{} init --client-only {} ".format(
- self._helm_command,
- "--stable-repo-url {}".format(self._stable_repo_url)
- if self._stable_repo_url
- else "--skip-repos",
- )
- try:
- asyncio.ensure_future(
- self._local_async_exec(command=command, raise_exception_on_error=False)
- )
- # loop = asyncio.get_event_loop()
- # loop.run_until_complete(self._local_async_exec(command=command,
- # raise_exception_on_error=False))
- except Exception as e:
- self.warning(
- msg="helm init failed (it was already initialized): {}".format(e)
- )
-
- self.log.info("K8S Helm2 connector initialized")
-
- async def install(
- self,
- cluster_uuid: str,
- kdu_model: str,
- kdu_instance: str,
- atomic: bool = True,
- timeout: float = 300,
- params: dict = None,
- db_dict: dict = None,
- kdu_name: str = None,
- namespace: str = None,
- **kwargs,
- ):
- """
- Deploys of a new KDU instance. It would implicitly rely on the `install` call
- to deploy the Chart/Bundle properly parametrized (in practice, this call would
- happen before any _initial-config-primitive_of the VNF is called).
-
- :param cluster_uuid: UUID of a K8s cluster known by OSM
- :param kdu_model: chart/reference (string), which can be either
- of these options:
- - a name of chart available via the repos known by OSM
- (e.g. stable/openldap, stable/openldap:1.2.4)
- - a path to a packaged chart (e.g. mychart.tgz)
- - a path to an unpacked chart directory or a URL (e.g. mychart)
- :param kdu_instance: Kdu instance name
- :param atomic: If set, installation process purges chart/bundle on fail, also
- will wait until all the K8s objects are active
- :param timeout: Time in seconds to wait for the install of the chart/bundle
- (defaults to Helm default timeout: 300s)
- :param params: dictionary of key-value pairs for instantiation parameters
- (overriding default values)
- :param dict db_dict: where to write into database when the status changes.
- It contains a dict with {collection: <str>, filter: {},
- path: <str>},
- e.g. {collection: "nsrs", filter:
- {_id: <nsd-id>, path: "_admin.deployed.K8S.3"}
- :param kdu_name: Name of the KDU instance to be installed
- :param namespace: K8s namespace to use for the KDU instance
- :param kwargs: Additional parameters (None yet)
- :return: True if successful
- """
- self.log.debug("installing {} in cluster {}".format(kdu_model, cluster_uuid))
-
- # sync local dir
- self.fs.sync(from_path=cluster_uuid)
-
- # init env, paths
- paths, env = self._init_paths_env(
- cluster_name=cluster_uuid, create_if_not_exist=True
- )
-
- await self._install_impl(
- cluster_uuid,
- kdu_model,
- paths,
- env,
- kdu_instance,
- atomic=atomic,
- timeout=timeout,
- params=params,
- db_dict=db_dict,
- kdu_name=kdu_name,
- namespace=namespace,
- )
-
- # sync fs
- self.fs.reverse_sync(from_path=cluster_uuid)
-
- self.log.debug("Returning kdu_instance {}".format(kdu_instance))
- return True
-
- async def inspect_kdu(self, kdu_model: str, repo_url: str = None) -> str:
-
- self.log.debug(
- "inspect kdu_model {} from (optional) repo: {}".format(kdu_model, repo_url)
- )
-
- return await self._exec_inspect_command(
- inspect_command="", kdu_model=kdu_model, repo_url=repo_url
- )
-
- """
- ####################################################################################
- ################################### P R I V A T E ##################################
- ####################################################################################
- """
-
- def _init_paths_env(self, cluster_name: str, create_if_not_exist: bool = True):
- """
- Creates and returns base cluster and kube dirs and returns them.
- Also created helm3 dirs according to new directory specification, paths are
- returned and also environment variables that must be provided to execute commands
-
- Helm 2 directory specification uses helm_home dir:
-
- The variables assigned for this paths are:
- - Helm hone: $HELM_HOME
- - helm kubeconfig: $KUBECONFIG
-
- :param cluster_name: cluster_name
- :return: Dictionary with config_paths and dictionary with helm environment variables
- """
- base = self.fs.path
- if base.endswith("/") or base.endswith("\\"):
- base = base[:-1]
-
- # base dir for cluster
- cluster_dir = base + "/" + cluster_name
-
- # kube dir
- kube_dir = cluster_dir + "/" + ".kube"
- if create_if_not_exist and not os.path.exists(kube_dir):
- self.log.debug("Creating dir {}".format(kube_dir))
- os.makedirs(kube_dir)
-
- # helm home dir
- helm_dir = cluster_dir + "/" + ".helm"
- if create_if_not_exist and not os.path.exists(helm_dir):
- self.log.debug("Creating dir {}".format(helm_dir))
- os.makedirs(helm_dir)
-
- config_filename = kube_dir + "/config"
-
- # 2 - Prepare dictionary with paths
- paths = {
- "kube_dir": kube_dir,
- "kube_config": config_filename,
- "cluster_dir": cluster_dir,
- "helm_dir": helm_dir,
- }
-
- for file_name, file in paths.items():
- if "dir" in file_name and not os.path.exists(file):
- err_msg = "{} dir does not exist".format(file)
- self.log.error(err_msg)
- raise K8sException(err_msg)
-
- # 3 - Prepare environment variables
- env = {"HELM_HOME": helm_dir, "KUBECONFIG": config_filename}
-
- return paths, env
-
- async def _get_services(self, cluster_id, kdu_instance, namespace, kubeconfig):
-
- # init config, env
- paths, env = self._init_paths_env(
- cluster_name=cluster_id, create_if_not_exist=True
- )
-
- command1 = "env KUBECONFIG={} {} get manifest {} ".format(
- kubeconfig, self._helm_command, kdu_instance
- )
- command2 = "{} get --namespace={} -f -".format(self.kubectl_command, namespace)
- output, _rc = await self._local_async_exec_pipe(
- command1, command2, env=env, raise_exception_on_error=True
- )
- services = self._parse_services(output)
-
- return services
-
- async def _cluster_init(
- self, cluster_id: str, namespace: str, paths: dict, env: dict
- ):
- """
- Implements the helm version dependent cluster initialization:
- For helm2 it initialized tiller environment if needed
- """
-
- # check if tiller pod is up in cluster
- command = "{} --kubeconfig={} --namespace={} get deployments".format(
- self.kubectl_command, paths["kube_config"], namespace
- )
- output, _rc = await self._local_async_exec(
- command=command, raise_exception_on_error=True, env=env
- )
-
- output_table = self._output_to_table(output=output)
-
- # find 'tiller' pod in all pods
- already_initialized = False
- try:
- for row in output_table:
- if row[0].startswith("tiller-deploy"):
- already_initialized = True
- break
- except Exception:
- pass
-
- # helm init
- n2vc_installed_sw = False
- if not already_initialized:
- self.log.info(
- "Initializing helm in client and server: {}".format(cluster_id)
- )
- command = "{} --kubeconfig={} --namespace kube-system create serviceaccount {}".format(
- self.kubectl_command, paths["kube_config"], self.service_account
- )
- _, _rc = await self._local_async_exec(
- command=command, raise_exception_on_error=False, env=env
- )
-
- command = (
- "{} --kubeconfig={} create clusterrolebinding osm-tiller-cluster-rule "
- "--clusterrole=cluster-admin --serviceaccount=kube-system:{}"
- ).format(self.kubectl_command, paths["kube_config"], self.service_account)
- _, _rc = await self._local_async_exec(
- command=command, raise_exception_on_error=False, env=env
- )
-
- command = (
- "{} --kubeconfig={} --tiller-namespace={} --home={} --service-account {} "
- " {} init"
- ).format(
- self._helm_command,
- paths["kube_config"],
- namespace,
- paths["helm_dir"],
- self.service_account,
- "--stable-repo-url {}".format(self._stable_repo_url)
- if self._stable_repo_url
- else "--skip-repos",
- )
- _, _rc = await self._local_async_exec(
- command=command, raise_exception_on_error=True, env=env
- )
- n2vc_installed_sw = True
- else:
- # check client helm installation
- check_file = paths["helm_dir"] + "/repository/repositories.yaml"
- if not self._check_file_exists(
- filename=check_file, exception_if_not_exists=False
- ):
- self.log.info("Initializing helm in client: {}".format(cluster_id))
- command = (
- "{} --kubeconfig={} --tiller-namespace={} "
- "--home={} init --client-only {} "
- ).format(
- self._helm_command,
- paths["kube_config"],
- namespace,
- paths["helm_dir"],
- "--stable-repo-url {}".format(self._stable_repo_url)
- if self._stable_repo_url
- else "--skip-repos",
- )
- output, _rc = await self._local_async_exec(
- command=command, raise_exception_on_error=True, env=env
- )
- else:
- self.log.info("Helm client already initialized")
-
- repo_list = await self.repo_list(cluster_id)
- for repo in repo_list:
- if repo["name"] == "stable" and repo["url"] != self._stable_repo_url:
- self.log.debug("Add new stable repo url: {}")
- await self.repo_remove(cluster_id, "stable")
- if self._stable_repo_url:
- await self.repo_add(cluster_id, "stable", self._stable_repo_url)
- break
-
- return n2vc_installed_sw
-
- async def _uninstall_sw(self, cluster_id: str, namespace: str):
- # uninstall Tiller if necessary
-
- self.log.debug("Uninstalling tiller from cluster {}".format(cluster_id))
-
- # init paths, env
- paths, env = self._init_paths_env(
- cluster_name=cluster_id, create_if_not_exist=True
- )
-
- if not namespace:
- # find namespace for tiller pod
- command = "{} --kubeconfig={} get deployments --all-namespaces".format(
- self.kubectl_command, paths["kube_config"]
- )
- output, _rc = await self._local_async_exec(
- command=command, raise_exception_on_error=False, env=env
- )
- output_table = self._output_to_table(output=output)
- namespace = None
- for r in output_table:
- try:
- if "tiller-deploy" in r[1]:
- namespace = r[0]
- break
- except Exception:
- pass
- else:
- msg = "Tiller deployment not found in cluster {}".format(cluster_id)
- self.log.error(msg)
-
- self.log.debug("namespace for tiller: {}".format(namespace))
-
- if namespace:
- # uninstall tiller from cluster
- self.log.debug("Uninstalling tiller from cluster {}".format(cluster_id))
- command = "{} --kubeconfig={} --home={} reset".format(
- self._helm_command, paths["kube_config"], paths["helm_dir"]
- )
- self.log.debug("resetting: {}".format(command))
- output, _rc = await self._local_async_exec(
- command=command, raise_exception_on_error=True, env=env
- )
- # Delete clusterrolebinding and serviceaccount.
- # Ignore if errors for backward compatibility
- command = (
- "{} --kubeconfig={} delete clusterrolebinding.rbac.authorization.k8s."
- "io/osm-tiller-cluster-rule"
- ).format(self.kubectl_command, paths["kube_config"])
- output, _rc = await self._local_async_exec(
- command=command, raise_exception_on_error=False, env=env
- )
- command = (
- "{} --kubeconfig={} --namespace {} delete serviceaccount/{}".format(
- self.kubectl_command,
- paths["kube_config"],
- namespace,
- self.service_account,
- )
- )
- output, _rc = await self._local_async_exec(
- command=command, raise_exception_on_error=False, env=env
- )
-
- else:
- self.log.debug("namespace not found")
-
- async def _instances_list(self, cluster_id):
-
- # init paths, env
- paths, env = self._init_paths_env(
- cluster_name=cluster_id, create_if_not_exist=True
- )
-
- command = "{} list --output yaml".format(self._helm_command)
-
- output, _rc = await self._local_async_exec(
- command=command, raise_exception_on_error=True, env=env
- )
-
- if output and len(output) > 0:
- # parse yaml and update keys to lower case to unify with helm3
- instances = yaml.load(output, Loader=yaml.SafeLoader).get("Releases")
- new_instances = []
- for instance in instances:
- new_instance = dict((k.lower(), v) for k, v in instance.items())
- new_instances.append(new_instance)
- return new_instances
- else:
- return []
-
- def _get_inspect_command(
- self, show_command: str, kdu_model: str, repo_str: str, version: str
- ):
- inspect_command = "{} inspect {} {}{} {}".format(
- self._helm_command, show_command, kdu_model, repo_str, version
- )
- return inspect_command
-
- def _get_get_command(
- self, get_command: str, kdu_instance: str, namespace: str, kubeconfig: str
- ):
- get_command = "env KUBECONFIG={} {} get {} {} --output yaml".format(
- kubeconfig, self._helm_command, get_command, kdu_instance
- )
- return get_command
-
- async def _status_kdu(
- self,
- cluster_id: str,
- kdu_instance: str,
- namespace: str = None,
- yaml_format: bool = False,
- show_error_log: bool = False,
- ) -> Union[str, dict]:
-
- self.log.debug(
- "status of kdu_instance: {}, namespace: {} ".format(kdu_instance, namespace)
- )
-
- # init config, env
- paths, env = self._init_paths_env(
- cluster_name=cluster_id, create_if_not_exist=True
- )
- command = ("env KUBECONFIG={} {} status {} --output yaml").format(
- paths["kube_config"], self._helm_command, kdu_instance
- )
- output, rc = await self._local_async_exec(
- command=command,
- raise_exception_on_error=True,
- show_error_log=show_error_log,
- env=env,
- )
-
- if yaml_format:
- return str(output)
-
- if rc != 0:
- return None
-
- data = yaml.load(output, Loader=yaml.SafeLoader)
-
- # remove field 'notes'
- try:
- del data.get("info").get("status")["notes"]
- except KeyError:
- pass
-
- # parse the manifest to a list of dictionaries
- if "manifest" in data:
- manifest_str = data.get("manifest")
- manifest_docs = yaml.load_all(manifest_str, Loader=yaml.SafeLoader)
-
- data["manifest"] = []
- for doc in manifest_docs:
- data["manifest"].append(doc)
-
- # parse field 'resources'
- try:
- resources = str(data.get("info").get("status").get("resources"))
- resource_table = self._output_to_table(resources)
- data.get("info").get("status")["resources"] = resource_table
- except Exception:
- pass
-
- # set description to lowercase (unify with helm3)
- try:
- data.get("info")["description"] = data.get("info").pop("Description")
- except KeyError:
- pass
-
- return data
-
- def _get_helm_chart_repos_ids(self, cluster_uuid) -> list:
- repo_ids = []
- cluster_filter = {"_admin.helm-chart.id": cluster_uuid}
- cluster = self.db.get_one("k8sclusters", cluster_filter)
- if cluster:
- repo_ids = cluster.get("_admin").get("helm_chart_repos") or []
- return repo_ids
- else:
- raise K8sException(
- "k8cluster with helm-id : {} not found".format(cluster_uuid)
- )
-
- async def _is_install_completed(self, cluster_id: str, kdu_instance: str) -> bool:
- # init config, env
- paths, env = self._init_paths_env(
- cluster_name=cluster_id, create_if_not_exist=True
- )
-
- status = await self._status_kdu(
- cluster_id=cluster_id, kdu_instance=kdu_instance, yaml_format=False
- )
-
- # extract info.status.resources-> str
- # format:
- # ==> v1/Deployment
- # NAME READY UP-TO-DATE AVAILABLE AGE
- # halting-horse-mongodb 0/1 1 0 0s
- # halting-petit-mongodb 1/1 1 0 0s
- # blank line
- resources = K8sHelmBaseConnector._get_deep(
- status, ("info", "status", "resources")
- )
-
- # convert to table
- resources = K8sHelmBaseConnector._output_to_table(resources)
-
- num_lines = len(resources)
- index = 0
- ready = True
- while index < num_lines:
- try:
- line1 = resources[index]
- index += 1
- # find '==>' in column 0
- if line1[0] == "==>":
- line2 = resources[index]
- index += 1
- # find READY in column 1
- if line2[1] == "READY":
- # read next lines
- line3 = resources[index]
- index += 1
- while len(line3) > 1 and index < num_lines:
- ready_value = line3[1]
- parts = ready_value.split(sep="/")
- current = int(parts[0])
- total = int(parts[1])
- if current < total:
- self.log.debug("NOT READY:\n {}".format(line3))
- ready = False
- line3 = resources[index]
- index += 1
-
- except Exception:
- pass
-
- return ready
-
- def _get_install_command(
- self,
- kdu_model,
- kdu_instance,
- namespace,
- params_str,
- version,
- atomic,
- timeout,
- kubeconfig,
- ) -> str:
-
- timeout_str = ""
- if timeout:
- timeout_str = "--timeout {}".format(timeout)
-
- # atomic
- atomic_str = ""
- if atomic:
- atomic_str = "--atomic"
- # namespace
- namespace_str = ""
- if namespace:
- namespace_str = "--namespace {}".format(namespace)
-
- # version
- version_str = ""
- if version:
- version_str = "--version {}".format(version)
-
- command = (
- "env KUBECONFIG={kubeconfig} {helm} install {atomic} --output yaml "
- "{params} {timeout} --name={name} {ns} {model} {ver}".format(
- kubeconfig=kubeconfig,
- helm=self._helm_command,
- atomic=atomic_str,
- params=params_str,
- timeout=timeout_str,
- name=kdu_instance,
- ns=namespace_str,
- model=kdu_model,
- ver=version_str,
- )
- )
- return command
-
- def _get_upgrade_scale_command(
- self,
- kdu_model: str,
- kdu_instance: str,
- namespace: str,
- scale: int,
- version: str,
- atomic: bool,
- replica_str: str,
- timeout: float,
- resource_name: str,
- kubeconfig: str,
- ) -> str:
- """Generates the command to scale a Helm Chart release
-
- Args:
- kdu_model (str): Kdu model name, corresponding to the Helm local location or repository
- kdu_instance (str): KDU instance, corresponding to the Helm Chart release in question
- namespace (str): Namespace where this KDU instance is deployed
- scale (int): Scale count
- version (str): Constraint with specific version of the Chart to use
- atomic (bool): If set, upgrade process rolls back changes made in case of failed upgrade.
- The --wait flag will be set automatically if --atomic is used
- replica_str (str): The key under resource_name key where the scale count is stored
- timeout (float): The time, in seconds, to wait
- resource_name (str): The KDU's resource to scale
- kubeconfig (str): Kubeconfig file path
-
- Returns:
- str: command to scale a Helm Chart release
- """
-
- # scale
- if resource_name:
- scale_dict = {"{}.{}".format(resource_name, replica_str): scale}
- else:
- scale_dict = {replica_str: scale}
-
- scale_str = self._params_to_set_option(scale_dict)
-
- return self._get_upgrade_command(
- kdu_model=kdu_model,
- kdu_instance=kdu_instance,
- namespace=namespace,
- params_str=scale_str,
- version=version,
- atomic=atomic,
- timeout=timeout,
- kubeconfig=kubeconfig,
- )
-
- def _get_upgrade_command(
- self,
- kdu_model,
- kdu_instance,
- namespace,
- params_str,
- version,
- atomic,
- timeout,
- kubeconfig,
- force: bool = False,
- ) -> str:
- """Generates the command to upgrade a Helm Chart release
-
- Args:
- kdu_model (str): Kdu model name, corresponding to the Helm local location or repository
- kdu_instance (str): KDU instance, corresponding to the Helm Chart release in question
- namespace (str): Namespace where this KDU instance is deployed
- params_str (str): Params used to upgrade the Helm Chart release
- version (str): Constraint with specific version of the Chart to use
- atomic (bool): If set, upgrade process rolls back changes made in case of failed upgrade.
- The --wait flag will be set automatically if --atomic is used
- timeout (float): The time, in seconds, to wait
- kubeconfig (str): Kubeconfig file path
- force (bool): If set, helm forces resource updates through a replacement strategy. This may recreate pods.
- Returns:
- str: command to upgrade a Helm Chart release
- """
-
- timeout_str = ""
- if timeout:
- timeout_str = "--timeout {}".format(timeout)
-
- # atomic
- atomic_str = ""
- if atomic:
- atomic_str = "--atomic"
-
- # force
- force_str = ""
- if force:
- force_str = "--force "
-
- # version
- version_str = ""
- if version:
- version_str = "--version {}".format(version)
-
- # namespace
- namespace_str = ""
- if namespace:
- namespace_str = "--namespace {}".format(namespace)
-
- command = (
- "env KUBECONFIG={kubeconfig} {helm} upgrade {namespace} {atomic} --output yaml {params} {timeout} {force}"
- "--reuse-values {name} {model} {ver}"
- ).format(
- kubeconfig=kubeconfig,
- helm=self._helm_command,
- namespace=namespace_str,
- atomic=atomic_str,
- force=force_str,
- params=params_str,
- timeout=timeout_str,
- name=kdu_instance,
- model=kdu_model,
- ver=version_str,
- )
- return command
-
- def _get_rollback_command(
- self, kdu_instance, namespace, revision, kubeconfig
- ) -> str:
- return "env KUBECONFIG={} {} rollback {} {} --wait".format(
- kubeconfig, self._helm_command, kdu_instance, revision
- )
-
- def _get_uninstall_command(
- self, kdu_instance: str, namespace: str, kubeconfig: str
- ) -> str:
- return "env KUBECONFIG={} {} delete --purge {}".format(
- kubeconfig, self._helm_command, kdu_instance
- )
kubectl_command: str = "/usr/bin/kubectl",
juju_command: str = "/usr/bin/juju",
log: object = None,
- loop: object = None,
on_update_db=None,
):
"""
:param kubectl_command: path to kubectl executable
:param helm_command: path to helm executable
:param log: logger
- :param: loop: Asyncio loop
"""
# parent class
- K8sConnector.__init__(
- self,
- db,
- log=log,
- on_update_db=on_update_db,
- )
+ K8sConnector.__init__(self, db, log=log, on_update_db=on_update_db)
self.fs = fs
- self.loop = loop or asyncio.get_event_loop()
self.log.debug("Initializing K8S Juju connector")
db_uri = EnvironConfig(prefixes=["OSMLCM_", "OSMMON_"]).get("database_uri")
self._store = MotorStore(db_uri)
- self.loading_libjuju = asyncio.Lock(loop=self.loop)
+ self.loading_libjuju = asyncio.Lock()
self.uninstall_locks = {}
self.log.debug("K8S Juju connector initialized")
cleanup_data = []
try:
self.log.debug("Initializing K8s cluster for juju")
- kubectl.create_cluster_role(
- name=metadata_name,
- labels=labels,
- )
+ kubectl.create_cluster_role(name=metadata_name, labels=labels)
self.log.debug("Cluster role created")
cleanup_data.append(
- {
- "delete": kubectl.delete_cluster_role,
- "args": (metadata_name,),
- }
+ {"delete": kubectl.delete_cluster_role, "args": (metadata_name,)}
)
- kubectl.create_service_account(
- name=metadata_name,
- labels=labels,
- )
+ kubectl.create_service_account(name=metadata_name, labels=labels)
self.log.debug("Service account created")
cleanup_data.append(
- {
- "delete": kubectl.delete_service_account,
- "args": (metadata_name,),
- }
+ {"delete": kubectl.delete_service_account, "args": (metadata_name,)}
)
- kubectl.create_cluster_role_binding(
- name=metadata_name,
- labels=labels,
- )
+ kubectl.create_cluster_role_binding(name=metadata_name, labels=labels)
self.log.debug("Role binding created")
cleanup_data.append(
{
"args": (metadata_name,),
}
)
- token, client_cert_data = await kubectl.get_secret_data(
- metadata_name,
- )
+ token, client_cert_data = await kubectl.get_secret_data(metadata_name)
default_storage_class = kubectl.get_default_storage_class()
self.log.debug("Default storage class: {}".format(default_storage_class))
async def repo_list(self):
raise MethodNotImplemented()
- async def repo_remove(
- self,
- name: str,
- ):
+ async def repo_remove(self, name: str):
raise MethodNotImplemented()
async def synchronize_repos(self, cluster_uuid: str, name: str):
previous_workdir = "/app/storage"
self.log.debug("[install] deploying {}".format(bundle))
- await libjuju.deploy(bundle, model_name=namespace, wait=atomic, timeout=timeout)
+ instantiation_params = params.get("overlay") if params else None
+ await libjuju.deploy(
+ bundle,
+ model_name=namespace,
+ wait=atomic,
+ timeout=timeout,
+ instantiation_params=instantiation_params,
+ )
os.chdir(previous_workdir)
# update information in the database (first, the VCA status, and then, the namespace)
return True
async def get_scale_count(
- self,
- resource_name: str,
- kdu_instance: str,
- namespace: str = None,
- **kwargs,
+ self, resource_name: str, kdu_instance: str, namespace: str = None, **kwargs
) -> int:
"""Get an application scale count
"""Rollback"""
async def rollback(
- self,
- cluster_uuid: str,
- kdu_instance: str,
- revision: int = 0,
+ self, cluster_uuid: str, kdu_instance: str, revision: int = 0
) -> str:
"""Rollback a model
"""Deletion"""
async def uninstall(
- self,
- cluster_uuid: str,
- kdu_instance: str,
- namespace: str = None,
- **kwargs,
+ self, cluster_uuid: str, kdu_instance: str, namespace: str = None, **kwargs
) -> bool:
"""Uninstall a KDU instance
will_not_delete = False
if model_name not in self.uninstall_locks:
- self.uninstall_locks[model_name] = asyncio.Lock(loop=self.loop)
+ self.uninstall_locks[model_name] = asyncio.Lock()
delete_lock = self.uninstall_locks[model_name]
while delete_lock.locked():
"""Introspection"""
- async def inspect_kdu(
- self,
- kdu_model: str,
- ) -> dict:
+ async def inspect_kdu(self, kdu_model: str) -> dict:
"""Inspect a KDU
Inspects a bundle and returns a dictionary of config parameters and
return kdu
- async def help_kdu(
- self,
- kdu_model: str,
- ) -> str:
+ async def help_kdu(self, kdu_model: str) -> str:
"""View the README
If available, returns the README of the bundle.
return status
async def add_relation(
- self,
- provider: RelationEndpoint,
- requirer: RelationEndpoint,
+ self, provider: RelationEndpoint, requirer: RelationEndpoint
):
"""
Add relation between two charmed endpoints
self.log.debug(f"adding new relation between {provider} and {requirer}")
cross_model_relation = (
provider.model_name != requirer.model_name
- or requirer.vca_id != requirer.vca_id
+ or provider.vca_id != requirer.vca_id
)
try:
if cross_model_relation:
requirer.model_name, offer, provider_libjuju
)
await requirer_libjuju.add_relation(
- requirer.model_name,
- requirer.endpoint,
- saas_name,
+ requirer.model_name, requirer.endpoint, saas_name
)
else:
# Standard relation
"""
return "cred-{}".format(cluster_uuid)
- def get_namespace(
- self,
- cluster_uuid: str,
- ) -> str:
+ def get_namespace(self, cluster_uuid: str) -> str:
"""Get the namespace UUID
Gets the namespace's unique name
if not self.libjuju:
async with self.loading_libjuju:
vca_connection = await get_connection(self._store)
- self.libjuju = Libjuju(vca_connection, loop=self.loop, log=self.log)
+ self.libjuju = Libjuju(vca_connection, log=self.log)
return self.libjuju
else:
vca_connection = await get_connection(self._store, vca_id)
- return Libjuju(
- vca_connection,
- loop=self.loop,
- log=self.log,
- n2vc=self,
- )
+ return Libjuju(vca_connection, log=self.log, n2vc=self)
def _get_kubectl(self, credentials: str) -> Kubectl:
"""
from typing import Dict
import typing
import uuid
+import json
from distutils.version import LooseVersion
from kubernetes.client.api import VersionApi
from kubernetes.client.models import (
V1ClusterRole,
+ V1Role,
V1ObjectMeta,
V1PolicyRule,
V1ServiceAccount,
V1ClusterRoleBinding,
+ V1RoleBinding,
V1RoleRef,
V1Subject,
V1Secret,
V1SecretReference,
+ V1Namespace,
)
from kubernetes.client.rest import ApiException
+from n2vc.libjuju import retry_callback
from retrying_async import retry
CORE_CLIENT = "core_v1"
RBAC_CLIENT = "rbac_v1"
STORAGE_CLIENT = "storage_v1"
+CUSTOM_OBJECT_CLIENT = "custom_object"
class Kubectl:
CORE_CLIENT: client.CoreV1Api(),
RBAC_CLIENT: client.RbacAuthorizationV1Api(),
STORAGE_CLIENT: client.StorageV1Api(),
+ CUSTOM_OBJECT_CLIENT: client.CustomObjectsApi(),
}
self._configuration = config.kube_config.Configuration.get_default_copy()
self.logger = logging.getLogger("Kubectl")
)
if len(cluster_roles.items) > 0:
- raise Exception(
- "Cluster role with metadata.name={} already exists".format(name)
- )
+ raise Exception("Role with metadata.name={} already exists".format(name))
metadata = V1ObjectMeta(name=name, labels=labels, namespace=namespace)
# Cluster role
self.clients[RBAC_CLIENT].create_cluster_role(cluster_role)
+ async def create_role(
+ self,
+ name: str,
+ labels: Dict[str, str],
+ api_groups: list,
+ resources: list,
+ verbs: list,
+ namespace: str,
+ ):
+ """
+ Create a role with one PolicyRule
+
+ :param: name: Name of the namespaced Role
+ :param: labels: Labels for namespaced Role metadata
+ :param: api_groups: List with api-groups allowed in the policy rule
+ :param: resources: List with resources allowed in the policy rule
+ :param: verbs: List with verbs allowed in the policy rule
+ :param: namespace: Kubernetes namespace for Role metadata
+
+ :return: None
+ """
+
+ roles = self.clients[RBAC_CLIENT].list_namespaced_role(
+ namespace, field_selector="metadata.name={}".format(name)
+ )
+
+ if len(roles.items) > 0:
+ raise Exception("Role with metadata.name={} already exists".format(name))
+
+ metadata = V1ObjectMeta(name=name, labels=labels, namespace=namespace)
+
+ role = V1Role(
+ metadata=metadata,
+ rules=[
+ V1PolicyRule(api_groups=api_groups, resources=resources, verbs=verbs),
+ ],
+ )
+
+ self.clients[RBAC_CLIENT].create_namespaced_role(namespace, role)
+
def delete_cluster_role(self, name: str):
"""
Delete a cluster role
)
self.clients[RBAC_CLIENT].create_cluster_role_binding(role_binding)
+ async def create_role_binding(
+ self,
+ name: str,
+ role_name: str,
+ sa_name: str,
+ labels: Dict[str, str],
+ namespace: str,
+ ):
+ """
+ Create a cluster role binding
+
+ :param: name: Name of the namespaced Role Binding
+ :param: role_name: Name of the namespaced Role to be bound
+ :param: sa_name: Name of the Service Account to be bound
+ :param: labels: Labels for Role Binding metadata
+ :param: namespace: Kubernetes namespace for Role Binding metadata
+
+ :return: None
+ """
+ role_bindings = self.clients[RBAC_CLIENT].list_namespaced_role_binding(
+ namespace, field_selector="metadata.name={}".format(name)
+ )
+ if len(role_bindings.items) > 0:
+ raise Exception(
+ "Role Binding with metadata.name={} already exists".format(name)
+ )
+
+ role_binding = V1RoleBinding(
+ metadata=V1ObjectMeta(name=name, labels=labels),
+ role_ref=V1RoleRef(kind="Role", name=role_name, api_group=""),
+ subjects=[
+ V1Subject(kind="ServiceAccount", name=sa_name, namespace=namespace)
+ ],
+ )
+ self.clients[RBAC_CLIENT].create_namespaced_role_binding(
+ namespace, role_binding
+ )
+
def delete_cluster_role_binding(self, name: str):
"""
Delete a cluster role binding
attempts=10,
delay=1,
fallback=Exception("Failed getting the secret from service account"),
+ callback=retry_callback,
)
async def get_secret_data(
self, name: str, namespace: str = "kube-system"
raise Exception(
"Failed getting the secret from service account {}".format(name)
)
+ # TODO: refactor to use get_secret_content
secret = v1_core.list_namespaced_secret(
namespace, field_selector="metadata.name={}".format(secret_name)
).items[0]
base64.b64decode(token).decode("utf-8"),
base64.b64decode(client_certificate_data).decode("utf-8"),
)
+
+ @retry(
+ attempts=10,
+ delay=1,
+ fallback=Exception("Failed getting data from the secret"),
+ )
+ async def get_secret_content(
+ self,
+ name: str,
+ namespace: str,
+ ) -> dict:
+ """
+ Get secret data
+
+ :param: name: Name of the secret
+ :param: namespace: Name of the namespace where the secret is stored
+
+ :return: Dictionary with secret's data
+ """
+ v1_core = self.clients[CORE_CLIENT]
+
+ secret = v1_core.read_namespaced_secret(name, namespace)
+
+ return secret.data
+
+ @retry(
+ attempts=10,
+ delay=1,
+ fallback=Exception("Failed creating the secret"),
+ )
+ async def create_secret(
+ self, name: str, data: dict, namespace: str, secret_type: str
+ ):
+ """
+ Get secret data
+
+ :param: name: Name of the secret
+ :param: data: Dict with data content. Values must be already base64 encoded
+ :param: namespace: Name of the namespace where the secret will be stored
+ :param: secret_type: Type of the secret, e.g., Opaque, kubernetes.io/service-account-token, kubernetes.io/tls
+
+ :return: None
+ """
+ v1_core = self.clients[CORE_CLIENT]
+ metadata = V1ObjectMeta(name=name, namespace=namespace)
+ secret = V1Secret(metadata=metadata, data=data, type=secret_type)
+ v1_core.create_namespaced_secret(namespace, secret)
+
+ async def create_certificate(
+ self,
+ namespace: str,
+ name: str,
+ dns_prefix: str,
+ secret_name: str,
+ usages: list,
+ issuer_name: str,
+ ):
+ """
+ Creates cert-manager certificate object
+
+ :param: namespace: Name of the namespace where the certificate and secret is stored
+ :param: name: Name of the certificate object
+ :param: dns_prefix: Prefix for the dnsNames. They will be prefixed to the common k8s svc suffixes
+ :param: secret_name: Name of the secret created by cert-manager
+ :param: usages: List of X.509 key usages
+ :param: issuer_name: Name of the cert-manager's Issuer or ClusterIssuer object
+
+ """
+ certificate_body = {
+ "apiVersion": "cert-manager.io/v1",
+ "kind": "Certificate",
+ "metadata": {"name": name, "namespace": namespace},
+ "spec": {
+ "secretName": secret_name,
+ "privateKey": {
+ "rotationPolicy": "Always",
+ "algorithm": "ECDSA",
+ "size": 256,
+ },
+ "duration": "8760h", # 1 Year
+ "renewBefore": "2208h", # 9 months
+ "subject": {"organizations": ["osm"]},
+ "commonName": "osm",
+ "isCA": False,
+ "usages": usages,
+ "dnsNames": [
+ "{}.{}".format(dns_prefix, namespace),
+ "{}.{}.svc".format(dns_prefix, namespace),
+ "{}.{}.svc.cluster".format(dns_prefix, namespace),
+ "{}.{}.svc.cluster.local".format(dns_prefix, namespace),
+ ],
+ "issuerRef": {"name": issuer_name, "kind": "ClusterIssuer"},
+ },
+ }
+ client = self.clients[CUSTOM_OBJECT_CLIENT]
+ try:
+ client.create_namespaced_custom_object(
+ group="cert-manager.io",
+ plural="certificates",
+ version="v1",
+ body=certificate_body,
+ namespace=namespace,
+ )
+ except ApiException as e:
+ info = json.loads(e.body)
+ if info.get("reason").lower() == "alreadyexists":
+ self.logger.warning("Certificate already exists: {}".format(e))
+ else:
+ raise e
+
+ async def delete_certificate(self, namespace, object_name):
+ client = self.clients[CUSTOM_OBJECT_CLIENT]
+ try:
+ client.delete_namespaced_custom_object(
+ group="cert-manager.io",
+ plural="certificates",
+ version="v1",
+ name=object_name,
+ namespace=namespace,
+ )
+ except ApiException as e:
+ info = json.loads(e.body)
+ if info.get("reason").lower() == "notfound":
+ self.logger.warning("Certificate already deleted: {}".format(e))
+ else:
+ raise e
+
+ @retry(
+ attempts=10,
+ delay=1,
+ fallback=Exception("Failed creating the namespace"),
+ )
+ async def create_namespace(self, name: str, labels: dict = None):
+ """
+ Create a namespace
+
+ :param: name: Name of the namespace to be created
+ :param: labels: Dictionary with labels for the new namespace
+
+ """
+ v1_core = self.clients[CORE_CLIENT]
+ metadata = V1ObjectMeta(name=name, labels=labels)
+ namespace = V1Namespace(
+ metadata=metadata,
+ )
+
+ try:
+ v1_core.create_namespace(namespace)
+ self.logger.debug("Namespace created: {}".format(name))
+ except ApiException as e:
+ info = json.loads(e.body)
+ if info.get("reason").lower() == "alreadyexists":
+ self.logger.warning("Namespace already exists: {}".format(e))
+ else:
+ raise e
+
+ @retry(
+ attempts=10,
+ delay=1,
+ fallback=Exception("Failed deleting the namespace"),
+ )
+ async def delete_namespace(self, name: str):
+ """
+ Delete a namespace
+
+ :param: name: Name of the namespace to be deleted
+
+ """
+ try:
+ self.clients[CORE_CLIENT].delete_namespace(name)
+ except ApiException as e:
+ if e.reason == "Not Found":
+ self.logger.warning("Namespace already deleted: {}".format(e))
import asyncio
import logging
+import os
import typing
+import yaml
import time
import juju.errors
+from juju.bundle import BundleHandler
from juju.model import Model
from juju.machine import Machine
from juju.application import Application
from juju.unit import Unit
+from juju.url import URL
+from juju.version import DEFAULT_ARCHITECTURE
from juju.client._definitions import (
FullStatus,
QueryApplicationOffersResults,
RBAC_LABEL_KEY_NAME = "rbac-id"
+@asyncio.coroutine
+def retry_callback(attempt, exc, args, kwargs, delay=0.5, *, loop):
+ # Specifically overridden from upstream implementation so it can
+ # continue to work with Python 3.10
+ yield from asyncio.sleep(attempt * delay)
+ return retry
+
+
class Libjuju:
def __init__(
self,
vca_connection: Connection,
- loop: asyncio.AbstractEventLoop = None,
log: logging.Logger = None,
n2vc: N2VCConnector = None,
):
Constructor
:param: vca_connection: n2vc.vca.connection object
- :param: loop: Asyncio loop
:param: log: Logger
:param: n2vc: N2VC object
"""
self.n2vc = n2vc
self.vca_connection = vca_connection
- self.loop = loop or asyncio.get_event_loop()
- self.loop.set_exception_handler(self.handle_exception)
- self.creating_model = asyncio.Lock(loop=self.loop)
+ self.creating_model = asyncio.Lock()
if self.vca_connection.is_default:
self.health_check_task = self._create_health_check_task()
def _create_health_check_task(self):
- return self.loop.create_task(self.health_check())
+ return asyncio.get_event_loop().create_task(self.health_check())
async def get_controller(self, timeout: float = 60.0) -> Controller:
"""
if controller:
await controller.disconnect()
- @retry(attempts=3, delay=5, timeout=None)
+ @retry(attempts=3, delay=5, timeout=None, callback=retry_callback)
async def add_model(self, model_name: str, cloud: VcaCloud):
"""
Create model
await self.disconnect_controller(controller)
return application_configs
- @retry(attempts=3, delay=5)
+ @retry(attempts=3, delay=5, callback=retry_callback)
async def get_model(self, controller: Controller, model_name: str) -> Model:
"""
Get model from controller
return machine_id
async def deploy(
- self, uri: str, model_name: str, wait: bool = True, timeout: float = 3600
+ self,
+ uri: str,
+ model_name: str,
+ wait: bool = True,
+ timeout: float = 3600,
+ instantiation_params: dict = None,
):
"""
Deploy bundle or charm: Similar to the juju CLI command `juju deploy`
- :param: uri: Path or Charm Store uri in which the charm or bundle can be found
- :param: model_name: Model name
- :param: wait: Indicates whether to wait or not until all applications are active
- :param: timeout: Time in seconds to wait until all applications are active
+ :param uri: Path or Charm Store uri in which the charm or bundle can be found
+ :param model_name: Model name
+ :param wait: Indicates whether to wait or not until all applications are active
+ :param timeout: Time in seconds to wait until all applications are active
+ :param instantiation_params: To be applied as overlay bundle over primary bundle.
"""
controller = await self.get_controller()
model = await self.get_model(controller, model_name)
+ overlays = []
try:
- await model.deploy(uri, trust=True)
+ await self._validate_instantiation_params(uri, model, instantiation_params)
+ overlays = self._get_overlays(model_name, instantiation_params)
+ await model.deploy(uri, trust=True, overlays=overlays)
if wait:
await JujuModelWatcher.wait_for_model(model, timeout=timeout)
self.log.debug("All units active in model {}".format(model_name))
finally:
+ self._remove_overlay_file(overlays)
await self.disconnect_model(model)
await self.disconnect_controller(controller)
+ async def _validate_instantiation_params(
+ self, uri: str, model, instantiation_params: dict
+ ) -> None:
+ """Checks if all the applications in instantiation_params
+ exist ins the original bundle.
+
+ Raises:
+ JujuApplicationNotFound if there is an invalid app in
+ the instantiation params.
+ """
+ overlay_apps = self._get_apps_in_instantiation_params(instantiation_params)
+ if not overlay_apps:
+ return
+ original_apps = await self._get_apps_in_original_bundle(uri, model)
+ if not all(app in original_apps for app in overlay_apps):
+ raise JujuApplicationNotFound(
+ "Cannot find application {} in original bundle {}".format(
+ overlay_apps, original_apps
+ )
+ )
+
+ async def _get_apps_in_original_bundle(self, uri: str, model) -> set:
+ """Bundle is downloaded in BundleHandler.fetch_plan.
+ That method takes care of opening and exception handling.
+
+ Resolve method gets all the information regarding the channel,
+ track, revision, type, source.
+
+ Returns:
+ Set with the names of the applications in original bundle.
+ """
+ url = URL.parse(uri)
+ architecture = DEFAULT_ARCHITECTURE # only AMD64 is allowed
+ res = await model.deploy_types[str(url.schema)].resolve(
+ url, architecture, entity_url=uri
+ )
+ handler = BundleHandler(model, trusted=True, forced=False)
+ await handler.fetch_plan(url, res.origin)
+ return handler.applications
+
+ def _get_apps_in_instantiation_params(self, instantiation_params: dict) -> list:
+ """Extract applications key in instantiation params.
+
+ Returns:
+ List with the names of the applications in instantiation params.
+
+ Raises:
+ JujuError if applications key is not found.
+ """
+ if not instantiation_params:
+ return []
+ try:
+ return [key for key in instantiation_params.get("applications")]
+ except Exception as e:
+ raise JujuError("Invalid overlay format. {}".format(str(e)))
+
+ def _get_overlays(self, model_name: str, instantiation_params: dict) -> list:
+ """Creates a temporary overlay file which includes the instantiation params.
+ Only one overlay file is created.
+
+ Returns:
+ List with one overlay filename. Empty list if there are no instantiation params.
+ """
+ if not instantiation_params:
+ return []
+ file_name = model_name + "-overlay.yaml"
+ self._write_overlay_file(file_name, instantiation_params)
+ return [file_name]
+
+ def _write_overlay_file(self, file_name: str, instantiation_params: dict) -> None:
+ with open(file_name, "w") as file:
+ yaml.dump(instantiation_params, file)
+
+ def _remove_overlay_file(self, overlay: list) -> None:
+ """Overlay contains either one or zero file names."""
+ if not overlay:
+ return
+ try:
+ filename = overlay[0]
+ os.remove(filename)
+ except OSError as e:
+ self.log.warning(
+ "Overlay file {} could not be removed: {}".format(filename, e)
+ )
+
async def add_unit(
self,
application_name: str,
application = self._get_application(model, application_name)
if application is not None:
-
# Checks if the given machine id in the model,
# otherwise function raises an error
_machine, _series = self._get_machine_info(model, machine_id)
try:
if application_name not in model.applications:
-
if machine_id is not None:
machine, series = self._get_machine_info(model, machine_id)
return application
async def resolve_application(self, model_name: str, application_name: str):
-
controller = await self.get_controller()
model = await self.get_model(controller, model_name)
await self.disconnect_controller(controller)
async def resolve(self, model_name: str):
-
controller = await self.get_controller()
model = await self.get_model(controller, model_name)
all_units_active = False
try:
await model.add_relation(endpoint_1, endpoint_2)
except juju.errors.JujuAPIError as e:
- if "not found" in e.message:
+ if self._relation_is_not_found(e):
self.log.warning("Relation not found: {}".format(e.message))
return
- if "already exists" in e.message:
+ if self._relation_already_exist(e):
self.log.warning("Relation already exists: {}".format(e.message))
return
# another exception, raise it
await self.disconnect_model(model)
await self.disconnect_controller(controller)
+ def _relation_is_not_found(self, juju_error):
+ text = "not found"
+ return (text in juju_error.message) or (
+ juju_error.error_code and text in juju_error.error_code
+ )
+
+ def _relation_already_exist(self, juju_error):
+ text = "already exists"
+ return (text in juju_error.message) or (
+ juju_error.error_code and text in juju_error.error_code
+ )
+
async def offer(self, endpoint: RelationEndpoint) -> Offer:
"""
Create an offer from a RelationEndpoint
await self.disconnect_model(model)
await self.disconnect_controller(controller)
- def handle_exception(self, loop, context):
- # All unhandled exceptions by libjuju are handled here.
- pass
-
async def health_check(self, interval: float = 300.0):
"""
Health check to make sure controller and controller_model connections are OK
finally:
await self.disconnect_controller(controller)
- @retry(attempts=20, delay=5, fallback=JujuLeaderUnitNotFound())
+ @retry(
+ attempts=20, delay=5, fallback=JujuLeaderUnitNotFound(), callback=retry_callback
+ )
async def _get_leader_unit(self, application: Application) -> Unit:
unit = None
for u in application.units:
class Loggable:
def __init__(self, log, log_to_console: bool = False, prefix: str = ""):
-
self._last_log_time = None # used for time increment in logging
self._log_to_console = log_to_console
self._prefix = prefix
include_thread: bool = False,
include_coroutine: bool = True,
) -> str:
-
# time increment from last log
now = time.perf_counter()
if self._last_log_time is None:
coroutine_id = ""
if include_coroutine:
try:
- if asyncio.Task.current_task() is not None:
+ if asyncio.current_task() is not None:
def print_cor_name(c):
import inspect
except Exception:
pass
- coro = asyncio.Task.current_task()._coro
+ coro = asyncio.current_task()._coro
coroutine_id = "coro-{} {}()".format(
hex(id(coro))[2:], print_cor_name(coro)
)
import abc
import asyncio
from http import HTTPStatus
+from shlex import quote
import os
import shlex
import subprocess
db: object,
fs: object,
log: object,
- loop: object,
on_update_db=None,
**kwargs,
):
:param object fs: FileSystem object managing the package artifacts (repo common
FsBase)
:param object log: the logging object to log to
- :param object loop: the loop to use for asyncio (default current thread loop)
:param on_update_db: callback called when n2vc connector updates database.
Received arguments:
table: e.g. "nsrs"
# store arguments into self
self.db = db
self.fs = fs
- self.loop = loop or asyncio.get_event_loop()
self.on_update_db = on_update_db
# generate private/public key-pair
self.log.warning("No HOME environment variable, using /tmp")
homedir = "/tmp"
sshdir = "{}/.ssh".format(homedir)
+ sshdir = os.path.realpath(os.path.normpath(os.path.abspath(sshdir)))
if not os.path.exists(sshdir):
os.mkdir(sshdir)
self.private_key_path = "{}/id_n2vc_rsa".format(sshdir)
+ self.private_key_path = os.path.realpath(
+ os.path.normpath(os.path.abspath(self.private_key_path))
+ )
self.public_key_path = "{}.pub".format(self.private_key_path)
+ self.public_key_path = os.path.realpath(
+ os.path.normpath(os.path.abspath(self.public_key_path))
+ )
# If we don't have a key generated, then we have to generate it using ssh-keygen
if not os.path.exists(self.private_key_path):
- cmd = "ssh-keygen -t {} -b {} -N '' -f {}".format(
- "rsa", "4096", self.private_key_path
+ command = "ssh-keygen -t {} -b {} -N '' -f {}".format(
+ "rsa", "4096", quote(self.private_key_path)
)
# run command with arguments
- subprocess.check_output(shlex.split(cmd))
+ args = shlex.split(command)
+ subprocess.run(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
# Read the public key. Only one public key (one line) in the file
with open(self.public_key_path, "r") as file:
reuse_ee_id: str = None,
progress_timeout: float = None,
total_timeout: float = None,
- ) -> (str, dict):
+ ) -> tuple[str, dict]:
"""Create an Execution Environment. Returns when it is created or raises an
exception on failing
####################################################################################
"""
- def _get_namespace_components(self, namespace: str) -> (str, str, str, str, str):
+ def _get_namespace_components(
+ self, namespace: str
+ ) -> tuple[str, str, str, str, str]:
"""
Split namespace components
# .format(str(status.value), detailed_status, vca_status, entity_type))
try:
-
the_table = db_dict["collection"]
the_filter = db_dict["filter"]
the_path = db_dict["path"]
)
from n2vc.n2vc_conn import N2VCConnector
from n2vc.n2vc_conn import obj_to_dict, obj_to_yaml
-from n2vc.libjuju import Libjuju
+from n2vc.libjuju import Libjuju, retry_callback
from n2vc.store import MotorStore
from n2vc.utils import get_ee_id_components, generate_random_alfanum_string
from n2vc.vca.connection import get_connection
db: object,
fs: object,
log: object = None,
- loop: object = None,
on_update_db=None,
):
"""
:param: db: Database object from osm_common
:param: fs: Filesystem object from osm_common
:param: log: Logger
- :param: loop: Asyncio loop
:param: on_update_db: Callback function to be called for updating the database.
"""
# parent class constructor
- N2VCConnector.__init__(
- self,
- db=db,
- fs=fs,
- log=log,
- loop=loop,
- on_update_db=on_update_db,
- )
+ N2VCConnector.__init__(self, db=db, fs=fs, log=log, on_update_db=on_update_db)
# silence websocket traffic log
logging.getLogger("websockets.protocol").setLevel(logging.INFO)
db_uri = EnvironConfig(prefixes=["OSMLCM_", "OSMMON_"]).get("database_uri")
self._store = MotorStore(db_uri)
- self.loading_libjuju = asyncio.Lock(loop=self.loop)
+ self.loading_libjuju = asyncio.Lock()
self.delete_namespace_locks = {}
self.log.info("N2VC juju connector initialized")
# create or reuse a new juju machine
try:
if not await libjuju.model_exists(model_name):
- await libjuju.add_model(
- model_name,
- libjuju.vca_connection.lxd_cloud,
- )
+ await libjuju.add_model(model_name, libjuju.vca_connection.lxd_cloud)
machine, new = await libjuju.create_machine(
model_name=model_name,
machine_id=machine_id,
raise N2VCException(message=message)
# new machine credentials
- credentials = {
- "hostname": machine.dns_name,
- }
+ credentials = {"hostname": machine.dns_name}
self.log.info(
"Execution environment created. ee_id: {}, credentials: {}".format(
# register machine on juju
try:
if not await libjuju.model_exists(model_name):
- await libjuju.add_model(
- model_name,
- libjuju.vca_connection.lxd_cloud,
- )
+ await libjuju.add_model(model_name, libjuju.vca_connection.lxd_cloud)
machine_id = await libjuju.provision_machine(
model_name=model_name,
hostname=hostname,
# In case of native_charm is being deployed, if JujuApplicationExists error happens
# it will try to add_unit
- @retry(attempts=3, delay=5, retry_exceptions=(N2VCApplicationExists,), timeout=None)
+ @retry(
+ attempts=3,
+ delay=5,
+ retry_exceptions=(N2VCApplicationExists,),
+ timeout=None,
+ callback=retry_callback,
+ )
async def install_configuration_sw(
self,
ee_id: str,
_, ns_id, _, _, _ = self._get_namespace_components(namespace=namespace)
model_name = "{}-k8s".format(ns_id)
if not await libjuju.model_exists(model_name):
- await libjuju.add_model(
- model_name,
- libjuju.vca_connection.k8s_cloud,
- )
+ await libjuju.add_model(model_name, libjuju.vca_connection.k8s_cloud)
application_name = self._get_application_name(namespace)
try:
self.log.info("K8s proxy charm installed")
ee_id = N2VCJujuConnector._build_ee_id(
- model_name=model_name,
- application_name=application_name,
- machine_id="k8s",
+ model_name=model_name, application_name=application_name, machine_id="k8s"
)
self._write_ee_id_db(db_dict=db_dict, ee_id=ee_id)
return await libjuju.get_metrics(model_name, application_name)
async def add_relation(
- self,
- provider: RelationEndpoint,
- requirer: RelationEndpoint,
+ self, provider: RelationEndpoint, requirer: RelationEndpoint
):
"""
Add relation between two charmed endpoints
self.log.debug(f"adding new relation between {provider} and {requirer}")
cross_model_relation = (
provider.model_name != requirer.model_name
- or requirer.vca_id != requirer.vca_id
+ or provider.vca_id != requirer.vca_id
)
try:
if cross_model_relation:
requirer.model_name, offer, provider_libjuju
)
await requirer_libjuju.add_relation(
- requirer.model_name,
- requirer.endpoint,
- saas_name,
+ requirer.model_name, requirer.endpoint, saas_name
)
else:
# Standard relation
self.log.info("Deleting namespace={}".format(namespace))
will_not_delete = False
if namespace not in self.delete_namespace_locks:
- self.delete_namespace_locks[namespace] = asyncio.Lock(loop=self.loop)
+ self.delete_namespace_locks[namespace] = asyncio.Lock()
delete_lock = self.delete_namespace_locks[namespace]
while delete_lock.locked():
scaling_in: bool = False,
vca_type: str = None,
vca_id: str = None,
+ application_to_delete: str = None,
):
"""
Delete an execution environment
{collection: <str>, filter: {}, path: <str>},
e.g. {collection: "nsrs", filter:
{_id: <nsd-id>, path: "_admin.deployed.VCA.3"}
- :param: total_timeout: Total timeout
- :param: scaling_in: Boolean to indicate if it is a scaling in operation
- :param: vca_type: VCA type
- :param: vca_id: VCA ID
+ :param total_timeout: Total timeout
+ :param scaling_in: Boolean to indicate if it is a scaling in operation
+ :param vca_type: VCA type
+ :param vca_id: VCA ID
+ :param application_to_delete: name of the single application to be deleted
"""
self.log.info("Deleting execution environment ee_id={}".format(ee_id))
libjuju = await self._get_libjuju(vca_id)
ee_id=ee_id
)
try:
- if not scaling_in:
- # destroy the model
- await libjuju.destroy_model(
+ if application_to_delete == application_name:
+ # destroy the application
+ await libjuju.destroy_application(
model_name=model_name,
+ application_name=application_name,
total_timeout=total_timeout,
)
+ # if model is empty delete it
+ controller = await libjuju.get_controller()
+ model = await libjuju.get_model(
+ controller=controller,
+ model_name=model_name,
+ )
+ if not model.applications:
+ self.log.info("Model {} is empty, deleting it".format(model_name))
+ await libjuju.destroy_model(
+ model_name=model_name,
+ total_timeout=total_timeout,
+ )
+ elif not scaling_in:
+ # destroy the model
+ await libjuju.destroy_model(
+ model_name=model_name, total_timeout=total_timeout
+ )
elif vca_type == "native_charm" and scaling_in:
# destroy the unit in the application
await libjuju.destroy_unit(
config=params_dict,
)
actions = await libjuju.get_actions(
- application_name=application_name,
- model_name=model_name,
+ application_name=application_name, model_name=model_name
)
self.log.debug(
"Application {} has these actions: {}".format(
)
try:
-
await libjuju.upgrade_charm(
application_name=application_name,
path=path,
if not self.libjuju:
async with self.loading_libjuju:
vca_connection = await get_connection(self._store)
- self.libjuju = Libjuju(vca_connection, loop=self.loop, log=self.log)
+ self.libjuju = Libjuju(vca_connection, log=self.log)
return self.libjuju
else:
vca_connection = await get_connection(self._store, vca_id)
- return Libjuju(
- vca_connection,
- loop=self.loop,
- log=self.log,
- n2vc=self,
- )
+ return Libjuju(vca_connection, log=self.log, n2vc=self)
def _write_ee_id_db(self, db_dict: dict, ee_id: str):
-
# write ee_id to database: _admin.deployed.VCA.x
try:
the_table = db_dict["collection"]
:param: vca_id: VCA ID
"""
vca_connection = await get_connection(self._store, vca_id=vca_id)
- libjuju = Libjuju(vca_connection, loop=self.loop, log=self.log, n2vc=self)
+ libjuju = Libjuju(vca_connection, log=self.log, n2vc=self)
controller = await libjuju.get_controller()
await libjuju.disconnect_controller(controller)
# limitations under the License.
import abc
-import asyncio
-from base64 import b64decode
-import re
import typing
-from Crypto.Cipher import AES
from motor.motor_asyncio import AsyncIOMotorClient
from n2vc.config import EnvironConfig
from n2vc.vca.connection_data import ConnectionData
from osm_common.dbmongo import DbMongo, DbException
+from osm_common.dbbase import Encryption
+
DB_NAME = "osm"
class MotorStore(Store):
- def __init__(self, uri: str, loop=None):
+ def __init__(self, uri: str):
"""
Constructor
:param: uri: Connection string to connect to the database.
- :param: loop: Asyncio Loop
"""
self._client = AsyncIOMotorClient(uri)
- self.loop = loop or asyncio.get_event_loop()
self._secret_key = None
self._config = EnvironConfig(prefixes=["OSMLCM_", "OSMMON_"])
+ self.encryption = Encryption(
+ uri=uri,
+ config=self._config,
+ encoding_type="utf-8",
+ logger_name="db",
+ )
@property
def _database(self):
data = await self._vca_collection.find_one({"_id": vca_id})
if not data:
raise Exception("vca with id {} not found".format(vca_id))
- await self.decrypt_fields(
+ await self.encryption.decrypt_fields(
data,
["secret", "cacert"],
schema_version=data["schema_version"],
async def _get_juju_info(self):
"""Get Juju information (the default VCA) from the admin collection"""
return await self._admin_collection.find_one({"_id": "juju"})
-
- # DECRYPT METHODS
- async def decrypt_fields(
- self,
- item: dict,
- fields: typing.List[str],
- schema_version: str = None,
- salt: str = None,
- ):
- """
- Decrypt fields
-
- Decrypt fields from a dictionary. Follows the same logic as in osm_common.
-
- :param: item: Dictionary with the keys to be decrypted
- :param: fields: List of keys to decrypt
- :param: schema version: Schema version. (i.e. 1.11)
- :param: salt: Salt for the decryption
- """
- flags = re.I
-
- async def process(_item):
- if isinstance(_item, list):
- for elem in _item:
- await process(elem)
- elif isinstance(_item, dict):
- for key, val in _item.items():
- if isinstance(val, str):
- if any(re.search(f, key, flags) for f in fields):
- _item[key] = await self.decrypt(val, schema_version, salt)
- else:
- await process(val)
-
- await process(item)
-
- async def decrypt(self, value, schema_version=None, salt=None):
- """
- Decrypt an encrypted value
- :param value: value to be decrypted. It is a base64 string
- :param schema_version: used for known encryption method used. If None or '1.0' no encryption has been done.
- If '1.1' symmetric AES encryption has been done
- :param salt: optional salt to be used
- :return: Plain content of value
- """
- await self.get_secret_key()
- if not self.secret_key or not schema_version or schema_version == "1.0":
- return value
- else:
- secret_key = self._join_secret_key(salt)
- encrypted_msg = b64decode(value)
- cipher = AES.new(secret_key)
- decrypted_msg = cipher.decrypt(encrypted_msg)
- try:
- unpadded_private_msg = decrypted_msg.decode().rstrip("\0")
- except UnicodeDecodeError:
- raise DbException(
- "Cannot decrypt information. Are you using same COMMONKEY in all OSM components?",
- http_code=500,
- )
- return unpadded_private_msg
-
- def _join_secret_key(self, update_key: typing.Any) -> bytes:
- """
- Join key with secret key
-
- :param: update_key: str or bytes with the to update
-
- :return: Joined key
- """
- return self._join_keys(update_key, self.secret_key)
-
- def _join_keys(self, key: typing.Any, secret_key: bytes) -> bytes:
- """
- Join key with secret_key
-
- :param: key: str or bytesof the key to update
- :param: secret_key: bytes of the secret key
-
- :return: Joined key
- """
- if isinstance(key, str):
- update_key_bytes = key.encode()
- else:
- update_key_bytes = key
- new_secret_key = bytearray(secret_key) if secret_key else bytearray(32)
- for i, b in enumerate(update_key_bytes):
- new_secret_key[i % 32] ^= b
- return bytes(new_secret_key)
-
- @property
- def secret_key(self):
- return self._secret_key
-
- async def get_secret_key(self):
- """
- Get secret key using the database key and the serial key in the DB
- The key is populated in the property self.secret_key
- """
- if self.secret_key:
- return
- secret_key = None
- if self.database_key:
- secret_key = self._join_keys(self.database_key, None)
- version_data = await self._admin_collection.find_one({"_id": "version"})
- if version_data and version_data.get("serial"):
- secret_key = self._join_keys(b64decode(version_data["serial"]), secret_key)
- self._secret_key = secret_key
-
- @property
- def database_key(self):
- return self._config["database_commonkey"]
os.path.join(os.path.dirname(__file__), "testdata", filename),
"r",
) as self.upgrade_file:
-
all_changes = AsyncMock()
all_changes.Next.side_effect = self._fetch_next_delta
mock_all_watcher.return_value = all_changes
@asynctest.fail_on(active_handles=True)
async def test_repo_list(self):
-
self.helm_conn._local_async_exec = asynctest.CoroutineMock(return_value=("", 0))
await self.helm_conn.repo_list(self.cluster_uuid)
@asynctest.fail_on(active_handles=True)
async def test_repo_remove(self):
-
self.helm_conn._local_async_exec = asynctest.CoroutineMock(return_value=("", 0))
repo_name = "bitnami"
await self.helm_conn.repo_remove(self.cluster_uuid, repo_name)
self.helm_conn._local_async_exec = asynctest.CoroutineMock(return_value=("", 0))
self.helm_conn._status_kdu = asynctest.CoroutineMock(return_value=None)
self.helm_conn._store_status = asynctest.CoroutineMock()
+ self.helm_conn._repo_to_oci_url = Mock(return_value=None)
self.kdu_instance = "stable-openldap-0005399828"
self.helm_conn.generate_kdu_instance_name = Mock(return_value=self.kdu_instance)
self.helm_conn._get_namespaces = asynctest.CoroutineMock(return_value=[])
}
self.helm_conn._local_async_exec = asynctest.CoroutineMock(return_value=("", 0))
self.helm_conn._store_status = asynctest.CoroutineMock()
+ self.helm_conn._repo_to_oci_url = Mock(return_value=None)
self.helm_conn.get_instance_info = asynctest.CoroutineMock(
return_value=instance_info
)
}
self.helm_conn._local_async_exec = asynctest.CoroutineMock(return_value=("", 0))
self.helm_conn._store_status = asynctest.CoroutineMock()
+ self.helm_conn._repo_to_oci_url = Mock(return_value=None)
self.helm_conn.get_instance_info = asynctest.CoroutineMock(
return_value=instance_info
)
self.helm_conn.values_kdu = asynctest.CoroutineMock(return_value=kdu_values)
self.helm_conn._local_async_exec = asynctest.CoroutineMock(return_value=("", 0))
self.helm_conn._store_status = asynctest.CoroutineMock()
+ self.helm_conn._repo_to_oci_url = Mock(return_value=None)
self.helm_conn.get_instance_info = asynctest.CoroutineMock(
return_value=instance_info
)
"--namespace testk8s --atomic --output yaml --set replicaCount=2 --timeout 1800s "
"--reuse-values --version 1.2.3"
)
- self.helm_conn._local_async_exec.assert_called_once_with(
+ self.helm_conn._local_async_exec.assert_called_with(
command=command, env=self.env, raise_exception_on_error=False
)
# TEST-2
)
self.helm_conn.repo_remove.assert_not_called()
self.helm_conn.repo_add.assert_called_once_with(
- self.cluster_uuid, "bitnami", "https://charts.bitnami.com/bitnami"
+ self.cluster_uuid,
+ "bitnami",
+ "https://charts.bitnami.com/bitnami",
+ cert=None,
+ user=None,
+ password=None,
+ oci=False,
)
self.assertEqual(deleted_repo_list, [], "Deleted repo list should be empty")
self.assertEqual(
+++ /dev/null
-##
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-#
-# For those usages not covered by the Apache License, Version 2.0 please
-# contact: alfonso.tiernosepulveda@telefonica.com
-##
-
-import asynctest
-import logging
-
-from asynctest.mock import Mock
-from osm_common.dbmemory import DbMemory
-from osm_common.fslocal import FsLocal
-from n2vc.k8s_helm_conn import K8sHelmConnector
-
-__author__ = "Isabel Lloret <illoret@indra.es>"
-
-
-class TestK8sHelmConn(asynctest.TestCase):
- logging.basicConfig(level=logging.DEBUG)
- logger = logging.getLogger(__name__)
- logger.setLevel(logging.DEBUG)
-
- async def setUp(self):
- self.db = Mock(DbMemory())
- self.fs = asynctest.Mock(FsLocal())
- self.fs.path = "./tmp/"
- self.namespace = "testk8s"
- self.service_account = "osm"
- self.cluster_id = "helm_cluster_id"
- self.cluster_uuid = self.cluster_id
- # pass fake kubectl and helm commands to make sure it does not call actual commands
- K8sHelmConnector._check_file_exists = asynctest.Mock(return_value=True)
- K8sHelmConnector._local_async_exec = asynctest.CoroutineMock(
- return_value=(0, "")
- )
- cluster_dir = self.fs.path + self.cluster_id
- self.kube_config = self.fs.path + self.cluster_id + "/.kube/config"
- self.helm_home = self.fs.path + self.cluster_id + "/.helm"
- self.env = {
- "HELM_HOME": "{}/.helm".format(cluster_dir),
- "KUBECONFIG": "{}/.kube/config".format(cluster_dir),
- }
- self.helm_conn = K8sHelmConnector(self.fs, self.db, log=self.logger)
- self.logger.debug("Set up executed")
-
- @asynctest.fail_on(active_handles=True)
- async def test_init_env(self):
- # TODO
- pass
-
- @asynctest.fail_on(active_handles=True)
- async def test_repo_add(self):
- repo_name = "bitnami"
- repo_url = "https://charts.bitnami.com/bitnami"
- self.helm_conn._local_async_exec = asynctest.CoroutineMock(return_value=("", 0))
-
- await self.helm_conn.repo_add(self.cluster_uuid, repo_name, repo_url)
-
- self.helm_conn.fs.sync.assert_called_once_with(from_path=self.cluster_id)
- self.helm_conn.fs.reverse_sync.assert_called_once_with(
- from_path=self.cluster_id
- )
- self.assertEqual(
- self.helm_conn._local_async_exec.call_count,
- 2,
- "local_async_exec expected 2 calls, called {}".format(
- self.helm_conn._local_async_exec.call_count
- ),
- )
-
- repo_update_command = (
- "env KUBECONFIG=./tmp/helm_cluster_id/.kube/config /usr/bin/helm repo update {}"
- ).format(repo_name)
- repo_add_command = (
- "env KUBECONFIG=./tmp/helm_cluster_id/.kube/config /usr/bin/helm repo add {} {}"
- ).format(repo_name, repo_url)
- calls = self.helm_conn._local_async_exec.call_args_list
- call0_kargs = calls[0][1]
- self.assertEqual(
- call0_kargs.get("command"),
- repo_add_command,
- "Invalid repo add command: {}".format(call0_kargs.get("command")),
- )
- self.assertEqual(
- call0_kargs.get("env"),
- self.env,
- "Invalid env for add command: {}".format(call0_kargs.get("env")),
- )
- call1_kargs = calls[1][1]
- self.assertEqual(
- call1_kargs.get("command"),
- repo_update_command,
- "Invalid repo update command: {}".format(call1_kargs.get("command")),
- )
- self.assertEqual(
- call1_kargs.get("env"),
- self.env,
- "Invalid env for update command: {}".format(call1_kargs.get("env")),
- )
-
- @asynctest.fail_on(active_handles=True)
- async def test_repo_list(self):
- self.helm_conn._local_async_exec = asynctest.CoroutineMock(return_value=("", 0))
-
- await self.helm_conn.repo_list(self.cluster_uuid)
-
- self.helm_conn.fs.sync.assert_called_once_with(from_path=self.cluster_id)
- self.helm_conn.fs.reverse_sync.assert_called_once_with(
- from_path=self.cluster_id
- )
- command = "env KUBECONFIG=./tmp/helm_cluster_id/.kube/config /usr/bin/helm repo list --output yaml"
- self.helm_conn._local_async_exec.assert_called_with(
- command=command, env=self.env, raise_exception_on_error=False
- )
-
- @asynctest.fail_on(active_handles=True)
- async def test_repo_remove(self):
- self.helm_conn._local_async_exec = asynctest.CoroutineMock(return_value=("", 0))
- repo_name = "bitnami"
- await self.helm_conn.repo_remove(self.cluster_uuid, repo_name)
-
- self.helm_conn.fs.sync.assert_called_once_with(from_path=self.cluster_id)
- self.helm_conn.fs.reverse_sync.assert_called_once_with(
- from_path=self.cluster_id
- )
- command = "env KUBECONFIG=./tmp/helm_cluster_id/.kube/config /usr/bin/helm repo remove {}".format(
- repo_name
- )
- self.helm_conn._local_async_exec.assert_called_once_with(
- command=command, env=self.env, raise_exception_on_error=True
- )
-
- @asynctest.fail_on(active_handles=True)
- async def test_install(self):
- kdu_model = "stable/openldap:1.2.2"
- kdu_instance = "stable-openldap-0005399828"
- db_dict = {}
- self.helm_conn._local_async_exec = asynctest.CoroutineMock(return_value=("", 0))
- self.helm_conn._status_kdu = asynctest.CoroutineMock(return_value=None)
- self.helm_conn._store_status = asynctest.CoroutineMock()
- self.helm_conn.generate_kdu_instance_name = Mock(return_value=kdu_instance)
-
- await self.helm_conn.install(
- self.cluster_uuid,
- kdu_model,
- kdu_instance,
- atomic=True,
- namespace=self.namespace,
- db_dict=db_dict,
- )
-
- self.helm_conn.fs.sync.assert_has_calls(
- [
- asynctest.call(from_path=self.cluster_id),
- asynctest.call(from_path=self.cluster_id),
- ]
- )
- self.helm_conn.fs.reverse_sync.assert_has_calls(
- [
- asynctest.call(from_path=self.cluster_id),
- asynctest.call(from_path=self.cluster_id),
- ]
- )
- self.helm_conn._store_status.assert_called_with(
- cluster_id=self.cluster_id,
- kdu_instance=kdu_instance,
- namespace=self.namespace,
- db_dict=db_dict,
- operation="install",
- )
- command = (
- "env KUBECONFIG=./tmp/helm_cluster_id/.kube/config /usr/bin/helm install "
- "--atomic --output yaml --timeout 300 "
- "--name=stable-openldap-0005399828 --namespace testk8s stable/openldap "
- "--version 1.2.2"
- )
- self.helm_conn._local_async_exec.assert_called_with(
- command=command, env=self.env, raise_exception_on_error=False
- )
-
- @asynctest.fail_on(active_handles=True)
- async def test_upgrade_force_true(self):
- kdu_model = "stable/openldap:1.2.3"
- kdu_instance = "stable-openldap-0005399828"
- db_dict = {}
- instance_info = {
- "chart": "openldap-1.2.2",
- "name": kdu_instance,
- "namespace": self.namespace,
- "revision": 1,
- "status": "DEPLOYED",
- }
- self.helm_conn._local_async_exec = asynctest.CoroutineMock(return_value=("", 0))
- self.helm_conn._store_status = asynctest.CoroutineMock()
- self.helm_conn.get_instance_info = asynctest.CoroutineMock(
- return_value=instance_info
- )
- # TEST-1 (--force true)
- await self.helm_conn.upgrade(
- self.cluster_uuid,
- kdu_instance,
- kdu_model,
- atomic=True,
- db_dict=db_dict,
- force=True,
- )
- self.helm_conn.fs.sync.assert_called_with(from_path=self.cluster_id)
- self.helm_conn.fs.reverse_sync.assert_has_calls(
- [
- asynctest.call(from_path=self.cluster_id),
- asynctest.call(from_path=self.cluster_id),
- ]
- )
- self.helm_conn._store_status.assert_called_with(
- cluster_id=self.cluster_id,
- kdu_instance=kdu_instance,
- namespace=self.namespace,
- db_dict=db_dict,
- operation="upgrade",
- )
- command = (
- "env KUBECONFIG=./tmp/helm_cluster_id/.kube/config /usr/bin/helm upgrade --namespace testk8s "
- "--atomic --output yaml --timeout 300 --force --reuse-values stable-openldap-0005399828 stable/openldap "
- "--version 1.2.3"
- )
- self.helm_conn._local_async_exec.assert_called_with(
- command=command, env=self.env, raise_exception_on_error=False
- )
- # TEST-2 (--force false)
- await self.helm_conn.upgrade(
- self.cluster_uuid,
- kdu_instance,
- kdu_model,
- atomic=True,
- db_dict=db_dict,
- )
- self.helm_conn.fs.sync.assert_called_with(from_path=self.cluster_id)
- self.helm_conn.fs.reverse_sync.assert_has_calls(
- [
- asynctest.call(from_path=self.cluster_id),
- asynctest.call(from_path=self.cluster_id),
- ]
- )
- self.helm_conn._store_status.assert_called_with(
- cluster_id=self.cluster_id,
- kdu_instance=kdu_instance,
- namespace=self.namespace,
- db_dict=db_dict,
- operation="upgrade",
- )
- command = (
- "env KUBECONFIG=./tmp/helm_cluster_id/.kube/config /usr/bin/helm upgrade --namespace testk8s "
- "--atomic --output yaml --timeout 300 --reuse-values stable-openldap-0005399828 stable/openldap "
- "--version 1.2.3"
- )
- self.helm_conn._local_async_exec.assert_called_with(
- command=command, env=self.env, raise_exception_on_error=False
- )
-
- @asynctest.fail_on(active_handles=True)
- async def test_upgrade_namespace(self):
- kdu_model = "stable/openldap:1.2.3"
- kdu_instance = "stable-openldap-0005399828"
- db_dict = {}
- instance_info = {
- "chart": "openldap-1.2.2",
- "name": kdu_instance,
- "namespace": self.namespace,
- "revision": 1,
- "status": "DEPLOYED",
- }
- self.helm_conn._local_async_exec = asynctest.CoroutineMock(return_value=("", 0))
- self.helm_conn._store_status = asynctest.CoroutineMock()
- self.helm_conn.get_instance_info = asynctest.CoroutineMock(
- return_value=instance_info
- )
-
- await self.helm_conn.upgrade(
- self.cluster_uuid,
- kdu_instance,
- kdu_model,
- atomic=True,
- db_dict=db_dict,
- namespace="default",
- )
- self.helm_conn.fs.sync.assert_called_with(from_path=self.cluster_id)
- self.helm_conn.fs.reverse_sync.assert_has_calls(
- [
- asynctest.call(from_path=self.cluster_id),
- asynctest.call(from_path=self.cluster_id),
- ]
- )
- self.helm_conn._store_status.assert_called_with(
- cluster_id=self.cluster_id,
- kdu_instance=kdu_instance,
- namespace="default",
- db_dict=db_dict,
- operation="upgrade",
- )
- command = (
- "env KUBECONFIG=./tmp/helm_cluster_id/.kube/config /usr/bin/helm upgrade --namespace default "
- "--atomic --output yaml --timeout 300 --reuse-values stable-openldap-0005399828 stable/openldap "
- "--version 1.2.3"
- )
- self.helm_conn._local_async_exec.assert_called_with(
- command=command, env=self.env, raise_exception_on_error=False
- )
-
- @asynctest.fail_on(active_handles=True)
- async def test_scale(self):
- kdu_model = "stable/openldap:1.2.3"
- kdu_instance = "stable-openldap-0005399828"
- db_dict = {}
- instance_info = {
- "chart": "openldap-1.2.3",
- "name": kdu_instance,
- "namespace": self.namespace,
- "revision": 1,
- "status": "DEPLOYED",
- }
- repo_list = [
- {
- "name": "stable",
- "url": "https://kubernetes-charts.storage.googleapis.com/",
- }
- ]
- kdu_values = """
- # Default values for openldap.
- # This is a YAML-formatted file.
- # Declare variables to be passed into your templates.
-
- replicaCount: 1
- dummy-app:
- replicas: 2
- """
-
- self.helm_conn.repo_list = asynctest.CoroutineMock(return_value=repo_list)
- self.helm_conn.values_kdu = asynctest.CoroutineMock(return_value=kdu_values)
- self.helm_conn._local_async_exec = asynctest.CoroutineMock(return_value=("", 0))
- self.helm_conn._store_status = asynctest.CoroutineMock()
- self.helm_conn.get_instance_info = asynctest.CoroutineMock(
- return_value=instance_info
- )
-
- # TEST-1
- await self.helm_conn.scale(
- kdu_instance,
- 2,
- "",
- kdu_model=kdu_model,
- cluster_uuid=self.cluster_uuid,
- atomic=True,
- db_dict=db_dict,
- )
- command = (
- "env KUBECONFIG=./tmp/helm_cluster_id/.kube/config "
- "/usr/bin/helm upgrade --namespace testk8s --atomic --output yaml --set replicaCount=2 "
- "--timeout 1800 --reuse-values stable-openldap-0005399828 stable/openldap "
- "--version 1.2.3"
- )
- self.helm_conn._local_async_exec.assert_called_once_with(
- command=command, env=self.env, raise_exception_on_error=False
- )
-
- # TEST-2
- await self.helm_conn.scale(
- kdu_instance,
- 3,
- "dummy-app",
- kdu_model=kdu_model,
- cluster_uuid=self.cluster_uuid,
- atomic=True,
- db_dict=db_dict,
- )
- command = (
- "env KUBECONFIG=./tmp/helm_cluster_id/.kube/config "
- "/usr/bin/helm upgrade --namespace testk8s --atomic --output yaml --set dummy-app.replicas=3 "
- "--timeout 1800 --reuse-values stable-openldap-0005399828 stable/openldap "
- "--version 1.2.3"
- )
- self.helm_conn._local_async_exec.assert_called_with(
- command=command, env=self.env, raise_exception_on_error=False
- )
- self.helm_conn.fs.reverse_sync.assert_called_with(from_path=self.cluster_id)
- self.helm_conn._store_status.assert_called_with(
- cluster_id=self.cluster_id,
- kdu_instance=kdu_instance,
- namespace=self.namespace,
- db_dict=db_dict,
- operation="scale",
- )
-
- @asynctest.fail_on(active_handles=True)
- async def test_rollback(self):
- kdu_instance = "stable-openldap-0005399828"
- db_dict = {}
- instance_info = {
- "chart": "openldap-1.2.3",
- "name": kdu_instance,
- "namespace": self.namespace,
- "revision": 2,
- "status": "DEPLOYED",
- }
- self.helm_conn._local_async_exec = asynctest.CoroutineMock(return_value=("", 0))
- self.helm_conn._store_status = asynctest.CoroutineMock()
- self.helm_conn.get_instance_info = asynctest.CoroutineMock(
- return_value=instance_info
- )
-
- await self.helm_conn.rollback(
- self.cluster_uuid, kdu_instance=kdu_instance, revision=1, db_dict=db_dict
- )
- self.helm_conn.fs.sync.assert_called_with(from_path=self.cluster_id)
- self.helm_conn.fs.reverse_sync.assert_called_once_with(
- from_path=self.cluster_id
- )
- self.helm_conn._store_status.assert_called_with(
- cluster_id=self.cluster_id,
- kdu_instance=kdu_instance,
- namespace=self.namespace,
- db_dict=db_dict,
- operation="rollback",
- )
- command = (
- "env KUBECONFIG=./tmp/helm_cluster_id/.kube/config "
- "/usr/bin/helm rollback stable-openldap-0005399828 1 --wait"
- )
- self.helm_conn._local_async_exec.assert_called_once_with(
- command=command, env=self.env, raise_exception_on_error=False
- )
-
- @asynctest.fail_on(active_handles=True)
- async def test_uninstall(self):
- kdu_instance = "stable-openldap-0005399828"
- instance_info = {
- "chart": "openldap-1.2.2",
- "name": kdu_instance,
- "namespace": self.namespace,
- "revision": 3,
- "status": "DEPLOYED",
- }
- self.helm_conn._local_async_exec = asynctest.CoroutineMock(return_value=("", 0))
- self.helm_conn._store_status = asynctest.CoroutineMock()
- self.helm_conn.get_instance_info = asynctest.CoroutineMock(
- return_value=instance_info
- )
-
- await self.helm_conn.uninstall(self.cluster_uuid, kdu_instance)
- self.helm_conn.fs.sync.assert_called_with(from_path=self.cluster_id)
- self.helm_conn.fs.reverse_sync.assert_called_once_with(
- from_path=self.cluster_id
- )
- command = "env KUBECONFIG=./tmp/helm_cluster_id/.kube/config /usr/bin/helm delete --purge {}".format(
- kdu_instance
- )
- self.helm_conn._local_async_exec.assert_called_once_with(
- command=command, env=self.env, raise_exception_on_error=True
- )
-
- @asynctest.fail_on(active_handles=True)
- async def test_get_services(self):
- kdu_instance = "test_services_1"
- service = {"name": "testservice", "type": "LoadBalancer"}
- self.helm_conn._local_async_exec_pipe = asynctest.CoroutineMock(
- return_value=("", 0)
- )
- self.helm_conn._parse_services = Mock(return_value=["testservice"])
- self.helm_conn._get_service = asynctest.CoroutineMock(return_value=service)
-
- services = await self.helm_conn.get_services(
- self.cluster_uuid, kdu_instance, self.namespace
- )
- self.helm_conn.fs.sync.assert_called_once_with(from_path=self.cluster_id)
- self.helm_conn.fs.reverse_sync.assert_called_once_with(
- from_path=self.cluster_id
- )
- self.helm_conn._parse_services.assert_called_once()
- command1 = "env KUBECONFIG=./tmp/helm_cluster_id/.kube/config /usr/bin/helm get manifest {} ".format(
- kdu_instance
- )
- command2 = "/usr/bin/kubectl get --namespace={} -f -".format(self.namespace)
- self.helm_conn._local_async_exec_pipe.assert_called_once_with(
- command1, command2, env=self.env, raise_exception_on_error=True
- )
- self.assertEqual(
- services, [service], "Invalid service returned from get_service"
- )
-
- @asynctest.fail_on(active_handles=True)
- async def test_get_service(self):
- service_name = "service1"
-
- self.helm_conn._local_async_exec = asynctest.CoroutineMock(return_value=("", 0))
- await self.helm_conn.get_service(
- self.cluster_uuid, service_name, self.namespace
- )
-
- self.helm_conn.fs.sync.assert_called_once_with(from_path=self.cluster_id)
- self.helm_conn.fs.reverse_sync.assert_called_once_with(
- from_path=self.cluster_id
- )
- command = (
- "/usr/bin/kubectl --kubeconfig=./tmp/helm_cluster_id/.kube/config "
- "--namespace=testk8s get service service1 -o=yaml"
- )
- self.helm_conn._local_async_exec.assert_called_once_with(
- command=command, env=self.env, raise_exception_on_error=True
- )
-
- @asynctest.fail_on(active_handles=True)
- async def test_inspect_kdu(self):
- self.helm_conn._local_async_exec = asynctest.CoroutineMock(return_value=("", 0))
-
- kdu_model = "stable/openldap:1.2.4"
- repo_url = "https://kubernetes-charts.storage.googleapis.com/"
- await self.helm_conn.inspect_kdu(kdu_model, repo_url)
-
- command = (
- "/usr/bin/helm inspect openldap --repo "
- "https://kubernetes-charts.storage.googleapis.com/ "
- "--version 1.2.4"
- )
- self.helm_conn._local_async_exec.assert_called_with(command=command)
-
- @asynctest.fail_on(active_handles=True)
- async def test_help_kdu(self):
- self.helm_conn._local_async_exec = asynctest.CoroutineMock(return_value=("", 0))
-
- kdu_model = "stable/openldap:1.2.4"
- repo_url = "https://kubernetes-charts.storage.googleapis.com/"
- await self.helm_conn.help_kdu(kdu_model, repo_url)
-
- command = (
- "/usr/bin/helm inspect readme openldap --repo "
- "https://kubernetes-charts.storage.googleapis.com/ "
- "--version 1.2.4"
- )
- self.helm_conn._local_async_exec.assert_called_with(command=command)
-
- @asynctest.fail_on(active_handles=True)
- async def test_values_kdu(self):
- self.helm_conn._local_async_exec = asynctest.CoroutineMock(return_value=("", 0))
-
- kdu_model = "stable/openldap:1.2.4"
- repo_url = "https://kubernetes-charts.storage.googleapis.com/"
- await self.helm_conn.values_kdu(kdu_model, repo_url)
-
- command = (
- "/usr/bin/helm inspect values openldap --repo "
- "https://kubernetes-charts.storage.googleapis.com/ "
- "--version 1.2.4"
- )
- self.helm_conn._local_async_exec.assert_called_with(command=command)
-
- @asynctest.fail_on(active_handles=True)
- async def test_get_values_kdu(self):
- self.helm_conn._local_async_exec = asynctest.CoroutineMock(return_value=("", 0))
-
- kdu_instance = "stable-openldap-0005399828"
- await self.helm_conn.get_values_kdu(
- kdu_instance, self.namespace, self.env["KUBECONFIG"]
- )
-
- command = (
- "env KUBECONFIG=./tmp/helm_cluster_id/.kube/config /usr/bin/helm get values "
- "stable-openldap-0005399828 --output yaml"
- )
- self.helm_conn._local_async_exec.assert_called_with(command=command)
-
- @asynctest.fail_on(active_handles=True)
- async def test_instances_list(self):
- self.helm_conn._local_async_exec = asynctest.CoroutineMock(return_value=("", 0))
-
- await self.helm_conn.instances_list(self.cluster_uuid)
- self.helm_conn.fs.sync.assert_called_once_with(from_path=self.cluster_id)
- self.helm_conn.fs.reverse_sync.assert_called_once_with(
- from_path=self.cluster_id
- )
- command = "/usr/bin/helm list --output yaml"
- self.helm_conn._local_async_exec.assert_called_once_with(
- command=command, env=self.env, raise_exception_on_error=True
- )
-
- @asynctest.fail_on(active_handles=True)
- async def test_status_kdu(self):
- kdu_instance = "stable-openldap-0005399828"
- self.helm_conn._local_async_exec = asynctest.CoroutineMock(return_value=("", 0))
-
- await self.helm_conn._status_kdu(
- self.cluster_id, kdu_instance, self.namespace, yaml_format=True
- )
- command = (
- "env KUBECONFIG=./tmp/helm_cluster_id/.kube/config /usr/bin/helm status {} --output yaml"
- ).format(kdu_instance)
- self.helm_conn._local_async_exec.assert_called_once_with(
- command=command,
- env=self.env,
- raise_exception_on_error=True,
- show_error_log=False,
- )
-
- @asynctest.fail_on(active_handles=True)
- async def test_store_status(self):
- kdu_instance = "stable-openldap-0005399828"
- db_dict = {}
- status = {
- "info": {
- "description": "Install complete",
- "status": {
- "code": "1",
- "notes": "The openldap helm chart has been installed",
- },
- }
- }
- self.helm_conn._status_kdu = asynctest.CoroutineMock(return_value=status)
- self.helm_conn.write_app_status_to_db = asynctest.CoroutineMock(
- return_value=status
- )
-
- await self.helm_conn._store_status(
- cluster_id=self.cluster_id,
- kdu_instance=kdu_instance,
- namespace=self.namespace,
- db_dict=db_dict,
- operation="install",
- )
- self.helm_conn._status_kdu.assert_called_once_with(
- cluster_id=self.cluster_id,
- kdu_instance=kdu_instance,
- namespace=self.namespace,
- yaml_format=False,
- )
- self.helm_conn.write_app_status_to_db.assert_called_once_with(
- db_dict=db_dict,
- status="Install complete",
- detailed_status=str(status),
- operation="install",
- )
-
- @asynctest.fail_on(active_handles=True)
- async def test_reset_uninstall_false(self):
- self.helm_conn._uninstall_sw = asynctest.CoroutineMock()
-
- await self.helm_conn.reset(self.cluster_uuid, force=False, uninstall_sw=False)
- self.helm_conn.fs.sync.assert_called_once_with(from_path=self.cluster_id)
- self.helm_conn.fs.file_delete.assert_called_once_with(
- self.cluster_id, ignore_non_exist=True
- )
- self.helm_conn._uninstall_sw.assert_not_called()
-
- @asynctest.fail_on(active_handles=True)
- async def test_reset_uninstall(self):
- kdu_instance = "stable-openldap-0021099429"
- instances = [
- {
- "app_version": "2.4.48",
- "chart": "openldap-1.2.3",
- "name": kdu_instance,
- "namespace": self.namespace,
- "revision": "1",
- "status": "deployed",
- "updated": "2020-10-30 11:11:20.376744191 +0000 UTC",
- }
- ]
- self.helm_conn._get_namespace = Mock(return_value=self.namespace)
- self.helm_conn._uninstall_sw = asynctest.CoroutineMock()
- self.helm_conn.instances_list = asynctest.CoroutineMock(return_value=instances)
- self.helm_conn.uninstall = asynctest.CoroutineMock()
-
- await self.helm_conn.reset(self.cluster_uuid, force=True, uninstall_sw=True)
- self.helm_conn.fs.sync.assert_called_once_with(from_path=self.cluster_id)
- self.helm_conn.fs.file_delete.assert_called_once_with(
- self.cluster_id, ignore_non_exist=True
- )
- self.helm_conn._get_namespace.assert_called_once_with(
- cluster_uuid=self.cluster_uuid
- )
- self.helm_conn.instances_list.assert_called_once_with(
- cluster_uuid=self.cluster_uuid
- )
- self.helm_conn.uninstall.assert_called_once_with(
- cluster_uuid=self.cluster_uuid, kdu_instance=kdu_instance
- )
- self.helm_conn._uninstall_sw.assert_called_once_with(
- cluster_id=self.cluster_id, namespace=self.namespace
- )
-
- @asynctest.fail_on(active_handles=True)
- async def test_uninstall_sw_namespace(self):
- self.helm_conn._local_async_exec = asynctest.CoroutineMock(return_value=("", 0))
-
- await self.helm_conn._uninstall_sw(self.cluster_id, self.namespace)
- calls = self.helm_conn._local_async_exec.call_args_list
- self.assertEqual(
- len(calls), 3, "To uninstall should have executed three commands"
- )
- call0_kargs = calls[0][1]
- command_0 = "/usr/bin/helm --kubeconfig={} --home={} reset".format(
- self.kube_config, self.helm_home
- )
- self.assertEqual(
- call0_kargs,
- {"command": command_0, "raise_exception_on_error": True, "env": self.env},
- "Invalid args for first call to local_exec",
- )
- call1_kargs = calls[1][1]
- command_1 = (
- "/usr/bin/kubectl --kubeconfig={} delete "
- "clusterrolebinding.rbac.authorization.k8s.io/osm-tiller-cluster-rule".format(
- self.kube_config
- )
- )
- self.assertEqual(
- call1_kargs,
- {"command": command_1, "raise_exception_on_error": False, "env": self.env},
- "Invalid args for second call to local_exec",
- )
- call2_kargs = calls[2][1]
- command_2 = (
- "/usr/bin/kubectl --kubeconfig={} --namespace {} delete "
- "serviceaccount/{}".format(
- self.kube_config, self.namespace, self.service_account
- )
- )
- self.assertEqual(
- call2_kargs,
- {"command": command_2, "raise_exception_on_error": False, "env": self.env},
- "Invalid args for third call to local_exec",
- )
from n2vc.k8s_juju_conn import K8sJujuConnector, RBAC_LABEL_KEY_NAME
from osm_common import fslocal
from .utils import kubeconfig, FakeModel, FakeFileWrapper, AsyncMock, FakeApplication
-from n2vc.exceptions import (
- MethodNotImplemented,
- K8sException,
-)
+from n2vc.exceptions import MethodNotImplemented, K8sException
from n2vc.vca.connection_data import ConnectionData
fs=fslocal.FsLocal(),
db=self.db,
log=None,
- loop=self.loop,
on_update_db=None,
)
self.k8s_juju_conn._store.get_vca_id.return_value = None
kdu_name=self.kdu_name,
db_dict=self.db_dict,
timeout=1800,
+ params=None,
)
)
self.assertEqual(mock_chdir.call_count, 2)
model_name=self.default_namespace,
wait=True,
timeout=1800,
+ instantiation_params=None,
)
def test_success_cs(self, mock_chdir):
kdu_name=self.kdu_name,
db_dict=self.db_dict,
timeout=1800,
+ params={},
)
)
self.k8s_juju_conn.libjuju.add_model.assert_called_once()
model_name=self.default_namespace,
wait=True,
timeout=1800,
+ instantiation_params=None,
)
def test_success_http(self, mock_chdir):
+ params = {"overlay": {"applications": {"squid": {"scale": 2}}}}
self.loop.run_until_complete(
self.k8s_juju_conn.install(
self.cluster_uuid,
kdu_name=self.kdu_name,
db_dict=self.db_dict,
timeout=1800,
+ params=params,
)
)
self.k8s_juju_conn.libjuju.add_model.assert_called_once()
model_name=self.default_namespace,
wait=True,
timeout=1800,
+ instantiation_params=params.get("overlay"),
)
def test_success_not_kdu_name(self, mock_chdir):
+ params = {"some_key": {"applications": {"squid": {"scale": 2}}}}
self.loop.run_until_complete(
self.k8s_juju_conn.install(
self.cluster_uuid,
atomic=True,
db_dict=self.db_dict,
timeout=1800,
+ params=params,
)
)
self.k8s_juju_conn.libjuju.add_model.assert_called_once()
model_name=self.default_namespace,
wait=True,
timeout=1800,
+ instantiation_params=None,
)
def test_missing_db_dict(self, mock_chdir):
model_name=self.default_namespace,
wait=True,
timeout=1800,
+ instantiation_params=None,
)
def test_missing_bundle(self, mock_chdir):
model_name=self.default_namespace,
wait=True,
timeout=1800,
+ instantiation_params=None,
)
self.k8s_juju_conn.libjuju.get_controller = AsyncMock()
self.k8s_juju_conn.libjuju.consume = AsyncMock()
- def test_standard_relation(self):
- relation_endpoint_1 = RelationEndpoint("model-1.app1.0", None, "endpoint")
- relation_endpoint_2 = RelationEndpoint("model-1.app2.1", None, "endpoint")
+ def test_standard_relation_same_model_and_controller(self):
+ relation_endpoint_1 = RelationEndpoint("model-1.app1.0", None, "endpoint1")
+ relation_endpoint_2 = RelationEndpoint("model-1.app2.1", None, "endpoint2")
self.loop.run_until_complete(
self.k8s_juju_conn.add_relation(relation_endpoint_1, relation_endpoint_2)
)
self.k8s_juju_conn.libjuju.add_relation.assert_called_once_with(
- model_name="model-1", endpoint_1="app1:endpoint", endpoint_2="app2:endpoint"
+ model_name="model-1",
+ endpoint_1="app1:endpoint1",
+ endpoint_2="app2:endpoint2",
)
self.k8s_juju_conn.libjuju.offer.assert_not_called()
self.k8s_juju_conn.libjuju.consume.assert_not_called()
"model-2", "app2:endpoint", "saas"
)
+ def test_cmr_relation_different_controller(self):
+ self.k8s_juju_conn._get_libjuju = AsyncMock(
+ return_value=self.k8s_juju_conn.libjuju
+ )
+ relation_endpoint_1 = RelationEndpoint("model-1.app1.0", "vca-id-1", "endpoint")
+ relation_endpoint_2 = RelationEndpoint("model-1.app2.1", "vca-id-2", "endpoint")
+ offer = Offer("admin/model-1.app1")
+ self.k8s_juju_conn.libjuju.offer.return_value = offer
+ self.k8s_juju_conn.libjuju.consume.return_value = "saas"
+ self.loop.run_until_complete(
+ self.k8s_juju_conn.add_relation(relation_endpoint_1, relation_endpoint_2)
+ )
+ self.k8s_juju_conn.libjuju.offer.assert_called_once_with(relation_endpoint_1)
+ self.k8s_juju_conn.libjuju.consume.assert_called_once()
+ self.k8s_juju_conn.libjuju.add_relation.assert_called_once_with(
+ "model-1", "app2:endpoint", "saas"
+ )
+
def test_relation_exception(self):
relation_endpoint_1 = RelationEndpoint("model-1.app1.0", None, "endpoint")
relation_endpoint_2 = RelationEndpoint("model-2.app2.1", None, "endpoint")
# See the License for the specific language governing permissions and
# limitations under the License.
+import asynctest
+import yaml
+import os
from unittest import TestCase, mock
-from n2vc.kubectl import Kubectl, CORE_CLIENT
+from n2vc.kubectl import Kubectl, CORE_CLIENT, CUSTOM_OBJECT_CLIENT
from n2vc.utils import Dict
from kubernetes.client.rest import ApiException
from kubernetes.client import (
V1Secret,
V1ServiceAccount,
V1SecretReference,
+ V1Role,
+ V1RoleBinding,
+ V1RoleRef,
+ V1Subject,
+ V1PolicyRule,
+ V1Namespace,
)
return self._items
+class FakeK8sRoleList:
+ def __init__(self, items=[]):
+ self._items = items
+
+ @property
+ def items(self):
+ return self._items
+
+
+class FakeK8sRoleBindingList:
+ def __init__(self, items=[]):
+ self._items = items
+
+ @property
+ def items(self):
+ return self._items
+
+
class FakeK8sVersionApiCode:
def __init__(self, major: str, minor: str):
self._major = major
)
mock_create_service_account.assert_called()
mock_create_secret.assert_not_called()
+
+
+@mock.patch("kubernetes.client.CustomObjectsApi.create_namespaced_custom_object")
+class CreateCertificateClass(asynctest.TestCase):
+ @mock.patch("kubernetes.config.load_kube_config")
+ def setUp(self, mock_load_kube_config):
+ super(CreateCertificateClass, self).setUp()
+ self.namespace = "osm"
+ self.name = "test-cert"
+ self.dns_prefix = "*"
+ self.secret_name = "test-cert-secret"
+ self.usages = ["server auth"]
+ self.issuer_name = "ca-issuer"
+ self.kubectl = Kubectl()
+
+ @asynctest.fail_on(active_handles=True)
+ async def test_certificate_is_created(
+ self,
+ mock_create_certificate,
+ ):
+ with open(
+ os.path.join(
+ os.path.dirname(__file__), "testdata", "test_certificate.yaml"
+ ),
+ "r",
+ ) as test_certificate:
+ certificate_body = yaml.safe_load(test_certificate.read())
+ print(certificate_body)
+ await self.kubectl.create_certificate(
+ namespace=self.namespace,
+ name=self.name,
+ dns_prefix=self.dns_prefix,
+ secret_name=self.secret_name,
+ usages=self.usages,
+ issuer_name=self.issuer_name,
+ )
+ mock_create_certificate.assert_called_once_with(
+ group="cert-manager.io",
+ plural="certificates",
+ version="v1",
+ body=certificate_body,
+ namespace=self.namespace,
+ )
+
+ @asynctest.fail_on(active_handles=True)
+ async def test_no_exception_if_alreadyexists(
+ self,
+ mock_create_certificate,
+ ):
+ api_exception = ApiException()
+ api_exception.body = '{"reason": "AlreadyExists"}'
+ self.kubectl.clients[
+ CUSTOM_OBJECT_CLIENT
+ ].create_namespaced_custom_object.side_effect = api_exception
+ raised = False
+ try:
+ await self.kubectl.create_certificate(
+ namespace=self.namespace,
+ name=self.name,
+ dns_prefix=self.dns_prefix,
+ secret_name=self.secret_name,
+ usages=self.usages,
+ issuer_name=self.issuer_name,
+ )
+ except Exception:
+ raised = True
+ self.assertFalse(raised, "An exception was raised")
+
+ @asynctest.fail_on(active_handles=True)
+ async def test_other_exceptions(
+ self,
+ mock_create_certificate,
+ ):
+ self.kubectl.clients[
+ CUSTOM_OBJECT_CLIENT
+ ].create_namespaced_custom_object.side_effect = Exception()
+ with self.assertRaises(Exception):
+ await self.kubectl.create_certificate(
+ namespace=self.namespace,
+ name=self.name,
+ dns_prefix=self.dns_prefix,
+ secret_name=self.secret_name,
+ usages=self.usages,
+ issuer_name=self.issuer_name,
+ )
+
+
+@mock.patch("kubernetes.client.CustomObjectsApi.delete_namespaced_custom_object")
+class DeleteCertificateClass(asynctest.TestCase):
+ @mock.patch("kubernetes.config.load_kube_config")
+ def setUp(self, mock_load_kube_config):
+ super(DeleteCertificateClass, self).setUp()
+ self.namespace = "osm"
+ self.object_name = "test-cert"
+ self.kubectl = Kubectl()
+
+ @asynctest.fail_on(active_handles=True)
+ async def test_no_exception_if_notfound(
+ self,
+ mock_create_certificate,
+ ):
+ api_exception = ApiException()
+ api_exception.body = '{"reason": "NotFound"}'
+ self.kubectl.clients[
+ CUSTOM_OBJECT_CLIENT
+ ].delete_namespaced_custom_object.side_effect = api_exception
+ raised = False
+ try:
+ await self.kubectl.delete_certificate(
+ namespace=self.namespace,
+ object_name=self.object_name,
+ )
+ except Exception:
+ raised = True
+ self.assertFalse(raised, "An exception was raised")
+
+ @asynctest.fail_on(active_handles=True)
+ async def test_other_exceptions(
+ self,
+ mock_create_certificate,
+ ):
+ self.kubectl.clients[
+ CUSTOM_OBJECT_CLIENT
+ ].delete_namespaced_custom_object.side_effect = Exception()
+ with self.assertRaises(Exception):
+ await self.kubectl.delete_certificate(
+ namespace=self.namespace,
+ object_name=self.object_name,
+ )
+
+
+@mock.patch("kubernetes.client.RbacAuthorizationV1Api.create_namespaced_role")
+@mock.patch("kubernetes.client.RbacAuthorizationV1Api.list_namespaced_role")
+class CreateRoleClass(asynctest.TestCase):
+ @mock.patch("kubernetes.config.load_kube_config")
+ def setUp(self, mock_load_kube_config):
+ super(CreateRoleClass, self).setUp()
+ self.name = "role"
+ self.namespace = "osm"
+ self.resources = ["*"]
+ self.api_groups = ["*"]
+ self.verbs = ["*"]
+ self.labels = {}
+ self.kubectl = Kubectl()
+
+ @asynctest.fail_on(active_handles=True)
+ async def assert_create_role(self, mock_create_role):
+ metadata = V1ObjectMeta(
+ name=self.name, labels=self.labels, namespace=self.namespace
+ )
+ role = V1Role(
+ metadata=metadata,
+ rules=[
+ V1PolicyRule(
+ api_groups=self.api_groups,
+ resources=self.resources,
+ verbs=self.verbs,
+ ),
+ ],
+ )
+ await self.kubectl.create_role(
+ namespace=self.namespace,
+ api_groups=self.api_groups,
+ name=self.name,
+ resources=self.resources,
+ verbs=self.verbs,
+ labels=self.labels,
+ )
+ mock_create_role.assert_called_once_with(self.namespace, role)
+
+ @asynctest.fail_on(active_handles=True)
+ async def test_raise_exception_if_role_already_exists(
+ self,
+ mock_list_role,
+ mock_create_role,
+ ):
+ mock_list_role.return_value = FakeK8sRoleList(items=[1])
+ with self.assertRaises(Exception) as context:
+ await self.kubectl.create_role(
+ self.name,
+ self.labels,
+ self.api_groups,
+ self.resources,
+ self.verbs,
+ self.namespace,
+ )
+ self.assertTrue(
+ "Role with metadata.name={} already exists".format(self.name)
+ in str(context.exception)
+ )
+ mock_create_role.assert_not_called()
+
+
+@mock.patch("kubernetes.client.RbacAuthorizationV1Api.create_namespaced_role_binding")
+@mock.patch("kubernetes.client.RbacAuthorizationV1Api.list_namespaced_role_binding")
+class CreateRoleBindingClass(asynctest.TestCase):
+ @mock.patch("kubernetes.config.load_kube_config")
+ def setUp(self, mock_load_kube_config):
+ super(CreateRoleBindingClass, self).setUp()
+ self.name = "rolebinding"
+ self.namespace = "osm"
+ self.role_name = "role"
+ self.sa_name = "Default"
+ self.labels = {}
+ self.kubectl = Kubectl()
+
+ @asynctest.fail_on(active_handles=True)
+ async def assert_create_role_binding(self, mock_create_role_binding):
+ role_binding = V1RoleBinding(
+ metadata=V1ObjectMeta(name=self.name, labels=self.labels),
+ role_ref=V1RoleRef(kind="Role", name=self.role_name, api_group=""),
+ subjects=[
+ V1Subject(
+ kind="ServiceAccount",
+ name=self.sa_name,
+ namespace=self.namespace,
+ )
+ ],
+ )
+ await self.kubectl.create_role_binding(
+ namespace=self.namespace,
+ role_name=self.role_name,
+ name=self.name,
+ sa_name=self.sa_name,
+ labels=self.labels,
+ )
+ mock_create_role_binding.assert_called_once_with(self.namespace, role_binding)
+
+ @asynctest.fail_on(active_handles=True)
+ async def test_raise_exception_if_role_binding_already_exists(
+ self,
+ mock_list_role_binding,
+ mock_create_role_binding,
+ ):
+ mock_list_role_binding.return_value = FakeK8sRoleBindingList(items=[1])
+ with self.assertRaises(Exception) as context:
+ await self.kubectl.create_role_binding(
+ self.name,
+ self.role_name,
+ self.sa_name,
+ self.labels,
+ self.namespace,
+ )
+ self.assertTrue(
+ "Role Binding with metadata.name={} already exists".format(self.name)
+ in str(context.exception)
+ )
+ mock_create_role_binding.assert_not_called()
+
+
+@mock.patch("kubernetes.client.CoreV1Api.create_namespaced_secret")
+class CreateSecretClass(asynctest.TestCase):
+ @mock.patch("kubernetes.config.load_kube_config")
+ def setUp(self, mock_load_kube_config):
+ super(CreateSecretClass, self).setUp()
+ self.name = "secret"
+ self.namespace = "osm"
+ self.data = {"test": "1234"}
+ self.secret_type = "Opaque"
+ self.kubectl = Kubectl()
+
+ @asynctest.fail_on(active_handles=True)
+ async def assert_create_secret(self, mock_create_secret):
+ secret_metadata = V1ObjectMeta(name=self.name, namespace=self.namespace)
+ secret = V1Secret(
+ metadata=secret_metadata,
+ data=self.data,
+ type=self.secret_type,
+ )
+ await self.kubectl.create_secret(
+ namespace=self.namespace,
+ data=self.data,
+ name=self.name,
+ secret_type=self.secret_type,
+ )
+ mock_create_secret.assert_called_once_with(self.namespace, secret)
+
+
+@mock.patch("kubernetes.client.CoreV1Api.create_namespace")
+class CreateNamespaceClass(asynctest.TestCase):
+ @mock.patch("kubernetes.config.load_kube_config")
+ def setUp(self, mock_load_kube_config):
+ super(CreateNamespaceClass, self).setUp()
+ self.namespace = "osm"
+ self.labels = {"key": "value"}
+ self.kubectl = Kubectl()
+
+ @asynctest.fail_on(active_handles=True)
+ async def test_namespace_is_created(
+ self,
+ mock_create_namespace,
+ ):
+ metadata = V1ObjectMeta(name=self.namespace, labels=self.labels)
+ namespace = V1Namespace(
+ metadata=metadata,
+ )
+ await self.kubectl.create_namespace(
+ name=self.namespace,
+ labels=self.labels,
+ )
+ mock_create_namespace.assert_called_once_with(namespace)
+
+ async def test_namespace_is_created_default_labels(
+ self,
+ mock_create_namespace,
+ ):
+ metadata = V1ObjectMeta(name=self.namespace, labels=None)
+ namespace = V1Namespace(
+ metadata=metadata,
+ )
+ await self.kubectl.create_namespace(
+ name=self.namespace,
+ )
+ mock_create_namespace.assert_called_once_with(namespace)
+
+ @asynctest.fail_on(active_handles=True)
+ async def test_no_exception_if_alreadyexists(
+ self,
+ mock_create_namespace,
+ ):
+ api_exception = ApiException()
+ api_exception.body = '{"reason": "AlreadyExists"}'
+ self.kubectl.clients[CORE_CLIENT].create_namespace.side_effect = api_exception
+ raised = False
+ try:
+ await self.kubectl.create_namespace(
+ name=self.namespace,
+ )
+ except Exception:
+ raised = True
+ self.assertFalse(raised, "An exception was raised")
+
+ @asynctest.fail_on(active_handles=True)
+ async def test_other_exceptions(
+ self,
+ mock_create_namespace,
+ ):
+ self.kubectl.clients[CORE_CLIENT].create_namespace.side_effect = Exception()
+ with self.assertRaises(Exception):
+ await self.kubectl.create_namespace(
+ name=self.namespace,
+ )
+
+
+@mock.patch("kubernetes.client.CoreV1Api.delete_namespace")
+class DeleteNamespaceClass(asynctest.TestCase):
+ @mock.patch("kubernetes.config.load_kube_config")
+ def setUp(self, mock_load_kube_config):
+ super(DeleteNamespaceClass, self).setUp()
+ self.namespace = "osm"
+ self.kubectl = Kubectl()
+
+ @asynctest.fail_on(active_handles=True)
+ async def test_no_exception_if_notfound(
+ self,
+ mock_delete_namespace,
+ ):
+ api_exception = ApiException()
+ api_exception.body = '{"reason": "NotFound"}'
+ self.kubectl.clients[CORE_CLIENT].delete_namespace.side_effect = api_exception
+ raised = False
+ try:
+ await self.kubectl.delete_namespace(
+ name=self.namespace,
+ )
+ except Exception:
+ raised = True
+ self.assertFalse(raised, "An exception was raised")
+
+ @asynctest.fail_on(active_handles=True)
+ async def test_other_exceptions(
+ self,
+ mock_delete_namespace,
+ ):
+ self.kubectl.clients[CORE_CLIENT].delete_namespace.side_effect = Exception()
+ with self.assertRaises(Exception):
+ await self.kubectl.delete_namespace(
+ name=self.namespace,
+ )
+
+
+@mock.patch("kubernetes.client.CoreV1Api.read_namespaced_secret")
+class GetSecretContentClass(asynctest.TestCase):
+ @mock.patch("kubernetes.config.load_kube_config")
+ def setUp(self, mock_load_kube_config):
+ super(GetSecretContentClass, self).setUp()
+ self.name = "my_secret"
+ self.namespace = "osm"
+ self.data = {"my_key": "my_value"}
+ self.type = "Opaque"
+ self.kubectl = Kubectl()
+
+ @asynctest.fail_on(active_handles=True)
+ async def test_return_type_is_dict(
+ self,
+ mock_read_namespaced_secret,
+ ):
+ metadata = V1ObjectMeta(name=self.name, namespace=self.namespace)
+ secret = V1Secret(metadata=metadata, data=self.data, type=self.type)
+ mock_read_namespaced_secret.return_value = secret
+ content = await self.kubectl.get_secret_content(self.name, self.namespace)
+ assert type(content) is dict
}
)
logging.disable(logging.CRITICAL)
- self.libjuju = Libjuju(vca_connection, self.loop)
+ self.libjuju = Libjuju(vca_connection)
self.loop.run_until_complete(self.libjuju.disconnect())
# TODO test provision machine
+@asynctest.mock.patch("os.remove")
+@asynctest.mock.patch("n2vc.libjuju.yaml.dump")
+@asynctest.mock.patch("builtins.open", create=True)
@asynctest.mock.patch("n2vc.libjuju.Libjuju.get_controller")
@asynctest.mock.patch("n2vc.libjuju.Libjuju.get_model")
@asynctest.mock.patch("n2vc.libjuju.Libjuju.disconnect_model")
@asynctest.mock.patch("n2vc.libjuju.Libjuju.disconnect_controller")
@asynctest.mock.patch("n2vc.juju_watcher.JujuModelWatcher.wait_for_model")
@asynctest.mock.patch("juju.model.Model.deploy")
+@asynctest.mock.patch("juju.model.CharmhubDeployType.resolve")
+@asynctest.mock.patch("n2vc.libjuju.BundleHandler")
+@asynctest.mock.patch("juju.url.URL.parse")
class DeployTest(LibjujuTestCase):
def setUp(self):
super(DeployTest, self).setUp()
+ self.instantiation_params = {"applications": {"squid": {"scale": 2}}}
+ self.architecture = "amd64"
+ self.uri = "cs:osm"
+ self.url = AsyncMock()
+ self.url.schema = juju.url.Schema.CHARM_HUB
+ self.bundle_instance = None
+
+ def setup_bundle_download_mocks(
+ self, mock_url_parse, mock_bundle, mock_resolve, mock_get_model
+ ):
+ mock_url_parse.return_value = self.url
+ mock_bundle.return_value = AsyncMock()
+ mock_resolve.return_value = AsyncMock()
+ mock_resolve.origin = AsyncMock()
+ mock_get_model.return_value = juju.model.Model()
+ self.bundle_instance = mock_bundle.return_value
+ self.bundle_instance.applications = {"squid"}
+
+ def assert_overlay_file_is_written(self, filename, mocked_file, mock_yaml, mock_os):
+ mocked_file.assert_called_once_with(filename, "w")
+ mock_yaml.assert_called_once_with(
+ self.instantiation_params, mocked_file.return_value.__enter__.return_value
+ )
+ mock_os.assert_called_once_with(filename)
+
+ def assert_overlay_file_is_not_written(self, mocked_file, mock_yaml, mock_os):
+ mocked_file.assert_not_called()
+ mock_yaml.assert_not_called()
+ mock_os.assert_not_called()
+
+ def assert_bundle_is_downloaded(self, mock_resolve, mock_url_parse):
+ mock_resolve.assert_called_once_with(
+ self.url, self.architecture, entity_url=self.uri
+ )
+ mock_url_parse.assert_called_once_with(self.uri)
+ self.bundle_instance.fetch_plan.assert_called_once_with(
+ self.url, mock_resolve.origin
+ )
+
+ def assert_bundle_is_not_downloaded(self, mock_resolve, mock_url_parse):
+ mock_resolve.assert_not_called()
+ mock_url_parse.assert_not_called()
+ self.bundle_instance.fetch_plan.assert_not_called()
def test_deploy(
self,
+ mock_url_parse,
+ mock_bundle,
+ mock_resolve,
mock_deploy,
mock_wait_for_model,
mock_disconnect_controller,
mock_disconnect_model,
mock_get_model,
mock_get_controller,
+ mocked_file,
+ mock_yaml,
+ mock_os,
):
- mock_get_model.return_value = juju.model.Model()
+ self.setup_bundle_download_mocks(
+ mock_url_parse, mock_bundle, mock_resolve, mock_get_model
+ )
+ model_name = "model1"
+
self.loop.run_until_complete(
- self.libjuju.deploy("cs:osm", "model", wait=True, timeout=0)
+ self.libjuju.deploy(
+ "cs:osm",
+ model_name,
+ wait=True,
+ timeout=0,
+ instantiation_params=None,
+ )
)
- mock_deploy.assert_called_once()
+ self.assert_overlay_file_is_not_written(mocked_file, mock_yaml, mock_os)
+ self.assert_bundle_is_not_downloaded(mock_resolve, mock_url_parse)
+ mock_deploy.assert_called_once_with("cs:osm", trust=True, overlays=[])
mock_wait_for_model.assert_called_once()
mock_disconnect_controller.assert_called_once()
mock_disconnect_model.assert_called_once()
def test_deploy_no_wait(
self,
+ mock_url_parse,
+ mock_bundle,
+ mock_resolve,
mock_deploy,
mock_wait_for_model,
mock_disconnect_controller,
mock_disconnect_model,
mock_get_model,
mock_get_controller,
+ mocked_file,
+ mock_yaml,
+ mock_os,
):
- mock_get_model.return_value = juju.model.Model()
+ self.setup_bundle_download_mocks(
+ mock_url_parse, mock_bundle, mock_resolve, mock_get_model
+ )
self.loop.run_until_complete(
- self.libjuju.deploy("cs:osm", "model", wait=False, timeout=0)
+ self.libjuju.deploy(
+ "cs:osm", "model", wait=False, timeout=0, instantiation_params={}
+ )
)
- mock_deploy.assert_called_once()
+ self.assert_overlay_file_is_not_written(mocked_file, mock_yaml, mock_os)
+ self.assert_bundle_is_not_downloaded(mock_resolve, mock_url_parse)
+ mock_deploy.assert_called_once_with("cs:osm", trust=True, overlays=[])
mock_wait_for_model.assert_not_called()
mock_disconnect_controller.assert_called_once()
mock_disconnect_model.assert_called_once()
def test_deploy_exception(
self,
+ mock_url_parse,
+ mock_bundle,
+ mock_resolve,
mock_deploy,
mock_wait_for_model,
mock_disconnect_controller,
mock_disconnect_model,
mock_get_model,
mock_get_controller,
+ mocked_file,
+ mock_yaml,
+ mock_os,
):
+ self.setup_bundle_download_mocks(
+ mock_url_parse, mock_bundle, mock_resolve, mock_get_model
+ )
mock_deploy.side_effect = Exception()
- mock_get_model.return_value = juju.model.Model()
with self.assertRaises(Exception):
self.loop.run_until_complete(self.libjuju.deploy("cs:osm", "model"))
+ self.assert_overlay_file_is_not_written(mocked_file, mock_yaml, mock_os)
+ self.assert_bundle_is_not_downloaded(mock_resolve, mock_url_parse)
mock_deploy.assert_called_once()
mock_wait_for_model.assert_not_called()
mock_disconnect_controller.assert_called_once()
mock_disconnect_model.assert_called_once()
+ def test_deploy_with_instantiation_params(
+ self,
+ mock_url_parse,
+ mock_bundle,
+ mock_resolve,
+ mock_deploy,
+ mock_wait_for_model,
+ mock_disconnect_controller,
+ mock_disconnect_model,
+ mock_get_model,
+ mock_get_controller,
+ mocked_file,
+ mock_yaml,
+ mock_os,
+ ):
+ self.setup_bundle_download_mocks(
+ mock_url_parse, mock_bundle, mock_resolve, mock_get_model
+ )
+ model_name = "model1"
+ expected_filename = "{}-overlay.yaml".format(model_name)
+ self.loop.run_until_complete(
+ self.libjuju.deploy(
+ self.uri,
+ model_name,
+ wait=True,
+ timeout=0,
+ instantiation_params=self.instantiation_params,
+ )
+ )
+ self.assert_overlay_file_is_written(
+ expected_filename, mocked_file, mock_yaml, mock_os
+ )
+ self.assert_bundle_is_downloaded(mock_resolve, mock_url_parse)
+ mock_deploy.assert_called_once_with(
+ self.uri, trust=True, overlays=[expected_filename]
+ )
+ mock_wait_for_model.assert_called_once()
+ mock_disconnect_controller.assert_called_once()
+ mock_disconnect_model.assert_called_once()
+
+ def test_deploy_with_instantiation_params_no_applications(
+ self,
+ mock_url_parse,
+ mock_bundle,
+ mock_resolve,
+ mock_deploy,
+ mock_wait_for_model,
+ mock_disconnect_controller,
+ mock_disconnect_model,
+ mock_get_model,
+ mock_get_controller,
+ mocked_file,
+ mock_yaml,
+ mock_os,
+ ):
+ self.instantiation_params = {"applications": {}}
+ self.setup_bundle_download_mocks(
+ mock_url_parse, mock_bundle, mock_resolve, mock_get_model
+ )
+
+ model_name = "model3"
+ expected_filename = "{}-overlay.yaml".format(model_name)
+ self.loop.run_until_complete(
+ self.libjuju.deploy(
+ self.uri,
+ model_name,
+ wait=False,
+ timeout=0,
+ instantiation_params=self.instantiation_params,
+ )
+ )
+
+ self.assert_overlay_file_is_written(
+ expected_filename, mocked_file, mock_yaml, mock_os
+ )
+ self.assert_bundle_is_not_downloaded(mock_resolve, mock_url_parse)
+ mock_deploy.assert_called_once_with(
+ self.uri, trust=True, overlays=[expected_filename]
+ )
+ mock_wait_for_model.assert_not_called()
+ mock_disconnect_controller.assert_called_once()
+ mock_disconnect_model.assert_called_once()
+
+ def test_deploy_with_instantiation_params_applications_not_found(
+ self,
+ mock_url_parse,
+ mock_bundle,
+ mock_resolve,
+ mock_deploy,
+ mock_wait_for_model,
+ mock_disconnect_controller,
+ mock_disconnect_model,
+ mock_get_model,
+ mock_get_controller,
+ mocked_file,
+ mock_yaml,
+ mock_os,
+ ):
+ self.instantiation_params = {"some_key": {"squid": {"scale": 2}}}
+ self.setup_bundle_download_mocks(
+ mock_url_parse, mock_bundle, mock_resolve, mock_get_model
+ )
+
+ with self.assertRaises(JujuError):
+ self.loop.run_until_complete(
+ self.libjuju.deploy(
+ self.uri,
+ "model1",
+ wait=True,
+ timeout=0,
+ instantiation_params=self.instantiation_params,
+ )
+ )
+
+ self.assert_overlay_file_is_not_written(mocked_file, mock_yaml, mock_os)
+ self.assert_bundle_is_not_downloaded(mock_resolve, mock_url_parse)
+ mock_deploy.assert_not_called()
+ mock_wait_for_model.assert_not_called()
+ mock_disconnect_controller.assert_called_once()
+ mock_disconnect_model.assert_called_once()
+
+ def test_deploy_overlay_contains_invalid_app(
+ self,
+ mock_url_parse,
+ mock_bundle,
+ mock_resolve,
+ mock_deploy,
+ mock_wait_for_model,
+ mock_disconnect_controller,
+ mock_disconnect_model,
+ mock_get_model,
+ mock_get_controller,
+ mocked_file,
+ mock_yaml,
+ mock_os,
+ ):
+ self.setup_bundle_download_mocks(
+ mock_url_parse, mock_bundle, mock_resolve, mock_get_model
+ )
+ self.bundle_instance.applications = {"new_app"}
+
+ with self.assertRaises(JujuApplicationNotFound) as error:
+ self.loop.run_until_complete(
+ self.libjuju.deploy(
+ self.uri,
+ "model2",
+ wait=True,
+ timeout=0,
+ instantiation_params=self.instantiation_params,
+ )
+ )
+ error_msg = "Cannot find application ['squid'] in original bundle {'new_app'}"
+ self.assertEqual(str(error.exception), error_msg)
+
+ self.assert_overlay_file_is_not_written(mocked_file, mock_yaml, mock_os)
+ self.assert_bundle_is_downloaded(mock_resolve, mock_url_parse)
+ mock_deploy.assert_not_called()
+ mock_wait_for_model.assert_not_called()
+ mock_disconnect_controller.assert_called_once()
+ mock_disconnect_model.assert_called_once()
+
+ def test_deploy_exception_with_instantiation_params(
+ self,
+ mock_url_parse,
+ mock_bundle,
+ mock_resolve,
+ mock_deploy,
+ mock_wait_for_model,
+ mock_disconnect_controller,
+ mock_disconnect_model,
+ mock_get_model,
+ mock_get_controller,
+ mocked_file,
+ mock_yaml,
+ mock_os,
+ ):
+ self.setup_bundle_download_mocks(
+ mock_url_parse, mock_bundle, mock_resolve, mock_get_model
+ )
+
+ mock_deploy.side_effect = Exception()
+ model_name = "model2"
+ expected_filename = "{}-overlay.yaml".format(model_name)
+ with self.assertRaises(Exception):
+ self.loop.run_until_complete(
+ self.libjuju.deploy(
+ self.uri,
+ model_name,
+ instantiation_params=self.instantiation_params,
+ )
+ )
+
+ self.assert_overlay_file_is_written(
+ expected_filename, mocked_file, mock_yaml, mock_os
+ )
+ self.assert_bundle_is_downloaded(mock_resolve, mock_url_parse)
+ mock_deploy.assert_called_once_with(
+ self.uri, trust=True, overlays=[expected_filename]
+ )
+ mock_wait_for_model.assert_not_called()
+ mock_disconnect_controller.assert_called_once()
+ mock_disconnect_model.assert_called_once()
+
+ @asynctest.mock.patch("logging.Logger.warning")
+ def test_deploy_exception_when_deleting_file_is_not_propagated(
+ self,
+ mock_warning,
+ mock_url_parse,
+ mock_bundle,
+ mock_resolve,
+ mock_deploy,
+ mock_wait_for_model,
+ mock_disconnect_controller,
+ mock_disconnect_model,
+ mock_get_model,
+ mock_get_controller,
+ mocked_file,
+ mock_yaml,
+ mock_os,
+ ):
+ self.setup_bundle_download_mocks(
+ mock_url_parse, mock_bundle, mock_resolve, mock_get_model
+ )
+
+ mock_os.side_effect = OSError("Error")
+ model_name = "model2"
+ expected_filename = "{}-overlay.yaml".format(model_name)
+ self.loop.run_until_complete(
+ self.libjuju.deploy(
+ self.uri,
+ model_name,
+ instantiation_params=self.instantiation_params,
+ )
+ )
+
+ self.assert_overlay_file_is_written(
+ expected_filename, mocked_file, mock_yaml, mock_os
+ )
+ self.assert_bundle_is_downloaded(mock_resolve, mock_url_parse)
+ mock_deploy.assert_called_once_with(
+ self.uri, trust=True, overlays=[expected_filename]
+ )
+ mock_wait_for_model.assert_called_once()
+ mock_disconnect_controller.assert_called_once()
+ mock_disconnect_model.assert_called_once()
+ mock_warning.assert_called_with(
+ "Overlay file {} could not be removed: Error".format(expected_filename)
+ )
+
@asynctest.mock.patch("n2vc.libjuju.Libjuju.get_controller")
@asynctest.mock.patch("n2vc.libjuju.Libjuju.get_model")
mock_get_model,
mock_get_controller,
):
-
mock_get_model.return_value = juju.model.Model()
mock__get_application.return_value = FakeApplication()
output = None
mock_disconnect_controller.assert_called_once()
mock_disconnect_model.assert_called_once()
+ @asynctest.mock.patch("logging.Logger.warning")
+ def test_not_found_in_error_code(
+ self,
+ mock_warning,
+ mock_add_relation,
+ mock_disconnect_controller,
+ mock_disconnect_model,
+ mock_get_model,
+ mock_get_controller,
+ ):
+ result = {
+ "error": "relation cannot be added",
+ "error-code": "not found",
+ "response": "response",
+ "request-id": 1,
+ }
+
+ mock_get_model.return_value = juju.model.Model()
+ mock_add_relation.side_effect = JujuAPIError(result)
+
+ self.loop.run_until_complete(
+ self.libjuju.add_relation(
+ "model",
+ "app1:relation1",
+ "app2:relation2",
+ )
+ )
+
+ mock_warning.assert_called_with("Relation not found: relation cannot be added")
+ mock_disconnect_controller.assert_called_once()
+ mock_disconnect_model.assert_called_once()
+
@asynctest.mock.patch("logging.Logger.warning")
def test_already_exists(
self,
mock_disconnect_controller.assert_called_once()
mock_disconnect_model.assert_called_once()
+ @asynctest.mock.patch("logging.Logger.warning")
+ def test_already_exists_error_code(
+ self,
+ mock_warning,
+ mock_add_relation,
+ mock_disconnect_controller,
+ mock_disconnect_model,
+ mock_get_model,
+ mock_get_controller,
+ ):
+ result = {
+ "error": "relation cannot be added",
+ "error-code": "already exists",
+ "response": "response",
+ "request-id": 1,
+ }
+
+ mock_get_model.return_value = juju.model.Model()
+ mock_add_relation.side_effect = JujuAPIError(result)
+
+ self.loop.run_until_complete(
+ self.libjuju.add_relation(
+ "model",
+ "app1:relation1",
+ "app2:relation2",
+ )
+ )
+
+ mock_warning.assert_called_with(
+ "Relation already exists: relation cannot be added"
+ )
+ mock_disconnect_controller.assert_called_once()
+ mock_disconnect_model.assert_called_once()
+
def test_exception(
self,
mock_add_relation,
mock_get_model,
mock_get_controller,
):
-
mock_get_application.return_value = FakeApplication()
self.loop.run_until_complete(
mock_get_model,
mock_get_controller,
):
-
mock_get_application.side_effect = Exception()
with self.assertRaises(Exception):
mock_get_model,
mock_get_controller,
):
-
result = {"error": "not found", "response": "response", "request-id": 1}
mock_get_controller.side_effect = JujuAPIError(result)
mock_get_model,
mock_get_controller,
):
-
result = {"error": "not found", "response": "response", "request-id": 1}
mock_get_model.side_effect = JujuAPIError(result)
@asynctest.mock.patch("n2vc.n2vc_juju_conn.get_connection")
@asynctest.mock.patch("n2vc.vca.connection_data.base64_to_cacert")
def setUp(
- self,
- mock_base64_to_cacert=None,
- mock_get_connection=None,
- mock_store=None,
+ self, mock_base64_to_cacert=None, mock_get_connection=None, mock_store=None
):
self.loop = asyncio.get_event_loop()
self.db = Mock()
db=self.db,
fs=fslocal.FsLocal(),
log=None,
- loop=self.loop,
on_update_db=None,
)
N2VCJujuConnector.get_public_key.assert_not_called()
"n2vc.n2vc_juju_conn.generate_random_alfanum_string",
**{"return_value": "random"}
)
- def test_success(
- self,
- mock_generate_random_alfanum_string,
- ):
+ def test_success(self, mock_generate_random_alfanum_string):
self.n2vc.fs.file_exists = MagicMock(create_autospec=True)
self.n2vc.fs.file_exists.return_value = True
ee_id = self.loop.run_until_complete(
self.n2vc.libjuju.get_controller = AsyncMock()
self.n2vc.libjuju.consume = AsyncMock()
- def test_standard_relation(self):
- relation_endpoint_1 = RelationEndpoint("model-1.app1.0", None, "endpoint")
- relation_endpoint_2 = RelationEndpoint("model-1.app2.1", None, "endpoint")
+ def test_standard_relation_same_model_and_controller(self):
+ relation_endpoint_1 = RelationEndpoint("model-1.app1.0", None, "endpoint1")
+ relation_endpoint_2 = RelationEndpoint("model-1.app2.1", None, "endpoint2")
self.loop.run_until_complete(
self.n2vc.add_relation(relation_endpoint_1, relation_endpoint_2)
)
self.n2vc.libjuju.add_relation.assert_called_once_with(
- model_name="model-1", endpoint_1="app1:endpoint", endpoint_2="app2:endpoint"
+ model_name="model-1",
+ endpoint_1="app1:endpoint1",
+ endpoint_2="app2:endpoint2",
)
self.n2vc.libjuju.offer.assert_not_called()
self.n2vc.libjuju.consume.assert_not_called()
"model-2", "app2:endpoint", "saas"
)
+ def test_cmr_relation_different_controller(self):
+ self.n2vc._get_libjuju = AsyncMock(return_value=self.n2vc.libjuju)
+ relation_endpoint_1 = RelationEndpoint(
+ "model-1.app1.0", "vca-id-1", "endpoint1"
+ )
+ relation_endpoint_2 = RelationEndpoint(
+ "model-1.app2.1", "vca-id-2", "endpoint2"
+ )
+ offer = Offer("admin/model-1.app1")
+ self.n2vc.libjuju.offer.return_value = offer
+ self.n2vc.libjuju.consume.return_value = "saas"
+ self.loop.run_until_complete(
+ self.n2vc.add_relation(relation_endpoint_1, relation_endpoint_2)
+ )
+ self.n2vc.libjuju.offer.assert_called_once_with(relation_endpoint_1)
+ self.n2vc.libjuju.consume.assert_called_once()
+ self.n2vc.libjuju.add_relation.assert_called_once_with(
+ "model-1", "app2:endpoint2", "saas"
+ )
+
def test_relation_exception(self):
relation_endpoint_1 = RelationEndpoint("model-1.app1.0", None, "endpoint")
relation_endpoint_2 = RelationEndpoint("model-2.app2.1", None, "endpoint")
class GenerateApplicationNameTest(N2VCJujuConnTestCase):
-
vnf_id = "dbfbd751-3de4-4e68-bd40-ec5ae0a53898"
def setUp(self):
"ee_id": None,
"application": "",
"model": "dbfbd751-3de4-4e68-bd40-ec5ae0a53898",
- },
+ }
]
vnf_count = ""
vdu_count = ""
def test_generate_application_name_vnf_charm(self):
charm_level = "vnf-level"
- vnfrs = {
- "member-vnf-index-ref": "vnf111-xxx-yyy-zzz",
- }
+ vnfrs = {"member-vnf-index-ref": "vnf111-xxx-yyy-zzz"}
vca_records = [
{
"target_element": "vnf/vnf1",
"ee_descriptor_id": "simple-ee-abc-000-rrrr-nnnn-4444-hhh-3333-yyyy-333-hhh-ttt-444",
"charm_name": "",
"model": "dbfbd751-3de4-4e68-bd40-ec5ae0a53898",
- },
+ }
]
vnf_count = "1"
vdu_count = ""
self,
):
charm_level = "vdu-level"
- vnfrs = {
- "member-vnf-index-ref": "vnf111-xxx-yyy-zzz",
- }
+ vnfrs = {"member-vnf-index-ref": "vnf111-xxx-yyy-zzz"}
vca_records = [
{
"target_element": "vnf/vnf1/mgmtVM",
"ee_descriptor_id": "simple-ee-abc-000-rrrr-nnnn-4444-hhh-3333-yyyy-333-hhh-ttt-444",
"charm_name": "",
"model": "dbfbd751-3de4-4e68-bd40-ec5ae0a53898",
- },
+ }
]
vnf_count = "2"
vdu_count = "0"
def test_generate_application_name_vdu_charm_given_vdu_id_is_none(self):
charm_level = "vdu-level"
- vnfrs = {
- "member-vnf-index-ref": "vnf111-xxx-yyy-zzz",
- }
+ vnfrs = {"member-vnf-index-ref": "vnf111-xxx-yyy-zzz"}
vca_records = [
{
"target_element": "vnf/vnf1/mgmtvVM",
"ee_descriptor_id": "simple-ee-abc-000-rrrr-nnnn-4444-hhh-3333-yyyy-333-hhh-ttt-444",
"charm_name": "",
"model": "dbfbd751-3de4-4e68-bd40-ec5ae0a53898",
- },
+ }
]
vnf_count = "2"
vdu_count = "0"
self,
):
charm_level = "vdu-level"
- vnfrs = {
- "member-vnf-index-ref": "vnf111-xxx-yyy-zzz",
- }
+ vnfrs = {"member-vnf-index-ref": "vnf111-xxx-yyy-zzz"}
vca_records = [
{
"target_element": "vnf/vnf1/mgmtVM",
"ee_descriptor_id": "simple-ee-abc-000-rrrr-nnnn-4444-hhh-3333-yyyy-333-hhh-ttt-444",
"charm_name": "",
"model": "dbfbd751-3de4-4e68-bd40-ec5ae0a53898",
- },
+ }
]
vnf_count = "2"
vdu_count = "0"
def test_generate_application_name_vdu_charm_vdu_id_in_vca_record_is_none(self):
charm_level = "vdu-level"
- vnfrs = {
- "member-vnf-index-ref": "vnf111-xxx-yyy-zzz",
- }
+ vnfrs = {"member-vnf-index-ref": "vnf111-xxx-yyy-zzz"}
vca_records = [
{
"target_element": "vnf/vnf1/mgmtVM",
"ee_descriptor_id": "simple-ee-abc-000-rrrr-nnnn-4444-hhh-3333-yyyy-333-hhh-ttt-444",
"charm_name": "",
"model": "dbfbd751-3de4-4e68-bd40-ec5ae0a53898",
- },
+ }
]
vnf_count = "2"
vdu_count = "0"
"charm_name": "",
"model": "dbfbd751-3de4-4e68-bd40-ec5ae0a53898",
},
- ],
- },
- },
+ ]
+ }
+ }
}
expected_result = [
{
"ee_descriptor_id": "simple-ee-abc-000-rrrr-nnnn-4444-hhh-3333-yyyy-333-hhh-ttt-444",
"charm_name": "",
"model": "dbfbd751-3de4-4e68-bd40-ec5ae0a53898",
- },
+ }
]
vca_records = self.n2vc._get_vca_records(charm_level, db_nsr, db_vnfr)
self.assertEqual(vca_records, expected_result)
def test_get_vca_records_vnf_charm_member_vnf_index_mismatch(self):
charm_level = "vnf-level"
- db_vnfr = {
- "member-vnf-index-ref": "vnf222-xxx-yyy-zzz",
- }
+ db_vnfr = {"member-vnf-index-ref": "vnf222-xxx-yyy-zzz"}
db_nsr = {
"_admin": {
"deployed": {
"charm_name": "",
"model": "dbfbd751-3de4-4e68-bd40-ec5ae0a53898",
},
- ],
- },
- },
+ ]
+ }
+ }
}
expected_result = []
vca_records = self.n2vc._get_vca_records(charm_level, db_nsr, db_vnfr)
def test_get_vca_records_ns_charm(self):
charm_level = "ns-level"
- db_vnfr = {
- "member-vnf-index-ref": "vnf222-xxx-yyy-zzz",
- }
+ db_vnfr = {"member-vnf-index-ref": "vnf222-xxx-yyy-zzz"}
db_nsr = {
"_admin": {
"deployed": {
"charm_name": "simple-ns-charm-abc-000-rrrr-nnnn-4444-hhh-3333-yyyy-333-hhh-ttt-444",
"model": "dbfbd751-3de4-4e68-bd40-ec5ae0a53898",
},
- ],
- },
- },
+ ]
+ }
+ }
}
expected_result = [
{
"ee_descriptor_id": "",
"charm_name": "simple-ns-charm-abc-000-rrrr-nnnn-4444-hhh-3333-yyyy-333-hhh-ttt-444",
"model": "dbfbd751-3de4-4e68-bd40-ec5ae0a53898",
- },
+ }
]
vca_records = self.n2vc._get_vca_records(charm_level, db_nsr, db_vnfr)
self.assertEqual(vca_records, expected_result)
def test_get_vca_records_ns_charm_empty_charm_name(self):
charm_level = "ns-level"
- db_vnfr = {
- "member-vnf-index-ref": "vnf222-xxx-yyy-zzz",
- }
+ db_vnfr = {"member-vnf-index-ref": "vnf222-xxx-yyy-zzz"}
db_nsr = {
"_admin": {
"deployed": {
"charm_name": "",
"model": "dbfbd751-3de4-4e68-bd40-ec5ae0a53898",
},
- ],
- },
- },
+ ]
+ }
+ }
}
expected_result = [
{
"ee_descriptor_id": "",
"charm_name": "",
"model": "dbfbd751-3de4-4e68-bd40-ec5ae0a53898",
- },
+ }
]
vca_records = self.n2vc._get_vca_records(charm_level, db_nsr, db_vnfr)
self.assertEqual(vca_records, expected_result)
"charm_name": "",
"model": "dbfbd751-3de4-4e68-bd40-ec5ae0a53898",
},
- ],
- },
- },
+ ]
+ }
+ }
}
mock_vnf_count_and_record = MagicMock()
- db_vnfr = {
- "member-vnf-index-ref": "vnf111-xxx-yyy-zzz",
- }
+ db_vnfr = {"member-vnf-index-ref": "vnf111-xxx-yyy-zzz"}
vnf_count = "0"
mock_vnf_count_and_record.return_value = (vnf_count, db_vnfr)
expected_result = "simple-ee-ab-z0-vnf111-xxx-y-vnf"
"ee_descriptor_id": "",
"model": "dbfbd751-3de4-4e68-bd40-ec5ae0a53898",
},
- ],
- },
- },
+ ]
+ }
+ }
}
mock_vnf_count_and_record = MagicMock()
- db_vnfr = {
- "member-vnf-index-ref": "vnf111-xxx-yyy-zzz",
- }
+ db_vnfr = {"member-vnf-index-ref": "vnf111-xxx-yyy-zzz"}
vnf_count = "0"
mock_vnf_count_and_record.return_value = (vnf_count, db_vnfr)
expected_result = "app-vnf-eb3161eec0-z0-random"
"charm_name": "",
"model": "dbfbd751-3de4-4e68-bd40-ec5ae0a53898",
},
- ],
- },
- },
+ ]
+ }
+ }
}
mock_vnf_count_and_record = MagicMock()
- db_vnfr = {
- "member-vnf-index-ref": "vnf222-xxx-yyy-zzz",
- }
+ db_vnfr = {"member-vnf-index-ref": "vnf222-xxx-yyy-zzz"}
vnf_count = "0"
mock_vnf_count_and_record.return_value = (vnf_count, db_vnfr)
with patch.object(self.n2vc, "db", self.db), patch.object(
"application": "openldap-ee-z0-openldap-vdu",
"model": "82b11965-e580-47c0-9ee0-329f318a305b",
"config_sw_installed": True,
- },
+ }
]
}
}
}
mock_vnf_count_and_record = MagicMock()
- db_vnfr = {
- "member-vnf-index-ref": "openldap",
- "vdur": {},
- }
+ db_vnfr = {"member-vnf-index-ref": "openldap", "vdur": {}}
vnf_count = "0"
mock_vnf_count_and_record.return_value = (vnf_count, db_vnfr)
expected_result = "openldap-ee-z0-openldap-ldap-vdu"
"ee_descriptor_id": "",
"model": "dbfbd751-3de4-4e68-bd40-ec5ae0a53898",
},
- ],
- },
- },
+ ]
+ }
+ }
}
mock_vnf_count_and_record = MagicMock()
db_vnfr = {
"ee_descriptor_id": "",
"charm_name": "simple-ns-charm-abc-000-rrrr-nnnn-4444-hhh-3333-yyyy-333-hhh-ttt-444",
"model": "dbfbd751-3de4-4e68-bd40-ec5ae0a53898",
- },
- ],
- },
- },
+ }
+ ]
+ }
+ }
}
mock_vnf_count_and_record = MagicMock()
db_vnfr = {}
"ee_descriptor_id": "",
"charm_name": "",
"model": "dbfbd751-3de4-4e68-bd40-ec5ae0a53898",
- },
- ],
- },
- },
+ }
+ ]
+ }
+ }
}
mock_vnf_count_and_record = MagicMock()
db_vnfr = {}
"vdu_name": "",
"ee_descriptor_id": "",
"model": "dbfbd751-3de4-4e68-bd40-ec5ae0a53898",
- },
- ],
- },
- },
+ }
+ ]
+ }
+ }
}
mock_vnf_count_and_record = MagicMock()
db_vnfr = {}
self.assertLess(len(application_name), 50)
mock_vnf_count_and_record.assert_called_once_with("ns-level", None)
self.db.get_one.assert_called_once()
+
+
+class DeleteExecutionEnvironmentTest(N2VCJujuConnTestCase):
+ def setUp(self):
+ super(DeleteExecutionEnvironmentTest, self).setUp()
+ self.n2vc.libjuju.get_controller = AsyncMock()
+ self.n2vc.libjuju.destroy_model = AsyncMock()
+ self.n2vc.libjuju.destroy_application = AsyncMock()
+
+ def test_remove_ee__target_application_exists__model_is_deleted(self):
+ get_ee_id_components = MagicMock()
+ get_ee_id_components.return_value = ("my_model", "my_app", None)
+ model = MagicMock(create_autospec=True)
+ model.applications = {}
+ self.n2vc.libjuju.get_model = AsyncMock()
+ self.n2vc.libjuju.get_model.return_value = model
+ with patch.object(self.n2vc, "_get_ee_id_components", get_ee_id_components):
+ self.loop.run_until_complete(
+ self.n2vc.delete_execution_environment(
+ "my_ee", application_to_delete="my_app"
+ )
+ )
+ self.n2vc.libjuju.destroy_application.assert_called_with(
+ model_name="my_model",
+ application_name="my_app",
+ total_timeout=None,
+ )
+ self.n2vc.libjuju.destroy_model.assert_called_with(
+ model_name="my_model",
+ total_timeout=None,
+ )
+
+ def test_remove_ee__multiple_applications_exist__model_is_not_deleted(self):
+ get_ee_id_components = MagicMock()
+ get_ee_id_components.return_value = ("my_model", "my_app", None)
+ model = MagicMock(create_autospec=True)
+ model.applications = {MagicMock(create_autospec=True)}
+ self.n2vc.libjuju.get_model = AsyncMock()
+ self.n2vc.libjuju.get_model.return_value = model
+ with patch.object(self.n2vc, "_get_ee_id_components", get_ee_id_components):
+ self.loop.run_until_complete(
+ self.n2vc.delete_execution_environment(
+ "my_ee", application_to_delete="my_app"
+ )
+ )
+ self.n2vc.libjuju.destroy_application.assert_called_with(
+ model_name="my_model",
+ application_name="my_app",
+ total_timeout=None,
+ )
+ self.n2vc.libjuju.destroy_model.assert_not_called()
+
+ def test_remove_ee__target_application_does_not_exist__model_is_deleted(self):
+ get_ee_id_components = MagicMock()
+ get_ee_id_components.return_value = ("my_model", "my_app", None)
+ with patch.object(self.n2vc, "_get_ee_id_components", get_ee_id_components):
+ self.loop.run_until_complete(
+ self.n2vc.delete_execution_environment("my_ee")
+ )
+ self.n2vc.libjuju.destroy_model.assert_called_with(
+ model_name="my_model",
+ total_timeout=None,
+ )
self.vca_collection.find_one = AsyncMock()
self.vca_collection.insert_one = AsyncMock()
self.vca_collection.replace_one = AsyncMock()
+ self.encryption = Mock()
+ self.encryption.admin_collection = Mock()
+ self.encryption.admin_collection.find_one = AsyncMock()
self.admin_collection = Mock()
self.admin_collection.find_one = AsyncMock()
self.admin_collection.insert_one = AsyncMock()
self.admin_collection.replace_one = AsyncMock()
self.vim_accounts_collection = Mock()
self.vim_accounts_collection.find_one = AsyncMock()
+ self.store.encryption._client = {
+ "osm": {
+ "admin": self.encryption.admin_collection,
+ }
+ }
self.store._client = {
"osm": {
"vca": self.vca_collection,
}
}
self.store._config = {"database_commonkey": "osm"}
- # self.store.decrypt_fields = Mock()
+ self.store.encryption._config = {"database_commonkey": "osm"}
self.loop = asyncio.get_event_loop()
@patch("n2vc.vca.connection_data.base64_to_cacert")
db_find_one = conn_data.copy()
db_find_one.update({"schema_version": "1.1", "_id": "id"})
self.vca_collection.find_one.return_value = db_find_one
- self.store.decrypt_fields = AsyncMock()
+ self.store.encryption.decrypt_fields = AsyncMock()
connection_data = self.loop.run_until_complete(
self.store.get_vca_connection_data("vca_id")
)
encrypted_secret = "kI46kRJh828ExSNpr16OG/q5a5/qTsE0bsHrv/W/2/g="
cacert = "LS0tLS1CRUdJTiBDRVJUSUZJQ0FURS0tLS0tCk1JSUQ4ekNDQWx1Z0F3SUJBZ0lVRWlzTTBoQWxiYzQ0Z1ZhZWh6bS80ZUsyNnRZd0RRWUpLb1pJaHZjTkFRRUwKQlFBd0lURU5NQXNHQTFVRUNoTUVTblZxZFRFUU1BNEdBMVVFQXhNSGFuVnFkUzFqWVRBZUZ3MHlNVEEwTWpNeApNRFV3TXpSYUZ3MHpNVEEwTWpNeE1EVTFNelJhTUNFeERUQUxCZ05WQkFvVEJFcDFhblV4RURBT0JnTlZCQU1UCkIycDFhblV0WTJFd2dnR2lNQTBHQ1NxR1NJYjNEUUVCQVFVQUE0SUJqd0F3Z2dHS0FvSUJnUUNhTmFvNGZab2gKTDJWYThtdy9LdCs3RG9tMHBYTlIvbEUxSHJyVmZvbmZqZFVQV01zSHpTSjJZZXlXcUNSd3BiaHlLaE82N1c1dgpUY2RsV3Y3WGFLTGtsdVkraDBZY3BQT3BFTmZZYmxrNGk0QkV1L0wzYVY5MFFkUFFrMG94S01CS2R5QlBNZVNNCkJmS2pPWXdyOGgzM0ZWUWhmVkJnMXVGZ2tGaDdTamNuNHczUFdvc1BCMjNiVHBCbGR3VE9zemN4Qm9TaDNSVTkKTzZjb3lQdDdEN0drOCtHRlA3RGRUQTdoV1RkaUM4cDBkeHp2RUNmY0psMXNFeFEyZVprS1QvVzZyelNtVDhUTApCM0ErM1FDRDhEOEVsQU1IVy9zS25SeHphYU8welpNVmVlQnRnNlFGZ1F3M0dJMGo2ZTY0K2w3VExoOW8wSkZVCjdpUitPY01xUzVDY0NROGpWV3JPSk9Xc2dEbDZ4T2FFREczYnR5SVJHY29jbVcvcEZFQjNZd1A2S1BRTUIrNXkKWDdnZExEWmFGRFBVakZmblhkMnhHdUZlMnpRTDNVbXZEUkZuUlBBaW02QlpQbWo1OFh2emFhZXROa3lyaUZLZwp4Z0Z1dVpTcDUwV2JWdjF0MkdzOTMrRE53NlhFZHRFYnlWWUNBa28xTTY0MkozczFnN3NoQnRFQ0F3RUFBYU1qCk1DRXdEZ1lEVlIwUEFRSC9CQVFEQWdLa01BOEdBMVVkRXdFQi93UUZNQU1CQWY4d0RRWUpLb1pJaHZjTkFRRUwKQlFBRGdnR0JBRXYxM2o2ZGFVbDBqeERPSnNTV1ZJZS9JdXNXVTRpN2ZXSWlqMHAwRU1GNS9LTE8yemRndTR5SQoreVd2T3N5aVFPanEzMlRYVlo2bTRDSnBkR1dGVE5HK2lLdXVOU3M0N3g3Q3dmVUNBWm5VVzhyamd3ZWJyS3BmCkJMNEVQcTZTcW0rSmltN0VPankyMWJkY2cyUXdZb3A3eUhvaHcveWEvL0l6RTMzVzZxNHlJeEFvNDBVYUhPTEMKTGtGbnNVYitjcFZBeFlPZGp6bjFzNWhnclpuWXlETEl3WmtIdFdEWm94alUzeC9jdnZzZ1FzLytzTWYrRFU4RgpZMkJKRHJjQ1VQM2xzclc0QVpFMFplZkEwOTlncFEvb3dSN0REYnMwSjZUeFM4NGt6Tldjc1FuWnRraXZheHJNClkyVHNnaWVndFExVFdGRWpxLy9sUFV4emJCdmpnd1FBZm5CQXZGeVNKejdTa0VuVm5rUXJGaUlUQVArTHljQVIKMlg4UFI2ZGI1bEt0SitBSENDM3kvZmNQS2k0ZzNTL3djeXRRdmdvOXJ6ODRFalp5YUNTaGJXNG9jNzNrMS9RcAowQWtHRDU0ZGVDWWVPYVJNbW96c0w3ZzdxWkpFekhtODdOcVBYSy9EZFoweWNxaVFhMXY2T3QxNjdXNUlzMUkzCjBWb0IzUzloSlE9PQotLS0tLUVORCBDRVJUSUZJQ0FURS0tLS0tCgo=" # noqa: E501
encrypted_cacert = "QeV4evTLXzcKwZZvmXQ/OvSHToXH3ISwfoLmU+Q9JlQWAFUHSJ9IhO0ewaQrJmx3NkfFb7NCxsQhh+wE57zDW4rWgn4w/SWkzvwSi1h2xYOO3ECEHzzVqgUm15Sk0xaj1Fv9Ed4hipf6PRijeOZ7A1G9zekr1w9WIvebMyJZrK+f6QJ8AP20NUZqG/3k+MeJr3kjrl+8uwU5aPOrHAexSQGAqSKTkWzW7glmlyMWTjwkuSgNVgFg0ctdWTZ5JnNwxXbpjwIKrC4E4sIHcxko2vsTeLF8pZFPk+3QUZIg8BrgtyM3lJC2kO1g3emPQhCIk3VDb5GBgssc/GyFyRXNS651d5BNgcABOKZ4Rv/gGnprB35zP7TKJKkST44XJTEBiugWMkSZg+T9H98/l3eE34O6thfTZXgIyG+ZM6uGlW2XOce0OoEIyJiEL039WJe3izjbD3b9sCCdgQc0MgS+hTaayJI6oCUWPsJLmRji19jLi/wjOsU5gPItCFWw3pBye/A4Zf8Hxm+hShvqBnk8R2yx1fPTiyw/Zx4Jn8m49XQJyjDSZnhIck0PVHR9xWzKCr++PKljLMLdkdFxVRVPFQk/FBbesqofjSXsq9DASY6ACTL3Jmignx2OXD6ac4SlBqCTjV2dIM0yEgZF7zwMNCtppRdXTV8S29JP4W2mfaiqXCUSRTggv8EYU+9diCE+8sPB6HjuLrsfiySbFlYR2m4ysDGXjsVx5CDAf0Nh4IRfcSceYnnBGIQ2sfgGcJFOZoJqr/QeE2NWz6jlWYbWT7MjS/0decpKxP7L88qrR+F48WXQvfsvjWgKjlMKw7lHmFF8FeY836VWWICTRZx+y6IlY1Ys2ML4kySF27Hal4OPhOOoBljMNMVwUEvBulOnKUWw4BGz8eGCl8Hw6tlyJdC7kcBj/aCyNCR/NnuDk4Wck6e//He8L6mS83OJi/hIFc8vYQxnCJMXj9Ou7wr5hxtBnvxXzZM3kFHxCDO24Cd5UyBV9GD8TiQJfBGAy7a2BCBMb5ESVX8NOkyyv2hXMHOjpnKhUM9yP3Ke4CBImO7mCKJNHdFVtAmuyVKJ+jT6ooAAArkX2xwEAvBEpvGNmW2jgs6wxSuKY0h5aUm0rA4v/s8fqSZhzdInB54sMldyAnt9G+9e+g933DfyA/tkc56Ed0vZ/XEvTkThVHyUbfYR/Gjsoab1RpnDBi4aZ2E7iceoBshy+L6NXdL0jlWEs4ZubiWlbVNWlN/MqJcjV/quLU7q4HtkG0MDEFm6To3o48x7xpv8otih6YBduNqBFnwQ6Qz9rM2chFgOR4IgNSZKPxHO0AGCi1gnK/CeCvrSfWYAMn+2rmw0hMZybqKMStG28+rXsKDdqmy6vAwL/+dJwkAW+ix68rWRXpeqHlWidu4SkIBELuwEkFIC/GJU/DRvcN2GG9uP1m+VFifCIS2UdiO4OVrP6PVoW1O+jBJvFH3K1YT7CRqevb9OzjS9fO1wjkOff0W8zZyJK9Mp25aynpf0k3oMpZDpjnlOsFXFUb3N6SvXD1Yi95szIlmsr5yRYaeGUJH7/SAmMr8R6RqsCR0ANptL2dtRoGPi/qcDQE15vnjJ+QMYCg9KbCdV+Qq5di93XAjmwPj6tKZv0aXQuaTZgYR7bdLmAnJaFLbHWcQG1k6F/vdKNEb7llLsoAD9KuKXPZT/LErIyKcI0RZySy9yvhTZb4jQWn17b83yfvqfd5/2NpcyaY4gNERhDRJHw7VhoS5Leai5ZnFaO3C1vU9tIJ85XgCUASTsBLoQWVCKPSQZGxzF7PVLnHui3YA5OsOQpVqAPtgGZ12tP9XkEKj+u2/Atj2bgYrqBF7zUL64X/AQpwr/UElWDhJLSD/KStVeDOUx3AwAVVi9eTUJr6NiNMutCE1sqUf9XVIddgZ/BaG5t3NV2L+T+11QzAl+Xrh8wH/XeUCTmnU3NGkvCz/9Y7PMS+qQL7T7WeGdYmEhb5s/5p/yjSYeqybr5sANOHs83OdeSXbop9cLWW+JksHmS//rHHcrrJhZgCb3P0EOpEoEMCarT6sJq0V1Hwf/YNFdJ9V7Ac654ALS+a9ffNthMUEJeY21QMtNOrEg3QH5RWBPn+yOYN/f38tzwlT1k6Ec94y/sBmeQVv8rRzkkiMSXeAL5ATdJntq8NQq5JbvLQDNnZnHQthZt+uhcUf08mWlRrxxBUaE6xLppgMqFdYSjLGvgn/d8FZ9y7UCg5ZBhgP1rrRQL1COpNKKlJLf5laqwiGAucIDmzSbhO+MidSauDLWuv+fsdd2QYk98PHxqNrPYLrlAlABFi3JEApBm4IlrGbHxKg6dRiy7L1c9xWnAD7E3XrZrSc6DXvGRsjMXWoQdlp4CX5H3cdH9sjIE6akWqiwwrOP6QTbJcxmJGv/MVhsDVrVKmrKSn2H0/Us1fyYCHCOyCSc2L96uId8i9wQO1NXj+1PJmUq3tJ8U0TUwTblOEQdYej99xEI8EzsXLjNJHCgbDygtHBYd/SHToXH3ISwfoLmU+Q9JlS1woaUpVa5sdvbsr4BXR6J" # noqa: E501
-
self.vca_collection.find_one.return_value = {
"_id": "2ade7f0e-9b58-4dbd-93a3-4ec076185d39",
"schema_version": "1.11",
"secret": encrypted_secret,
"cacert": encrypted_cacert,
}
- self.admin_collection.find_one.return_value = {
+ self.encryption.admin_collection.find_one.return_value = {
"serial": b"l+U3HDp9td+UjQ+AN+Ypj/Uh7n3C+rMJueQNNxkIpWI="
}
connection_data = self.loop.run_until_complete(
--- /dev/null
+# Copyright 2022 Whitestack, LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+apiVersion: cert-manager.io/v1
+kind: Certificate
+metadata:
+ name: test-cert
+ namespace: osm
+spec:
+ secretName: test-cert-secret
+ privateKey:
+ rotationPolicy: Always
+ algorithm: ECDSA
+ size: 256
+ duration: 8760h
+ renewBefore: 2208h
+ subject:
+ organizations:
+ - osm
+ commonName: osm
+ isCA: false
+ usages:
+ - server auth
+ dnsNames:
+ - "*.osm"
+ - "*.osm.svc"
+ - "*.osm.svc.cluster"
+ - "*.osm.svc.cluster.local"
+ issuerRef:
+ name: ca-issuer
+ kind: ClusterIssuer
class FakeWatcher(AsyncMock):
-
delta_to_return = None
async def Next(self):
# See the License for the specific language governing permissions and
# limitations under the License.
#######################################################################################
-aiokafka==0.7.2
+aiokafka==0.8.1
# via -r https://osm.etsi.org/gitweb/?p=osm/common.git;a=blob_plain;f=requirements.txt;hb=master
+async-timeout==4.0.3
+ # via
+ # -r https://osm.etsi.org/gitweb/?p=osm/common.git;a=blob_plain;f=requirements.txt;hb=master
+ # aiokafka
dataclasses==0.6
# via -r https://osm.etsi.org/gitweb/?p=osm/common.git;a=blob_plain;f=requirements.txt;hb=master
+dnspython==2.4.2
+ # via
+ # -r https://osm.etsi.org/gitweb/?p=osm/common.git;a=blob_plain;f=requirements.txt;hb=master
+ # pymongo
kafka-python==2.0.2
# via
# -r https://osm.etsi.org/gitweb/?p=osm/common.git;a=blob_plain;f=requirements.txt;hb=master
# aiokafka
+motor==3.3.1
+ # via -r https://osm.etsi.org/gitweb/?p=osm/common.git;a=blob_plain;f=requirements.txt;hb=master
osm-common @ git+https://osm.etsi.org/gerrit/osm/common.git@master
# via -r requirements-dev.in
-pycrypto==2.6.1
- # via -r https://osm.etsi.org/gitweb/?p=osm/common.git;a=blob_plain;f=requirements.txt;hb=master
-pymongo==3.12.3
+packaging==23.1
+ # via
+ # -r https://osm.etsi.org/gitweb/?p=osm/common.git;a=blob_plain;f=requirements.txt;hb=master
+ # aiokafka
+pycryptodome==3.19.0
# via -r https://osm.etsi.org/gitweb/?p=osm/common.git;a=blob_plain;f=requirements.txt;hb=master
-pyyaml==5.4.1
+pymongo==4.5.0
+ # via
+ # -r https://osm.etsi.org/gitweb/?p=osm/common.git;a=blob_plain;f=requirements.txt;hb=master
+ # motor
+pyyaml==6.0.1
# via -r https://osm.etsi.org/gitweb/?p=osm/common.git;a=blob_plain;f=requirements.txt;hb=master
# limitations under the License.
asynctest
+charset-normalizer
coverage
flake8<5.0.0
mock
#######################################################################################
asynctest==0.13.0
# via -r requirements-test.in
-certifi==2022.9.24
+certifi==2023.7.22
# via requests
-charset-normalizer==2.1.1
- # via requests
-coverage==6.5.0
+charset-normalizer==3.2.0
+ # via
+ # -r requirements-test.in
+ # requests
+coverage==7.3.1
# via -r requirements-test.in
flake8==4.0.1
# via -r requirements-test.in
# via requests
mccabe==0.6.1
# via flake8
-mock==4.0.3
+mock==5.1.0
# via -r requirements-test.in
-nose2==0.12.0
+nose2==0.13.0
# via -r requirements-test.in
pycodestyle==2.8.0
# via flake8
pyflakes==2.4.0
# via flake8
-requests==2.28.1
+requests==2.31.0
# via requests-mock
-requests-mock==1.10.0
+requests-mock==1.11.0
# via -r requirements-test.in
six==1.16.0
# via requests-mock
-urllib3==1.26.12
+urllib3==2.0.5
# via requests
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-
-async-timeout<4
-juju==3.0.0
-kubernetes
-motor==1.3.1
+charset-normalizer
+google-auth<2.18.0
+juju==2.9.44.0
+kubernetes==26.1.0
+motor
pyasn1
-pyyaml<6
+pyyaml>6
retrying-async
-certifi==2022.9.24
# See the License for the specific language governing permissions and
# limitations under the License.
#######################################################################################
-async-timeout==3.0.1
- # via
- # -r requirements.in
- # retrying-async
+async-timeout==4.0.3
+ # via retrying-async
bcrypt==4.0.1
# via paramiko
-cachetools==5.2.0
+cachetools==5.3.1
# via google-auth
-certifi==2022.9.24
+certifi==2023.7.22
# via
- # -r requirements.in
# kubernetes
# requests
-cffi==1.15.1
+cffi==1.16.0
# via
# cryptography
# pynacl
-charset-normalizer==2.1.1
- # via requests
-cryptography==38.0.1
+charset-normalizer==3.2.0
+ # via
+ # -r requirements.in
+ # requests
+cryptography==41.0.4
# via paramiko
-google-auth==2.12.0
- # via kubernetes
+dnspython==2.4.2
+ # via pymongo
+google-auth==2.17.3
+ # via
+ # -r requirements.in
+ # kubernetes
idna==3.4
# via requests
-juju==3.0.0
+juju==2.9.44.0
# via -r requirements.in
jujubundlelib==0.5.7
# via theblues
-kubernetes==24.2.0
+kubernetes==26.1.0
# via
# -r requirements.in
# juju
# via
# juju
# theblues
-motor==1.3.1
+motor==3.3.1
# via -r requirements.in
-mypy-extensions==0.4.3
+mypy-extensions==1.0.0
# via typing-inspect
-oauthlib==3.2.1
+oauthlib==3.2.2
# via requests-oauthlib
-paramiko==2.11.0
+paramiko==2.12.0
# via juju
protobuf==3.20.3
# via macaroonbakery
-pyasn1==0.4.8
+pyasn1==0.5.0
# via
# -r requirements.in
# juju
# pyasn1-modules
# rsa
-pyasn1-modules==0.2.8
+pyasn1-modules==0.3.0
# via google-auth
pycparser==2.21
# via cffi
pymacaroons==0.13.0
# via macaroonbakery
-pymongo==3.12.3
+pymongo==4.5.0
# via motor
pynacl==1.5.0
# via
# macaroonbakery
python-dateutil==2.8.2
# via kubernetes
-pytz==2022.4
+pytz==2023.3.post1
# via pyrfc3339
-pyyaml==5.4.1
+pyyaml==6.0.1
# via
# -r requirements.in
# juju
# jujubundlelib
# kubernetes
-requests==2.28.1
+requests==2.31.0
# via
# kubernetes
# macaroonbakery
# python-dateutil
theblues==0.5.2
# via juju
-toposort==1.7
+toposort==1.10
# via juju
-typing-extensions==4.4.0
+typing-extensions==4.8.0
# via typing-inspect
-typing-inspect==0.8.0
+typing-inspect==0.9.0
# via juju
-urllib3==1.26.12
+urllib3==2.0.5
# via
# kubernetes
# requests
-websocket-client==1.4.1
+websocket-client==1.6.3
# via kubernetes
-websockets==7.0
+websockets==11.0.3
# via juju
# The following packages are considered to be unsafe in a requirements file:
[testenv]
usedevelop = True
-basepython = python3.8
+basepython = python3.10
setenv = VIRTUAL_ENV={envdir}
PYTHONDONTWRITEBYTECODE = 1
deps = -r{toxinidir}/requirements.txt
#######################################################################################
[testenv:black]
-deps = black
+deps = black==23.12.1
skip_install = true
commands =
black --check --diff n2vc/
coverage report --omit='*tests*'
coverage html -d ./cover --omit='*tests*'
coverage xml -o coverage.xml --omit=*tests*
-whitelist_externals = sh
+allowlist_externals = sh
#######################################################################################
#######################################################################################
[testenv:pip-compile]
-deps = pip-tools==6.6.2
+deps = pip-tools==6.13.0
skip_install = true
-whitelist_externals = bash
+allowlist_externals = bash
[
commands =
- bash -c "for file in requirements*.in ; do \
UNSAFE="" ; \
if [[ $file =~ 'dist' ]] ; then UNSAFE='--allow-unsafe' ; fi ; \
- pip-compile -rU --no-header $UNSAFE $file ;\
+ pip-compile --resolver=backtracking -rU --no-header $UNSAFE $file ;\
out=`echo $file | sed 's/.in/.txt/'` ; \
sed -i -e '1 e head -16 tox.ini' $out ;\
done"
python3 setup.py --command-packages=stdeb.command sdist_dsc
sh -c 'cd deb_dist/n2vc*/ && dpkg-buildpackage -rfakeroot -uc -us'
sh -c 'rm n2vc/requirements.txt'
-whitelist_externals = sh
+allowlist_externals = sh
#######################################################################################
[flake8]