import asynctest
from osm_lcm.prometheus import Prometheus, initial_prometheus_data
from asynctest.mock import Mock
-from osm_common.dbmemory import DbMemory
+from osm_lcm.data_utils.database.database import Database
-__author__ = 'Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>'
+__author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
class TestPrometheus(asynctest.TestCase):
-
async def setUp(self):
- config = {'uri': 'http:prometheus:9090',
- 'path': '/etc/prometheus'}
- self.db = Mock(DbMemory())
- self.p = Prometheus(config, worker_id='1', db=self.db, loop=self.loop)
+ config = {"uri": "http:prometheus:9090", "path": "/etc/prometheus"}
+ # Cleanup singleton Database instance
+ Database.instance = None
+
+ self.db = Mock(Database({"database": {"driver": "memory"}}).instance.db)
+ Database().instance.db = self.db
+ self.p = Prometheus(config, worker_id="1", loop=self.loop)
@asynctest.fail_on(active_handles=True)
async def test_start(self):
self.db.get_one.return_value = False
self.p.update = asynctest.CoroutineMock()
await self.p.start()
- self.db.create.assert_called_once_with('admin', initial_prometheus_data)
+ self.db.create.assert_called_once_with("admin", initial_prometheus_data)
self.p.update.assert_called_once_with()
# test with database not empty
if number_call_set_one == 1:
return
else:
- return {'update': 1}
+ return {"update": 1}
def _check_set_one_calls(set_one_calls):
# check the three calls to database set_one
- self.assertEqual(len(set_one_calls), 3, 'Not called three times to db.set_one, two blocks, one unblock')
- self.assertIn('admin', set_one_calls[0][0], 'db.set_one collection should be admin')
- first_used_time = set_one_calls[0][1]['update_dict']['_admin.locked_at']
- second_used_time = set_one_calls[1][1]['update_dict']['_admin.locked_at']
- third_used_time = set_one_calls[2][1]['update_dict']['_admin.locked_at']
- self.assertTrue(first_used_time != 0 and second_used_time != 0, 'blocking locked_at time must not be 0')
- self.assertGreater(second_used_time, first_used_time,
- 'Every blocking try must contain a new locked_at time')
- self.assertEqual(third_used_time, 0, 'For unblocking must be set locked_at=0')
+ self.assertEqual(
+ len(set_one_calls),
+ 3,
+ "Not called three times to db.set_one, two blocks, one unblock",
+ )
+ self.assertIn(
+ "admin", set_one_calls[0][0], "db.set_one collection should be admin"
+ )
+ first_used_time = set_one_calls[0][1]["update_dict"]["_admin.locked_at"]
+ second_used_time = set_one_calls[1][1]["update_dict"]["_admin.locked_at"]
+ third_used_time = set_one_calls[2][1]["update_dict"]["_admin.locked_at"]
+ self.assertTrue(
+ first_used_time != 0 and second_used_time != 0,
+ "blocking locked_at time must not be 0",
+ )
+ self.assertGreater(
+ second_used_time,
+ first_used_time,
+ "Every blocking try must contain a new locked_at time",
+ )
+ self.assertEqual(
+ third_used_time, 0, "For unblocking must be set locked_at=0"
+ )
# check add_jobs
number_call_set_one = 0
self.db.get_one.return_value = initial_prometheus_data
self.db.set_one.side_effect = _db_set_one
self.p.send_data = asynctest.CoroutineMock(return_value=True)
- add_jobs = {'job1': {'job_name': 'job1', 'nsr_id': 'nsr_id'}}
+ add_jobs = {"job1": {"job_name": "job1", "nsr_id": "nsr_id"}}
await self.p.update(add_jobs=add_jobs)
set_one_calls = self.db.set_one.call_args_list
_check_set_one_calls(set_one_calls)
- update_dict = set_one_calls[2][1]['update_dict']
- unset_dict = set_one_calls[2][1]['unset']
+ update_dict = set_one_calls[2][1]["update_dict"]
+ unset_dict = set_one_calls[2][1]["unset"]
expected_final_set = {
- '_admin.locked_at': 0,
- '_admin.locked_by': None,
- '_admin.modified_at': set_one_calls[1][1]['update_dict']['_admin.locked_at'],
- 'scrape_configs.job1': add_jobs['job1']}
- self.assertEqual(update_dict, expected_final_set, 'invalid set and unlock values')
- self.assertEqual(unset_dict, None, 'invalid unset and unlock values')
+ "_admin.locked_at": 0,
+ "_admin.locked_by": None,
+ "_admin.modified_at": set_one_calls[1][1]["update_dict"][
+ "_admin.locked_at"
+ ],
+ "scrape_configs.job1": add_jobs["job1"],
+ }
+ self.assertEqual(
+ update_dict, expected_final_set, "invalid set and unlock values"
+ )
+ self.assertEqual(unset_dict, None, "invalid unset and unlock values")
# check remove_jobs
number_call_set_one = 0
- remove_jobs = ['job1']
+ remove_jobs = ["job1"]
self.db.set_one.reset_mock()
await self.p.update(remove_jobs=remove_jobs)
set_one_calls = self.db.set_one.call_args_list
_check_set_one_calls(set_one_calls)
- update_dict = set_one_calls[2][1]['update_dict']
- unset_dict = set_one_calls[2][1]['unset']
+ update_dict = set_one_calls[2][1]["update_dict"]
+ unset_dict = set_one_calls[2][1]["unset"]
expected_final_set = {
- '_admin.locked_at': 0,
- '_admin.locked_by': None,
- '_admin.modified_at': set_one_calls[1][1]['update_dict']['_admin.locked_at']
+ "_admin.locked_at": 0,
+ "_admin.locked_by": None,
+ "_admin.modified_at": set_one_calls[1][1]["update_dict"][
+ "_admin.locked_at"
+ ],
}
- self.assertEqual(update_dict, expected_final_set, 'invalid set and unlock values')
- self.assertEqual(unset_dict, {'scrape_configs.job1': None}, 'invalid unset and unlock values')
+ self.assertEqual(
+ update_dict, expected_final_set, "invalid set and unlock values"
+ )
+ self.assertEqual(
+ unset_dict, {"scrape_configs.job1": None}, "invalid unset and unlock values"
+ )
def test_parse_job(self):
text_to_parse = """
key1: "parsing var1='{{ var1 }}'"
key2: "parsing var2='{{ var2 }}'"
"""
- vars = {'var1': 'VAR1', 'var2': 'VAR2', 'var3': 'VAR3'}
- expected = {
- 'key1': "parsing var1='VAR1'",
- 'key2': "parsing var2='VAR2'"
- }
+ vars = {"var1": "VAR1", "var2": "VAR2", "var3": "VAR3"}
+ expected = {"key1": "parsing var1='VAR1'", "key2": "parsing var2='VAR2'"}
result = self.p.parse_job(text_to_parse, vars)
- self.assertEqual(result, expected, 'Error at jinja2 parse')
+ self.assertEqual(result, expected, "Error at jinja2 parse")
-if __name__ == '__main__':
+if __name__ == "__main__":
asynctest.main()