1 # Copyright 2017 Intel Research and Development Ireland Limited
2 # *************************************************************
3 # This file is part of OSM Monitoring module
4 # All Rights Reserved to Intel Corporation
6 # Licensed under the Apache License, Version 2.0 (the "License"); you
7 # may not use this file except in compliance with the License. You may
8 # obtain a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
15 # implied. See the License for the specific language governing
16 # permissions and limitations under the License.
18 # For those usages not covered by the Apache License, Version 2.0 please
19 # contact: helena.mcgough@intel.com or adrian.hoban@intel.com
20 """A common KafkaConsumer for all MON plugins."""
24 from json
import JSONDecodeError
28 from osm_mon
.core
.auth
import AuthManager
29 from osm_mon
.core
.common_db
import CommonDbClient
30 from osm_mon
.core
.database
import DatabaseManager
31 from osm_mon
.core
.message_bus
.consumer
import Consumer
32 from osm_mon
.core
.message_bus
.producer
import Producer
33 from osm_mon
.core
.response
import ResponseBuilder
35 log
= logging
.getLogger(__name__
)
41 self
.auth_manager
= AuthManager()
42 self
.database_manager
= DatabaseManager()
43 self
.database_manager
.create_tables()
44 self
.common_db
= CommonDbClient()
47 common_consumer
= Consumer("mon-server")
49 topics
= ['alarm_request', 'vim_account']
50 common_consumer
.subscribe(topics
)
52 log
.info("Listening for messages...")
53 for message
in common_consumer
:
54 self
.consume_message(message
)
56 def consume_message(self
, message
):
57 log
.info("Message arrived: %s", message
)
60 values
= json
.loads(message
.value
)
61 except JSONDecodeError
:
62 values
= yaml
.safe_load(message
.value
)
66 if message
.topic
== "vim_account":
67 if message
.key
== "create" or message
.key
== "edit":
68 values
['vim_password'] = self
.common_db
.decrypt_vim_password(values
['vim_password'],
69 values
['schema_version'],
71 self
.auth_manager
.store_auth_credentials(values
)
72 if message
.key
== "delete":
73 self
.auth_manager
.delete_auth_credentials(values
)
75 elif message
.topic
== "alarm_request":
76 if message
.key
== "create_alarm_request":
77 alarm_details
= values
['alarm_create_request']
78 response_builder
= ResponseBuilder()
80 alarm
= self
.database_manager
.save_alarm(
81 alarm_details
['alarm_name'],
82 alarm_details
['threshold_value'],
83 alarm_details
['operation'].lower(),
84 alarm_details
['severity'].lower(),
85 alarm_details
['statistic'].lower(),
86 alarm_details
['metric_name'],
87 alarm_details
['vdu_name'],
88 alarm_details
['vnf_member_index'],
89 alarm_details
['ns_id']
91 response
= response_builder
.generate_response('create_alarm_response',
92 cor_id
=alarm_details
['correlation_id'],
96 log
.exception("Error creating alarm: ")
97 response
= response_builder
.generate_response('create_alarm_response',
98 cor_id
=alarm_details
['correlation_id'],
101 if message
.key
== "delete_alarm_request":
102 alarm_details
= values
['alarm_delete_request']
103 response_builder
= ResponseBuilder()
104 alarm_uuid
= alarm_details
['alarm_uuid']
105 cor_id
= alarm_details
['correlation_id']
107 self
.database_manager
.delete_alarm(alarm_uuid
)
108 response
= response_builder
.generate_response('create_alarm_response',
113 log
.exception("Error creating alarm: ")
114 response
= response_builder
.generate_response('create_alarm_response',
119 self
._publish
_response
(message
.topic
, message
.key
, response
)
122 log
.exception("Exception processing message: ")
124 def _publish_response(self
, topic
: str, key
: str, msg
: dict):
125 topic
= topic
.replace('request', 'response')
126 key
= key
.replace('request', 'response')
127 producer
= Producer()
128 producer
.send(topic
=topic
, key
=key
, value
=json
.dumps(msg
))
129 producer
.flush(timeout
=5)
133 if __name__
== '__main__':