--- /dev/null
+# Copyright 2017 Intel Research and Development Ireland Limited
+# *************************************************************
+
+# This file is part of OSM Monitoring module
+# All Rights Reserved to Intel Corporation
+
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+
+# http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+# For those usages not covered by the Apache License, Version 2.0 please
+# contact: prithiv.mohan@intel.com or adrian.hoban@intel.com
+##
+
+'''
+Kafka producer app that interacts with the SO and the plugins of
+datacenter VIMs such as OpenStack, VMware and AWS.
+'''
+
+import json
+import logging as log
+import os
+
+from jsmin import jsmin
+from kafka import KafkaProducer as kaf
+from kafka.errors import KafkaError
+
+
+class KafkaProducer(object):
+
+    def __init__(self, topic):
+
+        self._topic = topic
+
+        # If ZOOKEEPER_URI is not set in the environment, fall back to
+        # localhost:9092, where a local broker instance is assumed to be
+        # running.
+        if "ZOOKEEPER_URI" in os.environ:
+            broker = os.getenv("ZOOKEEPER_URI")
+        else:
+            broker = "localhost:9092"
+
+ self.producer = kaf(key_serializer=str.encode,
+ value_serializer=lambda v: json.dumps(v).encode('ascii'),
+ bootstrap_servers=broker, api_version=(0,10))
+
+
+
+    def publish(self, key, value, topic):
+        try:
+            future = self.producer.send(topic=topic, key=key, value=value)
+            self.producer.flush()
+        except Exception:
+            log.exception("Error publishing to {} topic.".format(topic))
+            raise
+        try:
+            record_metadata = future.get(timeout=10)
+            log.debug("TOPIC: %s", record_metadata.topic)
+            log.debug("PARTITION: %s", record_metadata.partition)
+            log.debug("OFFSET: %s", record_metadata.offset)
+        except KafkaError:
+            log.exception("Error fetching metadata for the %s topic.", topic)
+
+    json_path = os.path.join(os.pardir, "models")
+
+    def request(self, path, key, message, topic):
+        # External to MON: read a JSON model file, strip comments with jsmin
+        # and publish the result.
+        with open(path) as json_file:
+            payload = jsmin(json_file.read())
+        self.publish(key=key,
+                     value=json.loads(payload),
+                     topic=topic)
+
+
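+# Minimal usage sketch, assuming a reachable broker (localhost:9092 unless
+# ZOOKEEPER_URI is set) and an existing JSON model; 'create_alarm_req.json'
+# is a hypothetical example name, not a file shipped with this module.
+if __name__ == '__main__':
+    producer = KafkaProducer('alarm_request')
+    producer.request(path=os.path.join(KafkaProducer.json_path,
+                                       'create_alarm_req.json'),
+                     key='create_alarm_request',
+                     message='',
+                     topic='alarm_request')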
\ No newline at end of file
--- /dev/null
+{
+"schema_version": "1.0",
+"schema_type": "delete_metric_data_request",
+"metric_name": "CPU_UTILIATION",
+"metric_uuid": "",
+"resource_uuid": "i-098da78cbd8304e17",
+"tenant_uuid": "",
+"correlation_uuid": "S0123",
+"vim_type": "AWS"
+}
\ No newline at end of file
--- /dev/null
+{
+"schema_version": "1.0",
+"schema_type": "delete_metric_data_request",
+"metric_name": "CPU_UTILIZATION",
+"metric_uuid": "",
+"resource_uuid": "i-098da78cbd8304e17",
+"tenant_uuid": "",
+"correlation_uuid": "S0123",
+"vim_type": "AWS"
+}
\ No newline at end of file
--- /dev/null
+{
+"schema_version": "1.0",
+"schema_type": "list_metrics_request",
+"vim_type": "AWS",
+"metrics_list_request":
+{
+"metric_name": "CPU_UTILZATION",
+"correlation_id": "SO123",
+"resource_uuid": "i-098da78cbd8304e17"
+}
+}
\ No newline at end of file
--- /dev/null
+{
+"schema_version": "1.0",
+"schema_type": "list_metrics_request",
+"vim_type": "AWS",
+"metrics_list_request":
+{
+"metric_name": "CPU_UTILIZATION",
+"correlation_id": "SO123",
+"resource_uuid": "i-098da78cbd8304e17"
+}
+}
\ No newline at end of file
--- /dev/null
+{
+"schema_version": "1.0",
+"schema_type": "read_metric_data_request",
+"metric_name": "CPU_UTILIZATION",
+"metric_uuid": "0",
+"resource_uuid": "i-098da78cbd8304e17",
+"tenant_uuid": "",
+"correlation_uuid": "SO123",
+"vim_type":"AWS",
+"collection_period":"3500" ,
+"collection_unit": ""
+}
\ No newline at end of file
--- /dev/null
+{
+"schema_version": "1.0",
+"schema_type": "read_metric_data_request",
+"metric_name": "CPU_UTILIZATION",
+"metric_uuid": "0",
+"resource_uuid": "i-098da78cbd8304e17",
+"tenant_uuid": "",
+"correlation_uuid": "SO123",
+"vim_type":"AWS",
+"collection_period":"3600" ,
+"collection_unit": ""
+}
\ No newline at end of file
--- /dev/null
+{
+"schema_version": "1.0",
+"schema_type": "read_metric_data_request",
+"metric_name": "CPU_UTLIZATION",
+"metric_uuid": "0",
+"resource_uuid": "i-098da78cbd8304e17",
+"tenant_uuid": "",
+"correlation_uuid": "SO123",
+"vim_type":"AWS",
+"collection_period":"3600" ,
+"collection_unit": ""
+}
\ No newline at end of file
--- /dev/null
+{
+"schema_version": "1.0",
+"schema_type": "read_metric_data_request",
+"metric_name": "CPU_UTILIZATION",
+"metric_uuid": "0",
+"resource_uuid": "i-098da78cbd8304e17",
+"tenant_uuid": "",
+"correlation_uuid": "SO123",
+"vim_type":"AWS",
+"collection_period":"3600" ,
+"collection_unit": ""
+}
\ No newline at end of file
--- /dev/null
+{
+"schema_version": "1.0",
+"schema_type": "create_metrics_request",
+"tenant_uuid": "",
+"correlation_id": "SO123",
+"vim_type": "AWS",
+"metric_create":
+{
+"metric_name": "CPU_UTILIZ",
+"metric_unit": "",
+"resource_uuid": "i-098da78cbd8304e17"
+}
+}
\ No newline at end of file
--- /dev/null
+{
+"schema_version": "1.0",
+"schema_type": "create_metrics_request",
+"tenant_uuid": "",
+"correlation_id": "SO123",
+"vim_type": "AWS",
+"metric_create":
+{
+"metric_name": "CPU_UTILIZATION",
+"metric_unit": "",
+"resource_uuid": "i-098da78cbd8304e17"
+}
+}
\ No newline at end of file
--- /dev/null
+{
+"schema_version": "1.0",
+"schema_type": "alarm_ack",
+"vim_type": "AWS",
+"ack_details":
+{
+"alarm_uuid": "CPU_Utilization_i-098da78cbd8304e17",
+"resource_uuid": "i-098da78cbd8304e17",
+"tenant_uuid": ""
+}
+}
\ No newline at end of file
--- /dev/null
+{
+"schema_version": "1.0",
+"schema_type": "create_metrics_request",
+"tenant_uuid": "",
+"correlation_id": "SO123",
+"vim_type": "AWS",
+"metric_create":
+{
+"metric_name": "CPU_UTILIZ",
+"metric_unit": "",
+"resource_uuid": "i-098da78cbd8304e17"
+}
+}
\ No newline at end of file
--- /dev/null
+{
+"schema_version": "1.0",
+"schema_type": "create_metrics_request",
+"tenant_uuid": "",
+"correlation_id": "SO123",
+"vim_type": "AWS",
+"metric_create":
+{
+"metric_name": "CPU_UTILIZATION",
+"metric_unit": "",
+"resource_uuid": "i-098da78cbd8304e17"
+}
+}
\ No newline at end of file
--- /dev/null
+{
+"schema_version": "1.0",
+"schema_type": "create_alarm_request",
+"vim_type": "AWS",
+"alarm_create_request":
+{
+"correlation_id": "SO123",
+"alarm_name": "CPU_Utilization_Above_Threshold",
+"resource_uuid": "i-098da78cbd8304e17",
+"description": "",
+"severity": "Critical",
+"operation": "GE",
+"threshold_value": 1.5,
+"unit": "",
+"metric_name": "CPU_UTILIZATION",
+"statistic": "MAXIMUM"
+}
+}
--- /dev/null
+{
+"schema_version": "1.0",
+"schema_type": "create_alarm_request",
+"vim_type": "AWS",
+"alarm_create_request":
+{
+"correlation_id": "SO123",
+"alarm_name": "CPU_Utilization_Above_Threshold1",
+"resource_uuid": "i-098da78cbd8304e17",
+"description": "",
+"severity": "Critical",
+"operation": "GE",
+"threshold_value": 1.5,
+"unit": "",
+"metric_name": "CPU_UTILIZATION",
+"statistic": "MAXIMUM"
+}
+}
--- /dev/null
+{
+"schema_version": "1.0",
+"schema_type": "create_alarm_request",
+"vim_type": "AWS",
+"alarm_create_request":
+{
+"correlation_id": "SO123",
+"alarm_name": "CPU_Utilization_Above_Threshold",
+"resource_uuid": "i-09462760703837b26",
+"description": "",
+"severity": "Critical",
+"operation": "GE",
+"threshold_value": 1.5,
+"unit": "",
+"metric_name": "CPU_UTILIZATION",
+"statistic": "MAXIMUM"
+}
+}
--- /dev/null
+{
+"schema_version": "1.0",
+"schema_type": "create_alarm_request",
+"vim_type": "AWS",
+"alarm_create_request":
+{
+"correlation_id": "SO123",
+"alarm_name": "CPU_Utilization_Above_Threshold",
+"resource_uuid": "i-098da78cbd8304e17",
+"description": "",
+"severity": "Critical",
+"operation": "GE",
+"threshold_value": 1.5,
+"unit": "",
+"metric_name": "CPU_UTILIZATION",
+"statistic": "MAXIMUM"
+}
+}
--- /dev/null
+{
+"schema_version": "1.0",
+"schema_type": "create_alarm_request",
+"vim_type": "AWS",
+"alarm_create_request":
+{
+"correlation_id": "SO123",
+"alarm_name": "CPU_Utilization_Above_Threshold2",
+"resource_uuid": "i-098da78cbd8304e17",
+"description": "",
+"severity": "Critical",
+"operation": "Greaterthan",
+"threshold_value": 1.5,
+"unit": "",
+"metric_name": "CPU_UTILIZATION",
+"statistic": "MAXIMUM"
+}
+}
--- /dev/null
+{
+"schema_version": "1.0",
+"schema_type": "create_alarm_request",
+"vim_type": "AWS",
+"alarm_create_request":
+{
+"correlation_id": "SO123",
+"alarm_name": "CPU_Utilization_Above_Threshold2",
+"resource_uuid": "i-098da78cbd8304e17",
+"description": "",
+"severity": "Critical",
+"operation": "GE",
+"threshold_value": 1.5,
+"unit": "",
+"metric_name": "CPU_UTILIZATION",
+"statistic": "MAXIMUM"
+}
+}
--- /dev/null
+{
+"schema_version": "1.0",
+"schema_type": "create_alarm_request",
+"vim_type": "AWS",
+"alarm_create_request":
+{
+"correlation_id": "SO123",
+"alarm_name": "CPU_Utilization_Above_Threshold2",
+"resource_uuid": "i-098da78cbd8304e17",
+"description": "",
+"severity": "Critical",
+"operation": "GE",
+"threshold_value": 1.5,
+"unit": "",
+"metric_name": "CPU_UTILIZATION",
+"statistic": "MAX"
+}
+}
--- /dev/null
+{
+"schema_version": "1.0",
+"schema_type": "create_alarm_request",
+"vim_type": "AWS",
+"alarm_create_request":
+{
+"correlation_id": "SO123",
+"alarm_name": "CPU_Utilization_Above_Threshold2",
+"resource_uuid": "i-098da78cbd8304e17",
+"description": "",
+"severity": "Critical",
+"operation": "GE",
+"threshold_value": 1.5,
+"unit": "",
+"metric_name": "CPU_UTILIZATION",
+"statistic": "MAXIMUM"
+}
+}
--- /dev/null
+{
+"schema_version": "1.0",
+"schema_type": "delete_alarm_request",
+"vim_type": "AWS",
+"alarm_delete_request":
+{
+"alarm_uuid": "CPU_Utilization_Above_Threshold_i-098da78cbd8304e16",
+"correlation_id": "SO123"
+}
+}
\ No newline at end of file
--- /dev/null
+{
+"schema_version": "1.0",
+"schema_type": "delete_alarm_request",
+"vim_type": "AWS",
+"alarm_delete_request":
+{
+"alarm_uuid": "CPU_Utilization_Above_Threshold_i-098da78cbd8304e17",
+"correlation_id": "SO123"
+}
+}
\ No newline at end of file
--- /dev/null
+{
+"schema_version": "1.0",
+"schema_type": "delete_alarm_request",
+"vim_type": "AWS",
+"alarm_delete_request":
+{
+"alarm_uuid": "CPU_Utilization_Above_Threshold1_i-098da78cbd8304e17",
+"correlation_id": "SO123"
+}
+}
\ No newline at end of file
--- /dev/null
+{
+"schema_version": "1.0",
+"schema_type": "delete_alarm_request",
+"vim_type": "AWS",
+"alarm_delete_request":
+{
+"alarm_uuid": "CPU_Utilization_Above_Threshold_i-09462760703837b26",
+"correlation_id": "SO123"
+}
+}
\ No newline at end of file
--- /dev/null
+{
+"schema_version": "1.0",
+"schema_type": "delete_alarm_request",
+"vim_type": "AWS",
+"alarm_delete_request":
+{
+"alarm_uuid": "CPU_Utilization_Above_Threshold2_i-098da78cbd8304e17",
+"correlation_id": "SO123"
+}
+}
\ No newline at end of file
--- /dev/null
+{
+"schema_version": "1.0",
+"schema_type": "delete_alarm_request",
+"vim_type": "AWS",
+"alarm_delete_request":
+{
+"alarm_uuid": "CPU_Utilization_Above_Threshold4_i-098da78cbd8304e17",
+"correlation_id": "SO123"
+}
+}
\ No newline at end of file
--- /dev/null
+{
+"schema_version": "1.0",
+"schema_type": "list_alarm_request",
+"vim_type": "AWS",
+"alarm_list_request":
+{
+"correlation_id": "SO123",
+"resource_uuid": "",
+"alarm_name": "",
+"severity": ""
+}
+}
\ No newline at end of file
--- /dev/null
+{
+"schema_version": "1.0",
+"schema_type": "list_alarm_request",
+"vim_type": "AWS",
+"alarm_list_request":
+{
+"correlation_id": "SO123",
+"resource_uuid": "i-098da78cbd8304e17",
+"alarm_name": "",
+"severity": ""
+}
+}
\ No newline at end of file
--- /dev/null
+{
+"schema_version": "1.0",
+"schema_type": "list_alarm_request",
+"vim_type": "AWS",
+"alarm_list_request":
+{
+"correlation_id": "SO123",
+"resource_uuid": "i-098da78cbd8304e17",
+"alarm_name": "",
+"severity": "Critical"
+}
+}
\ No newline at end of file
--- /dev/null
+{
+"schema_version": "1.0",
+"schema_type": "update_alarm_request",
+"vim_type": "AWS",
+"alarm_update_request":
+{
+"correlation_id": "SO123",
+"alarm_uuid": "CPU_Utilization_Above_Threshold_i-098da78cbd8304e13",
+"description": "",
+"severity": "Critical",
+"operation": "LE",
+"threshold_value": 1.5,
+"unit": "",
+"metric_name": "CPU_UTILIZATION",
+"statistic": "MAXIMUM"
+}
+}
\ No newline at end of file
--- /dev/null
+{
+"schema_version": "1.0",
+"schema_type": "update_alarm_request",
+"vim_type": "AWS",
+"alarm_update_request":
+{
+"correlation_id": "SO123",
+"alarm_uuid": "CPU_Utilization_Above_Threshold4_i-098da78cbd8304e17",
+"description": "",
+"severity": "Critical",
+"operation": "LE",
+"threshold_value": 1.5,
+"unit": "",
+"metric_name": "CPU_UTILIZATION",
+"statistic": "MAXIMUM"
+}
+}
\ No newline at end of file
--- /dev/null
+{
+"schema_version": "1.0",
+"schema_type": "update_alarm_request",
+"vim_type": "AWS",
+"alarm_update_request":
+{
+"correlation_id": "SO123",
+"alarm_uuid": "CPU_Utilization_Above_Threshold_i-098da78cbd8304e17",
+"description": "",
+"severity": "Critical",
+"operation": "Less",
+"threshold_value": 1.5,
+"unit": "",
+"metric_name": "CPU_UTILIZATION",
+"statistic": "MAXIMUM"
+}
+}
\ No newline at end of file
--- /dev/null
+{
+"schema_version": "1.0",
+"schema_type": "update_alarm_request",
+"vim_type": "AWS",
+"alarm_update_request":
+{
+"correlation_id": "SO123",
+"alarm_uuid": "CPU_Utilization_Above_Threshold_i-098da78cbd8304e17",
+"description": "",
+"severity": "Critical",
+"operation": "LE",
+"threshold_value": 1.5,
+"unit": "",
+"metric_name": "CPU_UTILIZATION",
+"statistic": "MAXIMUM"
+}
+}
\ No newline at end of file
--- /dev/null
+{
+"schema_version": "1.0",
+"schema_type": "update_alarm_request",
+"vim_type": "AWS",
+"alarm_update_request":
+{
+"correlation_id": "SO123",
+"alarm_uuid": "CPU_Utilization_Above_Threshold_i-098da78cbd8304e17",
+"description": "",
+"severity": "Critical",
+"operation": "LE",
+"threshold_value": 1.5,
+"unit": "",
+"metric_name": "CPU_UTILIZATION",
+"statistic": "MAX"
+}
+}
\ No newline at end of file
--- /dev/null
+{
+"schema_version": "1.0",
+"schema_type": "update_alarm_request",
+"vim_type": "AWS",
+"alarm_update_request":
+{
+"correlation_id": "SO123",
+"alarm_uuid": "CPU_Utilization_Above_Threshold_i-098da78cbd8304e17",
+"description": "",
+"severity": "Critical",
+"operation": "LE",
+"threshold_value": 1.5,
+"unit": "",
+"metric_name": "CPU_UTILIZATION",
+"statistic": "MAXIMUM"
+}
+}
\ No newline at end of file
--- /dev/null
+{
+"schema_version": "1.0",
+"schema_type": "create_alarm_request",
+"vim_type": "AWS",
+"alarm_create_request":
+{
+"correlation_id": "SO123",
+"alarm_name": "CPU_Utilization_Above_Threshold4",
+"resource_uuid": "i-098da78cbd8304e17",
+"description": "",
+"severity": "Critical",
+"operation": "GE",
+"threshold_value": 1.5,
+"unit": "",
+"metric_name": "CPU_UTILIZATION",
+"statistic": "MAXIMUM"
+}
+}
supported=self.check_metric(metric_info['metric_name'])
metric_resp = dict()
+ metric_resp['resource_uuid'] = metric_info['resource_uuid']
+
if supported['status'] == True:
metric_resp['status'] = True
metric_resp['metric_uuid'] = 0
+                log.debug("Metrics Configured Successfully : %s", metric_resp)
else:
metric_resp['status'] = False
metric_resp['metric_uuid'] = None
-
- metric_resp['resource_uuid'] = metric_info['resource_uuid']
- log.debug("Metrics Configured Succesfully : %s" , metric_resp)
+ log.error("Metric name is not supported")
+
return metric_resp
except Exception as e:
return False
else:
+ log.error("Metric name is not supported")
return False
except Exception as e:
try:
supported=self.check_metric(metric_info['metric_name'])
update_resp = dict()
+ update_resp['resource_uuid'] = metric_info['resource_uuid']
if supported['status'] == True:
update_resp['status'] = True
update_resp['metric_uuid'] = 0
+ log.debug("Metric Updated : %s", update_resp)
else:
update_resp['status'] = False
update_resp['metric_uuid'] = None
-
- update_resp['resource_uuid'] = metric_info['resource_uuid']
- log.debug("Metric Updated : %s", update_resp)
+ log.error("Metric name is not supported")
+
return update_resp
except Exception as e:
log.info("Metric Deletion Not supported in AWS : %s",del_resp)
return del_resp
else:
+ log.error("Metric name is not supported")
return False
except Exception as e:
return metrics_list
log.debug("Metrics List : %s",metrics_list)
else:
+ log.error("Metric name is not supported")
return False
except Exception as e:
from connection import Connection
from metric_alarms import MetricAlarm
from metrics import Metrics
-# Need to import the producer message bus,not working yet
-#from core.message_bus.producerfunct import KafkaProducer
-sys.path.append("../../core/message-bus")
+sys.path.append("../../core/message_bus")
from producer import KafkaProducer
from kafka import KafkaConsumer
import logging as log
try:
for message in self._consumer:
-
metric_info = json.loads(message.value)
+            log.debug("Metric info: %s", metric_info)
metric_response = dict()
if metric_info['vim_type'] == 'AWS':
payload = json.dumps(metric_response)
file = open('../../core/models/create_metric_resp.json','wb').write((payload))
self.producer.create_metrics_resp(key='create_metric_response',message=payload,topic = 'metric_response')
-
+
log.info("Metric configured: %s", metric_resp)
return metric_response
metric_response['schema_version'] = metric_info['schema_version']
metric_response['schema_type'] = "update_metric_response"
metric_response['metric_update_response'] = update_resp
- payload = json.dumps(metric_response)
+ payload = json.dumps(metric_response)
+            log.debug("Update metric response payload: %s", payload)
file = open('../../core/models/update_metric_resp.json','wb').write((payload))
- self.producer.create_metrics_resp(key='update_metric_response',message=payload,topic = 'metric_response')
+ self.producer.update_metric_response(key='update_metric_response',message=payload,topic = 'metric_response')
log.info("Metric Updates: %s",metric_response)
return metric_response
elif message.key == "delete_metric_request":
if self.check_resource(metric_info['resource_uuid']) == True:
del_resp=self.delete_metric_request(metric_info)
- payload = json.dumps(del_resp)
+ payload = json.dumps(del_resp)
file = open('../../core/models/delete_metric_resp.json','wb').write((payload))
- self.producer.create_metrics_resp(key='delete_metric_response',message=payload,topic = 'metric_response')
+ self.producer.delete_metric_response(key='delete_metric_response',message=payload,topic = 'metric_response')
log.info("Metric Deletion Not supported in AWS : %s",del_resp)
return del_resp
metric_response['metrics_list'] = list_resp
payload = json.dumps(metric_response)
file = open('../../core/models/list_metric_resp.json','wb').write((payload))
- self.producer.create_metrics_resp(key='list_metrics_response',message=payload,topic = 'metric_response')
+ self.producer.list_metric_response(key='list_metrics_response',message=payload,topic = 'metric_response')
log.info("Metric List: %s",metric_response)
return metric_response
if self.check_resource(metric_info['resource_uuid']) == True:
data_resp = self.read_metrics_data(metric_info)
metric_response['schema_version'] = metric_info['schema_version']
- metric_response['schema_type'] = "list_metric_response"
+ metric_response['schema_type'] = "read_metric_data_response"
metric_response['metric_name'] = metric_info['metric_name']
metric_response['metric_uuid'] = metric_info['metric_uuid']
metric_response['correlation_id'] = metric_info['correlation_uuid']
metric_response['resource_uuid'] = metric_info['resource_uuid']
metric_response['tenant_uuid'] = metric_info['tenant_uuid']
metric_response['metrics_data'] = data_resp
- payload = json.dumps(metric_response)
-
+ payload = json.dumps(metric_response)
file = open('../../core/models/read_metric_data_resp.json','wb').write((payload))
- self.producer.create_metrics_resp(key='read_metric_data_response',message=payload,topic = 'metric_response')
+ self.producer.read_metric_data_response(key='read_metric_data_response',message=payload,topic = 'metric_response')
+
log.info("Metric Data Response: %s",metric_response)
return metric_response
--- /dev/null
+from connection import Connection
+import unittest
+import sys
+import jsmin
+import json
+import os
+import time
+from jsmin import jsmin
+sys.path.append("../../core/message_bus")
+from test_producer_AWS import KafkaProducer
+from kafka import KafkaConsumer
+try:
+ import boto
+ import boto.ec2
+ import boto.vpc
+ import boto.ec2.cloudwatch
+ import boto.ec2.connection
+except ImportError:
+    exit("Boto not available. Try activating your virtualenv OR `pip install boto`")
+
+#--------------------------------------------------------------------------------------------------------------------------------------
+
+# Test producer object used to generate requests
+
+producer = KafkaProducer('create_alarm_request')
+obj = Connection()
+connections = obj.setEnvironment()
+connections_res = obj.connection_instance()
+cloudwatch_conn = connections_res['cloudwatch_connection']
+
+#--------------------------------------------------------------------------------------------------------------------------------------
+
+'''Test E2E flow: the test cases are run one at a time.
+1) A common request is generated using the request function in
+   test_producer_AWS.py (/core/message_bus).
+2) The request is consumed by the consumer (plugin).
+3) The response is sent back on the message bus in plugin_alarm.py using
+   the response functions in producer.py (/core/message_bus).
+4) The response is consumed by unit_tests_alarms.py and the test cases
+   are applied to the response.
+'''
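+
+# Note: each test below blocks on `for message in _consumer` until a response
+# arrives. A bounded variant (illustrative sketch only, not used by the tests)
+# can rely on kafka-python's consumer_timeout_ms option, which ends iteration
+# after the given idle time:
+#
+#   _consumer = KafkaConsumer(bootstrap_servers='localhost:9092',
+#                             consumer_timeout_ms=10000)
+#   _consumer.subscribe(['alarm_response'])
+#   for message in _consumer:   # loop ends after 10s without messages
+#       ...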
+
+class config_alarm_name_test(unittest.TestCase):
+
+
+ def setUp(self):
+ pass
+    # Request with a new alarm name and a new instance id in create_alarm_request
+ def test_differentName_differentInstance(self):
+ time.sleep(2)
+ producer.request("Unit Testing/create_alarm/create_alarm_differentName_differentInstance.json",'create_alarm_request', '','alarm_request')
+ server = {'server': 'localhost:9092', 'topic': 'alarm_request'}
+
+ _consumer = KafkaConsumer(bootstrap_servers=server['server'])
+ _consumer.subscribe(['alarm_response'])
+
+ for message in _consumer:
+ if message.key == "create_alarm_response":
+ info = json.loads(json.loads(message.value))
+ print info
+ time.sleep(1)
+ self.assertTrue(info['alarm_create_response']['status'])
+ return
+
+    # Request with a new alarm name and an existing instance id in create_alarm_request
+ def test_differentName_sameInstance(self):
+ time.sleep(2)
+ producer.request("Unit Testing/create_alarm/create_alarm_differentName_sameInstance.json",'create_alarm_request', '','alarm_request')
+ server = {'server': 'localhost:9092', 'topic': 'alarm_request'}
+
+ _consumer = KafkaConsumer(bootstrap_servers=server['server'])
+ _consumer.subscribe(['alarm_response'])
+
+ for message in _consumer:
+ if message.key == "create_alarm_response":
+ info = json.loads(json.loads(message.value))
+ print info
+ time.sleep(1)
+ producer.request("Unit Testing/delete_alarm/name_valid_delete1.json",'delete_alarm_request','','alarm_request')
+ self.assertTrue(info['alarm_create_response']['status'])
+ return
+
+    # Request with an existing alarm name and a new instance id in create_alarm_request
+ def test_sameName_differentInstance(self):
+ time.sleep(2)
+ producer.request("Unit Testing/create_alarm/create_alarm_sameName_differentInstance.json",'create_alarm_request', '','alarm_request')
+ server = {'server': 'localhost:9092', 'topic': 'alarm_request'}
+
+ _consumer = KafkaConsumer(bootstrap_servers=server['server'])
+ _consumer.subscribe(['alarm_response'])
+
+ for message in _consumer:
+ if message.key == "create_alarm_response":
+ info = json.loads(json.loads(message.value))
+ print info
+ time.sleep(1)
+ producer.request("Unit Testing/delete_alarm/name_valid_delete2.json",'delete_alarm_request', '','alarm_request')
+ self.assertTrue(info['alarm_create_response']['status'])
+ return
+
+    # Request with an existing alarm name and an existing instance id in create_alarm_request
+ def test_sameName_sameInstance(self):
+ time.sleep(2)
+ producer.request("Unit Testing/create_alarm/create_alarm_sameName_sameInstance.json",'create_alarm_request', '','alarm_request')
+ server = {'server': 'localhost:9092', 'topic': 'alarm_request'}
+
+ _consumer = KafkaConsumer(bootstrap_servers=server['server'])
+ _consumer.subscribe(['alarm_response'])
+
+ for message in _consumer:
+ if message.key == "create_alarm_response":
+ info = json.loads(json.loads(message.value))
+ print info,"---"
+ time.sleep(1)
+ producer.request("Unit Testing/delete_alarm/name_valid.json",'delete_alarm_request', '','alarm_request')
+ self.assertEqual(info, None)
+ return
+
+    # Request with a valid statistic in create_alarm_request
+ def test_statisticValid(self):
+ time.sleep(2)
+ producer.request("Unit Testing/create_alarm/statistic_valid.json",'create_alarm_request', '','alarm_request')
+ server = {'server': 'localhost:9092', 'topic': 'alarm_request'}
+
+ _consumer = KafkaConsumer(bootstrap_servers=server['server'])
+ _consumer.subscribe(['alarm_response'])
+
+ for message in _consumer:
+ if message.key == "create_alarm_response":
+ info = json.loads(json.loads(message.value))
+ print info
+ time.sleep(1)
+ producer.request("Unit Testing/delete_alarm/name_valid_delete3.json",'delete_alarm_request', '','alarm_request')
+ self.assertTrue(info['alarm_create_response']['status'])
+ return
+
+    # Request with an invalid statistic in create_alarm_request
+ def test_statisticValidNot(self):
+ time.sleep(2)
+ producer.request("Unit Testing/create_alarm/statistic_invalid.json",'create_alarm_request', '','alarm_request')
+ server = {'server': 'localhost:9092', 'topic': 'alarm_request'}
+
+ _consumer = KafkaConsumer(bootstrap_servers=server['server'])
+ _consumer.subscribe(['alarm_response'])
+
+ for message in _consumer:
+ if message.key == "create_alarm_response":
+ info = json.loads(json.loads(message.value))
+ print info,"---"
+ time.sleep(1)
+ producer.request("Unit Testing/delete_alarm/name_valid_delete3.json",'delete_alarm_request', '','alarm_request')
+ self.assertEqual(info, None)
+ return
+
+    # Request with a valid operation in create_alarm_request
+ def test_operationValid(self):
+ time.sleep(2)
+ producer.request("Unit Testing/create_alarm/operation_valid.json",'create_alarm_request', '','alarm_request')
+ server = {'server': 'localhost:9092', 'topic': 'alarm_request'}
+
+ _consumer = KafkaConsumer(bootstrap_servers=server['server'])
+ _consumer.subscribe(['alarm_response'])
+
+ for message in _consumer:
+ if message.key == "create_alarm_response":
+ info = json.loads(json.loads(message.value))
+ print info
+ time.sleep(1)
+ producer.request("Unit Testing/delete_alarm/name_valid_delete3.json",'delete_alarm_request', '','alarm_request')
+ self.assertTrue(info['alarm_create_response']['status'])
+ return
+
+    # Request with an invalid operation in create_alarm_request
+ def test_operationValidNot(self):
+ time.sleep(2)
+ producer.request("Unit Testing/create_alarm/operation_invalid.json",'create_alarm_request', '','alarm_request')
+ server = {'server': 'localhost:9092', 'topic': 'alarm_request'}
+
+ _consumer = KafkaConsumer(bootstrap_servers=server['server'])
+ _consumer.subscribe(['alarm_response'])
+
+ for message in _consumer:
+ if message.key == "create_alarm_response":
+ info = json.loads(json.loads(message.value))
+ print info
+ time.sleep(1)
+ self.assertEqual(info,None)
+ return
+
+
+#--------------------------------------------------------------------------------------------------------------------------------------
+class update_alarm_name_test(unittest.TestCase):
+
+    # Request with a valid alarm_uuid in update_alarm_request
+ def test_nameValid(self):
+ producer.request("Unit Testing/update_alarm/update_alarm_new_alarm.json",'create_alarm_request', '','alarm_request')
+ time.sleep(2)
+ producer.request("Unit Testing/update_alarm/name_valid.json",'update_alarm_request', '','alarm_request')
+ server = {'server': 'localhost:9092', 'topic': 'alarm_request'}
+
+ _consumer = KafkaConsumer(bootstrap_servers=server['server'])
+ _consumer.subscribe(['alarm_response'])
+
+ for message in _consumer:
+ if message.key == "update_alarm_response":
+ info = json.loads(json.loads(json.loads(message.value)))
+ print info
+ time.sleep(1)
+ producer.request("Unit Testing/delete_alarm/name_valid_delete4.json",'delete_alarm_request', '','alarm_request')
+ self.assertTrue(info['alarm_update_response']['status'])
+ return
+
+    # Request with an invalid alarm_uuid in update_alarm_request
+ def test_nameInvalid(self):
+ time.sleep(2)
+ producer.request("Unit Testing/update_alarm/name_invalid.json",'update_alarm_request', '','alarm_request')
+ server = {'server': 'localhost:9092', 'topic': 'alarm_request'}
+
+ _consumer = KafkaConsumer(bootstrap_servers=server['server'])
+ _consumer.subscribe(['alarm_response'])
+
+ for message in _consumer:
+ if message.key == "update_alarm_response":
+ info = json.loads(json.loads(json.loads(message.value)))
+ print info
+ time.sleep(1)
+ self.assertEqual(info,None)
+ return
+
+    # Request with a valid statistic in update_alarm_request
+ def test_statisticValid(self):
+ producer.request("Unit Testing/create_alarm/create_alarm_differentName_differentInstance.json",'create_alarm_request', '','alarm_request')
+ time.sleep(2)
+ producer.request("Unit Testing/update_alarm/statistic_valid.json",'update_alarm_request', '','alarm_request')
+ server = {'server': 'localhost:9092', 'topic': 'alarm_request'}
+
+ _consumer = KafkaConsumer(bootstrap_servers=server['server'])
+ _consumer.subscribe(['alarm_response'])
+
+ for message in _consumer:
+ if message.key == "update_alarm_response":
+ info = json.loads(json.loads(json.loads(message.value)))
+ print info
+ time.sleep(1)
+ producer.request("Unit Testing/delete_alarm/name_valid.json",'delete_alarm_request', '','alarm_request')
+ self.assertTrue(info['alarm_update_response']['status'])
+ return
+
+    # Request with an invalid statistic in update_alarm_request
+ def test_statisticInvalid(self):
+ time.sleep(2)
+ producer.request("Unit Testing/update_alarm/statistic_invalid.json",'update_alarm_request', '','alarm_request')
+ server = {'server': 'localhost:9092', 'topic': 'alarm_request'}
+
+ _consumer = KafkaConsumer(bootstrap_servers=server['server'])
+ _consumer.subscribe(['alarm_response'])
+
+ for message in _consumer:
+ if message.key == "update_alarm_response":
+ info = json.loads(json.loads(json.loads(message.value)))
+ print info
+ time.sleep(1)
+ self.assertEqual(info,None)
+ return
+
+    # Request with a valid operation in update_alarm_request
+ def test_operationValid(self):
+ producer.request("Unit Testing/create_alarm/create_alarm_differentName_differentInstance.json",'create_alarm_request', '','alarm_request')
+ time.sleep(2)
+ producer.request("Unit Testing/update_alarm/operation_valid.json",'update_alarm_request', '','alarm_request')
+ server = {'server': 'localhost:9092', 'topic': 'alarm_request'}
+
+ _consumer = KafkaConsumer(bootstrap_servers=server['server'])
+ _consumer.subscribe(['alarm_response'])
+
+ for message in _consumer:
+ if message.key == "update_alarm_response":
+ info = json.loads(json.loads(json.loads(message.value)))
+ print info
+ time.sleep(1)
+ producer.request("Unit Testing/delete_alarm/name_valid.json",'delete_alarm_request', '','alarm_request')
+ self.assertTrue(info['alarm_update_response']['status'])
+ return
+
+#--------------------------------------------------------------------------------------------------------------------------------------
+class delete_alarm_test(unittest.TestCase):
+
+    # Request with a valid alarm_uuid in delete_alarm_request
+ def test_nameValid(self):
+ producer.request("Unit Testing/create_alarm/create_alarm_differentName_differentInstance.json",'create_alarm_request', '','alarm_request')
+ time.sleep(2)
+ producer.request("Unit Testing/delete_alarm/name_valid.json",'delete_alarm_request', '','alarm_request')
+ server = {'server': 'localhost:9092', 'topic': 'alarm_request'}
+
+ _consumer = KafkaConsumer(bootstrap_servers=server['server'])
+ _consumer.subscribe(['alarm_response'])
+
+ for message in _consumer:
+ if message.key == "delete_alarm_response":
+ info = json.loads(json.loads(json.loads(message.value)))
+ print info
+ time.sleep(1)
+ self.assertTrue(info['alarm_deletion_response']['status'])
+ return
+
+    # Request with an invalid alarm_uuid in delete_alarm_request
+ def test_nameInvalid(self):
+ time.sleep(2)
+ producer.request("Unit Testing/delete_alarm/name_invalid.json",'delete_alarm_request', '','alarm_request')
+ server = {'server': 'localhost:9092', 'topic': 'alarm_request'}
+
+ _consumer = KafkaConsumer(bootstrap_servers=server['server'])
+ _consumer.subscribe(['alarm_response'])
+
+ for message in _consumer:
+ if message.key == "delete_alarm_response":
+ info = json.loads(json.loads(json.loads(message.value)))
+ print info
+ time.sleep(1)
+ self.assertEqual(info,None)
+ return
+
+#--------------------------------------------------------------------------------------------------------------------------------------
+class list_alarm_test(unittest.TestCase):
+
+    # Request with no filter arguments in list_alarm_request
+ def test_valid_no_arguments(self):
+ time.sleep(2)
+ producer.request("Unit Testing/list_alarm/list_alarm_valid_no_arguments.json",'alarm_list_request', '','alarm_request')
+ server = {'server': 'localhost:9092', 'topic': 'alarm_request'}
+
+ _consumer = KafkaConsumer(bootstrap_servers=server['server'])
+ _consumer.subscribe(['alarm_response'])
+
+ for message in _consumer:
+ if message.key == "list_alarm_response":
+ info = json.loads(json.loads(json.loads(message.value)))
+ print info
+ time.sleep(1)
+ self.assertEqual(type(info),dict)
+ return
+
+    # Request with one filter argument in list_alarm_request
+ def test_valid_one_arguments(self):
+ time.sleep(2)
+ producer.request("Unit Testing/list_alarm/list_alarm_valid_one_arguments.json",'alarm_list_request', '','alarm_request')
+ server = {'server': 'localhost:9092', 'topic': 'alarm_request'}
+
+ _consumer = KafkaConsumer(bootstrap_servers=server['server'])
+ _consumer.subscribe(['alarm_response'])
+
+ for message in _consumer:
+ if message.key == "list_alarm_response":
+ info = json.loads(json.loads(json.loads(message.value)))
+ print info
+ time.sleep(1)
+ self.assertEqual(type(info),dict)
+ return
+
+    # Request with two filter arguments in list_alarm_request
+ def test_valid_two_arguments(self):
+ time.sleep(2)
+ producer.request("Unit Testing/list_alarm/list_alarm_valid_two_arguments.json",'alarm_list_request', '','alarm_request')
+ server = {'server': 'localhost:9092', 'topic': 'alarm_request'}
+
+ _consumer = KafkaConsumer(bootstrap_servers=server['server'])
+ _consumer.subscribe(['alarm_response'])
+
+ for message in _consumer:
+ if message.key == "list_alarm_response":
+ info = json.loads(json.loads(json.loads(message.value)))
+ print info
+ time.sleep(1)
+ self.assertEqual(type(info),dict)
+ return
+
+
+#--------------------------------------------------------------------------------------------------------------------------------------
+class alarm_details_test(unittest.TestCase):
+
+    # Request with valid input fields in acknowledge_alarm
+ def test_Valid(self):
+ time.sleep(2)
+ producer.request("Unit Testing/alarm_details/acknowledge_alarm.json",'acknowledge_alarm', '','alarm_request')
+ server = {'server': 'localhost:9092', 'topic': 'alarm_request'}
+
+ _consumer = KafkaConsumer(bootstrap_servers=server['server'])
+ _consumer.subscribe(['alarm_response'])
+
+ for message in _consumer:
+ if message.key == "notify_alarm":
+ info = json.loads(json.loads(json.loads(message.value)))
+ print info
+ time.sleep(1)
+ self.assertEqual(type(info),dict)
+ return
+
+if __name__ == '__main__':
+
+    # Saving test results in a log file
+
+ log_file = 'log_file.txt'
+ f = open(log_file, "w")
+ runner = unittest.TextTestRunner(f)
+ unittest.main(testRunner=runner)
+ f.close()
+
+ # For printing results on Console
+ # unittest.main()
--- /dev/null
+from connection import Connection
+import unittest
+import sys
+import jsmin
+import json
+import os
+import time
+from jsmin import jsmin
+sys.path.append("../../core/message_bus")
+from test_producer_AWS import KafkaProducer
+from kafka import KafkaConsumer
+try:
+ import boto
+ import boto.ec2
+ import boto.vpc
+ import boto.ec2.cloudwatch
+ import boto.ec2.connection
+except ImportError:
+    exit("Boto not available. Try activating your virtualenv OR `pip install boto`")
+
+#--------------------------------------------------------------------------------------------------------------------------------------
+
+# Test producer object used to generate requests
+
+producer = KafkaProducer('')
+obj = Connection()
+connections = obj.setEnvironment()
+connections_res = obj.connection_instance()
+cloudwatch_conn = connections_res['cloudwatch_connection']
+
+# Consumer Object to consume response from message bus
+server = {'server': 'localhost:9092', 'topic': 'metric_request'}
+_consumer = KafkaConsumer(bootstrap_servers=server['server'])
+_consumer.subscribe(['metric_response'])
+
+#--------------------------------------------------------------------------------------------------------------------------------------
+
+'''Test E2E flow: the test cases are run one at a time.
+1) A common request is generated using the request function in
+   test_producer_AWS.py (/core/message_bus).
+2) The request is consumed by the consumer (plugin).
+3) The response is sent back on the message bus in plugin_metrics.py using
+   the response functions in producer.py (/core/message_bus).
+4) The response is consumed by unit_tests_metrics.py and the test cases
+   are applied to the response.
+'''
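+
+# The payloads on the bus end up JSON-encoded more than once: the plugin calls
+# json.dumps() on the response and the producer's value_serializer encodes it
+# again, hence the nested json.loads() calls in the tests below. A generic
+# unwrapping helper could look like this (illustrative sketch only; the tests
+# keep the explicit nested calls):
+def decode_response(raw):
+    # Peel nested JSON encodings until a dict (or list) remains.
+    value = raw
+    while isinstance(value, basestring):
+        value = json.loads(value)
+    return value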
+class test_create_metrics(unittest.TestCase):
+
+ def test_status_positive(self):
+ time.sleep(2)
+        # Request with a valid metric_name in create_metric_request
+ producer.request("Unit Testing/create metrics/create_metric_req_valid.json",'create_metric_request', '','metric_request')
+
+ for message in _consumer:
+ if message.key == "create_metric_response":
+ resp = json.loads(json.loads(json.loads(message.value)))
+ time.sleep(1)
+ self.assertTrue(resp['metric_create_response']['status'])
+ self.assertEqual(resp['metric_create_response']['metric_uuid'],0)
+ return
+
+ def test_status_negative(self):
+ time.sleep(2)
+        # Request with an invalid metric_name in create_metric_request
+ producer.request("Unit Testing/create metrics/create_metric_req_invalid.json",'create_metric_request', '','metric_request')
+
+ for message in _consumer:
+ if message.key == "create_metric_response":
+ resp = json.loads(json.loads(json.loads(message.value)))
+ time.sleep(1)
+ self.assertFalse(resp['metric_create_response']['status'])
+ self.assertEqual(resp['metric_create_response']['metric_uuid'],None)
+ return
+
+class test_metrics_data(unittest.TestCase):
+
+ def test_met_name_positive(self):
+ time.sleep(2)
+        # Request with a valid metric_name in read_metric_data_request
+ producer.request("Unit Testing/Read metrics data/read_metric_name_req_valid.json",'read_metric_data_request', '','metric_request')
+ for message in _consumer:
+ if message.key == "read_metric_data_response":
+ resp = json.loads(json.loads(json.loads(message.value)))
+ time.sleep(1)
+ self.assertEqual(type(resp['metrics_data']),dict)
+ return
+
+ def test_met_name_negative(self):
+ time.sleep(2)
+        # Request with an invalid metric_name in read_metric_data_request
+ producer.request("Unit Testing/Read metrics data/read_metric_name_req_invalid.json",'read_metric_data_request', '','metric_request')
+ for message in _consumer:
+ if message.key == "read_metric_data_response":
+ resp = json.loads(json.loads(json.loads(message.value)))
+ time.sleep(1)
+ self.assertFalse(resp['metrics_data'])
+ return
+
+ def test_coll_period_positive(self):
+        # Request with a valid collection_period in read_metric_data_request
+        # (for AWS metric_data_stats the collection period must be a multiple of 60)
+ time.sleep(2)
+ producer.request("Unit Testing/Read metrics data/read_coll_period_req_valid.json",'read_metric_data_request', '','metric_request')
+ for message in _consumer:
+ if message.key == "read_metric_data_response":
+ resp = json.loads(json.loads(json.loads(message.value)))
+ time.sleep(1)
+ self.assertEqual(type(resp),dict)
+ return
+
+ def test_coll_period_negative(self):
+ time.sleep(2)
+        # Request with an invalid collection_period in read_metric_data_request
+ producer.request("Unit Testing/Read metrics data/read_coll_period_req_invalid.json",'read_metric_data_request', '','metric_request')
+ for message in _consumer:
+ if message.key == "read_metric_data_response":
+ resp = json.loads(json.loads(json.loads(message.value)))
+ time.sleep(1)
+ self.assertFalse(resp['metrics_data'])
+ return
+
+class test_update_metrics(unittest.TestCase):
+
+ def test_upd_status_positive(self):
+ time.sleep(2)
+        # Request with a valid metric_name in update_metric_request
+ producer.request("Unit Testing/Update metrics/update_metric_req_valid.json",'update_metric_request', '','metric_request')
+ for message in _consumer:
+ if message.key == "update_metric_response":
+ resp = json.loads(json.loads(json.loads(message.value)))
+ time.sleep(1)
+ self.assertTrue(resp['metric_update_response']['status'])
+ self.assertEqual(resp['metric_update_response']['metric_uuid'],0)
+ return
+
+ def test_upd_status_negative(self):
+ time.sleep(2)
+        # Request with an invalid metric_name in update_metric_request
+ producer.request("Unit Testing/Update metrics/update_metric_req_invalid.json",'update_metric_request', '','metric_request')
+ for message in _consumer:
+ if message.key == "update_metric_response":
+ resp = json.loads(json.loads(json.loads(message.value)))
+ time.sleep(1)
+ self.assertFalse(resp['metric_update_response']['status'])
+ self.assertEqual(resp['metric_update_response']['metric_uuid'],None)
+ return
+
+class test_delete_metrics(unittest.TestCase):
+
+ def test_del_met_name_positive(self):
+ time.sleep(2)
+        # Request with a valid metric_name in delete_metric_request
+ producer.request("Unit Testing/Delete metrics/delete_metric_req_valid.json",'delete_metric_request', '','metric_request')
+ for message in _consumer:
+ if message.key == "delete_metric_response":
+ resp = json.loads(json.loads(json.loads(message.value)))
+ time.sleep(1)
+ self.assertFalse(resp['status'])
+ return
+
+ def test_del_met_name_negative(self):
+ time.sleep(2)
+        # Request with an invalid metric_name in delete_metric_request
+ producer.request("Unit Testing/Delete metrics/delete_metric_req_invalid.json",'delete_metric_request', '','metric_request')
+ for message in _consumer:
+ if message.key == "delete_metric_response":
+ resp = json.loads(json.loads(json.loads(message.value)))
+ time.sleep(1)
+ self.assertFalse(resp)
+ return
+
+class test_list_metrics(unittest.TestCase):
+
+ def test_list_met_name_positive(self):
+ time.sleep(2)
+        # Request with a valid metric_name in list_metric_request
+ producer.request("Unit Testing/List metrics/list_metric_req_valid.json",'list_metric_request', '','metric_request')
+ for message in _consumer:
+ if message.key == "list_metrics_response":
+ resp = json.loads(json.loads(json.loads(message.value)))
+ time.sleep(1)
+ self.assertEqual(type(resp['metrics_list']),list)
+ return
+
+    def test_list_met_name_negative(self):
+ time.sleep(2)
+        # Request with an invalid metric_name in list_metric_request
+ producer.request("Unit Testing/List metrics/list_metric_req_invalid.json",'list_metric_request', '','metric_request')
+ for message in _consumer:
+ if message.key == "list_metrics_response":
+ resp = json.loads(json.loads(json.loads(message.value)))
+ time.sleep(1)
+ self.assertFalse(resp['metrics_list'])
+ return
+
+
+if __name__ == '__main__':
+
+    # Saving test results in a log file
+
+ log_file = 'log_file.txt'
+ f = open(log_file, "w")
+ runner = unittest.TextTestRunner(f)
+ unittest.main(testRunner=runner)
+ f.close()
+
+ # For printing results on Console
+ # unittest.main()
+