# loglevel: DEBUG
# logfile: /var/log/osm/lcm-message.log
group_id: lcm-server
+
+prometheus:
+ driver: prometheus
+ # local file to store the configuration
+ path: /etc/prometheus
+ uri: http://prometheus:9090/
+ # loglevel: DEBUG
+ # logfile: /var/log/osm/lcm-message.log
import getopt
import sys
-from osm_lcm import ns
-from osm_lcm import vim_sdn
-from osm_lcm import netslice
+from osm_lcm import ns, prometheus, vim_sdn, netslice
from osm_lcm.ng_ro import NgRoException, NgRoClient
from osm_lcm.ROclient import ROClient, ROClientException
# contains created tasks/futures to be able to cancel
self.lcm_tasks = TaskRegistry(self.worker_id, self.db, self.logger)
- self.ns = ns.NsLcm(self.db, self.msg, self.fs, self.lcm_tasks, self.config, self.loop)
- self.netslice = netslice.NetsliceLcm(self.db, self.msg, self.fs, self.lcm_tasks, self.config, self.loop)
+ if self.config.get("prometheus"):
+ self.prometheus = prometheus.Prometheus(self.config["prometheus"], self.worker_id, self.db, self.loop)
+ else:
+ self.prometheus = None
+ self.ns = ns.NsLcm(self.db, self.msg, self.fs, self.lcm_tasks, self.config, self.loop, self.prometheus)
+ self.netslice = netslice.NetsliceLcm(self.db, self.msg, self.fs, self.lcm_tasks, self.config, self.loop,
+ self.ns)
self.vim = vim_sdn.VimLcm(self.db, self.msg, self.fs, self.lcm_tasks, self.config, self.loop)
self.wim = vim_sdn.WimLcm(self.db, self.msg, self.fs, self.lcm_tasks, self.config, self.loop)
self.sdn = vim_sdn.SdnLcm(self.db, self.msg, self.fs, self.lcm_tasks, self.config, self.loop)
# check RO version
self.loop.run_until_complete(self.check_RO_version())
+ # configure Prometheus
+ if self.prometheus:
+ self.loop.run_until_complete(self.prometheus.start())
+
self.loop.run_until_complete(asyncio.gather(
self.kafka_read(),
self.kafka_ping()
reuse_ee_id: str = None,
progress_timeout: float = None,
total_timeout: float = None,
+ config: dict = None,
artifact_path: str = None,
vca_type: str = None) -> (str, dict):
"""
:param str reuse_ee_id: ee id from an older execution. TODO - right now this params is not used
:param float progress_timeout:
:param float total_timeout:
- :param str artifact_path path of package content
- :param str vca_type Type of vca, not used as assumed of type helm
+ :param dict config: General variables to instantiate KDU
+ :param str artifact_path: path of package content
+ :param str vca_type: Type of vca, not used as assumed of type helm
:returns str, dict: id of the new execution environment including namespace.helm_id
and credentials object set to None as all credentials should be osm kubernetes .kubeconfig
"""
# Call helm conn install
# Obtain system cluster id from database
system_cluster_uuid = self._get_system_cluster_id()
+ # Add parameter osm if exist to global
+ if config and config.get("osm"):
+ if not config.get("global"):
+ config["global"] = {}
+ config["global"]["osm"] = config.get("osm")
self.log.debug("install helm chart: {}".format(full_path))
helm_id = await self._k8sclusterhelm.install(system_cluster_uuid, kdu_model=full_path,
namespace=self._KUBECTL_OSM_NAMESPACE,
+ params=config,
db_dict=db_dict,
timeout=progress_timeout)
import logging
import logging.handlers
import traceback
-from osm_lcm import ROclient, ns
+from osm_lcm import ROclient
from osm_lcm.lcm_utils import LcmException, LcmBase, populate_dict, get_iterable, deep_get
from osm_common.dbbase import DbException
from time import time
timeout_nsi_deploy = 2 * 3600 # default global timeout for deployment a nsi
- def __init__(self, db, msg, fs, lcm_tasks, config, loop):
+ def __init__(self, db, msg, fs, lcm_tasks, config, loop, ns):
"""
Init, Connect to database, filesystem storage, and messaging
:param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
self.logger = logging.getLogger('lcm.netslice')
self.loop = loop
self.lcm_tasks = lcm_tasks
- self.ns = ns.NsLcm(db, msg, fs, lcm_tasks, config, loop)
+ self.ns = ns
self.ro_config = config["ro_config"]
self.timeout = config["timeout"]
from time import time
from uuid import uuid4
from functools import partial
+from random import randint
__author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
async def create_execution_environment(self, namespace: str, db_dict: dict, reuse_ee_id: str = None,
progress_timeout: float = None, total_timeout: float = None,
- artifact_path: str = None, vca_type: str = None) -> (str, dict):
+ config: dict = None, artifact_path: str = None,
+ vca_type: str = None) -> (str, dict):
# admit two new parameters, artifact_path and vca_type
if vca_type == "k8s_proxy_charm":
ee_id = await self.n2vc.install_k8s_proxy_charm(
SUBOPERATION_STATUS_SKIP = -3
task_name_deploy_vca = "Deploying VCA"
- def __init__(self, db, msg, fs, lcm_tasks, config, loop):
+ def __init__(self, db, msg, fs, lcm_tasks, config, loop, prometheus=None):
"""
Init, Connect to database, filesystem storage, and messaging
:param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
"helm": self.conn_helm_ee
}
+ self.prometheus = prometheus
+
# create RO client
if self.ng_ro:
self.RO = NgRoClient(self.loop, **self.ro_config)
raise LcmException("Configuration aborted because dependent charm/s timeout")
async def instantiate_N2VC(self, logging_text, vca_index, nsi_id, db_nsr, db_vnfr, vdu_id, kdu_name, vdu_index,
- config_descriptor, deploy_params, base_folder, nslcmop_id, stage, vca_type, vca_name):
+ config_descriptor, deploy_params, base_folder, nslcmop_id, stage, vca_type, vca_name,
+ ee_config_descriptor):
nsr_id = db_nsr["_id"]
db_update_entry = "_admin.deployed.VCA.{}.".format(vca_index)
vca_deployed_list = db_nsr["_admin"]["deployed"]["VCA"]
vca_deployed = db_nsr["_admin"]["deployed"]["VCA"][vca_index]
+ osm_config = {"osm": {"ns_id": db_nsr["_id"]}}
db_dict = {
'collection': 'nsrs',
'filter': {'_id': nsr_id},
vnfr_id = None
if db_vnfr:
vnfr_id = db_vnfr["_id"]
+ osm_config["osm"]["vnf_id"] = vnfr_id
namespace = "{nsi}.{ns}".format(
nsi=nsi_id if nsi_id else "",
namespace += ".{}-{}".format(vdu_id, vdu_index or 0)
element_type = 'VDU'
element_under_configuration = "{}-{}".format(vdu_id, vdu_index or 0)
+ osm_config["osm"]["vdu_id"] = vdu_id
elif kdu_name:
namespace += ".{}".format(kdu_name)
element_type = 'KDU'
element_under_configuration = kdu_name
+ osm_config["osm"]["kdu_name"] = kdu_name
# Get artifact path
artifact_path = "{}/{}/{}/{}".format(
namespace=namespace,
reuse_ee_id=ee_id,
db_dict=db_dict,
+ config=osm_config,
artifact_path=artifact_path,
vca_type=vca_type)
# TODO register in database that primitive is done
+ # STEP 7 Configure metrics
+ if vca_type == "helm":
+ prometheus_jobs = await self.add_prometheus_metrics(
+ ee_id=ee_id,
+ artifact_path=artifact_path,
+ ee_config_descriptor=ee_config_descriptor,
+ vnfr_id=vnfr_id,
+ nsr_id=nsr_id,
+ target_ip=rw_mgmt_ip,
+ )
+ if prometheus_jobs:
+ self.update_db_2("nsrs", nsr_id, {db_update_entry + "prometheus_jobs": prometheus_jobs})
+
step = "instantiated at VCA"
self.logger.debug(logging_text + step)
nslcmop_id=nslcmop_id,
stage=stage,
vca_type=vca_type,
- vca_name=vca_name
+ vca_name=vca_name,
+ ee_config_descriptor=ee_item
)
)
self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "instantiate_N2VC-{}".format(vca_index), task_n2vc)
db_update_entry = "_admin.deployed.VCA.{}.needed_terminate".format(vca_index)
self.update_db_2("nsrs", db_nslcmop["nsInstanceId"], {db_update_entry: False})
+ if vca_deployed.get("prometheus_jobs") and self.prometheus:
+ await self.prometheus.update(remove_jobs=vca_deployed["prometheus_jobs"])
+
if destroy_ee:
await self.vca_map[vca_type].delete_execution_environment(vca_deployed["ee_id"])
self.logger.debug(logging_text + stage[0])
stage[1] = "Looking execution environment that needs terminate."
self.logger.debug(logging_text + stage[1])
- self.logger.debug("nsr_deployed: {}".format(nsr_deployed))
+ # self.logger.debug("nsr_deployed: {}".format(nsr_deployed))
for vca_index, vca in enumerate(get_iterable(nsr_deployed, "VCA")):
self.logger.debug("vca_index: {}, vca: {}".format(vca_index, vca))
config_descriptor = None
config_descriptor = kdud.get("kdu-configuration")
else:
config_descriptor = db_vnfds_from_member_index[vca["member-vnf-index"]].get("vnf-configuration")
- # For helm we must destroy_ee
vca_type = vca.get("type")
exec_terminate_primitives = (not operation_params.get("skip_terminate_primitives") and
vca.get("needed_terminate"))
- self.logger.debug("vca type: {}".format(vca_type))
- if not vca_type == "helm":
- task = asyncio.ensure_future(self.destroy_N2VC(logging_text, db_nslcmop, vca, config_descriptor,
- vca_index, False, exec_terminate_primitives))
- else:
- task = asyncio.ensure_future(self.destroy_N2VC(logging_text, db_nslcmop, vca, config_descriptor,
- vca_index, True, exec_terminate_primitives))
+ # For helm we must destroy_ee
+ destroy_ee = "True" if vca_type == "helm" else "False"
+ task = asyncio.ensure_future(
+ self.destroy_N2VC(logging_text, db_nslcmop, vca, config_descriptor, vca_index,
+ destroy_ee, exec_terminate_primitives))
tasks_dict_info[task] = "Terminating VCA {}".format(vca.get("ee_id"))
# wait for pending tasks of terminate primitives
self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
self.logger.debug(logging_text + "Exit")
self.lcm_tasks.remove("ns", nsr_id, nslcmop_id, "ns_scale")
+
+ async def add_prometheus_metrics(self, ee_id, artifact_path, ee_config_descriptor, vnfr_id, nsr_id, target_ip):
+ if not self.prometheus:
+ return
+ # look if exist a file called 'prometheus*.j2' and
+ artifact_content = self.fs.dir_ls(artifact_path)
+ job_file = next((f for f in artifact_content if f.startswith("prometheus") and f.endswith(".j2")), None)
+ if not job_file:
+ return
+ with self.fs.file_open((artifact_path, job_file), "r") as f:
+ job_data = f.read()
+
+ # TODO get_service
+ _, _, service = ee_id.partition(".") # remove prefix "namespace."
+ host_name = "{}-{}".format(service, ee_config_descriptor["metric-service"])
+ host_port = "80"
+ vnfr_id = vnfr_id.replace("-", "")
+ variables = {
+ "JOB_NAME": vnfr_id,
+ "TARGET_IP": target_ip,
+ "EXPORTER_POD_IP": host_name,
+ "EXPORTER_POD_PORT": host_port,
+ }
+ job_list = self.prometheus.parse_job(job_data, variables)
+ # ensure job_name is using the vnfr_id. Adding the metadata nsr_id
+ for job in job_list:
+ if not isinstance(job.get("job_name"), str) or vnfr_id not in job["job_name"]:
+ job["job_name"] = vnfr_id + "_" + str(randint(1, 10000))
+ job["nsr_id"] = nsr_id
+ job_dict = {jl["job_name"]: jl for jl in job_list}
+ if await self.prometheus.update(job_dict):
+ return list(job_dict.keys())
import os
from osm_lcm.lcm_utils import LcmException
from osm_common.dbbase import DbException
-
+from jinja2 import Template, TemplateError, TemplateNotFound, TemplateSyntaxError
__author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
"created": 1593445184,
"version": "1.0" # to allow future version updates
},
- 'scrape_configs': [{'static_configs': [{'targets': ['mon:8000']}], 'job_name': 'mon_exporter'}],
+ 'scrape_configs': { # Dictionary at database. Converted to list before sending to prometheus
+ 'mon_exporter': {'static_configs': [{'targets': ['mon:8000']}], 'job_name': 'mon_exporter'},
+ },
'global': {'evaluation_interval': '15s', 'scrape_interval': '15s'},
'rule_files': None,
'alerting': {'alertmanagers': [{'static_configs': [{'targets': None}]}]}
}
-class Prometheus():
+class Prometheus:
"""
Implements a class to update Prometheus
"""
self.worker_id = worker_id
self.db = db
self.loop = loop
- self.logger = logger or logging.get_legger("lcm.prometheus")
+ self.logger = logger or logging.getLogger("lcm.prometheus")
self.server = config["uri"]
self.path = config["path"]
if not self.path.endswith("/"):
self.cfg_file = self.path + "prometheus.yml"
self.cfg_file_backup = self.path + "prometheus.yml-backup"
+ @staticmethod
+ def parse_job(job_data: str, variables: dict) -> dict:
+ try:
+ template = Template(job_data)
+ job_parsed = template.render(variables or {})
+ return yaml.safe_load(job_parsed)
+ except (TemplateError, TemplateNotFound, TemplateSyntaxError) as e:
+ # TODO yaml exceptions
+ raise LcmException("Error parsing Jinja2 to prometheus job. job_data={}, variables={}. Error={}".format(
+ job_data, variables, e))
+
async def start(self):
for retry in range(4):
try:
+ # self.logger("Starting prometheus ")
# read from database
- prometheus_data = self.db.get_one("admin", {"_id": "prometheus"}, fail_on_empty=True)
+ prometheus_data = self.db.get_one("admin", {"_id": "prometheus"}, fail_on_empty=False)
if not prometheus_data:
self.logger.info("Init db.admin.prometheus content")
self.db.create("admin", initial_prometheus_data)
# send database config file to prometheus. Ignore loading errors, as prometheus may be starting
# but at least an initial configuration file is set
await self.update()
+ return
except DbException as e:
if retry == 3:
raise LcmException("Max retries trying to init prometheus configuration: {}".format(e))
await asyncio.sleep(5, loop=self.loop)
- async def update(self, add_jobs=None, remove_jobs=None):
+ async def update(self, add_jobs: dict = None, remove_jobs: list = None) -> bool:
+ """
+
+ :param add_jobs: dictionary with {job_id_1: job_content, job_id_2: job_content}
+ :param remove_jobs: list with jobs to remove [job_id_1, job_id_2]
+ :return: result. If false prometheus denies this configuration. Exception on error
+ """
for retry in range(4):
result = True
if retry: # first time do not wait
await asyncio.sleep(self.PROMETHEUS_LOCKED_TIME / 2, loop=self.loop)
+
# lock database
now = time()
if not self.db.set_one(
continue
# read database
prometheus_data = self.db.get_one("admin", {"_id": "prometheus"})
+ update_dict = {"_admin.locked_at": 0,
+ "_admin.locked_by": None}
# Make changes from prometheus_incremental
- push_list = pull_list = None
+ push_dict = pull_dict = None
if add_jobs or remove_jobs:
- update_dict = {"_admin.locked_at": 0,
- "_admin.locked_by": None,
- "_admin.modified_at": now}
+ log_text_list = []
if add_jobs:
- push_list = {"scrape_configs.static_configs": add_jobs}
- prometheus_data["scrape_configs"]["static_configs"] += add_jobs
+ log_text_list.append("adding jobs: {}".format(list(add_jobs.keys())))
+ prometheus_data["scrape_configs"].update(add_jobs)
+ push_dict = {"scrape_configs." + job_id: job_data for job_id, job_data in add_jobs.items()}
elif remove_jobs:
- pass # TODO
- if not self.send_data(prometheus_data):
- push_list = pull_list = None
- result = False
+ log_text_list.append("removing jobs: {}".format(list(remove_jobs)))
+ for job_id in remove_jobs:
+ prometheus_data["scrape_configs"].pop(job_id, None)
+ pull_dict = {"scrape_configs." + job_id: None for job_id in remove_jobs}
+ self.logger.debug(". ".join(log_text_list))
+
+ if not await self.send_data(prometheus_data):
+ push_dict = pull_dict = None
+ result = False
# unblock database
+ if push_dict:
+ update_dict.update(push_dict)
+ if push_dict or pull_dict:
+ update_dict["_admin.modified_at"] = now
if not self.db.set_one(
"admin", {"_id": "prometheus", "_admin.locked_at": now, "_admin.locked_by": self.worker_id},
- update_dict=update_dict, pull_list=pull_list, push_list=push_list, fail_on_empty=False):
+ update_dict=update_dict, unset=pull_dict, fail_on_empty=False):
continue
return result
raise LcmException("Cannot update prometheus database. Reached max retries")
async def send_data(self, new_config):
restore_backup = False
+ del new_config["_id"]
+ del new_config["_admin"]
+ new_scrape_configs = []
+
+ # generate a list with the values of scrape_configs
+ for scrape_config in new_config["scrape_configs"].values():
+ scrape_config = scrape_config.copy()
+ # remove nsr_id metadata from scrape_configs
+ scrape_config.pop("nsr_id", None)
+ new_scrape_configs.append(scrape_config)
+ new_config["scrape_configs"] = new_scrape_configs
+
try:
if os.path.exists(self.cfg_file):
os.rename(self.cfg_file, self.cfg_file_backup)
restore_backup = True
with open(self.cfg_file, "w+") as f:
- yaml.dump(new_config, f)
+ yaml.safe_dump(new_config, f, indent=4, default_flow_style=False)
+ # self.logger.debug("new configuration: {}".format(yaml.safe_dump(new_config, indent=4,
+ # default_flow_style=False)))
async with aiohttp.ClientSession() as session:
- async with session.post(self.server + "/-/reload") as resp:
+ async with session.post(self.server + "-/reload") as resp:
if resp.status > 204:
- raise LcmException(resp.text)
+ raise LcmException(await resp.text())
await asyncio.sleep(5, loop=self.loop)
- async with session.get(self.server + "/api/v1/status/config") as resp:
+ # If prometheus does not admit this configuration, remains with the old one
+ # Then, to check if the configuration has been accepted, get the configuration from prometheus
+ # and compares with the inserted one
+ async with session.get(self.server + "api/v1/status/config") as resp:
if resp.status > 204:
- raise LcmException(resp.text)
- current_config = resp.json()
+ raise LcmException(await resp.text())
+ current_config = await resp.json()
if not self._check_configuration_equal(current_config, new_config):
return False
else:
restore_backup = False
return True
except Exception as e:
- self.logger.error("Error updating prometheus configuration {}".format(e))
+ self.logger.error("Error updating prometheus configuration url={}: {}".format(self.server, e))
return False
finally:
if restore_backup:
- os.rename(self.cfg_file_backup, self.cfg_file)
+ try:
+ os.rename(self.cfg_file_backup, self.cfg_file)
+ except Exception as e:
+ self.logger.critical("Exception while rolling back: {}".format(e))
- @staticmethod
- def _check_configuration_equal(current_config, new_config):
- # TODO compare and return True if equal
- return True
+ def _check_configuration_equal(self, current_config, expected_config):
+ try:
+ # self.logger.debug("Comparing current_config='{}' with expected_config='{}'".format(current_config,
+ # expected_config))
+ current_config_yaml = yaml.safe_load(current_config['data']['yaml'])
+ current_jobs = [j["job_name"] for j in current_config_yaml["scrape_configs"]]
+ expected_jobs = [j["job_name"] for j in expected_config["scrape_configs"]]
+ return current_jobs == expected_jobs
+ except Exception as e:
+ self.logger.error("Invalid obtained prometheus status. Error: '{}'. Obtained data: '{}'".format(
+ e, current_config))
+ # if format is not understood, cannot be compared, assume it is ok
+ return True
--- /dev/null
+##
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+# For those usages not covered by the Apache License, Version 2.0 please
+# contact: alfonso.tiernosepulveda@telefonica.com
+##
+
+import asynctest
+from osm_lcm.prometheus import Prometheus, initial_prometheus_data
+from asynctest.mock import Mock
+from osm_common.dbmemory import DbMemory
+
+__author__ = 'Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>'
+
+
+class TestPrometheus(asynctest.TestCase):
+
+ async def setUp(self):
+ config = {'uri': 'http:prometheus:9090',
+ 'path': '/etc/prometheus'}
+ self.db = Mock(DbMemory())
+ self.p = Prometheus(config, worker_id='1', db=self.db, loop=self.loop)
+
+ @asynctest.fail_on(active_handles=True)
+ async def test_start(self):
+ # test with database empty
+ self.db.get_one.return_value = False
+ self.p.update = asynctest.CoroutineMock()
+ await self.p.start()
+ self.db.create.assert_called_once_with('admin', initial_prometheus_data)
+ self.p.update.assert_called_once_with()
+
+ # test with database not empty
+ self.db.create.reset_mock()
+ self.db.get_one.return_value = initial_prometheus_data
+ self.p.update.reset_mock()
+ await self.p.start()
+ self.db.create.assert_not_called()
+ self.p.update.assert_called_once_with()
+
+ @asynctest.fail_on(active_handles=True)
+ async def test_update(self):
+ self.p.PROMETHEUS_LOCKED_TIME = 1
+ number_call_set_one = 0
+
+ def _db_set_one(*args, **kwargs):
+ # simulated that database is not locked at first call
+ nonlocal number_call_set_one
+
+ number_call_set_one += 1
+ if number_call_set_one == 1:
+ return
+ else:
+ return {'update': 1}
+
+ def _check_set_one_calls(set_one_calls):
+ # check the three calls to database set_one
+ self.assertEqual(len(set_one_calls), 3, 'Not called three times to db.set_one, two blocks, one unblock')
+ self.assertIn('admin', set_one_calls[0][0], 'db.set_one collection should be admin')
+ first_used_time = set_one_calls[0][1]['update_dict']['_admin.locked_at']
+ second_used_time = set_one_calls[1][1]['update_dict']['_admin.locked_at']
+ third_used_time = set_one_calls[2][1]['update_dict']['_admin.locked_at']
+ self.assertTrue(first_used_time != 0 and second_used_time != 0, 'blocking locked_at time must not be 0')
+ self.assertGreater(second_used_time, first_used_time,
+ 'Every blocking try must contain a new locked_at time')
+ self.assertEqual(third_used_time, 0, 'For unblocking must be set locked_at=0')
+
+ # check add_jobs
+ number_call_set_one = 0
+ self.db.get_one.return_value = initial_prometheus_data
+ self.db.set_one.side_effect = _db_set_one
+ self.p.send_data = asynctest.CoroutineMock(return_value=True)
+ add_jobs = {'job1': {'job_name': 'job1', 'nsr_id': 'nsr_id'}}
+ await self.p.update(add_jobs=add_jobs)
+ set_one_calls = self.db.set_one.call_args_list
+ _check_set_one_calls(set_one_calls)
+ update_dict = set_one_calls[2][1]['update_dict']
+ unset_dict = set_one_calls[2][1]['unset']
+ expected_final_set = {
+ '_admin.locked_at': 0,
+ '_admin.locked_by': None,
+ '_admin.modified_at': set_one_calls[1][1]['update_dict']['_admin.locked_at'],
+ 'scrape_configs.job1': add_jobs['job1']}
+ self.assertEqual(update_dict, expected_final_set, 'invalid set and unlock values')
+ self.assertEqual(unset_dict, None, 'invalid unset and unlock values')
+
+ # check remove_jobs
+ number_call_set_one = 0
+ remove_jobs = ['job1']
+ self.db.set_one.reset_mock()
+ await self.p.update(remove_jobs=remove_jobs)
+ set_one_calls = self.db.set_one.call_args_list
+ _check_set_one_calls(set_one_calls)
+ update_dict = set_one_calls[2][1]['update_dict']
+ unset_dict = set_one_calls[2][1]['unset']
+ expected_final_set = {
+ '_admin.locked_at': 0,
+ '_admin.locked_by': None,
+ '_admin.modified_at': set_one_calls[1][1]['update_dict']['_admin.locked_at']
+ }
+ self.assertEqual(update_dict, expected_final_set, 'invalid set and unlock values')
+ self.assertEqual(unset_dict, {'scrape_configs.job1': None}, 'invalid unset and unlock values')
+
+ def test_parse_job(self):
+ text_to_parse = """
+ # yaml format with jinja2
+ key1: "parsing var1='{{ var1 }}'"
+ key2: "parsing var2='{{ var2 }}'"
+ """
+ vars = {'var1': 'VAR1', 'var2': 'VAR2', 'var3': 'VAR3'}
+ expected = {
+ 'key1': "parsing var1='VAR1'",
+ 'key2': "parsing var2='VAR2'"
+ }
+ result = self.p.parse_job(text_to_parse, vars)
+ self.assertEqual(result, expected, 'Error at jinja2 parse')
+
+
+if __name__ == '__main__':
+ asynctest.main()
[testenv:unittest]
basepython = python3
deps = asynctest
-commands = python3 -m unittest osm_lcm.tests.test_ns
+commands = python3 -m unittest osm_lcm.tests.test_ns osm_lcm.tests.test_prometheus
[testenv:build]
basepython = python3