Fix pylint issues that appeared with version 3.2.0 of pylint
[osm/MON.git] / osm_mon / tests / unit / collector / vnf_collectors / vmware / test_vcd_collector.py
1 # -*- coding: utf-8 -*-
2 # #
3 # Copyright 2016-2019 VMware Inc.
4 # This file is part of ETSI OSM
5 # All Rights Reserved.
6 #
7 # Licensed under the Apache License, Version 2.0 (the "License"); you may
8 # not use this file except in compliance with the License. You may obtain
9 # a copy of the License at
10 #
11 # http://www.apache.org/licenses/LICENSE-2.0
12 #
13 # Unless required by applicable law or agreed to in writing, software
14 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
15 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
16 # License for the specific language governing permissions and limitations
17 # under the License.
18 #
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact: osslegalrouting@vmware.com
21 # #
22
23 from osm_mon.collector.vnf_collectors.vmware import VMwareCollector
24 from osm_mon.core.config import Config
25 from osm_mon.tests.unit.collector.vnf_collectors.vmware.mock_http import (
26 mock_http_response,
27 )
28 from unittest import TestCase, mock
29
30 import json
31 import os
32 import requests_mock
33
# Stub VIM account handed back by the patched VMwareCollector.get_vim_account
# in every setUp() of this module. All credential fields are left empty.
VIM_ACCOUNT = {
    # vROps endpoint and credentials
    "vrops_site": "https://vrops",
    "vrops_user": "",
    "vrops_password": "",
    # vCD endpoint and admin credentials
    "vim_url": "https://vcd",
    "admin_username": "",
    "admin_password": "",
    "vim_uuid": "",
}
43
44
@mock.patch.object(VMwareCollector, "get_vm_moref_id", spec_set=True, autospec=True)
class CollectorTest(TestCase):
    """Exercise VMwareCollector.collect() against canned vROps HTTP responses.

    The class-level patch replaces VMwareCollector.get_vm_moref_id for every
    ``test*`` method, which is why each test takes the mock as an extra
    positional argument.
    """

    @mock.patch.object(VMwareCollector, "get_vim_account", spec_set=True, autospec=True)
    @mock.patch("osm_mon.collector.vnf_collectors.vmware.CommonDbClient")
    def setUp(self, mock_db, mock_get_vim_account):
        """Build the collector under test and load the VNFR/VNFD fixtures.

        The two decorator patches are in effect only while setUp() runs; the
        CommonDbClient mock is kept on ``self.mock_db`` so tests can program it.
        """
        super().setUp()
        mock_vim_session = mock.Mock()
        mock_get_vim_account.return_value = VIM_ACCOUNT
        self.collector = VMwareCollector(
            Config(), "9de6df67-b820-48c3-bcae-ee4838c5c5f4", mock_vim_session
        )
        self.mock_db = mock_db
        self.vnfr = self._load_osm_mock("VNFR.json")
        self.vnfd = self._load_osm_mock("VNFD.json")

    @staticmethod
    def _load_osm_mock(file_name):
        """Return the parsed JSON fixture *file_name* from the osm_mocks directory."""
        path = os.path.join(os.path.dirname(__file__), "osm_mocks", file_name)
        # Explicit encoding keeps the fixture load independent of the locale.
        with open(path, "r", encoding="utf-8") as f:
            return json.load(f)

    @staticmethod
    def _mock_vrops_token(mock_requests):
        """Register the canned response for the vROps token-acquire POST."""
        mock_http_response(
            mock_requests,
            method="POST",
            url_pattern="/suite-api/api/auth/token/acquire",
            response_file="vrops_token.json",
        )

    @classmethod
    def _mock_vrops_happy_path(cls, mock_requests):
        """Register token, VM resource listing and stats responses, in that order."""
        cls._mock_vrops_token(mock_requests)
        mock_http_response(
            mock_requests,
            url_pattern="/suite-api/api/resources\\?resourceKind=VirtualMachine",
            response_file="vrops_resources.json",
        )
        mock_http_response(
            mock_requests,
            url_pattern="/suite-api/api/resources/stats.*",
            response_file="vrops_multi.json",
        )

    def test_collect_cpu_and_memory(self, mock_vm_moref_id):
        """Two monitoring parameters yield the CPU and memory metrics, in order."""
        mock_vm_moref_id.return_value = "VMWARE-OID-VM-1"
        self.vnfd["vdu"][0]["monitoring-parameter"] = [
            {"id": "ubuntu_vnf_cpu_util", "performance-metric": "cpu_utilization"},
            {
                "id": "ubuntu_vnf_average_memory_utilization",
                "performance-metric": "average_memory_utilization",
            },
        ]
        self.mock_db.return_value.get_vnfd.return_value = self.vnfd

        with requests_mock.Mocker() as mock_requests:
            self._mock_vrops_happy_path(mock_requests)
            metrics = self.collector.collect(self.vnfr)
        self.assertEqual(len(metrics), 2, "Number of metrics returned")
        self.assertEqual(metrics[0].name, "cpu_utilization", "First metric name")
        self.assertEqual(metrics[0].value, 100.0, "CPU metric value")
        self.assertEqual(
            metrics[1].name, "average_memory_utilization", "Second metric name"
        )
        self.assertEqual(metrics[1].value, 20.515941619873047, "Memory metric value")

    def test_collect_no_moref(self, mock_vm_moref_id):
        """No moref id plus a 404 on the resource listing yields no metrics."""
        mock_vm_moref_id.return_value = None
        self.mock_db.return_value.get_vnfd.return_value = self.vnfd
        with requests_mock.Mocker() as mock_requests:
            self._mock_vrops_token(mock_requests)
            mock_http_response(
                mock_requests,
                url_pattern="/suite-api/api/resources\\?resourceKind=VirtualMachine",
                response_file="404.txt",
                status_code=404,
            )
            metrics = self.collector.collect(self.vnfr)
        self.assertEqual(len(metrics), 0, "Number of metrics returned")

    def test_collect_no_monitoring_param(self, _):
        """An empty monitoring-parameter list yields no metrics."""
        self.vnfd["vdu"][0]["monitoring-parameter"] = []
        self.mock_db.return_value.get_vnfd.return_value = self.vnfd
        with requests_mock.Mocker() as mock_requests:
            self._mock_vrops_happy_path(mock_requests)
            metrics = self.collector.collect(self.vnfr)
        self.assertEqual(len(metrics), 0, "Number of metrics returned")

    def test_collect_empty_monitoring_param(self, _):
        """A VDU without the monitoring-parameter key yields no metrics."""
        del self.vnfd["vdu"][0]["monitoring-parameter"]
        self.mock_db.return_value.get_vnfd.return_value = self.vnfd
        with requests_mock.Mocker() as mock_requests:
            self._mock_vrops_happy_path(mock_requests)
            metrics = self.collector.collect(self.vnfr)
        self.assertEqual(len(metrics), 0, "Number of metrics returned")

    def test_collect_no_name(self, _):
        """VDUR entries lacking a name yield no metrics."""
        del self.vnfr["vdur"][0]["name"]
        del self.vnfr["vdur"][1]["name"]
        self.mock_db.return_value.get_vnfd.return_value = self.vnfd
        with requests_mock.Mocker() as mock_requests:
            self._mock_vrops_happy_path(mock_requests)
            metrics = self.collector.collect(self.vnfr)
        self.assertEqual(len(metrics), 0, "Number of metrics returned")
