1 from connection
import Connection
8 from jsmin
import jsmin
9 sys
.path
.append("../../test/core/")
10 from test_producer
import KafkaProducer
11 from kafka
import KafkaConsumer
# boto is a third-party dependency: import it under a guard so that the
# process only aborts (with an installation hint) when the import really fails.
try:
    import boto.ec2.cloudwatch
    import boto.ec2.connection
except ImportError:
    exit("Boto not avialable. Try activating your virtualenv OR `pip install boto`")
#--------------------------------------------------------------------------------------------------------------------------------------

# Test Producer object to generate request
producer = KafkaProducer('')

# Connection helper used to obtain the CloudWatch connection.
# NOTE(review): `obj` was used without being bound anywhere in the visible
# code; it is assumed to be a no-argument `Connection` instance (the only
# otherwise unused import) -- confirm the constructor signature.
obj = Connection()
connections = obj.setEnvironment()
connections_res = obj.connection_instance()
# The result of connection_instance() is indexed by key; this entry is kept
# at module level (it is not referenced by the tests visible in this file).
cloudwatch_conn = connections_res['cloudwatch_connection']

# Consumer Object to consume response from message bus
server = {'server': 'localhost:9092', 'topic': 'metric_request'}
_consumer = KafkaConsumer(bootstrap_servers=server['server'])
# Responses are read from the 'metric_response' topic.
_consumer.subscribe(['metric_response'])

#--------------------------------------------------------------------------------------------------------------------------------------
'''Test E2E Flow : Test cases have been tested one at a time.
1) A common request is generated using the request function in test_producer.py (/core/message-bus)
2) The request is then consumed by the consumer (plugin)
3) The response is sent back on the message bus in plugin_metrics.py using
   the response functions in producer.py (/core/message-bus)
4) The response is then consumed again by unit_tests_metrics.py
   and the test cases are applied to the response.
'''
class test_create_metrics(unittest.TestCase):
    """End-to-end checks for `create_metric_request` messages.

    Each test publishes a request through the module-level ``producer`` and
    then reads the module-level ``_consumer`` until a message keyed
    ``create_metric_response`` arrives.
    """

    def _await_response(self, key):
        """Return the decoded payload of the first consumed message keyed *key*.

        The message value is decoded with three nested ``json.loads`` calls.
        Returning on the first match stops the consumption loop, so the
        calling test can finish instead of iterating over the consumer forever.

        NOTE(review): keys are compared against ``str``; kafka-python yields
        raw ``bytes`` keys on Python 3 unless a key deserializer is
        configured -- confirm the target interpreter.
        """
        for message in _consumer:
            if message.key == key:
                return json.loads(json.loads(json.loads(message.value)))
        # Only reachable when the consumer iterator ends (e.g. it was created
        # with a timeout): report the missing response instead of passing.
        self.fail("No '%s' message received on the message bus" % key)

    def test_status_positive(self):
        """A valid create request yields a truthy status and metric_uuid 0."""
        # To generate Request of testing valid metric_name in create metrics requests
        producer.request("create_metrics/create_metric_req_valid.json",
                         'create_metric_request', '', 'metric_request')

        resp = self._await_response("create_metric_response")
        self.assertTrue(resp['metric_create_response']['status'])
        self.assertEqual(resp['metric_create_response']['metric_uuid'], 0)

    def test_status_negative(self):
        """An invalid create request yields a falsy status and no metric_uuid."""
        # To generate Request of testing invalid metric_name in create metrics requests
        producer.request("create_metrics/create_metric_req_invalid.json",
                         'create_metric_request', '', 'metric_request')

        resp = self._await_response("create_metric_response")
        self.assertFalse(resp['metric_create_response']['status'])
        self.assertIsNone(resp['metric_create_response']['metric_uuid'])
