Implements filebased config, config override through env vars, use of osm
[osm/MON.git] / osm_mon / server / server.py
1 # -*- coding: utf-8 -*-
2
3 # Copyright 2018 Whitestack, LLC
4 # *************************************************************
5
6 # This file is part of OSM Monitoring module
7 # All Rights Reserved to Whitestack, LLC
8
9 # Licensed under the Apache License, Version 2.0 (the "License"); you may
10 # not use this file except in compliance with the License. You may obtain
11 # a copy of the License at
12
13 # http://www.apache.org/licenses/LICENSE-2.0
14
15 # Unless required by applicable law or agreed to in writing, software
16 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
17 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
18 # License for the specific language governing permissions and limitations
19 # under the License.
20 # For those usages not covered by the Apache License, Version 2.0 please
21 # contact: bdiaz@whitestack.com or glavado@whitestack.com
22 ##
23 """A common KafkaConsumer for all MON plugins."""
24 import asyncio
25 import json
26 import logging
27
28 from osm_mon.core.auth import AuthManager
29 from osm_mon.core.common_db import CommonDbClient
30 from osm_mon.core.config import Config
31 from osm_mon.core.database import DatabaseManager
32 from osm_mon.core.message_bus_client import MessageBusClient
33 from osm_mon.core.response import ResponseBuilder
34
35 log = logging.getLogger(__name__)
36
37
class Server:
    """Message-bus driven entry point of the MON server.

    On construction the server builds an auth manager, a database manager
    (and asks it to create its tables), a common DB client and a message bus
    client from the supplied configuration.  Once started it reads the
    ``vim_account`` and ``alarm_request`` topics and hands every message to
    :meth:`_process_msg`.
    """

    # Keys of a VIM account's ``config`` section whose values are passed
    # through ``decrypt_vim_password`` before the credentials are stored.
    _ENCRYPTED_VIM_CONFIG_KEYS = ("admin_password", "nsx_password", "vcenter_password")

    def __init__(self, config: Config, loop=None):
        """Create the server and the clients it depends on.

        :param config: configuration object handed to every manager/client.
        :param loop: event loop used by :meth:`run`; when falsy, the loop
            returned by ``asyncio.get_event_loop()`` is used instead.
        """
        self.conf = config
        if not loop:
            loop = asyncio.get_event_loop()
        self.loop = loop
        self.auth_manager = AuthManager(config)
        self.database_manager = DatabaseManager(config)
        self.database_manager.create_tables()
        self.common_db = CommonDbClient(config)
        self.msg_bus = MessageBusClient(config)

    def run(self):
        """Run :meth:`start` on ``self.loop`` and block until it completes."""
        self.loop.run_until_complete(self.start())

    async def start(self):
        """Read the server's topics, passing each message to ``_process_msg``."""
        topics = [
            "vim_account",
            "alarm_request"
        ]
        await self.msg_bus.aioread(topics, self._process_msg)

    async def _process_msg(self, topic, key, values):
        """Dispatch one bus message to the handler for its topic.

        :param topic: topic the message was read from.
        :param key: action requested by the message (e.g. ``"create"``).
        :param values: message payload (a dict for the handled topics).

        Messages on other topics are ignored.  Any exception raised while
        handling the message is logged and not re-raised.
        """
        log.info("Message arrived: %s", values)
        try:
            if topic == "vim_account":
                self._handle_vim_account(key, values)
            elif topic == "alarm_request":
                await self._handle_alarm_request(key, values)
        except Exception:
            log.exception("Exception processing message: ")

    def _handle_vim_account(self, key, values):
        """Store or delete VIM credentials according to the action in ``key``."""
        if key == "create" or key == "edit":
            values['vim_password'] = self.common_db.decrypt_vim_password(values['vim_password'],
                                                                         values['schema_version'],
                                                                         values['_id'])
            if 'config' in values:
                vim_config = values['config']
                # A dedicated loop variable is used on purpose: reusing ``key``
                # here would overwrite the message action, so the "delete"
                # check below would be evaluated against a config key name.
                for config_key in vim_config:
                    if config_key in self._ENCRYPTED_VIM_CONFIG_KEYS:
                        vim_config[config_key] = self.common_db.decrypt_vim_password(
                            vim_config[config_key],
                            values['schema_version'],
                            values['_id'])
            self.auth_manager.store_auth_credentials(values)
        elif key == "delete":
            self.auth_manager.delete_auth_credentials(values)

    async def _handle_alarm_request(self, key, values):
        """Route an ``alarm_request`` message to the create or delete handler."""
        if key == "create_alarm_request":
            await self._create_alarm(values['alarm_create_request'])
        elif key == "delete_alarm_request":
            await self._delete_alarm(values['alarm_delete_request'])

    async def _create_alarm(self, alarm_details):
        """Save the requested alarm and publish a ``create_alarm_response``.

        The response carries ``status=True`` and the new alarm's uuid on
        success; if saving raises, the error is logged and the response
        carries ``status=False`` with ``alarm_id=None``.
        """
        cor_id = alarm_details['correlation_id']
        response_builder = ResponseBuilder()
        try:
            alarm = self.database_manager.save_alarm(
                alarm_details['alarm_name'],
                alarm_details['threshold_value'],
                alarm_details['operation'].lower(),
                alarm_details['severity'].lower(),
                alarm_details['statistic'].lower(),
                alarm_details['metric_name'],
                alarm_details['vdu_name'],
                alarm_details['vnf_member_index'],
                alarm_details['ns_id']
            )
            response = response_builder.generate_response('create_alarm_response',
                                                          cor_id=cor_id,
                                                          status=True,
                                                          alarm_id=alarm.uuid)
        except Exception:
            log.exception("Error creating alarm: ")
            response = response_builder.generate_response('create_alarm_response',
                                                          cor_id=cor_id,
                                                          status=False,
                                                          alarm_id=None)
        # The response topic is specific to the request's correlation id.
        await self._publish_response('alarm_response_' + str(cor_id), 'create_alarm_response', response)

    async def _delete_alarm(self, alarm_details):
        """Delete the requested alarm and publish a ``delete_alarm_response``.

        The response always carries the requested alarm uuid; ``status`` is
        ``True`` on success and ``False`` if deletion raises (the error is
        logged).
        """
        alarm_uuid = alarm_details['alarm_uuid']
        response_builder = ResponseBuilder()
        cor_id = alarm_details['correlation_id']
        try:
            self.database_manager.delete_alarm(alarm_uuid)
            response = response_builder.generate_response('delete_alarm_response',
                                                          cor_id=cor_id,
                                                          status=True,
                                                          alarm_id=alarm_uuid)
        except Exception:
            log.exception("Error deleting alarm: ")
            response = response_builder.generate_response('delete_alarm_response',
                                                          cor_id=cor_id,
                                                          status=False,
                                                          alarm_id=alarm_uuid)
        # The response topic is specific to the request's correlation id.
        await self._publish_response('alarm_response_' + str(cor_id), 'delete_alarm_response', response)

    async def _publish_response(self, topic: str, key: str, msg: dict):
        """Log ``msg`` (JSON-encoded) and write it to ``topic`` under ``key``."""
        log.info("Sending response %s to topic %s with key %s", json.dumps(msg), topic, key)
        await self.msg_bus.aiowrite(topic, key, msg)