blob: 854d9268e521bbdf2556a474a0e422421ceb95a4 [file] [log] [blame]
Benjamin Diazde3d5702018-11-22 17:27:35 -03001# -*- coding: utf-8 -*-
2
3# Copyright 2018 Whitestack, LLC
Benjamin Diaz51f44862018-11-15 10:27:12 -03004# *************************************************************
Benjamin Diazde3d5702018-11-22 17:27:35 -03005
Benjamin Diaz51f44862018-11-15 10:27:12 -03006# This file is part of OSM Monitoring module
Benjamin Diazde3d5702018-11-22 17:27:35 -03007# All Rights Reserved to Whitestack, LLC
8
9# Licensed under the Apache License, Version 2.0 (the "License"); you may
10# not use this file except in compliance with the License. You may obtain
11# a copy of the License at
12
Benjamin Diaz51f44862018-11-15 10:27:12 -030013# http://www.apache.org/licenses/LICENSE-2.0
Benjamin Diazde3d5702018-11-22 17:27:35 -030014
Benjamin Diaz51f44862018-11-15 10:27:12 -030015# Unless required by applicable law or agreed to in writing, software
Benjamin Diazde3d5702018-11-22 17:27:35 -030016# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
17# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
18# License for the specific language governing permissions and limitations
19# under the License.
Benjamin Diaz51f44862018-11-15 10:27:12 -030020# For those usages not covered by the Apache License, Version 2.0 please
Benjamin Diazde3d5702018-11-22 17:27:35 -030021# contact: bdiaz@whitestack.com or glavado@whitestack.com
22##
Benjamin Diaz51f44862018-11-15 10:27:12 -030023"""A common KafkaConsumer for all MON plugins."""
24
25import json
26import logging
27from json import JSONDecodeError
28
29import yaml
30
31from osm_mon.core.auth import AuthManager
32from osm_mon.core.common_db import CommonDbClient
33from osm_mon.core.database import DatabaseManager
34from osm_mon.core.message_bus.consumer import Consumer
35from osm_mon.core.message_bus.producer import Producer
36from osm_mon.core.response import ResponseBuilder
37
38log = logging.getLogger(__name__)
39
40
41class Server:
42
43 def __init__(self):
44 self.auth_manager = AuthManager()
45 self.database_manager = DatabaseManager()
46 self.database_manager.create_tables()
47 self.common_db = CommonDbClient()
48
49 def run(self):
50 common_consumer = Consumer("mon-server")
51
52 topics = ['alarm_request', 'vim_account']
53 common_consumer.subscribe(topics)
54
55 log.info("Listening for messages...")
56 for message in common_consumer:
57 self.consume_message(message)
58
59 def consume_message(self, message):
60 log.info("Message arrived: %s", message)
61 try:
62 try:
63 values = json.loads(message.value)
64 except JSONDecodeError:
65 values = yaml.safe_load(message.value)
66
67 response = None
68
69 if message.topic == "vim_account":
70 if message.key == "create" or message.key == "edit":
71 values['vim_password'] = self.common_db.decrypt_vim_password(values['vim_password'],
72 values['schema_version'],
73 values['_id'])
74 self.auth_manager.store_auth_credentials(values)
75 if message.key == "delete":
76 self.auth_manager.delete_auth_credentials(values)
77
78 elif message.topic == "alarm_request":
79 if message.key == "create_alarm_request":
80 alarm_details = values['alarm_create_request']
81 response_builder = ResponseBuilder()
82 try:
83 alarm = self.database_manager.save_alarm(
84 alarm_details['alarm_name'],
85 alarm_details['threshold_value'],
86 alarm_details['operation'].lower(),
87 alarm_details['severity'].lower(),
88 alarm_details['statistic'].lower(),
89 alarm_details['metric_name'],
90 alarm_details['vdu_name'],
91 alarm_details['vnf_member_index'],
92 alarm_details['ns_id']
93 )
94 response = response_builder.generate_response('create_alarm_response',
95 cor_id=alarm_details['correlation_id'],
96 status=True,
Benjamin Diazde3d5702018-11-22 17:27:35 -030097 alarm_id=alarm.uuid)
Benjamin Diaz51f44862018-11-15 10:27:12 -030098 except Exception:
99 log.exception("Error creating alarm: ")
100 response = response_builder.generate_response('create_alarm_response',
101 cor_id=alarm_details['correlation_id'],
102 status=False,
103 alarm_id=None)
104 if message.key == "delete_alarm_request":
105 alarm_details = values['alarm_delete_request']
Benjamin Diaz51f44862018-11-15 10:27:12 -0300106 alarm_uuid = alarm_details['alarm_uuid']
Benjamin Diazde3d5702018-11-22 17:27:35 -0300107 response_builder = ResponseBuilder()
Benjamin Diaz51f44862018-11-15 10:27:12 -0300108 cor_id = alarm_details['correlation_id']
109 try:
110 self.database_manager.delete_alarm(alarm_uuid)
Benjamin Diazde3d5702018-11-22 17:27:35 -0300111 response = response_builder.generate_response('delete_alarm_response',
Benjamin Diaz51f44862018-11-15 10:27:12 -0300112 cor_id=cor_id,
113 status=True,
114 alarm_id=alarm_uuid)
115 except Exception:
Benjamin Diazde3d5702018-11-22 17:27:35 -0300116 log.exception("Error deleting alarm: ")
117 response = response_builder.generate_response('delete_alarm_response',
Benjamin Diaz51f44862018-11-15 10:27:12 -0300118 cor_id=cor_id,
119 status=False,
120 alarm_id=alarm_uuid)
121 if response:
122 self._publish_response(message.topic, message.key, response)
123
124 except Exception:
125 log.exception("Exception processing message: ")
126
127 def _publish_response(self, topic: str, key: str, msg: dict):
128 topic = topic.replace('request', 'response')
129 key = key.replace('request', 'response')
130 producer = Producer()
131 producer.send(topic=topic, key=key, value=json.dumps(msg))
132 producer.flush(timeout=5)
133 producer.close()
134
135
136if __name__ == '__main__':
137 Server().run()