# under the License.
##
-from os import path
from time import time, sleep
from sys import stderr
-""" This module is used for helth check. A file called time_last_ping is used
+from osm_lcm.lcm_utils import LcmException
+import yaml
+
+""" This module is used for health check. A file called time_last_ping is used
This contains the last time where something is received from kafka
"""
-def health_check(health_check_file=None, ping_interval_pace=120):
- health_check_file = health_check_file or path.expanduser("~") + "/time_last_ping"
+def get_health_check_file(config_file=None):
+ try:
+ health_check_file = "/app/storage/time_last_ping"
+ if not config_file:
+ return health_check_file
+ # If config_input is dictionary
+ if isinstance(config_file, dict) and config_file.get("storage"):
+ health_check_file = config_file["storage"]["path"] + "/time_last_ping"
+ # If config_input is file
+ elif isinstance(config_file, str):
+ with open(config_file) as f:
+ # read file as yaml format
+ conf = yaml.safe_load(f)
+ # Ensure all sections are not empty
+ if conf.get("storage"):
+ health_check_file = conf["storage"]["path"] + "/time_last_ping"
+
+ return health_check_file
+ except (IOError, FileNotFoundError, TypeError, AttributeError, KeyError) as error:
+ raise LcmException(
+ f"Error occured while getting the health check file location: {error}"
+ )
+
+
+def health_check(config_file=None, ping_interval_pace=120):
+ health_check_file = get_health_check_file(config_file)
retry = 2
while retry:
retry -= 1