Policy Module first commit 21/5921/1
author diazb <bdiaz@whitestack.com>
Tue, 20 Mar 2018 21:03:55 +0000 (18:03 -0300)
committer diazb <bdiaz@whitestack.com>
Thu, 29 Mar 2018 19:58:12 +0000 (16:58 -0300)
Signed-off-by: diazb <bdiaz@whitestack.com>
26 files changed:
policy_module/.gitignore [new file with mode: 0644]
policy_module/MANIFEST.in [new file with mode: 0644]
policy_module/README.rst [new file with mode: 0644]
policy_module/config.example [new file with mode: 0644]
policy_module/osm_policy_module/__init__.py [new file with mode: 0644]
policy_module/osm_policy_module/cmd/__init__.py [new file with mode: 0644]
policy_module/osm_policy_module/cmd/dbsync.py [new file with mode: 0644]
policy_module/osm_policy_module/cmd/policy_module_agent.py [new file with mode: 0644]
policy_module/osm_policy_module/common/__init__.py [new file with mode: 0644]
policy_module/osm_policy_module/common/alarm_config.py [new file with mode: 0644]
policy_module/osm_policy_module/common/lcm_client.py [new file with mode: 0644]
policy_module/osm_policy_module/common/mon_client.py [new file with mode: 0644]
policy_module/osm_policy_module/core/__init__.py [new file with mode: 0644]
policy_module/osm_policy_module/core/agent.py [new file with mode: 0644]
policy_module/osm_policy_module/core/config.py [new file with mode: 0644]
policy_module/osm_policy_module/core/database.py [new file with mode: 0644]
policy_module/osm_policy_module/core/singleton.py [new file with mode: 0644]
policy_module/osm_policy_module/models/configure_scaling.json [new file with mode: 0644]
policy_module/osm_policy_module/tests/__init__.py [new file with mode: 0644]
policy_module/osm_policy_module/tests/examples/configure_scaling_full_example.json [new file with mode: 0644]
policy_module/osm_policy_module/tests/integration/__init__.py [new file with mode: 0644]
policy_module/osm_policy_module/tests/integration/test_scaling_config_kafka_msg.py [new file with mode: 0644]
policy_module/osm_policy_module/tests/test_examples.py [new file with mode: 0644]
policy_module/osm_policy_module/tests/test_policy_config_agent.py [new file with mode: 0644]
policy_module/requirements.txt [new file with mode: 0644]
policy_module/setup.py [new file with mode: 0644]

diff --git a/policy_module/.gitignore b/policy_module/.gitignore
new file mode 100644 (file)
index 0000000..88a8391
--- /dev/null
@@ -0,0 +1,60 @@
+*.py[cod]
+
+# C extensions
+*.so
+
+# log files
+*.log
+
+# Packages
+*.egg
+*.egg-info
+dist
+build
+.eggs
+eggs
+parts
+bin
+var
+sdist
+develop-eggs
+.installed.cfg
+lib
+lib64
+nohup.out
+
+# Installer logs
+pip-log.txt
+
+# Unit test / coverage reports
+.coverage
+.tox
+nosetests.xml
+.testrepository
+.venv
+.cache
+
+# Translations
+*.mo
+
+# Complexity
+output/*.html
+output/*/index.html
+
+# Sphinx
+doc/build
+
+# pbr generates these
+AUTHORS
+ChangeLog
+
+# Editors
+*~
+.*.swp
+.*sw?
+.settings/
+__pycache__/
+.idea
+
+*.db
+test.config
\ No newline at end of file
diff --git a/policy_module/MANIFEST.in b/policy_module/MANIFEST.in
new file mode 100644 (file)
index 0000000..06cf953
--- /dev/null
@@ -0,0 +1,3 @@
+include requirements.txt
+include README.rst
+recursive-include osm_policy_module *
\ No newline at end of file
diff --git a/policy_module/README.rst b/policy_module/README.rst
new file mode 100644 (file)
index 0000000..e69de29
diff --git a/policy_module/config.example b/policy_module/config.example
new file mode 100644 (file)
index 0000000..45e4f17
--- /dev/null
@@ -0,0 +1,4 @@
+[policy_module]
+kafka_server_host=localhost
+kafka_server_port=9092
+log_dir=
\ No newline at end of file
diff --git a/policy_module/osm_policy_module/__init__.py b/policy_module/osm_policy_module/__init__.py
new file mode 100644 (file)
index 0000000..e69de29
diff --git a/policy_module/osm_policy_module/cmd/__init__.py b/policy_module/osm_policy_module/cmd/__init__.py
new file mode 100644 (file)
index 0000000..e69de29
diff --git a/policy_module/osm_policy_module/cmd/dbsync.py b/policy_module/osm_policy_module/cmd/dbsync.py
new file mode 100644 (file)
index 0000000..25ef1a6
--- /dev/null
@@ -0,0 +1,30 @@
+import argparse
+import logging
+import sys
+
+from osm_policy_module.core.config import Config
+
+from osm_policy_module.core.database import DatabaseManager
+
+
+def main():
+    cfg = Config.instance()
+    parser = argparse.ArgumentParser(prog='pm-dbsync')
+    parser.add_argument('--config-file', nargs='?', help='Policy module database sync configuration file')
+    args = parser.parse_args()
+    if args.config_file:
+        cfg.load_file(args.config_file)
+    if cfg.get('policy_module', 'log_dir') == 'stdout':
+        logging.basicConfig(stream=sys.stdout,
+                            format='%(asctime)s %(message)s',
+                            datefmt='%m/%d/%Y %I:%M:%S %p', filemode='a',
+                            level=logging.INFO)
+    else:
+        logging.basicConfig(filename=cfg.get('policy_module', 'log_dir') + 'pm_dbsync.log',
+                            format='%(asctime)s %(message)s',
+                            datefmt='%m/%d/%Y %I:%M:%S %p', filemode='a',
+                            level=logging.INFO)
+    log = logging.getLogger(__name__)
+    log.info("Starting database sync...")
+    db_manager = DatabaseManager()
+    db_manager.create_tables()
diff --git a/policy_module/osm_policy_module/cmd/policy_module_agent.py b/policy_module/osm_policy_module/cmd/policy_module_agent.py
new file mode 100644 (file)
index 0000000..7116913
--- /dev/null
@@ -0,0 +1,30 @@
+import argparse
+import logging
+import sys
+
+from osm_policy_module.core.config import Config
+
+from osm_policy_module.core.agent import PolicyModuleAgent
+
+
+def main():
+    cfg = Config.instance()
+    parser = argparse.ArgumentParser(prog='pm-scaling-config-agent')
+    parser.add_argument('--config-file', nargs='?', help='Policy module agent configuration file')
+    args = parser.parse_args()
+    if args.config_file:
+        cfg.load_file(args.config_file)
+    if cfg.get('policy_module', 'log_dir') == 'stdout':
+        logging.basicConfig(stream=sys.stdout,
+                            format='%(asctime)s %(message)s',
+                            datefmt='%m/%d/%Y %I:%M:%S %p',
+                            level=logging._nameToLevel[cfg.get('policy_module', 'log_level')])
+    else:
+        logging.basicConfig(filename=cfg.get('policy_module', 'log_dir') + 'pm_agent.log',
+                            format='%(asctime)s %(message)s',
+                            datefmt='%m/%d/%Y %I:%M:%S %p', filemode='a',
+                            level=logging._nameToLevel[cfg.get('policy_module', 'log_level')])
+    log = logging.getLogger(__name__)
+    log.info("Starting policy module agent...")
+    agent = PolicyModuleAgent()
+    agent.run()
diff --git a/policy_module/osm_policy_module/common/__init__.py b/policy_module/osm_policy_module/common/__init__.py
new file mode 100644 (file)
index 0000000..e69de29
diff --git a/policy_module/osm_policy_module/common/alarm_config.py b/policy_module/osm_policy_module/common/alarm_config.py
new file mode 100644 (file)
index 0000000..78d36d0
--- /dev/null
@@ -0,0 +1,9 @@
+class AlarmConfig:
+    def __init__(self, metric_name, resource_uuid, vim_uuid, threshold, operation, statistic, action):
+        self.metric_name = metric_name
+        self.resource_uuid = resource_uuid
+        self.vim_uuid = vim_uuid
+        self.threshold = threshold
+        self.operation = operation
+        self.statistic = statistic
+        self.action = action
diff --git a/policy_module/osm_policy_module/common/lcm_client.py b/policy_module/osm_policy_module/common/lcm_client.py
new file mode 100644 (file)
index 0000000..99e3ffb
--- /dev/null
@@ -0,0 +1,32 @@
+import json
+
+from kafka import KafkaProducer
+
+from osm_policy_module.core.config import Config
+
+
+class LcmClient:
+    def __init__(self):
+        cfg = Config.instance()
+        self.kafka_server = '{}:{}'.format(
+            cfg.get('policy_module', 'kafka_server_host'),
+            cfg.get('policy_module', 'kafka_server_port'))
+        self.producer = KafkaProducer(bootstrap_servers=self.kafka_server,
+                                      key_serializer=str.encode,
+                                      value_serializer=lambda v: json.dumps(v).encode('utf-8'))
+
+    def scale(self, nsr_id, name, action):
+        msg = self._create_scale_action_payload(nsr_id, name, action)
+        self.producer.send(topic='alarm_request', key='create_alarm_request', value=msg)
+        self.producer.flush()
+        pass
+
+    def _create_scale_action_payload(self, nsr_id, name, action):
+        msg = {
+            "ns_id": nsr_id,
+            "scaling_group_descriptor": {
+                "name": name,
+                "action": action
+            }
+        }
+        return msg
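
For reference, a sketch (not part of the commit) of the message body that scale() publishes, as built by _create_scale_action_payload above; the identifiers are taken from the test example further down and are illustrative only. Note that the call currently publishes to the 'alarm_request' topic with key 'create_alarm_request', mirroring MonClient.

    # Illustrative payload produced by LcmClient().scale(nsr_id, 'test', 'scale_out')
    {
        "ns_id": "360b400b-86dc-4b8e-a139-b7fc3987cf69",
        "scaling_group_descriptor": {
            "name": "test",
            "action": "scale_out"
        }
    }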
diff --git a/policy_module/osm_policy_module/common/mon_client.py b/policy_module/osm_policy_module/common/mon_client.py
new file mode 100644 (file)
index 0000000..19b440e
--- /dev/null
@@ -0,0 +1,61 @@
+import json
+import logging
+import random
+import uuid
+
+from kafka import KafkaProducer, KafkaConsumer
+
+from osm_policy_module.core.config import Config
+
+log = logging.getLogger(__name__)
+
+
+class MonClient:
+    def __init__(self):
+        cfg = Config.instance()
+        self.kafka_server = '{}:{}'.format(cfg.get('policy_module', 'kafka_server_host'),
+                                           cfg.get('policy_module', 'kafka_server_port'))
+        self.producer = KafkaProducer(bootstrap_servers=self.kafka_server,
+                                      key_serializer=str.encode,
+                                      value_serializer=lambda v: json.dumps(v).encode('utf-8'))
+
+    def create_alarm(self, metric_name, resource_uuid, vim_uuid, threshold, statistic, operation):
+        cor_id = random.randint(1, 1000000)
+        msg = self._create_alarm_payload(cor_id, metric_name, resource_uuid, vim_uuid, threshold, statistic, operation)
+        self.producer.send(topic='alarm_request', key='create_alarm_request', value=msg)
+        self.producer.flush()
+        consumer = KafkaConsumer(bootstrap_servers=self.kafka_server, key_deserializer=bytes.decode, consumer_timeout_ms=10000)
+        consumer.subscribe(['alarm_response'])
+        alarm_uuid = None
+        for message in consumer:
+            if message.key == 'create_alarm_response':
+                content = json.loads(message.value)
+                if self._is_alarm_response_correlation_id_eq(cor_id, content):
+                    alarm_uuid = content['alarm_create_response']['alarm_uuid']
+                    # TODO Handle error response
+                    break
+        consumer.close()
+        if not alarm_uuid:
+            raise ValueError(
+                'Timeout: No alarm creation response from MON. Are its IP and port correctly configured?')
+        return alarm_uuid
+
+    def _create_alarm_payload(self, cor_id, metric_name, resource_uuid, vim_uuid, threshold, statistic, operation):
+        create_alarm_request = {
+            'correlation_id': cor_id,
+            'alarm_name': str(uuid.uuid4()),
+            'metric_name': metric_name,
+            'resource_uuid': resource_uuid,
+            'operation': operation,
+            'severity': 'critical',
+            'threshold_value': threshold,
+            'statistic': statistic
+        }
+        msg = {
+            'create_alarm_request': create_alarm_request,
+            'vim_uuid': vim_uuid
+        }
+        return msg
+
+    def _is_alarm_response_correlation_id_eq(self, cor_id, message_content):
+        return message_content['alarm_create_response']['correlation_id'] == cor_id
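
The reply expected from MON is only described implicitly by the parsing code above. A sketch of the 'alarm_response' message create_alarm() waits for, with field names taken from that code and all values illustrative:

    # Expected on the 'alarm_response' topic with key 'create_alarm_response'
    {
        'alarm_create_response': {
            'correlation_id': 414243,             # must equal the correlation_id sent in the request
            'alarm_uuid': 'example-alarm-uuid'    # returned to the caller and stored in the database
        }
    }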
diff --git a/policy_module/osm_policy_module/core/__init__.py b/policy_module/osm_policy_module/core/__init__.py
new file mode 100644 (file)
index 0000000..e69de29
diff --git a/policy_module/osm_policy_module/core/agent.py b/policy_module/osm_policy_module/core/agent.py
new file mode 100644 (file)
index 0000000..c329743
--- /dev/null
@@ -0,0 +1,113 @@
+import json
+import logging
+
+from kafka import KafkaConsumer
+from osm_policy_module.core.config import Config
+from osm_policy_module.common.lcm_client import LcmClient
+
+from osm_policy_module.common.alarm_config import AlarmConfig
+from osm_policy_module.common.mon_client import MonClient
+from osm_policy_module.core.database import ScalingRecord, ScalingAlarm
+
+log = logging.getLogger(__name__)
+
+
+class PolicyModuleAgent:
+    def run(self):
+        cfg = Config.instance()
+        # Initialize servers
+        kafka_server = '{}:{}'.format(cfg.get('policy_module', 'kafka_server_host'),
+                                      cfg.get('policy_module', 'kafka_server_port'))
+
+        # Initialize Kafka consumer
+        log.info("Connecting to Kafka server at %s", kafka_server)
+        consumer = KafkaConsumer(bootstrap_servers=kafka_server,
+                                 key_deserializer=bytes.decode,
+                                 value_deserializer=bytes.decode,
+                                 group_id="policy-module-agent")
+        consumer.subscribe(['lcm_pm', 'alarm_response'])
+
+        for message in consumer:
+            log.info("Message arrived: %s", message)
+            log.info("Message key: %s", message.key)
+            try:
+                if message.key == 'configure_scaling':
+                    content = json.loads(message.value)
+                    log.info("Creating scaling record in DB")
+                    # TODO: Use transactions: http://docs.peewee-orm.com/en/latest/peewee/transactions.html
+                    scaling_record = ScalingRecord.create(
+                        nsr_id=content['ns_id'],
+                        name=content['scaling_group_descriptor']['name'],
+                        content=json.dumps(content)
+                    )
+                    log.info("Created scaling record in DB : nsr_id=%s, name=%s, content=%s",
+                             scaling_record.nsr_id,
+                             scaling_record.name,
+                             scaling_record.content)
+                    alarm_configs = self._get_alarm_configs(content)
+                    for config in alarm_configs:
+                        mon_client = MonClient()
+                        log.info("Creating alarm record in DB")
+                        alarm_uuid = mon_client.create_alarm(
+                            metric_name=config.metric_name,
+                            resource_uuid=config.resource_uuid,
+                            vim_uuid=config.vim_uuid,
+                            threshold=config.threshold,
+                            operation=config.operation,
+                            statistic=config.statistic
+                        )
+                        ScalingAlarm.create(
+                            alarm_id=alarm_uuid,
+                            action=config.action,
+                            scaling_record=scaling_record
+                        )
+                if message.key == 'notify_alarm':
+                    content = json.loads(message.value)
+                    alarm_id = content['notify_details']['alarm_uuid']
+                    alarm = ScalingAlarm.select().where(ScalingAlarm.alarm_id == alarm_id).get()
+                    if alarm:
+                        lcm_client = LcmClient()
+                        log.info("Sending scaling action message: %s", json.dumps(alarm))
+                        lcm_client.scale(alarm.scaling_record.nsr_id, alarm.scaling_record.name, alarm.action)
+            except Exception:
+                log.exception("Error consuming message: ")
+
+    def _get_alarm_configs(self, message_content):
+        scaling_criterias = message_content['scaling_group_descriptor']['scaling_policy']['scaling_criteria']
+        alarm_configs = []
+        for criteria in scaling_criterias:
+            metric_name = ''
+            scale_out_threshold = criteria['scale_out_threshold']
+            scale_in_threshold = criteria['scale_in_threshold']
+            scale_out_operation = criteria['scale_out_relational_operation']
+            scale_in_operation = criteria['scale_in_relational_operation']
+            statistic = criteria['monitoring_param']['aggregation_type']
+            vim_uuid = ''
+            resource_uuid = ''
+            if 'vdu_monitoring_param' in criteria['monitoring_param']:
+                vim_uuid = criteria['monitoring_param']['vdu_monitoring_param']['vim_uuid']
+                resource_uuid = criteria['monitoring_param']['vdu_monitoring_param']['resource_id']
+                metric_name = criteria['monitoring_param']['vdu_monitoring_param']['name']
+            if 'vnf_metric' in criteria['monitoring_param']:
+                # TODO vnf_metric
+                continue
+            if 'vdu_metric' in criteria['monitoring_param']:
+                # TODO vdu_metric
+                continue
+            scale_out_alarm_config = AlarmConfig(metric_name,
+                                                 resource_uuid,
+                                                 vim_uuid,
+                                                 scale_out_threshold,
+                                                 scale_out_operation,
+                                                 statistic,
+                                                 'scale_out')
+            scale_in_alarm_config = AlarmConfig(metric_name,
+                                                resource_uuid,
+                                                vim_uuid,
+                                                scale_in_threshold,
+                                                scale_in_operation,
+                                                statistic,
+                                                'scale_in')
+            alarm_configs.append(scale_in_alarm_config)
+            alarm_configs.append(scale_out_alarm_config)
+        return alarm_configs
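
A sketch of the 'notify_alarm' message the consumer loop above reacts to; only the field that is actually read is shown, everything else is left to MON:

    # key: 'notify_alarm', consumed from the 'alarm_response' topic
    {
        'notify_details': {
            'alarm_uuid': 'example-alarm-uuid'   # looked up in the ScalingAlarm table
        }
    }

The 'configure_scaling' messages consumed on the 'lcm_pm' topic are described by models/configure_scaling.json below.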
diff --git a/policy_module/osm_policy_module/core/config.py b/policy_module/osm_policy_module/core/config.py
new file mode 100644 (file)
index 0000000..0ee2a7d
--- /dev/null
@@ -0,0 +1,45 @@
+"""Global Configuration."""
+
+import logging
+
+from osm_policy_module.core.singleton import Singleton
+
+try:
+    from configparser import ConfigParser
+except ImportError:
+    from ConfigParser import ConfigParser
+
+log = logging.getLogger(__name__)
+
+
+@Singleton
+class Config(object):
+    """Global configuration."""
+
+    def __init__(self):
+        # Default config values
+        self.config = {
+            'policy_module': {
+                'kafka_server_host': '127.0.0.1',
+                'kafka_server_port': '9092',
+                'log_dir': 'stdout',
+                'log_level': 'INFO'
+            },
+        }
+
+    def load_file(self, config_file_path):
+        if config_file_path:
+            config_parser = ConfigParser()
+            config_parser.read(config_file_path)
+            for section in config_parser.sections():
+                for key, value in config_parser.items(section):
+                    if section not in self.config:
+                        self.config[section] = {}
+                    self.config[section][key] = value
+
+    def get(self, group, name=None, default=None):
+        if group in self.config:
+            if name is None:
+                return self.config[group]
+            return self.config[group].get(name, default)
+        return default
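
A usage sketch, assuming a file like config.example above; keys missing from the file fall back to the defaults defined in __init__:

    from osm_policy_module.core.config import Config

    cfg = Config.instance()                    # always returns the same object (see singleton.py below)
    cfg.load_file('config.example')
    kafka_server = '{}:{}'.format(cfg.get('policy_module', 'kafka_server_host'),
                                  cfg.get('policy_module', 'kafka_server_port'))
    log_level = cfg.get('policy_module', 'log_level', 'INFO')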
diff --git a/policy_module/osm_policy_module/core/database.py b/policy_module/osm_policy_module/core/database.py
new file mode 100644 (file)
index 0000000..4fb95e7
--- /dev/null
@@ -0,0 +1,38 @@
+import logging
+
+from peewee import *
+from playhouse.sqlite_ext import SqliteExtDatabase
+
+from osm_policy_module.core.config import Config
+
+log = logging.getLogger(__name__)
+cfg = Config.instance()
+
+db = SqliteExtDatabase('mon.db')
+
+
+class BaseModel(Model):
+    class Meta:
+        database = db
+
+
+class ScalingRecord(BaseModel):
+    nsr_id = CharField()
+    name = CharField()
+    content = TextField()
+
+
+class ScalingAlarm(BaseModel):
+    alarm_id = CharField()
+    action = CharField()
+    scaling_record = ForeignKeyField(ScalingRecord, backref='scaling_alarms')
+
+
+class DatabaseManager:
+    def create_tables(self):
+        try:
+            db.connect()
+            db.create_tables([ScalingRecord, ScalingAlarm])
+            db.close()
+        except Exception as e:
+            log.exception("Error creating tables: ")
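
A usage sketch mirroring how dbsync and agent.py use these models; the SQLite file mon.db is created in the current working directory:

    from osm_policy_module.core.database import DatabaseManager, ScalingRecord, ScalingAlarm

    DatabaseManager().create_tables()
    record = ScalingRecord.create(nsr_id='example-nsr-id', name='test', content='{}')
    ScalingAlarm.create(alarm_id='example-alarm-uuid', action='scale_out', scaling_record=record)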
diff --git a/policy_module/osm_policy_module/core/singleton.py b/policy_module/osm_policy_module/core/singleton.py
new file mode 100644 (file)
index 0000000..12cd5a9
--- /dev/null
@@ -0,0 +1,19 @@
+"""Simple singleton class."""
+
+from __future__ import unicode_literals
+
+
+class Singleton(object):
+    """Simple singleton class."""
+
+    def __init__(self, decorated):
+        """Initialize singleton instance."""
+        self._decorated = decorated
+
+    def instance(self):
+        """Return singleton instance."""
+        try:
+            return self._instance
+        except AttributeError:
+            self._instance = self._decorated()
+            return self._instance
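
Used as a class decorator, as on Config above. The decorated name is rebound to the Singleton wrapper, so instances are obtained through instance() rather than by calling the class directly; the wrapped object is created lazily on the first call and then reused:

    @Singleton
    class Config(object):
        ...

    assert Config.instance() is Config.instance()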
diff --git a/policy_module/osm_policy_module/models/configure_scaling.json b/policy_module/osm_policy_module/models/configure_scaling.json
new file mode 100644 (file)
index 0000000..f8479b3
--- /dev/null
@@ -0,0 +1,113 @@
+{
+  "$schema": "http://json-schema.org/schema#",
+  "type": "object",
+  "properties": {
+    "ns_id": {
+      "type": "string"
+    },
+    "scaling_group_descriptor": {
+      "type": "object",
+      "properties": {
+        "name": {
+          "type": "string"
+        },
+        "scaling_policy": {
+          "type": "object",
+          "properties": {
+            "scale_in_operation_type": {
+              "type": "string",
+              "enum": [
+                "and",
+                "or"
+              ]
+            },
+            "scale_out_operation_type": {
+              "type": "string",
+              "enum": [
+                "and",
+                "or"
+              ]
+            },
+            "threshold_time": {
+              "type": "number"
+            },
+            "cooldown_time": {
+              "type": "number"
+            },
+            "scaling_criteria": {
+              "type": "array",
+              "items": {
+                "type": "object",
+                "properties": {
+                  "scale_in_threshold": {
+                    "type": "number"
+                  },
+                  "scale_out_threshold": {
+                    "type": "number"
+                  },
+                  "scale_in_relational_operation": {
+                    "type": "string",
+                    "enum": [
+                      "lt",
+                      "gt",
+                      "le",
+                      "ge",
+                      "eq",
+                      "ne"
+                    ]
+                  },
+                  "scale_out_relational_operation": {
+                    "type": "string",
+                    "enum": [
+                      "lt",
+                      "gt",
+                      "le",
+                      "ge",
+                      "eq",
+                      "ne"
+                    ]
+                  },
+                  "monitoring_param": {
+                    "type": "object",
+                    "properties": {
+                      "id": {
+                        "type": "string"
+                      },
+                      "name": {
+                        "type": "string"
+                      },
+                      "aggregation_type": {
+                        "type": "string",
+                        "enum": [
+                          "avg",
+                          "max",
+                          "min",
+                          "last",
+                          "sum"
+                        ]
+                      },
+                      "vdu_monitoring_param": {
+                        "type": "object",
+                        "properties": {
+                          "vim_uuid": {
+                            "type": "string"
+                          },
+                          "resource_id": {
+                            "type": "string"
+                          },
+                          "name": {
+                            "type": "string"
+                          }
+                        }
+                      }
+                    }
+                  }
+                }
+              }
+            }
+          }
+        }
+      }
+    }
+  }
+}
diff --git a/policy_module/osm_policy_module/tests/__init__.py b/policy_module/osm_policy_module/tests/__init__.py
new file mode 100644 (file)
index 0000000..e69de29
diff --git a/policy_module/osm_policy_module/tests/examples/configure_scaling_full_example.json b/policy_module/osm_policy_module/tests/examples/configure_scaling_full_example.json
new file mode 100644 (file)
index 0000000..a37dffe
--- /dev/null
@@ -0,0 +1,30 @@
+{
+  "ns_id": "360b400b-86dc-4b8e-a139-b7fc3987cf69",
+  "scaling_group_descriptor": {
+    "name": "test",
+    "scaling_policy": {
+      "scale_in_operation_type": "or",
+      "scale_out_operation_type": "or",
+      "threshold_time": 10,
+      "cooldown_time": 10,
+      "scaling_criteria": [
+        {
+          "scale_in_threshold": 50,
+          "scale_out_threshold": 50,
+          "scale_in_relational_operation": "lt",
+          "scale_out_relational_operation": "gt",
+          "monitoring_param": {
+            "id": "test_param_id",
+            "name": "test_param",
+            "aggregation_type": "avg",
+            "vdu_monitoring_param": {
+              "vim_uuid": "vdu_monitoring_param_id",
+              "resource_id": "vdu_monitoring_param_resource_id",
+              "name": "vdu_monitoring_param_name"
+            }
+          }
+        }
+      ]
+    }
+  }
+}
\ No newline at end of file
diff --git a/policy_module/osm_policy_module/tests/integration/__init__.py b/policy_module/osm_policy_module/tests/integration/__init__.py
new file mode 100644 (file)
index 0000000..e69de29
diff --git a/policy_module/osm_policy_module/tests/integration/test_scaling_config_kafka_msg.py b/policy_module/osm_policy_module/tests/integration/test_scaling_config_kafka_msg.py
new file mode 100644 (file)
index 0000000..a444265
--- /dev/null
@@ -0,0 +1,38 @@
+import json
+import logging
+import os
+import unittest
+
+from kafka import KafkaProducer
+
+log = logging.getLogger(__name__)
+
+
+# logging.basicConfig(stream=sys.stdout,
+#                     format='%(asctime)s %(message)s',
+#                     datefmt='%m/%d/%Y %I:%M:%S %p',
+#                     level=logging.DEBUG)
+
+class ScalingConfigTest(unittest.TestCase):
+    def test_send_scaling_config_msg(self):
+        try:
+            with open(
+                    os.path.join(os.path.dirname(__file__), '../examples/configure_scaling_full_example.json')) as file:
+                payload = json.load(file)
+                kafka_server = '{}:{}'.format(os.getenv("KAFKA_SERVER_HOST", "localhost"),
+                                              os.getenv("KAFKA_SERVER_PORT", "9092"))
+                producer = KafkaProducer(bootstrap_servers=kafka_server,
+                                         key_serializer=str.encode,
+                                         value_serializer=str.encode)
+                future = producer.send('lcm_pm', json.dumps(payload), key="configure_scaling")
+                result = future.get(timeout=60)
+                log.info('Result: %s', result)
+
+                producer.flush()
+                self.assertIsNotNone(result)
+        except Exception as e:
+            self.fail(e)
+
+
+if __name__ == '__main__':
+    unittest.main()
diff --git a/policy_module/osm_policy_module/tests/test_examples.py b/policy_module/osm_policy_module/tests/test_examples.py
new file mode 100644 (file)
index 0000000..b644fe4
--- /dev/null
@@ -0,0 +1,20 @@
+import json
+import unittest
+
+import os
+
+from jsonschema import validate
+
+
+class ExamplesTest(unittest.TestCase):
+    def test_examples_schema(self):
+        # TODO: Test that valid examples correspond to schema.
+        # This forces the modification of the examples in case of schema changes.
+        example_file_path = os.path.join(os.path.dirname(__file__), './examples/configure_scaling_full_example.json')
+        schema_file_path = os.path.join(os.path.dirname(__file__), '../models/configure_scaling.json')
+        with open(example_file_path) as example_file, open(schema_file_path) as schema_file:
+            validate(json.load(example_file), json.load(schema_file))
+
+
+if __name__ == '__main__':
+    unittest.main()
diff --git a/policy_module/osm_policy_module/tests/test_policy_config_agent.py b/policy_module/osm_policy_module/tests/test_policy_config_agent.py
new file mode 100644 (file)
index 0000000..4334388
--- /dev/null
@@ -0,0 +1,21 @@
+import json
+import os
+import unittest
+
+from osm_policy_module.core.agent import PolicyModuleAgent
+
+
+class PolicyAgentTest(unittest.TestCase):
+    def setUp(self):
+        self.agent = PolicyModuleAgent()
+
+    def test_get_alarm_configs(self):
+        with open(os.path.join(os.path.dirname(__file__), './examples/configure_scaling_full_example.json')) as file:
+            example = json.load(file)
+            alarm_configs = self.agent._get_alarm_configs(example)
+            # TODO Improve assertions
+            self.assertEqual(len(alarm_configs), 2)
+
+
+if __name__ == '__main__':
+    unittest.main()
diff --git a/policy_module/requirements.txt b/policy_module/requirements.txt
new file mode 100644 (file)
index 0000000..50e30ab
--- /dev/null
@@ -0,0 +1,3 @@
+kafka==1.3.*
+peewee==3.1.*
+jsonschema==2.6.*
\ No newline at end of file
diff --git a/policy_module/setup.py b/policy_module/setup.py
new file mode 100644 (file)
index 0000000..db04d03
--- /dev/null
@@ -0,0 +1,39 @@
+import setuptools
+
+
+def parse_requirements(requirements):
+    with open(requirements) as f:
+        return [l.strip('\n') for l in f if l.strip('\n') and not l.startswith('#')]
+
+
+_author = "Benjamín Díaz"
+_name = 'osm_policy_module'
+_author_email = 'bdiaz@whitestack.com'
+_version = '1.0'
+_description = 'OSM Policy Module'
+_maintainer = 'Benjamín Díaz'
+_maintainer_email = 'bdiaz@whitestack.com'
+_license = 'Apache 2.0'
+_url = 'https://osm.etsi.org/gitweb/?p=osm/MON.git;a=tree'
+
+setuptools.setup(
+    name=_name,
+    version=_version,
+    description=_description,
+    long_description=open('README.rst').read(),
+    author=_author,
+    author_email=_author_email,
+    maintainer=_maintainer,
+    maintainer_email=_maintainer_email,
+    url=_url,
+    license=_license,
+    packages=setuptools.find_packages(),
+    include_package_data=True,
+    install_requires=parse_requirements('requirements.txt'),
+    entry_points={
+        "console_scripts": [
+            "pm-dbsync = osm_policy_module.cmd.dbsync:main",
+            "pm-agent = osm_policy_module.cmd.policy_module_agent:main",
+        ]
+    }
+)