# blob: 50b8d7a9ce2f843354fe5a6bc24bd72090f06b85 [file] [log] [blame]
# Copyright 2022 Canonical Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import re
import tempfile
from unittest import TestCase
from unittest.mock import Mock, patch
from osm_lcm.lcm import Lcm
from osm_lcm.data_utils.database.database import Database
from osm_lcm.data_utils.filesystem.filesystem import Filesystem
from osm_lcm.lcm_utils import LcmException
def create_lcm_config(
    source_path: str, destination_path: str, line_number=None
) -> None:
    """Create a new lcm_config file from the config file template.

    The template is copied to the destination. If a line number is
    provided, that line is left out of the copy.

    Args:
        source_path: (str) source file path
        destination_path: (str) destination file path
        line_number: (int) zero-based index of the line to be deleted;
            None (the default) copies the template unchanged

    Raises:
        LcmException: if line_number is negative
        IndexError: if line_number is past the last line of the template
    """
    # Validate before touching any file, so a bad argument does not leave a
    # truncated destination behind.
    if line_number is not None and line_number < 0:
        raise LcmException("Line number can not be smaller than zero")

    # The template is only read, so read-only access is enough.
    with open(source_path, "r") as fs:
        contents = fs.readlines()

    # Compare against None explicitly: 0 is a valid index (the first line)
    # and must not be mistaken for "no line number given".
    if line_number is not None:
        contents.pop(line_number)

    with open(destination_path, "w") as fd:
        fd.writelines(contents)
def check_file_content(health_check_file: str) -> str:
    """Read the health check file and hand back everything in it.

    Args:
        health_check_file: (str) path of the file to read
    Returns:
        (str) the complete text of the file
    """
    with open(health_check_file, "r") as reader:
        return reader.read()
class TestLcmBase(TestCase):
    """Common fixture for the Lcm test cases.

    setUp clears the class-level ``instance`` of Database and Filesystem,
    replaces their ``db`` / ``fs`` backends with mocks and builds an Lcm
    object from the test config file.
    """

    @staticmethod
    def _remove_if_present(path):
        """Delete path, tolerating a file that is already gone."""
        try:
            os.remove(path)
        except FileNotFoundError:
            pass

    def setUp(self):
        # The path is built from the current working directory, so it only
        # resolves when the tests run from the directory holding osm_lcm/.
        self.config_file = os.getcwd() + "/osm_lcm/tests/test_lcm_config_file.yaml"

        # mkstemp() hands back an open OS-level descriptor together with the
        # path. Only the path is used, so the descriptor is closed right away
        # and the file is deleted once the test has finished.
        descriptor, self.config_file_without_storage_path = tempfile.mkstemp()
        os.close(descriptor)
        self.addCleanup(
            self._remove_if_present, self.config_file_without_storage_path
        )

        # Start from a fresh in-memory database whose db object is mocked.
        Database.instance = None
        self.db = Mock(Database({"database": {"driver": "memory"}}).instance.db)
        Database().instance.db = self.db

        # Same for the filesystem: local driver, mocked fs object.
        Filesystem.instance = None
        self.fs = Mock(
            Filesystem({"storage": {"driver": "local", "path": "/"}}).instance.fs
        )
        Filesystem.instance.fs = self.fs
        self.fs.path = "/"

        self.my_lcm = Lcm(config_file=self.config_file)
