1 # -*- coding: utf-8 -*-
3 # Copyright 2018 Whitestack, LLC
4 # *************************************************************
6 # This file is part of OSM Monitoring module
7 # All Rights Reserved to Whitestack, LLC
9 # Licensed under the Apache License, Version 2.0 (the "License"); you may
10 # not use this file except in compliance with the License. You may obtain
11 # a copy of the License at
13 # http://www.apache.org/licenses/LICENSE-2.0
15 # Unless required by applicable law or agreed to in writing, software
16 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
17 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
18 # License for the specific language governing permissions and limitations
21 # For those usages not covered by the Apache License, Version 2.0 please
22 # contact: bdiaz@whitestack.com or glavado@whitestack.com
29 from kafka
import KafkaProducer
, KafkaConsumer
30 from kafka
.errors
import KafkaError
32 log
= logging
.getLogger(__name__
)
35 class ScalingConfigTest(unittest
.TestCase
):
38 kafka_server
= '{}:{}'.format(os
.getenv("KAFKA_SERVER_HOST", "localhost"),
39 os
.getenv("KAFKA_SERVER_PORT", "9092"))
40 self
.producer
= KafkaProducer(bootstrap_servers
=kafka_server
,
41 key_serializer
=str.encode
,
42 value_serializer
=str.encode
)
43 self
.consumer
= KafkaConsumer(bootstrap_servers
='localhost:9092',
45 self
.consumer
.subscribe(['lcm_pm'])
47 self
.skipTest('Kafka server not present.')
49 def test_send_scaling_config_msg(self
):
52 os
.path
.join(os
.path
.dirname(__file__
), '../examples/configure_scaling_full_example.json')) as file:
53 payload
= json
.load(file)
54 future
= self
.producer
.send('lcm_pm', json
.dumps(payload
), key
="configure_scaling")
55 result
= future
.get(timeout
=60)
56 log
.info('Result: %s', result
)
59 # TODO: Improve assertions
60 self
.assertIsNotNone(result
)
61 except Exception as e
:
65 if __name__
== '__main__':