1 # -*- coding: utf-8 -*-
3 # This file is part of OSM Monitoring module
5 # Licensed under the Apache License, Version 2.0 (the "License"); you may
6 # not use this file except in compliance with the License. You may obtain
7 # a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14 # License for the specific language governing permissions and limitations
24 def wait_till_commondb_is_ready(config
, process_name
="osm-mon", commondb_wait_time
=5):
26 logging
.debug("wait_till_commondb_is_ready")
29 commondb_url
= config
.conf
["database"].get("uri")
31 commondb
= pymongo
.MongoClient(commondb_url
)
32 commondb
.server_info()
36 "{} process is waiting for commondb to come up...".format(process_name
)
38 time
.sleep(commondb_wait_time
)
41 def wait_till_kafka_is_ready(config
, process_name
="osm-mon", kafka_wait_time
=5):
43 logging
.debug("wait_till_kafka_is_ready")
48 with socket
.socket(socket
.AF_INET
, socket
.SOCK_STREAM
) as s
:
49 # Verify is kafka port is up
53 config
.conf
.get("message", {}).get("host", "kafka"),
54 int(config
.conf
["message"].get("port")),
59 # Get the list of topics. If kafka is not ready exception will be thrown.
60 consumer
= kafka
.KafkaConsumer(
61 group_id
=config
.conf
["message"].get("group_id"),
63 config
.conf
.get("message", {}).get("host", "kafka")
65 + config
.conf
["message"].get("port")
68 all_topics
= consumer
.topics()
69 logging
.debug("Number of topics found: %s", len(all_topics
))
71 # Send dummy message in kafka topics. If kafka is not ready exception will be thrown.
72 producer
= kafka
.KafkaProducer(
74 config
.conf
.get("message", {}).get("host", "kafka")
76 + config
.conf
["message"].get("port")
79 mon_topics
= ["alarm_request", "users", "project"]
80 for mon_topic
in mon_topics
:
81 producer
.send(mon_topic
, key
=b
"echo", value
=b
"dummy message")
85 except Exception as e
:
86 logging
.info("Error when trying to get kafka status.")
87 logging
.debug("Exception when trying to get kafka status: %s", str(e
))
93 "{} process is waiting for kafka to come up...".format(process_name
)
95 time
.sleep(kafka_wait_time
)
98 def wait_till_core_services_are_ready(
99 config
, process_name
="osm-mon", commondb_wait_time
=5, kafka_wait_time
=5
102 logging
.debug("wait_till_core_services_are_ready")
105 logging
.info("Config information is not available")
108 # Check if common-db is ready
109 wait_till_commondb_is_ready(config
, process_name
, commondb_wait_time
)
111 # Check if kafka is ready
112 wait_till_kafka_is_ready(config
, process_name
, kafka_wait_time
)