import socket
import logging
import kafka
-from osm_mon.core.message_bus_client import MessageBusClient
+
def wait_till_commondb_is_ready(config, process_name="osm-mon", commondb_wait_time=5):
logging.debug("wait_till_commondb_is_ready")
- while(True):
+ while True:
commondb_url = config.conf["database"].get("uri")
try:
commondb = pymongo.MongoClient(commondb_url)
commondb.server_info()
break
except Exception:
- logging.info("{} process is waiting for commondb to come up...".format(process_name))
+ logging.info(
+ "{} process is waiting for commondb to come up...".format(process_name)
+ )
time.sleep(commondb_wait_time)
logging.debug("wait_till_kafka_is_ready")
- while(True):
+ while True:
kafka_ready = False
try:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
== 0
):
# Get the list of topics. If kafka is not ready exception will be thrown.
- consumer = kafka.KafkaConsumer(group_id=config.conf["message"].get("group_id"),
- bootstrap_servers=[config.conf.get("message", {}).get("host",
- "kafka") + ":" + config.conf["message"]
- .get("port")])
- topics = consumer.topics()
- logging.debug("Number of topics found: %s", len(topics))
+ consumer = kafka.KafkaConsumer(
+ group_id=config.conf["message"].get("group_id"),
+ bootstrap_servers=[
+ config.conf.get("message", {}).get("host", "kafka")
+ + ":"
+ + config.conf["message"].get("port")
+ ],
+ )
+ all_topics = consumer.topics()
+ logging.debug("Number of topics found: %s", len(all_topics))
# Send dummy message in kafka topics. If kafka is not ready exception will be thrown.
- msg_bus = MessageBusClient(config)
- topics = ["alarm_request", "users", "project"]
- for topic in topics:
- msg_bus.aiowrite(topic, 'echo', 'dummy message')
+ producer = kafka.KafkaProducer(
+ bootstrap_servers=[
+ config.conf.get("message", {}).get("host", "kafka")
+ + ":"
+ + config.conf["message"].get("port")
+ ]
+ )
+ mon_topics = ["alarm_request", "users", "project"]
+ for mon_topic in mon_topics:
+ producer.send(mon_topic, key=b"echo", value=b"dummy message")
# Kafka is ready now
kafka_ready = True
if kafka_ready:
break
else:
- logging.info("{} process is waiting for kafka to come up...".format(process_name))
+ logging.info(
+ "{} process is waiting for kafka to come up...".format(process_name)
+ )
time.sleep(kafka_wait_time)
-def wait_till_core_services_are_ready(config, process_name="osm-mon", commondb_wait_time=5, kafka_wait_time=5):
+def wait_till_core_services_are_ready(
+ config, process_name="osm-mon", commondb_wait_time=5, kafka_wait_time=5
+):
logging.debug("wait_till_core_services_are_ready")