Revert "Removes POL code from MON repo"
[osm/MON.git] / policy_module / osm_policy_module / tests / integration / test_scaling_config_kafka_msg.py
diff --git a/policy_module/osm_policy_module/tests/integration/test_scaling_config_kafka_msg.py b/policy_module/osm_policy_module/tests/integration/test_scaling_config_kafka_msg.py
new file mode 100644 (file)
index 0000000..aea3f4a
--- /dev/null
@@ -0,0 +1,66 @@
+# -*- coding: utf-8 -*-
+
+# Copyright 2018 Whitestack, LLC
+# *************************************************************
+
+# This file is part of OSM Monitoring module
+# All Rights Reserved to Whitestack, LLC
+
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+
+#         http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+# For those usages not covered by the Apache License, Version 2.0 please
+# contact: bdiaz@whitestack.com or glavado@whitestack.com
+##
+import json
+import logging
+import os
+import unittest
+
+from kafka import KafkaProducer, KafkaConsumer
+from kafka.errors import KafkaError
+
+log = logging.getLogger(__name__)
+
+
+class ScalingConfigTest(unittest.TestCase):
+    def setUp(self):
+        try:
+            kafka_server = '{}:{}'.format(os.getenv("KAFKA_SERVER_HOST", "localhost"),
+                                          os.getenv("KAFKA_SERVER_PORT", "9092"))
+            self.producer = KafkaProducer(bootstrap_servers=kafka_server,
+                                          key_serializer=str.encode,
+                                          value_serializer=str.encode)
+            self.consumer = KafkaConsumer(bootstrap_servers='localhost:9092',
+                                          group_id='osm_mon')
+            self.consumer.subscribe(['lcm_pm'])
+        except KafkaError:
+            self.skipTest('Kafka server not present.')
+
+    def test_send_scaling_config_msg(self):
+        try:
+            with open(
+                    os.path.join(os.path.dirname(__file__), '../examples/configure_scaling_full_example.json')) as file:
+                payload = json.load(file)
+                future = self.producer.send('lcm_pm', json.dumps(payload), key="configure_scaling")
+                result = future.get(timeout=60)
+                log.info('Result: %s', result)
+
+                self.producer.flush()
+                # TODO: Improve assertions
+                self.assertIsNotNone(result)
+        except Exception as e:
+            self.fail(e)
+
+
+if __name__ == '__main__':
+    unittest.main()