X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=osm_mon%2Fcore%2Fmessage_bus%2Fproducer.py;h=573e332f9b887ac47381a105cea5402d85152a94;hb=93699898c51364cde193d8d441f4aed45670e7bf;hp=f6feba16352ffd22f2df43301a39ad20cb50d106;hpb=e80db311a29dc8562dc84ae3336af167bac2ec5b;p=osm%2FMON.git diff --git a/osm_mon/core/message_bus/producer.py b/osm_mon/core/message_bus/producer.py old mode 100755 new mode 100644 index f6feba1..573e332 --- a/osm_mon/core/message_bus/producer.py +++ b/osm_mon/core/message_bus/producer.py @@ -1,8 +1,9 @@ -# Copyright 2017 Intel Research and Development Ireland Limited +# Copyright 2018 Whitestack, LLC # ************************************************************* # This file is part of OSM Monitoring module -# All Rights Reserved to Intel Corporation +# All Rights Reserved to Whitestack, LLC + # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain # a copy of the License at @@ -16,161 +17,19 @@ # under the License. # For those usages not covered by the Apache License, Version 2.0 please -# contact: prithiv.mohan@intel.com or adrian.hoban@intel.com +# contact: bdiaz@whitestack.com or glavado@whitestack.com ## -"""This is a common kafka producer app. - -It interacts with the SO and the plugins of the datacenters: OpenStack, VMWare -and AWS. -""" - -import logging -import os - -from kafka import KafkaProducer as kaf -from kafka.errors import KafkaError - -__author__ = "Prithiv Mohan" -__date__ = "06/Sep/2017" - -current_path = os.path.realpath(__file__) -json_path = os.path.abspath(os.path.join(current_path, '..', '..', 'models')) - -# TODO(): validate all of the request and response messages against the -# json_schemas - - -class KafkaProducer(object): - """A common KafkaProducer for requests and responses.""" - - def __init__(self, topic): - """Initialize the common kafka producer.""" - self._topic = topic - - if "BROKER_URI" in os.environ: - broker = os.getenv("BROKER_URI") - else: - broker = "localhost:9092" - - ''' - If the broker URI is not set in the env by default, - localhost container is taken as the host because an instance of - is already running. - ''' - - self.producer = kaf( - key_serializer=str.encode, - value_serializer=str.encode, - bootstrap_servers=broker, api_version=(0, 10, 1)) - - def publish(self, key, value, topic=None): - """Send the required message on the Kafka message bus.""" - try: - future = self.producer.send(topic=topic, key=key, value=value) - future.get(timeout=10) - except Exception: - logging.exception("Error publishing to {} topic." 
-                              .format(topic))
-            raise
-
-    def publish_alarm_request(self, key, message):
-        """Publish an alarm request."""
-        # External to MON
-
-        self.publish(key,
-                     value=message,
-                     topic='alarm_request')
-
-    def publish_alarm_response(self, key, message):
-        """Publish an alarm response."""
-        # Internal to MON
-
-        self.publish(key,
-                     value=message,
-                     topic='alarm_response')
-
-    def publish_metrics_request(self, key, message):
-        """Create metrics request from SO to MON."""
-        # External to Mon
-
-        self.publish(key,
-                     value=message,
-                     topic='metric_request')
-
-    def publish_metrics_response(self, key, message):
-        """Response for a create metric request from MON to SO."""
-        # Internal to Mon
-
-        self.publish(key,
-                     value=message,
-                     topic='metric_response')
-
-    def read_metric_data_request(self, key, message):
-        """Read metric data request from SO to MON."""
-        # External to Mon
-
-        self.publish(key,
-                     value=message,
-                     topic='metric_request')
-
-    def read_metric_data_response(self, key, message):
-        """Response from MON to SO for read metric data request."""
-        # Internal to Mon
-
-        self.publish(key,
-                     value=message,
-                     topic='metric_response')
-
-    def list_metric_request(self, key, message):
-        """List metric request from SO to MON."""
-        # External to MON
-
-        self.publish(key,
-                     value=message,
-                     topic='metric_request')
-
-    def list_metric_response(self, key, message):
-        """Response from SO to MON for list metrics request."""
-        # Internal to MON
-
-        self.publish(key,
-                     value=message,
-                     topic='metric_response')
-
-    def delete_metric_request(self, key, message):
-        """Delete metric request from SO to MON."""
-        # External to Mon
-
-        self.publish(key,
-                     value=message,
-                     topic='metric_request')
-
-    def delete_metric_response(self, key, message):
-        """Response from MON to SO for delete metric request."""
-        # Internal to Mon
-
-        self.publish(key,
-                     value=message,
-                     topic='metric_response')
-
-    def update_metric_request(self, key, message):
-        """Metric update request from SO to MON."""
-        # External to Mon
-
-        self.publish(key,
-                     value=message,
-                     topic='metric_request')
+from kafka import KafkaProducer
 
-    def update_metric_response(self, key, message):
-        """Reponse from MON to SO for metric update."""
-        # Internal to Mon
+from osm_mon.core.settings import Config
 
-        self.publish(key,
-                     value=message,
-                     topic='metric_response')
 
-    def access_credentials(self, key, message):
-        """Send access credentials to MON from SO."""
+class Producer(KafkaProducer):
+    def __init__(self):
+        cfg = Config.instance()
+        super().__init__(bootstrap_servers=cfg.BROKER_URI,
+                         key_serializer=str.encode,
+                         value_serializer=str.encode)
 
-        self.publish(key,
-                     value=message,
-                     topic='access_credentials')
+    def send(self, topic, value=None, key=None, partition=None, timestamp_ms=None):
+        return super().send(topic, value, key, partition, timestamp_ms)
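
For reference, this commit drops the per-topic helper methods (publish_alarm_request, publish_metrics_request, and so on) in favour of a thin Producer subclass of kafka-python's KafkaProducer that reads the broker address from osm_mon.core.settings.Config. The caller-side sketch below is not part of the commit: it assumes Config.instance().BROKER_URI points at a reachable Kafka broker, and the topic name, key and payload are illustrative only.

# Hypothetical usage sketch of the refactored Producer (illustrative, not from the commit).
import json

from osm_mon.core.message_bus.producer import Producer

producer = Producer()  # bootstrap_servers comes from Config.instance().BROKER_URI

# key_serializer and value_serializer are str.encode, so key and value must be strings
payload = json.dumps({'alarm_create_request': {'correlation_id': 1}})  # example payload
future = producer.send(topic='alarm_request', key='alarm_request', value=payload)
future.get(timeout=10)  # block until the broker acknowledges; raises on delivery failure
producer.flush()

Unlike the removed helpers, which hard-coded their topics, callers of the new send() name the topic explicitly at the call site; that is the main interface change introduced by this refactor.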