194
195
class VApp_Details_Test(TestCase):
    """Exercise VMwareCollector.get_vapp_details_rest() against mocked vCD replies."""

    @mock.patch.object(VMwareCollector, "get_vim_account", spec_set=True, autospec=True)
    @mock.patch("osm_mon.collector.vnf_collectors.vmware.CommonDbClient")
    def setUp(self, mock_db, mock_get_vim_account):
        """Build the collector under test with a stubbed VIM account and DB client.

        The two decorator patches are in effect only while setUp() runs.
        """
        super().setUp()
        self.mock_db = mock_db
        mock_vim_session = mock.Mock()
        mock_get_vim_account.return_value = VIM_ACCOUNT
        self.collector = VMwareCollector(
            Config(), "9de6df67-b820-48c3-bcae-ee4838c5c5f4", mock_vim_session
        )

    @mock.patch("osm_mon.collector.vnf_collectors.vmware.Client")
    def test_get_vapp_details(self, mock_vcd_client):
        """A vApp XML reply produces a dict carrying the VM moref id."""
        mock_vcd_client.return_value._session.headers = {"x-vcloud-authorization": ""}
        with requests_mock.Mocker() as mock_requests:
            mock_http_response(
                mock_requests,
                site="https://vcd",
                url_pattern="/api/vApp/.*",
                response_file="vcd_vapp_response.xml",
            )
            response = self.collector.get_vapp_details_rest("")
        expected_subset = {"vm_vcenter_info": {"vm_moref_id": "vm-4055"}}
        # assertDictContainsSubset is gone in Python 3.12. Overlaying the
        # expected items on the response leaves it unchanged exactly when the
        # response already contains them, which is the same subset check.
        self.assertEqual(
            response,
            {**response, **expected_subset},
            "Managed object reference id incorrect",
        )

    def test_no_admin_connect(self):
        """Without a patched vCD Client the lookup returns an empty dict."""
        response = self.collector.get_vapp_details_rest("")
        self.assertDictEqual(
            response, {}, "Failed to connect should return empty dictionary"
        )

    def test_no_id(self):
        """Calling without a vApp id returns an empty dict."""
        response = self.collector.get_vapp_details_rest()
        self.assertDictEqual(
            response, {}, "No id supplied should return empty dictionary"
        )

    @mock.patch("osm_mon.collector.vnf_collectors.vmware.Client")
    def test_get_vapp_details_404(self, mock_vcd_client):
        """An HTTP 404 from the vApp endpoint returns an empty dict."""
        mock_vcd_client.return_value._session.headers = {"x-vcloud-authorization": ""}
        with requests_mock.Mocker() as mock_requests:
            mock_http_response(
                mock_requests,
                site="https://vcd",
                url_pattern="/api/vApp/.*",
                response_file="404.txt",
                status_code=404,
            )
            response = self.collector.get_vapp_details_rest("")
        self.assertDictEqual(response, {}, "HTTP error should return empty dictionary")

    @mock.patch("osm_mon.collector.vnf_collectors.vmware.Client")
    def test_get_vapp_details_xml_parse_error(self, mock_vcd_client):
        """The 404.txt body served without an error status returns an empty dict.

        NOTE(review): 404.txt is presumably not well-formed vApp XML; its
        contents are not visible from this module.
        """
        mock_vcd_client.return_value._session.headers = {"x-vcloud-authorization": ""}
        with requests_mock.Mocker() as mock_requests:
            mock_http_response(
                mock_requests,
                site="https://vcd",
                url_pattern="/api/vApp/.*",
                response_file="404.txt",
            )
            response = self.collector.get_vapp_details_rest("")
        self.assertDictEqual(
            response, {}, "XML parse error should return empty dictionary"
        )
