a1e848bcdf346376fa8f6cbd49ce2dc3a720044c
[osm/MON.git] / osm_mon / server / server.py
1 # -*- coding: utf-8 -*-
2
3 # Copyright 2018 Whitestack, LLC
4 # *************************************************************
5
6 # This file is part of OSM Monitoring module
7 # All Rights Reserved to Whitestack, LLC
8
9 # Licensed under the Apache License, Version 2.0 (the "License"); you may
10 # not use this file except in compliance with the License. You may obtain
11 # a copy of the License at
12
13 # http://www.apache.org/licenses/LICENSE-2.0
14
15 # Unless required by applicable law or agreed to in writing, software
16 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
17 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
18 # License for the specific language governing permissions and limitations
19 # under the License.
20 # For those usages not covered by the Apache License, Version 2.0 please
21 # contact: bdiaz@whitestack.com or glavado@whitestack.com
22 ##
23 """A common KafkaConsumer for all MON plugins."""
24 import asyncio
25 import json
26 import logging
27 from json import JSONDecodeError
28
29 import yaml
30 from aiokafka import AIOKafkaConsumer, AIOKafkaProducer
31
32 from osm_mon.core.auth import AuthManager
33 from osm_mon.core.common_db import CommonDbClient
34 from osm_mon.core.database import DatabaseManager
35 from osm_mon.core.response import ResponseBuilder
36 from osm_mon.core.settings import Config
37
38 log = logging.getLogger(__name__)
39
40
41 class Server:
42
43 def __init__(self, loop=None):
44 cfg = Config.instance()
45 if not loop:
46 loop = asyncio.get_event_loop()
47 self.loop = loop
48 self.auth_manager = AuthManager()
49 self.database_manager = DatabaseManager()
50 self.database_manager.create_tables()
51 self.common_db = CommonDbClient()
52 self.kafka_server = cfg.BROKER_URI
53
54 def run(self):
55 self.loop.run_until_complete(self.start())
56
57 async def start(self):
58 consumer = AIOKafkaConsumer(
59 "vim_account",
60 "alarm_request",
61 loop=self.loop,
62 bootstrap_servers=self.kafka_server,
63 group_id="mon-server",
64 key_deserializer=bytes.decode,
65 value_deserializer=bytes.decode,
66 )
67 await consumer.start()
68 try:
69 async for message in consumer:
70 log.info("Message arrived: %s", message)
71 await self.consume_message(message)
72 finally:
73 await consumer.stop()
74
75 async def consume_message(self, message):
76 try:
77 try:
78 values = json.loads(message.value)
79 except JSONDecodeError:
80 values = yaml.safe_load(message.value)
81
82 if message.topic == "vim_account":
83 if message.key == "create" or message.key == "edit":
84 values['vim_password'] = self.common_db.decrypt_vim_password(values['vim_password'],
85 values['schema_version'],
86 values['_id'])
87 self.auth_manager.store_auth_credentials(values)
88 if message.key == "delete":
89 self.auth_manager.delete_auth_credentials(values)
90
91 elif message.topic == "alarm_request":
92 if message.key == "create_alarm_request":
93 alarm_details = values['alarm_create_request']
94 cor_id = alarm_details['correlation_id']
95 response_builder = ResponseBuilder()
96 try:
97 alarm = self.database_manager.save_alarm(
98 alarm_details['alarm_name'],
99 alarm_details['threshold_value'],
100 alarm_details['operation'].lower(),
101 alarm_details['severity'].lower(),
102 alarm_details['statistic'].lower(),
103 alarm_details['metric_name'],
104 alarm_details['vdu_name'],
105 alarm_details['vnf_member_index'],
106 alarm_details['ns_id']
107 )
108 response = response_builder.generate_response('create_alarm_response',
109 cor_id=cor_id,
110 status=True,
111 alarm_id=alarm.uuid)
112 except Exception:
113 log.exception("Error creating alarm: ")
114 response = response_builder.generate_response('create_alarm_response',
115 cor_id=cor_id,
116 status=False,
117 alarm_id=None)
118 await self._publish_response('alarm_response_' + str(cor_id), 'create_alarm_response', response)
119
120 if message.key == "delete_alarm_request":
121 alarm_details = values['alarm_delete_request']
122 alarm_uuid = alarm_details['alarm_uuid']
123 response_builder = ResponseBuilder()
124 cor_id = alarm_details['correlation_id']
125 try:
126 self.database_manager.delete_alarm(alarm_uuid)
127 response = response_builder.generate_response('delete_alarm_response',
128 cor_id=cor_id,
129 status=True,
130 alarm_id=alarm_uuid)
131 except Exception:
132 log.exception("Error deleting alarm: ")
133 response = response_builder.generate_response('delete_alarm_response',
134 cor_id=cor_id,
135 status=False,
136 alarm_id=alarm_uuid)
137 await self._publish_response('alarm_response_' + str(cor_id), 'delete_alarm_response', response)
138
139 except Exception:
140 log.exception("Exception processing message: ")
141
142 async def _publish_response(self, topic: str, key: str, msg: dict):
143 producer = AIOKafkaProducer(loop=self.loop,
144 bootstrap_servers=self.kafka_server,
145 key_serializer=str.encode,
146 value_serializer=str.encode)
147 await producer.start()
148 log.info("Sending response %s to topic %s with key %s", json.dumps(msg), topic, key)
149 try:
150 await producer.send_and_wait(topic, key=key, value=json.dumps(msg))
151 finally:
152 await producer.stop()
153
154
155 if __name__ == '__main__':
156 Server().run()