Adds use of aiokafka in mon-server
[osm/MON.git] / osm_mon / cmd / mon_healthcheck.py
index 412410b..1fa2c2b 100644 (file)
 # For those usages not covered by the Apache License, Version 2.0 please
 # contact: bdiaz@whitestack.com or glavado@whitestack.com
 ##
-
+import asyncio
 import logging
 import subprocess
 import sys
-import uuid
 
 import requests
+from aiokafka import AIOKafkaConsumer
 
-from osm_mon.core.message_bus.consumer import Consumer
+from osm_mon.core.settings import Config
 
 log = logging.getLogger(__name__)
 
@@ -51,7 +51,7 @@ def _processes_running():
         return False
     processes_to_check = ['osm-mon-collector', 'osm-mon-evaluator', 'osm-mon-server']
     ps = subprocess.Popen(['ps', 'aux'], stdout=subprocess.PIPE).communicate()[0]
-    processes_running = ps.split('\n')
+    processes_running = ps.decode().split('\n')
     for p in processes_to_check:
         if not _contains_process(processes_running, p):
             return False
@@ -69,12 +69,17 @@ def _is_prometheus_exporter_ok():
 
 
 def _is_kafka_ok():
+    async def _test_kafka(loop):
+        cfg = Config.instance()
+        consumer = AIOKafkaConsumer(
+            'healthcheck',
+            loop=loop, bootstrap_servers=cfg.BROKER_URI)
+        await consumer.start()
+        await consumer.stop()
+
     try:
-        common_consumer = Consumer("mon-healthcheck-" + str(uuid.uuid4()))
-        topics = ['alarm_request', 'vim_account']
-        common_consumer.subscribe(topics)
-        common_consumer.poll()
-        common_consumer.close(autocommit=False)
+        loop = asyncio.get_event_loop()
+        loop.run_until_complete(_test_kafka(loop))
         return True
     except Exception:
         log.exception("MON can not connect to Kafka")