Readds plugins code and respective tests
[osm/MON.git] / osm_mon / tests / plugins / CloudWatch / unit_tests_metrics.py
diff --git a/osm_mon/tests/plugins/CloudWatch/unit_tests_metrics.py b/osm_mon/tests/plugins/CloudWatch/unit_tests_metrics.py
new file mode 100644 (file)
index 0000000..625e872
--- /dev/null
@@ -0,0 +1,208 @@
+from connection import Connection
+import unittest
+import sys
+import jsmin
+import json
+import os
+import time
+from jsmin import jsmin
+sys.path.append("../../test/core/")
+from test_producer import KafkaProducer
+from kafka import KafkaConsumer
+try:
+    import boto
+    import boto.ec2
+    import boto.vpc
+    import boto.ec2.cloudwatch
+    import boto.ec2.connection
+except:
+    exit("Boto not avialable. Try activating your virtualenv OR `pip install boto`")
+
+#--------------------------------------------------------------------------------------------------------------------------------------
+
+# Test Producer object to generate request
+
+producer = KafkaProducer('')
+obj = Connection() 
+connections = obj.setEnvironment()
+connections_res = obj.connection_instance()
+cloudwatch_conn = connections_res['cloudwatch_connection'] 
+
+# Consumer Object to consume response from message bus
+server = {'server': 'localhost:9092', 'topic': 'metric_request'}
+_consumer = KafkaConsumer(bootstrap_servers=server['server'])
+_consumer.subscribe(['metric_response'])
+
+#--------------------------------------------------------------------------------------------------------------------------------------
+
+'''Test E2E Flow : Test cases has been tested one at a time.
+1) Commom Request is generated using request function in test_producer.py(/core/message-bus)
+2) The request is then consumed by the comsumer (plugin)
+3) The response is sent back on the message bus in plugin_metrics.py using
+   response functions in producer.py(/core/message-bus)
+4) The response is then again consumed by the unit_tests_metrics.py
+   and the test cases has been applied on the response.
+'''
+class test_create_metrics(unittest.TestCase):
+
+    def test_status_positive(self):
+        time.sleep(2)
+        # To generate Request of testing valid meric_name in create metrics requests
+        producer.request("create_metrics/create_metric_req_valid.json",'create_metric_request', '','metric_request')  
+
+        for message in _consumer:
+            if message.key == "create_metric_response": 
+                resp = json.loads(json.loads(json.loads(message.value)))
+                time.sleep(1)
+                self.assertTrue(resp['metric_create_response']['status'])
+                self.assertEqual(resp['metric_create_response']['metric_uuid'],0)
+                return 
+
+    def test_status_negative(self):
+        time.sleep(2)
+        # To generate Request of testing invalid meric_name in create metrics requests
+        producer.request("create_metrics/create_metric_req_invalid.json",'create_metric_request', '','metric_request')  
+
+        for message in _consumer:
+            if message.key == "create_metric_response": 
+                resp = json.loads(json.loads(json.loads(message.value)))
+                time.sleep(1)
+                self.assertFalse(resp['metric_create_response']['status'])
+                self.assertEqual(resp['metric_create_response']['metric_uuid'],None)
+                return 
+
+class test_metrics_data(unittest.TestCase):
+
+    def test_met_name_positive(self):
+        time.sleep(2)
+        # To generate Request of testing valid meric_name in read_metric_data_request
+        producer.request("read_metrics_data/read_metric_name_req_valid.json",'read_metric_data_request', '','metric_request')  
+        for message in _consumer:
+            if message.key == "read_metric_data_response": 
+                resp = json.loads(json.loads(json.loads(message.value)))
+                time.sleep(1)
+                self.assertEqual(type(resp['metrics_data']),dict)
+                return 
+
+    def test_met_name_negative(self):
+        time.sleep(2)
+        # To generate Request of testing invalid meric_name in read_metric_data_request
+        producer.request("read_metrics_data/read_metric_name_req_invalid.json",'read_metric_data_request', '','metric_request')  
+        for message in _consumer:
+            if message.key == "read_metric_data_response": 
+                resp = json.loads(json.loads(json.loads(message.value)))
+                time.sleep(1)
+                self.assertFalse(resp['metrics_data'])
+                return 
+
+    def test_coll_period_positive(self):
+        # To generate Request of testing valid collection_period in read_metric_data_request
+        # For AWS metric_data_stats collection period should be a multiple of 60
+        time.sleep(2)
+        producer.request("read_metrics_data/read_coll_period_req_valid.json",'read_metric_data_request', '','metric_request')  
+        for message in _consumer:
+            if message.key == "read_metric_data_response": 
+                resp = json.loads(json.loads(json.loads(message.value)))
+                time.sleep(1)
+                self.assertEqual(type(resp),dict)
+                return
+
+    def test_coll_period_negative(self):
+        time.sleep(2)
+        # To generate Request of testing invalid collection_period in read_metric_data_request
+        producer.request("read_metrics_data/read_coll_period_req_invalid.json",'read_metric_data_request', '','metric_request')  
+        for message in _consumer:
+            if message.key == "read_metric_data_response": 
+                resp = json.loads(json.loads(json.loads(message.value)))
+                time.sleep(1)
+                self.assertFalse(resp['metrics_data'])
+                return
+
+class test_update_metrics(unittest.TestCase):
+
+    def test_upd_status_positive(self):
+        time.sleep(2)
+        # To generate Request of testing valid meric_name in update metrics requests
+        producer.request("update_metrics/update_metric_req_valid.json",'update_metric_request', '','metric_request')  
+        for message in _consumer:
+            if message.key == "update_metric_response": 
+                resp = json.loads(json.loads(json.loads(message.value)))
+                time.sleep(1)
+                self.assertTrue(resp['metric_update_response']['status'])
+                self.assertEqual(resp['metric_update_response']['metric_uuid'],0)
+                return
+
+    def test_upd_status_negative(self):
+        time.sleep(2)
+        # To generate Request of testing invalid meric_name in update metrics requests
+        producer.request("update_metrics/update_metric_req_invalid.json",'update_metric_request', '','metric_request')  
+        for message in _consumer:
+            if message.key == "update_metric_response": 
+                resp = json.loads(json.loads(json.loads(message.value)))
+                time.sleep(1)
+                self.assertFalse(resp['metric_update_response']['status'])
+                self.assertEqual(resp['metric_update_response']['metric_uuid'],None)
+                return
+
+class test_delete_metrics(unittest.TestCase):
+
+    def test_del_met_name_positive(self):
+        time.sleep(2)
+        # To generate Request of testing valid meric_name in delete metrics requests
+        producer.request("delete_metrics/delete_metric_req_valid.json",'delete_metric_request', '','metric_request')  
+        for message in _consumer:
+            if message.key == "delete_metric_response": 
+                resp = json.loads(json.loads(json.loads(message.value)))
+                time.sleep(1)
+                self.assertFalse(resp['status'])
+                return
+
+    def test_del_met_name_negative(self):
+        time.sleep(2)
+        # To generate Request of testing invalid meric_name in delete metrics requests
+        producer.request("delete_metrics/delete_metric_req_invalid.json",'delete_metric_request', '','metric_request')  
+        for message in _consumer:
+            if message.key == "delete_metric_response": 
+                resp = json.loads(json.loads(json.loads(message.value)))
+                time.sleep(1)
+                self.assertFalse(resp)
+                return
+
+class test_list_metrics(unittest.TestCase):
+
+    def test_list_met_name_positive(self):
+        time.sleep(2)
+        # To generate Request of testing valid meric_name in list metrics requests
+        producer.request("list_metrics/list_metric_req_valid.json",'list_metric_request', '','metric_request')  
+        for message in _consumer:
+            if message.key == "list_metrics_response": 
+                resp = json.loads(json.loads(json.loads(message.value)))
+                time.sleep(1)
+                self.assertEqual(type(resp['metrics_list']),list)
+                return
+
+    def test_list_met_name_negitive(self):
+        time.sleep(2)
+        # To generate Request of testing invalid meric_name in list metrics requests
+        producer.request("list_metrics/list_metric_req_invalid.json",'list_metric_request', '','metric_request')  
+        for message in _consumer:
+            if message.key == "list_metrics_response": 
+                resp = json.loads(json.loads(json.loads(message.value)))
+                time.sleep(1)
+                self.assertFalse(resp['metrics_list'])
+                return
+
+
+if __name__ == '__main__':
+
+    # Saving test reults in Log file
+
+    log_file = 'log_file.txt'
+    f = open(log_file, "w")
+    runner = unittest.TextTestRunner(f)
+    unittest.main(testRunner=runner)
+    f.close()
+
+    # For printing results on Console
+    # unittest.main()
+