Resolved bug 1719 - POL hangs waiting for webhook response
[osm/POL.git] / osm_policy_module / tests / unit / alarming / test_alarming_service.py
1 # -*- coding: utf-8 -*-
2
3 # Copyright 2018 Whitestack, LLC
4 # *************************************************************
5
6 # This file is part of OSM Monitoring module
7 # All Rights Reserved to Whitestack, LLC
8
9 # Licensed under the Apache License, Version 2.0 (the "License"); you may
10 # not use this file except in compliance with the License. You may obtain
11 # a copy of the License at
12
13 # http://www.apache.org/licenses/LICENSE-2.0
14
15 # Unless required by applicable law or agreed to in writing, software
16 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
17 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
18 # License for the specific language governing permissions and limitations
19 # under the License.
20
21 # For those usages not covered by the Apache License, Version 2.0 please
22 # contact: bdiaz@whitestack.com or glavado@whitestack.com
23 ##
24 import asyncio
25 from unittest import TestCase, mock
26
27 from osm_policy_module.alarming.service import AlarmingService
28 from osm_policy_module.common.common_db_client import CommonDbClient
29 from osm_policy_module.common.lcm_client import LcmClient
30 from osm_policy_module.common.mon_client import MonClient
31 from osm_policy_module.core.config import Config
32 from osm_policy_module.core.database import VnfAlarmRepository
33
34
@mock.patch.object(LcmClient, "__init__", lambda *args, **kwargs: None)
@mock.patch.object(MonClient, "__init__", lambda *args, **kwargs: None)
@mock.patch.object(CommonDbClient, "__init__", lambda *args, **kwargs: None)
class TestAlarmingService(TestCase):
    """Unit tests for ``AlarmingService.handle_alarm``.

    The class-level patches replace the constructors of ``LcmClient``,
    ``MonClient`` and ``CommonDbClient`` with no-ops while each test method
    runs. Every test additionally patches ``requests.post``, the database
    handle and ``VnfAlarmRepository.get`` so that only mocks are touched.
    """

    def setUp(self):
        """Create the configuration and a private event loop for each test."""
        self.config = Config()
        self.loop = asyncio.new_event_loop()
        # A loop obtained from new_event_loop() holds resources until it is
        # explicitly closed; register the close so no test leaks its loop,
        # even when the test body fails.
        self.addCleanup(self.loop.close)

    @mock.patch.object(VnfAlarmRepository, "get")
    @mock.patch("requests.post")
    @mock.patch("osm_policy_module.core.database.db")
    def test_handle_alarm(self, database, requests_post, get_alarm):
        """Each known status posts once to the URL of the matching action.

        Also checks that the request carries the timeout read from the
        ``alert``/``timeout`` configuration entry.
        """
        alert_timeout = int(self.config.get("alert", "timeout"))
        get_alarm.return_value = self._build_mock_alarm("test_id")

        service = AlarmingService(self.config)

        # (status passed to handle_alarm, URL expected to receive the POST)
        expectations = (
            ("alarm", "http://alarm-url/"),
            ("ok", "http://ok-url/"),
            ("insufficient-data", "http://insufficient-data-url/"),
        )
        for status, expected_url in expectations:
            with self.subTest(status=status):
                # Start every case from a clean call history so that
                # assert_called_once_with only sees this status' request.
                requests_post.reset_mock()
                self.loop.run_until_complete(
                    service.handle_alarm("test_id", status, {})
                )
                requests_post.assert_called_once_with(
                    json="{}", url=expected_url, timeout=alert_timeout
                )

    @mock.patch.object(VnfAlarmRepository, "get")
    @mock.patch("requests.post")
    @mock.patch("osm_policy_module.core.database.db")
    def test_handle_alarm_unknown_status(self, database, requests_post, get_alarm):
        """A status that matches no action type must not trigger any POST."""
        get_alarm.return_value = self._build_mock_alarm("test_id")

        service = AlarmingService(self.config)
        self.loop.run_until_complete(service.handle_alarm("test_id", "unknown", {}))
        requests_post.assert_not_called()

    def _build_mock_alarm(
        self,
        alarm_id="test_id",
        alarm_url="http://alarm-url/",
        insufficient_data_url="http://insufficient-data-url/",
        ok_url="http://ok-url/",
    ):
        """Return a mock alarm carrying one action per known status.

        :param alarm_id: value stored in the mock's ``alarm_id`` attribute.
        :param alarm_url: URL of the action whose type is ``alarm``.
        :param insufficient_data_url: URL of the action whose type is
            ``insufficient-data``.
        :param ok_url: URL of the action whose type is ``ok``.
        :return: a ``mock.Mock`` whose ``actions`` list holds the
            insufficient-data, alarm and ok actions, in that order.
        """
        mock_alarm = mock.Mock()
        mock_alarm.alarm_id = alarm_id

        actions = []
        for action_type, action_url in (
            ("insufficient-data", insufficient_data_url),
            ("alarm", alarm_url),
            ("ok", ok_url),
        ):
            action = mock.Mock()
            action.type = action_type
            action.url = action_url
            actions.append(action)

        mock_alarm.actions = actions
        return mock_alarm