2 # Licensed under the Apache License, Version 2.0 (the "License"); you may
3 # not use this file except in compliance with the License. You may obtain
4 # a copy of the License at
6 # http://www.apache.org/licenses/LICENSE-2.0
8 # Unless required by applicable law or agreed to in writing, software
9 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
10 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
11 # License for the specific language governing permissions and limitations
14 # For those usages not covered by the Apache License, Version 2.0 please
15 # contact: alfonso.tiernosepulveda@telefonica.com
19 from osm_lcm
.prometheus
import Prometheus
, initial_prometheus_data
20 from asynctest
.mock
import Mock
21 from osm_lcm
.data_utils
.database
.database
import Database
23 __author__
= 'Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>'
26 class TestPrometheus(asynctest
.TestCase
):
28 async def setUp(self
):
29 config
= {'uri': 'http:prometheus:9090',
30 'path': '/etc/prometheus'}
31 # Cleanup singleton Database instance
32 Database
.instance
= None
34 self
.db
= Mock(Database({
39 Database().instance
.db
= self
.db
40 self
.p
= Prometheus(config
, worker_id
='1', loop
=self
.loop
)
42 @asynctest.fail_on(active_handles
=True)
43 async def test_start(self
):
44 # test with database empty
45 self
.db
.get_one
.return_value
= False
46 self
.p
.update
= asynctest
.CoroutineMock()
48 self
.db
.create
.assert_called_once_with('admin', initial_prometheus_data
)
49 self
.p
.update
.assert_called_once_with()
51 # test with database not empty
52 self
.db
.create
.reset_mock()
53 self
.db
.get_one
.return_value
= initial_prometheus_data
54 self
.p
.update
.reset_mock()
56 self
.db
.create
.assert_not_called()
57 self
.p
.update
.assert_called_once_with()
59 @asynctest.fail_on(active_handles
=True)
60 async def test_update(self
):
61 self
.p
.PROMETHEUS_LOCKED_TIME
= 1
62 number_call_set_one
= 0
64 def _db_set_one(*args
, **kwargs
):
65 # simulated that database is not locked at first call
66 nonlocal number_call_set_one
68 number_call_set_one
+= 1
69 if number_call_set_one
== 1:
74 def _check_set_one_calls(set_one_calls
):
75 # check the three calls to database set_one
76 self
.assertEqual(len(set_one_calls
), 3, 'Not called three times to db.set_one, two blocks, one unblock')
77 self
.assertIn('admin', set_one_calls
[0][0], 'db.set_one collection should be admin')
78 first_used_time
= set_one_calls
[0][1]['update_dict']['_admin.locked_at']
79 second_used_time
= set_one_calls
[1][1]['update_dict']['_admin.locked_at']
80 third_used_time
= set_one_calls
[2][1]['update_dict']['_admin.locked_at']
81 self
.assertTrue(first_used_time
!= 0 and second_used_time
!= 0, 'blocking locked_at time must not be 0')
82 self
.assertGreater(second_used_time
, first_used_time
,
83 'Every blocking try must contain a new locked_at time')
84 self
.assertEqual(third_used_time
, 0, 'For unblocking must be set locked_at=0')
87 number_call_set_one
= 0
88 self
.db
.get_one
.return_value
= initial_prometheus_data
89 self
.db
.set_one
.side_effect
= _db_set_one
90 self
.p
.send_data
= asynctest
.CoroutineMock(return_value
=True)
91 add_jobs
= {'job1': {'job_name': 'job1', 'nsr_id': 'nsr_id'}}
92 await self
.p
.update(add_jobs
=add_jobs
)
93 set_one_calls
= self
.db
.set_one
.call_args_list
94 _check_set_one_calls(set_one_calls
)
95 update_dict
= set_one_calls
[2][1]['update_dict']
96 unset_dict
= set_one_calls
[2][1]['unset']
97 expected_final_set
= {
98 '_admin.locked_at': 0,
99 '_admin.locked_by': None,
100 '_admin.modified_at': set_one_calls
[1][1]['update_dict']['_admin.locked_at'],
101 'scrape_configs.job1': add_jobs
['job1']}
102 self
.assertEqual(update_dict
, expected_final_set
, 'invalid set and unlock values')
103 self
.assertEqual(unset_dict
, None, 'invalid unset and unlock values')
106 number_call_set_one
= 0
107 remove_jobs
= ['job1']
108 self
.db
.set_one
.reset_mock()
109 await self
.p
.update(remove_jobs
=remove_jobs
)
110 set_one_calls
= self
.db
.set_one
.call_args_list
111 _check_set_one_calls(set_one_calls
)
112 update_dict
= set_one_calls
[2][1]['update_dict']
113 unset_dict
= set_one_calls
[2][1]['unset']
114 expected_final_set
= {
115 '_admin.locked_at': 0,
116 '_admin.locked_by': None,
117 '_admin.modified_at': set_one_calls
[1][1]['update_dict']['_admin.locked_at']
119 self
.assertEqual(update_dict
, expected_final_set
, 'invalid set and unlock values')
120 self
.assertEqual(unset_dict
, {'scrape_configs.job1': None}, 'invalid unset and unlock values')
122 def test_parse_job(self
):
124 # yaml format with jinja2
125 key1: "parsing var1='{{ var1 }}'"
126 key2: "parsing var2='{{ var2 }}'"
128 vars = {'var1': 'VAR1', 'var2': 'VAR2', 'var3': 'VAR3'}
130 'key1': "parsing var1='VAR1'",
131 'key2': "parsing var2='VAR2'"
133 result
= self
.p
.parse_job(text_to_parse
, vars)
134 self
.assertEqual(result
, expected
, 'Error at jinja2 parse')
137 if __name__
== '__main__':