Adds alarm engine
[osm/MON.git] / osm_mon / server / server.py
1 # Copyright 2017 Intel Research and Development Ireland Limited
2 # *************************************************************
3 # This file is part of OSM Monitoring module
4 # All Rights Reserved to Intel Corporation
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License"); you
7 # may not use this file except in compliance with the License. You may
8 # obtain a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
15 # implied. See the License for the specific language governing
16 # permissions and limitations under the License.
17 #
18 # For those usages not covered by the Apache License, Version 2.0 please
19 # contact: helena.mcgough@intel.com or adrian.hoban@intel.com
20 """MON server: consumes vim_account and alarm_request messages from the message bus."""
21
22 import json
23 import logging
24 from json import JSONDecodeError
25
26 import yaml
27
28 from osm_mon.core.auth import AuthManager
29 from osm_mon.core.common_db import CommonDbClient
30 from osm_mon.core.database import DatabaseManager
31 from osm_mon.core.message_bus.consumer import Consumer
32 from osm_mon.core.message_bus.producer import Producer
33 from osm_mon.core.response import ResponseBuilder
34
# Module-level logger, named after this module, used by Server for all output.
log = logging.getLogger(__name__)
36
37
class Server:
    """Message-bus server handling VIM account and alarm requests.

    Subscribes to the ``vim_account`` and ``alarm_request`` topics, applies
    each message to the credential store or the alarm database, and publishes
    a response for every alarm request it handled.
    """

    def __init__(self):
        """Create the managers and make sure the database tables exist."""
        self.auth_manager = AuthManager()
        self.database_manager = DatabaseManager()
        self.database_manager.create_tables()
        self.common_db = CommonDbClient()

    def run(self):
        """Subscribe to the request topics and process every incoming message.

        Runs for as long as the consumer keeps yielding messages.
        """
        common_consumer = Consumer("mon-server")

        topics = ['alarm_request', 'vim_account']
        common_consumer.subscribe(topics)

        log.info("Listening for messages...")
        for message in common_consumer:
            self.consume_message(message)

    def consume_message(self, message):
        """Decode one bus message and dispatch it according to its topic.

        Args:
            message: object exposing ``topic``, ``key`` and ``value``.

        Any exception raised while processing is logged and swallowed, so a
        single bad message does not propagate to the caller's loop.
        """
        log.info("Message arrived: %s", message)
        try:
            values = self._parse_payload(message.value)

            response = None
            if message.topic == "vim_account":
                self._handle_vim_account(message.key, values)
            elif message.topic == "alarm_request":
                response = self._handle_alarm_request(message.key, values)

            if response:
                self._publish_response(message.topic, message.key, response)

        except Exception:
            log.exception("Exception processing message: ")

    @staticmethod
    def _parse_payload(raw):
        """Parse ``raw`` as JSON, falling back to YAML when it is not JSON."""
        try:
            return json.loads(raw)
        except JSONDecodeError:
            return yaml.safe_load(raw)

    def _handle_vim_account(self, key, values):
        """Store or delete VIM credentials depending on the message key."""
        if key in ("create", "edit"):
            # Replace the password with the result of decrypt_vim_password
            # before the credentials are stored.
            values['vim_password'] = self.common_db.decrypt_vim_password(
                values['vim_password'],
                values['schema_version'],
                values['_id'],
            )
            self.auth_manager.store_auth_credentials(values)
        elif key == "delete":
            self.auth_manager.delete_auth_credentials(values)

    def _handle_alarm_request(self, key, values):
        """Run the alarm operation named by ``key``.

        Returns:
            The response dict to publish, or ``None`` for unrecognised keys.
        """
        if key == "create_alarm_request":
            return self._create_alarm(values)
        if key == "delete_alarm_request":
            return self._delete_alarm(values)
        return None

    def _create_alarm(self, values):
        """Persist a new alarm and build the matching create response."""
        alarm_details = values['alarm_create_request']
        # Read the correlation id before touching the database (as the delete
        # path does) so a request without one is rejected instead of leaving
        # a saved alarm that is never answered.
        cor_id = alarm_details['correlation_id']
        response_builder = ResponseBuilder()
        try:
            alarm = self.database_manager.save_alarm(
                alarm_details['alarm_name'],
                alarm_details['threshold_value'],
                alarm_details['operation'].lower(),
                alarm_details['severity'].lower(),
                alarm_details['statistic'].lower(),
                alarm_details['metric_name'],
                alarm_details['vdu_name'],
                alarm_details['vnf_member_index'],
                alarm_details['ns_id'],
            )
            return response_builder.generate_response('create_alarm_response',
                                                      cor_id=cor_id,
                                                      status=True,
                                                      alarm_id=alarm.id)
        except Exception:
            log.exception("Error creating alarm: ")
            return response_builder.generate_response('create_alarm_response',
                                                      cor_id=cor_id,
                                                      status=False,
                                                      alarm_id=None)

    def _delete_alarm(self, values):
        """Delete an alarm and build the matching delete response."""
        alarm_details = values['alarm_delete_request']
        response_builder = ResponseBuilder()
        alarm_uuid = alarm_details['alarm_uuid']
        cor_id = alarm_details['correlation_id']
        # NOTE(review): the response key used to be 'create_alarm_response'
        # (copy-paste from the create path); this assumes ResponseBuilder
        # supports 'delete_alarm_response' - confirm against its implementation.
        try:
            self.database_manager.delete_alarm(alarm_uuid)
            return response_builder.generate_response('delete_alarm_response',
                                                      cor_id=cor_id,
                                                      status=True,
                                                      alarm_id=alarm_uuid)
        except Exception:
            log.exception("Error deleting alarm: ")
            return response_builder.generate_response('delete_alarm_response',
                                                      cor_id=cor_id,
                                                      status=False,
                                                      alarm_id=alarm_uuid)

    def _publish_response(self, topic: str, key: str, msg: dict):
        """Publish ``msg`` on the response counterpart of ``topic``/``key``.

        Every occurrence of ``request`` in the topic and key is replaced by
        ``response``. The producer is always closed, even when sending or
        flushing raises; the exception itself still propagates.
        """
        topic = topic.replace('request', 'response')
        key = key.replace('request', 'response')
        producer = Producer()
        try:
            producer.send(topic=topic, key=key, value=json.dumps(msg))
            producer.flush(timeout=5)
        finally:
            producer.close()
131
132
if __name__ == '__main__':
    # Script entry point: build the server, then block consuming messages.
    server = Server()
    server.run()