Added log messages in vROPs plugin receiver & minor fixes
[osm/MON.git] / core / message_bus / test_producer_AWS.py
1 # Copyright 2017 Intel Research and Development Ireland Limited
2 # *************************************************************
3
4 # This file is part of OSM Monitoring module
5 # All Rights Reserved to Intel Corporation
6
7 # Licensed under the Apache License, Version 2.0 (the "License"); you may
8 # not use this file except in compliance with the License. You may obtain
9 # a copy of the License at
10
11 # http://www.apache.org/licenses/LICENSE-2.0
12
13 # Unless required by applicable law or agreed to in writing, software
14 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
15 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
16 # License for the specific language governing permissions and limitations
17 # under the License.
18
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact: prithiv.mohan@intel.com or adrian.hoban@intel.com
21 ##
22
'''
Kafka producer that publishes messages from the SO and the datacenter
plugins (OpenStack, VMware, AWS) to the message bus.
'''
28
29
30 from kafka import KafkaProducer as kaf
31 from kafka.errors import KafkaError
32 import logging as log
33 import json
34 import jsmin
35 import os
36 from os import listdir
37 from jsmin import jsmin
38
39
40
41
class KafkaProducer(object):
    """Thin wrapper around ``kafka.KafkaProducer`` that publishes
    JSON-serialized messages to a given topic.

    The broker address is taken from the ``ZOOKEEPER_URI`` environment
    variable; when it is unset, ``localhost:9092`` is used, assuming a
    broker instance is already running locally.
    """

    # Directory holding the JSON payload model files, relative to the
    # parent directory of the current working directory.
    json_path = os.path.join(os.pardir + "/models/")

    def __init__(self, topic):
        """Create a producer for *topic*.

        :param topic: default topic name kept on the instance
                      (publishing still passes an explicit topic).
        """
        self._topic = topic

        # Single env lookup with a default replaces the original
        # check-then-get pair on ZOOKEEPER_URI.
        broker = os.getenv("ZOOKEEPER_URI", "localhost:9092")

        # Keys are plain strings; values are JSON-encoded to ASCII bytes.
        self.producer = kaf(
            key_serializer=str.encode,
            value_serializer=lambda v: json.dumps(v).encode('ascii'),
            bootstrap_servers=broker, api_version=(0, 10))

    def publish(self, key, value, topic):
        """Send one message and flush the producer.

        :param key:   message key (string, encoded by the key serializer)
        :param value: JSON-serializable message payload
        :param topic: topic to publish to
        :raises Exception: re-raised after logging when the send fails.
        """
        try:
            future = self.producer.send(key=key, value=value, topic=topic)
            self.producer.flush()
        except Exception:
            log.exception("Error publishing to {} topic." .format(topic))
            raise
        try:
            record_metadata = future.get(timeout=10)
            log.debug("TOPIC: %s PARTITION: %s OFFSET: %s",
                      record_metadata.topic,
                      record_metadata.partition,
                      record_metadata.offset)
        except KafkaError:
            # Best-effort metadata fetch: a delivery-confirmation failure
            # is logged but deliberately does not abort the caller
            # (the original swallowed it silently).
            log.exception("Error fetching record metadata for topic %s.",
                          topic)

    def request(self, path, key, message, topic):
        """Load a JSON payload file (comments stripped by jsmin) from
        *path* and publish it.

        :param path:    path to the JSON payload file (external to MON)
        :param key:     message key
        :param message: accepted for interface compatibility; unused here
        :param topic:   topic to publish to
        """
        # 'with' guarantees the file handle is closed; the original
        # leaked it via open(...).read().
        with open(os.path.join(path)) as payload_file:
            payload_create_alarm = jsmin(payload_file.read())
        self.publish(key=key,
                     value=json.loads(payload_create_alarm),
                     topic=topic)