Adds decryption of passwords in vim config block
[osm/MON.git] / osm_mon / server / server.py
1 # -*- coding: utf-8 -*-
2
3 # Copyright 2018 Whitestack, LLC
4 # *************************************************************
5
6 # This file is part of OSM Monitoring module
7 # All Rights Reserved to Whitestack, LLC
8
9 # Licensed under the Apache License, Version 2.0 (the "License"); you may
10 # not use this file except in compliance with the License. You may obtain
11 # a copy of the License at
12
13 # http://www.apache.org/licenses/LICENSE-2.0
14
15 # Unless required by applicable law or agreed to in writing, software
16 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
17 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
18 # License for the specific language governing permissions and limitations
19 # under the License.
20 # For those usages not covered by the Apache License, Version 2.0 please
21 # contact: bdiaz@whitestack.com or glavado@whitestack.com
22 ##
23 """A common KafkaConsumer for all MON plugins."""
24 import asyncio
25 import json
26 import logging
27 from json import JSONDecodeError
28
29 import yaml
30 from aiokafka import AIOKafkaConsumer, AIOKafkaProducer
31
32 from osm_mon.core.auth import AuthManager
33 from osm_mon.core.common_db import CommonDbClient
34 from osm_mon.core.database import DatabaseManager
35 from osm_mon.core.response import ResponseBuilder
36 from osm_mon.core.settings import Config
37
38 log = logging.getLogger(__name__)
39
40
class Server:
    """Kafka-driven server of the MON module.

    Subscribes to the ``vim_account`` and ``alarm_request`` topics. VIM account
    messages are forwarded to the ``AuthManager`` (after decrypting their
    passwords through ``CommonDbClient``); alarm requests are applied through
    the ``DatabaseManager`` and answered on the ``alarm_response_<cor_id>``
    topic.
    """

    # Keys of a VIM account ``config`` block whose values are decrypted before
    # the credentials are stored.
    _VIM_CONFIG_ENCRYPTED = ("admin_password", "nsx_password", "vcenter_password")

    def __init__(self, loop=None):
        """Build the collaborators used while processing messages.

        :param loop: asyncio event loop to run on; when falsy, the loop
            returned by ``asyncio.get_event_loop()`` is used.
        """
        cfg = Config.instance()
        if not loop:
            loop = asyncio.get_event_loop()
        self.loop = loop
        self.auth_manager = AuthManager()
        self.database_manager = DatabaseManager()
        self.database_manager.create_tables()
        self.common_db = CommonDbClient()
        self.kafka_server = cfg.BROKER_URI

    def run(self):
        """Run ``start()`` on the configured loop until it completes."""
        self.loop.run_until_complete(self.start())

    @staticmethod
    def _decode_or_none(raw):
        """Decode a raw Kafka key/value to ``str``; return None if absent or not valid UTF-8."""
        # NOTE(review): the previous deserializer was ``bytes.decode`` itself,
        # which raises TypeError on None (keyless records / tombstones) and
        # UnicodeDecodeError on invalid bytes. Such an error would be raised
        # while iterating the consumer, i.e. outside consume_message()'s
        # try/except - confirm against the aiokafka version in use.
        if raw is None:
            return None
        try:
            return raw.decode()
        except UnicodeDecodeError:
            return None

    async def start(self):
        """Consume the subscribed topics and hand each message to ``consume_message``.

        The consumer is always stopped when the iteration ends or fails.
        """
        consumer = AIOKafkaConsumer(
            "vim_account",
            "alarm_request",
            loop=self.loop,
            bootstrap_servers=self.kafka_server,
            group_id="mon-server",
            key_deserializer=self._decode_or_none,
            value_deserializer=self._decode_or_none,
        )
        await consumer.start()
        try:
            async for message in consumer:
                log.info("Message arrived: %s", message)
                await self.consume_message(message)
        finally:
            await consumer.stop()

    async def consume_message(self, message):
        """Process one consumed message according to its topic and key.

        Any exception raised while processing is logged and swallowed so that
        a single bad message does not stop the consumer loop.

        :param message: object exposing ``topic``, ``key`` and ``value``
            (``value`` being a JSON or YAML document as text).
        """
        try:
            if message.value is None:
                # Nothing to parse: missing payload or one that failed decoding.
                log.warning("Ignoring message with missing or undecodable payload: %s", message)
                return

            values = self._parse_payload(message.value)

            if message.topic == "vim_account":
                self._handle_vim_account(message.key, values)
            elif message.topic == "alarm_request":
                await self._handle_alarm_request(message.key, values)

        except Exception:
            log.exception("Exception processing message: ")

    @staticmethod
    def _parse_payload(raw):
        """Parse a message payload as JSON, falling back to YAML when it is not valid JSON."""
        try:
            return json.loads(raw)
        except JSONDecodeError:
            return yaml.safe_load(raw)

    def _handle_vim_account(self, key, values):
        """Store (create/edit) or delete VIM credentials described by ``values``."""
        if key in ("create", "edit"):
            values['vim_password'] = self.common_db.decrypt_vim_password(values['vim_password'],
                                                                         values['schema_version'],
                                                                         values['_id'])

            if 'config' in values:
                config = values['config']
                # Only existing keys are reassigned, so iterating while
                # updating the mapping is safe.
                for name in config:
                    if name in self._VIM_CONFIG_ENCRYPTED:
                        config[name] = self.common_db.decrypt_vim_password(config[name],
                                                                           values['schema_version'],
                                                                           values['_id'])
            self.auth_manager.store_auth_credentials(values)
        elif key == "delete":
            self.auth_manager.delete_auth_credentials(values)

    async def _handle_alarm_request(self, key, values):
        """Dispatch an ``alarm_request`` message to the create or delete handler."""
        if key == "create_alarm_request":
            await self._create_alarm(values['alarm_create_request'])
        elif key == "delete_alarm_request":
            await self._delete_alarm(values['alarm_delete_request'])

    async def _create_alarm(self, alarm_details):
        """Save a new alarm and publish a ``create_alarm_response`` with the outcome."""
        cor_id = alarm_details['correlation_id']
        response_builder = ResponseBuilder()
        try:
            alarm = self.database_manager.save_alarm(
                alarm_details['alarm_name'],
                alarm_details['threshold_value'],
                alarm_details['operation'].lower(),
                alarm_details['severity'].lower(),
                alarm_details['statistic'].lower(),
                alarm_details['metric_name'],
                alarm_details['vdu_name'],
                alarm_details['vnf_member_index'],
                alarm_details['ns_id']
            )
            response = response_builder.generate_response('create_alarm_response',
                                                          cor_id=cor_id,
                                                          status=True,
                                                          alarm_id=alarm.uuid)
        except Exception:
            # A failure is reported to the requester instead of being raised.
            log.exception("Error creating alarm: ")
            response = response_builder.generate_response('create_alarm_response',
                                                          cor_id=cor_id,
                                                          status=False,
                                                          alarm_id=None)
        await self._publish_response('alarm_response_' + str(cor_id), 'create_alarm_response', response)

    async def _delete_alarm(self, alarm_details):
        """Delete an alarm by uuid and publish a ``delete_alarm_response`` with the outcome."""
        alarm_uuid = alarm_details['alarm_uuid']
        response_builder = ResponseBuilder()
        cor_id = alarm_details['correlation_id']
        try:
            self.database_manager.delete_alarm(alarm_uuid)
            response = response_builder.generate_response('delete_alarm_response',
                                                          cor_id=cor_id,
                                                          status=True,
                                                          alarm_id=alarm_uuid)
        except Exception:
            # A failure is reported to the requester instead of being raised.
            log.exception("Error deleting alarm: ")
            response = response_builder.generate_response('delete_alarm_response',
                                                          cor_id=cor_id,
                                                          status=False,
                                                          alarm_id=alarm_uuid)
        await self._publish_response('alarm_response_' + str(cor_id), 'delete_alarm_response', response)

    async def _publish_response(self, topic: str, key: str, msg: dict):
        """Send ``msg`` as JSON to ``topic`` with ``key`` using a short-lived producer.

        :param topic: destination Kafka topic.
        :param key: message key.
        :param msg: response body; must be JSON serialisable.
        """
        # Serialise once and before the producer exists: a serialisation error
        # then cannot leave a started producer behind.
        payload = json.dumps(msg)
        producer = AIOKafkaProducer(loop=self.loop,
                                    bootstrap_servers=self.kafka_server,
                                    key_serializer=str.encode,
                                    value_serializer=str.encode)
        await producer.start()
        log.info("Sending response %s to topic %s with key %s", payload, topic, key)
        try:
            await producer.send_and_wait(topic, key=key, value=payload)
        finally:
            await producer.stop()
162
163
# Script entry point: build the server and block on its consumer loop.
if __name__ == '__main__':
    server = Server()
    server.run()