| #!/usr/bin/env python3 |
| # Copyright 2020 Canonical Ltd. |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); you may |
| # not use this file except in compliance with the License. You may obtain |
| # a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
| # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
| # License for the specific language governing permissions and limitations |
| # under the License. |
| # |
| # For those usages not covered by the Apache License, Version 2.0 please |
| # contact: legal@canonical.com |
| # |
| # To get in touch with the maintainers, please contact: |
| # osm-charmers@lists.launchpad.net |
| ## |
| |
| from typing import NoReturn |
| import unittest |
| from ops.model import BlockedStatus |
| |
| from ops.testing import Harness |
| |
| from charm import RoCharm |
| |
| |
class TestCharm(unittest.TestCase):
    """RO Charm unit tests.

    Every test runs against a fresh ``Harness(RoCharm)`` whose unit is the
    leader.  The tests cover two configurations: the default one, in which
    the charm waits for the ``kafka`` and ``mongodb`` relations, and
    ``enable_ng_ro=False``, in which it waits for the ``mysql`` relation.
    """

    def setUp(self) -> None:
        """Create the harness, make the unit leader and start the charm."""
        self.harness = Harness(RoCharm)
        # Release whatever the harness allocated once the test finishes.
        # Guarded with getattr because older ops releases may not provide
        # ``Harness.cleanup`` (NOTE(review): confirm the pinned ops version).
        cleanup = getattr(self.harness, "cleanup", None)
        if cleanup is not None:
            self.addCleanup(cleanup)
        self.harness.set_leader(is_leader=True)
        self.harness.begin()

    # ------------------------------------------------------------------
    # Helpers (not collected as tests: names do not start with "test")
    # ------------------------------------------------------------------

    def _add_relation_with_data(
        self, relation_name: str, remote_app: str, data: dict
    ) -> int:
        """Add a relation with one remote unit and set that unit's data.

        Args:
            relation_name: name of the relation endpoint to add.
            remote_app: remote application name; its unit is ``<app>/0``.
            data: key/value pairs written into the remote unit's databag.

        Returns:
            The relation id returned by ``Harness.add_relation``.
        """
        remote_unit = f"{remote_app}/0"
        relation_id = self.harness.add_relation(relation_name, remote_app)
        self.harness.add_relation_unit(relation_id, remote_unit)
        self.harness.update_relation_data(relation_id, remote_unit, data)
        return relation_id

    def _relate_kafka(self) -> int:
        """Add the kafka relation with host and port data."""
        # Values are strings, matching OSMRO_MESSAGE_PORT in the pod spec.
        return self._add_relation_with_data(
            "kafka", "kafka", {"host": "kafka", "port": "9090"}
        )

    def _relate_mongodb(self) -> int:
        """Add the mongodb relation with a connection string."""
        return self._add_relation_with_data(
            "mongodb", "mongodb", {"connection_string": "mongodb://mongo"}
        )

    def _relate_mysql(self) -> int:
        """Add the mysql relation with host, port and credentials."""
        return self._add_relation_with_data(
            "mysql",
            "mysql",
            {
                "host": "mysql",
                # Kept as an int: the expected RO_DB_PORT / RO_DB_OVIM_PORT
                # values in the old-RO pod-spec test are the int 3306.
                "port": 3306,
                "user": "mano",
                "password": "manopw",
                "root_password": "rootmanopw",
            },
        )

    def _assert_blocked_waiting_for(self, *relation_names: str) -> None:
        """Assert the unit is blocked with a "Waiting for ..." message.

        The message must be non-empty, start with ``"Waiting for "``,
        mention every name in ``relation_names`` and end with
        ``" relations"`` when several names are given or ``" relation"``
        when only one is.

        Args:
            relation_names: relation names expected in the status message.
        """
        status = self.harness.charm.unit.status
        self.assertIsInstance(status, BlockedStatus)

        message = status.message
        self.assertGreater(len(message), 0)
        self.assertTrue(message.startswith("Waiting for "))
        for relation_name in relation_names:
            self.assertIn(relation_name, message)

        suffix = " relations" if len(relation_names) > 1 else " relation"
        self.assertTrue(message.endswith(suffix))

    def _expected_pod_spec(self, env_config: dict) -> dict:
        """Build the pod spec the tests expect for a given environment.

        Only ``envConfig`` differs between the two deployment tests; the
        container name, port and probes are the same.

        Args:
            env_config: expected ``envConfig`` mapping of the ``ro`` container.

        Returns:
            The full expected pod-spec dictionary (version 3).
        """
        http_probe = {"path": "/openmano/tenants", "port": 9090}
        return {
            "version": 3,
            "containers": [
                {
                    "name": "ro",
                    "imageDetails": self.harness.charm.image.fetch(),
                    "imagePullPolicy": "Always",
                    "ports": [
                        {
                            "name": "ro",
                            "containerPort": 9090,
                            "protocol": "TCP",
                        }
                    ],
                    "envConfig": env_config,
                    "kubernetes": {
                        "startupProbe": {
                            "exec": {"command": ["/usr/bin/pgrep", "python3"]},
                            "initialDelaySeconds": 60,
                            "timeoutSeconds": 5,
                        },
                        "readinessProbe": {
                            "httpGet": dict(http_probe),
                            "periodSeconds": 10,
                            "timeoutSeconds": 5,
                            "successThreshold": 1,
                            "failureThreshold": 3,
                        },
                        "livenessProbe": {
                            "httpGet": dict(http_probe),
                            "initialDelaySeconds": 600,
                            "periodSeconds": 10,
                            "timeoutSeconds": 5,
                            "successThreshold": 1,
                            "failureThreshold": 3,
                        },
                    },
                }
            ],
            "kubernetesResources": {"ingressResources": []},
        }

    # ------------------------------------------------------------------
    # Start without relations
    # ------------------------------------------------------------------

    def test_on_start_without_relations_ng_ro(self) -> None:
        """Test start with the default config and no relations."""
        self.harness.charm.on.start.emit()

        self._assert_blocked_waiting_for("kafka", "mongodb")

    def test_on_start_without_relations_no_ng_ro(self) -> None:
        """Test start with NG-RO disabled and no relations."""
        self.harness.update_config({"enable_ng_ro": False})

        self.harness.charm.on.start.emit()

        self._assert_blocked_waiting_for("mysql")

    # ------------------------------------------------------------------
    # Start with every required relation: pod spec is produced
    # ------------------------------------------------------------------

    def test_on_start_with_relations_ng_ro(self) -> None:
        """Test deployment with NG-RO."""
        # Built before the start event, so image.fetch() runs first.
        expected_result = self._expected_pod_spec(
            {
                "OSMRO_LOG_LEVEL": "INFO",
                "OSMRO_MESSAGE_DRIVER": "kafka",
                "OSMRO_MESSAGE_HOST": "kafka",
                "OSMRO_MESSAGE_PORT": "9090",
                "OSMRO_DATABASE_DRIVER": "mongo",
                "OSMRO_DATABASE_URI": "mongodb://mongo",
                "OSMRO_DATABASE_COMMONKEY": "osm",
            }
        )

        self.harness.charm.on.start.emit()

        self._relate_kafka()
        self._relate_mongodb()

        # Verifying status
        self.assertNotIsInstance(self.harness.charm.unit.status, BlockedStatus)

        pod_spec, _ = self.harness.get_pod_spec()

        self.assertDictEqual(expected_result, pod_spec)

    def test_on_start_with_relations_no_ng_ro(self) -> None:
        """Test deployment with old RO."""
        self.harness.update_config({"enable_ng_ro": False})

        # Built before the start event, so image.fetch() runs first.
        expected_result = self._expected_pod_spec(
            {
                "OSMRO_LOG_LEVEL": "INFO",
                "RO_DB_HOST": "mysql",
                "RO_DB_OVIM_HOST": "mysql",
                "RO_DB_PORT": 3306,
                "RO_DB_OVIM_PORT": 3306,
                "RO_DB_USER": "mano",
                "RO_DB_OVIM_USER": "mano",
                "RO_DB_PASSWORD": "manopw",
                "RO_DB_OVIM_PASSWORD": "manopw",
                "RO_DB_ROOT_PASSWORD": "rootmanopw",
                "RO_DB_OVIM_ROOT_PASSWORD": "rootmanopw",
                "RO_DB_NAME": "mano_db",
                "RO_DB_OVIM_NAME": "mano_vim_db",
                "OPENMANO_TENANT": "osm",
            }
        )

        self.harness.charm.on.start.emit()

        self._relate_mysql()

        # Verifying status
        self.assertNotIsInstance(self.harness.charm.unit.status, BlockedStatus)

        pod_spec, _ = self.harness.get_pod_spec()

        self.assertDictEqual(expected_result, pod_spec)

    # ------------------------------------------------------------------
    # A single relation is not enough with the default config
    # ------------------------------------------------------------------

    def test_on_kafka_unit_relation_changed(self) -> None:
        """Test that kafka alone leaves the unit waiting for mongodb."""
        self.harness.charm.on.start.emit()

        self._relate_kafka()

        self._assert_blocked_waiting_for("mongodb")

    def test_on_mongodb_unit_relation_changed(self) -> None:
        """Test that mongodb alone leaves the unit waiting for kafka."""
        self.harness.charm.on.start.emit()

        self._relate_mongodb()

        self._assert_blocked_waiting_for("kafka")

    def test_on_mysql_unit_relation_changed(self) -> None:
        """Test that mysql alone does not unblock the default config.

        ``enable_ng_ro`` is left untouched here, so the unit must still be
        waiting for both kafka and mongodb after mysql is related.
        """
        self.harness.charm.on.start.emit()

        self._relate_mysql()

        self._assert_blocked_waiting_for("kafka", "mongodb")

    # ------------------------------------------------------------------
    # Provided relation
    # ------------------------------------------------------------------

    def test_publish_ro_info(self) -> None:
        """Test that host and port are published on the ro relation."""
        expected_result = {
            "host": "ro",
            "port": "9090",
        }

        self.harness.charm.on.start.emit()

        relation_id = self.harness.add_relation("ro", "lcm")
        self.harness.add_relation_unit(relation_id, "lcm/0")
        # Read the databag stored under the "ro" name for this relation.
        relation_data = self.harness.get_relation_data(relation_id, "ro")

        self.assertDictEqual(expected_result, relation_data)
| |
| |
if __name__ == "__main__":
    # Run the test suite when this file is executed directly as a script.
    unittest.main()