Replaces multithreading with asynchronous calls to Kafka, using
aiokafka. Adds the async/await keywords to the methods that require
them.
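
The consumer loop now runs on the asyncio event loop instead of
spawning a thread per message:

    async for msg in consumer:
        await self._process_msg(msg.topic, msg.key, msg.value)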
Signed-off-by: Benjamin Diaz <bdiaz@whitestack.com>
Change-Id: Ic93054dc9a6b3835e2eaf1a480e5081c5eb9d4f5
#!/bin/bash
echo "Installing python dependencies via pip..."
-pip3 install kafka==1.3.*
+pip3 install aiokafka==0.4.*
pip3 install peewee==3.1.*
pip3 install jsonschema==2.6.*
pip3 install six==1.11.*
# For those usages not covered by the Apache License, Version 2.0 please
# contact: bdiaz@whitestack.com or glavado@whitestack.com
##
-import argparse
+import asyncio
import logging
import sys
def main():
cfg = Config.instance()
- parser = argparse.ArgumentParser(prog='pm-scaling-config-agent')
- parser.add_argument('--config-file', nargs='?', help='Policy module agent configuration file')
- args = parser.parse_args()
- if args.config_file:
- cfg.load_file(args.config_file)
log_formatter_str = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
logging.basicConfig(stream=sys.stdout,
format=log_formatter_str,
db_manager.create_tables()
log.info("Database synced correctly.")
log.info("Starting policy module agent...")
- agent = PolicyModuleAgent()
+ loop = asyncio.get_event_loop()
+ agent = PolicyModuleAgent(loop)
agent.run()
# For those usages not covered by the Apache License, Version 2.0 please
# contact: bdiaz@whitestack.com or glavado@whitestack.com
##
+import asyncio
import datetime
import json
import logging
import time
import uuid
-from kafka import KafkaProducer
+from aiokafka import AIOKafkaProducer
from osm_common import dbmongo
from osm_policy_module.core.config import Config
class LcmClient:
- def __init__(self):
+ def __init__(self, loop=None):
cfg = Config.instance()
self.kafka_server = '{}:{}'.format(cfg.OSMPOL_MESSAGE_HOST,
cfg.OSMPOL_MESSAGE_PORT)
- self.producer = KafkaProducer(bootstrap_servers=self.kafka_server,
- key_serializer=str.encode,
- value_serializer=str.encode)
self.common_db = dbmongo.DbMongo()
self.common_db.db_connect({'host': cfg.OSMPOL_DATABASE_HOST,
'port': int(cfg.OSMPOL_DATABASE_PORT),
'name': 'osm'})
+ if not loop:
+ loop = asyncio.get_event_loop()
+ self.loop = loop
- def scale(self, nsr_id: str, scaling_group_name: str, vnf_member_index: int, action: str):
+ async def scale(self, nsr_id: str, scaling_group_name: str, vnf_member_index: int, action: str):
log.debug("scale %s %s %s %s", nsr_id, scaling_group_name, vnf_member_index, action)
nslcmop = self._generate_nslcmop(nsr_id, scaling_group_name, vnf_member_index, action)
self.common_db.create("nslcmops", nslcmop)
log.info("Sending scale action message: %s", json.dumps(nslcmop))
- self.producer.send(topic='ns', key='scale', value=json.dumps(nslcmop))
- self.producer.flush()
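+ # A short-lived producer per call; it is always stopped in the finally
+ # block below so no connections leak.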
+ producer = AIOKafkaProducer(loop=self.loop,
+ bootstrap_servers=self.kafka_server,
+ key_serializer=str.encode,
+ value_serializer=str.encode)
+ await producer.start()
+ try:
+ # Produce message
+ await producer.send_and_wait("ns", key="scale", value=json.dumps(nslcmop))
+ finally:
+ # Wait for all pending messages to be delivered or expire.
+ await producer.stop()
def _generate_nslcmop(self, nsr_id: str, scaling_group_name: str, vnf_member_index: int, action: str):
log.debug("_generate_nslcmop %s %s %s %s", nsr_id, scaling_group_name, vnf_member_index, action)
# For those usages not covered by the Apache License, Version 2.0 please
# contact: bdiaz@whitestack.com or glavado@whitestack.com
##
+import asyncio
import json
import logging
import random
import uuid
-from kafka import KafkaProducer, KafkaConsumer
+from aiokafka import AIOKafkaProducer, AIOKafkaConsumer
from osm_policy_module.core.config import Config
class MonClient:
- def __init__(self):
+ def __init__(self, loop=None):
cfg = Config.instance()
self.kafka_server = '{}:{}'.format(cfg.OSMPOL_MESSAGE_HOST,
cfg.OSMPOL_MESSAGE_PORT)
- self.producer = KafkaProducer(bootstrap_servers=self.kafka_server,
- key_serializer=str.encode,
- value_serializer=str.encode)
+ if not loop:
+ loop = asyncio.get_event_loop()
+ self.loop = loop
- def create_alarm(self, metric_name: str, ns_id: str, vdu_name: str, vnf_member_index: int, threshold: int,
- statistic: str, operation: str):
- cor_id = random.randint(1, 1000000)
+ async def create_alarm(self, metric_name: str, ns_id: str, vdu_name: str, vnf_member_index: int, threshold: int,
+ statistic: str, operation: str):
+ cor_id = random.randint(1, 10**7)
msg = self._build_create_alarm_payload(cor_id, metric_name, ns_id, vdu_name, vnf_member_index, threshold,
statistic,
operation)
log.info("Sending create_alarm_request %s", msg)
- self.producer.send(topic='alarm_request', key='create_alarm_request', value=json.dumps(msg))
- self.producer.flush()
- consumer = KafkaConsumer(bootstrap_servers=self.kafka_server,
- key_deserializer=bytes.decode,
- value_deserializer=bytes.decode,
- consumer_timeout_ms=10000,
- group_id='mon-client-' + str(uuid.uuid4()))
- consumer.subscribe(['alarm_response'])
- for message in consumer:
- if message.key == 'create_alarm_response':
- content = json.loads(message.value)
- log.info("Received create_alarm_response %s", content)
- if self._is_alarm_response_correlation_id_eq(cor_id, content):
- if not content['alarm_create_response']['status']:
- raise ValueError("Error creating alarm in MON")
- alarm_uuid = content['alarm_create_response']['alarm_uuid']
- return alarm_uuid
-
+ producer = AIOKafkaProducer(loop=self.loop,
+ bootstrap_servers=self.kafka_server,
+ key_serializer=str.encode,
+ value_serializer=str.encode)
+ await producer.start()
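+ # A fresh consumer with a unique group_id per call, so each request sees
+ # every message on alarm_response rather than sharing offsets.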
+ consumer = AIOKafkaConsumer(
+ "alarm_response",
+ loop=self.loop,
+ bootstrap_servers=self.kafka_server,
+ group_id="pol-consumer-" + str(uuid.uuid4()),
+ enable_auto_commit=False,
+ key_deserializer=bytes.decode,
+ value_deserializer=bytes.decode,
+ consumer_timeout_ms=10000)
+ await consumer.start()
+ try:
+ await producer.send_and_wait("alarm_request", key="create_alarm_request", value=json.dumps(msg))
+ finally:
+ await producer.stop()
+ try:
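+ # NOTE: in aiokafka, consumer_timeout_ms only tunes the background fetcher;
+ # unlike kafka-python it does not end iteration, so the timeout ValueError
+ # below is only reached if the iterator exits. A bounded wait (a sketch,
+ # assuming aiokafka's consumer.getone()) could wrap each read in
+ # asyncio.wait_for(consumer.getone(), timeout=10) and catch TimeoutError.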
+ async for message in consumer:
+ if message.key == 'create_alarm_response':
+ content = json.loads(message.value)
+ log.info("Received create_alarm_response %s", content)
+ if self._is_alarm_response_correlation_id_eq(cor_id, content):
+ if not content['alarm_create_response']['status']:
+ raise ValueError("Error creating alarm in MON")
+ alarm_uuid = content['alarm_create_response']['alarm_uuid']
+ return alarm_uuid
+ finally:
+ await consumer.stop()
raise ValueError('Timeout: No alarm creation response from MON. Is MON up?')
- def delete_alarm(self, ns_id: str, vnf_member_index: int, vdu_name: str, alarm_uuid: str):
- cor_id = random.randint(1, 1000000)
+ async def delete_alarm(self, ns_id: str, vnf_member_index: int, vdu_name: str, alarm_uuid: str):
+ cor_id = random.randint(1, 10**7)
msg = self._build_delete_alarm_payload(cor_id, ns_id, vdu_name, vnf_member_index, alarm_uuid)
log.info("Sending delete_alarm_request %s", msg)
- self.producer.send(topic='alarm_request', key='delete_alarm_request', value=json.dumps(msg))
- self.producer.flush()
- consumer = KafkaConsumer(bootstrap_servers=self.kafka_server,
- key_deserializer=bytes.decode,
- value_deserializer=bytes.decode,
- consumer_timeout_ms=10000)
- consumer.subscribe(['alarm_response'])
- for message in consumer:
- if message.key == 'delete_alarm_response':
- content = json.loads(message.value)
- log.info("Received delete_alarm_response %s", content)
- if self._is_alarm_response_correlation_id_eq(cor_id, content):
- if not content['alarm_delete_response']['status']:
- raise ValueError("Error deleting alarm in MON")
- alarm_uuid = content['alarm_delete_response']['alarm_uuid']
- return alarm_uuid
-
- raise ValueError('Timeout: No alarm creation response from MON. Is MON up?')
+ producer = AIOKafkaProducer(loop=self.loop,
+ bootstrap_servers=self.kafka_server,
+ key_serializer=str.encode,
+ value_serializer=str.encode)
+ await producer.start()
+ consumer = AIOKafkaConsumer(
+ "alarm_response",
+ loop=self.loop,
+ bootstrap_servers=self.kafka_server,
+ group_id="pol-consumer-" + str(uuid.uuid4()),
+ enable_auto_commit=False,
+ key_deserializer=bytes.decode,
+ value_deserializer=bytes.decode,
+ consumer_timeout_ms=10000)
+ await consumer.start()
+ try:
+ await producer.send_and_wait("alarm_request", key="delete_alarm_request", value=json.dumps(msg))
+ finally:
+ await producer.stop()
+ try:
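+ # Same caveat as in create_alarm: this iteration is not bounded by
+ # consumer_timeout_ms.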
+ async for message in consumer:
+ if message.key == 'delete_alarm_response':
+ content = json.loads(message.value)
+ log.info("Received delete_alarm_response %s", content)
+ if self._is_alarm_response_correlation_id_eq(cor_id, content):
+ if not content['alarm_delete_response']['status']:
+ raise ValueError("Error deleting alarm in MON")
+ alarm_uuid = content['alarm_delete_response']['alarm_uuid']
+ return alarm_uuid
+ finally:
+ await consumer.stop()
+ raise ValueError('Timeout: No alarm deletion response from MON. Is MON up?')
def _build_create_alarm_payload(self, cor_id: int, metric_name: str, ns_id: str, vdu_name: str,
vnf_member_index: int,
# For those usages not covered by the Apache License, Version 2.0 please
# contact: bdiaz@whitestack.com or glavado@whitestack.com
##
+import asyncio
import datetime
import json
import logging
-import threading
from json import JSONDecodeError
import yaml
-from kafka import KafkaConsumer
+from aiokafka import AIOKafkaConsumer
from osm_policy_module.common.db_client import DbClient
from osm_policy_module.common.lcm_client import LcmClient
class PolicyModuleAgent:
- def __init__(self):
+ def __init__(self, loop=None):
cfg = Config.instance()
+ if not loop:
+ loop = asyncio.get_event_loop()
+ self.loop = loop
self.db_client = DbClient()
- self.mon_client = MonClient()
- self.lcm_client = LcmClient()
+ self.mon_client = MonClient(loop=self.loop)
+ self.lcm_client = LcmClient(loop=self.loop)
self.kafka_server = '{}:{}'.format(cfg.OSMPOL_MESSAGE_HOST,
cfg.OSMPOL_MESSAGE_PORT)
def run(self):
- consumer = KafkaConsumer(bootstrap_servers=self.kafka_server,
- key_deserializer=bytes.decode,
- value_deserializer=bytes.decode,
- group_id='pol-consumer')
- consumer.subscribe(["ns", "alarm_response"])
+ self.loop.run_until_complete(self.start())
- for message in consumer:
- t = threading.Thread(target=self._process_msg, args=(message.topic, message.key, message.value,))
- t.start()
+ async def start(self):
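+ # Messages are awaited sequentially on the event loop, replacing the
+ # per-message threads used previously.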
+ consumer = AIOKafkaConsumer(
+ "ns",
+ "alarm_response",
+ loop=self.loop,
+ bootstrap_servers=self.kafka_server,
+ group_id="pol-consumer",
+ key_deserializer=bytes.decode,
+ value_deserializer=bytes.decode,
+ )
+ await consumer.start()
+ try:
+ async for msg in consumer:
+ await self._process_msg(msg.topic, msg.key, msg.value)
+ finally:
+ await consumer.stop()
- def _process_msg(self, topic, key, msg):
+ async def _process_msg(self, topic, key, msg):
log.debug("_process_msg topic=%s key=%s msg=%s", topic, key, msg)
try:
if key in ALLOWED_KAFKA_KEYS:
content = yaml.safe_load(msg)
if key == 'instantiated' or key == 'scaled':
- self._handle_instantiated_or_scaled(content)
+ await self._handle_instantiated_or_scaled(content)
if key == 'notify_alarm':
- self._handle_alarm_notification(content)
+ await self._handle_alarm_notification(content)
else:
log.debug("Key %s is not in ALLOWED_KAFKA_KEYS", key)
except Exception:
log.exception("Error consuming message: ")
- def _handle_alarm_notification(self, content):
+ async def _handle_alarm_notification(self, content):
log.debug("_handle_alarm_notification: %s", content)
alarm_id = content['notify_details']['alarm_uuid']
metric_name = content['notify_details']['metric_name']
log.info("Time between last scale and now is less than cooldown time. Skipping.")
return
log.info("Sending scaling action message for ns: %s", alarm_id)
- self.lcm_client.scale(alarm.scaling_criteria.scaling_policy.scaling_group.nsr_id,
- alarm.scaling_criteria.scaling_policy.scaling_group.name,
- alarm.vnf_member_index,
- alarm.action)
+ await self.lcm_client.scale(alarm.scaling_criteria.scaling_policy.scaling_group.nsr_id,
+ alarm.scaling_criteria.scaling_policy.scaling_group.name,
+ alarm.vnf_member_index,
+ alarm.action)
alarm.scaling_criteria.scaling_policy.last_scale = datetime.datetime.now()
alarm.scaling_criteria.scaling_policy.save()
except ScalingAlarm.DoesNotExist:
log.info("There is no action configured for alarm %s.", alarm_id)
- def _handle_instantiated_or_scaled(self, content):
+ async def _handle_instantiated_or_scaled(self, content):
log.debug("_handle_instantiated_or_scaled: %s", content)
nslcmop_id = content['nslcmop_id']
nslcmop = self.db_client.get_nslcmop(nslcmop_id)
if nslcmop['operationState'] == 'COMPLETED' or nslcmop['operationState'] == 'PARTIALLY_COMPLETED':
nsr_id = nslcmop['nsInstanceId']
log.info("Configuring scaling groups for network service with nsr_id: %s", nsr_id)
- self._configure_scaling_groups(nsr_id)
+ await self._configure_scaling_groups(nsr_id)
else:
log.info(
"Network service is not in COMPLETED or PARTIALLY_COMPLETED state. "
"Current state is %s. Skipping...",
nslcmop['operationState'])
- def _configure_scaling_groups(self, nsr_id: str):
+ async def _configure_scaling_groups(self, nsr_id: str):
log.debug("_configure_scaling_groups: %s", nsr_id)
# TODO: Add support for non-nfvi metrics
alarms_created = []
for vdu_ref in scaling_group['vdu']:
vnf_monitoring_param = next(
- filter(lambda param: param['id'] == scaling_criteria[
- 'vnf-monitoring-param-ref'], vnf_monitoring_params))
+ filter(
+ lambda param: param['id'] == scaling_criteria[
+ 'vnf-monitoring-param-ref'],
+ vnf_monitoring_params)
+ )
if not vdu_ref['vdu-id-ref'] == vnf_monitoring_param['vdu-ref']:
continue
vdu = next(
vnfr['vdur']))
for vdur in vdurs:
try:
- ScalingAlarm.select().join(ScalingCriteria).where(
+ (ScalingAlarm.select()
+ .join(ScalingCriteria)
+ .join(ScalingPolicy)
+ .join(ScalingGroup)
+ .where(
ScalingAlarm.vdu_name == vdur['name'],
- ScalingCriteria.name == scaling_criteria['name']
- ).get()
+ ScalingCriteria.name == scaling_criteria['name'],
+ ScalingPolicy.name == scaling_policy['name'],
+ ScalingGroup.nsr_id == nsr_id
+ ).get())
log.debug("vdu %s already has an alarm configured", vdur['name'])
continue
except ScalingAlarm.DoesNotExist:
pass
- alarm_uuid = self.mon_client.create_alarm(
+ alarm_uuid = await self.mon_client.create_alarm(
metric_name=vdu_monitoring_param['nfvi-metric'],
ns_id=nsr_id,
vdu_name=vdur['name'],
vdu_name=vdur['name'],
scaling_criteria=scaling_criteria_record
)
- alarm_uuid = self.mon_client.create_alarm(
+ alarm_uuid = await self.mon_client.create_alarm(
metric_name=vdu_monitoring_param['nfvi-metric'],
ns_id=nsr_id,
vdu_name=vdur['name'],
if len(alarms_created) > 0:
log.info("Cleaning alarm resources in MON")
for alarm in alarms_created:
- self.mon_client.delete_alarm(*alarm)
+ await self.mon_client.delete_alarm(*alarm)
raise e
# For those usages not covered by the Apache License, Version 2.0 please
# contact: bdiaz@whitestack.com or glavado@whitestack.com
##
+import asyncio
import json
import logging
import os
import sys
import unittest
-from kafka import KafkaProducer, KafkaConsumer
+from aiokafka import AIOKafkaProducer, AIOKafkaConsumer
from kafka.errors import KafkaError
from osm_policy_module.core.config import Config
class KafkaMessagesTest(unittest.TestCase):
def setUp(self):
- try:
- cfg = Config.instance()
- kafka_server = '{}:{}'.format(cfg.OSMPOL_MESSAGE_HOST,
- cfg.OSMPOL_MESSAGE_PORT)
- self.producer = KafkaProducer(bootstrap_servers=kafka_server,
- key_serializer=str.encode,
- value_serializer=str.encode)
- self.consumer = KafkaConsumer(bootstrap_servers=kafka_server,
- key_deserializer=bytes.decode,
- value_deserializer=bytes.decode,
- auto_offset_reset='earliest',
- consumer_timeout_ms=5000)
- self.consumer.subscribe(['ns'])
- except KafkaError:
- self.skipTest('Kafka server not present.')
+ super().setUp()
+ cfg = Config.instance()
+ self.kafka_server = '{}:{}'.format(cfg.OSMPOL_MESSAGE_HOST,
+ cfg.OSMPOL_MESSAGE_PORT)
+ self.loop = asyncio.new_event_loop()
+ asyncio.set_event_loop(None)
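+ # Detaching the global loop forces the code under test to use the loop
+ # that is passed in explicitly.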
def tearDown(self):
- self.producer.close()
- self.consumer.close()
+ super().tearDown()
def test_send_instantiated_msg(self):
- with open(
- os.path.join(os.path.dirname(__file__), '../examples/instantiated.json')) as file:
- payload = json.load(file)
- self.producer.send('ns', json.dumps(payload), key="instantiated")
- self.producer.flush()
-
- for message in self.consumer:
- if message.key == 'instantiated':
- self.assertIsNotNone(message.value)
- return
- self.fail("No message received in consumer")
+ async def _test_send_instantiated_msg():
+ producer = AIOKafkaProducer(loop=self.loop,
+ bootstrap_servers=self.kafka_server,
+ key_serializer=str.encode,
+ value_serializer=str.encode)
+ await producer.start()
+ consumer = AIOKafkaConsumer(
+ "ns",
+ loop=self.loop,
+ bootstrap_servers=self.kafka_server,
+ consumer_timeout_ms=10000,
+ auto_offset_reset='earliest',
+ value_deserializer=bytes.decode,
+ key_deserializer=bytes.decode)
+ await consumer.start()
+ try:
+ with open(
+ os.path.join(os.path.dirname(__file__), '../examples/instantiated.json')) as file:
+ payload = json.load(file)
+ await producer.send_and_wait("ns", key="instantiated", value=json.dumps(payload))
+ finally:
+ await producer.stop()
+ try:
+ async for message in consumer:
+ if message.key == 'instantiated':
+ self.assertIsNotNone(message.value)
+ return
+ finally:
+ await consumer.stop()
+
+ try:
+ self.loop.run_until_complete(_test_send_instantiated_msg())
+ except KafkaError:
+ self.skipTest('Kafka server not present.')
if __name__ == '__main__':
# For those usages not covered by the Apache License, Version 2.0 please
# contact: bdiaz@whitestack.com or glavado@whitestack.com
##
+import asyncio
import logging
import sys
import unittest
test_db.connect()
test_db.drop_tables(MODELS)
test_db.create_tables(MODELS)
+ self.loop = asyncio.new_event_loop()
+ asyncio.set_event_loop(None)
def tearDown(self):
super()
if '2' in args[1]:
return vnfr_record_mocks[1]
- def _test_configure_scaling_groups_create_alarm(*args, **kwargs):
+ async def _test_configure_scaling_groups_create_alarm(*args, **kwargs):
return uuid.uuid4()
kafka_producer_init.return_value = None
get_nsr.return_value = nsr_record_mock
get_vnfd.return_value = vnfd_record_mock
create_alarm.side_effect = _test_configure_scaling_groups_create_alarm
- agent = PolicyModuleAgent()
- agent._configure_scaling_groups("test_nsr_id")
+ agent = PolicyModuleAgent(self.loop)
+ self.loop.run_until_complete(agent._configure_scaling_groups("test_nsr_id"))
create_alarm.assert_any_call(metric_name='average_memory_utilization',
ns_id='test_nsr_id',
operation='GT',
-kafka==1.3.*
+aiokafka==0.4.*
peewee==3.1.*
jsonschema==2.6.*
six==1.11.*
packages=[_name],
include_package_data=True,
install_requires=[
- "kafka==1.3.*",
+ "aiokafka==0.4.*",
"peewee==3.1.*",
"jsonschema==2.6.*",
"six==1.11.*",