X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=osm_mon%2Fcore%2Fmessage_bus%2Fproducer.py;h=f04ecf820da89584582814bdbe3ad4e2c21f6a52;hb=326907a0151bed5641a9e9e241bc3b05bf0b71b9;hp=bf0839cb643ba55ed7e5f6c86cadbddce2765a98;hpb=24b8309395b534ffe4bff9b07f665951555ac955;p=osm%2FMON.git diff --git a/osm_mon/core/message_bus/producer.py b/osm_mon/core/message_bus/producer.py index bf0839c..f04ecf8 100755 --- a/osm_mon/core/message_bus/producer.py +++ b/osm_mon/core/message_bus/producer.py @@ -67,97 +67,34 @@ class KafkaProducer(object): """Send the required message on the Kafka message bus.""" try: future = self.producer.send(topic=topic, key=key, value=value) - self.producer.flush() + record_metadata = future.get(timeout=10) except Exception: logging.exception("Error publishing to {} topic." .format(topic)) raise try: - record_metadata = future.get(timeout=10) logging.debug("TOPIC:", record_metadata.topic) logging.debug("PARTITION:", record_metadata.partition) logging.debug("OFFSET:", record_metadata.offset) except KafkaError: pass - def create_alarm_request(self, key, message): - """Create alarm request from SO to MON.""" + def publish_alarm_request(self, key, message): + """Publish an alarm request.""" # External to MON self.publish(key, value=message, topic='alarm_request') - def create_alarm_response(self, key, message): - """Response to a create alarm request from MON to SO.""" - # Internal to MON - - self.publish(key, - value=message, - topic='alarm_response') - - def acknowledge_alarm(self, key, message): - """Alarm acknowledgement request from SO to MON.""" + def publish_alarm_response(self, key, message): + """Publish an alarm response.""" # Internal to MON - self.publish(key, - value=message, - topic='alarm_request') - - def list_alarm_request(self, key, message): - """List alarms request from SO to MON.""" - # External to MON - - self.publish(key, - value=message, - topic='alarm_request') - - def notify_alarm(self, key, message): - """Notify of triggered alarm from MON to SO.""" - - self.publish(key, - value=message, - topic='alarm_response') - - def list_alarm_response(self, key, message): - """Response for list alarms request from MON to SO.""" - - self.publish(key, - value=message, - topic='alarm_response') - - def update_alarm_request(self, key, message): - """Update alarm request from SO to MON.""" - # External to Mon - - self.publish(key, - value=message, - topic='alarm_request') - - def update_alarm_response(self, key, message): - """Response from update alarm request from MON to SO.""" - # Internal to Mon - - self.publish(key, - value=message, - topic='alarm_response') - - def delete_alarm_request(self, key, message): - """Delete alarm request from SO to MON.""" - # External to Mon - - self.publish(key, - value=message, - topic='alarm_request') - - def delete_alarm_response(self, key, message): - """Response for a delete alarm request from MON to SO.""" - # Internal to Mon - self.publish(key, value=message, topic='alarm_response') - def create_metrics_request(self, key, message): + def publish_metrics_request(self, key, message): """Create metrics request from SO to MON.""" # External to Mon @@ -165,7 +102,7 @@ class KafkaProducer(object): value=message, topic='metric_request') - def create_metrics_resp(self, key, message): + def publish_metrics_response(self, key, message): """Response for a create metric request from MON to SO.""" # Internal to Mon