blob: 0aa9b7d44b90aa5d2d0f0120b40ec46cd447d5db [file] [log] [blame]
#!/usr/bin/env python3
# Copyright 2020 Canonical Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# For those usages not covered by the Apache License, Version 2.0 please
# contact: legal@canonical.com
#
# To get in touch with the maintainers, please contact:
# osm-charmers@lists.launchpad.net
##
from typing import NoReturn
import unittest
from ops.model import BlockedStatus
from ops.testing import Harness
from charm import RoCharm
class TestCharm(unittest.TestCase):
"""RO Charm unit tests."""
def setUp(self) -> NoReturn:
"""Test setup"""
self.harness = Harness(RoCharm)
self.harness.set_leader(is_leader=True)
self.harness.begin()
def test_on_start_without_relations_ng_ro(self) -> NoReturn:
"""Test installation without any relation."""
self.harness.charm.on.start.emit()
# Verifying status
self.assertIsInstance(self.harness.charm.unit.status, BlockedStatus)
# Verifying status message
self.assertGreater(len(self.harness.charm.unit.status.message), 0)
self.assertTrue(
self.harness.charm.unit.status.message.startswith("Waiting for ")
)
self.assertIn("kafka", self.harness.charm.unit.status.message)
self.assertIn("mongodb", self.harness.charm.unit.status.message)
self.assertTrue(self.harness.charm.unit.status.message.endswith(" relations"))
def test_on_start_without_relations_no_ng_ro(self) -> NoReturn:
"""Test installation without any relation."""
self.harness.update_config({"enable_ng_ro": False})
self.harness.charm.on.start.emit()
# Verifying status
self.assertIsInstance(self.harness.charm.unit.status, BlockedStatus)
# Verifying status message
self.assertGreater(len(self.harness.charm.unit.status.message), 0)
self.assertTrue(
self.harness.charm.unit.status.message.startswith("Waiting for ")
)
self.assertIn("mysql", self.harness.charm.unit.status.message)
self.assertTrue(self.harness.charm.unit.status.message.endswith(" relation"))
def test_on_start_with_relations_ng_ro(self) -> NoReturn:
"""Test deployment with NG-RO."""
expected_result = {
"version": 3,
"containers": [
{
"name": "ro",
"imageDetails": self.harness.charm.image.fetch(),
"imagePullPolicy": "Always",
"ports": [
{
"name": "ro",
"containerPort": 9090,
"protocol": "TCP",
}
],
"envConfig": {
"OSMRO_LOG_LEVEL": "INFO",
"OSMRO_MESSAGE_DRIVER": "kafka",
"OSMRO_MESSAGE_HOST": "kafka",
"OSMRO_MESSAGE_PORT": "9090",
"OSMRO_DATABASE_DRIVER": "mongo",
"OSMRO_DATABASE_URI": "mongodb://mongo",
"OSMRO_DATABASE_COMMONKEY": "osm",
},
"kubernetes": {
"startupProbe": {
"exec": {"command": ["/usr/bin/pgrep", "python3"]},
"initialDelaySeconds": 60,
"timeoutSeconds": 5,
},
"readinessProbe": {
"httpGet": {
"path": "/openmano/tenants",
"port": 9090,
},
"periodSeconds": 10,
"timeoutSeconds": 5,
"successThreshold": 1,
"failureThreshold": 3,
},
"livenessProbe": {
"httpGet": {
"path": "/openmano/tenants",
"port": 9090,
},
"initialDelaySeconds": 600,
"periodSeconds": 10,
"timeoutSeconds": 5,
"successThreshold": 1,
"failureThreshold": 3,
},
},
}
],
"kubernetesResources": {"ingressResources": []},
}
self.harness.charm.on.start.emit()
# Initializing the kafka relation
relation_id = self.harness.add_relation("kafka", "kafka")
self.harness.add_relation_unit(relation_id, "kafka/0")
self.harness.update_relation_data(
relation_id,
"kafka/0",
{
"host": "kafka",
"port": "9090",
},
)
# Initializing the mongodb relation
relation_id = self.harness.add_relation("mongodb", "mongodb")
self.harness.add_relation_unit(relation_id, "mongodb/0")
self.harness.update_relation_data(
relation_id,
"mongodb/0",
{
"connection_string": "mongodb://mongo",
},
)
# Verifying status
self.assertNotIsInstance(self.harness.charm.unit.status, BlockedStatus)
pod_spec, _ = self.harness.get_pod_spec()
self.assertDictEqual(expected_result, pod_spec)
def test_on_start_with_relations_no_ng_ro(self) -> NoReturn:
"""Test deployment with old RO."""
self.harness.update_config({"enable_ng_ro": False})
expected_result = {
"version": 3,
"containers": [
{
"name": "ro",
"imageDetails": self.harness.charm.image.fetch(),
"imagePullPolicy": "Always",
"ports": [
{
"name": "ro",
"containerPort": 9090,
"protocol": "TCP",
}
],
"envConfig": {
"OSMRO_LOG_LEVEL": "INFO",
"RO_DB_HOST": "mysql",
"RO_DB_OVIM_HOST": "mysql",
"RO_DB_PORT": 3306,
"RO_DB_OVIM_PORT": 3306,
"RO_DB_USER": "mano",
"RO_DB_OVIM_USER": "mano",
"RO_DB_PASSWORD": "manopw",
"RO_DB_OVIM_PASSWORD": "manopw",
"RO_DB_ROOT_PASSWORD": "rootmanopw",
"RO_DB_OVIM_ROOT_PASSWORD": "rootmanopw",
"RO_DB_NAME": "mano_db",
"RO_DB_OVIM_NAME": "mano_vim_db",
"OPENMANO_TENANT": "osm",
},
"kubernetes": {
"startupProbe": {
"exec": {"command": ["/usr/bin/pgrep", "python3"]},
"initialDelaySeconds": 60,
"timeoutSeconds": 5,
},
"readinessProbe": {
"httpGet": {
"path": "/openmano/tenants",
"port": 9090,
},
"periodSeconds": 10,
"timeoutSeconds": 5,
"successThreshold": 1,
"failureThreshold": 3,
},
"livenessProbe": {
"httpGet": {
"path": "/openmano/tenants",
"port": 9090,
},
"initialDelaySeconds": 600,
"periodSeconds": 10,
"timeoutSeconds": 5,
"successThreshold": 1,
"failureThreshold": 3,
},
},
}
],
"kubernetesResources": {"ingressResources": []},
}
self.harness.charm.on.start.emit()
# Initializing the mysql relation
relation_id = self.harness.add_relation("mysql", "mysql")
self.harness.add_relation_unit(relation_id, "mysql/0")
self.harness.update_relation_data(
relation_id,
"mysql/0",
{
"host": "mysql",
"port": 3306,
"user": "mano",
"password": "manopw",
"root_password": "rootmanopw",
},
)
# Verifying status
self.assertNotIsInstance(self.harness.charm.unit.status, BlockedStatus)
pod_spec, _ = self.harness.get_pod_spec()
self.assertDictEqual(expected_result, pod_spec)
def test_on_kafka_unit_relation_changed(self) -> NoReturn:
"""Test to see if kafka relation is updated."""
self.harness.charm.on.start.emit()
relation_id = self.harness.add_relation("kafka", "kafka")
self.harness.add_relation_unit(relation_id, "kafka/0")
self.harness.update_relation_data(
relation_id,
"kafka/0",
{
"host": "kafka",
"port": 9090,
},
)
# Verifying status
self.assertIsInstance(self.harness.charm.unit.status, BlockedStatus)
# Verifying status message
self.assertGreater(len(self.harness.charm.unit.status.message), 0)
self.assertTrue(
self.harness.charm.unit.status.message.startswith("Waiting for ")
)
self.assertIn("mongodb", self.harness.charm.unit.status.message)
self.assertTrue(self.harness.charm.unit.status.message.endswith(" relation"))
def test_on_mongodb_unit_relation_changed(self) -> NoReturn:
"""Test to see if mongodb relation is updated."""
self.harness.charm.on.start.emit()
relation_id = self.harness.add_relation("mongodb", "mongodb")
self.harness.add_relation_unit(relation_id, "mongodb/0")
self.harness.update_relation_data(
relation_id,
"mongodb/0",
{
"connection_string": "mongodb://mongo",
},
)
# Verifying status
self.assertIsInstance(self.harness.charm.unit.status, BlockedStatus)
# Verifying status message
self.assertGreater(len(self.harness.charm.unit.status.message), 0)
self.assertTrue(
self.harness.charm.unit.status.message.startswith("Waiting for ")
)
self.assertIn("kafka", self.harness.charm.unit.status.message)
self.assertTrue(self.harness.charm.unit.status.message.endswith(" relation"))
def test_on_mysql_unit_relation_changed(self) -> NoReturn:
"""Test to see if mysql relation is updated."""
self.harness.charm.on.start.emit()
relation_id = self.harness.add_relation("mysql", "mysql")
self.harness.add_relation_unit(relation_id, "mysql/0")
self.harness.update_relation_data(
relation_id,
"mysql/0",
{
"host": "mysql",
"port": 3306,
"user": "mano",
"password": "manopw",
"root_password": "rootmanopw",
},
)
# Verifying status
self.assertIsInstance(self.harness.charm.unit.status, BlockedStatus)
# Verifying status message
self.assertGreater(len(self.harness.charm.unit.status.message), 0)
self.assertTrue(
self.harness.charm.unit.status.message.startswith("Waiting for ")
)
self.assertIn("kafka", self.harness.charm.unit.status.message)
self.assertIn("mongodb", self.harness.charm.unit.status.message)
self.assertTrue(self.harness.charm.unit.status.message.endswith(" relations"))
def test_publish_ro_info(self) -> NoReturn:
"""Test to see if ro relation is updated."""
expected_result = {
"host": "ro",
"port": "9090",
}
self.harness.charm.on.start.emit()
relation_id = self.harness.add_relation("ro", "lcm")
self.harness.add_relation_unit(relation_id, "lcm/0")
relation_data = self.harness.get_relation_data(relation_id, "ro")
self.assertDictEqual(expected_result, relation_data)
if __name__ == "__main__":
unittest.main()