class TestLcm(TestLcmBase):
    """Health check file tests for Lcm.

    Covers where the health check file path comes from and which Kafka
    "ping" messages are expected to write a timestamp into that file.
    """

    # Epoch time with a fractional part, e.g. "1650000000.123456". Raw string
    # with an escaped dot, so only a literal decimal point is accepted.
    _EPOCH_PATTERN = r"[0-9]{10}\.[0-9]{5,8}"

    @staticmethod
    def _discard_file(path):
        """Delete path, tolerating a file that is already gone."""
        try:
            os.remove(path)
        except FileNotFoundError:
            pass

    def _new_health_check_file(self):
        """Create an empty temporary file and return its path.

        The descriptor returned by mkstemp() is closed immediately and the
        file is removed when the test ends.
        """
        descriptor, path = tempfile.mkstemp()
        os.close(descriptor)
        self.addCleanup(self._discard_file, path)
        return path

    def _timestamps_after_ping(self, topic, worker_id):
        """Send a "ping" on topic and return the timestamps found afterwards.

        Args:
            topic: (str) Kafka topic passed to kafka_read_callback
            worker_id: worker id placed in the ping parameters
        Returns:
            (list) epoch timestamps matched in the health check file
        """
        params = {"to": "lcm", "from": "lcm", "worker_id": worker_id}
        self.my_lcm.health_check_file = self._new_health_check_file()
        self.my_lcm.kafka_read_callback(topic, "ping", params)
        return re.findall(
            self._EPOCH_PATTERN, check_file_content(self.my_lcm.health_check_file)
        )

    def test_get_health_check_file_from_config_file(self):
        self.assertEqual(self.my_lcm.health_check_file, "/tmp/storage/time_last_ping")

    def test_health_check_file_not_in_config_file(self):
        # NOTE(review): 38 is a hard-coded line index into the test config
        # file; presumably it is the storage path entry - confirm whenever
        # test_lcm_config_file.yaml is edited.
        create_lcm_config(self.config_file, self.config_file_without_storage_path, 38)
        with self.assertRaises(LcmException):
            Lcm(config_file=self.config_file_without_storage_path)

    def test_kafka_admin_topic_ping_command(self):
        # Epoch time is written in health check file.
        self.assertTrue(
            self._timestamps_after_ping("admin", self.my_lcm.worker_id)
        )

    def test_kafka_wrong_topic_ping_command(self):
        # No timestamp is expected in the health check file.
        self.assertFalse(
            self._timestamps_after_ping("kafka", self.my_lcm.worker_id)
        )

    def test_kafka_admin_topic_ping_command_wrong_worker_id(self):
        # No timestamp is expected in the health check file.
        self.assertFalse(self._timestamps_after_ping("admin", 5))
@patch("osm_lcm.lcm.asyncio.ensure_future")
class TestPaasKafkaRead(TestLcmBase):
    """Dispatch tests for messages received on the "paas" topic.

    asyncio.ensure_future is patched for every test method, so each test
    receives the mock as its last positional argument.
    """

    def setUp(self):
        super().setUp()
        self.task = {}
        # NOTE(review): the expected order id is hard-coded; how Lcm derives
        # it is not visible from this file.
        self.order_id = 2
        self.params = {"_id": "paas_id", "name": "paas_name", "type": "juju"}
        self.my_lcm.lcm_tasks = Mock()
        self.my_lcm.paas = Mock()

    def _deliver(self, command, future_mock):
        """Feed a "paas" message with the given command to the callback."""
        future_mock.return_value = self.task
        self.my_lcm.kafka_read_callback("paas", command, self.params)

    def _assert_scheduled(self, future_mock, operation, scheduled):
        """Check the task was registered as operation and scheduled once."""
        self.my_lcm.lcm_tasks.register.assert_called_with(
            "paas", "paas_id", self.order_id, operation, self.task
        )
        future_mock.assert_called_once_with(scheduled)

    def _assert_ignored(self, future_mock):
        """Check that nothing was registered and nothing was scheduled."""
        self.my_lcm.lcm_tasks.register.assert_not_called()
        future_mock.assert_not_called()

    def test_kafka_read_paas_create(self, future_mock):
        self._deliver("created", future_mock)
        self._assert_scheduled(future_mock, "paas_create", self.my_lcm.paas.create())

    def test_kafka_read_paas_update(self, future_mock):
        self._deliver("edited", future_mock)
        self._assert_scheduled(future_mock, "paas_edit", self.my_lcm.paas.edit())

    def test_kafka_read_paas_delete(self, future_mock):
        self._deliver("delete", future_mock)
        self._assert_scheduled(future_mock, "paas_delete", self.my_lcm.paas.delete())

    def test_kafka_read_paas_delete_force(self, future_mock):
        # A "deleted" message is expected to trigger no task.
        self._deliver("deleted", future_mock)
        self._assert_ignored(future_mock)

    def test_kafka_read_paas_wrong_command(self, future_mock):
        # An unknown command is expected to trigger no task.
        self._deliver("invalid", future_mock)
        self._assert_ignored(future_mock)
