# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# For those usages not covered by the Apache License, Version 2.0 please
# contact: alfonso.tiernosepulveda@telefonica.com
import asynctest
from asynctest.mock import Mock

from osm_lcm.data_utils.database.database import Database
from osm_lcm.prometheus import Prometheus, initial_prometheus_data
23 __author__
= "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
class TestPrometheus(asynctest.TestCase):
    """Tests for ``Prometheus`` driven through a mocked database handle."""

    # NOTE(review): the reviewed copy of this class was damaged (stray leading
    # numbers, expressions split across lines, some statements missing).
    # Every statement that had to be reconstructed rather than merely
    # re-joined is flagged with NOTE(review) below; confirm each one against
    # version control before relying on it.

    async def setUp(self):
        """Build the ``Prometheus`` instance under test with a mocked db."""
        config = {"uri": "http:prometheus:9090", "path": "/etc/prometheus"}
        # Cleanup singleton Database instance
        Database.instance = None

        # The mock is created from the in-memory driver's db object and then
        # installed on the Database instance in place of the real one.
        self.db = Mock(Database({"database": {"driver": "memory"}}).instance.db)
        Database().instance.db = self.db
        self.p = Prometheus(config, worker_id="1", loop=self.loop)

    @asynctest.fail_on(active_handles=True)
    async def test_start(self):
        """Check db.create/update calls with an empty and a populated db."""
        # test with database empty
        self.db.get_one.return_value = False
        self.p.update = asynctest.CoroutineMock()
        # NOTE(review): the call under test was not visible; ``start()`` is
        # inferred from the test name and the assertions that follow.
        await self.p.start()
        self.db.create.assert_called_once_with("admin", initial_prometheus_data)
        self.p.update.assert_called_once_with()

        # test with database not empty
        self.db.create.reset_mock()
        self.db.get_one.return_value = initial_prometheus_data
        self.p.update.reset_mock()
        # NOTE(review): reconstructed, same as above.
        await self.p.start()
        self.db.create.assert_not_called()
        self.p.update.assert_called_once_with()

    @asynctest.fail_on(active_handles=True)
    async def test_update(self):
        """Check the db.set_one calls made when adding and removing a job."""
        self.p.PROMETHEUS_LOCKED_TIME = 1
        number_call_set_one = 0

        def _db_set_one(*args, **kwargs):
            # simulated that database is not locked at first call
            nonlocal number_call_set_one

            number_call_set_one += 1
            # NOTE(review): both return values were not visible. They are
            # reconstructed from the "two blocks, one unblock" expectation:
            # the first call must look like a failed update (falsy) and the
            # later ones like a successful update (truthy).
            if number_call_set_one == 1:
                return None
            return {"update": 1}

        def _check_set_one_calls(set_one_calls):
            # check the three calls to database set_one
            # Each entry is a mock call: index 0 holds the positional
            # arguments, index 1 the keyword arguments.
            # NOTE(review): the assertion method names in this helper were
            # not visible; they are inferred from the arguments and messages.
            self.assertEqual(
                len(set_one_calls),
                3,
                "Not called three times to db.set_one, two blocks, one unblock",
            )
            self.assertIn(
                "admin", set_one_calls[0][0], "db.set_one collection should be admin"
            )
            first_used_time = set_one_calls[0][1]["update_dict"]["_admin.locked_at"]
            second_used_time = set_one_calls[1][1]["update_dict"]["_admin.locked_at"]
            third_used_time = set_one_calls[2][1]["update_dict"]["_admin.locked_at"]
            self.assertTrue(
                first_used_time != 0 and second_used_time != 0,
                "blocking locked_at time must not be 0",
            )
            # NOTE(review): assertGreater is a guess; only the message was
            # visible, which would also fit assertNotEqual.
            self.assertGreater(
                second_used_time,
                first_used_time,
                "Every blocking try must contain a new locked_at time",
            )
            self.assertEqual(
                third_used_time, 0, "For unblocking must be set locked_at=0"
            )

        # --- add a job ---
        number_call_set_one = 0
        self.db.get_one.return_value = initial_prometheus_data
        self.db.set_one.side_effect = _db_set_one
        self.p.send_data = asynctest.CoroutineMock(return_value=True)
        add_jobs = {"job1": {"job_name": "job1", "nsr_id": "nsr_id"}}
        await self.p.update(add_jobs=add_jobs)
        set_one_calls = self.db.set_one.call_args_list
        _check_set_one_calls(set_one_calls)
        update_dict = set_one_calls[2][1]["update_dict"]
        unset_dict = set_one_calls[2][1]["unset"]
        expected_final_set = {
            "_admin.locked_at": 0,
            "_admin.locked_by": None,
            # NOTE(review): the subscript key was not visible; "_admin.locked_at"
            # is assumed (the only time-valued key seen in that update_dict).
            "_admin.modified_at": set_one_calls[1][1]["update_dict"][
                "_admin.locked_at"
            ],
            "scrape_configs.job1": add_jobs["job1"],
        }
        self.assertEqual(
            update_dict, expected_final_set, "invalid set and unlock values"
        )
        self.assertEqual(unset_dict, None, "invalid unset and unlock values")

        # --- remove the job ---
        number_call_set_one = 0
        remove_jobs = ["job1"]
        self.db.set_one.reset_mock()
        await self.p.update(remove_jobs=remove_jobs)
        set_one_calls = self.db.set_one.call_args_list
        _check_set_one_calls(set_one_calls)
        update_dict = set_one_calls[2][1]["update_dict"]
        unset_dict = set_one_calls[2][1]["unset"]
        expected_final_set = {
            "_admin.locked_at": 0,
            "_admin.locked_by": None,
            # NOTE(review): subscript key assumed, as in the add-job case.
            "_admin.modified_at": set_one_calls[1][1]["update_dict"][
                "_admin.locked_at"
            ],
        }
        self.assertEqual(
            update_dict, expected_final_set, "invalid set and unlock values"
        )
        self.assertEqual(
            unset_dict, {"scrape_configs.job1": None}, "invalid unset and unlock values"
        )

    def test_parse_job(self):
        """Check that ``parse_job`` renders the jinja2 variables in a job."""
        # NOTE(review): the assignment and the triple-quote delimiters were
        # not visible; only the three inner lines were. The exact whitespace
        # of the literal is therefore an assumption.
        text_to_parse = """
            # yaml format with jinja2
            key1: "parsing var1='{{ var1 }}'"
            key2: "parsing var2='{{ var2 }}'"
        """
        # Named job_vars rather than vars so the builtin is not shadowed;
        # var3 is supplied but does not appear in the expected result.
        job_vars = {"var1": "VAR1", "var2": "VAR2", "var3": "VAR3"}
        expected = {"key1": "parsing var1='VAR1'", "key2": "parsing var2='VAR2'"}
        result = self.p.parse_job(text_to_parse, job_vars)
        self.assertEqual(result, expected, "Error at jinja2 parse")
153 if __name__
== "__main__":