14 files changed:
self.log.debug("status={}".format(status))
try:
self.log.debug("status={}".format(status))
try:
the_table = db_dict["collection"]
the_filter = db_dict["filter"]
the_path = db_dict["path"]
the_table = db_dict["collection"]
the_filter = db_dict["filter"]
the_path = db_dict["path"]
return True
async def inspect_kdu(self, kdu_model: str, repo_url: str = None) -> str:
return True
async def inspect_kdu(self, kdu_model: str, repo_url: str = None) -> str:
self.log.debug(
"inspect kdu_model {} from (optional) repo: {}".format(kdu_model, repo_url)
)
self.log.debug(
"inspect kdu_model {} from (optional) repo: {}".format(kdu_model, repo_url)
)
return namespace in namespaces if namespaces else False
async def _get_namespaces(self, cluster_id: str):
return namespace in namespaces if namespaces else False
async def _get_namespaces(self, cluster_id: str):
self.log.debug("get namespaces cluster_id {}".format(cluster_id))
# init config, env
self.log.debug("get namespaces cluster_id {}".format(cluster_id))
# init config, env
return namespaces
async def _create_namespace(self, cluster_id: str, namespace: str):
return namespaces
async def _create_namespace(self, cluster_id: str, namespace: str):
self.log.debug(f"create namespace: {cluster_id} for cluster_id: {namespace}")
# init config, env
self.log.debug(f"create namespace: {cluster_id} for cluster_id: {namespace}")
# init config, env
async def _get_services(
self, cluster_id: str, kdu_instance: str, namespace: str, kubeconfig: str
):
async def _get_services(
self, cluster_id: str, kdu_instance: str, namespace: str, kubeconfig: str
):
# init config, env
paths, env = self._init_paths_env(
cluster_name=cluster_id, create_if_not_exist=True
# init config, env
paths, env = self._init_paths_env(
cluster_name=cluster_id, create_if_not_exist=True
pass
async def _instances_list(self, cluster_id: str):
pass
async def _instances_list(self, cluster_id: str):
# init paths, env
paths, env = self._init_paths_env(
cluster_name=cluster_id, create_if_not_exist=True
# init paths, env
paths, env = self._init_paths_env(
cluster_name=cluster_id, create_if_not_exist=True
yaml_format: bool = False,
show_error_log: bool = False,
) -> Union[str, dict]:
yaml_format: bool = False,
show_error_log: bool = False,
) -> Union[str, dict]:
self.log.debug(
"status of kdu_instance: {}, namespace: {} ".format(kdu_instance, namespace)
)
self.log.debug(
"status of kdu_instance: {}, namespace: {} ".format(kdu_instance, namespace)
)
timeout: float,
kubeconfig: str,
) -> str:
timeout: float,
kubeconfig: str,
) -> str:
timeout_str = ""
if timeout:
timeout_str = "--timeout {}s".format(timeout)
timeout_str = ""
if timeout:
timeout_str = "--timeout {}s".format(timeout)
def _get_uninstall_command(
self, kdu_instance: str, namespace: str, kubeconfig: str
) -> str:
def _get_uninstall_command(
self, kdu_instance: str, namespace: str, kubeconfig: str
) -> str:
return "env KUBECONFIG={} {} uninstall {} --namespace={}".format(
kubeconfig, self._helm_command, kdu_instance, namespace
)
return "env KUBECONFIG={} {} uninstall {} --namespace={}".format(
kubeconfig, self._helm_command, kdu_instance, namespace
)
output, rc = exec_task.result()
else:
output, rc = exec_task.result()
else:
output, rc = await self._local_async_exec(
command=command, raise_exception_on_error=False, env=env
)
output, rc = await self._local_async_exec(
command=command, raise_exception_on_error=False, env=env
)
self.log.debug("upgrading: {}".format(command))
if atomic:
self.log.debug("upgrading: {}".format(command))
if atomic:
# exec helm in a task
exec_task = asyncio.ensure_future(
coro_or_future=self._local_async_exec(
# exec helm in a task
exec_task = asyncio.ensure_future(
coro_or_future=self._local_async_exec(
output, rc = exec_task.result()
else:
output, rc = exec_task.result()
else:
output, rc = await self._local_async_exec(
command=command, raise_exception_on_error=False, env=env
)
output, rc = await self._local_async_exec(
command=command, raise_exception_on_error=False, env=env
)
async def get_service(
self, cluster_uuid: str, service_name: str, namespace: str
) -> object:
async def get_service(
self, cluster_uuid: str, service_name: str, namespace: str
) -> object:
self.log.debug(
"get service, service_name: {}, namespace: {}, cluster_uuid: {}".format(
service_name, namespace, cluster_uuid
self.log.debug(
"get service, service_name: {}, namespace: {}, cluster_uuid: {}".format(
service_name, namespace, cluster_uuid
async def get_values_kdu(
self, kdu_instance: str, namespace: str, kubeconfig: str
) -> str:
async def get_values_kdu(
self, kdu_instance: str, namespace: str, kubeconfig: str
) -> str:
self.log.debug("get kdu_instance values {}".format(kdu_instance))
return await self._exec_get_command(
self.log.debug("get kdu_instance values {}".format(kdu_instance))
return await self._exec_get_command(
)
async def help_kdu(self, kdu_model: str, repo_url: str = None) -> str:
)
async def help_kdu(self, kdu_model: str, repo_url: str = None) -> str:
self.log.debug(
"inspect kdu_model {} readme.md from repo: {}".format(kdu_model, repo_url)
)
self.log.debug(
"inspect kdu_model {} readme.md from repo: {}".format(kdu_model, repo_url)
)
)
async def synchronize_repos(self, cluster_uuid: str):
)
async def synchronize_repos(self, cluster_uuid: str):
self.log.debug("synchronize repos for cluster helm-id: {}".format(cluster_uuid))
try:
db_repo_ids = self._get_helm_chart_repos_ids(cluster_uuid)
self.log.debug("synchronize repos for cluster helm-id: {}".format(cluster_uuid))
try:
db_repo_ids = self._get_helm_chart_repos_ids(cluster_uuid)
encode_utf8: bool = False,
env: dict = None,
) -> (str, int):
encode_utf8: bool = False,
env: dict = None,
) -> (str, int):
command = K8sHelmBaseConnector._remove_multiple_spaces(command)
self.log.debug(
"Executing async local command: {}, env: {}".format(command, env)
command = K8sHelmBaseConnector._remove_multiple_spaces(command)
self.log.debug(
"Executing async local command: {}, env: {}".format(command, env)
encode_utf8: bool = False,
env: dict = None,
):
encode_utf8: bool = False,
env: dict = None,
):
command1 = K8sHelmBaseConnector._remove_multiple_spaces(command1)
command2 = K8sHelmBaseConnector._remove_multiple_spaces(command2)
command = "{} | {}".format(command1, command2)
command1 = K8sHelmBaseConnector._remove_multiple_spaces(command1)
command2 = K8sHelmBaseConnector._remove_multiple_spaces(command2)
command = "{} | {}".format(command1, command2)
# params for use in -f file
# returns values file option and filename (in order to delete it at the end)
def _params_to_file_option(self, cluster_id: str, params: dict) -> (str, str):
# params for use in -f file
# returns values file option and filename (in order to delete it at the end)
def _params_to_file_option(self, cluster_id: str, params: dict) -> (str, str):
if params and len(params) > 0:
self._init_paths_env(cluster_name=cluster_id, create_if_not_exist=True)
if params and len(params) > 0:
self._init_paths_env(cluster_name=cluster_id, create_if_not_exist=True)
return True
async def inspect_kdu(self, kdu_model: str, repo_url: str = None) -> str:
return True
async def inspect_kdu(self, kdu_model: str, repo_url: str = None) -> str:
self.log.debug(
"inspect kdu_model {} from (optional) repo: {}".format(kdu_model, repo_url)
)
self.log.debug(
"inspect kdu_model {} from (optional) repo: {}".format(kdu_model, repo_url)
)
return paths, env
async def _get_services(self, cluster_id, kdu_instance, namespace, kubeconfig):
return paths, env
async def _get_services(self, cluster_id, kdu_instance, namespace, kubeconfig):
# init config, env
paths, env = self._init_paths_env(
cluster_name=cluster_id, create_if_not_exist=True
# init config, env
paths, env = self._init_paths_env(
cluster_name=cluster_id, create_if_not_exist=True
self.log.debug("namespace not found")
async def _instances_list(self, cluster_id):
self.log.debug("namespace not found")
async def _instances_list(self, cluster_id):
# init paths, env
paths, env = self._init_paths_env(
cluster_name=cluster_id, create_if_not_exist=True
# init paths, env
paths, env = self._init_paths_env(
cluster_name=cluster_id, create_if_not_exist=True
yaml_format: bool = False,
show_error_log: bool = False,
) -> Union[str, dict]:
yaml_format: bool = False,
show_error_log: bool = False,
) -> Union[str, dict]:
self.log.debug(
"status of kdu_instance: {}, namespace: {} ".format(kdu_instance, namespace)
)
self.log.debug(
"status of kdu_instance: {}, namespace: {} ".format(kdu_instance, namespace)
)
timeout,
kubeconfig,
) -> str:
timeout,
kubeconfig,
) -> str:
timeout_str = ""
if timeout:
timeout_str = "--timeout {}".format(timeout)
timeout_str = ""
if timeout:
timeout_str = "--timeout {}".format(timeout)
application = self._get_application(model, application_name)
if application is not None:
application = self._get_application(model, application_name)
if application is not None:
# Checks if the given machine id in the model,
# otherwise function raises an error
_machine, _series = self._get_machine_info(model, machine_id)
# Checks if the given machine id in the model,
# otherwise function raises an error
_machine, _series = self._get_machine_info(model, machine_id)
try:
if application_name not in model.applications:
try:
if application_name not in model.applications:
if machine_id is not None:
machine, series = self._get_machine_info(model, machine_id)
if machine_id is not None:
machine, series = self._get_machine_info(model, machine_id)
return application
async def resolve_application(self, model_name: str, application_name: str):
return application
async def resolve_application(self, model_name: str, application_name: str):
controller = await self.get_controller()
model = await self.get_model(controller, model_name)
controller = await self.get_controller()
model = await self.get_model(controller, model_name)
await self.disconnect_controller(controller)
async def resolve(self, model_name: str):
await self.disconnect_controller(controller)
async def resolve(self, model_name: str):
controller = await self.get_controller()
model = await self.get_model(controller, model_name)
all_units_active = False
controller = await self.get_controller()
model = await self.get_model(controller, model_name)
all_units_active = False
class Loggable:
def __init__(self, log, log_to_console: bool = False, prefix: str = ""):
class Loggable:
def __init__(self, log, log_to_console: bool = False, prefix: str = ""):
self._last_log_time = None # used for time increment in logging
self._log_to_console = log_to_console
self._prefix = prefix
self._last_log_time = None # used for time increment in logging
self._log_to_console = log_to_console
self._prefix = prefix
include_thread: bool = False,
include_coroutine: bool = True,
) -> str:
include_thread: bool = False,
include_coroutine: bool = True,
) -> str:
# time increment from last log
now = time.perf_counter()
if self._last_log_time is None:
# time increment from last log
now = time.perf_counter()
if self._last_log_time is None:
# .format(str(status.value), detailed_status, vca_status, entity_type))
try:
# .format(str(status.value), detailed_status, vca_status, entity_type))
try:
the_table = db_dict["collection"]
the_filter = db_dict["filter"]
the_path = db_dict["path"]
the_table = db_dict["collection"]
the_filter = db_dict["filter"]
the_path = db_dict["path"]
await libjuju.upgrade_charm(
application_name=application_name,
path=path,
await libjuju.upgrade_charm(
application_name=application_name,
path=path,
)
def _write_ee_id_db(self, db_dict: dict, ee_id: str):
)
def _write_ee_id_db(self, db_dict: dict, ee_id: str):
# write ee_id to database: _admin.deployed.VCA.x
try:
the_table = db_dict["collection"]
# write ee_id to database: _admin.deployed.VCA.x
try:
the_table = db_dict["collection"]
os.path.join(os.path.dirname(__file__), "testdata", filename),
"r",
) as self.upgrade_file:
os.path.join(os.path.dirname(__file__), "testdata", filename),
"r",
) as self.upgrade_file:
all_changes = AsyncMock()
all_changes.Next.side_effect = self._fetch_next_delta
mock_all_watcher.return_value = all_changes
all_changes = AsyncMock()
all_changes.Next.side_effect = self._fetch_next_delta
mock_all_watcher.return_value = all_changes
@asynctest.fail_on(active_handles=True)
async def test_repo_list(self):
@asynctest.fail_on(active_handles=True)
async def test_repo_list(self):
self.helm_conn._local_async_exec = asynctest.CoroutineMock(return_value=("", 0))
await self.helm_conn.repo_list(self.cluster_uuid)
self.helm_conn._local_async_exec = asynctest.CoroutineMock(return_value=("", 0))
await self.helm_conn.repo_list(self.cluster_uuid)
@asynctest.fail_on(active_handles=True)
async def test_repo_remove(self):
@asynctest.fail_on(active_handles=True)
async def test_repo_remove(self):
self.helm_conn._local_async_exec = asynctest.CoroutineMock(return_value=("", 0))
repo_name = "bitnami"
await self.helm_conn.repo_remove(self.cluster_uuid, repo_name)
self.helm_conn._local_async_exec = asynctest.CoroutineMock(return_value=("", 0))
repo_name = "bitnami"
await self.helm_conn.repo_remove(self.cluster_uuid, repo_name)
mock_get_model,
mock_get_controller,
):
mock_get_model,
mock_get_controller,
):
mock_get_model.return_value = juju.model.Model()
mock__get_application.return_value = FakeApplication()
output = None
mock_get_model.return_value = juju.model.Model()
mock__get_application.return_value = FakeApplication()
output = None
mock_get_model,
mock_get_controller,
):
mock_get_model,
mock_get_controller,
):
mock_get_application.return_value = FakeApplication()
self.loop.run_until_complete(
mock_get_application.return_value = FakeApplication()
self.loop.run_until_complete(
mock_get_model,
mock_get_controller,
):
mock_get_model,
mock_get_controller,
):
mock_get_application.side_effect = Exception()
with self.assertRaises(Exception):
mock_get_application.side_effect = Exception()
with self.assertRaises(Exception):
mock_get_model,
mock_get_controller,
):
mock_get_model,
mock_get_controller,
):
result = {"error": "not found", "response": "response", "request-id": 1}
mock_get_controller.side_effect = JujuAPIError(result)
result = {"error": "not found", "response": "response", "request-id": 1}
mock_get_controller.side_effect = JujuAPIError(result)
mock_get_model,
mock_get_controller,
):
mock_get_model,
mock_get_controller,
):
result = {"error": "not found", "response": "response", "request-id": 1}
mock_get_model.side_effect = JujuAPIError(result)
result = {"error": "not found", "response": "response", "request-id": 1}
mock_get_model.side_effect = JujuAPIError(result)
class GenerateApplicationNameTest(N2VCJujuConnTestCase):
class GenerateApplicationNameTest(N2VCJujuConnTestCase):
vnf_id = "dbfbd751-3de4-4e68-bd40-ec5ae0a53898"
def setUp(self):
vnf_id = "dbfbd751-3de4-4e68-bd40-ec5ae0a53898"
def setUp(self):
class FakeWatcher(AsyncMock):
class FakeWatcher(AsyncMock):
delta_to_return = None
async def Next(self):
delta_to_return = None
async def Next(self):
coverage report --omit='*tests*'
coverage html -d ./cover --omit='*tests*'
coverage xml -o coverage.xml --omit=*tests*
coverage report --omit='*tests*'
coverage html -d ./cover --omit='*tests*'
coverage xml -o coverage.xml --omit=*tests*
-whitelist_externals = sh
+allowlist_externals = sh
#######################################################################################
#######################################################################################
[testenv:pip-compile]
deps = pip-tools==6.6.2
skip_install = true
[testenv:pip-compile]
deps = pip-tools==6.6.2
skip_install = true
-whitelist_externals = bash
+allowlist_externals = bash
[
commands =
- bash -c "for file in requirements*.in ; do \
[
commands =
- bash -c "for file in requirements*.in ; do \
python3 setup.py --command-packages=stdeb.command sdist_dsc
sh -c 'cd deb_dist/n2vc*/ && dpkg-buildpackage -rfakeroot -uc -us'
sh -c 'rm n2vc/requirements.txt'
python3 setup.py --command-packages=stdeb.command sdist_dsc
sh -c 'cd deb_dist/n2vc*/ && dpkg-buildpackage -rfakeroot -uc -us'
sh -c 'rm n2vc/requirements.txt'
-whitelist_externals = sh
+allowlist_externals = sh
#######################################################################################
[flake8]
#######################################################################################
[flake8]