Reformat LCM to standardized format
[osm/LCM.git] / osm_lcm / tests / test_prometheus.py
1 ##
2 # Licensed under the Apache License, Version 2.0 (the "License"); you may
3 # not use this file except in compliance with the License. You may obtain
4 # a copy of the License at
5 #
6 # http://www.apache.org/licenses/LICENSE-2.0
7 #
8 # Unless required by applicable law or agreed to in writing, software
9 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
10 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
11 # License for the specific language governing permissions and limitations
12 # under the License.
13 #
14 # For those usages not covered by the Apache License, Version 2.0 please
15 # contact: alfonso.tiernosepulveda@telefonica.com
16 ##
17
18 import asynctest
19 from osm_lcm.prometheus import Prometheus, initial_prometheus_data
20 from asynctest.mock import Mock
21 from osm_lcm.data_utils.database.database import Database
22
23 __author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
24
25
26 class TestPrometheus(asynctest.TestCase):
27 async def setUp(self):
28 config = {"uri": "http:prometheus:9090", "path": "/etc/prometheus"}
29 # Cleanup singleton Database instance
30 Database.instance = None
31
32 self.db = Mock(Database({"database": {"driver": "memory"}}).instance.db)
33 Database().instance.db = self.db
34 self.p = Prometheus(config, worker_id="1", loop=self.loop)
35
36 @asynctest.fail_on(active_handles=True)
37 async def test_start(self):
38 # test with database empty
39 self.db.get_one.return_value = False
40 self.p.update = asynctest.CoroutineMock()
41 await self.p.start()
42 self.db.create.assert_called_once_with("admin", initial_prometheus_data)
43 self.p.update.assert_called_once_with()
44
45 # test with database not empty
46 self.db.create.reset_mock()
47 self.db.get_one.return_value = initial_prometheus_data
48 self.p.update.reset_mock()
49 await self.p.start()
50 self.db.create.assert_not_called()
51 self.p.update.assert_called_once_with()
52
53 @asynctest.fail_on(active_handles=True)
54 async def test_update(self):
55 self.p.PROMETHEUS_LOCKED_TIME = 1
56 number_call_set_one = 0
57
58 def _db_set_one(*args, **kwargs):
59 # simulated that database is not locked at first call
60 nonlocal number_call_set_one
61
62 number_call_set_one += 1
63 if number_call_set_one == 1:
64 return
65 else:
66 return {"update": 1}
67
68 def _check_set_one_calls(set_one_calls):
69 # check the three calls to database set_one
70 self.assertEqual(
71 len(set_one_calls),
72 3,
73 "Not called three times to db.set_one, two blocks, one unblock",
74 )
75 self.assertIn(
76 "admin", set_one_calls[0][0], "db.set_one collection should be admin"
77 )
78 first_used_time = set_one_calls[0][1]["update_dict"]["_admin.locked_at"]
79 second_used_time = set_one_calls[1][1]["update_dict"]["_admin.locked_at"]
80 third_used_time = set_one_calls[2][1]["update_dict"]["_admin.locked_at"]
81 self.assertTrue(
82 first_used_time != 0 and second_used_time != 0,
83 "blocking locked_at time must not be 0",
84 )
85 self.assertGreater(
86 second_used_time,
87 first_used_time,
88 "Every blocking try must contain a new locked_at time",
89 )
90 self.assertEqual(
91 third_used_time, 0, "For unblocking must be set locked_at=0"
92 )
93
94 # check add_jobs
95 number_call_set_one = 0
96 self.db.get_one.return_value = initial_prometheus_data
97 self.db.set_one.side_effect = _db_set_one
98 self.p.send_data = asynctest.CoroutineMock(return_value=True)
99 add_jobs = {"job1": {"job_name": "job1", "nsr_id": "nsr_id"}}
100 await self.p.update(add_jobs=add_jobs)
101 set_one_calls = self.db.set_one.call_args_list
102 _check_set_one_calls(set_one_calls)
103 update_dict = set_one_calls[2][1]["update_dict"]
104 unset_dict = set_one_calls[2][1]["unset"]
105 expected_final_set = {
106 "_admin.locked_at": 0,
107 "_admin.locked_by": None,
108 "_admin.modified_at": set_one_calls[1][1]["update_dict"][
109 "_admin.locked_at"
110 ],
111 "scrape_configs.job1": add_jobs["job1"],
112 }
113 self.assertEqual(
114 update_dict, expected_final_set, "invalid set and unlock values"
115 )
116 self.assertEqual(unset_dict, None, "invalid unset and unlock values")
117
118 # check remove_jobs
119 number_call_set_one = 0
120 remove_jobs = ["job1"]
121 self.db.set_one.reset_mock()
122 await self.p.update(remove_jobs=remove_jobs)
123 set_one_calls = self.db.set_one.call_args_list
124 _check_set_one_calls(set_one_calls)
125 update_dict = set_one_calls[2][1]["update_dict"]
126 unset_dict = set_one_calls[2][1]["unset"]
127 expected_final_set = {
128 "_admin.locked_at": 0,
129 "_admin.locked_by": None,
130 "_admin.modified_at": set_one_calls[1][1]["update_dict"][
131 "_admin.locked_at"
132 ],
133 }
134 self.assertEqual(
135 update_dict, expected_final_set, "invalid set and unlock values"
136 )
137 self.assertEqual(
138 unset_dict, {"scrape_configs.job1": None}, "invalid unset and unlock values"
139 )
140
141 def test_parse_job(self):
142 text_to_parse = """
143 # yaml format with jinja2
144 key1: "parsing var1='{{ var1 }}'"
145 key2: "parsing var2='{{ var2 }}'"
146 """
147 vars = {"var1": "VAR1", "var2": "VAR2", "var3": "VAR3"}
148 expected = {"key1": "parsing var1='VAR1'", "key2": "parsing var2='VAR2'"}
149 result = self.p.parse_job(text_to_parse, vars)
150 self.assertEqual(result, expected, "Error at jinja2 parse")
151
152
153 if __name__ == "__main__":
154 asynctest.main()