Merge "Fixes bugs for integration with policy module"
diff --git a/osm_mon/core/message_bus/common_consumer.py b/osm_mon/core/message_bus/common_consumer.py
index e79e98a..0ba003b 100755
--- a/osm_mon/core/message_bus/common_consumer.py
+++ b/osm_mon/core/message_bus/common_consumer.py
@@ -77,10 +77,12 @@
vrops_rcvr = plugin_receiver.PluginReceiver()
-def get_vim_type(message):
+def get_vim_type(msg):
"""Get the vim type that is required by the message."""
try:
- return json.loads(message.value)["vim_type"].lower()
+ vim_uuid = json.loads(msg.value)["vim_uuid"].lower()
+ credentials = database_manager.get_credentials(vim_uuid)
+ return credentials.type
except Exception as exc:
log.warn("vim_type is not configured correctly; %s", exc)
return None
diff --git a/osm_mon/plugins/OpenStack/common.py b/osm_mon/plugins/OpenStack/common.py
index 0c5e7c6..57acf40 100644
--- a/osm_mon/plugins/OpenStack/common.py
+++ b/osm_mon/plugins/OpenStack/common.py
@@ -76,17 +76,17 @@
if req_type == "put":
response = requests.put(
url, data=payload, headers=headers,
- timeout=1)
+ timeout=10)
elif req_type == "get":
response = requests.get(
- url, params=params, headers=headers, timeout=1)
+ url, params=params, headers=headers, timeout=10)
elif req_type == "delete":
response = requests.delete(
- url, headers=headers, timeout=1)
+ url, headers=headers, timeout=10)
else:
response = requests.post(
url, data=payload, headers=headers,
- timeout=1)
+ timeout=10)
# Raises exception if there was an error
try:
diff --git a/osm_mon/test/functional/__init__.py b/osm_mon/test/functional/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/osm_mon/test/functional/__init__.py
diff --git a/osm_mon/test/integration/test_vim_account.py b/osm_mon/test/functional/test_vim_account.py
similarity index 90%
rename from osm_mon/test/integration/test_vim_account.py
rename to osm_mon/test/functional/test_vim_account.py
index e84b3cb..2a7a722 100644
--- a/osm_mon/test/integration/test_vim_account.py
+++ b/osm_mon/test/functional/test_vim_account.py
@@ -49,8 +49,8 @@
except KafkaError:
self.skipTest('Kafka server not present.')
- # TODO: REFACTOR. This test requires common_consumer running. Needs to be changed so it does not.
- @unittest.skip("Needs refactoring.")
+ @unittest.skip("Correct support for functional tests is pending.")
+ # TODO: Refactor
def test_create_edit_delete_vim_account(self):
"""Test vim_account creation message from KafkaProducer."""
# Set-up message, producer and consumer for tests
@@ -73,7 +73,7 @@
self.producer.flush()
time.sleep(1)
- creds = self.auth_manager.get_credentials(create_payload['_id'])
+ creds = self.auth_manager.get_credentials('test_id')
self.assertIsNotNone(creds)
self.assertEqual(creds.name, create_payload['name'])
self.assertEqual(json.loads(creds.config), create_payload['config'])
@@ -98,7 +98,7 @@
self.producer.flush()
time.sleep(1)
- creds = self.auth_manager.get_credentials(edit_payload['_id'])
+ creds = self.auth_manager.get_credentials('test_id')
self.assertEqual(creds.name, edit_payload['name'])
self.assertEqual(json.loads(creds.config), edit_payload['config'])
@@ -111,5 +111,5 @@
self.producer.flush()
time.sleep(1)
- creds = self.auth_manager.get_credentials(delete_payload['_id'])
+ creds = self.auth_manager.get_credentials('test_id')
self.assertIsNone(creds)
diff --git a/osm_mon/test/integration/test_alarm_integration.py b/osm_mon/test/integration/test_alarm_integration.py
index 368cc10..b443d6a 100644
--- a/osm_mon/test/integration/test_alarm_integration.py
+++ b/osm_mon/test/integration/test_alarm_integration.py
@@ -32,6 +32,7 @@
from kafka.errors import KafkaError
from osm_mon.core.auth import AuthManager
+from osm_mon.core.database import DatabaseManager
from osm_mon.core.message_bus.producer import KafkaProducer as prod
from osm_mon.plugins.OpenStack import response
from osm_mon.plugins.OpenStack.Aodh import alarming
@@ -46,7 +47,8 @@
try:
self.producer = KafkaProducer(bootstrap_servers='localhost:9092')
self.req_consumer = KafkaConsumer(bootstrap_servers='localhost:9092',
- consumer_timeout_ms=5000)
+ auto_offset_reset='earliest',
+ consumer_timeout_ms=60000)
self.req_consumer.subscribe(['alarm_request'])
except KafkaError:
self.skipTest('Kafka server not present.')
@@ -89,6 +91,7 @@
return
self.fail("No message received in consumer")
+ @mock.patch.object(DatabaseManager, "save_alarm", mock.Mock())
@mock.patch.object(Common, "get_auth_token", mock.Mock())
@mock.patch.object(Common, "get_endpoint", mock.Mock())
@mock.patch.object(prod, "create_alarm_response")
@@ -193,6 +196,8 @@
return
self.fail("No message received in consumer")
+ @mock.patch.object(Common, "get_auth_token", mock.Mock())
+ @mock.patch.object(Common, "get_endpoint", mock.Mock())
@mock.patch.object(alarming.Alarming, "update_alarm_state")
def test_ack_alarm_req(self, ack_alarm):
"""Test Aodh acknowledge alarm request message from KafkaProducer."""
@@ -204,4 +209,11 @@
self.producer.send('alarm_request', key="acknowledge_alarm",
value=json.dumps(payload))
- self.producer.flush()
+
+ for message in self.req_consumer:
+ # Check the vim desired by the message
+ if message.key == "acknowledge_alarm":
+ self.alarms.alarming(message)
+ return
+
+ self.fail("No message received in consumer")
\ No newline at end of file
diff --git a/osm_mon/test/integration/test_metric_integration.py b/osm_mon/test/integration/test_metric_integration.py
index 6b32a12..66f4f0a 100644
--- a/osm_mon/test/integration/test_metric_integration.py
+++ b/osm_mon/test/integration/test_metric_integration.py
@@ -55,7 +55,7 @@
self.producer = KafkaProducer(bootstrap_servers='localhost:9092')
self.req_consumer = KafkaConsumer(bootstrap_servers='localhost:9092',
auto_offset_reset='earliest',
- consumer_timeout_ms=2000)
+ consumer_timeout_ms=60000)
self.req_consumer.subscribe(['metric_request'])
except KafkaError:
self.skipTest('Kafka server not present.')
@@ -116,8 +116,6 @@
value=json.dumps(payload))
for message in self.req_consumer:
- # Check the vim desired by the message
- vim_type = json.loads(message.value)["vim_type"].lower()
if message.key == "delete_metric_request":
# Metric has been deleted
del_metric.return_value = True