X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=osm_mon%2Fcmd%2Fmon_utils.py;fp=osm_mon%2Fcmd%2Fmon_utils.py;h=a5d3d07bc6d70527ca378dc84665ebf367433e19;hb=b170aec221ad296cc9de06a7b6069c878211cc65;hp=0000000000000000000000000000000000000000;hpb=cb6abac8bf7111d38ed77f7f136afa33eeaa8734;p=osm%2FMON.git diff --git a/osm_mon/cmd/mon_utils.py b/osm_mon/cmd/mon_utils.py new file mode 100644 index 0000000..a5d3d07 --- /dev/null +++ b/osm_mon/cmd/mon_utils.py @@ -0,0 +1,90 @@ +# -*- coding: utf-8 -*- + +# This file is part of OSM Monitoring module + +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import pymongo +import time +import socket +import logging +import kafka + + +def wait_till_commondb_is_ready(config, process_name="osm-mon", commondb_wait_time=5): + + logging.debug("wait_till_commondb_is_ready") + + while(True): + commondb_url = config.conf["database"].get("uri") + try: + commondb = pymongo.MongoClient(commondb_url) + commondb.server_info() + break + except Exception: + logging.info("{} process is waiting for commondb to come up...".format(process_name)) + time.sleep(commondb_wait_time) + + +def wait_till_kafka_is_ready(config, process_name="osm-mon", kafka_wait_time=5): + + logging.debug("wait_till_kafka_is_ready") + + while(True): + kafka_ready = False + try: + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + # Verify is kafka port is up + if ( + s.connect_ex( + ( + config.conf.get("message", {}).get("host", "kafka"), + int(config.conf["message"].get("port")), + ) + ) + == 0 + ): + # Get the list of topics. If kafka is not ready exception will be thrown. + consumer = kafka.KafkaConsumer(group_id=config.conf["message"].get("group_id"), + bootstrap_servers=[config.conf.get("message", {}).get("host", + "kafka") + ":" + config.conf["message"] + .get("port")]) + topics = consumer.topics() + logging.debug("Number of topics found: %s", len(topics)) + kafka_ready = True + except Exception as e: + logging.info("Error when trying to get kafka status.") + logging.debug("Exception when trying to get kafka status: %s", str(e)) + finally: + if kafka_ready: + break + else: + logging.info("{} process is waiting for kafka to come up...".format(process_name)) + time.sleep(kafka_wait_time) + + +def wait_till_core_services_are_ready(config, process_name="osm-mon", commondb_wait_time=5, kafka_wait_time=5): + + logging.debug("wait_till_services_are_ready") + + if not config: + logging.info("Config information is not available") + return False + + # Check if common-db is ready + wait_till_kafka_is_ready(config, process_name, commondb_wait_time) + + # Check if kafka is ready + wait_till_kafka_is_ready(config, process_name, kafka_wait_time) + + return True