blob: a1e848bcdf346376fa8f6cbd49ce2dc3a720044c [file] [log] [blame]
# -*- coding: utf-8 -*-

# Copyright 2018 Whitestack, LLC
# *************************************************************

# This file is part of OSM Monitoring module
# All Rights Reserved to Whitestack, LLC

# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at

# http://www.apache.org/licenses/LICENSE-2.0

# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
# For those usages not covered by the Apache License, Version 2.0 please
# contact: bdiaz@whitestack.com or glavado@whitestack.com
##
Benjamin Diaz51f44862018-11-15 10:27:12 -030023"""A common KafkaConsumer for all MON plugins."""
Benjamin Diaz274a6e92018-11-26 13:14:33 -030024import asyncio
Benjamin Diaz51f44862018-11-15 10:27:12 -030025import json
26import logging
27from json import JSONDecodeError
28
29import yaml
Benjamin Diaz274a6e92018-11-26 13:14:33 -030030from aiokafka import AIOKafkaConsumer, AIOKafkaProducer
Benjamin Diaz51f44862018-11-15 10:27:12 -030031
32from osm_mon.core.auth import AuthManager
33from osm_mon.core.common_db import CommonDbClient
34from osm_mon.core.database import DatabaseManager
Benjamin Diaz51f44862018-11-15 10:27:12 -030035from osm_mon.core.response import ResponseBuilder
Benjamin Diaz274a6e92018-11-26 13:14:33 -030036from osm_mon.core.settings import Config
Benjamin Diaz51f44862018-11-15 10:27:12 -030037
38log = logging.getLogger(__name__)
39
40
class Server:
    """Kafka-driven request server for the MON module.

    Consumes the ``vim_account`` and ``alarm_request`` topics. VIM account
    records are forwarded to the auth manager, alarm requests are applied
    through the database manager, and every alarm request is answered on a
    per-correlation-id response topic.
    """

    def __init__(self, loop=None):
        """Create the collaborators and read the broker address.

        :param loop: asyncio event loop to run on. When omitted (or falsy)
            the loop returned by ``asyncio.get_event_loop()`` is used.
        """
        cfg = Config.instance()
        self.loop = loop or asyncio.get_event_loop()
        self.auth_manager = AuthManager()
        self.database_manager = DatabaseManager()
        # Tables are created eagerly, before any message is consumed.
        self.database_manager.create_tables()
        self.common_db = CommonDbClient()
        self.kafka_server = cfg.BROKER_URI

    def run(self):
        """Block on the event loop until :meth:`start` returns or raises."""
        self.loop.run_until_complete(self.start())

    async def start(self):
        """Consume ``vim_account`` and ``alarm_request`` records in a loop.

        Keys and values are decoded from bytes to ``str`` by the consumer.
        The consumer is always stopped when the loop exits, including on
        error or cancellation.
        """
        # NOTE(review): newer aiokafka releases are believed to deprecate the
        # explicit ``loop`` argument - confirm against the pinned version
        # before upgrading.
        consumer = AIOKafkaConsumer(
            "vim_account",
            "alarm_request",
            loop=self.loop,
            bootstrap_servers=self.kafka_server,
            group_id="mon-server",
            key_deserializer=bytes.decode,
            value_deserializer=bytes.decode,
        )
        await consumer.start()
        try:
            async for message in consumer:
                log.info("Message arrived: %s", message)
                await self.consume_message(message)
        finally:
            await consumer.stop()

    async def consume_message(self, message):
        """Dispatch one Kafka record to the handler for its topic and key.

        :param message: record exposing ``topic``, ``key`` and ``value``.

        Records on other topics, or with unrecognised keys, are ignored.
        Any exception raised while parsing or handling the record is logged
        and swallowed, so one bad record does not end the consume loop in
        :meth:`start`.
        """
        try:
            values = self._parse_payload(message.value)

            if message.topic == "vim_account":
                self._handle_vim_account(message.key, values)
            elif message.topic == "alarm_request":
                await self._handle_alarm_request(message.key, values)

        except Exception:
            log.exception("Exception processing message: ")

    @staticmethod
    def _parse_payload(raw):
        """Parse *raw* as JSON, falling back to YAML when it is not valid JSON."""
        try:
            return json.loads(raw)
        except JSONDecodeError:
            return yaml.safe_load(raw)

    def _handle_vim_account(self, key, values):
        """Store or delete VIM credentials according to the record key."""
        if key in ("create", "edit"):
            # The password is decrypted in place before the record is stored.
            values['vim_password'] = self.common_db.decrypt_vim_password(
                values['vim_password'],
                values['schema_version'],
                values['_id'],
            )
            self.auth_manager.store_auth_credentials(values)
        elif key == "delete":
            self.auth_manager.delete_auth_credentials(values)

    async def _handle_alarm_request(self, key, values):
        """Route an alarm request to the create or delete workflow."""
        if key == "create_alarm_request":
            await self._create_alarm(values['alarm_create_request'])
        elif key == "delete_alarm_request":
            await self._delete_alarm(values['alarm_delete_request'])

    async def _create_alarm(self, alarm_details):
        """Persist a new alarm and publish a ``create_alarm_response``.

        A failure while saving (or while building the success response) is
        logged and reported to the requester with ``status=False`` and no
        alarm id.
        """
        cor_id = alarm_details['correlation_id']
        response_builder = ResponseBuilder()
        try:
            alarm = self.database_manager.save_alarm(
                alarm_details['alarm_name'],
                alarm_details['threshold_value'],
                alarm_details['operation'].lower(),
                alarm_details['severity'].lower(),
                alarm_details['statistic'].lower(),
                alarm_details['metric_name'],
                alarm_details['vdu_name'],
                alarm_details['vnf_member_index'],
                alarm_details['ns_id']
            )
            response = response_builder.generate_response('create_alarm_response',
                                                           cor_id=cor_id,
                                                           status=True,
                                                           alarm_id=alarm.uuid)
        except Exception:
            log.exception("Error creating alarm: ")
            response = response_builder.generate_response('create_alarm_response',
                                                           cor_id=cor_id,
                                                           status=False,
                                                           alarm_id=None)
        await self._publish_response('alarm_response_' + str(cor_id), 'create_alarm_response', response)

    async def _delete_alarm(self, alarm_details):
        """Delete an alarm and publish a ``delete_alarm_response``.

        A failure while deleting is logged and reported to the requester
        with ``status=False``; the requested alarm uuid is echoed either way.
        """
        alarm_uuid = alarm_details['alarm_uuid']
        response_builder = ResponseBuilder()
        cor_id = alarm_details['correlation_id']
        try:
            self.database_manager.delete_alarm(alarm_uuid)
            response = response_builder.generate_response('delete_alarm_response',
                                                           cor_id=cor_id,
                                                           status=True,
                                                           alarm_id=alarm_uuid)
        except Exception:
            log.exception("Error deleting alarm: ")
            response = response_builder.generate_response('delete_alarm_response',
                                                           cor_id=cor_id,
                                                           status=False,
                                                           alarm_id=alarm_uuid)
        await self._publish_response('alarm_response_' + str(cor_id), 'delete_alarm_response', response)

    async def _publish_response(self, topic: str, key: str, msg: dict):
        """Send *msg* as JSON to *topic* using a short-lived producer.

        :param topic: destination Kafka topic.
        :param key: record key (encoded to bytes by the producer).
        :param msg: response body; must be JSON-serializable.
        :raises TypeError: if *msg* cannot be serialized to JSON.
        """
        # Serialize before the producer exists: a serialization error must
        # not leave a started producer behind without a matching stop().
        payload = json.dumps(msg)
        producer = AIOKafkaProducer(loop=self.loop,
                                    bootstrap_servers=self.kafka_server,
                                    key_serializer=str.encode,
                                    value_serializer=str.encode)
        await producer.start()
        try:
            log.info("Sending response %s to topic %s with key %s", payload, topic, key)
            await producer.send_and_wait(topic, key=key, value=payload)
        finally:
            await producer.stop()
Benjamin Diaz51f44862018-11-15 10:27:12 -0300153
154
if __name__ == '__main__':
    # Script entry point: build the server and block on its event loop.
    server = Server()
    server.run()