# blob: 2eb81f5c2d9066ea10ee8ae8e19ff7e4195ef748 [file] [log] [blame]
##
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# For those usages not covered by the Apache License, Version 2.0 please
# contact: alfonso.tiernosepulveda@telefonica.com
##
import asynctest
from osm_lcm.prometheus import Prometheus, initial_prometheus_data
from asynctest.mock import Mock
from osm_lcm.data_utils.database.database import Database
# Author of this test module.
__author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
class TestPrometheus(asynctest.TestCase):
    """Unit tests for ``osm_lcm.prometheus.Prometheus`` using a mocked database."""

    async def setUp(self):
        """Create a ``Prometheus`` object whose database backend is a ``Mock``."""
        # NOTE(review): the URI has no "//" after the scheme -- confirm this is
        # intentional; none of the tests in this class assert on it.
        config = {"uri": "http:prometheus:9090", "path": "/etc/prometheus"}
        # Cleanup singleton Database instance
        Database.instance = None
        # Build the mock from the in-memory driver's db object, then install the
        # mock in its place so the Prometheus object talks to the mock.
        self.db = Mock(Database({"database": {"driver": "memory"}}).instance.db)
        Database().instance.db = self.db
        self.p = Prometheus(config, worker_id="1", loop=self.loop)

    @asynctest.fail_on(active_handles=True)
    async def test_start(self):
        """Check ``start()`` with an empty and with a populated database.

        The initial record must be created only when ``db.get_one`` returns a
        falsy value; ``update()`` must be called exactly once in both cases.
        """
        self.p.update = asynctest.CoroutineMock()

        # test with database empty
        self.db.get_one.return_value = False
        await self.p.start()
        self.db.create.assert_called_once_with("admin", initial_prometheus_data)
        self.p.update.assert_called_once_with()

        # test with database not empty
        self.db.create.reset_mock()
        self.p.update.reset_mock()
        self.db.get_one.return_value = initial_prometheus_data
        await self.p.start()
        self.db.create.assert_not_called()
        self.p.update.assert_called_once_with()

    @asynctest.fail_on(active_handles=True)
    async def test_update(self):
        """Check the ``db.set_one`` calls made by ``update()``.

        Both the ``add_jobs`` and the ``remove_jobs`` phases must produce three
        ``db.set_one`` calls: two carrying a non-zero ``_admin.locked_at`` and a
        final one that resets it to 0 and carries the job changes.
        """
        self.p.PROMETHEUS_LOCKED_TIME = 1
        number_call_set_one = 0

        def _db_set_one(*args, **kwargs):
            # Side effect for db.set_one: the first call of each phase returns
            # None, every later call returns a truthy result.
            # NOTE(review): presumably update() reads the None as "lock not
            # obtained" and retries -- confirm against Prometheus.update.
            nonlocal number_call_set_one
            number_call_set_one += 1
            if number_call_set_one == 1:
                return None
            return {"update": 1}

        def _check_set_one_calls(set_one_calls):
            # check the three calls to database set_one
            self.assertEqual(
                len(set_one_calls),
                3,
                "Not called three times to db.set_one, two blocks, one unblock",
            )
            self.assertIn(
                "admin", set_one_calls[0][0], "db.set_one collection should be admin"
            )
            # Each entry is a mock call: index 0 holds args, index 1 holds kwargs.
            first_used_time, second_used_time, third_used_time = (
                call[1]["update_dict"]["_admin.locked_at"] for call in set_one_calls
            )
            # Checked one by one so a failure reports which value was zero.
            self.assertNotEqual(
                first_used_time, 0, "blocking locked_at time must not be 0"
            )
            self.assertNotEqual(
                second_used_time, 0, "blocking locked_at time must not be 0"
            )
            self.assertGreater(
                second_used_time,
                first_used_time,
                "Every blocking try must contain a new locked_at time",
            )
            self.assertEqual(
                third_used_time, 0, "For unblocking must be set locked_at=0"
            )

        def _expected_unlock_values(set_one_calls):
            # Values expected in the last (unlocking) call; modified_at must be
            # the locked_at value that was sent in the second call.
            return {
                "_admin.locked_at": 0,
                "_admin.locked_by": None,
                "_admin.modified_at": set_one_calls[1][1]["update_dict"][
                    "_admin.locked_at"
                ],
            }

        # check add_jobs
        number_call_set_one = 0
        self.db.get_one.return_value = initial_prometheus_data
        self.db.set_one.side_effect = _db_set_one
        self.p.send_data = asynctest.CoroutineMock(return_value=True)
        add_jobs = {"job1": {"job_name": "job1", "nsr_id": "nsr_id"}}
        await self.p.update(add_jobs=add_jobs)
        set_one_calls = self.db.set_one.call_args_list
        _check_set_one_calls(set_one_calls)
        update_dict = set_one_calls[2][1]["update_dict"]
        unset_dict = set_one_calls[2][1]["unset"]
        expected_final_set = _expected_unlock_values(set_one_calls)
        expected_final_set["scrape_configs.job1"] = add_jobs["job1"]
        self.assertEqual(
            update_dict, expected_final_set, "invalid set and unlock values"
        )
        self.assertIsNone(unset_dict, "invalid unset and unlock values")

        # check remove_jobs
        number_call_set_one = 0
        remove_jobs = ["job1"]
        self.db.set_one.reset_mock()
        await self.p.update(remove_jobs=remove_jobs)
        set_one_calls = self.db.set_one.call_args_list
        _check_set_one_calls(set_one_calls)
        update_dict = set_one_calls[2][1]["update_dict"]
        unset_dict = set_one_calls[2][1]["unset"]
        expected_final_set = _expected_unlock_values(set_one_calls)
        self.assertEqual(
            update_dict, expected_final_set, "invalid set and unlock values"
        )
        self.assertEqual(
            unset_dict, {"scrape_configs.job1": None}, "invalid unset and unlock values"
        )

    def test_parse_job(self):
        """Check that ``parse_job`` renders the jinja2 variables of a yaml text.

        A variable supplied but not referenced by the template (``var3``) must
        not appear in the result.
        """
        # The template lines are kept at column 0 so the string is unchanged.
        text_to_parse = """
# yaml format with jinja2
key1: "parsing var1='{{ var1 }}'"
key2: "parsing var2='{{ var2 }}'"
"""
        # Named so that the builtin ``vars`` is not shadowed.
        template_vars = {"var1": "VAR1", "var2": "VAR2", "var3": "VAR3"}
        expected = {"key1": "parsing var1='VAR1'", "key2": "parsing var2='VAR2'"}
        result = self.p.parse_job(text_to_parse, template_vars)
        self.assertEqual(result, expected, "Error at jinja2 parse")
if __name__ == "__main__":
    # Run the tests of this module when the file is executed directly.
    asynctest.main()