2 # Licensed under the Apache License, Version 2.0 (the "License"); you may
3 # not use this file except in compliance with the License. You may obtain
4 # a copy of the License at
6 # http://www.apache.org/licenses/LICENSE-2.0
8 # Unless required by applicable law or agreed to in writing, software
9 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
10 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
14 # For those usages not covered by the Apache License, Version 2.0 please
15 # contact: alfonso.tiernosepulveda@telefonica.com
19 from osm_lcm
.prometheus
import Prometheus
, initial_prometheus_data
20 from asynctest
.mock
import Mock
21 from osm_common
.dbmemory
import DbMemory
# Module author metadata: name and contact e-mail of the original author.
__author__ = 'Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>'
class TestPrometheus(asynctest.TestCase):
    """Unit tests for ``Prometheus`` driven through a mocked ``DbMemory``.

    Every test uses the ``Prometheus`` instance built in ``setUp`` (``self.p``)
    together with ``self.db``, a ``Mock`` wrapping ``DbMemory``, so no real
    database is touched.
    """

    async def setUp(self):
        """Build the object under test with a mocked database."""
        # NOTE(review): the uri has no '//' after the scheme; kept verbatim
        # because it is a runtime value -- confirm whether this is intentional.
        config = {'uri': 'http:prometheus:9090',
                  'path': '/etc/prometheus'}
        self.db = Mock(DbMemory())
        self.p = Prometheus(config, worker_id='1', db=self.db, loop=self.loop)

    @asynctest.fail_on(active_handles=True)
    async def test_start(self):
        """Check ``start`` against an empty and an already-populated database.

        ``update`` is replaced by a ``CoroutineMock`` so that only the
        interaction with ``db.create`` and the call to ``update`` are observed.
        """
        # test with database empty
        self.db.get_one.return_value = False
        self.p.update = asynctest.CoroutineMock()
        # The assertions below inspect what the method under test did, so it
        # has to be awaited first.
        await self.p.start()
        self.db.create.assert_called_once_with('admin', initial_prometheus_data)
        self.p.update.assert_called_once_with()

        # test with database not empty
        self.db.create.reset_mock()
        self.db.get_one.return_value = initial_prometheus_data
        self.p.update.reset_mock()
        await self.p.start()
        self.db.create.assert_not_called()
        self.p.update.assert_called_once_with()

    @asynctest.fail_on(active_handles=True)
    async def test_update(self):
        """Check the ``db.set_one`` sequence issued by ``update``.

        Both scenarios (adding a job, removing a job) expect exactly three
        ``set_one`` calls: two blocking attempts followed by one unblock.
        """
        self.p.PROMETHEUS_LOCKED_TIME = 1
        number_call_set_one = 0

        def _db_set_one(*args, **kwargs):
            """Side effect for ``db.set_one``: fail the first call only."""
            # simulate that the first call does not obtain the database lock
            nonlocal number_call_set_one

            number_call_set_one += 1
            if number_call_set_one == 1:
                # presumably a falsy result makes ``update`` retry the lock;
                # inferred from the "two blocks" expectation -- verify against
                # Prometheus.update
                return None
            # any truthy value stands for a successful write
            return {'updated': 1}

        def _check_set_one_calls(set_one_calls):
            """Validate the three recorded ``set_one`` calls (block, block, unblock)."""
            # check the three calls to database set_one
            self.assertEqual(len(set_one_calls), 3, 'Not called three times to db.set_one, two blocks, one unblock')
            self.assertIn('admin', set_one_calls[0][0], 'db.set_one collection should be admin')
            # each recorded call is (args, kwargs); the timestamps live in kwargs
            first_used_time = set_one_calls[0][1]['update_dict']['_admin.locked_at']
            second_used_time = set_one_calls[1][1]['update_dict']['_admin.locked_at']
            third_used_time = set_one_calls[2][1]['update_dict']['_admin.locked_at']
            self.assertNotEqual(first_used_time, 0, 'blocking locked_at time must not be 0')
            self.assertNotEqual(second_used_time, 0, 'blocking locked_at time must not be 0')
            self.assertGreater(second_used_time, first_used_time,
                               'Every blocking try must contain a new locked_at time')
            self.assertEqual(third_used_time, 0, 'For unblocking must be set locked_at=0')

        # test add job
        number_call_set_one = 0
        self.db.get_one.return_value = initial_prometheus_data
        self.db.set_one.side_effect = _db_set_one
        self.p.send_data = asynctest.CoroutineMock(return_value=True)
        add_jobs = {'job1': {'job_name': 'job1', 'nsr_id': 'nsr_id'}}
        await self.p.update(add_jobs=add_jobs)
        set_one_calls = self.db.set_one.call_args_list
        _check_set_one_calls(set_one_calls)
        update_dict = set_one_calls[2][1]['update_dict']
        unset_dict = set_one_calls[2][1]['unset']
        expected_final_set = {
            '_admin.locked_at': 0,
            '_admin.locked_by': None,
            # modified_at must equal the locked_at of the second blocking call
            '_admin.modified_at': set_one_calls[1][1]['update_dict']['_admin.locked_at'],
            'scrape_configs.job1': add_jobs['job1'],
        }
        self.assertEqual(update_dict, expected_final_set, 'invalid set and unlock values')
        self.assertIsNone(unset_dict, 'invalid unset and unlock values')

        # test delete job
        number_call_set_one = 0
        remove_jobs = ['job1']
        self.db.set_one.reset_mock()
        await self.p.update(remove_jobs=remove_jobs)
        set_one_calls = self.db.set_one.call_args_list
        _check_set_one_calls(set_one_calls)
        update_dict = set_one_calls[2][1]['update_dict']
        unset_dict = set_one_calls[2][1]['unset']
        expected_final_set = {
            '_admin.locked_at': 0,
            '_admin.locked_by': None,
            '_admin.modified_at': set_one_calls[1][1]['update_dict']['_admin.locked_at'],
        }
        self.assertEqual(update_dict, expected_final_set, 'invalid set and unlock values')
        self.assertEqual(unset_dict, {'scrape_configs.job1': None}, 'invalid unset and unlock values')

    def test_parse_job(self):
        """Check that ``parse_job`` substitutes the given variables in a job text.

        ``var3`` is supplied but not referenced by the text, so it must not
        appear in the result.
        """
        text_to_parse = """
            # yaml format with jinja2
            key1: "parsing var1='{{ var1 }}'"
            key2: "parsing var2='{{ var2 }}'"
        """
        # named ``variables`` rather than ``vars`` to avoid shadowing the builtin
        variables = {'var1': 'VAR1', 'var2': 'VAR2', 'var3': 'VAR3'}
        expected = {
            'key1': "parsing var1='VAR1'",
            'key2': "parsing var2='VAR2'",
        }
        result = self.p.parse_job(text_to_parse, variables)
        self.assertEqual(result, expected, 'Error at jinja2 parse')
129 if __name__
== '__main__':