ce6255c890e0125407e9470c29c03aae4e1671fa
[osm/MON.git] / osm_mon / server / server.py
1 # -*- coding: utf-8 -*-
2
3 # Copyright 2018 Whitestack, LLC
4 # *************************************************************
5
6 # This file is part of OSM Monitoring module
7 # All Rights Reserved to Whitestack, LLC
8
9 # Licensed under the Apache License, Version 2.0 (the "License"); you may
10 # not use this file except in compliance with the License. You may obtain
11 # a copy of the License at
12
13 # http://www.apache.org/licenses/LICENSE-2.0
14
15 # Unless required by applicable law or agreed to in writing, software
16 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
17 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
18 # License for the specific language governing permissions and limitations
19 # under the License.
20 # For those usages not covered by the Apache License, Version 2.0 please
21 # contact: bdiaz@whitestack.com or glavado@whitestack.com
22 ##
23 """
24 MON component in charge of CRUD operations for vim_accounts and alarms. It uses the message bus to communicate.
25 """
26 import asyncio
27 import json
28 import logging
29
30 from osm_mon.core.config import Config
31 from osm_mon.core.message_bus_client import MessageBusClient
32 from osm_mon.core.response import ResponseBuilder
33 from osm_mon.server.service import ServerService
34
35 log = logging.getLogger(__name__)
36
37
38 class Server:
39 def __init__(self, config: Config, loop=None):
40 self.conf = config
41 if not loop:
42 loop = asyncio.get_event_loop()
43 self.loop = loop
44 self.msg_bus = MessageBusClient(config)
45 self.service = ServerService(config)
46
47 def run(self):
48 self.loop.run_until_complete(self.start())
49
50 async def start(self):
51 topics = ["alarm_request"]
52 try:
53 await self.msg_bus.aioread(topics, self._process_msg)
54 except Exception as e:
55 # Failed to subscribe to kafka topic
56 log.exception("Error when subscribing to topics %s", str(topics))
57 raise e
58
59 async def _process_msg(self, topic, key, values):
60 log.info("Message arrived: %s", values)
61 try:
62
63 if topic == "alarm_request":
64 if key == "create_alarm_request":
65 alarm_details = values["alarm_create_request"]
66 cor_id = alarm_details["correlation_id"]
67 response_builder = ResponseBuilder()
68 try:
69 alarm = self.service.create_alarm(
70 alarm_details["alarm_name"],
71 alarm_details["threshold_value"],
72 alarm_details["operation"].lower(),
73 alarm_details["severity"].lower(),
74 alarm_details["statistic"].lower(),
75 alarm_details["metric_name"],
76 alarm_details["tags"],
77 )
78 response = response_builder.generate_response(
79 "create_alarm_response",
80 cor_id=cor_id,
81 status=True,
82 alarm_id=alarm.uuid,
83 )
84 except Exception:
85 log.exception("Error creating alarm: ")
86 response = response_builder.generate_response(
87 "create_alarm_response",
88 cor_id=cor_id,
89 status=False,
90 alarm_id=None,
91 )
92 await self._publish_response(
93 "alarm_response_" + str(cor_id),
94 "create_alarm_response",
95 response,
96 )
97
98 if key == "delete_alarm_request":
99 alarm_details = values["alarm_delete_request"]
100 alarm_uuid = alarm_details["alarm_uuid"]
101 response_builder = ResponseBuilder()
102 cor_id = alarm_details["correlation_id"]
103 try:
104 self.service.delete_alarm(alarm_uuid)
105 response = response_builder.generate_response(
106 "delete_alarm_response",
107 cor_id=cor_id,
108 status=True,
109 alarm_id=alarm_uuid,
110 )
111 except Exception:
112 log.exception("Error deleting alarm: ")
113 response = response_builder.generate_response(
114 "delete_alarm_response",
115 cor_id=cor_id,
116 status=False,
117 alarm_id=alarm_uuid,
118 )
119 await self._publish_response(
120 "alarm_response_" + str(cor_id),
121 "delete_alarm_response",
122 response,
123 )
124
125 except Exception:
126 log.exception("Exception processing message: ")
127
128 async def _publish_response(self, topic: str, key: str, msg: dict):
129 log.info(
130 "Sending response %s to topic %s with key %s", json.dumps(msg), topic, key
131 )
132 await self.msg_bus.aiowrite(topic, key, msg)