class test_metrics_data(unittest.TestCase):
    """End-to-end checks for `read_metric_data_request` messages.

    Each test publishes a request through the module-level ``producer`` and
    then reads the module-level ``_consumer`` until a message keyed
    ``read_metric_data_response`` arrives.
    """

    def _await_response(self, key):
        """Return the decoded payload of the first consumed message keyed *key*.

        The message value is decoded with three nested ``json.loads`` calls.
        Returning on the first match stops the consumption loop, so the
        calling test can finish instead of iterating over the consumer forever.

        NOTE(review): keys are compared against ``str``; kafka-python yields
        raw ``bytes`` keys on Python 3 unless a key deserializer is
        configured -- confirm the target interpreter.
        """
        for message in _consumer:
            if message.key == key:
                return json.loads(json.loads(json.loads(message.value)))
        # Only reachable when the consumer iterator ends (e.g. it was created
        # with a timeout): report the missing response instead of passing.
        self.fail("No '%s' message received on the message bus" % key)

    def test_met_name_positive(self):
        """A valid metric name yields a dict under 'metrics_data'."""
        # To generate Request of testing valid metric_name in read_metric_data_request
        producer.request("read_metrics_data/read_metric_name_req_valid.json",
                         'read_metric_data_request', '', 'metric_request')

        resp = self._await_response("read_metric_data_response")
        self.assertIsInstance(resp['metrics_data'], dict)

    def test_met_name_negative(self):
        """An invalid metric name yields a falsy 'metrics_data'."""
        # To generate Request of testing invalid metric_name in read_metric_data_request
        producer.request("read_metrics_data/read_metric_name_req_invalid.json",
                         'read_metric_data_request', '', 'metric_request')

        resp = self._await_response("read_metric_data_response")
        self.assertFalse(resp['metrics_data'])

    def test_coll_period_positive(self):
        """A valid collection period yields a dict response."""
        # To generate Request of testing valid collection_period in read_metric_data_request
        # For AWS metric_data_stats collection period should be a multiple of 60
        producer.request("read_metrics_data/read_coll_period_req_valid.json",
                         'read_metric_data_request', '', 'metric_request')

        resp = self._await_response("read_metric_data_response")
        self.assertIsInstance(resp, dict)

    def test_coll_period_negative(self):
        """An invalid collection period yields a falsy 'metrics_data'."""
        # To generate Request of testing invalid collection_period in read_metric_data_request
        producer.request("read_metrics_data/read_coll_period_req_invalid.json",
                         'read_metric_data_request', '', 'metric_request')

        resp = self._await_response("read_metric_data_response")
        self.assertFalse(resp['metrics_data'])
class test_update_metrics(unittest.TestCase):
    """End-to-end checks for `update_metric_request` messages.

    Each test publishes a request through the module-level ``producer`` and
    then reads the module-level ``_consumer`` until a message keyed
    ``update_metric_response`` arrives.
    """

    def _await_response(self, key):
        """Return the decoded payload of the first consumed message keyed *key*.

        The message value is decoded with three nested ``json.loads`` calls.
        Returning on the first match stops the consumption loop, so the
        calling test can finish instead of iterating over the consumer forever.

        NOTE(review): keys are compared against ``str``; kafka-python yields
        raw ``bytes`` keys on Python 3 unless a key deserializer is
        configured -- confirm the target interpreter.
        """
        for message in _consumer:
            if message.key == key:
                return json.loads(json.loads(json.loads(message.value)))
        # Only reachable when the consumer iterator ends (e.g. it was created
        # with a timeout): report the missing response instead of passing.
        self.fail("No '%s' message received on the message bus" % key)

    def test_upd_status_positive(self):
        """A valid update request yields a truthy status and metric_uuid 0."""
        # To generate Request of testing valid metric_name in update metrics requests
        producer.request("update_metrics/update_metric_req_valid.json",
                         'update_metric_request', '', 'metric_request')

        resp = self._await_response("update_metric_response")
        self.assertTrue(resp['metric_update_response']['status'])
        self.assertEqual(resp['metric_update_response']['metric_uuid'], 0)

    def test_upd_status_negative(self):
        """An invalid update request yields a falsy status and no metric_uuid."""
        # To generate Request of testing invalid metric_name in update metrics requests
        producer.request("update_metrics/update_metric_req_invalid.json",
                         'update_metric_request', '', 'metric_request')

        resp = self._await_response("update_metric_response")
        self.assertFalse(resp['metric_update_response']['status'])
        self.assertIsNone(resp['metric_update_response']['metric_uuid'])
