Fix bug 1442: set SDN target in vim_info for NS VLD
[osm/LCM.git] / osm_lcm / tests / test_prometheus.py
1 ##
2 # Licensed under the Apache License, Version 2.0 (the "License"); you may
3 # not use this file except in compliance with the License. You may obtain
4 # a copy of the License at
5 #
6 # http://www.apache.org/licenses/LICENSE-2.0
7 #
8 # Unless required by applicable law or agreed to in writing, software
9 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
10 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
11 # License for the specific language governing permissions and limitations
12 # under the License.
13 #
14 # For those usages not covered by the Apache License, Version 2.0 please
15 # contact: alfonso.tiernosepulveda@telefonica.com
16 ##
17
18 import asynctest
19 from osm_lcm.prometheus import Prometheus, initial_prometheus_data
20 from asynctest.mock import Mock
21 from osm_lcm.data_utils.database.database import Database
22
23 __author__ = 'Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>'
24
25
class TestPrometheus(asynctest.TestCase):
    """Unit tests for ``osm_lcm.prometheus.Prometheus`` run against a mocked database."""

    async def setUp(self):
        """Create a fresh ``Prometheus`` instance whose database handle is a ``Mock``."""
        config = {
            # Well-formed URL: the '//' after the scheme was missing.
            'uri': 'http://prometheus:9090',
            'path': '/etc/prometheus',
        }
        # Cleanup singleton Database instance
        Database.instance = None

        # The in-memory db object is only used as the spec of the mock, so that
        # the mock exposes the same attributes as the real database object.
        memory_db = Database({'database': {'driver': 'memory'}}).instance.db
        self.db = Mock(memory_db)
        Database().instance.db = self.db
        self.p = Prometheus(config, worker_id='1', loop=self.loop)

    @asynctest.fail_on(active_handles=True)
    async def test_start(self):
        """start() creates the initial data only when get_one finds nothing, and always calls update()."""
        # test with database empty
        self.db.get_one.return_value = False
        self.p.update = asynctest.CoroutineMock()
        await self.p.start()
        self.db.create.assert_called_once_with('admin', initial_prometheus_data)
        self.p.update.assert_called_once_with()

        # test with database not empty
        self.db.create.reset_mock()
        self.db.get_one.return_value = initial_prometheus_data
        self.p.update.reset_mock()
        await self.p.start()
        self.db.create.assert_not_called()
        self.p.update.assert_called_once_with()

    @asynctest.fail_on(active_handles=True)
    async def test_update(self):
        """update() issues two locking set_one calls and one unlocking call, for add_jobs and for remove_jobs."""
        self.p.PROMETHEUS_LOCKED_TIME = 1
        number_call_set_one = 0

        def _db_set_one(*args, **kwargs):
            # Fake for db.set_one: the first call returns None and every later
            # call returns an update result, so update() is expected to try
            # the locking write twice before the final unlocking write.
            nonlocal number_call_set_one

            number_call_set_one += 1
            if number_call_set_one == 1:
                return None
            return {'update': 1}

        def _check_set_one_calls(set_one_calls):
            # check the three calls to database set_one
            self.assertEqual(len(set_one_calls), 3, 'Not called three times to db.set_one, two blocks, one unblock')
            self.assertIn('admin', set_one_calls[0][0], 'db.set_one collection should be admin')
            # Each entry is an (args, kwargs) pair; the lock time travels in kwargs['update_dict'].
            first_used_time, second_used_time, third_used_time = (
                db_call[1]['update_dict']['_admin.locked_at'] for db_call in set_one_calls
            )
            # Asserted separately so a failure reports which of the two values was 0.
            self.assertNotEqual(first_used_time, 0, 'blocking locked_at time must not be 0')
            self.assertNotEqual(second_used_time, 0, 'blocking locked_at time must not be 0')
            self.assertGreater(second_used_time, first_used_time,
                               'Every blocking try must contain a new locked_at time')
            self.assertEqual(third_used_time, 0, 'For unblocking must be set locked_at=0')

        # check add_jobs
        number_call_set_one = 0
        self.db.get_one.return_value = initial_prometheus_data
        self.db.set_one.side_effect = _db_set_one
        self.p.send_data = asynctest.CoroutineMock(return_value=True)
        add_jobs = {'job1': {'job_name': 'job1', 'nsr_id': 'nsr_id'}}
        await self.p.update(add_jobs=add_jobs)
        set_one_calls = self.db.set_one.call_args_list
        _check_set_one_calls(set_one_calls)
        update_dict = set_one_calls[2][1]['update_dict']
        unset_dict = set_one_calls[2][1]['unset']
        expected_final_set = {
            '_admin.locked_at': 0,
            '_admin.locked_by': None,
            # modified_at must equal the lock time used by the second locking call
            '_admin.modified_at': set_one_calls[1][1]['update_dict']['_admin.locked_at'],
            'scrape_configs.job1': add_jobs['job1'],
        }
        self.assertEqual(update_dict, expected_final_set, 'invalid set and unlock values')
        self.assertIsNone(unset_dict, 'invalid unset and unlock values')

        # check remove_jobs
        number_call_set_one = 0
        remove_jobs = ['job1']
        self.db.set_one.reset_mock()
        await self.p.update(remove_jobs=remove_jobs)
        set_one_calls = self.db.set_one.call_args_list
        _check_set_one_calls(set_one_calls)
        update_dict = set_one_calls[2][1]['update_dict']
        unset_dict = set_one_calls[2][1]['unset']
        expected_final_set = {
            '_admin.locked_at': 0,
            '_admin.locked_by': None,
            '_admin.modified_at': set_one_calls[1][1]['update_dict']['_admin.locked_at'],
        }
        self.assertEqual(update_dict, expected_final_set, 'invalid set and unlock values')
        self.assertEqual(unset_dict, {'scrape_configs.job1': None}, 'invalid unset and unlock values')

    def test_parse_job(self):
        """parse_job() substitutes the supplied variables into the template and returns the resulting dict."""
        text_to_parse = """
            # yaml format with jinja2
            key1: "parsing var1='{{ var1 }}'"
            key2: "parsing var2='{{ var2 }}'"
        """
        # 'var3' is deliberately not referenced by the template: extra variables must be ignored.
        job_vars = {'var1': 'VAR1', 'var2': 'VAR2', 'var3': 'VAR3'}
        expected = {
            'key1': "parsing var1='VAR1'",
            'key2': "parsing var2='VAR2'",
        }
        result = self.p.parse_job(text_to_parse, job_vars)
        self.assertEqual(result, expected, 'Error at jinja2 parse')
135
136
# Allow running this test module directly as a script.
if __name__ == "__main__":
    asynctest.main()