Renaming folder structure
[osm/MON.git] / osm_mon / plugins / vRealiseOps / kafka_consumer_vrops.py
diff --git a/osm_mon/plugins/vRealiseOps/kafka_consumer_vrops.py b/osm_mon/plugins/vRealiseOps/kafka_consumer_vrops.py
new file mode 100644 (file)
index 0000000..f5e11e6
--- /dev/null
@@ -0,0 +1,64 @@
+# -*- coding: utf-8 -*-
+
+##
+# Copyright 2016-2017 VMware Inc.
+# This file is part of ETSI OSM
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+# For those usages not covered by the Apache License, Version 2.0 please
+# contact:  osslegalrouting@vmware.com
+##
+
+"""
+vROPs Kafka Consumer that consumes the request messages
+"""
+
+
+from kafka import KafkaConsumer
+from kafka.errors import KafkaError
+import logging as log
+
+class vROP_KafkaConsumer(object):
+    """
+        Kafka Consumer for vROPs
+    """
+
+    def __init__(self, topics=[], broker_uri=None):
+        """
+            Method to initize KafkaConsumer
+            Args:
+                broker_uri - hostname:port uri of Kafka broker
+                topics - list of topics to subscribe
+            Returns:
+               None
+        """
+
+        if broker_uri is None:
+            self.broker = '0.0.0.0:9092'
+        else:
+            self.broker = broker_uri
+
+        self.topic = topics
+        print ("vROPs Consumer started, Broker URI: {}".format(self.broker))
+        print ("Subscribed Topics {}".format(self.topic))
+        try:
+            self.vrops_consumer = KafkaConsumer(bootstrap_servers=self.broker)
+            self.vrops_consumer.subscribe(self.topic)
+        except Exception as exp:
+            msg = "fail to create consumer for topic {} with broker {} Error : {}"\
+                    .format(self.topic, self.broker, exp)
+            log.error(msg)
+            raise Exception(msg)
+