Fixes bugs for integration with MON module 22/5922/1
author diazb <bdiaz@whitestack.com>
Thu, 29 Mar 2018 20:23:48 +0000 (17:23 -0300)
committer diazb <bdiaz@whitestack.com>
Thu, 29 Mar 2018 20:23:48 +0000 (17:23 -0300)
Signed-off-by: diazb <bdiaz@whitestack.com>
18 files changed:
policy_module/README.rst
policy_module/osm_policy_module/cmd/dbsync.py [deleted file]
policy_module/osm_policy_module/cmd/policy_module_agent.py
policy_module/osm_policy_module/common/alarm_config.py
policy_module/osm_policy_module/common/lcm_client.py
policy_module/osm_policy_module/common/mon_client.py
policy_module/osm_policy_module/core/agent.py
policy_module/osm_policy_module/core/database.py
policy_module/osm_policy_module/models/configure_scaling.json
policy_module/osm_policy_module/tests/examples/configure_scaling_full_example.json
policy_module/osm_policy_module/tests/integration/test_scaling_config_kafka_msg.py
policy_module/osm_policy_module/tests/test_examples.py [deleted file]
policy_module/osm_policy_module/tests/test_policy_config_agent.py [deleted file]
policy_module/osm_policy_module/tests/unit/__init__.py [new file with mode: 0644]
policy_module/osm_policy_module/tests/unit/test_examples.py [new file with mode: 0644]
policy_module/osm_policy_module/tests/unit/test_policy_config_agent.py [new file with mode: 0644]
policy_module/requirements.txt
policy_module/setup.py

index e69de29..5cb2fde 100644 (file)
@@ -0,0 +1,14 @@
+Install
+------------------------
+    ::
+
+        git clone https://osm.etsi.org/gerrit/osm/MON.git
+        cd MON/policy_module
+        pip install .
+
+Run
+------------------------
+    ::
+
+        osm-policy-agent
+
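
Note: the renamed console entry point also honors an optional configuration file, as wired up in policy_module_agent.py below (the path here is hypothetical)::

    osm-policy-agent --config-file /etc/osm/policy_module.cfg
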
diff --git a/policy_module/osm_policy_module/cmd/dbsync.py b/policy_module/osm_policy_module/cmd/dbsync.py
deleted file mode 100644 (file)
index 25ef1a6..0000000
+++ /dev/null
@@ -1,30 +0,0 @@
-import argparse
-import logging
-import sys
-
-from osm_policy_module.core.config import Config
-
-from osm_policy_module.core.database import DatabaseManager
-
-
-def main():
-    cfg = Config.instance()
-    parser = argparse.ArgumentParser(prog='pm-dbsync')
-    parser.add_argument('--config-file', nargs='?', help='Policy module database sync configuration file')
-    args = parser.parse_args()
-    if args.config_file:
-        cfg.load_file(args.config_file)
-    if cfg.get('policy_module', 'log_dir') == 'stdout':
-        logging.basicConfig(stream=sys.stdout,
-                            format='%(asctime)s %(message)s',
-                            datefmt='%m/%d/%Y %I:%M:%S %p', filemode='a',
-                            level=logging.INFO)
-    else:
-        logging.basicConfig(filename=cfg.get('policy_module', 'log_dir') + 'pm_dbsync.log',
-                            format='%(asctime)s %(message)s',
-                            datefmt='%m/%d/%Y %I:%M:%S %p', filemode='a',
-                            level=logging.INFO)
-    log = logging.getLogger(__name__)
-    log.info("Starting database sync...")
-    db_manager = DatabaseManager()
-    db_manager.create_tables()
index 7116913..3fd42db 100644 (file)
@@ -2,9 +2,9 @@ import argparse
 import logging
 import sys
 
-from osm_policy_module.core.config import Config
-
 from osm_policy_module.core.agent import PolicyModuleAgent
+from osm_policy_module.core.config import Config
+from osm_policy_module.core.database import DatabaseManager
 
 
 def main():
@@ -14,17 +14,25 @@ def main():
     args = parser.parse_args()
     if args.config_file:
         cfg.load_file(args.config_file)
+    # TODO: Handle different log levels in config
     if cfg.get('policy_module', 'log_dir') == 'stdout':
         logging.basicConfig(stream=sys.stdout,
                             format='%(asctime)s %(message)s',
                             datefmt='%m/%d/%Y %I:%M:%S %p',
-                            level=logging._nameToLevel[cfg.get('policy_module', 'log_level')])
+                            level=logging.INFO)
     else:
         logging.basicConfig(filename=cfg.get('policy_module', 'log_dir') + 'pm_agent.log',
                             format='%(asctime)s %(message)s',
                             datefmt='%m/%d/%Y %I:%M:%S %p', filemode='a',
-                            level=logging._nameToLevel[cfg.get('policy_module', 'log_level')])
+                            level=logging.INFO)
     log = logging.getLogger(__name__)
+    log.info("Syncing database...")
+    db_manager = DatabaseManager()
+    db_manager.create_tables()
     log.info("Starting policy module agent...")
     agent = PolicyModuleAgent()
     agent.run()
+
+
+if __name__ == '__main__':
+    main()
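
The database sync now happens inline at agent startup, replacing the deleted pm-dbsync command. DatabaseManager.create_tables is not shown in this diff; a minimal sketch of what it might look like with peewee 3.x (the ScalingRecord/ScalingAlarm model list is an assumption based on the models referenced elsewhere in this change)::

    from osm_policy_module.core.database import db, ScalingRecord, ScalingAlarm

    class DatabaseManager:
        def create_tables(self):
            # safe=True issues CREATE TABLE IF NOT EXISTS, so running the
            # sync on every agent start is harmless once the tables exist.
            db.create_tables([ScalingRecord, ScalingAlarm], safe=True)
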
index 78d36d0..2b77d47 100644 (file)
@@ -1,9 +1,9 @@
 class AlarmConfig:
     def __init__(self, metric_name, resource_uuid, vim_uuid, threshold, operation, statistic, action):
-        self.metric_name = metric_name,
-        self.resource_uuid = resource_uuid,
-        self.vim_uuid = vim_uuid,
-        self.threshold = threshold,
-        self.operation = operation,
-        self.statistic = statistic,
+        self.metric_name = metric_name
+        self.resource_uuid = resource_uuid
+        self.vim_uuid = vim_uuid
+        self.threshold = threshold
+        self.operation = operation
+        self.statistic = statistic
         self.action = action
index 99e3ffb..710b59a 100644 (file)
@@ -1,25 +1,27 @@
 import json
+import logging
 
 from kafka import KafkaProducer
 
 from osm_policy_module.core.config import Config
 
+log = logging.getLogger(__name__)
+
 
 class LcmClient:
     def __init__(self):
         cfg = Config.instance()
-        self.kafka_server = {
-            'server': '{}:{}'.format(cfg.get('policy_module', 'kafka_server_host'),
-                                     cfg.get('policy_module', 'kafka_server_port'))}
+        self.kafka_server = '{}:{}'.format(cfg.get('policy_module', 'kafka_server_host'),
+                                           cfg.get('policy_module', 'kafka_server_port'))
         self.producer = KafkaProducer(bootstrap_servers=self.kafka_server,
                                       key_serializer=str.encode,
-                                      value_serializer=lambda v: json.dumps(v).encode('utf-8'))
+                                      value_serializer=str.encode)
 
     def scale(self, nsr_id, name, action):
         msg = self._create_scale_action_payload(nsr_id, name, action)
-        self.producer.send(topic='alarm_request', key='create_alarm_request', value=msg)
+        log.info("Sending scale action message: %s", json.dumps(msg))
+        self.producer.send(topic='lcm_pm', key='trigger_scaling', value=json.dumps(msg))
         self.producer.flush()
-        pass
 
     def _create_scale_action_payload(self, nsr_id, name, action):
         msg = {
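
Two fixes are folded together here: bootstrap_servers is now a plain 'host:port' string (kafka-python expects a string or a list of strings, not a dict), and value_serializer is str.encode, so every caller JSON-encodes its payload before sending. A minimal sketch of the resulting producer pattern, assuming a local broker and a hypothetical payload::

    import json

    from kafka import KafkaProducer

    producer = KafkaProducer(bootstrap_servers='localhost:9092',
                             key_serializer=str.encode,
                             value_serializer=str.encode)
    # The caller is now responsible for serialization.
    producer.send(topic='lcm_pm', key='trigger_scaling',
                  value=json.dumps({'ns_id': 'example'}))
    producer.flush()
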
index 19b440e..f03b3c3 100644 (file)
@@ -17,31 +17,31 @@ class MonClient:
                                            cfg.get('policy_module', 'kafka_server_port'))
         self.producer = KafkaProducer(bootstrap_servers=self.kafka_server,
                                       key_serializer=str.encode,
-                                      value_serializer=lambda v: json.dumps(v).encode('utf-8'))
+                                      value_serializer=str.encode)
 
     def create_alarm(self, metric_name, resource_uuid, vim_uuid, threshold, statistic, operation):
         cor_id = random.randint(1, 1000000)
         msg = self._create_alarm_payload(cor_id, metric_name, resource_uuid, vim_uuid, threshold, statistic, operation)
-        self.producer.send(topic='alarm_request', key='create_alarm_request', value=msg)
-        self.producer.flush()
-        consumer = KafkaConsumer(bootstrap_servers=self.kafka_server, consumer_timeout_ms=10000)
+        log.info("Sending create_alarm_request %s", msg)
+        future = self.producer.send(topic='alarm_request', key='create_alarm_request', value=json.dumps(msg))
+        future.get(timeout=60)
+        consumer = KafkaConsumer(bootstrap_servers=self.kafka_server,
+                                 key_deserializer=bytes.decode,
+                                 value_deserializer=bytes.decode, consumer_timeout_ms=10000)
         consumer.subscribe(['alarm_response'])
-        alarm_uuid = None
         for message in consumer:
             if message.key == 'create_alarm_response':
-                content = json.load(message.value)
+                content = json.loads(message.value)
+                log.info("Received create_alarm_response %s", content)
                 if self._is_alarm_response_correlation_id_eq(cor_id, content):
                     alarm_uuid = content['alarm_create_response']['alarm_uuid']
                     # TODO Handle error response
-                    break
-        consumer.close()
-        if not alarm_uuid:
-            raise ValueError(
-                'Timeout: No alarm creation response from MON. Are it\'s IP and port correctly configured?')
-        return alarm_uuid
+                    return alarm_uuid
+
+        raise ValueError('Timeout: No alarm creation response from MON. Is MON up?')
 
     def _create_alarm_payload(self, cor_id, metric_name, resource_uuid, vim_uuid, threshold, statistic, operation):
-        create_alarm_request = {
+        alarm_create_request = {
             'correlation_id': cor_id,
             'alarm_name': str(uuid.uuid4()),
             'metric_name': metric_name,
@@ -52,7 +52,7 @@ class MonClient:
             'statistic': statistic
         }
         msg = {
-            'create_alarm_request': create_alarm_request,
+            'alarm_create_request': alarm_create_request,
             'vim_uuid': vim_uuid
         }
         return msg
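
_is_alarm_response_correlation_id_eq is not part of this diff; a plausible sketch, assuming MON echoes the request's correlation_id back inside alarm_create_response::

    def _is_alarm_response_correlation_id_eq(self, cor_id, content):
        # Match the response to the randomly generated request correlation_id.
        return content['alarm_create_response']['correlation_id'] == cor_id
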
index c329743..b4f9c4d 100644 (file)
@@ -21,15 +21,15 @@ class PolicyModuleAgent:
 
         # Initialize Kafka consumer
         log.info("Connecting to Kafka server at %s", kafka_server)
+        # TODO: Add logic to handle deduplication of messages when using group_id.
+        # See: https://stackoverflow.com/a/29836412
         consumer = KafkaConsumer(bootstrap_servers=kafka_server,
                                  key_deserializer=bytes.decode,
-                                 value_deserializer=bytes.decode,
-                                 group_id="policy-module-agent")
+                                 value_deserializer=bytes.decode)
         consumer.subscribe(['lcm_pm', 'alarm_response'])
 
         for message in consumer:
             log.info("Message arrived: %s", message)
-            log.info("Message key: %s", message.key)
             try:
                 if message.key == 'configure_scaling':
                     content = json.loads(message.value)
@@ -67,7 +67,7 @@ class PolicyModuleAgent:
                     alarm = ScalingAlarm.select().where(ScalingAlarm.alarm_id == alarm_id).get()
                     if alarm:
                         lcm_client = LcmClient()
-                        log.info("Sending scaling action message: %s", json.dumps(alarm))
+                        log.info("Sending scaling action message for alarm: %s", alarm_id)
                         lcm_client.scale(alarm.scaling_record.nsr_id, alarm.scaling_record.name, alarm.action)
             except Exception:
                 log.exception("Error consuming message: ")
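
One caveat in the alarm lookup above: peewee's .get() raises DoesNotExist on a miss rather than returning None, so the "if alarm:" guard never sees a falsy value. A sketch of a lookup that does yield None when no row matches::

    alarm = ScalingAlarm.select().where(ScalingAlarm.alarm_id == alarm_id).first()
    if alarm:
        lcm_client.scale(alarm.scaling_record.nsr_id, alarm.scaling_record.name, alarm.action)
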
index 4fb95e7..f7a90a9 100644 (file)
@@ -8,7 +8,7 @@ from osm_policy_module.core.config import Config
 log = logging.getLogger(__name__)
 cfg = Config.instance()
 
-db = SqliteExtDatabase('mon.db')
+db = SqliteExtDatabase('policy_module.db')
 
 
 class BaseModel(Model):
index f8479b3..72e7963 100644 (file)
@@ -52,8 +52,7 @@
                       "gt",
                       "le",
                       "ge",
-                      "eq",
-                      "ne"
+                      "eq"
                     ]
                   },
                   "scale_out_relational_operation": {
@@ -63,8 +62,7 @@
                       "gt",
                       "le",
                       "ge",
-                      "eq",
-                      "ne"
+                      "eq"
                     ]
                   },
                   "monitoring_param": {
                       "aggregation_type": {
                         "type": "string",
                         "enum": [
-                          "avg",
-                          "max",
-                          "min",
-                          "last",
+                          "average",
+                          "maximum",
+                          "minimum",
+                          "count",
                           "sum"
                         ]
                       },
index a37dffe..be14fb4 100644 (file)
           "monitoring_param": {
             "id": "test_param_id",
             "name": "test_param",
-            "aggregation_type": "avg",
+            "aggregation_type": "average",
             "vdu_monitoring_param": {
-              "vim_uuid": "vdu_monitoring_param_id",
-              "resource_id": "vdu_monitoring_param_resource_id",
-              "name": "vdu_monitoring_param_name"
+              "vim_uuid": "1",
+              "resource_id": "2d8d5355-acf7-42be-9f34-a10d02f9df39",
+              "name": "cpu_utilization"
             }
           }
         }
index a444265..486afc8 100644 (file)
@@ -3,32 +3,37 @@ import logging
 import os
 import unittest
 
-from kafka import KafkaProducer
+from kafka import KafkaProducer, KafkaConsumer
+from kafka.errors import KafkaError
 
 log = logging.getLogger(__name__)
 
 
-# logging.basicConfig(stream=sys.stdout,
-#                     format='%(asctime)s %(message)s',
-#                     datefmt='%m/%d/%Y %I:%M:%S %p',
-#                     level=logging.DEBUG)
-
 class ScalingConfigTest(unittest.TestCase):
+    def setUp(self):
+        try:
+            kafka_server = '{}:{}'.format(os.getenv("KAFKA_SERVER_HOST", "localhost"),
+                                          os.getenv("KAFKA_SERVER_PORT", "9092"))
+            self.producer = KafkaProducer(bootstrap_servers=kafka_server,
+                                          key_serializer=str.encode,
+                                          value_serializer=str.encode)
+            self.consumer = KafkaConsumer(bootstrap_servers=kafka_server,
+                                          group_id='osm_mon')
+            self.consumer.subscribe(['lcm_pm'])
+        except KafkaError:
+            self.skipTest('Kafka server not present.')
+
     def test_send_scaling_config_msg(self):
         try:
             with open(
                     os.path.join(os.path.dirname(__file__), '../examples/configure_scaling_full_example.json')) as file:
                 payload = json.load(file)
-                kafka_server = '{}:{}'.format(os.getenv("KAFKA_SERVER_HOST", "localhost"),
-                                              os.getenv("KAFKA_SERVER_PORT", "9092"))
-                producer = KafkaProducer(bootstrap_servers=kafka_server,
-                                         key_serializer=str.encode,
-                                         value_serializer=str.encode)
-                future = producer.send('lcm_pm', json.dumps(payload), key="configure_scaling")
+                future = self.producer.send('lcm_pm', json.dumps(payload), key="configure_scaling")
                 result = future.get(timeout=60)
                 log.info('Result: %s', result)
 
-                producer.flush()
+                self.producer.flush()
+                # TODO: Improve assertions
                 self.assertIsNotNone(result)
         except Exception as e:
             self.fail(e)
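
The integration test now builds its Kafka clients in setUp and skips cleanly when no broker is reachable (KafkaProducer raises a KafkaError subclass on construction if bootstrapping fails). The broker address comes from the environment, so the test can be pointed at a remote Kafka; for example (host and port are illustrative)::

    KAFKA_SERVER_HOST=kafka.example.com KAFKA_SERVER_PORT=9092 \
        python -m unittest osm_policy_module.tests.integration.test_scaling_config_kafka_msg
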
diff --git a/policy_module/osm_policy_module/tests/test_examples.py b/policy_module/osm_policy_module/tests/test_examples.py
deleted file mode 100644 (file)
index b644fe4..0000000
+++ /dev/null
@@ -1,20 +0,0 @@
-import json
-import unittest
-
-import os
-
-from jsonschema import validate
-
-
-class ExamplesTest(unittest.TestCase):
-    def test_examples_schema(self):
-        # TODO: Test that valid examples correspond to schema.
-        # This forces the modification of the examples in case of schema changes.
-        example_file_path = os.path.join(os.path.dirname(__file__), './examples/configure_scaling_full_example.json')
-        schema_file_path = os.path.join(os.path.dirname(__file__), '../models/configure_scaling.json')
-        with open(example_file_path) as example_file, open(schema_file_path) as schema_file:
-            validate(json.load(example_file), json.load(schema_file))
-
-
-if __name__ == '__main__':
-    unittest.main()
diff --git a/policy_module/osm_policy_module/tests/test_policy_config_agent.py b/policy_module/osm_policy_module/tests/test_policy_config_agent.py
deleted file mode 100644 (file)
index 4334388..0000000
+++ /dev/null
@@ -1,21 +0,0 @@
-import json
-import os
-import unittest
-
-from osm_policy_module.core.agent import PolicyModuleAgent
-
-
-class PolicyAgentTest(unittest.TestCase):
-    def setUp(self):
-        self.agent = PolicyModuleAgent()
-
-    def test_get_alarm_configs(self):
-        with open(os.path.join(os.path.dirname(__file__), './examples/configure_scaling_full_example.json')) as file:
-            example = json.load(file)
-            alarm_configs = self.agent._get_alarm_configs(example)
-            # TODO Improve assertions
-            self.assertEqual(len(alarm_configs), 2)
-
-
-if __name__ == '__main__':
-    unittest.main()
diff --git a/policy_module/osm_policy_module/tests/unit/__init__.py b/policy_module/osm_policy_module/tests/unit/__init__.py
new file mode 100644 (file)
index 0000000..e69de29
diff --git a/policy_module/osm_policy_module/tests/unit/test_examples.py b/policy_module/osm_policy_module/tests/unit/test_examples.py
new file mode 100644 (file)
index 0000000..935982f
--- /dev/null
@@ -0,0 +1,18 @@
+import json
+import unittest
+
+import os
+
+from jsonschema import validate
+
+
+class ExamplesTest(unittest.TestCase):
+    def test_examples_schema(self):
+        example_file_path = os.path.join(os.path.dirname(__file__), '../examples/configure_scaling_full_example.json')
+        schema_file_path = os.path.join(os.path.dirname(__file__), '../../models/configure_scaling.json')
+        with open(example_file_path) as example_file, open(schema_file_path) as schema_file:
+            validate(json.load(example_file), json.load(schema_file))
+
+
+if __name__ == '__main__':
+    unittest.main()
diff --git a/policy_module/osm_policy_module/tests/unit/test_policy_config_agent.py b/policy_module/osm_policy_module/tests/unit/test_policy_config_agent.py
new file mode 100644 (file)
index 0000000..7618682
--- /dev/null
@@ -0,0 +1,21 @@
+import json
+import os
+import unittest
+
+from osm_policy_module.core.agent import PolicyModuleAgent
+
+
+class PolicyAgentTest(unittest.TestCase):
+    def setUp(self):
+        self.agent = PolicyModuleAgent()
+
+    def test_get_alarm_configs(self):
+        with open(os.path.join(os.path.dirname(__file__), '../examples/configure_scaling_full_example.json')) as file:
+            example = json.load(file)
+            alarm_configs = self.agent._get_alarm_configs(example)
+            # TODO Improve assertions
+            self.assertEqual(len(alarm_configs), 2)
+
+
+if __name__ == '__main__':
+    unittest.main()
index 50e30ab..fbdc2a8 100644 (file)
@@ -1,3 +1,4 @@
 kafka==1.3.*
 peewee==3.1.*
-jsonschema==2.6.*
\ No newline at end of file
+jsonschema==2.6.*
+six
\ No newline at end of file
index db04d03..ea9d38c 100644 (file)
@@ -32,8 +32,7 @@ setuptools.setup(
     install_requires=parse_requirements('requirements.txt'),
     entry_points={
         "console_scripts": [
-            "pm-dbsync = osm_policy_module.cmd.dbsync:main",
-            "pm-agent = osm_policy_module.cmd.policy_module_agent:main",
+            "osm-policy-agent = osm_policy_module.cmd.policy_module_agent:main",
         ]
     }
 )