+++ /dev/null
-from connection import Connection
-import unittest
-import sys
-import jsmin
-import json
-import os
-import time
-from jsmin import jsmin
-sys.path.append("../../test/core/")
-from test_producer import KafkaProducer
-from kafka import KafkaConsumer
-try:
- import boto
- import boto.ec2
- import boto.vpc
- import boto.ec2.cloudwatch
- import boto.ec2.connection
-except:
- exit("Boto not avialable. Try activating your virtualenv OR `pip install boto`")
-
-#--------------------------------------------------------------------------------------------------------------------------------------
-
-# Test Producer object to generate request
-
-producer = KafkaProducer('')
-obj = Connection()
-connections = obj.setEnvironment()
-connections_res = obj.connection_instance()
-cloudwatch_conn = connections_res['cloudwatch_connection']
-
-# Consumer Object to consume response from message bus
-server = {'server': 'localhost:9092', 'topic': 'metric_request'}
-_consumer = KafkaConsumer(bootstrap_servers=server['server'])
-_consumer.subscribe(['metric_response'])
-
-#--------------------------------------------------------------------------------------------------------------------------------------
-
-'''Test E2E Flow : Test cases has been tested one at a time.
-1) Commom Request is generated using request function in test_producer.py(/core/message-bus)
-2) The request is then consumed by the comsumer (plugin)
-3) The response is sent back on the message bus in plugin_metrics.py using
- response functions in producer.py(/core/message-bus)
-4) The response is then again consumed by the unit_tests_metrics.py
- and the test cases has been applied on the response.
-'''
-class test_create_metrics(unittest.TestCase):
-
- def test_status_positive(self):
- time.sleep(2)
- # To generate Request of testing valid meric_name in create metrics requests
- producer.request("create_metrics/create_metric_req_valid.json",'create_metric_request', '','metric_request')
-
- for message in _consumer:
- if message.key == "create_metric_response":
- resp = json.loads(json.loads(json.loads(message.value)))
- time.sleep(1)
- self.assertTrue(resp['metric_create_response']['status'])
- self.assertEqual(resp['metric_create_response']['metric_uuid'],0)
- return
-
- def test_status_negative(self):
- time.sleep(2)
- # To generate Request of testing invalid meric_name in create metrics requests
- producer.request("create_metrics/create_metric_req_invalid.json",'create_metric_request', '','metric_request')
-
- for message in _consumer:
- if message.key == "create_metric_response":
- resp = json.loads(json.loads(json.loads(message.value)))
- time.sleep(1)
- self.assertFalse(resp['metric_create_response']['status'])
- self.assertEqual(resp['metric_create_response']['metric_uuid'],None)
- return
-
-class test_metrics_data(unittest.TestCase):
-
- def test_met_name_positive(self):
- time.sleep(2)
- # To generate Request of testing valid meric_name in read_metric_data_request
- producer.request("read_metrics_data/read_metric_name_req_valid.json",'read_metric_data_request', '','metric_request')
- for message in _consumer:
- if message.key == "read_metric_data_response":
- resp = json.loads(json.loads(json.loads(message.value)))
- time.sleep(1)
- self.assertEqual(type(resp['metrics_data']),dict)
- return
-
- def test_met_name_negative(self):
- time.sleep(2)
- # To generate Request of testing invalid meric_name in read_metric_data_request
- producer.request("read_metrics_data/read_metric_name_req_invalid.json",'read_metric_data_request', '','metric_request')
- for message in _consumer:
- if message.key == "read_metric_data_response":
- resp = json.loads(json.loads(json.loads(message.value)))
- time.sleep(1)
- self.assertFalse(resp['metrics_data'])
- return
-
- def test_coll_period_positive(self):
- # To generate Request of testing valid collection_period in read_metric_data_request
- # For AWS metric_data_stats collection period should be a multiple of 60
- time.sleep(2)
- producer.request("read_metrics_data/read_coll_period_req_valid.json",'read_metric_data_request', '','metric_request')
- for message in _consumer:
- if message.key == "read_metric_data_response":
- resp = json.loads(json.loads(json.loads(message.value)))
- time.sleep(1)
- self.assertEqual(type(resp),dict)
- return
-
- def test_coll_period_negative(self):
- time.sleep(2)
- # To generate Request of testing invalid collection_period in read_metric_data_request
- producer.request("read_metrics_data/read_coll_period_req_invalid.json",'read_metric_data_request', '','metric_request')
- for message in _consumer:
- if message.key == "read_metric_data_response":
- resp = json.loads(json.loads(json.loads(message.value)))
- time.sleep(1)
- self.assertFalse(resp['metrics_data'])
- return
-
-class test_update_metrics(unittest.TestCase):
-
- def test_upd_status_positive(self):
- time.sleep(2)
- # To generate Request of testing valid meric_name in update metrics requests
- producer.request("update_metrics/update_metric_req_valid.json",'update_metric_request', '','metric_request')
- for message in _consumer:
- if message.key == "update_metric_response":
- resp = json.loads(json.loads(json.loads(message.value)))
- time.sleep(1)
- self.assertTrue(resp['metric_update_response']['status'])
- self.assertEqual(resp['metric_update_response']['metric_uuid'],0)
- return
-
- def test_upd_status_negative(self):
- time.sleep(2)
- # To generate Request of testing invalid meric_name in update metrics requests
- producer.request("update_metrics/update_metric_req_invalid.json",'update_metric_request', '','metric_request')
- for message in _consumer:
- if message.key == "update_metric_response":
- resp = json.loads(json.loads(json.loads(message.value)))
- time.sleep(1)
- self.assertFalse(resp['metric_update_response']['status'])
- self.assertEqual(resp['metric_update_response']['metric_uuid'],None)
- return
-
-class test_delete_metrics(unittest.TestCase):
-
- def test_del_met_name_positive(self):
- time.sleep(2)
- # To generate Request of testing valid meric_name in delete metrics requests
- producer.request("delete_metrics/delete_metric_req_valid.json",'delete_metric_request', '','metric_request')
- for message in _consumer:
- if message.key == "delete_metric_response":
- resp = json.loads(json.loads(json.loads(message.value)))
- time.sleep(1)
- self.assertFalse(resp['status'])
- return
-
- def test_del_met_name_negative(self):
- time.sleep(2)
- # To generate Request of testing invalid meric_name in delete metrics requests
- producer.request("delete_metrics/delete_metric_req_invalid.json",'delete_metric_request', '','metric_request')
- for message in _consumer:
- if message.key == "delete_metric_response":
- resp = json.loads(json.loads(json.loads(message.value)))
- time.sleep(1)
- self.assertFalse(resp)
- return
-
-class test_list_metrics(unittest.TestCase):
-
- def test_list_met_name_positive(self):
- time.sleep(2)
- # To generate Request of testing valid meric_name in list metrics requests
- producer.request("list_metrics/list_metric_req_valid.json",'list_metric_request', '','metric_request')
- for message in _consumer:
- if message.key == "list_metrics_response":
- resp = json.loads(json.loads(json.loads(message.value)))
- time.sleep(1)
- self.assertEqual(type(resp['metrics_list']),list)
- return
-
- def test_list_met_name_negitive(self):
- time.sleep(2)
- # To generate Request of testing invalid meric_name in list metrics requests
- producer.request("list_metrics/list_metric_req_invalid.json",'list_metric_request', '','metric_request')
- for message in _consumer:
- if message.key == "list_metrics_response":
- resp = json.loads(json.loads(json.loads(message.value)))
- time.sleep(1)
- self.assertFalse(resp['metrics_list'])
- return
-
-
-if __name__ == '__main__':
-
- # Saving test reults in Log file
-
- log_file = 'log_file.txt'
- f = open(log_file, "w")
- runner = unittest.TextTestRunner(f)
- unittest.main(testRunner=runner)
- f.close()
-
- # For printing results on Console
- # unittest.main()
-