class test_delete_metrics(unittest.TestCase):
    """End-to-end checks for `delete_metric_request` messages.

    Each test publishes a request through the module-level ``producer`` and
    then reads the module-level ``_consumer`` until a message keyed
    ``delete_metric_response`` arrives.
    """

    def _await_response(self, key):
        """Return the decoded payload of the first consumed message keyed *key*.

        The message value is decoded with three nested ``json.loads`` calls.
        Returning on the first match stops the consumption loop, so the
        calling test can finish instead of iterating over the consumer forever.

        NOTE(review): keys are compared against ``str``; kafka-python yields
        raw ``bytes`` keys on Python 3 unless a key deserializer is
        configured -- confirm the target interpreter.
        """
        for message in _consumer:
            if message.key == key:
                return json.loads(json.loads(json.loads(message.value)))
        # Only reachable when the consumer iterator ends (e.g. it was created
        # with a timeout): report the missing response instead of passing.
        self.fail("No '%s' message received on the message bus" % key)

    def test_del_met_name_positive(self):
        """A delete request with a valid metric name yields a falsy 'status'.

        NOTE(review): the positive case expects a *falsy* status, exactly as
        the original assertion did; presumably deletion is not supported by
        the backend -- confirm this is intended.
        """
        # To generate Request of testing valid metric_name in delete metrics requests
        producer.request("delete_metrics/delete_metric_req_valid.json",
                         'delete_metric_request', '', 'metric_request')

        resp = self._await_response("delete_metric_response")
        self.assertFalse(resp['status'])

    def test_del_met_name_negative(self):
        """A delete request with an invalid metric name yields a falsy response."""
        # To generate Request of testing invalid metric_name in delete metrics requests
        producer.request("delete_metrics/delete_metric_req_invalid.json",
                         'delete_metric_request', '', 'metric_request')

        resp = self._await_response("delete_metric_response")
        self.assertFalse(resp)
class test_list_metrics(unittest.TestCase):
    """End-to-end checks for `list_metric_request` messages.

    Each test publishes a request through the module-level ``producer`` and
    then reads the module-level ``_consumer`` until a message keyed
    ``list_metrics_response`` arrives.
    """

    def _await_response(self, key):
        """Return the decoded payload of the first consumed message keyed *key*.

        The message value is decoded with three nested ``json.loads`` calls.
        Returning on the first match stops the consumption loop, so the
        calling test can finish instead of iterating over the consumer forever.

        NOTE(review): keys are compared against ``str``; kafka-python yields
        raw ``bytes`` keys on Python 3 unless a key deserializer is
        configured -- confirm the target interpreter.
        """
        for message in _consumer:
            if message.key == key:
                return json.loads(json.loads(json.loads(message.value)))
        # Only reachable when the consumer iterator ends (e.g. it was created
        # with a timeout): report the missing response instead of passing.
        self.fail("No '%s' message received on the message bus" % key)

    def test_list_met_name_positive(self):
        """A valid list request yields a list under 'metrics_list'."""
        # To generate Request of testing valid metric_name in list metrics requests
        producer.request("list_metrics/list_metric_req_valid.json",
                         'list_metric_request', '', 'metric_request')

        resp = self._await_response("list_metrics_response")
        self.assertIsInstance(resp['metrics_list'], list)

    # The misspelled method name is kept: it is the test's public identifier.
    def test_list_met_name_negitive(self):
        """An invalid list request yields a falsy 'metrics_list'."""
        # To generate Request of testing invalid metric_name in list metrics requests
        producer.request("list_metrics/list_metric_req_invalid.json",
                         'list_metric_request', '', 'metric_request')

        resp = self._await_response("list_metrics_response")
        self.assertFalse(resp['metrics_list'])
if __name__ == '__main__':

    # Saving test results in Log file
    log_file = 'log_file.txt'
    # unittest.main() finishes by raising SystemExit; the context manager
    # still closes (and flushes) the log file on the way out.
    with open(log_file, "w") as f:
        runner = unittest.TextTestRunner(f)
        unittest.main(testRunner=runner)
206 # For printing results on Console