@patch("osm_lcm.lcm.asyncio.ensure_future")
class TestNsKafkaRead(TestLcmBase):
    """Dispatch tests for messages received on the "ns" topic.

    Each command is exercised twice: with operation params that carry no
    PaaS account (handled by ``my_lcm.ns``) and with a "paasAccountId"
    (handled by the "juju" entry of ``my_lcm.paas_service``).
    asyncio.ensure_future is patched for every test method and arrives as
    the last positional argument, after any per-method patches.
    """

    def setUp(self):
        super().setUp()
        self.task = {}
        # Operation without a PaaS account.
        self.vim_params = {
            "_id": "nslcmop_id",
            "nsInstanceId": "nsr_id",
            "operationParams": {},
        }
        # Same operation, but bound to the PaaS account "paas_id".
        self.paas_params = {
            "_id": "nslcmop_id",
            "nsInstanceId": "nsr_id",
            "operationParams": {"paasAccountId": "paas_id"},
        }
        self.my_lcm.ns = Mock()
        self.my_lcm.lcm_tasks = Mock()
        self.my_lcm.juju_paas = Mock()
        self.my_lcm.paas_service = {"juju": self.my_lcm.juju_paas}

    def _deliver(self, command, params, future_mock):
        """Feed an "ns" message with the given command to the callback."""
        future_mock.return_value = self.task
        self.my_lcm.kafka_read_callback("ns", command, params)

    def _assert_registered(self, operation):
        """Check the task was registered under the given operation name."""
        self.my_lcm.lcm_tasks.register.assert_called_with(
            "ns", "nsr_id", "nslcmop_id", operation, self.task
        )

    def test_kafka_read_ns_instantiate_vim_account(self, future_mock):
        self._deliver("instantiate", self.vim_params, future_mock)
        future_mock.assert_called_once_with(self.my_lcm.ns.instantiate())
        self._assert_registered("ns_instantiate")

    @patch("osm_lcm.lcm.get_paas_id_by_nsr_id")
    def test_kafka_read_ns_terminate_vim_account(self, paas_id_lookup, future_mock):
        paas_id_lookup.return_value = None
        self._deliver("terminate", self.vim_params, future_mock)
        self.my_lcm.lcm_tasks.cancel.assert_called_with("ns", "nsr_id")
        future_mock.assert_called_once_with(self.my_lcm.ns.terminate())
        self._assert_registered("ns_terminate")

    @patch("osm_lcm.lcm.get_paas_id_by_nsr_id")
    def test_kafka_read_ns_action_vim_account(self, paas_id_lookup, future_mock):
        paas_id_lookup.return_value = None
        self._deliver("action", self.vim_params, future_mock)
        future_mock.assert_called_once_with(self.my_lcm.ns.action())
        self._assert_registered("ns_action")

    @patch("osm_lcm.lcm.get_paas_type_by_paas_id")
    def test_kafka_read_ns_instantiate_paas_account(
        self, paas_type_lookup, future_mock
    ):
        paas_type_lookup.return_value = "juju"
        self._deliver("instantiate", self.paas_params, future_mock)
        future_mock.assert_called_once_with(self.my_lcm.juju_paas.instantiate())
        self._assert_registered("ns_instantiate")
        paas_type_lookup.assert_called_with("paas_id", self.my_lcm.db)

    @patch("osm_lcm.lcm.get_paas_type_by_paas_id")
    @patch("osm_lcm.lcm.get_paas_id_by_nsr_id")
    def test_kafka_read_ns_terminate_paas_account(
        self, paas_id_lookup, paas_type_lookup, future_mock
    ):
        paas_id_lookup.return_value = "paas_id"
        paas_type_lookup.return_value = "juju"
        self._deliver("terminate", self.paas_params, future_mock)
        future_mock.assert_called_once_with(self.my_lcm.juju_paas.terminate())
        self._assert_registered("ns_terminate")
        paas_id_lookup.assert_called_with("nsr_id", self.my_lcm.db)
        paas_type_lookup.assert_called_with("paas_id", self.my_lcm.db)

    @patch("osm_lcm.lcm.get_paas_type_by_paas_id")
    @patch("osm_lcm.lcm.get_paas_id_by_nsr_id")
    def test_kafka_read_ns_action_paas_account(
        self, paas_id_lookup, paas_type_lookup, future_mock
    ):
        paas_id_lookup.return_value = "paas_id"
        paas_type_lookup.return_value = "juju"
        self._deliver("action", self.paas_params, future_mock)
        future_mock.assert_called_once_with(self.my_lcm.juju_paas.action())
        self._assert_registered("ns_action")
        paas_id_lookup.assert_called_with("nsr_id", self.my_lcm.db)
        paas_type_lookup.assert_called_with("paas_id", self.my_lcm.db)