fix issue with xalan install
[osm/SO.git] / rwmon / test / utest_rwmon.py
1 #!/usr/bin/env python3
2
3 #
4 # Copyright 2016 RIFT.IO Inc
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 # See the License for the specific language governing permissions and
16 # limitations under the License.
17 #
18
19 import os
20 import unittest
21 import xmlrunner
22
23 import rw_peas
24
25 import gi
26 gi.require_version("RwcalYang", "1.0")
27 gi.require_version("RwMon", "1.0")
28 gi.require_version("RwmonYang", "1.0")
29 gi.require_version("RwTypes", "1.0")
30
31
32 from gi.repository import (
33 RwmonYang,
34 RwcalYang,
35 RwTypes,
36 )
37
38
class TestNullDataSource(unittest.TestCase):
    """Checks the monitoring plugin when no data source has been installed."""

    def setUp(self):
        # Load the mock monitoring plugin and keep its "Monitoring" interface.
        peas = rw_peas.PeasPlugin("rwmon_mock", 'RwMon-1.0')
        self.plugin = peas.get_interface("Monitoring")

        self.vim_id = "test-vim-id"
        self.account = RwcalYang.CloudAccount()

    def test_null_data_source(self):
        """
        With no implementation installed, the NFVI metrics plugin mock uses
        a 'null' implementation: each query should succeed and return an
        empty (default-constructed) NFVI structure.

        """
        # Each metrics query paired with the type of the empty structure that
        # the null implementation is expected to hand back.
        queries = (
            (self.plugin.nfvi_metrics, RwmonYang.NfviMetrics),
            (self.plugin.nfvi_vcpu_metrics, RwmonYang.NfviMetrics_Vcpu),
            (self.plugin.nfvi_memory_metrics, RwmonYang.NfviMetrics_Memory),
            (self.plugin.nfvi_storage_metrics, RwmonYang.NfviMetrics_Storage),
        )

        for query, empty_type in queries:
            status, metrics = query(self.account, self.vim_id)
            self.assertEqual(status, RwTypes.RwStatus.SUCCESS)
            self.assertEqual(metrics, empty_type())

        # The availability query takes only the account.
        status, available = self.plugin.nfvi_metrics_available(self.account)
        self.assertEqual(status, RwTypes.RwStatus.SUCCESS)
        self.assertTrue(available)
72
73
class TestMockDataSource(unittest.TestCase):
    """Checks the monitoring plugin when it is backed by MockDataSource."""

    def setUp(self):
        # Load the mock monitoring plugin and install the canned data source.
        peas = rw_peas.PeasPlugin("rwmon_mock", 'RwMon-1.0')
        monitoring = peas.get_interface("Monitoring")
        monitoring.set_impl(MockDataSource())
        self.plugin = monitoring

        self.vim_id = "test-vim-id"
        self.account = RwcalYang.CloudAccount()

    def _assert_same_fields(self, actual, expected, *fields):
        """Assert that every named attribute is equal on both objects."""
        for field in fields:
            self.assertEqual(getattr(actual, field), getattr(expected, field))

    def test_mock_data_source(self):
        """
        Install a mock data source that returns known values and verify that
        each plugin query reports exactly those values.

        """
        # Expected values, mirroring what MockDataSource produces.
        vcpu = RwmonYang.NfviMetrics_Vcpu()
        vcpu.utilization = 50.0
        vcpu.total = 100

        memory = RwmonYang.NfviMetrics_Memory()
        memory.used = 90
        memory.total = 100
        memory.utilization = 90/100

        storage = RwmonYang.NfviMetrics_Storage()
        storage.used = 300
        storage.total = 500
        storage.utilization = 300/500

        # Individual queries.
        status, metrics = self.plugin.nfvi_vcpu_metrics(self.account, self.vim_id)
        self.assertEqual(status, RwTypes.RwStatus.SUCCESS)
        self._assert_same_fields(metrics, vcpu, "total", "utilization")

        status, metrics = self.plugin.nfvi_memory_metrics(self.account, self.vim_id)
        self.assertEqual(status, RwTypes.RwStatus.SUCCESS)
        self._assert_same_fields(metrics, memory, "used", "total", "utilization")

        status, metrics = self.plugin.nfvi_storage_metrics(self.account, self.vim_id)
        self.assertEqual(status, RwTypes.RwStatus.SUCCESS)
        self._assert_same_fields(metrics, storage, "used", "total", "utilization")

        # The combined query must agree with the individual ones.
        status, metrics = self.plugin.nfvi_metrics(self.account, self.vim_id)
        self.assertEqual(status, RwTypes.RwStatus.SUCCESS)
        self._assert_same_fields(metrics.vcpu, vcpu, "total", "utilization")
        self._assert_same_fields(metrics.storage, storage, "used", "total", "utilization")
        self._assert_same_fields(metrics.memory, memory, "used", "total", "utilization")

        status, available = self.plugin.nfvi_metrics_available(self.account)
        self.assertEqual(status, RwTypes.RwStatus.SUCCESS)
        self.assertTrue(available)
