Fixes bugs regarding alarm deletion
[osm/MON.git] / osm_mon / server / server.py
1 # -*- coding: utf-8 -*-
2
3 # Copyright 2018 Whitestack, LLC
4 # *************************************************************
5
6 # This file is part of OSM Monitoring module
7 # All Rights Reserved to Whitestack, LLC
8
9 # Licensed under the Apache License, Version 2.0 (the "License"); you may
10 # not use this file except in compliance with the License. You may obtain
11 # a copy of the License at
12
13 # http://www.apache.org/licenses/LICENSE-2.0
14
15 # Unless required by applicable law or agreed to in writing, software
16 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
17 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
18 # License for the specific language governing permissions and limitations
19 # under the License.
20 # For those usages not covered by the Apache License, Version 2.0 please
21 # contact: bdiaz@whitestack.com or glavado@whitestack.com
22 ##
23 """A common KafkaConsumer for all MON plugins."""
24
25 import json
26 import logging
27 from json import JSONDecodeError
28
29 import yaml
30
31 from osm_mon.core.auth import AuthManager
32 from osm_mon.core.common_db import CommonDbClient
33 from osm_mon.core.database import DatabaseManager
34 from osm_mon.core.message_bus.consumer import Consumer
35 from osm_mon.core.message_bus.producer import Producer
36 from osm_mon.core.response import ResponseBuilder
37
38 log = logging.getLogger(__name__)
39
40
41 class Server:
42
43 def __init__(self):
44 self.auth_manager = AuthManager()
45 self.database_manager = DatabaseManager()
46 self.database_manager.create_tables()
47 self.common_db = CommonDbClient()
48
49 def run(self):
50 common_consumer = Consumer("mon-server")
51
52 topics = ['alarm_request', 'vim_account']
53 common_consumer.subscribe(topics)
54
55 log.info("Listening for messages...")
56 for message in common_consumer:
57 self.consume_message(message)
58
59 def consume_message(self, message):
60 log.info("Message arrived: %s", message)
61 try:
62 try:
63 values = json.loads(message.value)
64 except JSONDecodeError:
65 values = yaml.safe_load(message.value)
66
67 response = None
68
69 if message.topic == "vim_account":
70 if message.key == "create" or message.key == "edit":
71 values['vim_password'] = self.common_db.decrypt_vim_password(values['vim_password'],
72 values['schema_version'],
73 values['_id'])
74 self.auth_manager.store_auth_credentials(values)
75 if message.key == "delete":
76 self.auth_manager.delete_auth_credentials(values)
77
78 elif message.topic == "alarm_request":
79 if message.key == "create_alarm_request":
80 alarm_details = values['alarm_create_request']
81 response_builder = ResponseBuilder()
82 try:
83 alarm = self.database_manager.save_alarm(
84 alarm_details['alarm_name'],
85 alarm_details['threshold_value'],
86 alarm_details['operation'].lower(),
87 alarm_details['severity'].lower(),
88 alarm_details['statistic'].lower(),
89 alarm_details['metric_name'],
90 alarm_details['vdu_name'],
91 alarm_details['vnf_member_index'],
92 alarm_details['ns_id']
93 )
94 response = response_builder.generate_response('create_alarm_response',
95 cor_id=alarm_details['correlation_id'],
96 status=True,
97 alarm_id=alarm.uuid)
98 except Exception:
99 log.exception("Error creating alarm: ")
100 response = response_builder.generate_response('create_alarm_response',
101 cor_id=alarm_details['correlation_id'],
102 status=False,
103 alarm_id=None)
104 if message.key == "delete_alarm_request":
105 alarm_details = values['alarm_delete_request']
106 alarm_uuid = alarm_details['alarm_uuid']
107 response_builder = ResponseBuilder()
108 cor_id = alarm_details['correlation_id']
109 try:
110 self.database_manager.delete_alarm(alarm_uuid)
111 response = response_builder.generate_response('delete_alarm_response',
112 cor_id=cor_id,
113 status=True,
114 alarm_id=alarm_uuid)
115 except Exception:
116 log.exception("Error deleting alarm: ")
117 response = response_builder.generate_response('delete_alarm_response',
118 cor_id=cor_id,
119 status=False,
120 alarm_id=alarm_uuid)
121 if response:
122 self._publish_response(message.topic, message.key, response)
123
124 except Exception:
125 log.exception("Exception processing message: ")
126
127 def _publish_response(self, topic: str, key: str, msg: dict):
128 topic = topic.replace('request', 'response')
129 key = key.replace('request', 'response')
130 producer = Producer()
131 producer.send(topic=topic, key=key, value=json.dumps(msg))
132 producer.flush(timeout=5)
133 producer.close()
134
135
136 if __name__ == '__main__':
137 Server().run()