-def get_vim_type(db_manager, vim_uuid):
- """Get the vim type that is required by the message."""
- credentials = db_manager.get_credentials(vim_uuid)
- return credentials.type
-
-
-def get_vdur(common_db, nsr_id, member_index, vdu_name):
- vnfr = get_vnfr(common_db, nsr_id, member_index)
- for vdur in vnfr['vdur']:
- if vdur['vdu-id-ref'] == vdu_name:
- return vdur
- raise ValueError('vdur not found for nsr-id %s, member_index %s and vdu_name %s', nsr_id, member_index, vdu_name)
-
-
-def get_vnfr(common_db, nsr_id, member_index):
- vnfr = common_db.get_one(table="vnfrs", filter={"nsr-id-ref": nsr_id, "member-vnf-index-ref": str(member_index)})
- return vnfr
-
-
-def main():
- cfg = Config.instance()
- cfg.read_environ()
-
- auth_manager = AuthManager()
- database_manager = DatabaseManager()
- database_manager.create_tables()
-
- # Create OpenStack alarming and metric instances
- openstack_metrics = metrics.Metrics()
- openstack_alarms = alarming.Alarming()
-
- # Create CloudWatch alarm and metric instances
- cloudwatch_alarms = plugin_alarms()
- cloudwatch_metrics = plugin_metrics()
- aws_connection = Connection()
- aws_access_credentials = AccessCredentials()
-
- # Create vROps plugin_receiver class instance
- vrops_rcvr = plugin_receiver.PluginReceiver()
-
- common_db = dbmongo.DbMongo()
- common_db_uri = cfg.MONGO_URI.split(':')
- common_db.db_connect({'host': common_db_uri[0], 'port': int(common_db_uri[1]), 'name': 'osm'})
-
- # Initialize consumers for alarms and metrics
- common_consumer = KafkaConsumer(bootstrap_servers=cfg.BROKER_URI,
- key_deserializer=bytes.decode,
- value_deserializer=bytes.decode,
- group_id="mon-consumer")
-
- # Define subscribe the consumer for the plugins
- topics = ['metric_request', 'alarm_request', 'access_credentials', 'vim_account']
- # TODO: Remove access_credentials
- common_consumer.subscribe(topics)
-
- log.info("Listening for alarm_request and metric_request messages")
- for message in common_consumer:
+class CommonConsumer:
+
+ def __init__(self):
+ cfg = Config.instance()
+
+ self.auth_manager = AuthManager()
+ self.database_manager = DatabaseManager()
+ self.database_manager.create_tables()
+
+ # Create OpenStack alarming and metric instances
+ self.openstack_metrics = metrics.Metrics()
+ self.openstack_alarms = alarming.Alarming()
+
+ # Create CloudWatch alarm and metric instances
+ self.cloudwatch_alarms = plugin_alarms()
+ self.cloudwatch_metrics = plugin_metrics()
+ self.aws_connection = Connection()
+ self.aws_access_credentials = AccessCredentials()
+
+ # Create vROps plugin_receiver class instance
+ self.vrops_rcvr = plugin_receiver.PluginReceiver()
+
+ log.info("Connecting to MongoDB...")
+ self.common_db = dbmongo.DbMongo()
+ common_db_uri = cfg.MONGO_URI.split(':')
+ self.common_db.db_connect({'host': common_db_uri[0], 'port': int(common_db_uri[1]), 'name': 'osm'})
+ log.info("Connection successful.")
+
+ # Initialize consumers for alarms and metrics
+ self.common_consumer = KafkaConsumer(bootstrap_servers=cfg.BROKER_URI,
+ key_deserializer=bytes.decode,
+ value_deserializer=bytes.decode,
+ group_id="mon-consumer")
+
+ # Define subscribe the consumer for the plugins
+ topics = ['metric_request', 'alarm_request', 'access_credentials', 'vim_account']
+ # TODO: Remove access_credentials
+ self.common_consumer.subscribe(topics)
+
+ def get_vim_type(self, vim_uuid):
+ """Get the vim type that is required by the message."""
+ credentials = self.database_manager.get_credentials(vim_uuid)
+ return credentials.type
+
+ def get_vdur(self, nsr_id, member_index, vdu_name):
+ vnfr = self.get_vnfr(nsr_id, member_index)
+ for vdur in vnfr['vdur']:
+ if vdur['name'] == vdu_name:
+ return vdur
+ raise ValueError('vdur not found for nsr-id %s, member_index %s and vdu_name %s', nsr_id, member_index,
+ vdu_name)
+
+ def get_vnfr(self, nsr_id, member_index):
+ vnfr = self.common_db.get_one("vnfrs",
+ {"nsr-id-ref": nsr_id, "member-vnf-index-ref": str(member_index)})
+ return vnfr
+
+ def run(self):
+ log.info("Listening for messages...")
+ for message in self.common_consumer:
+ t = threading.Thread(target=self.consume_message, args=(message,))
+ t.start()
+
+ def consume_message(self, message):