X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=osm_mon%2Ftest%2FCloudWatch%2Funit_tests_alarms.py;fp=osm_mon%2Ftest%2FCloudWatch%2Funit_tests_alarms.py;h=0000000000000000000000000000000000000000;hb=4da146638bc3838270fa41c9f9fb91961f726c97;hp=ae036cf48b13bb8153d026aa9bec1b00b949d4a5;hpb=2aec92e1eb52d5512c2acae9ce9878f2f3c8f782;p=osm%2FMON.git diff --git a/osm_mon/test/CloudWatch/unit_tests_alarms.py b/osm_mon/test/CloudWatch/unit_tests_alarms.py deleted file mode 100644 index ae036cf..0000000 --- a/osm_mon/test/CloudWatch/unit_tests_alarms.py +++ /dev/null @@ -1,408 +0,0 @@ -from connection import Connection -import unittest -import sys -import jsmin -import json -import os -import time -from jsmin import jsmin -sys.path.append("../../test/core/") -from test_producer import KafkaProducer -from kafka import KafkaConsumer -try: - import boto - import boto.ec2 - import boto.vpc - import boto.ec2.cloudwatch - import boto.ec2.connection -except: - exit("Boto not avialable. Try activating your virtualenv OR `pip install boto`") - -#-------------------------------------------------------------------------------------------------------------------------------------- - -# Test Producer object to generate request - -producer = KafkaProducer('create_alarm_request') -obj = Connection() -connections = obj.setEnvironment() -connections_res = obj.connection_instance() -cloudwatch_conn = connections_res['cloudwatch_connection'] - -#-------------------------------------------------------------------------------------------------------------------------------------- - -'''Test E2E Flow : Test cases has been tested one at a time. -1) Commom Request is generated using request function in test_producer.py(/test/core) -2) The request is then consumed by the comsumer (plugin) -3) The response is sent back on the message bus in plugin_alarm.py using - response functions in producer.py(/core/message-bus) -4) The response is then again consumed by the unit_tests_alarms.py - and the test cases has been applied on the response. -''' - -class config_alarm_name_test(unittest.TestCase): - - - def setUp(self): - pass - #To generate a request of testing new alarm name and new instance id in create alarm request - def test_differentName_differentInstance(self): - time.sleep(2) - producer.request("test_schemas/create_alarm/create_alarm_differentName_differentInstance.json",'create_alarm_request', '','alarm_request') - server = {'server': 'localhost:9092', 'topic': 'alarm_request'} - - _consumer = KafkaConsumer(bootstrap_servers=server['server']) - _consumer.subscribe(['alarm_response']) - - for message in _consumer: - if message.key == "create_alarm_response": - info = json.loads(json.loads(message.value)) - print(info) - time.sleep(1) - self.assertTrue(info['alarm_create_response']['status']) - return - - #To generate a request of testing new alarm name and existing instance id in create alarm request - def test_differentName_sameInstance(self): - time.sleep(2) - producer.request("test_schemas/create_alarm/create_alarm_differentName_sameInstance.json",'create_alarm_request', '','alarm_request') - server = {'server': 'localhost:9092', 'topic': 'alarm_request'} - - _consumer = KafkaConsumer(bootstrap_servers=server['server']) - _consumer.subscribe(['alarm_response']) - - for message in _consumer: - if message.key == "create_alarm_response": - info = json.loads(json.loads(message.value)) - print(info) - time.sleep(1) - producer.request("test_schemas/delete_alarm/name_valid_delete1.json",'delete_alarm_request','','alarm_request') - self.assertTrue(info['alarm_create_response']['status']) - return - - #To generate a request of testing existing alarm name and new instance id in create alarm request - def test_sameName_differentInstance(self): - time.sleep(2) - producer.request("test_schemas/create_alarm/create_alarm_sameName_differentInstance.json",'create_alarm_request', '','alarm_request') - server = {'server': 'localhost:9092', 'topic': 'alarm_request'} - - _consumer = KafkaConsumer(bootstrap_servers=server['server']) - _consumer.subscribe(['alarm_response']) - - for message in _consumer: - if message.key == "create_alarm_response": - info = json.loads(json.loads(message.value)) - print(info) - time.sleep(1) - producer.request("test_schemas/delete_alarm/name_valid_delete2.json",'delete_alarm_request', '','alarm_request') - self.assertTrue(info['alarm_create_response']['status']) - return - - #To generate a request of testing existing alarm name and existing instance id in create alarm request - def test_sameName_sameInstance(self): - time.sleep(2) - producer.request("test_schemas/create_alarm/create_alarm_sameName_sameInstance.json",'create_alarm_request', '','alarm_request') - server = {'server': 'localhost:9092', 'topic': 'alarm_request'} - - _consumer = KafkaConsumer(bootstrap_servers=server['server']) - _consumer.subscribe(['alarm_response']) - - for message in _consumer: - if message.key == "create_alarm_response": - info = json.loads(json.loads(message.value)) - print(info, "---") - time.sleep(1) - producer.request("test_schemas/delete_alarm/name_valid.json",'delete_alarm_request', '','alarm_request') - self.assertEqual(info, None) - return - - #To generate a request of testing valid statistics in create alarm request - def test_statisticValid(self): - time.sleep(2) - producer.request("test_schemas/create_alarm/statistic_valid.json",'create_alarm_request', '','alarm_request') - server = {'server': 'localhost:9092', 'topic': 'alarm_request'} - - _consumer = KafkaConsumer(bootstrap_servers=server['server']) - _consumer.subscribe(['alarm_response']) - - for message in _consumer: - if message.key == "create_alarm_response": - info = json.loads(json.loads(message.value)) - print(info) - time.sleep(1) - producer.request("test_schemas/delete_alarm/name_valid_delete3.json",'delete_alarm_request', '','alarm_request') - self.assertTrue(info['alarm_create_response']['status']) - return - - #To generate a request of testing Invalid statistics in create alarm request - def test_statisticValidNot(self): - time.sleep(2) - producer.request("test_schemas/create_alarm/statistic_invalid.json",'create_alarm_request', '','alarm_request') - server = {'server': 'localhost:9092', 'topic': 'alarm_request'} - - _consumer = KafkaConsumer(bootstrap_servers=server['server']) - _consumer.subscribe(['alarm_response']) - - for message in _consumer: - if message.key == "create_alarm_response": - info = json.loads(json.loads(message.value)) - print(info, "---") - time.sleep(1) - producer.request("test_schemas/delete_alarm/name_valid_delete3.json",'delete_alarm_request', '','alarm_request') - self.assertEqual(info, None) - return - - #To generate a request of testing valid operation in create alarm request - def test_operationValid(self): - time.sleep(2) - producer.request("test_schemas/create_alarm/operation_valid.json",'create_alarm_request', '','alarm_request') - server = {'server': 'localhost:9092', 'topic': 'alarm_request'} - - _consumer = KafkaConsumer(bootstrap_servers=server['server']) - _consumer.subscribe(['alarm_response']) - - for message in _consumer: - if message.key == "create_alarm_response": - info = json.loads(json.loads(message.value)) - print(info) - time.sleep(1) - producer.request("test_schemas/delete_alarm/name_valid_delete3.json",'delete_alarm_request', '','alarm_request') - self.assertTrue(info['alarm_create_response']['status']) - return - - #To generate a request of testing Invalid operation in create alarm request - def test_operationValidNot(self): - time.sleep(2) - producer.request("test_schemas/create_alarm/operation_invalid.json",'create_alarm_request', '','alarm_request') - server = {'server': 'localhost:9092', 'topic': 'alarm_request'} - - _consumer = KafkaConsumer(bootstrap_servers=server['server']) - _consumer.subscribe(['alarm_response']) - - for message in _consumer: - if message.key == "create_alarm_response": - info = json.loads(json.loads(message.value)) - print(info) - time.sleep(1) - self.assertEqual(info,None) - return - - -#-------------------------------------------------------------------------------------------------------------------------------------- -class update_alarm_name_test(unittest.TestCase): - - #To generate a request of testing valid alarm_id in update alarm request - def test_nameValid(self): - producer.request("test_schemas/update_alarm/update_alarm_new_alarm.json",'create_alarm_request', '','alarm_request') - time.sleep(2) - producer.request("test_schemas/update_alarm/name_valid.json",'update_alarm_request', '','alarm_request') - server = {'server': 'localhost:9092', 'topic': 'alarm_request'} - - _consumer = KafkaConsumer(bootstrap_servers=server['server']) - _consumer.subscribe(['alarm_response']) - - for message in _consumer: - if message.key == "update_alarm_response": - info = json.loads(json.loads(json.loads(message.value))) - print(info) - time.sleep(1) - producer.request("test_schemas/delete_alarm/name_valid_delete4.json",'delete_alarm_request', '','alarm_request') - self.assertTrue(info['alarm_update_response']['status']) - return - - #To generate a request of testing invalid alarm_id in update alarm request - def test_nameInvalid(self): - time.sleep(2) - producer.request("test_schemas/update_alarm/name_invalid.json",'update_alarm_request', '','alarm_request') - server = {'server': 'localhost:9092', 'topic': 'alarm_request'} - - _consumer = KafkaConsumer(bootstrap_servers=server['server']) - _consumer.subscribe(['alarm_response']) - - for message in _consumer: - if message.key == "update_alarm_response": - info = json.loads(json.loads(json.loads(message.value))) - print(info) - time.sleep(1) - self.assertEqual(info,None) - return - - #To generate a request of testing valid statistics in update alarm request - def test_statisticValid(self): - producer.request("test_schemas/create_alarm/create_alarm_differentName_differentInstance.json",'create_alarm_request', '','alarm_request') - time.sleep(2) - producer.request("test_schemas/update_alarm/statistic_valid.json",'update_alarm_request', '','alarm_request') - server = {'server': 'localhost:9092', 'topic': 'alarm_request'} - - _consumer = KafkaConsumer(bootstrap_servers=server['server']) - _consumer.subscribe(['alarm_response']) - - for message in _consumer: - if message.key == "update_alarm_response": - info = json.loads(json.loads(json.loads(message.value))) - print(info) - time.sleep(1) - producer.request("test_schemas/delete_alarm/name_valid.json",'delete_alarm_request', '','alarm_request') - self.assertTrue(info['alarm_update_response']['status']) - return - - #To generate a request of testing Invalid statistics in update alarm request - def test_statisticInvalid(self): - time.sleep(2) - producer.request("test_schemas/update_alarm/statistic_invalid.json",'update_alarm_request', '','alarm_request') - server = {'server': 'localhost:9092', 'topic': 'alarm_request'} - - _consumer = KafkaConsumer(bootstrap_servers=server['server']) - _consumer.subscribe(['alarm_response']) - - for message in _consumer: - if message.key == "update_alarm_response": - info = json.loads(json.loads(json.loads(message.value))) - print(info) - time.sleep(1) - self.assertEqual(info,None) - return - - #To generate a request of testing valid operation in update alarm request - def test_operationValid(self): - producer.request("test_schemas/create_alarm/create_alarm_differentName_differentInstance.json",'create_alarm_request', '','alarm_request') - time.sleep(2) - producer.request("test_schemas/update_alarm/operation_valid.json",'update_alarm_request', '','alarm_request') - server = {'server': 'localhost:9092', 'topic': 'alarm_request'} - - _consumer = KafkaConsumer(bootstrap_servers=server['server']) - _consumer.subscribe(['alarm_response']) - - for message in _consumer: - if message.key == "update_alarm_response": - info = json.loads(json.loads(json.loads(message.value))) - print(info) - time.sleep(1) - producer.request("test_schemas/delete_alarm/name_valid.json",'delete_alarm_request', '','alarm_request') - self.assertTrue(info['alarm_update_response']['status']) - return - -#-------------------------------------------------------------------------------------------------------------------------------------- -class delete_alarm_test(unittest.TestCase): - - #To generate a request of testing valid alarm_id in delete alarm request - def test_nameValid(self): - producer.request("test_schemas/create_alarm/create_alarm_differentName_differentInstance.json",'create_alarm_request', '','alarm_request') - time.sleep(2) - producer.request("test_schemas/delete_alarm/name_valid.json",'delete_alarm_request', '','alarm_request') - server = {'server': 'localhost:9092', 'topic': 'alarm_request'} - - _consumer = KafkaConsumer(bootstrap_servers=server['server']) - _consumer.subscribe(['alarm_response']) - - for message in _consumer: - if message.key == "delete_alarm_response": - info = json.loads(json.loads(json.loads(message.value))) - print(info) - time.sleep(1) - self.assertTrue(info['alarm_deletion_response']['status']) - return - - #To generate a request of testing Invalid alarm_id in delete alarm request - def test_nameInvalid(self): - time.sleep(2) - producer.request("test_schemas/delete_alarm/name_invalid.json",'delete_alarm_request', '','alarm_request') - server = {'server': 'localhost:9092', 'topic': 'alarm_request'} - - _consumer = KafkaConsumer(bootstrap_servers=server['server']) - _consumer.subscribe(['alarm_response']) - - for message in _consumer: - if message.key == "delete_alarm_response": - info = json.loads(json.loads(json.loads(message.value))) - print(info) - time.sleep(1) - self.assertEqual(info,None) - return - -#-------------------------------------------------------------------------------------------------------------------------------------- -class list_alarm_test(unittest.TestCase): - - #To generate a request of testing valid input fields in alarm list request - def test_valid_no_arguments(self): - time.sleep(2) - producer.request("test_schemas/list_alarm/list_alarm_valid_no_arguments.json",'alarm_list_request', '','alarm_request') - server = {'server': 'localhost:9092', 'topic': 'alarm_request'} - - _consumer = KafkaConsumer(bootstrap_servers=server['server']) - _consumer.subscribe(['alarm_response']) - - for message in _consumer: - if message.key == "list_alarm_response": - info = json.loads(json.loads(json.loads(message.value))) - print(info) - time.sleep(1) - self.assertEqual(type(info),dict) - return - - #To generate a request of testing valid input fields in alarm list request - def test_valid_one_arguments(self): - time.sleep(2) - producer.request("test_schemas/list_alarm/list_alarm_valid_one_arguments.json",'alarm_list_request', '','alarm_request') - server = {'server': 'localhost:9092', 'topic': 'alarm_request'} - - _consumer = KafkaConsumer(bootstrap_servers=server['server']) - _consumer.subscribe(['alarm_response']) - - for message in _consumer: - if message.key == "list_alarm_response": - info = json.loads(json.loads(json.loads(message.value))) - print(info) - time.sleep(1) - self.assertEqual(type(info),dict) - return - - #To generate a request of testing valid input fields in alarm list request - def test_valid_two_arguments(self): - time.sleep(2) - producer.request("test_schemas/list_alarm/list_alarm_valid_two_arguments.json",'alarm_list_request', '','alarm_request') - server = {'server': 'localhost:9092', 'topic': 'alarm_request'} - - _consumer = KafkaConsumer(bootstrap_servers=server['server']) - _consumer.subscribe(['alarm_response']) - - for message in _consumer: - if message.key == "list_alarm_response": - info = json.loads(json.loads(json.loads(message.value))) - print(info) - time.sleep(1) - self.assertEqual(type(info),dict) - return - - -#-------------------------------------------------------------------------------------------------------------------------------------- -class alarm_details_test(unittest.TestCase): - - #To generate a request of testing valid input fields in acknowledge alarm - def test_Valid(self): - time.sleep(2) - producer.request("test_schemas/alarm_details/acknowledge_alarm.json",'acknowledge_alarm', '','alarm_request') - server = {'server': 'localhost:9092', 'topic': 'alarm_request'} - - _consumer = KafkaConsumer(bootstrap_servers=server['server']) - _consumer.subscribe(['alarm_response']) - - for message in _consumer: - if message.key == "notify_alarm": - info = json.loads(json.loads(json.loads(message.value))) - print(info) - time.sleep(1) - self.assertEqual(type(info),dict) - return - -if __name__ == '__main__': - - # Saving test reults in Log file - - log_file = 'log_file.txt' - f = open(log_file, "w") - runner = unittest.TextTestRunner(f) - unittest.main(testRunner=runner) - f.close() - - # For printing results on Console - # unittest.main()