Adds license header to python files
[osm/MON.git] / policy_module / osm_policy_module / tests / integration / test_scaling_config_kafka_msg.py
1 # -*- coding: utf-8 -*-
2
3 # Copyright 2018 Whitestack, LLC
4 # *************************************************************
5
6 # This file is part of OSM Monitoring module
7 # All Rights Reserved to Whitestack, LLC
8
9 # Licensed under the Apache License, Version 2.0 (the "License"); you may
10 # not use this file except in compliance with the License. You may obtain
11 # a copy of the License at
12
13 # http://www.apache.org/licenses/LICENSE-2.0
14
15 # Unless required by applicable law or agreed to in writing, software
16 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
17 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
18 # License for the specific language governing permissions and limitations
19 # under the License.
20
21 # For those usages not covered by the Apache License, Version 2.0 please
22 # contact: bdiaz@whitestack.com or glavado@whitestack.com
23 ##
24 import json
25 import logging
26 import os
27 import unittest
28
29 from kafka import KafkaProducer, KafkaConsumer
30 from kafka.errors import KafkaError
31
32 log = logging.getLogger(__name__)
33
34
class ScalingConfigTest(unittest.TestCase):
    """Integration test that publishes a scaling configuration message to Kafka.

    The broker address is built from the KAFKA_SERVER_HOST and
    KAFKA_SERVER_PORT environment variables, defaulting to localhost:9092.
    Both the producer and the consumer connect to that same address.
    """

    def setUp(self):
        """Create the Kafka producer and consumer used by the test.

        The test is skipped if creating either client, or subscribing the
        consumer, raises KafkaError.
        """
        kafka_server = '{}:{}'.format(os.getenv("KAFKA_SERVER_HOST", "localhost"),
                                      os.getenv("KAFKA_SERVER_PORT", "9092"))
        try:
            self.producer = KafkaProducer(bootstrap_servers=kafka_server,
                                          key_serializer=str.encode,
                                          value_serializer=str.encode)
            # addCleanup (unlike tearDown) also runs when setUp bails out
            # below, so a producer created before a consumer failure is
            # still closed.
            self.addCleanup(self.producer.close)

            # Use the same broker address as the producer instead of a
            # hard-coded 'localhost:9092', so the environment overrides
            # apply to both clients.
            self.consumer = KafkaConsumer(bootstrap_servers=kafka_server,
                                          group_id='osm_mon')
            self.addCleanup(self.consumer.close)
            self.consumer.subscribe(['lcm_pm'])
        except KafkaError:
            self.skipTest('Kafka server not present.')

    def test_send_scaling_config_msg(self):
        """Send the full configure_scaling example to the 'lcm_pm' topic.

        The test fails if a Kafka call raises KafkaError, and otherwise
        asserts that waiting on the send returned a non-None result.
        Problems reading or parsing the example file are not caught here, so
        they surface as test errors with their original traceback.
        """
        example_path = os.path.join(os.path.dirname(__file__),
                                    '../examples/configure_scaling_full_example.json')
        with open(example_path) as example_file:
            payload = json.load(example_file)

        # Only the Kafka calls are guarded: a broad `except Exception` here
        # would also swallow the AssertionError raised by the assertion below.
        try:
            future = self.producer.send('lcm_pm', json.dumps(payload), key="configure_scaling")
            result = future.get(timeout=60)
            log.info('Result: %s', result)

            self.producer.flush()
        except KafkaError as e:
            self.fail('Failed to publish scaling config message: {}'.format(e))

        # TODO: Improve assertions
        self.assertIsNotNone(result)
63
64
# Run the tests in this module when the file is executed directly as a script.
if __name__ == "__main__":
    unittest.main()