LCM creates the health check file under the configured storage path
The health check file is now created independently of the user's home directory
Change-Id: Ibb7454857d93b3b7396bd7bd0229533d1b090310
Signed-off-by: aticig <gulsum.atici@canonical.com>
(cherry picked from commit 56b86c280d4c67ea0844b2049b4e505abba8429e)
diff --git a/osm_lcm/tests/test_lcm.py b/osm_lcm/tests/test_lcm.py
new file mode 100644
index 0000000..af6f81d
--- /dev/null
+++ b/osm_lcm/tests/test_lcm.py
@@ -0,0 +1,126 @@
+# Copyright 2022 Canonical Ltd.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+import re
+import tempfile
+from unittest import TestCase
+from unittest.mock import Mock
+
+from osm_lcm.lcm import Lcm
+from osm_lcm.data_utils.database.database import Database
+from osm_lcm.data_utils.filesystem.filesystem import Filesystem
+
+from osm_lcm.lcm_utils import LcmException
+
+
+def create_lcm_config(
+    source_path: str, destination_path: str, line_number=None
+) -> None:
+    """Copy the config template to a new file, optionally dropping one line.
+
+    Args:
+        source_path: (str) source file path
+        destination_path: (str) destination file path, overwritten if present
+        line_number: (int) zero-based index of the line to drop, None to keep all
+
+    Raises:
+        LcmException: if line_number is negative
+    """
+    # Validate before opening the destination so a bad index cannot truncate it.
+    if line_number is not None and line_number < 0:
+        raise LcmException("Line number can not be smaller than zero")
+    with open(source_path, "r") as fs:
+        contents = fs.readlines()
+    if line_number is not None:  # explicit None check: 0 is a valid index
+        contents.pop(line_number)
+    with open(destination_path, "w") as fd:
+        fd.write("".join(contents))
+
+
+def check_file_content(health_check_file: str) -> str:
+    """Read the health check file and return everything it holds.
+
+    Args:
+        health_check_file: (str) path of the file to read
+
+    Returns:
+        (str) full text of the file; empty string for an empty file
+    """
+    with open(health_check_file, "r") as handle:
+        return handle.read()
+
+
+class TestLcm(TestCase):
+    """Tests for the Lcm health check file path and the kafka "ping" handling."""
+
+    def setUp(self):
+        # Locate the template next to this module so the tests do not depend on
+        # the directory the test runner is started from.
+        tests_dir = os.path.dirname(os.path.abspath(__file__))
+        self.config_file = os.path.join(tests_dir, "test_lcm_config_file.yaml")
+        self.config_file_without_storage_path = self._new_temp_file()
+        # Reset the Database/Filesystem singletons and plug in mocks.
+        Database.instance = None
+        self.db = Mock(Database({"database": {"driver": "memory"}}).instance.db)
+        Database().instance.db = self.db
+        Filesystem.instance = None
+        self.fs = Mock(
+            Filesystem({"storage": {"driver": "local", "path": "/"}}).instance.fs
+        )
+        Filesystem.instance.fs = self.fs
+        self.fs.path = "/"
+        self.my_lcm = Lcm(config_file=self.config_file)
+
+    def _new_temp_file(self) -> str:
+        """Create an empty temp file, removed at test end, and return its path."""
+        fd, path = tempfile.mkstemp()
+        os.close(fd)  # mkstemp hands back an open descriptor; only the path is used
+        self.addCleanup(os.remove, path)
+        return path
+
+    def _ping_timestamps(self, topic: str, worker_id) -> list:
+        """Send a "ping" on topic and return the epoch times found in the file.
+
+        Args:
+            topic: (str) kafka topic handed to kafka_read_callback
+            worker_id: worker id placed in the ping parameters
+        """
+        params = {"to": "lcm", "from": "lcm", "worker_id": worker_id}
+        self.my_lcm.health_check_file = self._new_temp_file()
+        self.my_lcm.kafka_read_callback(topic, "ping", params)
+        # Raw string with an escaped dot: match a literal decimal point only.
+        pattern = r"[0-9]{10}\.[0-9]{5,8}"
+        return re.findall(pattern, check_file_content(self.my_lcm.health_check_file))
+
+    def test_get_health_check_file_from_config_file(self):
+        self.assertEqual(self.my_lcm.health_check_file, "/tmp/storage/time_last_ping")
+
+    def test_health_check_file_not_in_config_file(self):
+        # Index 38 is the "path: /tmp/storage" line of test_lcm_config_file.yaml.
+        create_lcm_config(self.config_file, self.config_file_without_storage_path, 38)
+        with self.assertRaises(LcmException):
+            Lcm(config_file=self.config_file_without_storage_path)
+
+    def test_kafka_admin_topic_ping_command(self):
+        # Epoch time is written in health check file.
+        self.assertTrue(self._ping_timestamps("admin", self.my_lcm.worker_id))
+
+    def test_kafka_wrong_topic_ping_command(self):
+        # Health check file is empty.
+        self.assertFalse(self._ping_timestamps("kafka", self.my_lcm.worker_id))
+
+    def test_kafka_admin_topic_ping_command_wrong_worker_id(self):
+        # Health check file is empty.
+        self.assertFalse(self._ping_timestamps("admin", 5))
diff --git a/osm_lcm/tests/test_lcm_config_file.yaml b/osm_lcm/tests/test_lcm_config_file.yaml
new file mode 100644
index 0000000..4c55a4d
--- /dev/null
+++ b/osm_lcm/tests/test_lcm_config_file.yaml
@@ -0,0 +1,49 @@
+# Copyright 2022 Canonical Ltd.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+global:
+ loglevel: DEBUG
+RO:
+ host: ro
+ port: 9090
+ tenant: osm
+VCA:
+ host: vca
+ port: 17070
+ user: admin
+ secret: secret
+ cloud: localhost
+ k8s_cloud: k8scloud
+ helmpath: /usr/local/bin/helm
+ helm3path: /usr/local/bin/helm3
+ kubectlpath: /usr/bin/kubectl
+ jujupath: /usr/local/bin/juju
+database:
+ driver: memory # mongo or memory
+ # host: mongo # hostname or IP
+ port: 27017
+ name: osm
+storage:
+ driver: local # local filesystem
+ path: /tmp/storage
+message:
+ driver: local # local or kafka
+ path: /tmp/kafka
+ host: kafka
+ port: 9092
+ group_id: lcm-server
+tsdb: # time series database
+ driver: prometheus
+ path: /tmp/prometheus
+ uri: http://prometheus:9090/
diff --git a/osm_lcm/tests/test_lcm_hc.py b/osm_lcm/tests/test_lcm_hc.py
new file mode 100644
index 0000000..90316ee
--- /dev/null
+++ b/osm_lcm/tests/test_lcm_hc.py
@@ -0,0 +1,72 @@
+# Copyright 2022 Canonical Ltd.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from unittest import TestCase
+import os
+import tempfile
+
+from osm_lcm.lcm_hc import get_health_check_file
+from osm_lcm.lcm_utils import LcmException
+from test_lcm import create_lcm_config
+
+
+class TestLcmHealthCheck(TestCase):
+    def setUp(self):
+        # Resolve the template relative to this module, not the working directory.
+        tests_dir = os.path.dirname(os.path.abspath(__file__))
+        self.config_temp = os.path.join(tests_dir, "test_lcm_config_file.yaml")
+
+    def _new_temp_file(self) -> str:
+        """Create an empty temp file, removed at test end, and return its path."""
+        fd, path = tempfile.mkstemp()
+        os.close(fd)  # mkstemp hands back an open descriptor; only the path is used
+        self.addCleanup(os.remove, path)
+        return path
+
+    def test_get_health_check_path(self):
+        with self.subTest(i=1, t="Empty Config Input"):
+            self.assertEqual(get_health_check_file(), "/app/storage/time_last_ping")
+        with self.subTest(i=2, t="Config Input as Dictionary"):
+            config_dict = {"storage": {"path": "/tmp/sample_hc"}}
+            hc_path = get_health_check_file(config_dict)
+            self.assertEqual(hc_path, "/tmp/sample_hc/time_last_ping")
+        with self.subTest(i=3, t="Config Input as Dictionary with wrong format"):
+            config_dict = {"folder": {"path": "/tmp/sample_hc"}}
+            # it will return default health check path
+            hc_path = get_health_check_file(config_dict)
+            self.assertEqual(hc_path, "/app/storage/time_last_ping")
+
+    def test_get_health_check_path_config_file_not_found(self):
+        # open raises the FileNotFoundError
+        with self.assertRaises(LcmException):
+            get_health_check_file("/tmp2/config_yaml")
+
+    def test_get_health_check_path_config_file(self):
+        config_file = self._new_temp_file()
+        create_lcm_config(self.config_temp, config_file)
+        hc_path = get_health_check_file(config_file)
+        self.assertEqual(hc_path, "/tmp/storage/time_last_ping")
+
+    def test_get_health_check_path_config_file_empty(self):
+        new_config = self._new_temp_file()
+        # Empty file will cause AttributeError and it will raise LcmException
+        with self.assertRaises(LcmException):
+            get_health_check_file(new_config)
+
+    def test_get_health_check_path_config_file_not_include_storage_path(self):
+        config_file = self._new_temp_file()
+        create_lcm_config(self.config_temp, config_file, 36)
+        # It will return default health check path
+        hc_path = get_health_check_file(config_file)
+        self.assertEqual(hc_path, "/app/storage/time_last_ping")