Adopts aiokafka and modifies code to support asyncio 89/6689/11
author     Benjamin Diaz <bdiaz@whitestack.com>
Thu, 11 Oct 2018 15:34:20 +0000 (12:34 -0300)
committer  Benjamin Diaz <bdiaz@whitestack.com>
Tue, 16 Oct 2018 20:08:33 +0000 (17:08 -0300)
Replaces multithreading with asynchronous calls to Kafka.
Implements the async/await keywords in the methods that
require them.

Signed-off-by: Benjamin Diaz <bdiaz@whitestack.com>
Change-Id: Ic93054dc9a6b3835e2eaf1a480e5081c5eb9d4f5
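
For context, the producer pattern adopted throughout this change reduces to the
following minimal sketch (the function name and its arguments are illustrative,
not part of the commit):

    import asyncio
    import json

    from aiokafka import AIOKafkaProducer

    async def publish(loop, kafka_server, topic, key, payload):
        # One short-lived producer per call, mirroring LcmClient and MonClient.
        producer = AIOKafkaProducer(loop=loop,
                                    bootstrap_servers=kafka_server,
                                    key_serializer=str.encode,
                                    value_serializer=str.encode)
        await producer.start()
        try:
            await producer.send_and_wait(topic, key=key, value=json.dumps(payload))
        finally:
            await producer.stop()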

debian/python3-osm-policy-module.postinst
osm_policy_module/cmd/policy_module_agent.py
osm_policy_module/common/lcm_client.py
osm_policy_module/common/mon_client.py
osm_policy_module/core/agent.py
osm_policy_module/tests/integration/test_kafka_messages.py
osm_policy_module/tests/integration/test_policy_agent.py
requirements.txt
setup.py

diff --git a/debian/python3-osm-policy-module.postinst b/debian/python3-osm-policy-module.postinst
index 3b24842..cd57d53 100644
--- a/debian/python3-osm-policy-module.postinst
+++ b/debian/python3-osm-policy-module.postinst
@@ -1,7 +1,7 @@
 #!/bin/bash
 
 echo "Installing python dependencies via pip..."
-pip3 install kafka==1.3.*
+pip3 install aiokafka==0.4.*
 pip3 install peewee==3.1.*
 pip3 install jsonschema==2.6.*
 pip3 install six==1.11.*
diff --git a/osm_policy_module/cmd/policy_module_agent.py b/osm_policy_module/cmd/policy_module_agent.py
index c82f006..49c7d3a 100644
--- a/osm_policy_module/cmd/policy_module_agent.py
+++ b/osm_policy_module/cmd/policy_module_agent.py
@@ -21,7 +21,7 @@
 # For those usages not covered by the Apache License, Version 2.0 please
 # contact: bdiaz@whitestack.com or glavado@whitestack.com
 ##
-import argparse
+import asyncio
 import logging
 import sys
 
@@ -32,11 +32,6 @@ from osm_policy_module.core.database import DatabaseManager
 
 def main():
     cfg = Config.instance()
-    parser = argparse.ArgumentParser(prog='pm-scaling-config-agent')
-    parser.add_argument('--config-file', nargs='?', help='Policy module agent configuration file')
-    args = parser.parse_args()
-    if args.config_file:
-        cfg.load_file(args.config_file)
     log_formatter_str = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
     logging.basicConfig(stream=sys.stdout,
                         format=log_formatter_str,
@@ -55,7 +50,8 @@ def main():
     db_manager.create_tables()
     log.info("Database synced correctly.")
     log.info("Starting policy module agent...")
-    agent = PolicyModuleAgent()
+    loop = asyncio.get_event_loop()
+    agent = PolicyModuleAgent(loop)
     agent.run()
 
 
diff --git a/osm_policy_module/common/lcm_client.py b/osm_policy_module/common/lcm_client.py
index 34e212f..3d8012f 100644
--- a/osm_policy_module/common/lcm_client.py
+++ b/osm_policy_module/common/lcm_client.py
 # For those usages not covered by the Apache License, Version 2.0 please
 # contact: bdiaz@whitestack.com or glavado@whitestack.com
 ##
+import asyncio
 import datetime
 import json
 import logging
 import time
 import uuid
 
-from kafka import KafkaProducer
+from aiokafka import AIOKafkaProducer
 from osm_common import dbmongo
 
 from osm_policy_module.core.config import Config
@@ -36,25 +37,34 @@ log = logging.getLogger(__name__)
 
 
 class LcmClient:
-    def __init__(self):
+    def __init__(self, loop=None):
         cfg = Config.instance()
         self.kafka_server = '{}:{}'.format(cfg.OSMPOL_MESSAGE_HOST,
                                            cfg.OSMPOL_MESSAGE_PORT)
-        self.producer = KafkaProducer(bootstrap_servers=self.kafka_server,
-                                      key_serializer=str.encode,
-                                      value_serializer=str.encode)
         self.common_db = dbmongo.DbMongo()
         self.common_db.db_connect({'host': cfg.OSMPOL_DATABASE_HOST,
                                    'port': int(cfg.OSMPOL_DATABASE_PORT),
                                    'name': 'osm'})
+        if not loop:
+            loop = asyncio.get_event_loop()
+        self.loop = loop
 
-    def scale(self, nsr_id: str, scaling_group_name: str, vnf_member_index: int, action: str):
+    async def scale(self, nsr_id: str, scaling_group_name: str, vnf_member_index: int, action: str):
         log.debug("scale %s %s %s %s", nsr_id, scaling_group_name, vnf_member_index, action)
         nslcmop = self._generate_nslcmop(nsr_id, scaling_group_name, vnf_member_index, action)
         self.common_db.create("nslcmops", nslcmop)
         log.info("Sending scale action message: %s", json.dumps(nslcmop))
-        self.producer.send(topic='ns', key='scale', value=json.dumps(nslcmop))
-        self.producer.flush()
+        producer = AIOKafkaProducer(loop=self.loop,
+                                    bootstrap_servers=self.kafka_server,
+                                    key_serializer=str.encode,
+                                    value_serializer=str.encode)
+        await producer.start()
+        try:
+            # Produce message
+            await producer.send_and_wait("ns", key="scale", value=json.dumps(nslcmop))
+        finally:
+            # Wait for all pending messages to be delivered or expire.
+            await producer.stop()
 
     def _generate_nslcmop(self, nsr_id: str, scaling_group_name: str, vnf_member_index: int, action: str):
         log.debug("_generate_nslcmop %s %s %s %s", nsr_id, scaling_group_name, vnf_member_index, action)
diff --git a/osm_policy_module/common/mon_client.py b/osm_policy_module/common/mon_client.py
index 6309025..911e655 100644
--- a/osm_policy_module/common/mon_client.py
+++ b/osm_policy_module/common/mon_client.py
 # For those usages not covered by the Apache License, Version 2.0 please
 # contact: bdiaz@whitestack.com or glavado@whitestack.com
 ##
+import asyncio
 import json
 import logging
 import random
 import uuid
 
-from kafka import KafkaProducer, KafkaConsumer
+from aiokafka import AIOKafkaProducer, AIOKafkaConsumer
 
 from osm_policy_module.core.config import Config
 
@@ -34,63 +35,92 @@ log = logging.getLogger(__name__)
 
 
 class MonClient:
-    def __init__(self):
+    def __init__(self, loop=None):
         cfg = Config.instance()
         self.kafka_server = '{}:{}'.format(cfg.OSMPOL_MESSAGE_HOST,
                                            cfg.OSMPOL_MESSAGE_PORT)
-        self.producer = KafkaProducer(bootstrap_servers=self.kafka_server,
-                                      key_serializer=str.encode,
-                                      value_serializer=str.encode)
+        if not loop:
+            loop = asyncio.get_event_loop()
+        self.loop = loop
 
-    def create_alarm(self, metric_name: str, ns_id: str, vdu_name: str, vnf_member_index: int, threshold: int,
-                     statistic: str, operation: str):
-        cor_id = random.randint(1, 1000000)
+    async def create_alarm(self, metric_name: str, ns_id: str, vdu_name: str, vnf_member_index: int, threshold: int,
+                           statistic: str, operation: str):
+        cor_id = random.randint(1, 10 ** 8)
         msg = self._build_create_alarm_payload(cor_id, metric_name, ns_id, vdu_name, vnf_member_index, threshold,
                                                statistic,
                                                operation)
         log.info("Sending create_alarm_request %s", msg)
-        self.producer.send(topic='alarm_request', key='create_alarm_request', value=json.dumps(msg))
-        self.producer.flush()
-        consumer = KafkaConsumer(bootstrap_servers=self.kafka_server,
-                                 key_deserializer=bytes.decode,
-                                 value_deserializer=bytes.decode,
-                                 consumer_timeout_ms=10000,
-                                 group_id='mon-client-' + str(uuid.uuid4()))
-        consumer.subscribe(['alarm_response'])
-        for message in consumer:
-            if message.key == 'create_alarm_response':
-                content = json.loads(message.value)
-                log.info("Received create_alarm_response %s", content)
-                if self._is_alarm_response_correlation_id_eq(cor_id, content):
-                    if not content['alarm_create_response']['status']:
-                        raise ValueError("Error creating alarm in MON")
-                    alarm_uuid = content['alarm_create_response']['alarm_uuid']
-                    return alarm_uuid
-
+        producer = AIOKafkaProducer(loop=self.loop,
+                                    bootstrap_servers=self.kafka_server,
+                                    key_serializer=str.encode,
+                                    value_serializer=str.encode)
+        await producer.start()
+        consumer = AIOKafkaConsumer(
+            "alarm_response",
+            loop=self.loop,
+            bootstrap_servers=self.kafka_server,
+            group_id="pol-consumer-" + str(uuid.uuid4()),
+            enable_auto_commit=False,
+            key_deserializer=bytes.decode,
+            value_deserializer=bytes.decode,
+            consumer_timeout_ms=10000)
+        await consumer.start()
+        try:
+            await producer.send_and_wait("alarm_request", key="create_alarm_request", value=json.dumps(msg))
+        finally:
+            await producer.stop()
+        try:
+            async for message in consumer:
+                if message.key == 'create_alarm_response':
+                    content = json.loads(message.value)
+                    log.info("Received create_alarm_response %s", content)
+                    if self._is_alarm_response_correlation_id_eq(cor_id, content):
+                        if not content['alarm_create_response']['status']:
+                            raise ValueError("Error creating alarm in MON")
+                        alarm_uuid = content['alarm_create_response']['alarm_uuid']
+                        await consumer.stop()
+                        return alarm_uuid
+        finally:
+            await consumer.stop()
         raise ValueError('Timeout: No alarm creation response from MON. Is MON up?')
 
-    def delete_alarm(self, ns_id: str, vnf_member_index: int, vdu_name: str, alarm_uuid: str):
-        cor_id = random.randint(1, 1000000)
+    async def delete_alarm(self, ns_id: str, vnf_member_index: int, vdu_name: str, alarm_uuid: str):
+        cor_id = random.randint(1, 10 ** 8)
         msg = self._build_delete_alarm_payload(cor_id, ns_id, vdu_name, vnf_member_index, alarm_uuid)
         log.info("Sending delete_alarm_request %s", msg)
-        self.producer.send(topic='alarm_request', key='delete_alarm_request', value=json.dumps(msg))
-        self.producer.flush()
-        consumer = KafkaConsumer(bootstrap_servers=self.kafka_server,
-                                 key_deserializer=bytes.decode,
-                                 value_deserializer=bytes.decode,
-                                 consumer_timeout_ms=10000)
-        consumer.subscribe(['alarm_response'])
-        for message in consumer:
-            if message.key == 'delete_alarm_response':
-                content = json.loads(message.value)
-                log.info("Received delete_alarm_response %s", content)
-                if self._is_alarm_response_correlation_id_eq(cor_id, content):
-                    if not content['alarm_delete_response']['status']:
-                        raise ValueError("Error deleting alarm in MON")
-                    alarm_uuid = content['alarm_delete_response']['alarm_uuid']
-                    return alarm_uuid
-
-        raise ValueError('Timeout: No alarm creation response from MON. Is MON up?')
+        producer = AIOKafkaProducer(loop=self.loop,
+                                    bootstrap_servers=self.kafka_server,
+                                    key_serializer=str.encode,
+                                    value_serializer=str.encode)
+        await producer.start()
+        consumer = AIOKafkaConsumer(
+            "alarm_response",
+            loop=self.loop,
+            bootstrap_servers=self.kafka_server,
+            group_id="pol-consumer-" + str(uuid.uuid4()),
+            enable_auto_commit=False,
+            key_deserializer=bytes.decode,
+            value_deserializer=bytes.decode,
+            consumer_timeout_ms=10000)
+        await consumer.start()
+        try:
+            await producer.send_and_wait("alarm_request", key="delete_alarm_request", value=json.dumps(msg))
+        finally:
+            await producer.stop()
+        try:
+            async for message in consumer:
+                if message.key == 'delete_alarm_response':
+                    content = json.loads(message.value)
+                    log.info("Received delete_alarm_response %s", content)
+                    if self._is_alarm_response_correlation_id_eq(cor_id, content):
+                        if not content['alarm_delete_response']['status']:
+                            raise ValueError("Error deleting alarm in MON")
+                        alarm_uuid = content['alarm_delete_response']['alarm_uuid']
+                        await consumer.stop()
+                        return alarm_uuid
+        finally:
+            await consumer.stop()
+        raise ValueError('Timeout: No alarm deletion response from MON. Is MON up?')
 
     def _build_create_alarm_payload(self, cor_id: int, metric_name: str, ns_id: str, vdu_name: str,
                                     vnf_member_index: int,
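
MonClient.create_alarm and delete_alarm follow the same round trip: produce a
request, then consume alarm_response until a message with a matching
correlation id arrives. A hypothetical caller (metric_name and operation values
appear in the tests below; the remaining values are placeholders):

    loop = asyncio.get_event_loop()
    mon_client = MonClient(loop=loop)
    alarm_uuid = loop.run_until_complete(
        mon_client.create_alarm(metric_name='average_memory_utilization',
                                ns_id='test_nsr_id',
                                vdu_name='vdu-1',       # placeholder
                                vnf_member_index=1,
                                threshold=80,           # placeholder
                                statistic='AVERAGE',    # placeholder
                                operation='GT'))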
diff --git a/osm_policy_module/core/agent.py b/osm_policy_module/core/agent.py
index ba35391..d637374 100644
--- a/osm_policy_module/core/agent.py
+++ b/osm_policy_module/core/agent.py
 # For those usages not covered by the Apache License, Version 2.0 please
 # contact: bdiaz@whitestack.com or glavado@whitestack.com
 ##
+import asyncio
 import datetime
 import json
 import logging
-import threading
 from json import JSONDecodeError
 
 import yaml
-from kafka import KafkaConsumer
+from aiokafka import AIOKafkaConsumer
 
 from osm_policy_module.common.db_client import DbClient
 from osm_policy_module.common.lcm_client import LcmClient
@@ -43,26 +43,38 @@ ALLOWED_KAFKA_KEYS = ['instantiated', 'scaled', 'notify_alarm']
 
 
 class PolicyModuleAgent:
-    def __init__(self):
+    def __init__(self, loop=None):
         cfg = Config.instance()
+        if not loop:
+            loop = asyncio.get_event_loop()
+        self.loop = loop
         self.db_client = DbClient()
-        self.mon_client = MonClient()
-        self.lcm_client = LcmClient()
+        self.mon_client = MonClient(loop=self.loop)
+        self.lcm_client = LcmClient(loop=self.loop)
         self.kafka_server = '{}:{}'.format(cfg.OSMPOL_MESSAGE_HOST,
                                            cfg.OSMPOL_MESSAGE_PORT)
 
     def run(self):
-        consumer = KafkaConsumer(bootstrap_servers=self.kafka_server,
-                                 key_deserializer=bytes.decode,
-                                 value_deserializer=bytes.decode,
-                                 group_id='pol-consumer')
-        consumer.subscribe(["ns", "alarm_response"])
+        self.loop.run_until_complete(self.start())
 
-        for message in consumer:
-            t = threading.Thread(target=self._process_msg, args=(message.topic, message.key, message.value,))
-            t.start()
+    async def start(self):
+        consumer = AIOKafkaConsumer(
+            "ns",
+            "alarm_response",
+            loop=self.loop,
+            bootstrap_servers=self.kafka_server,
+            group_id="pol-consumer",
+            key_deserializer=bytes.decode,
+            value_deserializer=bytes.decode,
+        )
+        await consumer.start()
+        try:
+            async for msg in consumer:
+                await self._process_msg(msg.topic, msg.key, msg.value)
+        finally:
+            await consumer.stop()
 
-    def _process_msg(self, topic, key, msg):
+    async def _process_msg(self, topic, key, msg):
         log.debug("_process_msg topic=%s key=%s msg=%s", topic, key, msg)
         try:
             if key in ALLOWED_KAFKA_KEYS:
@@ -72,16 +84,16 @@ class PolicyModuleAgent:
                     content = yaml.safe_load(msg)
 
                 if key == 'instantiated' or key == 'scaled':
-                    self._handle_instantiated_or_scaled(content)
+                    await self._handle_instantiated_or_scaled(content)
 
                 if key == 'notify_alarm':
-                    self._handle_alarm_notification(content)
+                    await self._handle_alarm_notification(content)
             else:
                 log.debug("Key %s is not in ALLOWED_KAFKA_KEYS", key)
         except Exception:
             log.exception("Error consuming message: ")
 
-    def _handle_alarm_notification(self, content):
+    async def _handle_alarm_notification(self, content):
         log.debug("_handle_alarm_notification: %s", content)
         alarm_id = content['notify_details']['alarm_uuid']
         metric_name = content['notify_details']['metric_name']
@@ -109,30 +121,30 @@ class PolicyModuleAgent:
                 log.info("Time between last scale and now is less than cooldown time. Skipping.")
                 return
             log.info("Sending scaling action message for ns: %s", alarm_id)
-            self.lcm_client.scale(alarm.scaling_criteria.scaling_policy.scaling_group.nsr_id,
-                                  alarm.scaling_criteria.scaling_policy.scaling_group.name,
-                                  alarm.vnf_member_index,
-                                  alarm.action)
+            await self.lcm_client.scale(alarm.scaling_criteria.scaling_policy.scaling_group.nsr_id,
+                                        alarm.scaling_criteria.scaling_policy.scaling_group.name,
+                                        alarm.vnf_member_index,
+                                        alarm.action)
             alarm.scaling_criteria.scaling_policy.last_scale = datetime.datetime.now()
             alarm.scaling_criteria.scaling_policy.save()
         except ScalingAlarm.DoesNotExist:
             log.info("There is no action configured for alarm %s.", alarm_id)
 
-    def _handle_instantiated_or_scaled(self, content):
+    async def _handle_instantiated_or_scaled(self, content):
         log.debug("_handle_instantiated_or_scaled: %s", content)
         nslcmop_id = content['nslcmop_id']
         nslcmop = self.db_client.get_nslcmop(nslcmop_id)
         if nslcmop['operationState'] == 'COMPLETED' or nslcmop['operationState'] == 'PARTIALLY_COMPLETED':
             nsr_id = nslcmop['nsInstanceId']
             log.info("Configuring scaling groups for network service with nsr_id: %s", nsr_id)
-            self._configure_scaling_groups(nsr_id)
+            await self._configure_scaling_groups(nsr_id)
         else:
             log.info(
                 "Network service is not in COMPLETED or PARTIALLY_COMPLETED state. "
                 "Current state is %s. Skipping...",
                 nslcmop['operationState'])
 
-    def _configure_scaling_groups(self, nsr_id: str):
+    async def _configure_scaling_groups(self, nsr_id: str):
         log.debug("_configure_scaling_groups: %s", nsr_id)
         # TODO: Add support for non-nfvi metrics
         alarms_created = []
@@ -209,8 +221,11 @@ class PolicyModuleAgent:
 
                                     for vdu_ref in scaling_group['vdu']:
                                         vnf_monitoring_param = next(
-                                            filter(lambda param: param['id'] == scaling_criteria[
-                                                'vnf-monitoring-param-ref'], vnf_monitoring_params))
+                                            filter(
+                                                lambda param: param['id'] == scaling_criteria[
+                                                    'vnf-monitoring-param-ref'],
+                                                vnf_monitoring_params)
+                                        )
                                         if not vdu_ref['vdu-id-ref'] == vnf_monitoring_param['vdu-ref']:
                                             continue
                                         vdu = next(
@@ -227,15 +242,21 @@ class PolicyModuleAgent:
                                                    vnfr['vdur']))
                                         for vdur in vdurs:
                                             try:
-                                                ScalingAlarm.select().join(ScalingCriteria).where(
+                                                (ScalingAlarm.select()
+                                                 .join(ScalingCriteria)
+                                                 .join(ScalingPolicy)
+                                                 .join(ScalingGroup)
+                                                 .where(
                                                     ScalingAlarm.vdu_name == vdur['name'],
-                                                    ScalingCriteria.name == scaling_criteria['name']
-                                                ).get()
+                                                    ScalingCriteria.name == scaling_criteria['name'],
+                                                    ScalingPolicy.name == scaling_policy['name'],
+                                                    ScalingGroup.nsr_id == nsr_id
+                                                ).get())
                                                 log.debug("vdu %s already has an alarm configured", vdur['name'])
                                                 continue
                                             except ScalingAlarm.DoesNotExist:
                                                 pass
-                                            alarm_uuid = self.mon_client.create_alarm(
+                                            alarm_uuid = await self.mon_client.create_alarm(
                                                 metric_name=vdu_monitoring_param['nfvi-metric'],
                                                 ns_id=nsr_id,
                                                 vdu_name=vdur['name'],
@@ -251,7 +272,7 @@ class PolicyModuleAgent:
                                                 vdu_name=vdur['name'],
                                                 scaling_criteria=scaling_criteria_record
                                             )
-                                            alarm_uuid = self.mon_client.create_alarm(
+                                            alarm_uuid = await self.mon_client.create_alarm(
                                                 metric_name=vdu_monitoring_param['nfvi-metric'],
                                                 ns_id=nsr_id,
                                                 vdu_name=vdur['name'],
@@ -273,5 +294,5 @@ class PolicyModuleAgent:
                 if len(alarms_created) > 0:
                     log.info("Cleaning alarm resources in MON")
                     for alarm in alarms_created:
-                        self.mon_client.delete_alarm(*alarm)
+                        await self.mon_client.delete_alarm(*alarm)
                 raise e
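
Note the concurrency model change in the consumer loop above: each message was
previously handled on its own thread, while messages are now awaited one at a
time. If concurrent handling were ever needed again, the handler could be
scheduled as a task instead; a sketch, not part of this commit:

    async for msg in consumer:
        # Schedules the handler without awaiting it; exceptions must then be
        # retrieved from the task instead of being caught in this loop.
        self.loop.create_task(self._process_msg(msg.topic, msg.key, msg.value))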
diff --git a/osm_policy_module/tests/integration/test_kafka_messages.py b/osm_policy_module/tests/integration/test_kafka_messages.py
index 581d53f..29d88e2 100644
--- a/osm_policy_module/tests/integration/test_kafka_messages.py
+++ b/osm_policy_module/tests/integration/test_kafka_messages.py
 # For those usages not covered by the Apache License, Version 2.0 please
 # contact: bdiaz@whitestack.com or glavado@whitestack.com
 ##
+import asyncio
 import json
 import logging
 import os
 import sys
 import unittest
 
-from kafka import KafkaProducer, KafkaConsumer
+from aiokafka import AIOKafkaProducer, AIOKafkaConsumer
 from kafka.errors import KafkaError
 
 from osm_policy_module.core.config import Config
@@ -40,38 +41,51 @@ log.addHandler(stream_handler)
 
 class KafkaMessagesTest(unittest.TestCase):
     def setUp(self):
-        try:
-            cfg = Config.instance()
-            kafka_server = '{}:{}'.format(cfg.OSMPOL_MESSAGE_HOST,
-                                          cfg.OSMPOL_MESSAGE_PORT)
-            self.producer = KafkaProducer(bootstrap_servers=kafka_server,
-                                          key_serializer=str.encode,
-                                          value_serializer=str.encode)
-            self.consumer = KafkaConsumer(bootstrap_servers=kafka_server,
-                                          key_deserializer=bytes.decode,
-                                          value_deserializer=bytes.decode,
-                                          auto_offset_reset='earliest',
-                                          consumer_timeout_ms=5000)
-            self.consumer.subscribe(['ns'])
-        except KafkaError:
-            self.skipTest('Kafka server not present.')
+        super().setUp()
+        cfg = Config.instance()
+        self.kafka_server = '{}:{}'.format(cfg.OSMPOL_MESSAGE_HOST,
+                                           cfg.OSMPOL_MESSAGE_PORT)
+        self.loop = asyncio.new_event_loop()
+        asyncio.set_event_loop(None)
 
     def tearDown(self):
-        self.producer.close()
-        self.consumer.close()
+        super().tearDown()
 
     def test_send_instantiated_msg(self):
-        with open(
-                os.path.join(os.path.dirname(__file__), '../examples/instantiated.json')) as file:
-            payload = json.load(file)
-            self.producer.send('ns', json.dumps(payload), key="instantiated")
-            self.producer.flush()
-
-        for message in self.consumer:
-            if message.key == 'instantiated':
-                self.assertIsNotNone(message.value)
-                return
-        self.fail("No message received in consumer")
+        async def test_send_instantiated_msg():
+            producer = AIOKafkaProducer(loop=self.loop,
+                                        bootstrap_servers=self.kafka_server,
+                                        key_serializer=str.encode,
+                                        value_serializer=str.encode)
+            await producer.start()
+            consumer = AIOKafkaConsumer(
+                "ns",
+                loop=self.loop,
+                bootstrap_servers=self.kafka_server,
+                consumer_timeout_ms=10000,
+                auto_offset_reset='earliest',
+                value_deserializer=bytes.decode,
+                key_deserializer=bytes.decode)
+            await consumer.start()
+            try:
+                with open(
+                        os.path.join(os.path.dirname(__file__), '../examples/instantiated.json')) as file:
+                    payload = json.load(file)
+                    await producer.send_and_wait("ns", key="instantiated", value=json.dumps(payload))
+            finally:
+                await producer.stop()
+            try:
+                async for message in consumer:
+                    if message.key == 'instantiated':
+                        self.assertIsNotNone(message.value)
+                        return
+            finally:
+                await consumer.stop()
+
+        try:
+            self.loop.run_until_complete(test_send_instantiated_msg())
+        except KafkaError:
+            self.skipTest('Kafka server not present.')
 
 
 if __name__ == '__main__':
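
The test above runs its coroutine on a private loop created in setUp. The same
pattern in isolation, independent of Kafka (a minimal sketch):

    import asyncio
    import unittest

    class AsyncPatternTest(unittest.TestCase):
        def setUp(self):
            # Fresh private loop per test; the global loop is unset so that
            # nothing accidentally relies on it.
            self.loop = asyncio.new_event_loop()
            asyncio.set_event_loop(None)

        def tearDown(self):
            self.loop.close()

        def test_runs_coroutine(self):
            async def scenario():
                await asyncio.sleep(0)  # stand-in for real async work
                return 42

            self.assertEqual(self.loop.run_until_complete(scenario()), 42)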
diff --git a/osm_policy_module/tests/integration/test_policy_agent.py b/osm_policy_module/tests/integration/test_policy_agent.py
index bed6eb5..a2794eb 100644
--- a/osm_policy_module/tests/integration/test_policy_agent.py
+++ b/osm_policy_module/tests/integration/test_policy_agent.py
@@ -21,6 +21,7 @@
 # For those usages not covered by the Apache License, Version 2.0 please
 # contact: bdiaz@whitestack.com or glavado@whitestack.com
 ##
+import asyncio
 import logging
 import sys
 import unittest
@@ -413,6 +414,8 @@ class PolicyModuleAgentTest(unittest.TestCase):
         test_db.connect()
         test_db.drop_tables(MODELS)
         test_db.create_tables(MODELS)
+        self.loop = asyncio.new_event_loop()
+        asyncio.set_event_loop(None)
 
     def tearDown(self):
         super()
@@ -430,7 +433,7 @@ class PolicyModuleAgentTest(unittest.TestCase):
             if '2' in args[1]:
                 return vnfr_record_mocks[1]
 
-        def _test_configure_scaling_groups_create_alarm(*args, **kwargs):
+        async def _test_configure_scaling_groups_create_alarm(*args, **kwargs):
             return uuid.uuid4()
 
         kafka_producer_init.return_value = None
@@ -438,8 +441,8 @@ class PolicyModuleAgentTest(unittest.TestCase):
         get_nsr.return_value = nsr_record_mock
         get_vnfd.return_value = vnfd_record_mock
         create_alarm.side_effect = _test_configure_scaling_groups_create_alarm
-        agent = PolicyModuleAgent()
-        agent._configure_scaling_groups("test_nsr_id")
+        agent = PolicyModuleAgent(self.loop)
+        self.loop.run_until_complete(agent._configure_scaling_groups("test_nsr_id"))
         create_alarm.assert_any_call(metric_name='average_memory_utilization',
                                      ns_id='test_nsr_id',
                                      operation='GT',
diff --git a/requirements.txt b/requirements.txt
index cded148..f6451e4 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,4 +1,4 @@
-kafka==1.3.*
+aiokafka==0.4.*
 peewee==3.1.*
 jsonschema==2.6.*
 six==1.11.*
diff --git a/setup.py b/setup.py
index 86dacc4..2a85f83 100644
--- a/setup.py
+++ b/setup.py
@@ -47,7 +47,7 @@ setup(
     packages=[_name],
     include_package_data=True,
     install_requires=[
-        "kafka==1.3.*",
+        "aiokafka==0.4.*",
         "peewee==3.1.*",
         "jsonschema==2.6.*",
         "six==1.11.*",