X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=NG-RO%2Fosm_ng_ro%2Fvim_admin.py;h=3350d434a9965696a413db4fe83110e344dc0281;hb=7b521f73dd996a279f23b2f512cd89a42c1c79f6;hp=17bfb20238d0a6984f5988d8dcd55b6af9d9aefc;hpb=80135b928ab442c38898750b4751480205b4affc;p=osm%2FRO.git diff --git a/NG-RO/osm_ng_ro/vim_admin.py b/NG-RO/osm_ng_ro/vim_admin.py index 17bfb202..3350d434 100644 --- a/NG-RO/osm_ng_ro/vim_admin.py +++ b/NG-RO/osm_ng_ro/vim_admin.py @@ -19,15 +19,15 @@ It is based on asyncio. It is in charge of load tasks assigned to VIMs that nobody is in chage of it """ -import logging -import threading import asyncio from http import HTTPStatus +import logging +import threading +from time import time -from osm_common import dbmongo, dbmemory, msglocal, msgkafka +from osm_common import dbmemory, dbmongo, msgkafka, msglocal from osm_common.dbbase import DbException from osm_common.msgbase import MsgException -from time import time __author__ = "Alfonso Tierno " @@ -277,9 +277,10 @@ class VimAdminThread(threading.Thread): try: if not self.aiomain_task_kafka: # await self.msg.aiowrite("admin", "echo", "dummy message", loop=self.loop) - await self.msg.aiowrite( - "vim_account", "echo", "dummy message", loop=self.loop - ) + for kafka_topic in self.kafka_topics: + await self.msg.aiowrite( + kafka_topic, "echo", "dummy message", loop=self.loop + ) kafka_working = True self.logger.debug("Starting vim_account subscription task") self.aiomain_task_kafka = asyncio.ensure_future( @@ -333,7 +334,7 @@ class VimAdminThread(threading.Thread): self.logger.error("renew_locks task exception: {}".format(exc)) self.aiomain_task_renew_lock = None except asyncio.CancelledError: - pass + self.logger.exception("asyncio.CancelledError occured.") except Exception as e: if self.to_terminate: