Added vROPs consumer, receiver that will consume messages & act on it. Aligned with design doc. Added Metrics monitoring code & more config parameters.
Signed-off-by: bhangare <sbhangare@vmware.com>
diff --git a/plugins/vRealiseOps/kafka_consumer_vrops.py b/plugins/vRealiseOps/kafka_consumer_vrops.py
new file mode 100644
index 0000000..00d6ada
--- /dev/null
+++ b/plugins/vRealiseOps/kafka_consumer_vrops.py
@@ -0,0 +1,42 @@
+# -*- coding: utf-8 -*-
+"""
+vROPs Kafka Consumer that consumes the request messages
+"""
+
+
+from kafka import KafkaConsumer
+from kafka.errors import KafkaError
+import logging as log
+
+class vROP_KafkaConsumer(object):
+ """
+ Kafka Consumer for vROPs
+ """
+
+ def __init__(self, topics=[], broker_uri=None):
+ """
+ Method to initize KafkaConsumer
+ Args:
+ broker_uri - hostname:port uri of Kafka broker
+ topics - list of topics to subscribe
+ Returns:
+ None
+ """
+
+ if broker_uri is None:
+ self.broker = '0.0.0.0:9092'
+ else:
+ self.broker = broker_uri
+
+ self.topic = topics
+ print ("vROPs Consumer started, Broker URI: {}".format(self.broker))
+ print ("Subscribed Topics {}".format(self.topic))
+ try:
+ self.vrops_consumer = KafkaConsumer(bootstrap_servers=self.broker)
+ self.vrops_consumer.subscribe(self.topic)
+ except Exception as exp:
+ msg = "fail to create consumer for topic {} with broker {} Error : {}"\
+ .format(self.topic, self.broker, exp)
+ log.error(msg)
+ raise Exception(msg)
+
diff --git a/plugins/vRealiseOps/mon_plugin_vrops.py b/plugins/vRealiseOps/mon_plugin_vrops.py
index a65d01c..79b1ea7 100644
--- a/plugins/vRealiseOps/mon_plugin_vrops.py
+++ b/plugins/vRealiseOps/mon_plugin_vrops.py
@@ -8,37 +8,44 @@
from pyvcloud.vcloudair import VCA
from xml.etree import ElementTree as XmlElementTree
import traceback
+import time
+import json
+from OpenSSL.crypto import load_certificate, FILETYPE_PEM
OPERATION_MAPPING = {'GE':'GT_EQ', 'LE':'LT_EQ', 'GT':'GT', 'LT':'LT', 'EQ':'EQ'}
-SEVERITY_MAPPING = {'WARNING':'WARNING', 'MINOR':'WARNING', 'MAJOR':"IMMEDIATE", 'CRITICAL':'CRITICAL'}
-
+severity_mano2vrops = {'WARNING':'WARNING', 'MINOR':'WARNING', 'MAJOR':"IMMEDIATE", 'CRITICAL':'CRITICAL'}
+PERIOD_MSEC = {'HR':3600000,'DAY':86400000,'WEEK':604800000,'MONTH':2678400000,'YEAR':31536000000}
+DEFAULT_CONFIG_FILE = 'vrops_config.xml'
class MonPlugin():
"""MON Plugin class for vROPs telemetry plugin
- """
- def __init__(self, access_config={}):
+ """
+ def __init__(self):
"""Constructor of MON plugin
Params:
- 'access_config': dictionary with VIM access information based on VIM type.
- This contains a consolidate version of VIM & monitoring tool config at creation and
+ 'access_config': dictionary with VIM access information based on VIM type.
+ This contains a consolidate version of VIM & monitoring tool config at creation and
particular VIM config at their attachment.
- For VIM type: 'vmware', access_config - {'vrops_site':<>, 'vrops_user':<>, 'vrops_password':<>,
- 'vcloud-site':<>,'admin_username':<>,'admin_password':<>,
- 'nsx_manager':<>,'nsx_user':<>,'nsx_password':<>,
- 'vcenter_ip':<>,'vcenter_port':<>,'vcenter_user':<>,'vcenter_password':<>,
- 'vim_tenant_name':<>,'orgname':<>}
+ For VIM type: 'vmware',
+ access_config - {'vrops_site':<>, 'vrops_user':<>, 'vrops_password':<>,
+ 'vcloud-site':<>,'admin_username':<>,'admin_password':<>,
+ 'nsx_manager':<>,'nsx_user':<>,'nsx_password':<>,
+ 'vcenter_ip':<>,'vcenter_port':<>,'vcenter_user':<>,'vcenter_password':<>,
+ 'vim_tenant_name':<>,'orgname':<>}
#To Do
Returns: Raise an exception if some needed parameter is missing, but it must not do any connectivity
check against the VIM
"""
+ access_config = self.get_default_Params('Access_Config')
self.access_config = access_config
self.vrops_site = access_config['vrops_site']
self.vrops_user = access_config['vrops_user']
self.vrops_password = access_config['vrops_password']
- self.vcloud_site = access_config['vcloud_site']
- self.vcloud_username = access_config['vcloud_username']
- self.vcloud_password = access_config['vcloud_password']
+ self.vcloud_site = access_config['vcloud-site']
+ self.admin_username = access_config['admin_username']
+ self.admin_password = access_config['admin_password']
+ self.tenant_id = access_config['tenant_id']
def configure_alarm(self, config_dict = {}):
"""Configures or creates a new alarm using the input parameters in config_dict
@@ -48,29 +55,40 @@
"resource_uuid": Resource UUID for which alarm needs to be configured. in string format
"Resource type": String resource type: 'VDU' or 'host'
"Severity": 'WARNING', 'MINOR', 'MAJOR', 'CRITICAL'
- "metric": Metric key in string format
+ "metric_name": Metric key in string format
"operation": One of ('GE', 'LE', 'GT', 'LT', 'EQ')
- "threshold_value": Defines the threshold (up to 2 fraction digits) that, if crossed, will trigger the alarm.
+ "threshold_value": Defines the threshold (up to 2 fraction digits) that,
+ if crossed, will trigger the alarm.
"unit": Unit of measurement in string format
"statistic": AVERAGE, MINIMUM, MAXIMUM, COUNT, SUM
Default parameters for each alarm are read from the plugin specific config file.
Dict of default parameters is as follows:
- default_params keys = {'cancel_cycles','wait_cycles','resource_kind','adapter_kind','alarm_type','alarm_subType',impact}
+ default_params keys = {'cancel_cycles','wait_cycles','resource_kind','adapter_kind',
+ 'alarm_type','alarm_subType',impact}
Returns the UUID of created alarm or None
"""
- alarm_def_uuid = None
+ alarm_def = None
#1) get alarm & metrics parameters from plugin specific file
- def_params = self.get_default_Params(config_dict['alarm_name'])
- metric_key_params = self.get_default_Params(config_dict['metric'])
+ def_a_params = self.get_default_Params(config_dict['alarm_name'])
+ if not def_a_params:
+ log.warn("Alarm not supported: {}".format(config_dict['alarm_name']))
+ return None
+ metric_key_params = self.get_default_Params(config_dict['metric_name'])
+ if not metric_key_params:
+ log.warn("Metric not supported: {}".format(config_dict['metric_name']))
+ return None
#2) create symptom definition
- #TO DO - 'metric_key':config_dict['metric'] - mapping from file def_params
- symptom_params ={'cancel_cycles': (def_params['cancel_period']/300)*def_params['cancel_cycles'],
- 'wait_cycles': (def_params['period']/300)*def_params['evaluation'],
- 'resource_kind_key': def_params['resource_kind'],'adapter_kind_key': def_params['adapter_kind'],
- 'symptom_name':config_dict['alarm_name'],'severity': SEVERITY_MAPPING[config_dict['severity']],
- 'metric_key':metric_key_params['metric_key'],'operation':OPERATION_MAPPING[config_dict['operation']],
+ vrops_alarm_name = def_a_params['vrops_alarm']+ '-' +config_dict['resource_uuid']
+ symptom_params ={'cancel_cycles': (def_a_params['cancel_period']/300)*def_a_params['cancel_cycles'],
+ 'wait_cycles': (def_a_params['period']/300)*def_a_params['evaluation'],
+ 'resource_kind_key': def_a_params['resource_kind'],
+ 'adapter_kind_key': def_a_params['adapter_kind'],
+ 'symptom_name':vrops_alarm_name,
+ 'severity': severity_mano2vrops[config_dict['severity']],
+ 'metric_key':metric_key_params['metric_key'],
+ 'operation':OPERATION_MAPPING[config_dict['operation']],
'threshold_value':config_dict['threshold_value']}
symptom_uuid = self.create_symptom(symptom_params)
if symptom_uuid is not None:
@@ -79,22 +97,24 @@
log.warn("Failed to create Symptom: {}".format(symptom_params['symptom_name']))
return None
#3) create alert definition
- #To Do - Get type & subtypes for all 5 alarms
- alarm_params = {'name':config_dict['alarm_name'],
- 'description':config_dict['description'] if config_dict['description'] is not None else config_dict['alarm_name'],
- 'adapterKindKey':def_params['adapter_kind'], 'resourceKindKey':def_params['resource_kind'],
+ #To Do - Get type & subtypes for all 5 alarms
+ alarm_params = {'name':vrops_alarm_name,
+ 'description':config_dict['description']\
+ if config_dict['description'] is not None else config_dict['alarm_name'],
+ 'adapterKindKey':def_a_params['adapter_kind'],
+ 'resourceKindKey':def_a_params['resource_kind'],
'waitCycles':1, 'cancelCycles':1,
- 'type':def_params['alarm_type'], 'subType':def_params['alarm_subType'],
- 'severity':SEVERITY_MAPPING[config_dict['severity']],
+ 'type':def_a_params['alarm_type'], 'subType':def_a_params['alarm_subType'],
+ 'severity':severity_mano2vrops[config_dict['severity']],
'symptomDefinitionId':symptom_uuid,
- 'impact':def_params['impact']}
+ 'impact':def_a_params['impact']}
- alarm_def_uuid = self.create_alarm_definition(alarm_params)
- if alarm_def_uuid is None:
+ alarm_def = self.create_alarm_definition(alarm_params)
+ if alarm_def is None:
log.warn("Failed to create Alert: {}".format(alarm_params['name']))
return None
- log.info("Alarm defined: {} with ID: {}".format(alarm_params['name'],alarm_def_uuid))
+ log.info("Alarm defined: {} with ID: {}".format(alarm_params['name'],alarm_def))
#4) Find vm_moref_id from vApp uuid in vCD
vm_moref_id = self.get_vm_moref_id(config_dict['resource_uuid'])
@@ -109,12 +129,15 @@
return None
#6) Configure alarm notification for a particular VM using it's resource_id
- notification_id = self.create_alarm_notification(config_dict['alarm_name'], alarm_def_uuid, resource_id)
+ notification_id = self.create_alarm_notification(vrops_alarm_name, alarm_def, resource_id)
if notification_id is None:
return None
else:
- log.info("Alarm defination created with notification: {} with ID: {}".format(alarm_params['name'],alarm_def_uuid))
- return alarm_def_uuid
+ alarm_def_uuid = alarm_def.split('-', 1)[1]
+ log.info("Alarm defination created with notification: {} with ID: {}"\
+ .format(alarm_params['name'],alarm_def_uuid))
+ #Return alarm defination UUID by removing 'AlertDefinition' from UUID
+ return (alarm_def_uuid)
def get_default_Params(self, metric_alarm_name):
"""
@@ -122,25 +145,23 @@
Params:
metric_alarm_name: Name of the alarm, whose congif params to be read from the config file.
"""
- tree = XmlElementTree.parse('vROPs_default_config.xml')
+ tree = XmlElementTree.parse(DEFAULT_CONFIG_FILE)
alarms = tree.getroot()
a_params = {}
for alarm in alarms:
if alarm.tag == metric_alarm_name:
for param in alarm:
- if param.tag in ("period", "evaluation", "cancel_period", "alarm_type", "cancel_cycles", "alarm_subType"):
+ if param.tag in ("period", "evaluation", "cancel_period", "alarm_type",\
+ "cancel_cycles", "alarm_subType"):
a_params[param.tag] = int(param.text)
elif param.tag in ("enabled", "repeat"):
- if(param.text == "True" or param.text == "true"):
+ if(param.text.lower() == "true"):
a_params[param.tag] = True
else:
a_params[param.tag] = False
else:
a_params[param.tag] = param.text
- if not a_params:
- log.warn("No such '{}' alarm found!.".format(alarm))
-
return a_params
@@ -165,7 +186,11 @@
api_url = '/suite-api/api/symptomdefinitions'
headers = {'Content-Type': 'application/xml'}
data = """<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
- <ops:symptom-definition cancelCycles="{0:s}" waitCycles="{1:s}" resourceKindKey="{2:s}" adapterKindKey="{3:s}" xmlns:xs="http://www.w3.org/2001/XMLSchema" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:ops="http://webservice.vmware.com/vRealizeOpsMgr/1.0/">
+ <ops:symptom-definition cancelCycles="{0:s}" waitCycles="{1:s}"
+ resourceKindKey="{2:s}" adapterKindKey="{3:s}"
+ xmlns:xs="http://www.w3.org/2001/XMLSchema"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xmlns:ops="http://webservice.vmware.com/vRealizeOpsMgr/1.0/">
<ops:name>{4:s}</ops:name>
<ops:state severity="{5:s}">
<ops:condition xsi:type="ops:htCondition">
@@ -177,9 +202,12 @@
<ops:thresholdType>STATIC</ops:thresholdType>
</ops:condition>
</ops:state>
- </ops:symptom-definition>""".format(str(symptom_params['cancel_cycles']),str(symptom_params['wait_cycles']),symptom_params['resource_kind_key'],
- symptom_params['adapter_kind_key'],symptom_params['symptom_name'],symptom_params['severity'],
- symptom_params['metric_key'],symptom_params['operation'],str(symptom_params['threshold_value']))
+ </ops:symptom-definition>"""\
+ .format(str(symptom_params['cancel_cycles']),str(symptom_params['wait_cycles']),
+ symptom_params['resource_kind_key'], symptom_params['adapter_kind_key'],
+ symptom_params['symptom_name'],symptom_params['severity'],
+ symptom_params['metric_key'],symptom_params['operation'],
+ str(symptom_params['threshold_value']))
resp = requests.post(self.vrops_site + api_url,
auth=(self.vrops_user, self.vrops_password),
@@ -188,7 +216,8 @@
data=data)
if resp.status_code != 201:
- log.warn("Failed to create Symptom definition: {}, response {}".format(symptom_params['symptom_name'], resp.content))
+ log.warn("Failed to create Symptom definition: {}, response {}"\
+ .format(symptom_params['symptom_name'], resp.content))
return None
symptom_xmlroot = XmlElementTree.fromstring(resp.content)
@@ -198,7 +227,8 @@
return symptom_id
except Exception as exp:
- log.warn("Error creating symptom definition : {}\n{}".format(exp, traceback.format_exc()))
+ log.warn("Error creating symptom definition : {}\n{}"\
+ .format(exp, traceback.format_exc()))
def create_alarm_definition(self, alarm_params):
@@ -207,12 +237,12 @@
Params:
'name': Alarm Name,
'description':Alarm description,
- 'adapterKindKey': Adapter type in vROPs "VMWARE",
+ 'adapterKindKey': Adapter type in vROPs "VMWARE",
'resourceKindKey':Resource type in vROPs "VirtualMachine",
- 'waitCycles': No of wait cycles,
+ 'waitCycles': No of wait cycles,
'cancelCycles': No of cancel cycles,
- 'type': Alarm type: ,
- 'subType': Alarm subtype: ,
+ 'type': Alarm type,
+ 'subType': Alarm subtype,
'severity': Severity in vROPs "CRITICAL",
'symptomDefinitionId':symptom Definition uuid,
'impact': impact 'risk'
@@ -226,7 +256,9 @@
api_url = '/suite-api/api/alertdefinitions'
headers = {'Content-Type': 'application/xml'}
data = """<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
- <ops:alert-definition xmlns:xs="http://www.w3.org/2001/XMLSchema" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:ops="http://webservice.vmware.com/vRealizeOpsMgr/1.0/">
+ <ops:alert-definition xmlns:xs="http://www.w3.org/2001/XMLSchema"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xmlns:ops="http://webservice.vmware.com/vRealizeOpsMgr/1.0/">
<ops:name>{0:s}</ops:name>
<ops:description>{1:s}</ops:description>
<ops:adapterKindKey>{2:s}</ops:adapterKindKey>
@@ -251,9 +283,12 @@
</ops:impact>
</ops:state>
</ops:states>
- </ops:alert-definition>""".format(alarm_params['name'],alarm_params['description'],alarm_params['adapterKindKey'],
- alarm_params['resourceKindKey'],str(alarm_params['type']),str(alarm_params['subType']),
- alarm_params['severity'],alarm_params['symptomDefinitionId'],alarm_params['impact'])
+ </ops:alert-definition>"""\
+ .format(alarm_params['name'],alarm_params['description'],
+ alarm_params['adapterKindKey'],alarm_params['resourceKindKey'],
+ str(alarm_params['type']),str(alarm_params['subType']),
+ alarm_params['severity'],alarm_params['symptomDefinitionId'],
+ alarm_params['impact'])
resp = requests.post(self.vrops_site + api_url,
auth=(self.vrops_user, self.vrops_password),
@@ -262,7 +297,8 @@
data=data)
if resp.status_code != 201:
- log.warn("Failed to create Alarm definition: {}, response {}".format(alarm_params['name'], resp.content))
+ log.warn("Failed to create Alarm definition: {}, response {}"\
+ .format(alarm_params['name'], resp.content))
return None
alarm_xmlroot = XmlElementTree.fromstring(resp.content)
@@ -276,110 +312,78 @@
log.warn("Error creating alarm definition : {}\n{}".format(exp, traceback.format_exc()))
- def configure_rest_plugin(self, plugin_name, webhook_url, certificate):
+ def configure_rest_plugin(self):
"""
Creates REST Plug-in for vROPs outbound alerts
- Params:
- plugin_name: name of REST plugin instance
- pluginTypeId - RestPlugin
- Optional configValues
- "Url">https://dev14136.service-now.com:8080</ops:configValue> - reqd
- "Username">admin</ops:configValue> - optional
- "Password">VMware1!</ops:configValue> - optional
- "Content-type">application/xml</ops:configValue> - reqd
- "Certificate">abcdefgh123456</ops:configValue> - get n set
- "ConnectionCount" - 20 - default
+ Returns Plugin ID
"""
plugin_id = None
+ plugin_name = 'MON_module_REST_Plugin'
+ plugin_id = self.check_if_plugin_configured(plugin_name)
- api_url = '/suite-api/api/alertplugins'
- headers = {'Content-Type': 'application/xml'}
- data = """<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
- <ops:notification-plugin version="0" xmlns:xs="http://www.w3.org/2001/XMLSchema" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:ops="http://webservice.vmware.com/vRealizeOpsMgr/1.0/">
- <ops:pluginTypeId>RestPlugin</ops:pluginTypeId>
- <ops:name>{0:s}</ops:name>
- <ops:configValues>
- <ops:configValue name="Url">{1:s}</ops:configValue>
- <ops:configValue name="Content-type">application/xml</ops:configValue>
- <ops:configValue name="Certificate">{2:s}</ops:configValue>
- <ops:configValue name="ConnectionCount">20</ops:configValue>
- </ops:configValues>
- </ops:notification-plugin>""".format(plugin_name, webhook_url, certificate)
-
- resp = requests.post(self.vrops_site + api_url,
- auth=(self.vrops_user, self.vrops_password),
- headers=headers,
- verify = False,
- data=data)
-
- if resp.status_code is not 201:
- log.warn("Failed to create REST Plugin: {} for url: {}, \nresponse code: {},\nresponse content: {}"\
- .format(plugin_name, webhook_url, resp.status_code, resp.content))
- return None
-
- plugin_xmlroot = XmlElementTree.fromstring(resp.content)
- if plugin_xmlroot is not None:
- for child in plugin_xmlroot:
- if child.tag.split("}")[1] == 'pluginId':
- plugin_id = plugin_xmlroot.find('{http://webservice.vmware.com/vRealizeOpsMgr/1.0/}pluginId').text
-
- if plugin_id is None:
- log.warn("Failed to get REST Plugin ID for {}, url: {}".format(plugin_name, webhook_url))
- return None
+ #If REST plugin not configured, configure it
+ if plugin_id is not None:
+ return plugin_id
else:
- log.info("Created REST Plugin: {} with ID : {} for url: {}".format(plugin_name, plugin_id, webhook_url))
- status = self.enable_rest_plugin(plugin_id, plugin_name)
- if status is False:
- log.warn("Failed to enable created REST Plugin: {} for url: {}".format(plugin_name, webhook_url))
+ #To Do - Add actual webhook url
+ webhook_url = "https://mano-dev-1:8080/notify/" #for testing
+ cert_file_string = open("10.172.137.214.cert", "rb").read() #for testing
+ cert = load_certificate(FILETYPE_PEM, cert_file_string)
+ certificate = cert.digest("sha1")
+ api_url = '/suite-api/api/alertplugins'
+ headers = {'Content-Type': 'application/xml'}
+ data = """<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
+ <ops:notification-plugin version="0" xmlns:xs="http://www.w3.org/2001/XMLSchema"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xmlns:ops="http://webservice.vmware.com/vRealizeOpsMgr/1.0/">
+ <ops:pluginTypeId>RestPlugin</ops:pluginTypeId>
+ <ops:name>{0:s}</ops:name>
+ <ops:configValues>
+ <ops:configValue name="Url">{1:s}</ops:configValue>
+ <ops:configValue name="Content-type">application/xml</ops:configValue>
+ <ops:configValue name="Certificate">{2:s}</ops:configValue>
+ <ops:configValue name="ConnectionCount">20</ops:configValue>
+ </ops:configValues>
+ </ops:notification-plugin>""".format(plugin_name, webhook_url, certificate)
+
+ resp = requests.post(self.vrops_site + api_url,
+ auth=(self.vrops_user, self.vrops_password),
+ headers=headers,
+ verify = False,
+ data=data)
+
+ if resp.status_code is not 201:
+ log.warn("Failed to create REST Plugin: {} for url: {}, \nresponse code: {},"\
+ "\nresponse content: {}".format(plugin_name, webhook_url,\
+ resp.status_code, resp.content))
+ return None
+
+ plugin_xmlroot = XmlElementTree.fromstring(resp.content)
+ if plugin_xmlroot is not None:
+ for child in plugin_xmlroot:
+ if child.tag.split("}")[1] == 'pluginId':
+ plugin_id = plugin_xmlroot.find('{http://webservice.vmware.com/vRealizeOpsMgr/1.0/}pluginId').text
+
+ if plugin_id is None:
+ log.warn("Failed to get REST Plugin ID for {}, url: {}".format(plugin_name, webhook_url))
return None
else:
- log.info("Enabled REST Plugin: {} for url: {}".format(plugin_name, webhook_url))
- return plugin_id
+ log.info("Created REST Plugin: {} with ID : {} for url: {}".format(plugin_name, plugin_id, webhook_url))
+ status = self.enable_rest_plugin(plugin_id, plugin_name)
+ if status is False:
+ log.warn("Failed to enable created REST Plugin: {} for url: {}".format(plugin_name, webhook_url))
+ return None
+ else:
+ log.info("Enabled REST Plugin: {} for url: {}".format(plugin_name, webhook_url))
+ return plugin_id
-
- def enable_rest_plugin(self, plugin_id, plugin_name):
+ def check_if_plugin_configured(self, plugin_name):
+ """Check if the REST plugin is already created
+ Returns: plugin_id: if already created, None: if needs to be created
"""
- Enable the REST plugin using plugin_id
- Params: plugin_id: plugin ID string that is to be enabled
- Returns: status (Boolean) - True for success, False for failure
- """
-
- if plugin_id is None or plugin_name is None:
- log.debug("enable_rest_plugin() : Plugin ID or plugin_name not provided for {} plugin".format(plugin_name))
- return False
-
- try:
- api_url = "/suite-api/api/alertplugins/{}/enable/True".format(plugin_id)
-
- resp = requests.put(self.vrops_site + api_url, auth=(self.vrops_user, self.vrops_password), verify = False)
-
- if resp.status_code is not 204:
- log.warn("Failed to enable REST plugin {}. \nResponse code {}\nResponse Content: {}"\
- .format(plugin_name, resp.status_code, resp.content))
- return False
-
- log.info("Enabled REST plugin {}.".format(plugin_name))
- return True
-
- except Exception as exp:
- log.warn("Error enabling REST plugin for {} plugin: Exception: {}\n{}".format(plugin_name, exp, traceback.format_exc()))
-
- def create_alarm_notification(self, alarm_name, alarm_id, resource_id):
- """
- Create notification for each alarm
- Params:
- alarm_name
- alarm_id
- resource_id
-
- Returns:
- notification_id: notification_id or None
- """
- notification_name = 'notify_' + alarm_name
- notification_id = None
-
- #1) Find the REST Plugin id details for - MON_module_REST_Plugin
+ plugin_id = None
+ #Find the REST Plugin id details for - MON_module_REST_Plugin
api_url = '/suite-api/api/alertplugins'
headers = {'Accept': 'application/xml'}
namespace = {'params':"http://webservice.vmware.com/vRealizeOpsMgr/1.0/"}
@@ -396,11 +400,68 @@
# Look for specific plugin & parse pluginId for 'MON_module_REST_Plugin'
xmlroot_resp = XmlElementTree.fromstring(resp.content)
for notify_plugin in xmlroot_resp.findall('params:notification-plugin',namespace):
- if notify_plugin.find('params:name',namespace) is not None and notify_plugin.find('params:pluginId',namespace) is not None:
- if notify_plugin.find('params:name',namespace).text == 'MON_module_REST_Plugin':
+ if notify_plugin.find('params:name',namespace) is not None and\
+ notify_plugin.find('params:pluginId',namespace) is not None:
+ if notify_plugin.find('params:name',namespace).text == plugin_name:
plugin_id = notify_plugin.find('params:pluginId',namespace).text
if plugin_id is None:
+ log.warn("REST plugin {} not found".format('MON_module_REST_Plugin'))
+ return None
+ else:
+ log.info("Found REST Plugin: {}".format(plugin_name))
+ return plugin_id
+
+
+ def enable_rest_plugin(self, plugin_id, plugin_name):
+ """
+ Enable the REST plugin using plugin_id
+ Params: plugin_id: plugin ID string that is to be enabled
+ Returns: status (Boolean) - True for success, False for failure
+ """
+
+ if plugin_id is None or plugin_name is None:
+ log.debug("enable_rest_plugin() : Plugin ID or plugin_name not provided for {} plugin"\
+ .format(plugin_name))
+ return False
+
+ try:
+ api_url = "/suite-api/api/alertplugins/{}/enable/True".format(plugin_id)
+
+ resp = requests.put(self.vrops_site + api_url,
+ auth=(self.vrops_user, self.vrops_password),
+ verify = False)
+
+ if resp.status_code is not 204:
+ log.warn("Failed to enable REST plugin {}. \nResponse code {}\nResponse Content: {}"\
+ .format(plugin_name, resp.status_code, resp.content))
+ return False
+
+ log.info("Enabled REST plugin {}.".format(plugin_name))
+ return True
+
+ except Exception as exp:
+ log.warn("Error enabling REST plugin for {} plugin: Exception: {}\n{}"\
+ .format(plugin_name, exp, traceback.format_exc()))
+
+ def create_alarm_notification(self, alarm_name, alarm_id, resource_id):
+ """
+ Create notification for each alarm
+ Params:
+ alarm_name
+ alarm_id
+ resource_id
+
+ Returns:
+ notification_id: notification_id or None
+ """
+ notification_name = 'notify_' + alarm_name
+ notification_id = None
+ plugin_name = 'MON_module_REST_Plugin'
+
+ #1) Find the REST Plugin id details for - MON_module_REST_Plugin
+ plugin_id = self.check_if_plugin_configured(plugin_name)
+ if plugin_id is None:
log.warn("Failed to get REST plugin_id for : {}".format('MON_module_REST_Plugin'))
return None
@@ -408,7 +469,9 @@
api_url = '/suite-api/api/notifications/rules'
headers = {'Content-Type': 'application/xml'}
data = """<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
- <ops:notification-rule xmlns:xs="http://www.w3.org/2001/XMLSchema" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:ops="http://webservice.vmware.com/vRealizeOpsMgr/1.0/">
+ <ops:notification-rule xmlns:xs="http://www.w3.org/2001/XMLSchema"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xmlns:ops="http://webservice.vmware.com/vRealizeOpsMgr/1.0/">
<ops:name>{0:s}</ops:name>
<ops:pluginId>{1:s}</ops:pluginId>
<ops:resourceFilter resourceId="{2:s}">
@@ -417,7 +480,8 @@
<ops:alertDefinitionIdFilters>
<ops:values>{3:s}</ops:values>
</ops:alertDefinitionIdFilters>
- </ops:notification-rule>""".format(notification_name, plugin_id, resource_id, alarm_id)
+ </ops:notification-rule>"""\
+ .format(notification_name, plugin_id, resource_id, alarm_id)
resp = requests.post(self.vrops_site + api_url,
auth=(self.vrops_user, self.vrops_password),
@@ -426,8 +490,9 @@
data=data)
if resp.status_code is not 201:
- log.warn("Failed to create Alarm notification {} for {} alarm. \nResponse code: {}\nResponse content: {}"\
- .format(notification_name, alarm_name, resp.status_code, resp.content))
+ log.warn("Failed to create Alarm notification {} for {} alarm."\
+ "\nResponse code: {}\nResponse content: {}"\
+ .format(notification_name, alarm_name, resp.status_code, resp.content))
return None
#parse notification id from response
@@ -452,7 +517,8 @@
return vm_moref_id
except Exception as exp:
- log.warn("Error occurred while getting VM moref ID for VM : {}\n{}".format(exp, traceback.format_exc()))
+ log.warn("Error occurred while getting VM moref ID for VM : {}\n{}"\
+ .format(exp, traceback.format_exc()))
def get_vapp_details_rest(self, vapp_uuid=None):
@@ -485,7 +551,8 @@
verify=vca.verify)
if response.status_code != 200:
- log.warn("REST API call {} failed. Return status code {}".format(get_vapp_restcall, response.content))
+ log.warn("REST API call {} failed. Return status code {}"\
+ .format(get_vapp_restcall, response.content))
return parsed_respond
try:
@@ -509,7 +576,8 @@
parsed_respond["vm_vcenter_info"]= vm_vcenter_info
except Exception as exp :
- log.warn("Error occurred calling rest api for getting vApp details: {}\n{}".format(exp, traceback.format_exc()))
+ log.warn("Error occurred calling rest api for getting vApp details: {}\n{}"\
+ .format(exp, traceback.format_exc()))
return parsed_respond
@@ -526,17 +594,18 @@
log.info("Logging in to a VCD org as admin.")
vca_admin = VCA(host=self.vcloud_site,
- username=self.vcloud_username,
+ username=self.admin_username,
service_type='standalone',
version='5.9',
verify=False,
log=False)
- result = vca_admin.login(password=self.vcloud_password, org='System')
+ result = vca_admin.login(password=self.admin_password, org='System')
if not result:
- log.warn("Can't connect to a vCloud director as: {}".format(self.vcloud_username))
+ log.warn("Can't connect to a vCloud director as: {}".format(self.admin_username))
result = vca_admin.login(token=vca_admin.token, org='System', org_url=vca_admin.vcloud_session.org_url)
if result is True:
- log.info("Successfully logged to a vcloud direct org: {} as user: {}".format('System', self.vcloud_username))
+ log.info("Successfully logged to a vcloud direct org: {} as user: {}"\
+ .format('System', self.admin_username))
return vca_admin
@@ -573,8 +642,116 @@
resourceIdentifiers = child
for r_id in resourceIdentifiers:
if r_id.find('params:value',namespace).text == vm_moref_id:
- log.info("Found Resource ID : {} in vROPs for {}".format(resource.attrib['identifier'], vm_moref_id))
+ log.info("Found Resource ID : {} in vROPs for {}"\
+ .format(resource.attrib['identifier'], vm_moref_id))
return resource.attrib['identifier']
except Exception as exp:
log.warn("Error in parsing {}\n{}".format(exp, traceback.format_exc()))
+
+ def get_metrics_data(self, metric={}):
+ """Get an individual metric's data of a resource.
+ Params:
+ 'metric_name': Normalized name of metric (string)
+ 'resource_uuid': Resource UUID (string)
+ 'period': Time period in Period Unit for which metrics data to be collected from
+ Monitoring tool from now.
+ 'period_unit': Period measurement unit can be one of 'HR', 'DAY', 'MONTH', 'YEAR'
+
+ Return a dict that contains:
+ 'metric_name': Normalized name of metric (string)
+ 'resource_uuid': Resource UUID (string)
+ 'tenant_id': tenent id name in which the resource is present in string format
+ 'metrics_data': Dictionary containing time_series & metric_series data.
+ 'time_series': List of individual time stamp values in msec
+ 'metric_series': List of individual metrics data values
+ Raises an exception upon error or when network is not found
+ """
+ return_data = {}
+ return_data['schema_version'] = 1.0
+ return_data['schema_type'] = 'read_metric_data_response'
+ return_data['metric_name'] = metric['metric_name']
+ #To do - No metric_uuid in vROPs, thus returning metric_name
+ return_data['metric_uuid'] = metric['metric_name']
+ return_data['correlation_id'] = metric['correlation_id']
+ return_data['resource_uuid'] = metric['resource_uuid']
+ return_data['metrics_data'] = {'time_series':[], 'metric_series':[]}
+ #To do - Need confirmation about uuid & id
+ return_data['tenant_uuid'] = metric['tenant_uuid']
+ return_data['unit'] = None
+ #return_data['tenant_id'] = self.tenant_id
+ #log.warn("return_data: {}".format(return_data))
+
+ #1) Get metric details from plugin specific file & format it into vROPs metrics
+ metric_key_params = self.get_default_Params(metric['metric_name'])
+
+ if not metric_key_params:
+ log.warn("Metric not supported: {}".format(metric['metric_name']))
+ #To Do: Return message
+ return return_data
+
+ return_data['unit'] = metric_key_params['unit']
+
+ #2) Find the resource id in vROPs based on OSM resource_uuid
+ #2.a) Find vm_moref_id from vApp uuid in vCD
+ vm_moref_id = self.get_vm_moref_id(metric['resource_uuid'])
+ if vm_moref_id is None:
+ log.warn("Failed to find vm morefid for vApp in vCD: {}".format(config_dict['resource_uuid']))
+ return return_data
+ #2.b) Based on vm_moref_id, find VM's corresponding resource_id in vROPs to set notification
+ resource_id = self.get_vm_resource_id(vm_moref_id)
+ if resource_id is None:
+ log.warn("Failed to find resource in vROPs: {}".format(config_dict['resource_uuid']))
+ return return_data
+
+ #3) Calculate begin & end time for period & period unit
+ end_time = int(round(time.time() * 1000))
+ if metric['collection_unit'] == 'YR':
+ time_diff = PERIOD_MSEC[metric['collection_unit']]
+ else:
+ time_diff = metric['collection_period']* PERIOD_MSEC[metric['collection_unit']]
+ begin_time = end_time - time_diff
+
+ #4) Get the metrics data
+ log.info("metric_key_params['metric_key'] = {}".format(metric_key_params['metric_key']))
+ log.info("end_time: {}, begin_time: {}".format(end_time, begin_time))
+
+ url_list = ['/suite-api/api/resources/', resource_id, '/stats?statKey=',\
+ metric_key_params['metric_key'], '&begin=', str(begin_time),'&end=',str(end_time)]
+ api_url = ''.join(url_list)
+ headers = {'Accept': 'application/json'}
+
+ resp = requests.get(self.vrops_site + api_url,
+ auth=(self.vrops_user, self.vrops_password),
+ verify = False, headers = headers)
+
+ if resp.status_code is not 200:
+ log.warn("Failed to retrive Metric data from vROPs for {}\nResponse code:{}\nResponse Content: {}"\
+ .format(metric['metric_name'], resp.status_code, resp.content))
+ return return_data
+
+ #5) Convert to required format
+ metrics_data = {}
+ json_data = json.loads(resp.content)
+ for resp_key,resp_val in json_data.iteritems():
+ if resp_key == 'values':
+ data = json_data['values'][0]
+ for data_k,data_v in data.iteritems():
+ if data_k == 'stat-list':
+ stat_list = data_v
+ for stat_list_k,stat_list_v in stat_list.iteritems():
+ for stat_keys,stat_vals in stat_list_v[0].iteritems():
+ if stat_keys == 'timestamps':
+ metrics_data['time_series'] = stat_list_v[0]['timestamps']
+ if stat_keys == 'data':
+ metrics_data['metric_series'] = stat_list_v[0]['data']
+
+ return_data['metrics_data'] = metrics_data
+
+ return return_data
+
+ def reconfigure_alarm(self, config_dict):
+ """
+ """
+ return None
+
diff --git a/plugins/vRealiseOps/plugin_receiver.py b/plugins/vRealiseOps/plugin_receiver.py
new file mode 100644
index 0000000..6a134bd
--- /dev/null
+++ b/plugins/vRealiseOps/plugin_receiver.py
@@ -0,0 +1,116 @@
+# -*- coding: utf-8 -*-
+"""
+Montoring plugin receiver that consumes the request messages &
+responds using producer for vROPs
+"""
+
+from mon_plugin_vrops import MonPlugin
+from kafka_consumer_vrops import vROP_KafkaConsumer
+#To Do- Change producer
+#from core.message_bus.producer import KafkaProducer
+import json
+import logging as log
+
+class PluginReceiver():
+ """MON Plugin receiver receiving request messages & responding using producer for vROPs
+ telemetry plugin
+ """
+ def __init__(self):
+ """Constructor of PluginReceiver
+ """
+
+ topics = ['alarm_request', 'metric_request', 'Access_Credentials', 'alarm_response']
+ #To Do - Add broker uri
+ broker_uri = None
+ self.mon_plugin = MonPlugin()
+ self.consumer = vROP_KafkaConsumer(topics, broker_uri)
+ #To Do- Change producer
+ #self.producer = KafkaProducer()
+
+ def consume(self):
+ """Consume the message, act on it & respond
+ """
+ try:
+ for message in self.consumer.vrops_consumer:
+ if message.topic == 'alarm_request':
+ if message.key == "create_alarm_request":
+ config_alarm_info = json.loads(message.value)
+ alarm_uuid = self.create_alarm(config_alarm_info['alarm_creation_request'])
+ log.info("Alarm created with alarm uuid: {}".format(alarm_uuid))
+ #To Do - Publish message using producer
+ #self.publish_create_alarm_status(alarm_uuid, config_alarm_info)
+ elif message.key == "update_alarm_request":
+ update_alarm_info = json.loads(message.value)
+ alarm_uuid = self.update_alarm(update_alarm_info['alarm_creation_request'])
+ log.info("Alarm defination updated : alarm uuid: {}".format(alarm_uuid))
+ #To Do - Publish message using producer
+ #self.publish_update_alarm_status(alarm_uuid, update_alarm_info)
+ elif message.topic == 'metric_request':
+ if message.key == "read_metric_data_request":
+ metric_request_info = json.loads(message.value)
+ metrics_data = self.mon_plugin.get_metrics_data(metric_request_info)
+ log.info("Collected Metrics Data: {}".format(metrics_data))
+ #To Do - Publish message using producer
+ #self.publish_metrics_data_status(metrics_data)
+
+ except Exception as exp:
+ log.error("Exception in receiver: {}".format(exp))
+
+ def create_alarm(self, config_alarm_info):
+ """Create alarm using vROPs plugin
+ """
+ plugin_uuid = self.mon_plugin.configure_rest_plugin()
+ alarm_uuid = self.mon_plugin.configure_alarm(config_alarm_info)
+ return alarm_uuid
+
+ def publish_create_alarm_status(self, alarm_uuid, config_alarm_info):
+ """Publish create alarm status using producer
+ """
+ topic = 'alarm_response'
+ msg_key = 'create_alarm_response'
+ response_msg = {"schema_version":1.0,
+ "schema_type":"create_alarm_response",
+ "alarm_creation_response":
+ {"correlation_id":config_alarm_info["alarm_creation_request"]["correlation_id"],
+ "alarm_uuid":alarm_uuid,
+ "status": True if alarm_uuid else False
+ }
+ }
+ #To Do - Add producer
+ #self.producer.publish(key=msg_key, value=json.dumps(response_msg), topic=topic)
+
+ def update_alarm(self, update_alarm_info):
+ """Updare already created alarm
+ """
+ alarm_uuid = self.mon_plugin.reconfigure_alarm(update_alarm_info)
+ return alarm_uuid
+
+ def publish_update_alarm_status(self, alarm_uuid, update_alarm_info):
+ """Publish update alarm status requests using producer
+ """
+ topic = 'alarm_response'
+ msg_key = 'update_alarm_response'
+ response_msg = {"schema_version":1.0,
+ "schema_type":"update_alarm_response",
+ "alarm_update_response":
+ {"correlation_id":update_alarm_info["alarm_creation_request"]["correlation_id"],
+ "alarm_uuid":alarm_uuid,
+ "status": True if alarm_uuid else False
+ }
+ }
+ #To Do - Add producer
+ #self.producer.publish(key=msg_key, value=json.dumps(response_msg), topic=topic)
+
+ def publish_metrics_data_status(self, metrics_data):
+ """Publish the requested metric data using producer
+ """
+ topic = 'metric_response'
+ msg_key = 'read_metric_data_response'
+ #To Do - Add producer
+ #self.producer.publish(key=msg_key, value=json.dumps(metrics_data), topic=topic)
+
+#For testing
+#log.basicConfig(filename='mon_vrops_log.log',level=log.DEBUG)
+#plugin_rcvr = PluginReceiver()
+#plugin_rcvr.consume()
+
diff --git a/plugins/vRealiseOps/vrops_config.xml b/plugins/vRealiseOps/vrops_config.xml
index aa628d7..da7cc53 100644
--- a/plugins/vRealiseOps/vrops_config.xml
+++ b/plugins/vRealiseOps/vrops_config.xml
@@ -1,6 +1,6 @@
-<?xml version="1.0" encoding="UTF-8" ?>
<alarmsDefaultConfig>
<Average_Memory_Usage_Above_Threshold>
+ <vrops_alarm>Avg_Mem_Usage_Above_Thr</vrops_alarm>
<period>300</period>
<evaluation>2</evaluation>
<cancel_period>300</cancel_period>
@@ -16,6 +16,7 @@
<unit>%</unit>
</Average_Memory_Usage_Above_Threshold>
<Read_Latency_Above_Threshold>
+ <vrops_alarm>RD_Latency_Above_Thr</vrops_alarm>
<period>300</period>
<evaluation>3</evaluation>
<cancel_period>300</cancel_period>
@@ -31,6 +32,7 @@
<unit>msec</unit>
</Read_Latency_Above_Threshold>
<Write_Latency_Above_Threshold>
+ <vrops_alarm>WR_Latency_Above_Thr</vrops_alarm>
<period>300</period>
<evaluation>3</evaluation>
<cancel_period>300</cancel_period>
@@ -46,6 +48,7 @@
<unit>msec</unit>
</Write_Latency_Above_Threshold>
<Net_Packets_Dropped>
+ <vrops_alarm>Net_Packets_Dropped</vrops_alarm>
<period>300</period>
<evaluation>1</evaluation>
<cancel_period>300</cancel_period>
@@ -61,6 +64,7 @@
<unit>nos</unit>
</Net_Packets_Dropped>
<CPU_Utilization_Above_Threshold>
+ <vrops_alarm>CPU_Utilization_Above_Thr</vrops_alarm>
<period>300</period>
<evaluation>1</evaluation>
<cancel_period>300</cancel_period>
@@ -119,4 +123,22 @@
<metric_key>net:Aggregate of all instances|packetsTxPerSec</metric_key>
<unit>nos</unit>
</PACKETS_SENT>
+ <Access_Config>
+ <vrops_site>https://192.169.241.123</vrops_site>
+ <vrops_user>Admin</vrops_user>
+ <vrops_password>VMware1!</vrops_password>
+ <vcloud-site>https://mano-vcd-1.corp.local</vcloud-site>
+ <admin_username>administrator</admin_username>
+ <admin_password>VMware1!</admin_password>
+ <nsx_manager>https://192.169.241.104</nsx_manager>
+ <nsx_user>admin</nsx_user>
+ <nsx_password>VMware1!</nsx_password>
+ <vcenter_ip>192.169.241.103</vcenter_ip>
+ <vcenter_port>443</vcenter_port>
+ <vcenter_user>administrator@vsphere.local</vcenter_user>
+ <vcenter_password>VMware1!</vcenter_password>
+ <vim_tenant_name>Org2-VDC-PVDC1</vim_tenant_name>
+ <orgname>Org2</orgname>
+ <tenant_id>Org2-VDC-PVDC1</tenant_id>
+ </Access_Config>
</alarmsDefaultConfig>