+Install
+------------------------
+::
+
+    git clone https://osm.etsi.org/gerrit/osm/MON.git
+    cd MON/policy_module
+    pip install .
+
+Run
+------------------------
+::
+
+    osm-policy-agent
+
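+The agent expects a reachable Kafka broker, whose location is read from the
+``kafka_server_host`` and ``kafka_server_port`` options in the ``policy_module``
+section of the configuration. A custom configuration file can be passed with
+``--config-file``::
+
+    osm-policy-agent --config-file <path-to-config-file>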
+++ /dev/null
-import argparse
-import logging
-import sys
-
-from osm_policy_module.core.config import Config
-
-from osm_policy_module.core.database import DatabaseManager
-
-
-def main():
- cfg = Config.instance()
- parser = argparse.ArgumentParser(prog='pm-dbsync')
- parser.add_argument('--config-file', nargs='?', help='Policy module database sync configuration file')
- args = parser.parse_args()
- if args.config_file:
- cfg.load_file(args.config_file)
- if cfg.get('policy_module', 'log_dir') == 'stdout':
- logging.basicConfig(stream=sys.stdout,
- format='%(asctime)s %(message)s',
- datefmt='%m/%d/%Y %I:%M:%S %p', filemode='a',
- level=logging.INFO)
- else:
- logging.basicConfig(filename=cfg.get('policy_module', 'log_dir') + 'pm_dbsync.log',
- format='%(asctime)s %(message)s',
- datefmt='%m/%d/%Y %I:%M:%S %p', filemode='a',
- level=logging.INFO)
- log = logging.getLogger(__name__)
- log.info("Starting database sync...")
- db_manager = DatabaseManager()
- db_manager.create_tables()
import logging
import sys
-from osm_policy_module.core.config import Config
-
from osm_policy_module.core.agent import PolicyModuleAgent
+from osm_policy_module.core.config import Config
+from osm_policy_module.core.database import DatabaseManager
def main():
args = parser.parse_args()
if args.config_file:
cfg.load_file(args.config_file)
+ # TODO: Handle different log levels in config
if cfg.get('policy_module', 'log_dir') == 'stdout':
logging.basicConfig(stream=sys.stdout,
format='%(asctime)s %(message)s',
datefmt='%m/%d/%Y %I:%M:%S %p',
- level=logging._nameToLevel[cfg.get('policy_module', 'log_level')])
+ level=logging.INFO)
else:
logging.basicConfig(filename=cfg.get('policy_module', 'log_dir') + 'pm_agent.log',
format='%(asctime)s %(message)s',
datefmt='%m/%d/%Y %I:%M:%S %p', filemode='a',
- level=logging._nameToLevel[cfg.get('policy_module', 'log_level')])
+ level=logging.INFO)
log = logging.getLogger(__name__)
+ log.info("Syncing database...")
+ db_manager = DatabaseManager()
+ db_manager.create_tables()
log.info("Starting policy module agent...")
agent = PolicyModuleAgent()
agent.run()
+
+
+if __name__ == '__main__':
+ main()
class AlarmConfig:
def __init__(self, metric_name, resource_uuid, vim_uuid, threshold, operation, statistic, action):
- self.metric_name = metric_name,
- self.resource_uuid = resource_uuid,
- self.vim_uuid = vim_uuid,
- self.threshold = threshold,
- self.operation = operation,
- self.statistic = statistic,
+ self.metric_name = metric_name
+ self.resource_uuid = resource_uuid
+ self.vim_uuid = vim_uuid
+ self.threshold = threshold
+ self.operation = operation
+ self.statistic = statistic
self.action = action
import json
+import logging
from kafka import KafkaProducer
from osm_policy_module.core.config import Config
+log = logging.getLogger(__name__)
+
class LcmClient:
def __init__(self):
cfg = Config.instance()
- self.kafka_server = {
- 'server': '{}:{}'.format(cfg.get('policy_module', 'kafka_server_host'),
- cfg.get('policy_module', 'kafka_server_port'))}
+ self.kafka_server = '{}:{}'.format(cfg.get('policy_module', 'kafka_server_host'),
+ cfg.get('policy_module', 'kafka_server_port'))
self.producer = KafkaProducer(bootstrap_servers=self.kafka_server,
key_serializer=str.encode,
- value_serializer=lambda v: json.dumps(v).encode('utf-8'))
+ value_serializer=str.encode)
def scale(self, nsr_id, name, action):
msg = self._create_scale_action_payload(nsr_id, name, action)
- self.producer.send(topic='alarm_request', key='create_alarm_request', value=msg)
+ log.info("Sending scale action message: %s", json.dumps(msg))
+ self.producer.send(topic='lcm_pm', key='trigger_scaling', value=json.dumps(msg))
self.producer.flush()
- pass
def _create_scale_action_payload(self, nsr_id, name, action):
msg = {
cfg.get('policy_module', 'kafka_server_port'))
self.producer = KafkaProducer(bootstrap_servers=self.kafka_server,
key_serializer=str.encode,
- value_serializer=lambda v: json.dumps(v).encode('utf-8'))
+ value_serializer=str.encode)
def create_alarm(self, metric_name, resource_uuid, vim_uuid, threshold, statistic, operation):
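+        # Correlation id used to match the asynchronous create_alarm_response to this request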
cor_id = random.randint(1, 1000000)
msg = self._create_alarm_payload(cor_id, metric_name, resource_uuid, vim_uuid, threshold, statistic, operation)
- self.producer.send(topic='alarm_request', key='create_alarm_request', value=msg)
- self.producer.flush()
- consumer = KafkaConsumer(bootstrap_servers=self.kafka_server, consumer_timeout_ms=10000)
+ log.info("Sending create_alarm_request %s", msg)
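+        # future.get() blocks until the send is acknowledged by the broker (or raises on failure)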
+ future = self.producer.send(topic='alarm_request', key='create_alarm_request', value=json.dumps(msg))
+ future.get(timeout=60)
+        # consumer_timeout_ms bounds the iteration below so the timeout
+        # error at the end of this method is actually reachable
+        consumer = KafkaConsumer(bootstrap_servers=self.kafka_server,
+                                 key_deserializer=bytes.decode,
+                                 value_deserializer=bytes.decode,
+                                 consumer_timeout_ms=10000)
consumer.subscribe(['alarm_response'])
- alarm_uuid = None
for message in consumer:
if message.key == 'create_alarm_response':
- content = json.load(message.value)
+ content = json.loads(message.value)
+ log.info("Received create_alarm_response %s", content)
if self._is_alarm_response_correlation_id_eq(cor_id, content):
alarm_uuid = content['alarm_create_response']['alarm_uuid']
# TODO Handle error response
- break
- consumer.close()
- if not alarm_uuid:
- raise ValueError(
- 'Timeout: No alarm creation response from MON. Are it\'s IP and port correctly configured?')
- return alarm_uuid
+ return alarm_uuid
+
+ raise ValueError('Timeout: No alarm creation response from MON. Is MON up?')
def _create_alarm_payload(self, cor_id, metric_name, resource_uuid, vim_uuid, threshold, statistic, operation):
- create_alarm_request = {
+ alarm_create_request = {
'correlation_id': cor_id,
'alarm_name': str(uuid.uuid4()),
'metric_name': metric_name,
'statistic': statistic
}
msg = {
- 'create_alarm_request': create_alarm_request,
+ 'alarm_create_request': alarm_create_request,
'vim_uuid': vim_uuid
}
return msg
# Initialize Kafka consumer
log.info("Connecting to Kafka server at %s", kafka_server)
+ # TODO: Add logic to handle deduplication of messages when using group_id.
+ # See: https://stackoverflow.com/a/29836412
consumer = KafkaConsumer(bootstrap_servers=kafka_server,
key_deserializer=bytes.decode,
- value_deserializer=bytes.decode,
- group_id="policy-module-agent")
+ value_deserializer=bytes.decode)
consumer.subscribe(['lcm_pm', 'alarm_response'])
for message in consumer:
log.info("Message arrived: %s", message)
- log.info("Message key: %s", message.key)
try:
if message.key == 'configure_scaling':
content = json.loads(message.value)
alarm = ScalingAlarm.select().where(ScalingAlarm.alarm_id == alarm_id).get()
if alarm:
lcm_client = LcmClient()
- log.info("Sending scaling action message: %s", json.dumps(alarm))
+                    log.info("Sending scaling action message for nsr_id: %s", alarm.scaling_record.nsr_id)
lcm_client.scale(alarm.scaling_record.nsr_id, alarm.scaling_record.name, alarm.action)
except Exception:
log.exception("Error consuming message: ")
log = logging.getLogger(__name__)
cfg = Config.instance()
-db = SqliteExtDatabase('mon.db')
+db = SqliteExtDatabase('policy_module.db')
class BaseModel(Model):
"gt",
"le",
"ge",
- "eq",
- "ne"
+ "eq"
]
},
"scale_out_relational_operation": {
"gt",
"le",
"ge",
- "eq",
- "ne"
+ "eq"
]
},
"monitoring_param": {
"aggregation_type": {
"type": "string",
"enum": [
- "avg",
- "max",
- "min",
- "last",
+ "average",
+ "maximum",
+ "minimum",
+ "count",
"sum"
]
},
"monitoring_param": {
"id": "test_param_id",
"name": "test_param",
- "aggregation_type": "avg",
+ "aggregation_type": "average",
"vdu_monitoring_param": {
- "vim_uuid": "vdu_monitoring_param_id",
- "resource_id": "vdu_monitoring_param_resource_id",
- "name": "vdu_monitoring_param_name"
+ "vim_uuid": "1",
+ "resource_id": "2d8d5355-acf7-42be-9f34-a10d02f9df39",
+ "name": "cpu_utilization"
}
}
}
import os
import unittest
-from kafka import KafkaProducer
+from kafka import KafkaProducer, KafkaConsumer
+from kafka.errors import KafkaError
log = logging.getLogger(__name__)
-# logging.basicConfig(stream=sys.stdout,
-# format='%(asctime)s %(message)s',
-# datefmt='%m/%d/%Y %I:%M:%S %p',
-# level=logging.DEBUG)
-
class ScalingConfigTest(unittest.TestCase):
+ def setUp(self):
+ try:
+ kafka_server = '{}:{}'.format(os.getenv("KAFKA_SERVER_HOST", "localhost"),
+ os.getenv("KAFKA_SERVER_PORT", "9092"))
+ self.producer = KafkaProducer(bootstrap_servers=kafka_server,
+ key_serializer=str.encode,
+ value_serializer=str.encode)
+            self.consumer = KafkaConsumer(bootstrap_servers=kafka_server,
+                                          group_id='osm_mon')
+ self.consumer.subscribe(['lcm_pm'])
+ except KafkaError:
+ self.skipTest('Kafka server not present.')
+
def test_send_scaling_config_msg(self):
try:
with open(
os.path.join(os.path.dirname(__file__), '../examples/configure_scaling_full_example.json')) as file:
payload = json.load(file)
- kafka_server = '{}:{}'.format(os.getenv("KAFKA_SERVER_HOST", "localhost"),
- os.getenv("KAFKA_SERVER_PORT", "9092"))
- producer = KafkaProducer(bootstrap_servers=kafka_server,
- key_serializer=str.encode,
- value_serializer=str.encode)
- future = producer.send('lcm_pm', json.dumps(payload), key="configure_scaling")
+ future = self.producer.send('lcm_pm', json.dumps(payload), key="configure_scaling")
result = future.get(timeout=60)
log.info('Result: %s', result)
- producer.flush()
+ self.producer.flush()
+ # TODO: Improve assertions
self.assertIsNotNone(result)
except Exception as e:
self.fail(e)
+++ /dev/null
-import json
-import unittest
-
-import os
-
-from jsonschema import validate
-
-
-class ExamplesTest(unittest.TestCase):
- def test_examples_schema(self):
- # TODO: Test that valid examples correspond to schema.
- # This forces the modification of the examples in case of schema changes.
- example_file_path = os.path.join(os.path.dirname(__file__), './examples/configure_scaling_full_example.json')
- schema_file_path = os.path.join(os.path.dirname(__file__), '../models/configure_scaling.json')
- with open(example_file_path) as example_file, open(schema_file_path) as schema_file:
- validate(json.load(example_file), json.load(schema_file))
-
-
-if __name__ == '__main__':
- unittest.main()
+++ /dev/null
-import json
-import os
-import unittest
-
-from osm_policy_module.core.agent import PolicyModuleAgent
-
-
-class PolicyAgentTest(unittest.TestCase):
- def setUp(self):
- self.agent = PolicyModuleAgent()
-
- def test_get_alarm_configs(self):
- with open(os.path.join(os.path.dirname(__file__), './examples/configure_scaling_full_example.json')) as file:
- example = json.load(file)
- alarm_configs = self.agent._get_alarm_configs(example)
- # TODO Improve assertions
- self.assertEqual(len(alarm_configs), 2)
-
-
-if __name__ == '__main__':
- unittest.main()
--- /dev/null
+import json
+import unittest
+
+import os
+
+from jsonschema import validate
+
+
+class ExamplesTest(unittest.TestCase):
+ def test_examples_schema(self):
+ example_file_path = os.path.join(os.path.dirname(__file__), '../examples/configure_scaling_full_example.json')
+ schema_file_path = os.path.join(os.path.dirname(__file__), '../../models/configure_scaling.json')
+ with open(example_file_path) as example_file, open(schema_file_path) as schema_file:
+ validate(json.load(example_file), json.load(schema_file))
+
+
+if __name__ == '__main__':
+ unittest.main()
--- /dev/null
+import json
+import os
+import unittest
+
+from osm_policy_module.core.agent import PolicyModuleAgent
+
+
+class PolicyAgentTest(unittest.TestCase):
+ def setUp(self):
+ self.agent = PolicyModuleAgent()
+
+ def test_get_alarm_configs(self):
+ with open(os.path.join(os.path.dirname(__file__), '../examples/configure_scaling_full_example.json')) as file:
+ example = json.load(file)
+ alarm_configs = self.agent._get_alarm_configs(example)
+ # TODO Improve assertions
+ self.assertEqual(len(alarm_configs), 2)
+
+
+if __name__ == '__main__':
+ unittest.main()
kafka==1.3.*
peewee==3.1.*
-jsonschema==2.6.*
\ No newline at end of file
+jsonschema==2.6.*
+six
\ No newline at end of file
install_requires=parse_requirements('requirements.txt'),
entry_points={
"console_scripts": [
- "pm-dbsync = osm_policy_module.cmd.dbsync:main",
- "pm-agent = osm_policy_module.cmd.policy_module_agent:main",
+ "osm-policy-agent = osm_policy_module.cmd.policy_module_agent:main",
]
}
)