from osm_common.msgbase import MsgException
from osm_lcm.data_utils.database.database import Database
from osm_lcm.data_utils.filesystem.filesystem import Filesystem
+from osm_lcm.lcm_hc import get_health_check_file
from os import environ, path
from random import choice as random_choice
from n2vc import version as n2vc_version
min_n2vc_version = "0.0.2"
min_common_version = "0.1.19"
-health_check_file = (
- path.expanduser("~") + "/time_last_ping"
-) # TODO find better location for this file
class Lcm:
# load configuration
config = self.read_config_file(config_file)
self.config = config
+ self.health_check_file = get_health_check_file(self.config)
self.config["ro_config"] = {
"ng": config["RO"].get("ng", False),
"uri": config["RO"].get("uri"),
return
self.pings_not_received = 0
try:
- with open(health_check_file, "w") as f:
+ with open(self.health_check_file, "w") as f:
f.write(str(time()))
except Exception as e:
self.logger.error(
"Cannot write into '{}' for healthcheck: {}".format(
- health_check_file, e
+ self.health_check_file, e
)
)
return
elif o == "--health-check":
from osm_lcm.lcm_hc import health_check
- health_check(health_check_file, Lcm.ping_interval_pace)
+ health_check(config_file, Lcm.ping_interval_pace)
# elif o == "--log-socket-port":
# log_socket_port = a
# elif o == "--log-socket-host":
# under the License.
##
-from os import path
from time import time, sleep
from sys import stderr
-""" This module is used for helth check. A file called time_last_ping is used
+from osm_lcm.lcm_utils import LcmException
+import yaml
+
+""" This module is used for health check. A file called time_last_ping is used
This contains the last time where something is received from kafka
"""
-def health_check(health_check_file=None, ping_interval_pace=120):
- health_check_file = health_check_file or path.expanduser("~") + "/time_last_ping"
+def get_health_check_file(config_file=None):
+ try:
+ health_check_file = "/app/storage/time_last_ping"
+ if not config_file:
+ return health_check_file
+ # If config_input is dictionary
+ if isinstance(config_file, dict) and config_file.get("storage"):
+ health_check_file = config_file["storage"]["path"] + "/time_last_ping"
+ # If config_input is file
+ elif isinstance(config_file, str):
+ with open(config_file) as f:
+ # read file as yaml format
+ conf = yaml.safe_load(f)
+ # Ensure all sections are not empty
+ if conf.get("storage"):
+ health_check_file = conf["storage"]["path"] + "/time_last_ping"
+
+ return health_check_file
+ except (IOError, FileNotFoundError, TypeError, AttributeError, KeyError) as error:
+ raise LcmException(
+ f"Error occured while getting the health check file location: {error}"
+ )
+
+
+def health_check(config_file=None, ping_interval_pace=120):
+ health_check_file = get_health_check_file(config_file)
retry = 2
while retry:
retry -= 1
--- /dev/null
+# Copyright 2022 Canonical Ltd.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+import re
+import tempfile
+from unittest import TestCase
+from unittest.mock import Mock
+
+from osm_lcm.lcm import Lcm
+from osm_lcm.data_utils.database.database import Database
+from osm_lcm.data_utils.filesystem.filesystem import Filesystem
+
+from osm_lcm.lcm_utils import LcmException
+
+
+def create_lcm_config(
+ source_path: str, destination_path: str, line_number=None
+) -> None:
+ """This function creates new lcm_config files by
+ using the config file template. If line number is provided,
+ it removes the line from file.
+ Args:
+ source_path: (str) source file path
+ destination_path: (str) destination file path
+ line_number: (int) line to be deleted
+ """
+ with open(source_path, "r+") as fs:
+ # read and store all lines into list
+ contents = fs.readlines()
+
+ with open(destination_path, "w") as fd:
+ if line_number:
+ if line_number < 0:
+ raise LcmException("Line number can not be smaller than zero")
+ contents.pop(line_number)
+ contents = "".join(contents)
+ fd.write(contents)
+
+
+def check_file_content(health_check_file: str) -> str:
+ """Get the health check file contents
+ Args:
+ health_check_file: (str) file path
+
+ Returns:
+ contents: (str) health check file content
+ """
+ with open(health_check_file, "r") as hc:
+ contents = hc.read()
+ return contents
+
+
+class TestLcm(TestCase):
+
+ def setUp(self):
+ self.config_file = os.getcwd() + "/osm_lcm/tests/test_lcm_config_file.yaml"
+ self.config_file_without_storage_path = tempfile.mkstemp()[1]
+ Database.instance = None
+ self.db = Mock(Database({"database": {"driver": "memory"}}).instance.db)
+ Database().instance.db = self.db
+ Filesystem.instance = None
+ self.fs = Mock(
+ Filesystem({"storage": {"driver": "local", "path": "/"}}).instance.fs
+ )
+ Filesystem.instance.fs = self.fs
+ self.fs.path = "/"
+ self.my_lcm = Lcm(config_file=self.config_file)
+
+ def test_get_health_check_file_from_config_file(self):
+ self.assertEqual(self.my_lcm.health_check_file, "/tmp/storage/time_last_ping")
+
+ def test_health_check_file_not_in_config_file(self):
+ create_lcm_config(self.config_file, self.config_file_without_storage_path, 38)
+ with self.assertRaises(LcmException):
+ Lcm(config_file=self.config_file_without_storage_path)
+
+ def test_kafka_admin_topic_ping_command(self):
+ params = {
+ "to": "lcm",
+ "from": "lcm",
+ "worker_id": self.my_lcm.worker_id,
+ }
+ self.my_lcm.health_check_file = tempfile.mkstemp()[1]
+ self.my_lcm.kafka_read_callback("admin", "ping", params)
+ pattern = "[0-9]{10}.[0-9]{5,8}"
+ # Epoch time is written in health check file.
+ result = re.findall(pattern, check_file_content(self.my_lcm.health_check_file))
+ self.assertTrue(result)
+
+ def test_kafka_wrong_topic_ping_command(self):
+ params = {
+ "to": "lcm",
+ "from": "lcm",
+ "worker_id": self.my_lcm.worker_id,
+ }
+ self.my_lcm.health_check_file = tempfile.mkstemp()[1]
+ self.my_lcm.kafka_read_callback("kafka", "ping", params)
+ pattern = "[0-9]{10}.[0-9]{5,8}"
+ # Health check file is empty.
+ result = re.findall(pattern, check_file_content(self.my_lcm.health_check_file))
+ self.assertFalse(result)
+
+ def test_kafka_admin_topic_ping_command_wrong_worker_id(self):
+ params = {
+ "to": "lcm",
+ "from": "lcm",
+ "worker_id": 5,
+ }
+ self.my_lcm.health_check_file = tempfile.mkstemp()[1]
+ self.my_lcm.kafka_read_callback("admin", "ping", params)
+ pattern = "[0-9]{10}.[0-9]{5,8}"
+ # Health check file is empty.
+ result = re.findall(pattern, check_file_content(self.my_lcm.health_check_file))
+ self.assertFalse(result)
--- /dev/null
+# Copyright 2022 Canonical Ltd.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+global:
+ loglevel: DEBUG
+RO:
+ host: ro
+ port: 9090
+ tenant: osm
+VCA:
+ host: vca
+ port: 17070
+ user: admin
+ secret: secret
+ cloud: localhost
+ k8s_cloud: k8scloud
+ helmpath: /usr/local/bin/helm
+ helm3path: /usr/local/bin/helm3
+ kubectlpath: /usr/bin/kubectl
+ jujupath: /usr/local/bin/juju
+database:
+ driver: memory # mongo or memory
+ # host: mongo # hostname or IP
+ port: 27017
+ name: osm
+storage:
+ driver: local # local filesystem
+ path: /tmp/storage
+message:
+ driver: local # local or kafka
+ path: /tmp/kafka
+ host: kafka
+ port: 9092
+ group_id: lcm-server
+tsdb: # time series database
+ driver: prometheus
+ path: /tmp/prometheus
+ uri: http://prometheus:9090/
--- /dev/null
+# Copyright 2022 Canonical Ltd.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from unittest import TestCase
+import os
+import tempfile
+
+from osm_lcm.lcm_hc import get_health_check_file
+from osm_lcm.lcm_utils import LcmException
+from test_lcm import create_lcm_config
+
+
+class TestLcmHealthCheck(TestCase):
+ def setUp(self):
+ self.config_temp = os.getcwd() + "/osm_lcm/tests/test_lcm_config_file.yaml"
+
+ def test_get_health_check_path(self):
+ with self.subTest(i=1, t="Empty Config Input"):
+ hc_path = get_health_check_file()
+ expected_hc_path = "/app/storage/time_last_ping"
+ self.assertEqual(hc_path, expected_hc_path)
+
+ with self.subTest(i=2, t="Config Input as Dictionary"):
+ config_dict = {"storage": {"path": "/tmp/sample_hc"}}
+ hc_path = get_health_check_file(config_dict)
+ expected_hc_path = "/tmp/sample_hc/time_last_ping"
+ self.assertEqual(hc_path, expected_hc_path)
+
+ with self.subTest(i=3, t="Config Input as Dictionary with wrong format"):
+ config_dict = {"folder": {"path": "/tmp/sample_hc"}}
+ # it will return default health check path
+ hc_path = get_health_check_file(config_dict)
+ expected_hc_path = "/app/storage/time_last_ping"
+ self.assertEqual(hc_path, expected_hc_path)
+
+ def test_get_health_check_path_config_file_not_found(self):
+ # open raises the FileNotFoundError
+ with self.assertRaises(LcmException):
+ get_health_check_file("/tmp2/config_yaml")
+
+ def test_get_health_check_path_config_file(self):
+ config_file = tempfile.mkstemp()[1]
+ create_lcm_config(self.config_temp, config_file)
+ hc_path = get_health_check_file(config_file)
+ expected_hc_path = "/tmp/storage/time_last_ping"
+ self.assertEqual(hc_path, expected_hc_path)
+
+ def test_get_health_check_path_config_file_empty(self):
+ new_config = tempfile.mkstemp()[1]
+ # Empty file will cause AttributeError
+ # and it will raise LCMException
+ with self.assertRaises(LcmException):
+ get_health_check_file(new_config)
+
+ def test_get_health_check_path_config_file_not_include_storage_path(self):
+ config_file = tempfile.mkstemp()[1]
+ create_lcm_config(self.config_temp, config_file, 36)
+ # It will return default health check path
+ hc_path = get_health_check_file(config_file)
+ expected_hc_path = "/app/storage/time_last_ping"
+ self.assertEqual(hc_path, expected_hc_path)