268
269
class Get_VM_Moref_Test(TestCase):
    """Exercise VMwareCollector.get_vm_moref_id() with get_vapp_details_rest patched."""

    @mock.patch.object(VMwareCollector, "get_vim_account", spec_set=True, autospec=True)
    @mock.patch("osm_mon.collector.vnf_collectors.vmware.CommonDbClient")
    def setUp(self, mock_db, mock_get_vim_account):
        """Build the collector under test with a stubbed VIM account and DB client.

        The two decorator patches are in effect only while setUp() runs.
        """
        super().setUp()
        self.mock_db = mock_db
        mock_vim_session = mock.Mock()
        mock_get_vim_account.return_value = VIM_ACCOUNT
        self.collector = VMwareCollector(
            Config(), "9de6df67-b820-48c3-bcae-ee4838c5c5f4", mock_vim_session
        )

    @mock.patch.object(
        VMwareCollector, "get_vapp_details_rest", spec_set=True, autospec=True
    )
    def test_get_vm_moref_id(self, mock_vapp_details):
        """The moref id is read out of the vApp details dictionary."""
        mock_vapp_details.return_value = {"vm_vcenter_info": {"vm_moref_id": "vm-4055"}}
        response = self.collector.get_vm_moref_id("1234")
        self.assertEqual(
            response, "vm-4055", "Did not fetch correct ref id from dictionary"
        )

    @mock.patch.object(
        VMwareCollector, "get_vapp_details_rest", spec_set=True, autospec=True
    )
    def test_get_vm_moref_bad_content(self, mock_vapp_details):
        """Empty vApp details produce None."""
        mock_vapp_details.return_value = {}
        response = self.collector.get_vm_moref_id("1234")
        self.assertIsNone(response, "Error fetching vapp details should return None")

    @mock.patch.object(
        VMwareCollector, "get_vapp_details_rest", spec_set=True, autospec=True
    )
    def test_get_vm_moref_has_exception(self, mock_vapp_details):
        """An exception raised while fetching vApp details produces None."""
        mock_vapp_details.side_effect = Exception("Testing")
        response = self.collector.get_vm_moref_id("1234")
        self.assertIsNone(
            response, "Exception while fetching vapp details should return None"
        )