+# -*- coding: utf-8 -*-
+
+# Copyright 2018 Whitestack, LLC
+# *************************************************************
+
+# This file is part of OSM Monitoring module
+# All Rights Reserved to Whitestack, LLC
+
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+
+# http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+# For those usages not covered by the Apache License, Version 2.0 please
+# contact: bdiaz@whitestack.com or glavado@whitestack.com
+##
import unittest
+from unittest import mock
-import mock
+from kafka import KafkaProducer
+from kafka.errors import KafkaError
+from osm_common import dbmongo
-from osm_mon.core.database import VimCredentials
-from osm_mon.core.message_bus.common_consumer import *
+from osm_mon.core.database import VimCredentials, DatabaseManager
+from osm_mon.core.message_bus.common_consumer import CommonConsumer
+@mock.patch.object(dbmongo.DbMongo, "db_connect", mock.Mock())
class CommonConsumerTest(unittest.TestCase):
+
+ def setUp(self):
+ try:
+ KafkaProducer(bootstrap_servers='localhost:9092',
+ key_serializer=str.encode,
+ value_serializer=str.encode
+ )
+ except KafkaError:
+ self.skipTest('Kafka server not present.')
+
@mock.patch.object(DatabaseManager, "get_credentials")
def test_get_vim_type(self, get_creds):
mock_creds = VimCredentials()
get_creds.return_value = mock_creds
- db_manager = DatabaseManager()
- vim_type = get_vim_type(db_manager, 'test_id')
+ common_consumer = CommonConsumer()
+ vim_type = common_consumer.get_vim_type('test_id')
self.assertEqual(vim_type, 'openstack')
'internal-connection-point': [],
'vdu-id-ref': 'ubuntuvnf_vnfd-VM',
'id': 'ffd73f33-c8bb-4541-a977-44dcc3cbe28d',
- 'vim-id': '27042672-5190-4209-b844-95bbaeea7ea7'
+ 'vim-id': '27042672-5190-4209-b844-95bbaeea7ea7',
+ 'name': 'ubuntuvnf_vnfd-VM'
}
],
'vnfd-ref': 'ubuntuvnf_vnfd',
'created-time': 1526044312.0999322,
'vnfd-id': 'a314c865-aee7-4d9b-9c9d-079d7f857f01',
'id': 'a314c865-aee7-4d9b-9c9d-079d7f857f01'}
-
- common_db = dbmongo.DbMongo()
- vdur = get_vdur(common_db, '5ec3f571-d540-4cb0-9992-971d1b08312e', '1', 'ubuntuvnf_vnfd-VM')
+ common_consumer = CommonConsumer()
+ vdur = common_consumer.common_db.get_vdur('5ec3f571-d540-4cb0-9992-971d1b08312e', '1', 'ubuntuvnf_vnfd-VM')
expected_vdur = {
'internal-connection-point': [],
'vdu-id-ref': 'ubuntuvnf_vnfd-VM',
'id': 'ffd73f33-c8bb-4541-a977-44dcc3cbe28d',
- 'vim-id': '27042672-5190-4209-b844-95bbaeea7ea7'
+ 'vim-id': '27042672-5190-4209-b844-95bbaeea7ea7',
+ 'name': 'ubuntuvnf_vnfd-VM'
}
self.assertDictEqual(vdur, expected_vdur)