--- /dev/null
+*.py[cod]
+
+# C extensions
+*.so
+
+# log files
+*.log
+
+# Packages
+*.egg
+*.egg-info
+dist
+build
+.eggs
+eggs
+parts
+bin
+var
+sdist
+develop-eggs
+.installed.cfg
+lib
+lib64
+nohup.out
+
+# Installer logs
+pip-log.txt
+
+# Unit test / coverage reports
+.coverage
+.tox
+nosetests.xml
+.testrepository
+.venv
+.cache
+
+# Translations
+*.mo
+
+# Complexity
+output/*.html
+output/*/index.html
+
+# Sphinx
+doc/build
+
+# pbr generates these
+AUTHORS
+ChangeLog
+
+# Editors
+*~
+.*.swp
+.*sw?
+.settings/
+__pycache__/
+.idea
+
+*.db
+test.config
\ No newline at end of file
--- /dev/null
+include requirements.txt
+include README.rst
+recursive-include osm_policy_module *.json
\ No newline at end of file
--- /dev/null
+[policy_module]
+kafka_server_host=localhost
+kafka_server_port=9092
+log_dir=stdout
+log_level=INFO
\ No newline at end of file
--- /dev/null
+import argparse
+import logging
+import os
+import sys
+
+from osm_policy_module.core.config import Config
+
+from osm_policy_module.core.database import DatabaseManager
+
+
+def main():
+ cfg = Config.instance()
+ parser = argparse.ArgumentParser(prog='pm-dbsync')
+ parser.add_argument('--config-file', nargs='?', help='Policy module database sync configuration file')
+ args = parser.parse_args()
+ if args.config_file:
+ cfg.load_file(args.config_file)
+    if cfg.get('policy_module', 'log_dir') == 'stdout':
+        logging.basicConfig(stream=sys.stdout,
+                            format='%(asctime)s %(message)s',
+                            datefmt='%m/%d/%Y %I:%M:%S %p',
+                            level=logging.INFO)
+    else:
+        logging.basicConfig(filename=os.path.join(cfg.get('policy_module', 'log_dir'), 'pm_dbsync.log'),
+                            format='%(asctime)s %(message)s',
+                            datefmt='%m/%d/%Y %I:%M:%S %p', filemode='a',
+                            level=logging.INFO)
+ log = logging.getLogger(__name__)
+ log.info("Starting database sync...")
+ db_manager = DatabaseManager()
+ db_manager.create_tables()
--- /dev/null
+import argparse
+import logging
+import os
+import sys
+
+from osm_policy_module.core.config import Config
+
+from osm_policy_module.core.agent import PolicyModuleAgent
+
+
+def main():
+ cfg = Config.instance()
+ parser = argparse.ArgumentParser(prog='pm-scaling-config-agent')
+ parser.add_argument('--config-file', nargs='?', help='Policy module agent configuration file')
+ args = parser.parse_args()
+ if args.config_file:
+ cfg.load_file(args.config_file)
+    log_level = getattr(logging, cfg.get('policy_module', 'log_level'), logging.INFO)
+    if cfg.get('policy_module', 'log_dir') == 'stdout':
+        logging.basicConfig(stream=sys.stdout,
+                            format='%(asctime)s %(message)s',
+                            datefmt='%m/%d/%Y %I:%M:%S %p',
+                            level=log_level)
+    else:
+        logging.basicConfig(filename=os.path.join(cfg.get('policy_module', 'log_dir'), 'pm_agent.log'),
+                            format='%(asctime)s %(message)s',
+                            datefmt='%m/%d/%Y %I:%M:%S %p', filemode='a',
+                            level=log_level)
+ log = logging.getLogger(__name__)
+ log.info("Starting policy module agent...")
+ agent = PolicyModuleAgent()
+ agent.run()
--- /dev/null
+class AlarmConfig:
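+    """Holds the parameters needed to request a single alarm from MON."""
+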
+ def __init__(self, metric_name, resource_uuid, vim_uuid, threshold, operation, statistic, action):
+        self.metric_name = metric_name
+        self.resource_uuid = resource_uuid
+        self.vim_uuid = vim_uuid
+        self.threshold = threshold
+        self.operation = operation
+        self.statistic = statistic
+        self.action = action
--- /dev/null
+import json
+
+from kafka import KafkaProducer
+
+from osm_policy_module.core.config import Config
+
+
+class LcmClient:
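+    """Kafka client used to send scaling action requests towards the LCM."""
+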
+ def __init__(self):
+ cfg = Config.instance()
+        self.kafka_server = '{}:{}'.format(cfg.get('policy_module', 'kafka_server_host'),
+                                           cfg.get('policy_module', 'kafka_server_port'))
+ self.producer = KafkaProducer(bootstrap_servers=self.kafka_server,
+ key_serializer=str.encode,
+ value_serializer=lambda v: json.dumps(v).encode('utf-8'))
+
+    def scale(self, nsr_id, name, action):
+        msg = self._create_scale_action_payload(nsr_id, name, action)
+        self.producer.send(topic='alarm_request', key='create_alarm_request', value=msg)
+        self.producer.flush()
+
+ def _create_scale_action_payload(self, nsr_id, name, action):
+ msg = {
+ "ns_id": nsr_id,
+ "scaling_group_descriptor": {
+ "name": name,
+ "action": action
+ }
+ }
+ return msg
--- /dev/null
+import json
+import logging
+import random
+import uuid
+
+from kafka import KafkaProducer, KafkaConsumer
+
+from osm_policy_module.core.config import Config
+
+log = logging.getLogger(__name__)
+
+
+class MonClient:
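+    """Kafka client used to request alarm creation from MON."""
+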
+ def __init__(self):
+ cfg = Config.instance()
+ self.kafka_server = '{}:{}'.format(cfg.get('policy_module', 'kafka_server_host'),
+ cfg.get('policy_module', 'kafka_server_port'))
+ self.producer = KafkaProducer(bootstrap_servers=self.kafka_server,
+ key_serializer=str.encode,
+ value_serializer=lambda v: json.dumps(v).encode('utf-8'))
+
+ def create_alarm(self, metric_name, resource_uuid, vim_uuid, threshold, statistic, operation):
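+        # Publish a create_alarm request and wait (up to 10s) for the response
+        # that matches our random correlation id.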
+ cor_id = random.randint(1, 1000000)
+ msg = self._create_alarm_payload(cor_id, metric_name, resource_uuid, vim_uuid, threshold, statistic, operation)
+ self.producer.send(topic='alarm_request', key='create_alarm_request', value=msg)
+ self.producer.flush()
+        consumer = KafkaConsumer(bootstrap_servers=self.kafka_server,
+                                 key_deserializer=bytes.decode,
+                                 value_deserializer=bytes.decode,
+                                 consumer_timeout_ms=10000)
+        consumer.subscribe(['alarm_response'])
+ alarm_uuid = None
+ for message in consumer:
+ if message.key == 'create_alarm_response':
+                content = json.loads(message.value)
+ if self._is_alarm_response_correlation_id_eq(cor_id, content):
+ alarm_uuid = content['alarm_create_response']['alarm_uuid']
+ # TODO Handle error response
+ break
+ consumer.close()
+ if not alarm_uuid:
+            raise ValueError(
+                'Timeout: No alarm creation response from MON. Are its IP and port correctly configured?')
+ return alarm_uuid
+
+ def _create_alarm_payload(self, cor_id, metric_name, resource_uuid, vim_uuid, threshold, statistic, operation):
+ create_alarm_request = {
+ 'correlation_id': cor_id,
+ 'alarm_name': str(uuid.uuid4()),
+ 'metric_name': metric_name,
+ 'resource_uuid': resource_uuid,
+ 'operation': operation,
+ 'severity': 'critical',
+ 'threshold_value': threshold,
+ 'statistic': statistic
+ }
+ msg = {
+ 'create_alarm_request': create_alarm_request,
+ 'vim_uuid': vim_uuid
+ }
+ return msg
+
+ def _is_alarm_response_correlation_id_eq(self, cor_id, message_content):
+ return message_content['alarm_create_response']['correlation_id'] == cor_id
--- /dev/null
+import json
+import logging
+
+from kafka import KafkaConsumer
+from osm_policy_module.core.config import Config
+from osm_policy_module.common.lcm_client import LcmClient
+
+from osm_policy_module.common.alarm_config import AlarmConfig
+from osm_policy_module.common.mon_client import MonClient
+from osm_policy_module.core.database import ScalingRecord, ScalingAlarm
+
+log = logging.getLogger(__name__)
+
+
+class PolicyModuleAgent:
+ def run(self):
+ cfg = Config.instance()
+ # Initialize servers
+ kafka_server = '{}:{}'.format(cfg.get('policy_module', 'kafka_server_host'),
+ cfg.get('policy_module', 'kafka_server_port'))
+
+ # Initialize Kafka consumer
+ log.info("Connecting to Kafka server at %s", kafka_server)
+ consumer = KafkaConsumer(bootstrap_servers=kafka_server,
+ key_deserializer=bytes.decode,
+ value_deserializer=bytes.decode,
+ group_id="policy-module-agent")
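+        # 'lcm_pm' carries scaling configuration requests ('configure_scaling');
+        # 'alarm_response' carries alarm notifications ('notify_alarm') that may trigger scaling.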
+ consumer.subscribe(['lcm_pm', 'alarm_response'])
+
+ for message in consumer:
+ log.info("Message arrived: %s", message)
+ log.info("Message key: %s", message.key)
+ try:
+ if message.key == 'configure_scaling':
+ content = json.loads(message.value)
+ log.info("Creating scaling record in DB")
+ # TODO: Use transactions: http://docs.peewee-orm.com/en/latest/peewee/transactions.html
+ scaling_record = ScalingRecord.create(
+ nsr_id=content['ns_id'],
+ name=content['scaling_group_descriptor']['name'],
+ content=json.dumps(content)
+ )
+ log.info("Created scaling record in DB : nsr_id=%s, name=%s, content=%s",
+ scaling_record.nsr_id,
+ scaling_record.name,
+ scaling_record.content)
+ alarm_configs = self._get_alarm_configs(content)
+ for config in alarm_configs:
+ mon_client = MonClient()
+ log.info("Creating alarm record in DB")
+ alarm_uuid = mon_client.create_alarm(
+ metric_name=config.metric_name,
+ resource_uuid=config.resource_uuid,
+ vim_uuid=config.vim_uuid,
+ threshold=config.threshold,
+ operation=config.operation,
+ statistic=config.statistic
+ )
+ ScalingAlarm.create(
+ alarm_id=alarm_uuid,
+ action=config.action,
+ scaling_record=scaling_record
+ )
+ if message.key == 'notify_alarm':
+ content = json.loads(message.value)
+ alarm_id = content['notify_details']['alarm_uuid']
+                    alarm = ScalingAlarm.select().where(ScalingAlarm.alarm_id == alarm_id).first()
+ if alarm:
+ lcm_client = LcmClient()
+ log.info("Sending scaling action message: %s", json.dumps(alarm))
+ lcm_client.scale(alarm.scaling_record.nsr_id, alarm.scaling_record.name, alarm.action)
+ except Exception:
+ log.exception("Error consuming message: ")
+
+ def _get_alarm_configs(self, message_content):
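+        # Build two alarm configurations per scaling criterion: one that triggers
+        # scale_out and one that triggers scale_in, both on the same monitoring parameter.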
+ scaling_criterias = message_content['scaling_group_descriptor']['scaling_policy']['scaling_criteria']
+ alarm_configs = []
+ for criteria in scaling_criterias:
+ metric_name = ''
+ scale_out_threshold = criteria['scale_out_threshold']
+ scale_in_threshold = criteria['scale_in_threshold']
+ scale_out_operation = criteria['scale_out_relational_operation']
+ scale_in_operation = criteria['scale_in_relational_operation']
+ statistic = criteria['monitoring_param']['aggregation_type']
+ vim_uuid = ''
+ resource_uuid = ''
+ if 'vdu_monitoring_param' in criteria['monitoring_param']:
+ vim_uuid = criteria['monitoring_param']['vdu_monitoring_param']['vim_uuid']
+ resource_uuid = criteria['monitoring_param']['vdu_monitoring_param']['resource_id']
+ metric_name = criteria['monitoring_param']['vdu_monitoring_param']['name']
+ if 'vnf_metric' in criteria['monitoring_param']:
+ # TODO vnf_metric
+ continue
+ if 'vdu_metric' in criteria['monitoring_param']:
+ # TODO vdu_metric
+ continue
+ scale_out_alarm_config = AlarmConfig(metric_name,
+ resource_uuid,
+ vim_uuid,
+ scale_out_threshold,
+ scale_out_operation,
+ statistic,
+ 'scale_out')
+ scale_in_alarm_config = AlarmConfig(metric_name,
+ resource_uuid,
+ vim_uuid,
+ scale_in_threshold,
+ scale_in_operation,
+ statistic,
+ 'scale_in')
+ alarm_configs.append(scale_in_alarm_config)
+ alarm_configs.append(scale_out_alarm_config)
+ return alarm_configs
--- /dev/null
+"""Global Configuration."""
+
+import logging
+
+from osm_policy_module.core.singleton import Singleton
+
+try:
+ from configparser import ConfigParser
+except ImportError:
+ from ConfigParser import ConfigParser
+
+log = logging.getLogger(__name__)
+
+
+@Singleton
+class Config(object):
+ """Global configuration."""
+
+ def __init__(self):
+ # Default config values
+ self.config = {
+ 'policy_module': {
+ 'kafka_server_host': '127.0.0.1',
+ 'kafka_server_port': '9092',
+ 'log_dir': 'stdout',
+ 'log_level': 'INFO'
+ },
+ }
+
+ def load_file(self, config_file_path):
+ if config_file_path:
+ config_parser = ConfigParser()
+ config_parser.read(config_file_path)
+ for section in config_parser.sections():
+ for key, value in config_parser.items(section):
+ if section not in self.config:
+ self.config[section] = {}
+ self.config[section][key] = value
+
+ def get(self, group, name=None, default=None):
+ if group in self.config:
+ if name is None:
+ return self.config[group]
+ return self.config[group].get(name, default)
+ return default
--- /dev/null
+import logging
+
+from peewee import CharField, ForeignKeyField, Model, TextField
+from playhouse.sqlite_ext import SqliteExtDatabase
+
+from osm_policy_module.core.config import Config
+
+log = logging.getLogger(__name__)
+cfg = Config.instance()
+
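+# SQLite database file, created in the working directory of the process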
+db = SqliteExtDatabase('mon.db')
+
+
+class BaseModel(Model):
+ class Meta:
+ database = db
+
+
+class ScalingRecord(BaseModel):
+ nsr_id = CharField()
+ name = CharField()
+ content = TextField()
+
+
+class ScalingAlarm(BaseModel):
+ alarm_id = CharField()
+ action = CharField()
+    scaling_record = ForeignKeyField(ScalingRecord, backref='scaling_alarms')
+
+
+class DatabaseManager:
+ def create_tables(self):
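+        # Connect, create the ORM tables if they do not exist yet, and close the connection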
+ try:
+ db.connect()
+ db.create_tables([ScalingRecord, ScalingAlarm])
+ db.close()
+        except Exception:
+ log.exception("Error creating tables: ")
--- /dev/null
+"""Simple singleton class."""
+
+from __future__ import unicode_literals
+
+
+class Singleton(object):
+ """Simple singleton class."""
+
+ def __init__(self, decorated):
+ """Initialize singleton instance."""
+ self._decorated = decorated
+
+ def instance(self):
+ """Return singleton instance."""
+ try:
+ return self._instance
+ except AttributeError:
+ self._instance = self._decorated()
+ return self._instance
--- /dev/null
+{
+ "$schema": "http://json-schema.org/schema#",
+ "type": "object",
+ "properties": {
+ "ns_id": {
+ "type": "string"
+ },
+ "scaling_group_descriptor": {
+ "type": "object",
+ "properties": {
+ "name": {
+ "type": "string"
+ },
+ "scaling_policy": {
+ "type": "object",
+ "properties": {
+ "scale_in_operation_type": {
+ "type": "string",
+ "enum": [
+ "and",
+ "or"
+ ]
+ },
+ "scale_out_operation_type": {
+ "type": "string",
+ "enum": [
+ "and",
+ "or"
+ ]
+ },
+ "threshold_time": {
+ "type": "number"
+ },
+ "cooldown_time": {
+ "type": "number"
+ },
+ "scaling_criteria": {
+ "type": "array",
+ "items": {
+ "type": "object",
+ "properties": {
+ "scale_in_threshold": {
+ "type": "number"
+ },
+ "scale_out_threshold": {
+ "type": "number"
+ },
+ "scale_in_relational_operation": {
+ "type": "string",
+ "enum": [
+ "lt",
+ "gt",
+ "le",
+ "ge",
+ "eq",
+ "ne"
+ ]
+ },
+ "scale_out_relational_operation": {
+ "type": "string",
+ "enum": [
+ "lt",
+ "gt",
+ "le",
+ "ge",
+ "eq",
+ "ne"
+ ]
+ },
+ "monitoring_param": {
+ "type": "object",
+ "properties": {
+ "id": {
+ "type": "string"
+ },
+ "name": {
+ "type": "string"
+ },
+ "aggregation_type": {
+ "type": "string",
+ "enum": [
+ "avg",
+ "max",
+ "min",
+ "last",
+ "sum"
+ ]
+ },
+ "vdu_monitoring_param": {
+ "type": "object",
+ "properties": {
+ "vim_uuid": {
+ "type": "string"
+ },
+ "resource_id": {
+ "type": "string"
+ },
+ "name": {
+ "type": "string"
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+}
--- /dev/null
+{
+ "ns_id": "360b400b-86dc-4b8e-a139-b7fc3987cf69",
+ "scaling_group_descriptor": {
+ "name": "test",
+ "scaling_policy": {
+ "scale_in_operation_type": "or",
+ "scale_out_operation_type": "or",
+ "threshold_time": 10,
+ "cooldown_time": 10,
+ "scaling_criteria": [
+ {
+ "scale_in_threshold": 50,
+ "scale_out_threshold": 50,
+ "scale_in_relational_operation": "lt",
+ "scale_out_relational_operation": "gt",
+ "monitoring_param": {
+ "id": "test_param_id",
+ "name": "test_param",
+ "aggregation_type": "avg",
+ "vdu_monitoring_param": {
+ "vim_uuid": "vdu_monitoring_param_id",
+ "resource_id": "vdu_monitoring_param_resource_id",
+ "name": "vdu_monitoring_param_name"
+ }
+ }
+ }
+ ]
+ }
+ }
+}
\ No newline at end of file
--- /dev/null
+import json
+import logging
+import os
+import unittest
+
+from kafka import KafkaProducer
+
+log = logging.getLogger(__name__)
+
+
+# logging.basicConfig(stream=sys.stdout,
+# format='%(asctime)s %(message)s',
+# datefmt='%m/%d/%Y %I:%M:%S %p',
+# level=logging.DEBUG)
+
+class ScalingConfigTest(unittest.TestCase):
+ def test_send_scaling_config_msg(self):
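+        # Publish the example scaling configuration to the 'lcm_pm' topic and
+        # assert that the broker acknowledged the message.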
+ try:
+ with open(
+ os.path.join(os.path.dirname(__file__), '../examples/configure_scaling_full_example.json')) as file:
+ payload = json.load(file)
+ kafka_server = '{}:{}'.format(os.getenv("KAFKA_SERVER_HOST", "localhost"),
+ os.getenv("KAFKA_SERVER_PORT", "9092"))
+ producer = KafkaProducer(bootstrap_servers=kafka_server,
+ key_serializer=str.encode,
+ value_serializer=str.encode)
+ future = producer.send('lcm_pm', json.dumps(payload), key="configure_scaling")
+ result = future.get(timeout=60)
+ log.info('Result: %s', result)
+
+ producer.flush()
+ self.assertIsNotNone(result)
+ except Exception as e:
+ self.fail(e)
+
+
+if __name__ == '__main__':
+ unittest.main()
--- /dev/null
+import json
+import os
+import unittest
+
+from jsonschema import validate
+
+
+class ExamplesTest(unittest.TestCase):
+ def test_examples_schema(self):
+ # TODO: Test that valid examples correspond to schema.
+ # This forces the modification of the examples in case of schema changes.
+ example_file_path = os.path.join(os.path.dirname(__file__), './examples/configure_scaling_full_example.json')
+ schema_file_path = os.path.join(os.path.dirname(__file__), '../models/configure_scaling.json')
+ with open(example_file_path) as example_file, open(schema_file_path) as schema_file:
+ validate(json.load(example_file), json.load(schema_file))
+
+
+if __name__ == '__main__':
+ unittest.main()
--- /dev/null
+import json
+import os
+import unittest
+
+from osm_policy_module.core.agent import PolicyModuleAgent
+
+
+class PolicyAgentTest(unittest.TestCase):
+ def setUp(self):
+ self.agent = PolicyModuleAgent()
+
+ def test_get_alarm_configs(self):
+ with open(os.path.join(os.path.dirname(__file__), './examples/configure_scaling_full_example.json')) as file:
+ example = json.load(file)
+ alarm_configs = self.agent._get_alarm_configs(example)
+ # TODO Improve assertions
+ self.assertEqual(len(alarm_configs), 2)
+
+
+if __name__ == '__main__':
+ unittest.main()
--- /dev/null
+kafka==1.3.*
+peewee==3.1.*
+jsonschema==2.6.*
\ No newline at end of file
--- /dev/null
+import setuptools
+
+
+def parse_requirements(requirements):
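+    # Return the non-empty, non-comment lines of the requirements file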
+    with open(requirements) as f:
+        return [line.strip('\n') for line in f if line.strip('\n') and not line.startswith('#')]
+
+
+_author = "Benjamín Díaz"
+_name = 'osm_policy_module'
+_author_email = 'bdiaz@whitestack.com'
+_version = '1.0'
+_description = 'OSM Policy Module'
+_maintainer = 'Benjamín Díaz'
+_maintainer_email = 'bdiaz@whitestack.com'
+_license = 'Apache 2.0'
+_url = 'https://osm.etsi.org/gitweb/?p=osm/MON.git;a=tree'
+
+setuptools.setup(
+ name=_name,
+ version=_version,
+ description=_description,
+ long_description=open('README.rst').read(),
+ author=_author,
+ author_email=_author_email,
+ maintainer=_maintainer,
+ maintainer_email=_maintainer_email,
+ url=_url,
+ license=_license,
+ packages=setuptools.find_packages(),
+ include_package_data=True,
+ install_requires=parse_requirements('requirements.txt'),
+ entry_points={
+ "console_scripts": [
+ "pm-dbsync = osm_policy_module.cmd.dbsync:main",
+ "pm-agent = osm_policy_module.cmd.policy_module_agent:main",
+ ]
+ }
+)