fixing prometheus metric exporter issues
[osm/LCM.git] / osm_lcm / tests / test_prometheus.py
1 ##
2 # Licensed under the Apache License, Version 2.0 (the "License"); you may
3 # not use this file except in compliance with the License. You may obtain
4 # a copy of the License at
5 #
6 # http://www.apache.org/licenses/LICENSE-2.0
7 #
8 # Unless required by applicable law or agreed to in writing, software
9 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
10 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
11 # License for the specific language governing permissions and limitations
12 # under the License.
13 #
14 # For those usages not covered by the Apache License, Version 2.0 please
15 # contact: alfonso.tiernosepulveda@telefonica.com
16 ##
17
18 import asynctest
19 from osm_lcm.prometheus import Prometheus, initial_prometheus_data
20 from asynctest.mock import Mock
21 from osm_common.dbmemory import DbMemory
22
23 __author__ = 'Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>'
24
25
26 class TestPrometheus(asynctest.TestCase):
27
28 async def setUp(self):
29 config = {'uri': 'http:prometheus:9090',
30 'path': '/etc/prometheus'}
31 self.db = Mock(DbMemory())
32 self.p = Prometheus(config, worker_id='1', db=self.db, loop=self.loop)
33
34 @asynctest.fail_on(active_handles=True)
35 async def test_start(self):
36 # test with database empty
37 self.db.get_one.return_value = False
38 self.p.update = asynctest.CoroutineMock()
39 await self.p.start()
40 self.db.create.assert_called_once_with('admin', initial_prometheus_data)
41 self.p.update.assert_called_once_with()
42
43 # test with database not empty
44 self.db.create.reset_mock()
45 self.db.get_one.return_value = initial_prometheus_data
46 self.p.update.reset_mock()
47 await self.p.start()
48 self.db.create.assert_not_called()
49 self.p.update.assert_called_once_with()
50
51 @asynctest.fail_on(active_handles=True)
52 async def test_update(self):
53 self.p.PROMETHEUS_LOCKED_TIME = 1
54 number_call_set_one = 0
55
56 def _db_set_one(*args, **kwargs):
57 # simulated that database is not locked at first call
58 nonlocal number_call_set_one
59
60 number_call_set_one += 1
61 if number_call_set_one == 1:
62 return
63 else:
64 return {'update': 1}
65
66 def _check_set_one_calls(set_one_calls):
67 # check the three calls to database set_one
68 self.assertEqual(len(set_one_calls), 3, 'Not called three times to db.set_one, two blocks, one unblock')
69 self.assertIn('admin', set_one_calls[0][0], 'db.set_one collection should be admin')
70 first_used_time = set_one_calls[0][1]['update_dict']['_admin.locked_at']
71 second_used_time = set_one_calls[1][1]['update_dict']['_admin.locked_at']
72 third_used_time = set_one_calls[2][1]['update_dict']['_admin.locked_at']
73 self.assertTrue(first_used_time != 0 and second_used_time != 0, 'blocking locked_at time must not be 0')
74 self.assertGreater(second_used_time, first_used_time,
75 'Every blocking try must contain a new locked_at time')
76 self.assertEqual(third_used_time, 0, 'For unblocking must be set locked_at=0')
77
78 # check add_jobs
79 number_call_set_one = 0
80 self.db.get_one.return_value = initial_prometheus_data
81 self.db.set_one.side_effect = _db_set_one
82 self.p.send_data = asynctest.CoroutineMock(return_value=True)
83 add_jobs = {'job1': {'job_name': 'job1', 'nsr_id': 'nsr_id'}}
84 await self.p.update(add_jobs=add_jobs)
85 set_one_calls = self.db.set_one.call_args_list
86 _check_set_one_calls(set_one_calls)
87 update_dict = set_one_calls[2][1]['update_dict']
88 unset_dict = set_one_calls[2][1]['unset']
89 expected_final_set = {
90 '_admin.locked_at': 0,
91 '_admin.locked_by': None,
92 '_admin.modified_at': set_one_calls[1][1]['update_dict']['_admin.locked_at'],
93 'scrape_configs.job1': add_jobs['job1']}
94 self.assertEqual(update_dict, expected_final_set, 'invalid set and unlock values')
95 self.assertEqual(unset_dict, None, 'invalid unset and unlock values')
96
97 # check remove_jobs
98 number_call_set_one = 0
99 remove_jobs = ['job1']
100 self.db.set_one.reset_mock()
101 await self.p.update(remove_jobs=remove_jobs)
102 set_one_calls = self.db.set_one.call_args_list
103 _check_set_one_calls(set_one_calls)
104 update_dict = set_one_calls[2][1]['update_dict']
105 unset_dict = set_one_calls[2][1]['unset']
106 expected_final_set = {
107 '_admin.locked_at': 0,
108 '_admin.locked_by': None,
109 '_admin.modified_at': set_one_calls[1][1]['update_dict']['_admin.locked_at']
110 }
111 self.assertEqual(update_dict, expected_final_set, 'invalid set and unlock values')
112 self.assertEqual(unset_dict, {'scrape_configs.job1': None}, 'invalid unset and unlock values')
113
114 def test_parse_job(self):
115 text_to_parse = """
116 # yaml format with jinja2
117 key1: "parsing var1='{{ var1 }}'"
118 key2: "parsing var2='{{ var2 }}'"
119 """
120 vars = {'var1': 'VAR1', 'var2': 'VAR2', 'var3': 'VAR3'}
121 expected = {
122 'key1': "parsing var1='VAR1'",
123 'key2': "parsing var2='VAR2'"
124 }
125 result = self.p.parse_job(text_to_parse, vars)
126 self.assertEqual(result, expected, 'Error at jinja2 parse')
127
128
129 if __name__ == '__main__':
130 asynctest.main()