1 # -*- coding: utf-8 -*-
3 # This file is part of OSM Monitoring module
5 # Licensed under the Apache License, Version 2.0 (the "License"); you may
6 # not use this file except in compliance with the License. You may obtain
7 # a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14 # License for the specific language governing permissions and limitations
24 def wait_till_commondb_is_ready(config
, process_name
="osm-mon", commondb_wait_time
=5):
25 logging
.debug("wait_till_commondb_is_ready")
28 commondb_url
= config
.conf
["database"].get("uri")
30 commondb
= pymongo
.MongoClient(commondb_url
)
31 commondb
.server_info()
35 "{} process is waiting for commondb to come up...".format(process_name
)
37 time
.sleep(commondb_wait_time
)
40 def wait_till_kafka_is_ready(config
, process_name
="osm-mon", kafka_wait_time
=5):
41 logging
.debug("wait_till_kafka_is_ready")
46 with socket
.socket(socket
.AF_INET
, socket
.SOCK_STREAM
) as s
:
47 # Verify is kafka port is up
51 config
.conf
.get("message", {}).get("host", "kafka"),
52 int(config
.conf
["message"].get("port")),
57 # Get the list of topics. If kafka is not ready exception will be thrown.
58 consumer
= kafka
.KafkaConsumer(
59 group_id
=config
.conf
["message"].get("group_id"),
61 config
.conf
.get("message", {}).get("host", "kafka")
63 + config
.conf
["message"].get("port")
66 all_topics
= consumer
.topics()
67 logging
.debug("Number of topics found: %s", len(all_topics
))
69 # Send dummy message in kafka topics. If kafka is not ready exception will be thrown.
70 producer
= kafka
.KafkaProducer(
72 config
.conf
.get("message", {}).get("host", "kafka")
74 + config
.conf
["message"].get("port")
77 mon_topics
= ["alarm_request", "users", "project"]
78 for mon_topic
in mon_topics
:
79 producer
.send(mon_topic
, key
=b
"echo", value
=b
"dummy message")
83 except Exception as e
:
84 logging
.info("Error when trying to get kafka status.")
85 logging
.debug("Exception when trying to get kafka status: %s", str(e
))
91 "{} process is waiting for kafka to come up...".format(process_name
)
93 time
.sleep(kafka_wait_time
)
96 def wait_till_core_services_are_ready(
97 config
, process_name
="osm-mon", commondb_wait_time
=5, kafka_wait_time
=5
99 logging
.debug("wait_till_core_services_are_ready")
102 logging
.info("Config information is not available")
105 # Check if common-db is ready
106 wait_till_commondb_is_ready(config
, process_name
, commondb_wait_time
)
108 # Check if kafka is ready
109 wait_till_kafka_is_ready(config
, process_name
, kafka_wait_time
)