projects
/
osm
/
MON.git
/ blobdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
tree
raw
|
inline
| side by side
Refactors codebase
[osm/MON.git]
/
osm_mon
/
core
/
message_bus
/
common_consumer.py
diff --git
a/osm_mon/core/message_bus/common_consumer.py
b/osm_mon/core/message_bus/common_consumer.py
index
fd97f83
..
b8e33d2
100755
(executable)
--- a/
osm_mon/core/message_bus/common_consumer.py
+++ b/
osm_mon/core/message_bus/common_consumer.py
@@
-23,21
+23,23
@@
import json
import logging
import sys
import threading
import logging
import sys
import threading
+from json import JSONDecodeError
import six
import yaml
import six
import yaml
-from kafka import KafkaConsumer
from osm_mon.common.common_db_client import CommonDbClient
from osm_mon.core.auth import AuthManager
from osm_mon.core.database import DatabaseManager
from osm_mon.common.common_db_client import CommonDbClient
from osm_mon.core.auth import AuthManager
from osm_mon.core.database import DatabaseManager
+from osm_mon.core.message_bus.consumer import Consumer
+from osm_mon.core.message_bus.producer import Producer
from osm_mon.core.settings import Config
from osm_mon.plugins.CloudWatch.access_credentials import AccessCredentials
from osm_mon.plugins.CloudWatch.connection import Connection
from osm_mon.plugins.CloudWatch.plugin_alarm import plugin_alarms
from osm_mon.plugins.CloudWatch.plugin_metric import plugin_metrics
from osm_mon.core.settings import Config
from osm_mon.plugins.CloudWatch.access_credentials import AccessCredentials
from osm_mon.plugins.CloudWatch.connection import Connection
from osm_mon.plugins.CloudWatch.plugin_alarm import plugin_alarms
from osm_mon.plugins.CloudWatch.plugin_metric import plugin_metrics
-from osm_mon.plugins.OpenStack.Aodh import alarm
ing
-from osm_mon.plugins.OpenStack.Gnocchi import metric
s
+from osm_mon.plugins.OpenStack.Aodh import alarm
_handler
+from osm_mon.plugins.OpenStack.Gnocchi import metric
_handler
from osm_mon.plugins.vRealiseOps import plugin_receiver
cfg = Config.instance()
from osm_mon.plugins.vRealiseOps import plugin_receiver
cfg = Config.instance()
@@
-59,15
+61,13
@@
kafka_logger.addHandler(kafka_handler)
class CommonConsumer:
def __init__(self):
class CommonConsumer:
def __init__(self):
- cfg = Config.instance()
-
self.auth_manager = AuthManager()
self.database_manager = DatabaseManager()
self.database_manager.create_tables()
# Create OpenStack alarming and metric instances
self.auth_manager = AuthManager()
self.database_manager = DatabaseManager()
self.database_manager.create_tables()
# Create OpenStack alarming and metric instances
- self.openstack_metrics = metric
s.Metrics
()
- self.openstack_alarms = alarm
ing.Alarming
()
+ self.openstack_metrics = metric
_handler.OpenstackMetricHandler
()
+ self.openstack_alarms = alarm
_handler.OpenstackAlarmHandler
()
# Create CloudWatch alarm and metric instances
self.cloudwatch_alarms = plugin_alarms()
# Create CloudWatch alarm and metric instances
self.cloudwatch_alarms = plugin_alarms()
@@
-88,12
+88,7
@@
class CommonConsumer:
return credentials.type
def run(self):
return credentials.type
def run(self):
- common_consumer = KafkaConsumer(bootstrap_servers=cfg.BROKER_URI,
- key_deserializer=bytes.decode,
- value_deserializer=bytes.decode,
- group_id="mon-consumer",
- session_timeout_ms=60000,
- heartbeat_interval_ms=20000)
+ common_consumer = Consumer("mon-consumer")
topics = ['metric_request', 'alarm_request', 'vim_account']
common_consumer.subscribe(topics)
topics = ['metric_request', 'alarm_request', 'vim_account']
common_consumer.subscribe(topics)
@@
-108,9
+103,11
@@
class CommonConsumer:
try:
try:
values = json.loads(message.value)
try:
try:
values = json.loads(message.value)
- except
Valu
eError:
+ except
JSONDecod
eError:
values = yaml.safe_load(message.value)
values = yaml.safe_load(message.value)
+ response = None
+
if message.topic == "vim_account":
if message.key == "create" or message.key == "edit":
self.auth_manager.store_auth_credentials(values)
if message.topic == "vim_account":
if message.key == "create" or message.key == "edit":
self.auth_manager.store_auth_credentials(values)
@@
-153,31
+150,42
@@
class CommonConsumer:
if vim_type == "openstack":
log.info("This message is for the OpenStack plugin.")
if message.topic == "metric_request":
if vim_type == "openstack":
log.info("This message is for the OpenStack plugin.")
if message.topic == "metric_request":
-
self.openstack_metrics.metric_calls(message
, vim_uuid)
+
response = self.openstack_metrics.handle_request(message.key, values
, vim_uuid)
if message.topic == "alarm_request":
if message.topic == "alarm_request":
-
self.openstack_alarms.alarming(message
, vim_uuid)
+
response = self.openstack_alarms.handle_message(message.key, values
, vim_uuid)
elif vim_type == "aws":
log.info("This message is for the CloudWatch plugin.")
aws_conn = self.aws_connection.setEnvironment()
if message.topic == "metric_request":
elif vim_type == "aws":
log.info("This message is for the CloudWatch plugin.")
aws_conn = self.aws_connection.setEnvironment()
if message.topic == "metric_request":
-
self.cloudwatch_metrics.metric_calls(message
, aws_conn)
+
response = self.cloudwatch_metrics.metric_calls(message.key, values
, aws_conn)
if message.topic == "alarm_request":
if message.topic == "alarm_request":
- self.cloudwatch_alarms.alarm_calls(message, aws_conn)
- if message.topic == "access_credentials":
- self.aws_access_credentials.access_credential_calls(message)
+ response = self.cloudwatch_alarms.alarm_calls(message.key, values, aws_conn)
elif vim_type == "vmware":
log.info("This metric_request message is for the vROPs plugin.")
elif vim_type == "vmware":
log.info("This metric_request message is for the vROPs plugin.")
- self.vrops_rcvr.consume(message, vim_uuid)
+ if message.topic == "metric_request":
+ response = self.vrops_rcvr.handle_metric_requests(message.key, values, vim_uuid)
+ if message.topic == "alarm_request":
+ response = self.vrops_rcvr.handle_alarm_requests(message.key, values, vim_uuid)
else:
log.debug("vim_type is misconfigured or unsupported; %s",
vim_type)
else:
log.debug("vim_type is misconfigured or unsupported; %s",
vim_type)
+ if response:
+ self._publish_response(message.topic, message.key, response)
except Exception:
log.exception("Exception processing message: ")
except Exception:
log.exception("Exception processing message: ")
+ def _publish_response(self, topic: str, key: str, msg: dict):
+ topic = topic.replace('request', 'response')
+ key = key.replace('request', 'response')
+ producer = Producer()
+ producer.send(topic=topic, key=key, value=json.dumps(msg))
+ producer.flush()
+ producer.close()
+
if __name__ == '__main__':
CommonConsumer().run()
if __name__ == '__main__':
CommonConsumer().run()