1 # -*- coding: utf-8 -*-
3 # Copyright 2018 Whitestack, LLC
4 # *************************************************************
6 # This file is part of OSM Monitoring module
7 # All Rights Reserved to Whitestack, LLC
9 # Licensed under the Apache License, Version 2.0 (the "License"); you may
10 # not use this file except in compliance with the License. You may obtain
11 # a copy of the License at
13 # http://www.apache.org/licenses/LICENSE-2.0
15 # Unless required by applicable law or agreed to in writing, software
16 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
17 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
18 # License for the specific language governing permissions and limitations
21 # For those usages not covered by the Apache License, Version 2.0 please
22 # contact: bdiaz@whitestack.com or glavado@whitestack.com
26 from unittest
import mock
28 from osm_policy_module
.alarming
.service
import AlarmingService
29 from osm_policy_module
.autoscaling
.service
import AutoscalingService
30 from osm_policy_module
.common
.common_db_client
import CommonDbClient
31 from osm_policy_module
.core
.agent
import PolicyModuleAgent
32 from osm_policy_module
.core
.config
import Config
33 from osm_policy_module
.healing
.service
import HealingService
36 class PolicyAgentTest(unittest
.TestCase
):
38 self
.loop
= asyncio
.new_event_loop()
40 @mock.patch
.object(CommonDbClient
, "__init__", lambda *args
, **kwargs
: None)
41 @mock.patch("osm_policy_module.alarming.service.MonClient")
42 @mock.patch("osm_policy_module.alarming.service.LcmClient")
43 @mock.patch("osm_policy_module.autoscaling.service.MonClient")
44 @mock.patch("osm_policy_module.autoscaling.service.LcmClient")
45 @mock.patch("osm_policy_module.healing.service.MonClient")
46 @mock.patch("osm_policy_module.healing.service.LcmClient")
47 @mock.patch
.object(AutoscalingService
, "configure_scaling_groups")
48 @mock.patch
.object(AlarmingService
, "configure_vnf_alarms")
49 @mock.patch
.object(HealingService
, "configure_healing_alarms")
50 @mock.patch
.object(AutoscalingService
, "delete_orphaned_alarms")
51 @mock.patch
.object(CommonDbClient
, "get_nslcmop")
52 def test_handle_instantiated(
55 delete_orphaned_alarms
,
56 configure_healing_alarms
,
58 configure_scaling_groups
,
59 autoscaling_lcm_client
,
60 autoscaling_mon_client
,
66 async def mock_configure_scaling_groups(nsr_id
):
69 async def mock_configure_vnf_alarms(nsr_id
):
72 async def mock_configure_healing_alarms(nsr_id
):
75 async def mock_delete_orphaned_alarms(nsr_id
):
79 agent
= PolicyModuleAgent(config
)
80 assert autoscaling_lcm_client
.called
81 assert autoscaling_mon_client
.called
82 assert alarming_lcm_client
.called
83 assert alarming_mon_client
.called
84 assert healing_lcm_client
.called
85 assert healing_mon_client
.called
87 "nslcmop_id": "test_id",
90 "operationState": "COMPLETED",
91 "nsInstanceId": "test_nsr_id",
93 nslcmop_failed
= {"operationState": "FAILED", "nsInstanceId": "test_nsr_id"}
94 configure_scaling_groups
.side_effect
= mock_configure_scaling_groups
95 configure_vnf_alarms
.side_effect
= mock_configure_vnf_alarms
96 configure_healing_alarms
.side_effect
= mock_configure_healing_alarms
97 delete_orphaned_alarms
.side_effect
= mock_delete_orphaned_alarms
99 get_nslcmop
.return_value
= nslcmop_completed
100 self
.loop
.run_until_complete(agent
._handle
_instantiated
(content
))
101 configure_scaling_groups
.assert_called_with("test_nsr_id")
102 configure_scaling_groups
.reset_mock()
104 get_nslcmop
.return_value
= nslcmop_failed
105 self
.loop
.run_until_complete(agent
._handle
_instantiated
(content
))
106 configure_scaling_groups
.assert_not_called()
108 @mock.patch
.object(CommonDbClient
, "__init__", lambda *args
, **kwargs
: None)
109 @mock.patch("osm_policy_module.alarming.service.MonClient")
110 @mock.patch("osm_policy_module.alarming.service.LcmClient")
111 @mock.patch("osm_policy_module.autoscaling.service.MonClient")
112 @mock.patch("osm_policy_module.autoscaling.service.LcmClient")
113 @mock.patch("osm_policy_module.healing.service.MonClient")
114 @mock.patch("osm_policy_module.healing.service.LcmClient")
115 @mock.patch
.object(HealingService
, "configure_healing_alarms")
116 @mock.patch
.object(AutoscalingService
, "configure_scaling_groups")
117 @mock.patch
.object(AlarmingService
, "configure_vnf_alarms")
118 @mock.patch
.object(HealingService
, "delete_healing_alarms")
119 @mock.patch
.object(AutoscalingService
, "delete_scaling_groups")
120 @mock.patch
.object(AlarmingService
, "delete_vnf_alarms")
121 def test_handle_policy_update(
124 delete_scaling_groups
,
125 delete_healing_alarms
,
126 configure_vnf_alarms
,
127 configure_scaling_groups
,
128 configure_healing_alarms
,
131 autoscaling_lcm_client
,
132 autoscaling_mon_client
,
136 async def mock_delete_scaling_groups(nsr_id
, vnf_member_index
):
139 async def mock_delete_vnf_alarms(nsr_id
, vnf_member_index
):
142 async def mock_delete_healing_alarms(nsr_id
, vnf_member_index
):
145 async def mock_configure_scaling_groups(nsr_id
, vnf_member_index
):
148 async def mock_configure_vnf_alarms(nsr_id
, vnf_member_index
):
151 async def mock_configure_healing_alarms(nsr_id
, vnf_member_index
):
155 agent
= PolicyModuleAgent(config
)
156 assert autoscaling_lcm_client
.called
157 assert autoscaling_mon_client
.called
158 assert alarming_lcm_client
.called
159 assert alarming_mon_client
.called
160 assert healing_lcm_client
.called
161 assert healing_mon_client
.called
163 "nsr_id": "test_nsr_id",
164 "vnf_member_index": "1",
165 "operationState": "COMPLETED",
168 "nsr_id": "test_nsr_id",
169 "vnf_member_index": "1",
170 "operationState": "FAILED",
172 configure_scaling_groups
.side_effect
= mock_configure_scaling_groups
173 configure_vnf_alarms
.side_effect
= mock_configure_vnf_alarms
174 configure_healing_alarms
.side_effect
= mock_configure_healing_alarms
175 delete_scaling_groups
.side_effect
= mock_delete_scaling_groups
176 delete_vnf_alarms
.side_effect
= mock_delete_vnf_alarms
177 delete_healing_alarms
.side_effect
= mock_delete_healing_alarms
179 self
.loop
.run_until_complete(agent
._handle
_policy
_update
(content
))
180 configure_scaling_groups
.assert_called_with("test_nsr_id", "1")
181 configure_scaling_groups
.reset_mock()
183 self
.loop
.run_until_complete(agent
._handle
_policy
_update
(failed_content
))
184 configure_scaling_groups
.assert_not_called()
186 @mock.patch
.object(CommonDbClient
, "__init__", lambda *args
, **kwargs
: None)
187 @mock.patch("osm_policy_module.autoscaling.service.MonClient")
188 @mock.patch("osm_policy_module.autoscaling.service.LcmClient")
189 @mock.patch("osm_policy_module.alarming.service.MonClient")
190 @mock.patch("osm_policy_module.alarming.service.LcmClient")
191 @mock.patch("osm_policy_module.healing.service.MonClient")
192 @mock.patch("osm_policy_module.healing.service.LcmClient")
193 @mock.patch
.object(AutoscalingService
, "handle_alarm")
194 @mock.patch
.object(AlarmingService
, "handle_alarm")
195 @mock.patch
.object(HealingService
, "handle_alarm")
196 def test_handle_alarm_notification(
198 healing_handle_alarm
,
199 alarming_handle_alarm
,
200 autoscaling_handle_alarm
,
201 autoscaling_lcm_client
,
202 autoscaling_mon_client
,
208 async def mock_handle_alarm(alarm_uuid
, status
, payload
=None):
212 agent
= PolicyModuleAgent(config
)
213 assert autoscaling_lcm_client
.called
214 assert autoscaling_mon_client
.called
215 assert alarming_lcm_client
.called
216 assert alarming_mon_client
.called
217 assert healing_lcm_client
.called
218 assert healing_mon_client
.called
221 "alarm_uuid": "test_alarm_uuid",
222 "metric_name": "test_metric_name",
223 "operation": "test_operation",
224 "threshold_value": "test_threshold_value",
225 "vdu_name": "test_vdu_name",
226 "vnf_member_index": "test_vnf_member_index",
227 "ns_id": "test_nsr_id",
231 autoscaling_handle_alarm
.side_effect
= mock_handle_alarm
232 alarming_handle_alarm
.side_effect
= mock_handle_alarm
233 healing_handle_alarm
.side_effect
= mock_handle_alarm
235 self
.loop
.run_until_complete(agent
._handle
_alarm
_notification
(content
))
236 autoscaling_handle_alarm
.assert_called_with("test_alarm_uuid", "alarm")
237 alarming_handle_alarm
.assert_called_with("test_alarm_uuid", "alarm", content
)
238 healing_handle_alarm
.assert_called_with("test_alarm_uuid", "alarm")
240 @mock.patch
.object(CommonDbClient
, "__init__", lambda *args
, **kwargs
: None)
241 @mock.patch("osm_policy_module.alarming.service.MonClient")
242 @mock.patch("osm_policy_module.alarming.service.LcmClient")
243 @mock.patch("osm_policy_module.autoscaling.service.MonClient")
244 @mock.patch("osm_policy_module.autoscaling.service.LcmClient")
245 @mock.patch
.object(AutoscalingService
, "delete_scaling_groups")
246 @mock.patch
.object(AlarmingService
, "delete_vnf_alarms")
247 def test_handle_vnf_terminated(
250 delete_scaling_groups
,
251 autoscaling_lcm_client
,
252 autoscaling_mon_client
,
256 async def mock_delete_scaling_groups(nsr_id
, vnf_member_index
):
259 async def mock_delete_vnf_alarms(nsr_id
, vnf_member_index
):
263 agent
= PolicyModuleAgent(config
)
264 assert autoscaling_lcm_client
.called
265 assert autoscaling_mon_client
.called
266 assert alarming_lcm_client
.called
267 assert alarming_mon_client
.called
269 "nsr_id": "test_nsr_id",
270 "vnf_member_index": "1",
271 "operationState": "COMPLETED",
274 "nsr_id": "test_nsr_id",
275 "vnf_member_index": "1",
276 "operationState": "FAILED",
278 delete_scaling_groups
.side_effect
= mock_delete_scaling_groups
279 delete_vnf_alarms
.side_effect
= mock_delete_vnf_alarms
281 self
.loop
.run_until_complete(agent
._handle
_vnf
_terminated
(content
))
282 delete_scaling_groups
.assert_called_with("test_nsr_id", "1")
283 delete_scaling_groups
.reset_mock()
285 self
.loop
.run_until_complete(agent
._handle
_vnf
_terminated
(failed_content
))
286 delete_scaling_groups
.assert_not_called()
289 if __name__
== "__main__":