X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=osm_mon%2Ftest%2Fplugins%2FCloudWatch%2Funit_tests_alarms.py;fp=osm_mon%2Ftest%2Fplugins%2FCloudWatch%2Funit_tests_alarms.py;h=ae036cf48b13bb8153d026aa9bec1b00b949d4a5;hb=4da146638bc3838270fa41c9f9fb91961f726c97;hp=0000000000000000000000000000000000000000;hpb=2aec92e1eb52d5512c2acae9ce9878f2f3c8f782;p=osm%2FMON.git diff --git a/osm_mon/test/plugins/CloudWatch/unit_tests_alarms.py b/osm_mon/test/plugins/CloudWatch/unit_tests_alarms.py new file mode 100644 index 0000000..ae036cf --- /dev/null +++ b/osm_mon/test/plugins/CloudWatch/unit_tests_alarms.py @@ -0,0 +1,408 @@ +from connection import Connection +import unittest +import sys +import jsmin +import json +import os +import time +from jsmin import jsmin +sys.path.append("../../test/core/") +from test_producer import KafkaProducer +from kafka import KafkaConsumer +try: + import boto + import boto.ec2 + import boto.vpc + import boto.ec2.cloudwatch + import boto.ec2.connection +except: + exit("Boto not avialable. Try activating your virtualenv OR `pip install boto`") + +#-------------------------------------------------------------------------------------------------------------------------------------- + +# Test Producer object to generate request + +producer = KafkaProducer('create_alarm_request') +obj = Connection() +connections = obj.setEnvironment() +connections_res = obj.connection_instance() +cloudwatch_conn = connections_res['cloudwatch_connection'] + +#-------------------------------------------------------------------------------------------------------------------------------------- + +'''Test E2E Flow : Test cases has been tested one at a time. +1) Commom Request is generated using request function in test_producer.py(/test/core) +2) The request is then consumed by the comsumer (plugin) +3) The response is sent back on the message bus in plugin_alarm.py using + response functions in producer.py(/core/message-bus) +4) The response is then again consumed by the unit_tests_alarms.py + and the test cases has been applied on the response. +''' + +class config_alarm_name_test(unittest.TestCase): + + + def setUp(self): + pass + #To generate a request of testing new alarm name and new instance id in create alarm request + def test_differentName_differentInstance(self): + time.sleep(2) + producer.request("test_schemas/create_alarm/create_alarm_differentName_differentInstance.json",'create_alarm_request', '','alarm_request') + server = {'server': 'localhost:9092', 'topic': 'alarm_request'} + + _consumer = KafkaConsumer(bootstrap_servers=server['server']) + _consumer.subscribe(['alarm_response']) + + for message in _consumer: + if message.key == "create_alarm_response": + info = json.loads(json.loads(message.value)) + print(info) + time.sleep(1) + self.assertTrue(info['alarm_create_response']['status']) + return + + #To generate a request of testing new alarm name and existing instance id in create alarm request + def test_differentName_sameInstance(self): + time.sleep(2) + producer.request("test_schemas/create_alarm/create_alarm_differentName_sameInstance.json",'create_alarm_request', '','alarm_request') + server = {'server': 'localhost:9092', 'topic': 'alarm_request'} + + _consumer = KafkaConsumer(bootstrap_servers=server['server']) + _consumer.subscribe(['alarm_response']) + + for message in _consumer: + if message.key == "create_alarm_response": + info = json.loads(json.loads(message.value)) + print(info) + time.sleep(1) + producer.request("test_schemas/delete_alarm/name_valid_delete1.json",'delete_alarm_request','','alarm_request') + self.assertTrue(info['alarm_create_response']['status']) + return + + #To generate a request of testing existing alarm name and new instance id in create alarm request + def test_sameName_differentInstance(self): + time.sleep(2) + producer.request("test_schemas/create_alarm/create_alarm_sameName_differentInstance.json",'create_alarm_request', '','alarm_request') + server = {'server': 'localhost:9092', 'topic': 'alarm_request'} + + _consumer = KafkaConsumer(bootstrap_servers=server['server']) + _consumer.subscribe(['alarm_response']) + + for message in _consumer: + if message.key == "create_alarm_response": + info = json.loads(json.loads(message.value)) + print(info) + time.sleep(1) + producer.request("test_schemas/delete_alarm/name_valid_delete2.json",'delete_alarm_request', '','alarm_request') + self.assertTrue(info['alarm_create_response']['status']) + return + + #To generate a request of testing existing alarm name and existing instance id in create alarm request + def test_sameName_sameInstance(self): + time.sleep(2) + producer.request("test_schemas/create_alarm/create_alarm_sameName_sameInstance.json",'create_alarm_request', '','alarm_request') + server = {'server': 'localhost:9092', 'topic': 'alarm_request'} + + _consumer = KafkaConsumer(bootstrap_servers=server['server']) + _consumer.subscribe(['alarm_response']) + + for message in _consumer: + if message.key == "create_alarm_response": + info = json.loads(json.loads(message.value)) + print(info, "---") + time.sleep(1) + producer.request("test_schemas/delete_alarm/name_valid.json",'delete_alarm_request', '','alarm_request') + self.assertEqual(info, None) + return + + #To generate a request of testing valid statistics in create alarm request + def test_statisticValid(self): + time.sleep(2) + producer.request("test_schemas/create_alarm/statistic_valid.json",'create_alarm_request', '','alarm_request') + server = {'server': 'localhost:9092', 'topic': 'alarm_request'} + + _consumer = KafkaConsumer(bootstrap_servers=server['server']) + _consumer.subscribe(['alarm_response']) + + for message in _consumer: + if message.key == "create_alarm_response": + info = json.loads(json.loads(message.value)) + print(info) + time.sleep(1) + producer.request("test_schemas/delete_alarm/name_valid_delete3.json",'delete_alarm_request', '','alarm_request') + self.assertTrue(info['alarm_create_response']['status']) + return + + #To generate a request of testing Invalid statistics in create alarm request + def test_statisticValidNot(self): + time.sleep(2) + producer.request("test_schemas/create_alarm/statistic_invalid.json",'create_alarm_request', '','alarm_request') + server = {'server': 'localhost:9092', 'topic': 'alarm_request'} + + _consumer = KafkaConsumer(bootstrap_servers=server['server']) + _consumer.subscribe(['alarm_response']) + + for message in _consumer: + if message.key == "create_alarm_response": + info = json.loads(json.loads(message.value)) + print(info, "---") + time.sleep(1) + producer.request("test_schemas/delete_alarm/name_valid_delete3.json",'delete_alarm_request', '','alarm_request') + self.assertEqual(info, None) + return + + #To generate a request of testing valid operation in create alarm request + def test_operationValid(self): + time.sleep(2) + producer.request("test_schemas/create_alarm/operation_valid.json",'create_alarm_request', '','alarm_request') + server = {'server': 'localhost:9092', 'topic': 'alarm_request'} + + _consumer = KafkaConsumer(bootstrap_servers=server['server']) + _consumer.subscribe(['alarm_response']) + + for message in _consumer: + if message.key == "create_alarm_response": + info = json.loads(json.loads(message.value)) + print(info) + time.sleep(1) + producer.request("test_schemas/delete_alarm/name_valid_delete3.json",'delete_alarm_request', '','alarm_request') + self.assertTrue(info['alarm_create_response']['status']) + return + + #To generate a request of testing Invalid operation in create alarm request + def test_operationValidNot(self): + time.sleep(2) + producer.request("test_schemas/create_alarm/operation_invalid.json",'create_alarm_request', '','alarm_request') + server = {'server': 'localhost:9092', 'topic': 'alarm_request'} + + _consumer = KafkaConsumer(bootstrap_servers=server['server']) + _consumer.subscribe(['alarm_response']) + + for message in _consumer: + if message.key == "create_alarm_response": + info = json.loads(json.loads(message.value)) + print(info) + time.sleep(1) + self.assertEqual(info,None) + return + + +#-------------------------------------------------------------------------------------------------------------------------------------- +class update_alarm_name_test(unittest.TestCase): + + #To generate a request of testing valid alarm_id in update alarm request + def test_nameValid(self): + producer.request("test_schemas/update_alarm/update_alarm_new_alarm.json",'create_alarm_request', '','alarm_request') + time.sleep(2) + producer.request("test_schemas/update_alarm/name_valid.json",'update_alarm_request', '','alarm_request') + server = {'server': 'localhost:9092', 'topic': 'alarm_request'} + + _consumer = KafkaConsumer(bootstrap_servers=server['server']) + _consumer.subscribe(['alarm_response']) + + for message in _consumer: + if message.key == "update_alarm_response": + info = json.loads(json.loads(json.loads(message.value))) + print(info) + time.sleep(1) + producer.request("test_schemas/delete_alarm/name_valid_delete4.json",'delete_alarm_request', '','alarm_request') + self.assertTrue(info['alarm_update_response']['status']) + return + + #To generate a request of testing invalid alarm_id in update alarm request + def test_nameInvalid(self): + time.sleep(2) + producer.request("test_schemas/update_alarm/name_invalid.json",'update_alarm_request', '','alarm_request') + server = {'server': 'localhost:9092', 'topic': 'alarm_request'} + + _consumer = KafkaConsumer(bootstrap_servers=server['server']) + _consumer.subscribe(['alarm_response']) + + for message in _consumer: + if message.key == "update_alarm_response": + info = json.loads(json.loads(json.loads(message.value))) + print(info) + time.sleep(1) + self.assertEqual(info,None) + return + + #To generate a request of testing valid statistics in update alarm request + def test_statisticValid(self): + producer.request("test_schemas/create_alarm/create_alarm_differentName_differentInstance.json",'create_alarm_request', '','alarm_request') + time.sleep(2) + producer.request("test_schemas/update_alarm/statistic_valid.json",'update_alarm_request', '','alarm_request') + server = {'server': 'localhost:9092', 'topic': 'alarm_request'} + + _consumer = KafkaConsumer(bootstrap_servers=server['server']) + _consumer.subscribe(['alarm_response']) + + for message in _consumer: + if message.key == "update_alarm_response": + info = json.loads(json.loads(json.loads(message.value))) + print(info) + time.sleep(1) + producer.request("test_schemas/delete_alarm/name_valid.json",'delete_alarm_request', '','alarm_request') + self.assertTrue(info['alarm_update_response']['status']) + return + + #To generate a request of testing Invalid statistics in update alarm request + def test_statisticInvalid(self): + time.sleep(2) + producer.request("test_schemas/update_alarm/statistic_invalid.json",'update_alarm_request', '','alarm_request') + server = {'server': 'localhost:9092', 'topic': 'alarm_request'} + + _consumer = KafkaConsumer(bootstrap_servers=server['server']) + _consumer.subscribe(['alarm_response']) + + for message in _consumer: + if message.key == "update_alarm_response": + info = json.loads(json.loads(json.loads(message.value))) + print(info) + time.sleep(1) + self.assertEqual(info,None) + return + + #To generate a request of testing valid operation in update alarm request + def test_operationValid(self): + producer.request("test_schemas/create_alarm/create_alarm_differentName_differentInstance.json",'create_alarm_request', '','alarm_request') + time.sleep(2) + producer.request("test_schemas/update_alarm/operation_valid.json",'update_alarm_request', '','alarm_request') + server = {'server': 'localhost:9092', 'topic': 'alarm_request'} + + _consumer = KafkaConsumer(bootstrap_servers=server['server']) + _consumer.subscribe(['alarm_response']) + + for message in _consumer: + if message.key == "update_alarm_response": + info = json.loads(json.loads(json.loads(message.value))) + print(info) + time.sleep(1) + producer.request("test_schemas/delete_alarm/name_valid.json",'delete_alarm_request', '','alarm_request') + self.assertTrue(info['alarm_update_response']['status']) + return + +#-------------------------------------------------------------------------------------------------------------------------------------- +class delete_alarm_test(unittest.TestCase): + + #To generate a request of testing valid alarm_id in delete alarm request + def test_nameValid(self): + producer.request("test_schemas/create_alarm/create_alarm_differentName_differentInstance.json",'create_alarm_request', '','alarm_request') + time.sleep(2) + producer.request("test_schemas/delete_alarm/name_valid.json",'delete_alarm_request', '','alarm_request') + server = {'server': 'localhost:9092', 'topic': 'alarm_request'} + + _consumer = KafkaConsumer(bootstrap_servers=server['server']) + _consumer.subscribe(['alarm_response']) + + for message in _consumer: + if message.key == "delete_alarm_response": + info = json.loads(json.loads(json.loads(message.value))) + print(info) + time.sleep(1) + self.assertTrue(info['alarm_deletion_response']['status']) + return + + #To generate a request of testing Invalid alarm_id in delete alarm request + def test_nameInvalid(self): + time.sleep(2) + producer.request("test_schemas/delete_alarm/name_invalid.json",'delete_alarm_request', '','alarm_request') + server = {'server': 'localhost:9092', 'topic': 'alarm_request'} + + _consumer = KafkaConsumer(bootstrap_servers=server['server']) + _consumer.subscribe(['alarm_response']) + + for message in _consumer: + if message.key == "delete_alarm_response": + info = json.loads(json.loads(json.loads(message.value))) + print(info) + time.sleep(1) + self.assertEqual(info,None) + return + +#-------------------------------------------------------------------------------------------------------------------------------------- +class list_alarm_test(unittest.TestCase): + + #To generate a request of testing valid input fields in alarm list request + def test_valid_no_arguments(self): + time.sleep(2) + producer.request("test_schemas/list_alarm/list_alarm_valid_no_arguments.json",'alarm_list_request', '','alarm_request') + server = {'server': 'localhost:9092', 'topic': 'alarm_request'} + + _consumer = KafkaConsumer(bootstrap_servers=server['server']) + _consumer.subscribe(['alarm_response']) + + for message in _consumer: + if message.key == "list_alarm_response": + info = json.loads(json.loads(json.loads(message.value))) + print(info) + time.sleep(1) + self.assertEqual(type(info),dict) + return + + #To generate a request of testing valid input fields in alarm list request + def test_valid_one_arguments(self): + time.sleep(2) + producer.request("test_schemas/list_alarm/list_alarm_valid_one_arguments.json",'alarm_list_request', '','alarm_request') + server = {'server': 'localhost:9092', 'topic': 'alarm_request'} + + _consumer = KafkaConsumer(bootstrap_servers=server['server']) + _consumer.subscribe(['alarm_response']) + + for message in _consumer: + if message.key == "list_alarm_response": + info = json.loads(json.loads(json.loads(message.value))) + print(info) + time.sleep(1) + self.assertEqual(type(info),dict) + return + + #To generate a request of testing valid input fields in alarm list request + def test_valid_two_arguments(self): + time.sleep(2) + producer.request("test_schemas/list_alarm/list_alarm_valid_two_arguments.json",'alarm_list_request', '','alarm_request') + server = {'server': 'localhost:9092', 'topic': 'alarm_request'} + + _consumer = KafkaConsumer(bootstrap_servers=server['server']) + _consumer.subscribe(['alarm_response']) + + for message in _consumer: + if message.key == "list_alarm_response": + info = json.loads(json.loads(json.loads(message.value))) + print(info) + time.sleep(1) + self.assertEqual(type(info),dict) + return + + +#-------------------------------------------------------------------------------------------------------------------------------------- +class alarm_details_test(unittest.TestCase): + + #To generate a request of testing valid input fields in acknowledge alarm + def test_Valid(self): + time.sleep(2) + producer.request("test_schemas/alarm_details/acknowledge_alarm.json",'acknowledge_alarm', '','alarm_request') + server = {'server': 'localhost:9092', 'topic': 'alarm_request'} + + _consumer = KafkaConsumer(bootstrap_servers=server['server']) + _consumer.subscribe(['alarm_response']) + + for message in _consumer: + if message.key == "notify_alarm": + info = json.loads(json.loads(json.loads(message.value))) + print(info) + time.sleep(1) + self.assertEqual(type(info),dict) + return + +if __name__ == '__main__': + + # Saving test reults in Log file + + log_file = 'log_file.txt' + f = open(log_file, "w") + runner = unittest.TextTestRunner(f) + unittest.main(testRunner=runner) + f.close() + + # For printing results on Console + # unittest.main()