import os
from typing import Iterable
-from peewee import CharField, FloatField, Model, AutoField, Proxy
+from peewee import CharField, FloatField, Model, AutoField, Proxy, ForeignKeyField
from peewee_migrate import Router
from playhouse.db_url import connect
threshold = FloatField()
operation = CharField()
statistic = CharField()
- monitoring_param = CharField()
- vdur_name = CharField()
- vnf_member_index = CharField()
- nsr_id = CharField()
+ metric = CharField()
+
+
+class AlarmTag(BaseModel):
+ name = CharField()
+ value = CharField()
+ alarm = ForeignKeyField(Alarm, related_name='tags', on_delete='CASCADE')
class DatabaseManager:
db.close()
+class AlarmTagRepository:
+ @staticmethod
+ def create(**query) -> Alarm:
+ return AlarmTag.create(**query)
+
+
class AlarmRepository:
@staticmethod
def create(**query) -> Alarm:
def generate_response(self, key, **kwargs) -> dict:
"""Make call to appropriate response function."""
- if key == "list_alarm_response":
- message = self.alarm_list_response(**kwargs)
- elif key == "create_alarm_response":
+ if key == "create_alarm_response":
message = self.create_alarm_response(**kwargs)
elif key == "delete_alarm_response":
message = self.delete_alarm_response(**kwargs)
- elif key == "update_alarm_response":
- message = self.update_alarm_response(**kwargs)
- elif key == "create_metric_response":
- message = self.metric_create_response(**kwargs)
- elif key == "read_metric_data_response":
- message = self.read_metric_data_response(**kwargs)
- elif key == "delete_metric_response":
- message = self.delete_metric_response(**kwargs)
- elif key == "update_metric_response":
- message = self.update_metric_response(**kwargs)
- elif key == "list_metric_response":
- message = self.list_metric_response(**kwargs)
elif key == "notify_alarm":
message = self.notify_alarm(**kwargs)
else:
return message
- def alarm_list_response(self, **kwargs) -> dict:
- """Generate the response for an alarm list request."""
- alarm_list_resp = {"schema_version": schema_version,
- "schema_type": "list_alarm_response",
- "correlation_id": kwargs['cor_id'],
- "list_alarm_response": kwargs['alarm_list']}
- return alarm_list_resp
-
def create_alarm_response(self, **kwargs) -> dict:
"""Generate a response for a create alarm request."""
create_alarm_resp = {"schema_version": schema_version,
"status": kwargs['status']}}
return delete_alarm_resp
- def update_alarm_response(self, **kwargs) -> dict:
- """Generate a response for an update alarm request."""
- update_alarm_resp = {"schema_version": schema_version,
- "schema_type": "update_alarm_response",
- "alarm_update_response": {
- "correlation_id": kwargs['cor_id'],
- "alarm_uuid": kwargs['alarm_id'],
- "status": kwargs['status']}}
- return update_alarm_resp
-
- def metric_create_response(self, **kwargs) -> dict:
- """Generate a response for a create metric request."""
- create_metric_resp = {"schema_version": schema_version,
- "schema_type": "create_metric_response",
- "correlation_id": kwargs['cor_id'],
- "metric_create_response": {
- "metric_uuid": kwargs['metric_id'],
- "resource_uuid": kwargs['resource_id'],
- "status": kwargs['status']}}
- return create_metric_resp
-
- def read_metric_data_response(self, **kwargs) -> dict:
- """Generate a response for a read metric data request."""
- read_metric_data_resp = {"schema_version": schema_version,
- "schema_type": "read_metric_data_response",
- "metric_name": kwargs['metric_name'],
- "metric_uuid": kwargs['metric_id'],
- "resource_uuid": kwargs['resource_id'],
- "correlation_id": kwargs['cor_id'],
- "status": kwargs['status'],
- "metrics_data": {
- "time_series": kwargs['times'],
- "metrics_series": kwargs['metrics']}}
- return read_metric_data_resp
-
- def delete_metric_response(self, **kwargs) -> dict:
- """Generate a response for a delete metric request."""
- delete_metric_resp = {"schema_version": schema_version,
- "schema_type": "delete_metric_response",
- "metric_name": kwargs['metric_name'],
- "metric_uuid": kwargs['metric_id'],
- "resource_uuid": kwargs['resource_id'],
- "correlation_id": kwargs['cor_id'],
- "status": kwargs['status']}
- return delete_metric_resp
-
- def update_metric_response(self, **kwargs) -> dict:
- """Generate a repsonse for an update metric request."""
- update_metric_resp = {"schema_version": schema_version,
- "schema_type": "update_metric_response",
- "correlation_id": kwargs['cor_id'],
- "metric_update_response": {
- "metric_uuid": kwargs['metric_id'],
- "status": kwargs['status'],
- "resource_uuid": kwargs['resource_id']}}
- return update_metric_resp
-
- def list_metric_response(self, **kwargs) -> dict:
- """Generate a response for a list metric request."""
- list_metric_resp = {"schema_version": schema_version,
- "schema_type": "list_metric_response",
- "correlation_id": kwargs['cor_id'],
- "status": kwargs['status'],
- "metrics_list": kwargs['metric_list']}
- return list_metric_resp
-
def notify_alarm(self, **kwargs) -> dict:
"""Generate a response to send alarm notifications."""
notify_alarm_resp = {"schema_version": schema_version,
"schema_type": "notify_alarm",
"notify_details": {
"alarm_uuid": kwargs['alarm_id'],
- "vdu_name": kwargs['vdu_name'],
- "vnf_member_index": kwargs['vnf_member_index'],
- "ns_id": kwargs['ns_id'],
"metric_name": kwargs['metric_name'],
"threshold_value": kwargs['threshold_value'],
"operation": kwargs['operation'],
"severity": kwargs['sev'],
"status": kwargs['status'],
- "start_date": kwargs['date']}}
+ "start_date": kwargs['date'],
+ "tags": kwargs['tags']}}
return notify_alarm_resp
# For those usages not covered by the Apache License, Version 2.0 please
# contact: bdiaz@whitestack.com or glavado@whitestack.com
##
+
from osm_mon.core.config import Config
def __init__(self, config: Config):
pass
- def get_metric_value(self, metric_name, nsr_id, vdur_name, vnf_member_index):
+ def get_metric_value(self, metric_name: str, tags: dict):
pass
super().__init__(config)
self.conf = config
- def get_metric_value(self, metric_name, nsr_id, vdur_name, vnf_member_index):
- query_section = "query={0}{{ns_id=\"{1}\",vdu_name=\"{2}\",vnf_member_index=\"{3}\"}}".format(
- OSM_METRIC_PREFIX + metric_name, nsr_id, vdur_name, vnf_member_index)
- request_url = self.conf.get('prometheus', 'url') + "/api/v1/query?" + query_section
+ def get_metric_value(self, metric_name: str, tags: dict):
+ query = self._build_query(metric_name, tags)
+ request_url = self._build_url(query)
log.info("Querying Prometheus: %s", request_url)
r = requests.get(request_url, timeout=int(self.conf.get('global', 'request_timeout')))
if r.status_code == 200:
json_response = r.json()
if json_response['status'] == 'success':
- result = json_response['data']['result']
- if len(result):
- metric_value = float(result[0]['value'][1])
- log.info("Metric value: %s", metric_value)
- return metric_value
- else:
- return None
+ return self._get_metric_value_from_response(json_response)
else:
log.warning("Prometheus response is not success. Got status %s", json_response['status'])
else:
log.warning("Error contacting Prometheus. Got status code %s: %s", r.status_code, r.text)
return None
+
+ def _build_query(self, metric_name: str, tags: dict) -> str:
+ query_section_tags = []
+ for k, v in tags.items():
+ query_section_tags.append(k + '=\"' + v + '\"')
+ query_section = "query={0}{{{1}}}".format(OSM_METRIC_PREFIX + metric_name, ','.join(query_section_tags))
+ return query_section
+
+ def _build_url(self, query: str):
+ return self.conf.get('prometheus', 'url') + "/api/v1/query?" + query
+
+ def _get_metric_value_from_response(self, json_response):
+ result = json_response['data']['result']
+ if len(result):
+ metric_value = float(result[0]['value'][1])
+ log.info("Metric value: %s", metric_value)
+ return metric_value
+ else:
+ return None
def evaluate(self):
log.debug('evaluate')
alarms_tuples = self.service.evaluate_alarms()
+ processes = []
for alarm, status in alarms_tuples:
p = multiprocessing.Process(target=self.notify_alarm,
args=(alarm, status))
p.start()
+ processes.append(p)
+ for process in processes:
+ process.join(timeout=10)
def notify_alarm(self, alarm: Alarm, status: AlarmStatus):
log.debug("notify_alarm")
def _build_alarm_response(self, alarm: Alarm, status: AlarmStatus):
response = ResponseBuilder()
+ tags = {}
+ for tag in alarm.tags:
+ tags[tag.name] = tag.value
now = time.strftime("%d-%m-%Y") + " " + time.strftime("%X")
return response.generate_response(
'notify_alarm',
alarm_id=alarm.uuid,
- vdu_name=alarm.vdur_name,
- vnf_member_index=alarm.vnf_member_index,
- ns_id=alarm.nsr_id,
- metric_name=alarm.monitoring_param,
+ metric_name=alarm.metric,
operation=alarm.operation,
threshold_value=alarm.threshold,
sev=alarm.severity,
status=status.value,
- date=now)
+ date=now,
+ tags=tags)
from enum import Enum
from typing import Tuple, List
-from osm_common.dbbase import DbException
-
from osm_mon.core import database
from osm_mon.core.common_db import CommonDbClient
from osm_mon.core.config import Config
self.queue = multiprocessing.Queue()
def _get_metric_value(self,
- nsr_id: str,
- vnf_member_index: str,
- vdur_name: str,
- metric_name: str):
- return BACKENDS[self.conf.get('evaluator', 'backend')](self.conf).get_metric_value(metric_name,
- nsr_id,
- vdur_name,
- vnf_member_index)
+ metric_name: str,
+ tags: dict):
+ return BACKENDS[self.conf.get('evaluator', 'backend')](self.conf).get_metric_value(metric_name, tags)
def _evaluate_metric(self,
- nsr_id: str,
- vnf_member_index: str,
- vdur_name: str,
- metric_name: str,
- alarm: Alarm):
+ alarm: Alarm, tags: dict):
log.debug("_evaluate_metric")
- metric_value = self._get_metric_value(nsr_id, vnf_member_index, vdur_name, metric_name)
+ metric_value = self._get_metric_value(alarm.metric, tags)
if metric_value is None:
log.warning("No metric result for alarm %s", alarm.id)
self.queue.put((alarm, AlarmStatus.INSUFFICIENT))
try:
with database.db.atomic():
for alarm in AlarmRepository.list():
- try:
- vnfr = self.common_db.get_vnfr(alarm.nsr_id, alarm.vnf_member_index)
- except DbException:
- log.exception("Error getting vnfr: ")
- continue
- vnfd = self.common_db.get_vnfd(vnfr['vnfd-id'])
- try:
- vdur = next(filter(lambda vdur: vdur['name'] == alarm.vdur_name, vnfr['vdur']))
- except StopIteration:
- log.warning("No vdur found with name %s for alarm %s", alarm.vdur_name, alarm.id)
- continue
- vdu = next(filter(lambda vdu: vdu['id'] == vdur['vdu-id-ref'], vnfd['vdu']))
- vnf_monitoring_param = next(
- filter(lambda param: param['id'] == alarm.monitoring_param, vnfd['monitoring-param']))
- nsr_id = vnfr['nsr-id-ref']
- vnf_member_index = vnfr['member-vnf-index-ref']
- vdur_name = vdur['name']
- if 'vdu-monitoring-param' in vnf_monitoring_param:
- vdu_monitoring_param = next(filter(
- lambda param: param['id'] == vnf_monitoring_param['vdu-monitoring-param'][
- 'vdu-monitoring-param-ref'], vdu['monitoring-param']))
- nfvi_metric = vdu_monitoring_param['nfvi-metric']
-
- p = multiprocessing.Process(target=self._evaluate_metric,
- args=(nsr_id,
- vnf_member_index,
- vdur_name,
- nfvi_metric,
- alarm))
- processes.append(p)
- p.start()
- if 'vdu-metric' in vnf_monitoring_param:
- vnf_metric_name = vnf_monitoring_param['vdu-metric']['vdu-metric-name-ref']
- p = multiprocessing.Process(target=self._evaluate_metric,
- args=(nsr_id,
- vnf_member_index,
- vdur_name,
- vnf_metric_name,
- alarm))
- processes.append(p)
- p.start()
- if 'vnf-metric' in vnf_monitoring_param:
- vnf_metric_name = vnf_monitoring_param['vnf-metric']['vnf-metric-name-ref']
- p = multiprocessing.Process(target=self._evaluate_metric,
- args=(nsr_id,
- vnf_member_index,
- '',
- vnf_metric_name,
- alarm))
- processes.append(p)
- p.start()
+ # Tags need to be passed inside a dict to avoid database locking issues related to process forking
+ tags = {}
+ for tag in alarm.tags:
+ tags[tag.name] = tag.value
+ p = multiprocessing.Process(target=self._evaluate_metric,
+ args=(alarm, tags))
+ processes.append(p)
+ p.start()
for process in processes:
process.join(timeout=10)
alarms_tuples = []
+ log.info("Appending alarms to queue")
while not self.queue.empty():
alarms_tuples.append(self.queue.get())
return alarms_tuples
--- /dev/null
+# -*- coding: utf-8 -*-
+
+# Copyright 2018 Whitestack, LLC
+# *************************************************************
+
+# This file is part of OSM Monitoring module
+# All Rights Reserved to Whitestack, LLC
+
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+
+# http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+# For those usages not covered by the Apache License, Version 2.0 please
+# contact: bdiaz@whitestack.com or glavado@whitestack.com
+##
+"""Peewee migrations -- 002_add_alarm_tags.py.
+
+Some examples (model - class or model name)::
+
+ > Model = migrator.orm['model_name'] # Return model in current state by name
+
+ > migrator.sql(sql) # Run custom SQL
+ > migrator.python(func, *args, **kwargs) # Run python code
+ > migrator.create_model(Model) # Create a model (could be used as decorator)
+ > migrator.remove_model(model, cascade=True) # Remove a model
+ > migrator.add_fields(model, **fields) # Add fields to a model
+ > migrator.change_fields(model, **fields) # Change fields
+ > migrator.remove_fields(model, *field_names, cascade=True)
+ > migrator.rename_field(model, old_field_name, new_field_name)
+ > migrator.rename_table(model, new_table_name)
+ > migrator.add_index(model, *col_names, unique=False)
+ > migrator.drop_index(model, *col_names)
+ > migrator.add_not_null(model, *field_names)
+ > migrator.drop_not_null(model, *field_names)
+ > migrator.add_default(model, field_name, default)
+
+"""
+
+import peewee as pw
+
+SQL = pw.SQL
+
+
+def migrate(migrator, database, fake=False, **kwargs):
+ """Write your migrations here."""
+
+ @migrator.create_model
+ class AlarmTag(pw.Model):
+ id = pw.AutoField()
+ name = pw.CharField(max_length=255)
+ value = pw.CharField(max_length=255)
+ alarm = pw.ForeignKeyField(backref='tags', column_name='alarm_id', field='id',
+ model=migrator.orm['alarm'], on_delete='CASCADE')
+
+ class Meta:
+ table_name = "alarmtag"
+
+
+def rollback(migrator, database, fake=False, **kwargs):
+ """Write your rollback migrations here."""
+
+ migrator.remove_model('alarmtag')
--- /dev/null
+# -*- coding: utf-8 -*-
+
+# Copyright 2018 Whitestack, LLC
+# *************************************************************
+
+# This file is part of OSM Monitoring module
+# All Rights Reserved to Whitestack, LLC
+
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+
+# http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+# For those usages not covered by the Apache License, Version 2.0 please
+# contact: bdiaz@whitestack.com or glavado@whitestack.com
+##
+"""Peewee migrations -- 003_rename_monitoring_param.py.
+
+Some examples (model - class or model name)::
+
+ > Model = migrator.orm['model_name'] # Return model in current state by name
+
+ > migrator.sql(sql) # Run custom SQL
+ > migrator.python(func, *args, **kwargs) # Run python code
+ > migrator.create_model(Model) # Create a model (could be used as decorator)
+ > migrator.remove_model(model, cascade=True) # Remove a model
+ > migrator.add_fields(model, **fields) # Add fields to a model
+ > migrator.change_fields(model, **fields) # Change fields
+ > migrator.remove_fields(model, *field_names, cascade=True)
+ > migrator.rename_field(model, old_field_name, new_field_name)
+ > migrator.rename_table(model, new_table_name)
+ > migrator.add_index(model, *col_names, unique=False)
+ > migrator.drop_index(model, *col_names)
+ > migrator.add_not_null(model, *field_names)
+ > migrator.drop_not_null(model, *field_names)
+ > migrator.add_default(model, field_name, default)
+
+"""
+
+import peewee as pw
+
+SQL = pw.SQL
+
+
+def migrate(migrator, database, fake=False, **kwargs):
+ """Write your migrations here."""
+
+ migrator.rename_field('alarm', 'monitoring_param', 'metric')
+
+
+def rollback(migrator, database, fake=False, **kwargs):
+ """Write your rollback migrations here."""
+
+ migrator.rename_field('alarm', 'metric', 'monitoring_param')
--- /dev/null
+# -*- coding: utf-8 -*-
+
+# Copyright 2018 Whitestack, LLC
+# *************************************************************
+
+# This file is part of OSM Monitoring module
+# All Rights Reserved to Whitestack, LLC
+
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+
+# http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+# For those usages not covered by the Apache License, Version 2.0 please
+# contact: bdiaz@whitestack.com or glavado@whitestack.com
+##
+"""Peewee migrations -- 004_remove_alarm_fields.py.
+
+Some examples (model - class or model name)::
+
+ > Model = migrator.orm['model_name'] # Return model in current state by name
+
+ > migrator.sql(sql) # Run custom SQL
+ > migrator.python(func, *args, **kwargs) # Run python code
+ > migrator.create_model(Model) # Create a model (could be used as decorator)
+ > migrator.remove_model(model, cascade=True) # Remove a model
+ > migrator.add_fields(model, **fields) # Add fields to a model
+ > migrator.change_fields(model, **fields) # Change fields
+ > migrator.remove_fields(model, *field_names, cascade=True)
+ > migrator.rename_field(model, old_field_name, new_field_name)
+ > migrator.rename_table(model, new_table_name)
+ > migrator.add_index(model, *col_names, unique=False)
+ > migrator.drop_index(model, *col_names)
+ > migrator.add_not_null(model, *field_names)
+ > migrator.drop_not_null(model, *field_names)
+ > migrator.add_default(model, field_name, default)
+
+"""
+
+import peewee as pw
+
+SQL = pw.SQL
+
+
+def migrate(migrator, database, fake=False, **kwargs):
+ """Write your migrations here."""
+
+ migrator.remove_fields('alarm', 'vdur_name', 'vnf_member_index', 'nsr_id')
+
+
+def rollback(migrator, database, fake=False, **kwargs):
+ """Write your rollback migrations here."""
+
+ migrator.add_fields('alarm',
+ vdur_name=pw.CharField(max_length=255),
+ vnf_member_index=pw.CharField(max_length=255),
+ nsr_id=pw.CharField(max_length=255))
alarm_details['severity'].lower(),
alarm_details['statistic'].lower(),
alarm_details['metric_name'],
- alarm_details['vdu_name'],
- alarm_details['vnf_member_index'],
- alarm_details['ns_id']
+ alarm_details['tags']
)
response = response_builder.generate_response('create_alarm_response',
cor_id=cor_id,
from osm_mon.core import database
from osm_mon.core.common_db import CommonDbClient
from osm_mon.core.config import Config
-from osm_mon.core.database import AlarmRepository, Alarm
+from osm_mon.core.database import AlarmRepository, Alarm, AlarmTagRepository
log = logging.getLogger(__name__)
severity: str,
statistic: str,
metric_name: str,
- vdur_name: str,
- vnf_member_index: str,
- nsr_id: str) -> Alarm:
+ tags: dict) -> Alarm:
database.db.connect()
try:
with database.db.atomic():
- return AlarmRepository.create(
+ alarm = AlarmRepository.create(
uuid=str(uuid.uuid4()),
name=name,
threshold=threshold,
operation=operation.lower(),
severity=severity.lower(),
statistic=statistic.lower(),
- monitoring_param=metric_name,
- vdur_name=vdur_name,
- vnf_member_index=vnf_member_index,
- nsr_id=nsr_id
+ metric=metric_name
)
+ for k, v in tags.items():
+ AlarmTagRepository.create(
+ name=k,
+ value=v,
+ alarm=alarm
+ )
+ return alarm
finally:
database.db.close()
from osm_mon.core.common_db import CommonDbClient
from osm_mon.core.config import Config
-from osm_mon.core.database import AlarmRepository
+from osm_mon.core.database import AlarmRepository, AlarmTag
from osm_mon.core.message_bus_client import MessageBusClient
from osm_mon.evaluator.backends.prometheus import PrometheusBackend
from osm_mon.evaluator.evaluator import AlarmStatus
mock_alarm = mock.Mock()
mock_alarm.operation = 'gt'
mock_alarm.threshold = 50.0
+ mock_alarm.metric = 'metric_name'
get_metric_value.return_value = 100.0
service = EvaluatorService(self.config)
service.queue = mock.Mock()
- service._evaluate_metric('test_id', '1', 'test_name', 'test_metric_name', mock_alarm)
+ service._evaluate_metric(mock_alarm, {})
service.queue.put.assert_called_with((mock_alarm, AlarmStatus.ALARM))
service.queue.reset_mock()
mock_alarm.operation = 'lt'
- service._evaluate_metric('test_id', '1', 'test_name', 'test_metric_name', mock_alarm)
+ service._evaluate_metric(mock_alarm, {})
service.queue.put.assert_called_with((mock_alarm, AlarmStatus.OK))
service.queue.reset_mock()
get_metric_value.return_value = None
- service._evaluate_metric('test_id', '1', 'test_name', 'test_metric_name', mock_alarm)
+ service._evaluate_metric(mock_alarm, {})
service.queue.put.assert_called_with((mock_alarm, AlarmStatus.INSUFFICIENT))
@mock.patch('multiprocessing.Process')
@mock.patch.object(CommonDbClient, "get_vnfr")
@mock.patch.object(AlarmRepository, "list")
@mock.patch('osm_mon.core.database.db')
- def test_evaluate(self, db, alarm_list, get_vnfr, get_vnfd, evaluate_metric, proccess):
+ def test_evaluate(self, db, alarm_list, get_vnfr, get_vnfd, evaluate_metric, process):
mock_alarm = mock.Mock()
mock_alarm.vdur_name = 'cirros_ns-1-cirros_vnfd-VM-1'
mock_alarm.monitoring_param = 'cirros_vnf_memory_util'
+ mock_tag = AlarmTag()
+ mock_tag.name = 'name'
+ mock_tag.value = 'value'
+ mock_alarm.tags = [mock_tag]
alarm_list.return_value = [mock_alarm]
get_vnfr.return_value = vnfr_record_mock
get_vnfd.return_value = vnfd_record_mock
evaluator = EvaluatorService(self.config)
evaluator.evaluate_alarms()
- proccess.assert_called_with(target=evaluate_metric, args=(
- '87776f33-b67c-417a-8119-cb08e4098951', '1', 'cirros_ns-1-cirros_vnfd-VM-1', 'average_memory_utilization',
- mock_alarm))
+ process.assert_called_with(target=evaluate_metric, args=(mock_alarm, {'name': 'value'}))
@mock.patch.object(PrometheusBackend, "get_metric_value")
@mock.patch('osm_mon.core.database.db')
def test_get_metric_value_prometheus(self, db, get_metric_value):
self.config.set('evaluator', 'backend', 'prometheus')
evaluator = EvaluatorService(self.config)
- evaluator._get_metric_value('test_id', 'test_vnf_member_index', 'test_vdur_name', 'test_metric_name')
+ evaluator._get_metric_value('test', {})
- get_metric_value.assert_called_with('test_metric_name', 'test_id', 'test_vdur_name', 'test_vnf_member_index')
+ get_metric_value.assert_called_with('test', {})
--- /dev/null
+# -*- coding: utf-8 -*-
+
+# Copyright 2018 Whitestack, LLC
+# *************************************************************
+
+# This file is part of OSM Monitoring module
+# All Rights Reserved to Whitestack, LLC
+
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+
+# http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+# For those usages not covered by the Apache License, Version 2.0 please
+# contact: bdiaz@whitestack.com or glavado@whitestack.com
+##
+import collections
+from unittest import TestCase
+
+from osm_mon.core.config import Config
+from osm_mon.evaluator.backends.prometheus import PrometheusBackend
+
+
+class EvaluatorTest(TestCase):
+ def setUp(self):
+ super().setUp()
+ self.config = Config()
+
+ def test_build_query(self):
+ prometheus = PrometheusBackend(self.config)
+ alarm_tags = collections.OrderedDict()
+ alarm_tags['tag_1'] = 'value_1'
+ alarm_tags['tag_2'] = 'value_2'
+ query = prometheus._build_query('metric_name', alarm_tags)
+ self.assertEqual(query, 'query=osm_metric_name{tag_1="value_1",tag_2="value_2"}')