From 274a6e9fa4268db2a87525a27bc574c46eaa80dc Mon Sep 17 00:00:00 2001 From: Benjamin Diaz Date: Mon, 26 Nov 2018 13:14:33 -0300 Subject: [PATCH] Adds use of aiokafka in mon-server Adds request timeout param for requests made to Prometheus Adds use of response topics based on cor_id for alarm create/delete RPC Change-Id: I5544d749d812e4e77ba913b543ccaa83542535d4 Signed-off-by: Benjamin Diaz --- debian/python3-osm-mon.postinst | 2 +- docker/Dockerfile | 5 ++- osm_mon/cmd/mon_healthcheck.py | 23 +++++++---- osm_mon/core/settings.py | 2 +- osm_mon/evaluator/evaluator.py | 2 +- osm_mon/server/server.py | 73 +++++++++++++++++++++------------ requirements.txt | 2 +- setup.py | 2 +- 8 files changed, 69 insertions(+), 42 deletions(-) diff --git a/debian/python3-osm-mon.postinst b/debian/python3-osm-mon.postinst index 0e517ea..f2d1af9 100644 --- a/debian/python3-osm-mon.postinst +++ b/debian/python3-osm-mon.postinst @@ -1,7 +1,7 @@ #!/bin/bash echo "Installing python dependencies via pip..." -pip3 install kafka-python==1.4.3 +pip3 install aiokafka==0.4.* pip3 install requests==2.18.* pip3 install cherrypy==14.0.* pip3 install jsmin==2.2.* diff --git a/docker/Dockerfile b/docker/Dockerfile index f9f610a..8079f23 100644 --- a/docker/Dockerfile +++ b/docker/Dockerfile @@ -41,7 +41,7 @@ ENV MONGO_URI mongodb://mongo:27017 ENV DATABASE sqlite:///mon_sqlite.db ENV OS_NOTIFIER_URI localhost:8662 ENV OS_DEFAULT_GRANULARITY 300 -ENV REQUEST_TIMEOUT 10 +ENV OSMMON_REQUEST_TIMEOUT 10 ENV OSMMON_LOG_LEVEL INFO ENV OSMMON_KAFKA_LOG_LEVEL INFO ENV OSMMON_VCA_HOST localhost @@ -54,4 +54,7 @@ ENV OSMMON_PROMETHEUS_URL http://prometheus:9090 EXPOSE 8000 +HEALTHCHECK --interval=5s --timeout=2s --retries=12 \ + CMD osm-mon-healthcheck || exit 1 + CMD /bin/bash mon/docker/scripts/runInstall.sh diff --git a/osm_mon/cmd/mon_healthcheck.py b/osm_mon/cmd/mon_healthcheck.py index 412410b..1fa2c2b 100644 --- a/osm_mon/cmd/mon_healthcheck.py +++ b/osm_mon/cmd/mon_healthcheck.py @@ -19,15 +19,15 @@ # For those usages not covered by the Apache License, Version 2.0 please # contact: bdiaz@whitestack.com or glavado@whitestack.com ## - +import asyncio import logging import subprocess import sys -import uuid import requests +from aiokafka import AIOKafkaConsumer -from osm_mon.core.message_bus.consumer import Consumer +from osm_mon.core.settings import Config log = logging.getLogger(__name__) @@ -51,7 +51,7 @@ def _processes_running(): return False processes_to_check = ['osm-mon-collector', 'osm-mon-evaluator', 'osm-mon-server'] ps = subprocess.Popen(['ps', 'aux'], stdout=subprocess.PIPE).communicate()[0] - processes_running = ps.split('\n') + processes_running = ps.decode().split('\n') for p in processes_to_check: if not _contains_process(processes_running, p): return False @@ -69,12 +69,17 @@ def _is_prometheus_exporter_ok(): def _is_kafka_ok(): + async def _test_kafka(loop): + cfg = Config.instance() + consumer = AIOKafkaConsumer( + 'healthcheck', + loop=loop, bootstrap_servers=cfg.BROKER_URI) + await consumer.start() + await consumer.stop() + try: - common_consumer = Consumer("mon-healthcheck-" + str(uuid.uuid4())) - topics = ['alarm_request', 'vim_account'] - common_consumer.subscribe(topics) - common_consumer.poll() - common_consumer.close(autocommit=False) + loop = asyncio.get_event_loop() + loop.run_until_complete(_test_kafka(loop)) return True except Exception: log.exception("MON can not connect to Kafka") diff --git a/osm_mon/core/settings.py b/osm_mon/core/settings.py index 680b58d..a5d352b 100644 --- a/osm_mon/core/settings.py +++ b/osm_mon/core/settings.py @@ -63,7 +63,7 @@ class Config(object): CfgParam('MONGO_URI', "mongodb://mongo:27017", six.text_type), CfgParam('DATABASE', "sqlite:///mon_sqlite.db", six.text_type), CfgParam('OS_DEFAULT_GRANULARITY', 300, int), - CfgParam('REQUEST_TIMEOUT', 10, int), + CfgParam('OSMMON_REQUEST_TIMEOUT', 10, int), CfgParam('OSMMON_LOG_LEVEL', "INFO", six.text_type), CfgParam('OSMMON_KAFKA_LOG_LEVEL', "WARN", six.text_type), CfgParam('OSMMON_COLLECTOR_INTERVAL', 30, int), diff --git a/osm_mon/evaluator/evaluator.py b/osm_mon/evaluator/evaluator.py index a672cb2..8f49a66 100644 --- a/osm_mon/evaluator/evaluator.py +++ b/osm_mon/evaluator/evaluator.py @@ -59,7 +59,7 @@ class Evaluator: OSM_METRIC_PREFIX + metric_name, nsr_id, vdur_name, vnf_member_index) request_url = cfg.OSMMON_PROMETHEUS_URL + "/api/v1/query?" + query_section log.info("Querying Prometheus: %s", request_url) - r = requests.get(request_url) + r = requests.get(request_url, timeout=cfg.OSMMON_REQUEST_TIMEOUT) if r.status_code == 200: json_response = r.json() if json_response['status'] == 'success': diff --git a/osm_mon/server/server.py b/osm_mon/server/server.py index 854d926..a1e848b 100755 --- a/osm_mon/server/server.py +++ b/osm_mon/server/server.py @@ -21,51 +21,64 @@ # contact: bdiaz@whitestack.com or glavado@whitestack.com ## """A common KafkaConsumer for all MON plugins.""" - +import asyncio import json import logging from json import JSONDecodeError import yaml +from aiokafka import AIOKafkaConsumer, AIOKafkaProducer from osm_mon.core.auth import AuthManager from osm_mon.core.common_db import CommonDbClient from osm_mon.core.database import DatabaseManager -from osm_mon.core.message_bus.consumer import Consumer -from osm_mon.core.message_bus.producer import Producer from osm_mon.core.response import ResponseBuilder +from osm_mon.core.settings import Config log = logging.getLogger(__name__) class Server: - def __init__(self): + def __init__(self, loop=None): + cfg = Config.instance() + if not loop: + loop = asyncio.get_event_loop() + self.loop = loop self.auth_manager = AuthManager() self.database_manager = DatabaseManager() self.database_manager.create_tables() self.common_db = CommonDbClient() + self.kafka_server = cfg.BROKER_URI def run(self): - common_consumer = Consumer("mon-server") - - topics = ['alarm_request', 'vim_account'] - common_consumer.subscribe(topics) - - log.info("Listening for messages...") - for message in common_consumer: - self.consume_message(message) + self.loop.run_until_complete(self.start()) + + async def start(self): + consumer = AIOKafkaConsumer( + "vim_account", + "alarm_request", + loop=self.loop, + bootstrap_servers=self.kafka_server, + group_id="mon-server", + key_deserializer=bytes.decode, + value_deserializer=bytes.decode, + ) + await consumer.start() + try: + async for message in consumer: + log.info("Message arrived: %s", message) + await self.consume_message(message) + finally: + await consumer.stop() - def consume_message(self, message): - log.info("Message arrived: %s", message) + async def consume_message(self, message): try: try: values = json.loads(message.value) except JSONDecodeError: values = yaml.safe_load(message.value) - response = None - if message.topic == "vim_account": if message.key == "create" or message.key == "edit": values['vim_password'] = self.common_db.decrypt_vim_password(values['vim_password'], @@ -78,6 +91,7 @@ class Server: elif message.topic == "alarm_request": if message.key == "create_alarm_request": alarm_details = values['alarm_create_request'] + cor_id = alarm_details['correlation_id'] response_builder = ResponseBuilder() try: alarm = self.database_manager.save_alarm( @@ -92,15 +106,17 @@ class Server: alarm_details['ns_id'] ) response = response_builder.generate_response('create_alarm_response', - cor_id=alarm_details['correlation_id'], + cor_id=cor_id, status=True, alarm_id=alarm.uuid) except Exception: log.exception("Error creating alarm: ") response = response_builder.generate_response('create_alarm_response', - cor_id=alarm_details['correlation_id'], + cor_id=cor_id, status=False, alarm_id=None) + await self._publish_response('alarm_response_' + str(cor_id), 'create_alarm_response', response) + if message.key == "delete_alarm_request": alarm_details = values['alarm_delete_request'] alarm_uuid = alarm_details['alarm_uuid'] @@ -118,19 +134,22 @@ class Server: cor_id=cor_id, status=False, alarm_id=alarm_uuid) - if response: - self._publish_response(message.topic, message.key, response) + await self._publish_response('alarm_response_' + str(cor_id), 'delete_alarm_response', response) except Exception: log.exception("Exception processing message: ") - def _publish_response(self, topic: str, key: str, msg: dict): - topic = topic.replace('request', 'response') - key = key.replace('request', 'response') - producer = Producer() - producer.send(topic=topic, key=key, value=json.dumps(msg)) - producer.flush(timeout=5) - producer.close() + async def _publish_response(self, topic: str, key: str, msg: dict): + producer = AIOKafkaProducer(loop=self.loop, + bootstrap_servers=self.kafka_server, + key_serializer=str.encode, + value_serializer=str.encode) + await producer.start() + log.info("Sending response %s to topic %s with key %s", json.dumps(msg), topic, key) + try: + await producer.send_and_wait(topic, key=key, value=json.dumps(msg)) + finally: + await producer.stop() if __name__ == '__main__': diff --git a/requirements.txt b/requirements.txt index 10562f2..359259f 100644 --- a/requirements.txt +++ b/requirements.txt @@ -18,7 +18,7 @@ # For those usages not covered by the Apache License, Version 2.0 please # contact: prithiv.mohan@intel.com or adrian.hoban@intel.com -kafka-python==1.4.3 +aiokafka==0.4.* requests==2.18.* cherrypy==14.0.* jsmin==2.2.* diff --git a/setup.py b/setup.py index c85fa76..942ec67 100644 --- a/setup.py +++ b/setup.py @@ -51,7 +51,7 @@ setup( packages=[_name], package_dir={_name: _name}, install_requires=[ - "kafka-python==1.4.3", + "aiokafka==0.4.*", "requests==2.18.*", "cherrypy==14.0.*", "jsmin==2.2.*", -- 2.25.1