From 925ff50f55485470e3243c090d89ceb715e6518d Mon Sep 17 00:00:00 2001 From: diazb Date: Thu, 29 Mar 2018 17:23:48 -0300 Subject: [PATCH] Fixes bugs for integration with MON module Signed-off-by: diazb --- policy_module/README.rst | 14 +++++++++ policy_module/osm_policy_module/cmd/dbsync.py | 30 ------------------ .../cmd/policy_module_agent.py | 16 +++++++--- .../osm_policy_module/common/alarm_config.py | 12 +++---- .../osm_policy_module/common/lcm_client.py | 14 +++++---- .../osm_policy_module/common/mon_client.py | 28 ++++++++--------- policy_module/osm_policy_module/core/agent.py | 8 ++--- .../osm_policy_module/core/database.py | 2 +- .../models/configure_scaling.json | 14 ++++----- .../configure_scaling_full_example.json | 8 ++--- .../test_scaling_config_kafka_msg.py | 31 +++++++++++-------- .../osm_policy_module/tests/unit/__init__.py | 0 .../tests/{ => unit}/test_examples.py | 6 ++-- .../{ => unit}/test_policy_config_agent.py | 2 +- policy_module/requirements.txt | 3 +- policy_module/setup.py | 3 +- 16 files changed, 93 insertions(+), 98 deletions(-) delete mode 100644 policy_module/osm_policy_module/cmd/dbsync.py create mode 100644 policy_module/osm_policy_module/tests/unit/__init__.py rename policy_module/osm_policy_module/tests/{ => unit}/test_examples.py (67%) rename policy_module/osm_policy_module/tests/{ => unit}/test_policy_config_agent.py (80%) diff --git a/policy_module/README.rst b/policy_module/README.rst index e69de29..5cb2fde 100644 --- a/policy_module/README.rst +++ b/policy_module/README.rst @@ -0,0 +1,14 @@ +Install +------------------------ + :: + + git clone https://osm.etsi.org/gerrit/osm/MON.git + cd MON/policy_module + pip install . + +Run +------------------------ + :: + + osm-policy-agent + diff --git a/policy_module/osm_policy_module/cmd/dbsync.py b/policy_module/osm_policy_module/cmd/dbsync.py deleted file mode 100644 index 25ef1a6..0000000 --- a/policy_module/osm_policy_module/cmd/dbsync.py +++ /dev/null @@ -1,30 +0,0 @@ -import argparse -import logging -import sys - -from osm_policy_module.core.config import Config - -from osm_policy_module.core.database import DatabaseManager - - -def main(): - cfg = Config.instance() - parser = argparse.ArgumentParser(prog='pm-dbsync') - parser.add_argument('--config-file', nargs='?', help='Policy module database sync configuration file') - args = parser.parse_args() - if args.config_file: - cfg.load_file(args.config_file) - if cfg.get('policy_module', 'log_dir') == 'stdout': - logging.basicConfig(stream=sys.stdout, - format='%(asctime)s %(message)s', - datefmt='%m/%d/%Y %I:%M:%S %p', filemode='a', - level=logging.INFO) - else: - logging.basicConfig(filename=cfg.get('policy_module', 'log_dir') + 'pm_dbsync.log', - format='%(asctime)s %(message)s', - datefmt='%m/%d/%Y %I:%M:%S %p', filemode='a', - level=logging.INFO) - log = logging.getLogger(__name__) - log.info("Starting database sync...") - db_manager = DatabaseManager() - db_manager.create_tables() diff --git a/policy_module/osm_policy_module/cmd/policy_module_agent.py b/policy_module/osm_policy_module/cmd/policy_module_agent.py index 7116913..3fd42db 100644 --- a/policy_module/osm_policy_module/cmd/policy_module_agent.py +++ b/policy_module/osm_policy_module/cmd/policy_module_agent.py @@ -2,9 +2,9 @@ import argparse import logging import sys -from osm_policy_module.core.config import Config - from osm_policy_module.core.agent import PolicyModuleAgent +from osm_policy_module.core.config import Config +from osm_policy_module.core.database import DatabaseManager def main(): @@ -14,17 +14,25 @@ def main(): args = parser.parse_args() if args.config_file: cfg.load_file(args.config_file) + # TODO: Handle different log levels in config if cfg.get('policy_module', 'log_dir') == 'stdout': logging.basicConfig(stream=sys.stdout, format='%(asctime)s %(message)s', datefmt='%m/%d/%Y %I:%M:%S %p', - level=logging._nameToLevel[cfg.get('policy_module', 'log_level')]) + level=logging.INFO) else: logging.basicConfig(filename=cfg.get('policy_module', 'log_dir') + 'pm_agent.log', format='%(asctime)s %(message)s', datefmt='%m/%d/%Y %I:%M:%S %p', filemode='a', - level=logging._nameToLevel[cfg.get('policy_module', 'log_level')]) + level=logging.INFO) log = logging.getLogger(__name__) + log.info("Syncing database...") + db_manager = DatabaseManager() + db_manager.create_tables() log.info("Starting policy module agent...") agent = PolicyModuleAgent() agent.run() + + +if __name__ == '__main__': + main() diff --git a/policy_module/osm_policy_module/common/alarm_config.py b/policy_module/osm_policy_module/common/alarm_config.py index 78d36d0..2b77d47 100644 --- a/policy_module/osm_policy_module/common/alarm_config.py +++ b/policy_module/osm_policy_module/common/alarm_config.py @@ -1,9 +1,9 @@ class AlarmConfig: def __init__(self, metric_name, resource_uuid, vim_uuid, threshold, operation, statistic, action): - self.metric_name = metric_name, - self.resource_uuid = resource_uuid, - self.vim_uuid = vim_uuid, - self.threshold = threshold, - self.operation = operation, - self.statistic = statistic, + self.metric_name = metric_name + self.resource_uuid = resource_uuid + self.vim_uuid = vim_uuid + self.threshold = threshold + self.operation = operation + self.statistic = statistic self.action = action diff --git a/policy_module/osm_policy_module/common/lcm_client.py b/policy_module/osm_policy_module/common/lcm_client.py index 99e3ffb..710b59a 100644 --- a/policy_module/osm_policy_module/common/lcm_client.py +++ b/policy_module/osm_policy_module/common/lcm_client.py @@ -1,25 +1,27 @@ import json +import logging from kafka import KafkaProducer from osm_policy_module.core.config import Config +log = logging.getLogger(__name__) + class LcmClient: def __init__(self): cfg = Config.instance() - self.kafka_server = { - 'server': '{}:{}'.format(cfg.get('policy_module', 'kafka_server_host'), - cfg.get('policy_module', 'kafka_server_port'))} + self.kafka_server = '{}:{}'.format(cfg.get('policy_module', 'kafka_server_host'), + cfg.get('policy_module', 'kafka_server_port')) self.producer = KafkaProducer(bootstrap_servers=self.kafka_server, key_serializer=str.encode, - value_serializer=lambda v: json.dumps(v).encode('utf-8')) + value_serializer=str.encode) def scale(self, nsr_id, name, action): msg = self._create_scale_action_payload(nsr_id, name, action) - self.producer.send(topic='alarm_request', key='create_alarm_request', value=msg) + log.info("Sending scale action message: %s", json.dumps(msg)) + self.producer.send(topic='lcm_pm', key='trigger_scaling', value=json.dumps(msg)) self.producer.flush() - pass def _create_scale_action_payload(self, nsr_id, name, action): msg = { diff --git a/policy_module/osm_policy_module/common/mon_client.py b/policy_module/osm_policy_module/common/mon_client.py index 19b440e..f03b3c3 100644 --- a/policy_module/osm_policy_module/common/mon_client.py +++ b/policy_module/osm_policy_module/common/mon_client.py @@ -17,31 +17,31 @@ class MonClient: cfg.get('policy_module', 'kafka_server_port')) self.producer = KafkaProducer(bootstrap_servers=self.kafka_server, key_serializer=str.encode, - value_serializer=lambda v: json.dumps(v).encode('utf-8')) + value_serializer=str.encode) def create_alarm(self, metric_name, resource_uuid, vim_uuid, threshold, statistic, operation): cor_id = random.randint(1, 1000000) msg = self._create_alarm_payload(cor_id, metric_name, resource_uuid, vim_uuid, threshold, statistic, operation) - self.producer.send(topic='alarm_request', key='create_alarm_request', value=msg) - self.producer.flush() - consumer = KafkaConsumer(bootstrap_servers=self.kafka_server, consumer_timeout_ms=10000) + log.info("Sending create_alarm_request %s", msg) + future = self.producer.send(topic='alarm_request', key='create_alarm_request', value=json.dumps(msg)) + future.get(timeout=60) + consumer = KafkaConsumer(bootstrap_servers=self.kafka_server, + key_deserializer=bytes.decode, + value_deserializer=bytes.decode) consumer.subscribe(['alarm_response']) - alarm_uuid = None for message in consumer: if message.key == 'create_alarm_response': - content = json.load(message.value) + content = json.loads(message.value) + log.info("Received create_alarm_response %s", content) if self._is_alarm_response_correlation_id_eq(cor_id, content): alarm_uuid = content['alarm_create_response']['alarm_uuid'] # TODO Handle error response - break - consumer.close() - if not alarm_uuid: - raise ValueError( - 'Timeout: No alarm creation response from MON. Are it\'s IP and port correctly configured?') - return alarm_uuid + return alarm_uuid + + raise ValueError('Timeout: No alarm creation response from MON. Is MON up?') def _create_alarm_payload(self, cor_id, metric_name, resource_uuid, vim_uuid, threshold, statistic, operation): - create_alarm_request = { + alarm_create_request = { 'correlation_id': cor_id, 'alarm_name': str(uuid.uuid4()), 'metric_name': metric_name, @@ -52,7 +52,7 @@ class MonClient: 'statistic': statistic } msg = { - 'create_alarm_request': create_alarm_request, + 'alarm_create_request': alarm_create_request, 'vim_uuid': vim_uuid } return msg diff --git a/policy_module/osm_policy_module/core/agent.py b/policy_module/osm_policy_module/core/agent.py index c329743..b4f9c4d 100644 --- a/policy_module/osm_policy_module/core/agent.py +++ b/policy_module/osm_policy_module/core/agent.py @@ -21,15 +21,15 @@ class PolicyModuleAgent: # Initialize Kafka consumer log.info("Connecting to Kafka server at %s", kafka_server) + # TODO: Add logic to handle deduplication of messages when using group_id. + # See: https://stackoverflow.com/a/29836412 consumer = KafkaConsumer(bootstrap_servers=kafka_server, key_deserializer=bytes.decode, - value_deserializer=bytes.decode, - group_id="policy-module-agent") + value_deserializer=bytes.decode) consumer.subscribe(['lcm_pm', 'alarm_response']) for message in consumer: log.info("Message arrived: %s", message) - log.info("Message key: %s", message.key) try: if message.key == 'configure_scaling': content = json.loads(message.value) @@ -67,7 +67,7 @@ class PolicyModuleAgent: alarm = ScalingAlarm.select().where(ScalingAlarm.alarm_id == alarm_id).get() if alarm: lcm_client = LcmClient() - log.info("Sending scaling action message: %s", json.dumps(alarm)) + log.info("Sending scaling action message for ns: %s", alarm_id) lcm_client.scale(alarm.scaling_record.nsr_id, alarm.scaling_record.name, alarm.action) except Exception: log.exception("Error consuming message: ") diff --git a/policy_module/osm_policy_module/core/database.py b/policy_module/osm_policy_module/core/database.py index 4fb95e7..f7a90a9 100644 --- a/policy_module/osm_policy_module/core/database.py +++ b/policy_module/osm_policy_module/core/database.py @@ -8,7 +8,7 @@ from osm_policy_module.core.config import Config log = logging.getLogger(__name__) cfg = Config.instance() -db = SqliteExtDatabase('mon.db') +db = SqliteExtDatabase('policy_module.db') class BaseModel(Model): diff --git a/policy_module/osm_policy_module/models/configure_scaling.json b/policy_module/osm_policy_module/models/configure_scaling.json index f8479b3..72e7963 100644 --- a/policy_module/osm_policy_module/models/configure_scaling.json +++ b/policy_module/osm_policy_module/models/configure_scaling.json @@ -52,8 +52,7 @@ "gt", "le", "ge", - "eq", - "ne" + "eq" ] }, "scale_out_relational_operation": { @@ -63,8 +62,7 @@ "gt", "le", "ge", - "eq", - "ne" + "eq" ] }, "monitoring_param": { @@ -79,10 +77,10 @@ "aggregation_type": { "type": "string", "enum": [ - "avg", - "max", - "min", - "last", + "average", + "maximum", + "minimum", + "count", "sum" ] }, diff --git a/policy_module/osm_policy_module/tests/examples/configure_scaling_full_example.json b/policy_module/osm_policy_module/tests/examples/configure_scaling_full_example.json index a37dffe..be14fb4 100644 --- a/policy_module/osm_policy_module/tests/examples/configure_scaling_full_example.json +++ b/policy_module/osm_policy_module/tests/examples/configure_scaling_full_example.json @@ -16,11 +16,11 @@ "monitoring_param": { "id": "test_param_id", "name": "test_param", - "aggregation_type": "avg", + "aggregation_type": "average", "vdu_monitoring_param": { - "vim_uuid": "vdu_monitoring_param_id", - "resource_id": "vdu_monitoring_param_resource_id", - "name": "vdu_monitoring_param_name" + "vim_uuid": "1", + "resource_id": "2d8d5355-acf7-42be-9f34-a10d02f9df39", + "name": "cpu_utilization" } } } diff --git a/policy_module/osm_policy_module/tests/integration/test_scaling_config_kafka_msg.py b/policy_module/osm_policy_module/tests/integration/test_scaling_config_kafka_msg.py index a444265..486afc8 100644 --- a/policy_module/osm_policy_module/tests/integration/test_scaling_config_kafka_msg.py +++ b/policy_module/osm_policy_module/tests/integration/test_scaling_config_kafka_msg.py @@ -3,32 +3,37 @@ import logging import os import unittest -from kafka import KafkaProducer +from kafka import KafkaProducer, KafkaConsumer +from kafka.errors import KafkaError log = logging.getLogger(__name__) -# logging.basicConfig(stream=sys.stdout, -# format='%(asctime)s %(message)s', -# datefmt='%m/%d/%Y %I:%M:%S %p', -# level=logging.DEBUG) - class ScalingConfigTest(unittest.TestCase): + def setUp(self): + try: + kafka_server = '{}:{}'.format(os.getenv("KAFKA_SERVER_HOST", "localhost"), + os.getenv("KAFKA_SERVER_PORT", "9092")) + self.producer = KafkaProducer(bootstrap_servers=kafka_server, + key_serializer=str.encode, + value_serializer=str.encode) + self.consumer = KafkaConsumer(bootstrap_servers='localhost:9092', + group_id='osm_mon') + self.consumer.subscribe(['lcm_pm']) + except KafkaError: + self.skipTest('Kafka server not present.') + def test_send_scaling_config_msg(self): try: with open( os.path.join(os.path.dirname(__file__), '../examples/configure_scaling_full_example.json')) as file: payload = json.load(file) - kafka_server = '{}:{}'.format(os.getenv("KAFKA_SERVER_HOST", "localhost"), - os.getenv("KAFKA_SERVER_PORT", "9092")) - producer = KafkaProducer(bootstrap_servers=kafka_server, - key_serializer=str.encode, - value_serializer=str.encode) - future = producer.send('lcm_pm', json.dumps(payload), key="configure_scaling") + future = self.producer.send('lcm_pm', json.dumps(payload), key="configure_scaling") result = future.get(timeout=60) log.info('Result: %s', result) - producer.flush() + self.producer.flush() + # TODO: Improve assertions self.assertIsNotNone(result) except Exception as e: self.fail(e) diff --git a/policy_module/osm_policy_module/tests/unit/__init__.py b/policy_module/osm_policy_module/tests/unit/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/policy_module/osm_policy_module/tests/test_examples.py b/policy_module/osm_policy_module/tests/unit/test_examples.py similarity index 67% rename from policy_module/osm_policy_module/tests/test_examples.py rename to policy_module/osm_policy_module/tests/unit/test_examples.py index b644fe4..935982f 100644 --- a/policy_module/osm_policy_module/tests/test_examples.py +++ b/policy_module/osm_policy_module/tests/unit/test_examples.py @@ -8,10 +8,8 @@ from jsonschema import validate class ExamplesTest(unittest.TestCase): def test_examples_schema(self): - # TODO: Test that valid examples correspond to schema. - # This forces the modification of the examples in case of schema changes. - example_file_path = os.path.join(os.path.dirname(__file__), './examples/configure_scaling_full_example.json') - schema_file_path = os.path.join(os.path.dirname(__file__), '../models/configure_scaling.json') + example_file_path = os.path.join(os.path.dirname(__file__), '../examples/configure_scaling_full_example.json') + schema_file_path = os.path.join(os.path.dirname(__file__), '../../models/configure_scaling.json') with open(example_file_path) as example_file, open(schema_file_path) as schema_file: validate(json.load(example_file), json.load(schema_file)) diff --git a/policy_module/osm_policy_module/tests/test_policy_config_agent.py b/policy_module/osm_policy_module/tests/unit/test_policy_config_agent.py similarity index 80% rename from policy_module/osm_policy_module/tests/test_policy_config_agent.py rename to policy_module/osm_policy_module/tests/unit/test_policy_config_agent.py index 4334388..7618682 100644 --- a/policy_module/osm_policy_module/tests/test_policy_config_agent.py +++ b/policy_module/osm_policy_module/tests/unit/test_policy_config_agent.py @@ -10,7 +10,7 @@ class PolicyAgentTest(unittest.TestCase): self.agent = PolicyModuleAgent() def test_get_alarm_configs(self): - with open(os.path.join(os.path.dirname(__file__), './examples/configure_scaling_full_example.json')) as file: + with open(os.path.join(os.path.dirname(__file__), '../examples/configure_scaling_full_example.json')) as file: example = json.load(file) alarm_configs = self.agent._get_alarm_configs(example) # TODO Improve assertions diff --git a/policy_module/requirements.txt b/policy_module/requirements.txt index 50e30ab..fbdc2a8 100644 --- a/policy_module/requirements.txt +++ b/policy_module/requirements.txt @@ -1,3 +1,4 @@ kafka==1.3.* peewee==3.1.* -jsonschema==2.6.* \ No newline at end of file +jsonschema==2.6.* +six \ No newline at end of file diff --git a/policy_module/setup.py b/policy_module/setup.py index db04d03..ea9d38c 100644 --- a/policy_module/setup.py +++ b/policy_module/setup.py @@ -32,8 +32,7 @@ setuptools.setup( install_requires=parse_requirements('requirements.txt'), entry_points={ "console_scripts": [ - "pm-dbsync = osm_policy_module.cmd.dbsync:main", - "pm-agent = osm_policy_module.cmd.policy_module_agent:main", + "osm-policy-agent = osm_policy_module.cmd.policy_module_agent:main", ] } ) -- 2.17.1