Fix bugs for integration with the MON module
Signed-off-by: diazb <bdiaz@whitestack.com>
diff --git a/policy_module/README.rst b/policy_module/README.rst
index e69de29..5cb2fde 100644
--- a/policy_module/README.rst
+++ b/policy_module/README.rst
@@ -0,0 +1,14 @@
+Install
+------------------------
+ ::
+
+ git clone https://osm.etsi.org/gerrit/osm/MON.git
+ cd MON/policy_module
+ pip install .
+
+Run
+------------------------
+ ::
+
+ osm-policy-agent
+
diff --git a/policy_module/osm_policy_module/cmd/dbsync.py b/policy_module/osm_policy_module/cmd/dbsync.py
deleted file mode 100644
index 25ef1a6..0000000
--- a/policy_module/osm_policy_module/cmd/dbsync.py
+++ /dev/null
@@ -1,30 +0,0 @@
-import argparse
-import logging
-import sys
-
-from osm_policy_module.core.config import Config
-
-from osm_policy_module.core.database import DatabaseManager
-
-
-def main():
- cfg = Config.instance()
- parser = argparse.ArgumentParser(prog='pm-dbsync')
- parser.add_argument('--config-file', nargs='?', help='Policy module database sync configuration file')
- args = parser.parse_args()
- if args.config_file:
- cfg.load_file(args.config_file)
- if cfg.get('policy_module', 'log_dir') == 'stdout':
- logging.basicConfig(stream=sys.stdout,
- format='%(asctime)s %(message)s',
- datefmt='%m/%d/%Y %I:%M:%S %p', filemode='a',
- level=logging.INFO)
- else:
- logging.basicConfig(filename=cfg.get('policy_module', 'log_dir') + 'pm_dbsync.log',
- format='%(asctime)s %(message)s',
- datefmt='%m/%d/%Y %I:%M:%S %p', filemode='a',
- level=logging.INFO)
- log = logging.getLogger(__name__)
- log.info("Starting database sync...")
- db_manager = DatabaseManager()
- db_manager.create_tables()
diff --git a/policy_module/osm_policy_module/cmd/policy_module_agent.py b/policy_module/osm_policy_module/cmd/policy_module_agent.py
index 7116913..3fd42db 100644
--- a/policy_module/osm_policy_module/cmd/policy_module_agent.py
+++ b/policy_module/osm_policy_module/cmd/policy_module_agent.py
@@ -2,9 +2,9 @@
import logging
import sys
-from osm_policy_module.core.config import Config
-
from osm_policy_module.core.agent import PolicyModuleAgent
+from osm_policy_module.core.config import Config
+from osm_policy_module.core.database import DatabaseManager
def main():
@@ -14,17 +14,25 @@
args = parser.parse_args()
if args.config_file:
cfg.load_file(args.config_file)
+ # TODO: Handle different log levels in config
if cfg.get('policy_module', 'log_dir') == 'stdout':
logging.basicConfig(stream=sys.stdout,
format='%(asctime)s %(message)s',
datefmt='%m/%d/%Y %I:%M:%S %p',
- level=logging._nameToLevel[cfg.get('policy_module', 'log_level')])
+ level=logging.INFO)
else:
logging.basicConfig(filename=cfg.get('policy_module', 'log_dir') + 'pm_agent.log',
format='%(asctime)s %(message)s',
datefmt='%m/%d/%Y %I:%M:%S %p', filemode='a',
- level=logging._nameToLevel[cfg.get('policy_module', 'log_level')])
+ level=logging.INFO)
log = logging.getLogger(__name__)
+ log.info("Syncing database...")
+ db_manager = DatabaseManager()
+ db_manager.create_tables()
log.info("Starting policy module agent...")
agent = PolicyModuleAgent()
agent.run()
+
+
+if __name__ == '__main__':
+ main()
diff --git a/policy_module/osm_policy_module/common/alarm_config.py b/policy_module/osm_policy_module/common/alarm_config.py
index 78d36d0..2b77d47 100644
--- a/policy_module/osm_policy_module/common/alarm_config.py
+++ b/policy_module/osm_policy_module/common/alarm_config.py
@@ -1,9 +1,9 @@
class AlarmConfig:
def __init__(self, metric_name, resource_uuid, vim_uuid, threshold, operation, statistic, action):
- self.metric_name = metric_name,
- self.resource_uuid = resource_uuid,
- self.vim_uuid = vim_uuid,
- self.threshold = threshold,
- self.operation = operation,
- self.statistic = statistic,
+ self.metric_name = metric_name
+ self.resource_uuid = resource_uuid
+ self.vim_uuid = vim_uuid
+ self.threshold = threshold
+ self.operation = operation
+ self.statistic = statistic
self.action = action
diff --git a/policy_module/osm_policy_module/common/lcm_client.py b/policy_module/osm_policy_module/common/lcm_client.py
index 99e3ffb..710b59a 100644
--- a/policy_module/osm_policy_module/common/lcm_client.py
+++ b/policy_module/osm_policy_module/common/lcm_client.py
@@ -1,25 +1,27 @@
import json
+import logging
from kafka import KafkaProducer
from osm_policy_module.core.config import Config
+log = logging.getLogger(__name__)
+
class LcmClient:
def __init__(self):
cfg = Config.instance()
- self.kafka_server = {
- 'server': '{}:{}'.format(cfg.get('policy_module', 'kafka_server_host'),
- cfg.get('policy_module', 'kafka_server_port'))}
+ self.kafka_server = '{}:{}'.format(cfg.get('policy_module', 'kafka_server_host'),
+ cfg.get('policy_module', 'kafka_server_port'))
self.producer = KafkaProducer(bootstrap_servers=self.kafka_server,
key_serializer=str.encode,
- value_serializer=lambda v: json.dumps(v).encode('utf-8'))
+ value_serializer=str.encode)
def scale(self, nsr_id, name, action):
msg = self._create_scale_action_payload(nsr_id, name, action)
- self.producer.send(topic='alarm_request', key='create_alarm_request', value=msg)
+ log.info("Sending scale action message: %s", json.dumps(msg))
+ self.producer.send(topic='lcm_pm', key='trigger_scaling', value=json.dumps(msg))
self.producer.flush()
- pass
def _create_scale_action_payload(self, nsr_id, name, action):
msg = {
diff --git a/policy_module/osm_policy_module/common/mon_client.py b/policy_module/osm_policy_module/common/mon_client.py
index 19b440e..f03b3c3 100644
--- a/policy_module/osm_policy_module/common/mon_client.py
+++ b/policy_module/osm_policy_module/common/mon_client.py
@@ -17,31 +17,31 @@
cfg.get('policy_module', 'kafka_server_port'))
self.producer = KafkaProducer(bootstrap_servers=self.kafka_server,
key_serializer=str.encode,
- value_serializer=lambda v: json.dumps(v).encode('utf-8'))
+ value_serializer=str.encode)
def create_alarm(self, metric_name, resource_uuid, vim_uuid, threshold, statistic, operation):
cor_id = random.randint(1, 1000000)
msg = self._create_alarm_payload(cor_id, metric_name, resource_uuid, vim_uuid, threshold, statistic, operation)
- self.producer.send(topic='alarm_request', key='create_alarm_request', value=msg)
- self.producer.flush()
- consumer = KafkaConsumer(bootstrap_servers=self.kafka_server, consumer_timeout_ms=10000)
+ log.info("Sending create_alarm_request %s", msg)
+ future = self.producer.send(topic='alarm_request', key='create_alarm_request', value=json.dumps(msg))
+ future.get(timeout=60)
+ consumer = KafkaConsumer(bootstrap_servers=self.kafka_server,
+ key_deserializer=bytes.decode,
+ value_deserializer=bytes.decode)
consumer.subscribe(['alarm_response'])
- alarm_uuid = None
for message in consumer:
if message.key == 'create_alarm_response':
- content = json.load(message.value)
+ content = json.loads(message.value)
+ log.info("Received create_alarm_response %s", content)
if self._is_alarm_response_correlation_id_eq(cor_id, content):
alarm_uuid = content['alarm_create_response']['alarm_uuid']
# TODO Handle error response
- break
- consumer.close()
- if not alarm_uuid:
- raise ValueError(
- 'Timeout: No alarm creation response from MON. Are it\'s IP and port correctly configured?')
- return alarm_uuid
+ return alarm_uuid
+
+ raise ValueError('Timeout: No alarm creation response from MON. Is MON up?')
def _create_alarm_payload(self, cor_id, metric_name, resource_uuid, vim_uuid, threshold, statistic, operation):
- create_alarm_request = {
+ alarm_create_request = {
'correlation_id': cor_id,
'alarm_name': str(uuid.uuid4()),
'metric_name': metric_name,
@@ -52,7 +52,7 @@
'statistic': statistic
}
msg = {
- 'create_alarm_request': create_alarm_request,
+ 'alarm_create_request': alarm_create_request,
'vim_uuid': vim_uuid
}
return msg
diff --git a/policy_module/osm_policy_module/core/agent.py b/policy_module/osm_policy_module/core/agent.py
index c329743..b4f9c4d 100644
--- a/policy_module/osm_policy_module/core/agent.py
+++ b/policy_module/osm_policy_module/core/agent.py
@@ -21,15 +21,15 @@
# Initialize Kafka consumer
log.info("Connecting to Kafka server at %s", kafka_server)
+ # TODO: Add logic to handle deduplication of messages when using group_id.
+ # See: https://stackoverflow.com/a/29836412
consumer = KafkaConsumer(bootstrap_servers=kafka_server,
key_deserializer=bytes.decode,
- value_deserializer=bytes.decode,
- group_id="policy-module-agent")
+ value_deserializer=bytes.decode)
consumer.subscribe(['lcm_pm', 'alarm_response'])
for message in consumer:
log.info("Message arrived: %s", message)
- log.info("Message key: %s", message.key)
try:
if message.key == 'configure_scaling':
content = json.loads(message.value)
@@ -67,7 +67,7 @@
alarm = ScalingAlarm.select().where(ScalingAlarm.alarm_id == alarm_id).get()
if alarm:
lcm_client = LcmClient()
- log.info("Sending scaling action message: %s", json.dumps(alarm))
+ log.info("Sending scaling action message for ns: %s", alarm_id)
lcm_client.scale(alarm.scaling_record.nsr_id, alarm.scaling_record.name, alarm.action)
except Exception:
log.exception("Error consuming message: ")
diff --git a/policy_module/osm_policy_module/core/database.py b/policy_module/osm_policy_module/core/database.py
index 4fb95e7..f7a90a9 100644
--- a/policy_module/osm_policy_module/core/database.py
+++ b/policy_module/osm_policy_module/core/database.py
@@ -8,7 +8,7 @@
log = logging.getLogger(__name__)
cfg = Config.instance()
-db = SqliteExtDatabase('mon.db')
+db = SqliteExtDatabase('policy_module.db')
class BaseModel(Model):
diff --git a/policy_module/osm_policy_module/models/configure_scaling.json b/policy_module/osm_policy_module/models/configure_scaling.json
index f8479b3..72e7963 100644
--- a/policy_module/osm_policy_module/models/configure_scaling.json
+++ b/policy_module/osm_policy_module/models/configure_scaling.json
@@ -52,8 +52,7 @@
"gt",
"le",
"ge",
- "eq",
- "ne"
+ "eq"
]
},
"scale_out_relational_operation": {
@@ -63,8 +62,7 @@
"gt",
"le",
"ge",
- "eq",
- "ne"
+ "eq"
]
},
"monitoring_param": {
@@ -79,10 +77,10 @@
"aggregation_type": {
"type": "string",
"enum": [
- "avg",
- "max",
- "min",
- "last",
+ "average",
+ "maximum",
+ "minimum",
+ "count",
"sum"
]
},
diff --git a/policy_module/osm_policy_module/tests/examples/configure_scaling_full_example.json b/policy_module/osm_policy_module/tests/examples/configure_scaling_full_example.json
index a37dffe..be14fb4 100644
--- a/policy_module/osm_policy_module/tests/examples/configure_scaling_full_example.json
+++ b/policy_module/osm_policy_module/tests/examples/configure_scaling_full_example.json
@@ -16,11 +16,11 @@
"monitoring_param": {
"id": "test_param_id",
"name": "test_param",
- "aggregation_type": "avg",
+ "aggregation_type": "average",
"vdu_monitoring_param": {
- "vim_uuid": "vdu_monitoring_param_id",
- "resource_id": "vdu_monitoring_param_resource_id",
- "name": "vdu_monitoring_param_name"
+ "vim_uuid": "1",
+ "resource_id": "2d8d5355-acf7-42be-9f34-a10d02f9df39",
+ "name": "cpu_utilization"
}
}
}
diff --git a/policy_module/osm_policy_module/tests/integration/test_scaling_config_kafka_msg.py b/policy_module/osm_policy_module/tests/integration/test_scaling_config_kafka_msg.py
index a444265..486afc8 100644
--- a/policy_module/osm_policy_module/tests/integration/test_scaling_config_kafka_msg.py
+++ b/policy_module/osm_policy_module/tests/integration/test_scaling_config_kafka_msg.py
@@ -3,32 +3,37 @@
import os
import unittest
-from kafka import KafkaProducer
+from kafka import KafkaProducer, KafkaConsumer
+from kafka.errors import KafkaError
log = logging.getLogger(__name__)
-# logging.basicConfig(stream=sys.stdout,
-# format='%(asctime)s %(message)s',
-# datefmt='%m/%d/%Y %I:%M:%S %p',
-# level=logging.DEBUG)
-
class ScalingConfigTest(unittest.TestCase):
+ def setUp(self):
+ try:
+ kafka_server = '{}:{}'.format(os.getenv("KAFKA_SERVER_HOST", "localhost"),
+ os.getenv("KAFKA_SERVER_PORT", "9092"))
+ self.producer = KafkaProducer(bootstrap_servers=kafka_server,
+ key_serializer=str.encode,
+ value_serializer=str.encode)
+ self.consumer = KafkaConsumer(bootstrap_servers='localhost:9092',
+ group_id='osm_mon')
+ self.consumer.subscribe(['lcm_pm'])
+ except KafkaError:
+ self.skipTest('Kafka server not present.')
+
def test_send_scaling_config_msg(self):
try:
with open(
os.path.join(os.path.dirname(__file__), '../examples/configure_scaling_full_example.json')) as file:
payload = json.load(file)
- kafka_server = '{}:{}'.format(os.getenv("KAFKA_SERVER_HOST", "localhost"),
- os.getenv("KAFKA_SERVER_PORT", "9092"))
- producer = KafkaProducer(bootstrap_servers=kafka_server,
- key_serializer=str.encode,
- value_serializer=str.encode)
- future = producer.send('lcm_pm', json.dumps(payload), key="configure_scaling")
+ future = self.producer.send('lcm_pm', json.dumps(payload), key="configure_scaling")
result = future.get(timeout=60)
log.info('Result: %s', result)
- producer.flush()
+ self.producer.flush()
+ # TODO: Improve assertions
self.assertIsNotNone(result)
except Exception as e:
self.fail(e)
diff --git a/policy_module/osm_policy_module/tests/unit/__init__.py b/policy_module/osm_policy_module/tests/unit/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/policy_module/osm_policy_module/tests/unit/__init__.py
diff --git a/policy_module/osm_policy_module/tests/test_examples.py b/policy_module/osm_policy_module/tests/unit/test_examples.py
similarity index 67%
rename from policy_module/osm_policy_module/tests/test_examples.py
rename to policy_module/osm_policy_module/tests/unit/test_examples.py
index b644fe4..935982f 100644
--- a/policy_module/osm_policy_module/tests/test_examples.py
+++ b/policy_module/osm_policy_module/tests/unit/test_examples.py
@@ -8,10 +8,8 @@
class ExamplesTest(unittest.TestCase):
def test_examples_schema(self):
- # TODO: Test that valid examples correspond to schema.
- # This forces the modification of the examples in case of schema changes.
- example_file_path = os.path.join(os.path.dirname(__file__), './examples/configure_scaling_full_example.json')
- schema_file_path = os.path.join(os.path.dirname(__file__), '../models/configure_scaling.json')
+ example_file_path = os.path.join(os.path.dirname(__file__), '../examples/configure_scaling_full_example.json')
+ schema_file_path = os.path.join(os.path.dirname(__file__), '../../models/configure_scaling.json')
with open(example_file_path) as example_file, open(schema_file_path) as schema_file:
validate(json.load(example_file), json.load(schema_file))
diff --git a/policy_module/osm_policy_module/tests/test_policy_config_agent.py b/policy_module/osm_policy_module/tests/unit/test_policy_config_agent.py
similarity index 80%
rename from policy_module/osm_policy_module/tests/test_policy_config_agent.py
rename to policy_module/osm_policy_module/tests/unit/test_policy_config_agent.py
index 4334388..7618682 100644
--- a/policy_module/osm_policy_module/tests/test_policy_config_agent.py
+++ b/policy_module/osm_policy_module/tests/unit/test_policy_config_agent.py
@@ -10,7 +10,7 @@
self.agent = PolicyModuleAgent()
def test_get_alarm_configs(self):
- with open(os.path.join(os.path.dirname(__file__), './examples/configure_scaling_full_example.json')) as file:
+ with open(os.path.join(os.path.dirname(__file__), '../examples/configure_scaling_full_example.json')) as file:
example = json.load(file)
alarm_configs = self.agent._get_alarm_configs(example)
# TODO Improve assertions
diff --git a/policy_module/requirements.txt b/policy_module/requirements.txt
index 50e30ab..fbdc2a8 100644
--- a/policy_module/requirements.txt
+++ b/policy_module/requirements.txt
@@ -1,3 +1,4 @@
kafka==1.3.*
peewee==3.1.*
-jsonschema==2.6.*
\ No newline at end of file
+jsonschema==2.6.*
+six
\ No newline at end of file
diff --git a/policy_module/setup.py b/policy_module/setup.py
index db04d03..ea9d38c 100644
--- a/policy_module/setup.py
+++ b/policy_module/setup.py
@@ -32,8 +32,7 @@
install_requires=parse_requirements('requirements.txt'),
entry_points={
"console_scripts": [
- "pm-dbsync = osm_policy_module.cmd.dbsync:main",
- "pm-agent = osm_policy_module.cmd.policy_module_agent:main",
+ "osm-policy-agent = osm_policy_module.cmd.policy_module_agent:main",
]
}
)