Fix pylint issues appeared with version 3.2.0 of pylint
[osm/MON.git] / osm_mon / cmd / mon_utils.py
1 # -*- coding: utf-8 -*-
2
3 # This file is part of OSM Monitoring module
4
5 # Licensed under the Apache License, Version 2.0 (the "License"); you may
6 # not use this file except in compliance with the License. You may obtain
7 # a copy of the License at
8
9 # http://www.apache.org/licenses/LICENSE-2.0
10
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14 # License for the specific language governing permissions and limitations
15 # under the License.
16
17 import pymongo
18 import time
19 import socket
20 import logging
21 import kafka
22
23
24 def wait_till_commondb_is_ready(config, process_name="osm-mon", commondb_wait_time=5):
25 logging.debug("wait_till_commondb_is_ready")
26
27 while True:
28 commondb_url = config.conf["database"].get("uri")
29 try:
30 commondb = pymongo.MongoClient(commondb_url)
31 commondb.server_info()
32 break
33 except Exception:
34 logging.info(
35 "{} process is waiting for commondb to come up...".format(process_name)
36 )
37 time.sleep(commondb_wait_time)
38
39
40 def wait_till_kafka_is_ready(config, process_name="osm-mon", kafka_wait_time=5):
41 logging.debug("wait_till_kafka_is_ready")
42
43 while True:
44 kafka_ready = False
45 try:
46 with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
47 # Verify is kafka port is up
48 if (
49 s.connect_ex(
50 (
51 config.conf.get("message", {}).get("host", "kafka"),
52 int(config.conf["message"].get("port")),
53 )
54 )
55 == 0
56 ):
57 # Get the list of topics. If kafka is not ready exception will be thrown.
58 consumer = kafka.KafkaConsumer(
59 group_id=config.conf["message"].get("group_id"),
60 bootstrap_servers=[
61 config.conf.get("message", {}).get("host", "kafka")
62 + ":"
63 + config.conf["message"].get("port")
64 ],
65 )
66 all_topics = consumer.topics()
67 logging.debug("Number of topics found: %s", len(all_topics))
68
69 # Send dummy message in kafka topics. If kafka is not ready exception will be thrown.
70 producer = kafka.KafkaProducer(
71 bootstrap_servers=[
72 config.conf.get("message", {}).get("host", "kafka")
73 + ":"
74 + config.conf["message"].get("port")
75 ]
76 )
77 mon_topics = ["alarm_request", "users", "project"]
78 for mon_topic in mon_topics:
79 producer.send(mon_topic, key=b"echo", value=b"dummy message")
80
81 # Kafka is ready now
82 kafka_ready = True
83 except Exception as e:
84 logging.info("Error when trying to get kafka status.")
85 logging.debug("Exception when trying to get kafka status: %s", str(e))
86 finally:
87 if kafka_ready:
88 break
89 else:
90 logging.info(
91 "{} process is waiting for kafka to come up...".format(process_name)
92 )
93 time.sleep(kafka_wait_time)
94
95
96 def wait_till_core_services_are_ready(
97 config, process_name="osm-mon", commondb_wait_time=5, kafka_wait_time=5
98 ):
99 logging.debug("wait_till_core_services_are_ready")
100
101 if not config:
102 logging.info("Config information is not available")
103 return False
104
105 # Check if common-db is ready
106 wait_till_commondb_is_ready(config, process_name, commondb_wait_time)
107
108 # Check if kafka is ready
109 wait_till_kafka_is_ready(config, process_name, kafka_wait_time)
110
111 return True