From c0af268b2b9de10384b0a5bf80fa69168bf951d8 Mon Sep 17 00:00:00 2001 From: prithiv Date: Wed, 23 Aug 2017 10:55:21 +0100 Subject: [PATCH] Kafka Producer and Consumer Initial Producer and Consumer Application. Config details and Consumer messages are yet to be modified. Change-Id: I28091573c4b67bd559dc90611f1e5af7420a1df6 Signed-off-by: prithiv --- core/message-bus/northbound_consumer.py | 27 +++++++++++++++++++ core/message-bus/northbound_producer.py | 35 +++++++++++++++++++++++++ core/models/.gitkeep | 1 + devops-stages/.gitkeep | 1 + plugins/CloudWatch/.gitkeep | 1 + plugins/OpenStack/Ceilometer/.gitkeep | 1 + plugins/vRealiseOps/.gitkeep | 1 + test/.gitkeep | 1 + 8 files changed, 68 insertions(+) create mode 100644 core/message-bus/northbound_consumer.py create mode 100644 core/message-bus/northbound_producer.py create mode 100644 core/models/.gitkeep create mode 100644 devops-stages/.gitkeep create mode 100644 plugins/CloudWatch/.gitkeep create mode 100644 plugins/OpenStack/Ceilometer/.gitkeep create mode 100644 plugins/vRealiseOps/.gitkeep create mode 100644 test/.gitkeep diff --git a/core/message-bus/northbound_consumer.py b/core/message-bus/northbound_consumer.py new file mode 100644 index 0000000..be1c9ea --- /dev/null +++ b/core/message-bus/northbound_consumer.py @@ -0,0 +1,27 @@ +from kafka import KafkaConsumer +from kafka.errors import KafkaError +import logging + +class KafkaConsumer(object): + """Adds messages to a kafka topic. Topic is hardcoded as 'alarms' and group as + 'my_group' for now. + + """ + + def __init__(self, uri): + """Init + + uri - kafka connection details + """ + if not cfg.CONF.kafka.uri: + raise Exception("Kafka URI Not Found. Check the config file for Kafka URI") + else: + broker = cfg.CONF.kafka.uri + consumer = KafkaConsumer('alarms', + group_id='my_group', + bootstrap_servers=broker, api_version=(0,10)) + #KafkaConsumer(value_deserializer=lambda m: json.loads(m.decode('ascii'))) + + def consume(self, topic, messages): + for message in self._consumer: + print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition, message.offset, message.key, message.value)) diff --git a/core/message-bus/northbound_producer.py b/core/message-bus/northbound_producer.py new file mode 100644 index 0000000..a77b944 --- /dev/null +++ b/core/message-bus/northbound_producer.py @@ -0,0 +1,35 @@ +from kafka import KafkaProducer +from kafka.errors import KafkaError +import logging + + +class KafkaProducer(object): + + def __init__(self): + if not cfg.CONF.kafka.uri: + raise Exception("Kafka URI Not Found. Check the config file for Kafka URI") + else: + broker = cfg.CONF.kafka.uri + producer = KafkaProducer(bootstrap_servers=broker, api_version=(0,10)) + + def publish(self, topic, messages): + + """Takes messages and puts them on the supplied kafka topic. The topic is + hardcoded as 'alarms' and the message is harcoded as 'memory_usage' for now. + + """ + try: + future = producer.send('alarms', b'memory_usage') + producer.flush() + except Exception: + log.exception('Error publishing to {} topic.'.format(topic)) + raise + try: + record_metadata = future.get(timeout=10) + self._log.debug("TOPIC:", record_metadata.topic) + self._log.debug("PARTITION:", record_metadata.partition) + self._log.debug("OFFSET:", record_metadata.offset) + except KafkaError: + pass + #producer = KafkaProducer(retries=5) + diff --git a/core/models/.gitkeep b/core/models/.gitkeep new file mode 100644 index 0000000..2272ebb --- /dev/null +++ b/core/models/.gitkeep @@ -0,0 +1 @@ +#gitkeep file to keep the initial empty directory structure. diff --git a/devops-stages/.gitkeep b/devops-stages/.gitkeep new file mode 100644 index 0000000..2272ebb --- /dev/null +++ b/devops-stages/.gitkeep @@ -0,0 +1 @@ +#gitkeep file to keep the initial empty directory structure. diff --git a/plugins/CloudWatch/.gitkeep b/plugins/CloudWatch/.gitkeep new file mode 100644 index 0000000..2272ebb --- /dev/null +++ b/plugins/CloudWatch/.gitkeep @@ -0,0 +1 @@ +#gitkeep file to keep the initial empty directory structure. diff --git a/plugins/OpenStack/Ceilometer/.gitkeep b/plugins/OpenStack/Ceilometer/.gitkeep new file mode 100644 index 0000000..2272ebb --- /dev/null +++ b/plugins/OpenStack/Ceilometer/.gitkeep @@ -0,0 +1 @@ +#gitkeep file to keep the initial empty directory structure. diff --git a/plugins/vRealiseOps/.gitkeep b/plugins/vRealiseOps/.gitkeep new file mode 100644 index 0000000..2272ebb --- /dev/null +++ b/plugins/vRealiseOps/.gitkeep @@ -0,0 +1 @@ +#gitkeep file to keep the initial empty directory structure. diff --git a/test/.gitkeep b/test/.gitkeep new file mode 100644 index 0000000..2272ebb --- /dev/null +++ b/test/.gitkeep @@ -0,0 +1 @@ +#gitkeep file to keep the initial empty directory structure. -- 2.25.1