X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;ds=sidebyside;f=osm_mon%2Fcore%2Fmessage_bus%2Fproducer.py;h=f6feba16352ffd22f2df43301a39ad20cb50d106;hb=2a1f7bc83b827fe3c11cbefd0c724e281d75b365;hp=bf0839cb643ba55ed7e5f6c86cadbddce2765a98;hpb=b85fc8cdf840080b10d01c33b4a57a2a39bcc0f1;p=osm%2FMON.git diff --git a/osm_mon/core/message_bus/producer.py b/osm_mon/core/message_bus/producer.py index bf0839c..f6feba1 100755 --- a/osm_mon/core/message_bus/producer.py +++ b/osm_mon/core/message_bus/producer.py @@ -61,103 +61,34 @@ class KafkaProducer(object): self.producer = kaf( key_serializer=str.encode, value_serializer=str.encode, - bootstrap_servers=broker, api_version=(0, 10)) + bootstrap_servers=broker, api_version=(0, 10, 1)) def publish(self, key, value, topic=None): """Send the required message on the Kafka message bus.""" try: future = self.producer.send(topic=topic, key=key, value=value) - self.producer.flush() + future.get(timeout=10) except Exception: logging.exception("Error publishing to {} topic." .format(topic)) raise - try: - record_metadata = future.get(timeout=10) - logging.debug("TOPIC:", record_metadata.topic) - logging.debug("PARTITION:", record_metadata.partition) - logging.debug("OFFSET:", record_metadata.offset) - except KafkaError: - pass - - def create_alarm_request(self, key, message): - """Create alarm request from SO to MON.""" - # External to MON - - self.publish(key, - value=message, - topic='alarm_request') - - def create_alarm_response(self, key, message): - """Response to a create alarm request from MON to SO.""" - # Internal to MON - - self.publish(key, - value=message, - topic='alarm_response') - def acknowledge_alarm(self, key, message): - """Alarm acknowledgement request from SO to MON.""" - # Internal to MON - - self.publish(key, - value=message, - topic='alarm_request') - - def list_alarm_request(self, key, message): - """List alarms request from SO to MON.""" + def publish_alarm_request(self, key, message): + """Publish an alarm request.""" # External to MON self.publish(key, value=message, topic='alarm_request') - def notify_alarm(self, key, message): - """Notify of triggered alarm from MON to SO.""" - - self.publish(key, - value=message, - topic='alarm_response') - - def list_alarm_response(self, key, message): - """Response for list alarms request from MON to SO.""" - - self.publish(key, - value=message, - topic='alarm_response') - - def update_alarm_request(self, key, message): - """Update alarm request from SO to MON.""" - # External to Mon - - self.publish(key, - value=message, - topic='alarm_request') - - def update_alarm_response(self, key, message): - """Response from update alarm request from MON to SO.""" - # Internal to Mon - - self.publish(key, - value=message, - topic='alarm_response') - - def delete_alarm_request(self, key, message): - """Delete alarm request from SO to MON.""" - # External to Mon - - self.publish(key, - value=message, - topic='alarm_request') - - def delete_alarm_response(self, key, message): - """Response for a delete alarm request from MON to SO.""" - # Internal to Mon + def publish_alarm_response(self, key, message): + """Publish an alarm response.""" + # Internal to MON self.publish(key, value=message, topic='alarm_response') - def create_metrics_request(self, key, message): + def publish_metrics_request(self, key, message): """Create metrics request from SO to MON.""" # External to Mon @@ -165,7 +96,7 @@ class KafkaProducer(object): value=message, topic='metric_request') - def create_metrics_resp(self, key, message): + def publish_metrics_response(self, key, message): """Response for a create metric request from MON to SO.""" # Internal to Mon