blob: 0f17d99ccfe43a48dc8a849eb89f559984819213 [file] [log] [blame]
Benjamin Diazde3d5702018-11-22 17:27:35 -03001# -*- coding: utf-8 -*-
2
3# Copyright 2018 Whitestack, LLC
Benjamin Diaz51f44862018-11-15 10:27:12 -03004# *************************************************************
Benjamin Diazde3d5702018-11-22 17:27:35 -03005
Benjamin Diaz51f44862018-11-15 10:27:12 -03006# This file is part of OSM Monitoring module
Benjamin Diazde3d5702018-11-22 17:27:35 -03007# All Rights Reserved to Whitestack, LLC
8
9# Licensed under the Apache License, Version 2.0 (the "License"); you may
10# not use this file except in compliance with the License. You may obtain
11# a copy of the License at
12
Benjamin Diaz51f44862018-11-15 10:27:12 -030013# http://www.apache.org/licenses/LICENSE-2.0
Benjamin Diazde3d5702018-11-22 17:27:35 -030014
Benjamin Diaz51f44862018-11-15 10:27:12 -030015# Unless required by applicable law or agreed to in writing, software
Benjamin Diazde3d5702018-11-22 17:27:35 -030016# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
17# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
18# License for the specific language governing permissions and limitations
19# under the License.
Benjamin Diaz51f44862018-11-15 10:27:12 -030020# For those usages not covered by the Apache License, Version 2.0 please
Benjamin Diazde3d5702018-11-22 17:27:35 -030021# contact: bdiaz@whitestack.com or glavado@whitestack.com
22##
Benjamin Diaz51f44862018-11-15 10:27:12 -030023"""A common KafkaConsumer for all MON plugins."""
Benjamin Diaz274a6e92018-11-26 13:14:33 -030024import asyncio
Benjamin Diaz51f44862018-11-15 10:27:12 -030025import json
26import logging
27from json import JSONDecodeError
28
29import yaml
Benjamin Diaz274a6e92018-11-26 13:14:33 -030030from aiokafka import AIOKafkaConsumer, AIOKafkaProducer
Benjamin Diaz51f44862018-11-15 10:27:12 -030031
32from osm_mon.core.auth import AuthManager
33from osm_mon.core.common_db import CommonDbClient
34from osm_mon.core.database import DatabaseManager
Benjamin Diaz51f44862018-11-15 10:27:12 -030035from osm_mon.core.response import ResponseBuilder
Benjamin Diaz274a6e92018-11-26 13:14:33 -030036from osm_mon.core.settings import Config
Benjamin Diaz51f44862018-11-15 10:27:12 -030037
38log = logging.getLogger(__name__)
39
40
41class Server:
42
Benjamin Diaz274a6e92018-11-26 13:14:33 -030043 def __init__(self, loop=None):
44 cfg = Config.instance()
45 if not loop:
46 loop = asyncio.get_event_loop()
47 self.loop = loop
Benjamin Diaz51f44862018-11-15 10:27:12 -030048 self.auth_manager = AuthManager()
49 self.database_manager = DatabaseManager()
50 self.database_manager.create_tables()
51 self.common_db = CommonDbClient()
Benjamin Diaz274a6e92018-11-26 13:14:33 -030052 self.kafka_server = cfg.BROKER_URI
Benjamin Diaz51f44862018-11-15 10:27:12 -030053
54 def run(self):
Benjamin Diaz274a6e92018-11-26 13:14:33 -030055 self.loop.run_until_complete(self.start())
Benjamin Diaz51f44862018-11-15 10:27:12 -030056
Benjamin Diaz274a6e92018-11-26 13:14:33 -030057 async def start(self):
58 consumer = AIOKafkaConsumer(
59 "vim_account",
60 "alarm_request",
61 loop=self.loop,
62 bootstrap_servers=self.kafka_server,
63 group_id="mon-server",
64 key_deserializer=bytes.decode,
65 value_deserializer=bytes.decode,
66 )
67 await consumer.start()
68 try:
69 async for message in consumer:
70 log.info("Message arrived: %s", message)
71 await self.consume_message(message)
72 finally:
73 await consumer.stop()
Benjamin Diaz51f44862018-11-15 10:27:12 -030074
Benjamin Diaz274a6e92018-11-26 13:14:33 -030075 async def consume_message(self, message):
Benjamin Diaz51f44862018-11-15 10:27:12 -030076 try:
77 try:
78 values = json.loads(message.value)
79 except JSONDecodeError:
80 values = yaml.safe_load(message.value)
81
Benjamin Diaz51f44862018-11-15 10:27:12 -030082 if message.topic == "vim_account":
83 if message.key == "create" or message.key == "edit":
84 values['vim_password'] = self.common_db.decrypt_vim_password(values['vim_password'],
85 values['schema_version'],
86 values['_id'])
Benjamin Diaz090df142019-01-30 13:01:54 -030087
88 vim_config_encrypted = ("admin_password", "nsx_password", "vcenter_password")
89 if 'config' in values:
90 for key in values['config']:
91 if key in vim_config_encrypted:
92 values['config'][key] = self.common_db.decrypt_vim_password(values['config'][key],
93 values['schema_version'],
94 values['_id'])
Benjamin Diaz51f44862018-11-15 10:27:12 -030095 self.auth_manager.store_auth_credentials(values)
Benjamin Diaz090df142019-01-30 13:01:54 -030096
Benjamin Diaz51f44862018-11-15 10:27:12 -030097 if message.key == "delete":
98 self.auth_manager.delete_auth_credentials(values)
99
100 elif message.topic == "alarm_request":
101 if message.key == "create_alarm_request":
102 alarm_details = values['alarm_create_request']
Benjamin Diaz274a6e92018-11-26 13:14:33 -0300103 cor_id = alarm_details['correlation_id']
Benjamin Diaz51f44862018-11-15 10:27:12 -0300104 response_builder = ResponseBuilder()
105 try:
106 alarm = self.database_manager.save_alarm(
107 alarm_details['alarm_name'],
108 alarm_details['threshold_value'],
109 alarm_details['operation'].lower(),
110 alarm_details['severity'].lower(),
111 alarm_details['statistic'].lower(),
112 alarm_details['metric_name'],
113 alarm_details['vdu_name'],
114 alarm_details['vnf_member_index'],
115 alarm_details['ns_id']
116 )
117 response = response_builder.generate_response('create_alarm_response',
Benjamin Diaz274a6e92018-11-26 13:14:33 -0300118 cor_id=cor_id,
Benjamin Diaz51f44862018-11-15 10:27:12 -0300119 status=True,
Benjamin Diazde3d5702018-11-22 17:27:35 -0300120 alarm_id=alarm.uuid)
Benjamin Diaz51f44862018-11-15 10:27:12 -0300121 except Exception:
122 log.exception("Error creating alarm: ")
123 response = response_builder.generate_response('create_alarm_response',
Benjamin Diaz274a6e92018-11-26 13:14:33 -0300124 cor_id=cor_id,
Benjamin Diaz51f44862018-11-15 10:27:12 -0300125 status=False,
126 alarm_id=None)
Benjamin Diaz274a6e92018-11-26 13:14:33 -0300127 await self._publish_response('alarm_response_' + str(cor_id), 'create_alarm_response', response)
128
Benjamin Diaz51f44862018-11-15 10:27:12 -0300129 if message.key == "delete_alarm_request":
130 alarm_details = values['alarm_delete_request']
Benjamin Diaz51f44862018-11-15 10:27:12 -0300131 alarm_uuid = alarm_details['alarm_uuid']
Benjamin Diazde3d5702018-11-22 17:27:35 -0300132 response_builder = ResponseBuilder()
Benjamin Diaz51f44862018-11-15 10:27:12 -0300133 cor_id = alarm_details['correlation_id']
134 try:
135 self.database_manager.delete_alarm(alarm_uuid)
Benjamin Diazde3d5702018-11-22 17:27:35 -0300136 response = response_builder.generate_response('delete_alarm_response',
Benjamin Diaz51f44862018-11-15 10:27:12 -0300137 cor_id=cor_id,
138 status=True,
139 alarm_id=alarm_uuid)
140 except Exception:
Benjamin Diazde3d5702018-11-22 17:27:35 -0300141 log.exception("Error deleting alarm: ")
142 response = response_builder.generate_response('delete_alarm_response',
Benjamin Diaz51f44862018-11-15 10:27:12 -0300143 cor_id=cor_id,
144 status=False,
145 alarm_id=alarm_uuid)
Benjamin Diaz274a6e92018-11-26 13:14:33 -0300146 await self._publish_response('alarm_response_' + str(cor_id), 'delete_alarm_response', response)
Benjamin Diaz51f44862018-11-15 10:27:12 -0300147
148 except Exception:
149 log.exception("Exception processing message: ")
150
Benjamin Diaz274a6e92018-11-26 13:14:33 -0300151 async def _publish_response(self, topic: str, key: str, msg: dict):
152 producer = AIOKafkaProducer(loop=self.loop,
153 bootstrap_servers=self.kafka_server,
154 key_serializer=str.encode,
155 value_serializer=str.encode)
156 await producer.start()
157 log.info("Sending response %s to topic %s with key %s", json.dumps(msg), topic, key)
158 try:
159 await producer.send_and_wait(topic, key=key, value=json.dumps(msg))
160 finally:
161 await producer.stop()
Benjamin Diaz51f44862018-11-15 10:27:12 -0300162
163
164if __name__ == '__main__':
165 Server().run()