135
136
class TestMockAlarms(unittest.TestCase):
    """Checks the alarm calls of the monitoring plugin against a mock backend."""

    def setUp(self):
        plugin = rw_peas.PeasPlugin("rwmon_mock", 'RwMon-1.0')

        # Route the plugin's alarm calls to an in-memory mock implementation.
        self.mock = MockAlarmInterface()
        self.plugin = plugin.get_interface("Monitoring")
        self.plugin.set_impl(self.mock)

        self.account = RwcalYang.CloudAccount()
        self.alarm = RwmonYang.Alarm(name='test-alarm')
        self.vim_id = 'test-vim-id'

    def test(self):
        """
        This test uses a simple, mock implementation of the alarm interface to
        check that create, update, delete, and list work correctly.

        """
        # In the beginning, there were no alarms
        _, alarms = self.plugin.do_alarm_list(self.account)
        self.assertEqual(0, len(alarms))

        # Create two alarms
        self.plugin.do_alarm_create(self.account, self.vim_id, RwmonYang.Alarm())
        self.plugin.do_alarm_create(self.account, self.vim_id, RwmonYang.Alarm())

        _, alarms = self.plugin.do_alarm_list(self.account)
        self.assertEqual(2, len(alarms))

        # The alarms should have no names
        alarms.sort(key=lambda a: a.alarm_id)
        self.assertEqual('test-alarm-id-1', alarms[0].alarm_id)
        self.assertEqual('test-alarm-id-2', alarms[1].alarm_id)
        self.assertTrue(all(a.name is None for a in alarms))

        # Give names to the alarms
        alarms[0].name = 'test-alarm'
        alarms[1].name = 'test-alarm'
        self.plugin.do_alarm_update(self.account, alarms[0])
        self.plugin.do_alarm_update(self.account, alarms[1])

        # Re-read the alarms from the plugin before checking the names:
        # asserting on the local objects mutated above would pass even if the
        # update call did nothing.
        _, alarms = self.plugin.do_alarm_list(self.account)
        self.assertEqual(2, len(alarms))
        alarms.sort(key=lambda a: a.alarm_id)
        self.assertTrue(all(a.name == 'test-alarm' for a in alarms))

        # Delete the alarms
        self.plugin.do_alarm_delete(self.account, alarms[0].alarm_id)
        self.plugin.do_alarm_delete(self.account, alarms[1].alarm_id)
        _, alarms = self.plugin.do_alarm_list(self.account)
        self.assertEqual(0, len(alarms))
184
185
class MockAlarmInterface(object):
    """
    In-memory mock implementation of the alarm interface used by the
    monitoring plugin. Alarms are kept in a dict keyed by alarm id.
    """

    def __init__(self):
        # Number of alarms created so far; used to derive the next alarm id.
        self.count = 0
        # Stored alarms, keyed by their alarm id.
        self.alarms = {}

    def alarm_create(self, account, vim_id, alarm):
        """Assign the next sequential id to the alarm and store it."""
        self.count += 1
        new_id = 'test-alarm-id-' + str(self.count)
        alarm.alarm_id = new_id
        self.alarms[new_id] = alarm

    def alarm_update(self, account, alarm):
        """Replace the stored alarm that has the same id as the given alarm."""
        assert alarm.alarm_id is not None
        self.alarms[alarm.alarm_id] = alarm

    def alarm_delete(self, account, alarm_id):
        """Remove the alarm with the given id (KeyError if it is unknown)."""
        del self.alarms[alarm_id]

    def alarm_list(self, account):
        """Return a new list holding every stored alarm."""
        stored = self.alarms.values()
        return list(stored)
211
212
class MockDataSource(object):
    """
    Data source for the monitoring plugin that returns fixed, known values
    so tests can compare against them.
    """

    def nfvi_metrics(self, account, vm_id):
        """Return an NfviMetrics filled from the three per-resource queries."""
        combined = RwmonYang.NfviMetrics()
        combined.vcpu = self.nfvi_vcpu_metrics(account, vm_id)
        combined.memory = self.nfvi_memory_metrics(account, vm_id)
        combined.storage = self.nfvi_storage_metrics(account, vm_id)
        return combined

    def nfvi_vcpu_metrics(self, account, vm_id):
        """Return fixed vCPU metrics: total 100, utilization 50.0."""
        vcpu = RwmonYang.NfviMetrics_Vcpu()
        vcpu.total = 100
        vcpu.utilization = 50.0
        return vcpu

    def nfvi_memory_metrics(self, account, vm_id):
        """Return fixed memory metrics: 90 used of 100."""
        used, total = 90, 100
        memory = RwmonYang.NfviMetrics_Memory()
        memory.used = used
        memory.total = total
        # Utilization is stored as the used/total ratio.
        memory.utilization = used / total
        return memory

    def nfvi_storage_metrics(self, account, vm_id):
        """Return fixed storage metrics: 300 used of 500."""
        used, total = 300, 500
        storage = RwmonYang.NfviMetrics_Storage()
        storage.used = used
        storage.total = total
        # Utilization is stored as the used/total ratio.
        storage.utilization = used / total
        return storage

    def nfvi_metrics_available(self, account):
        """Report that metrics are always available from this mock."""
        return True
248
249
def main():
    """
    Run this module's tests with the XML test runner.

    The runner's output location is read from the RIFT_MODULE_TEST
    environment variable; a KeyError is raised if it is not set.
    """
    report_target = os.environ["RIFT_MODULE_TEST"]
    xml_runner = xmlrunner.XMLTestRunner(output=report_target)
    unittest.main(testRunner=xml_runner)


# Only run the tests when executed as a script, not when imported.
if __name__ == '__main__':
    main()
257
258 # vim: sw=4