pil_price_list_file = Path("/placement/pil_price_list.yaml")
vnf_price_list_file = Path("/placement/vnf_price_list.yaml")
- def __init__(self, config: Config, loop=None):
+ def __init__(self, config: Config):
self.log = logging.getLogger("pla.server")
self.db = None
self.msgBus = None
self.config = config
- self.loop = loop or asyncio.get_event_loop()
try:
if config.get("database", "driver") == "mongo":
config.get("message", "driver")
)
)
- self.msgBus.loop = loop
self.msgBus.connect(config.get("message"))
except Exception as e:
self.log.info("Kafka msg arrived: {} {} {}".format(topic, command, params))
if topic == "pla" and command == "get_placement":
nslcmop_id = params.get("nslcmopId")
- self.loop.create_task(self.get_placement(nslcmop_id))
+ asyncio.create_task(self.get_placement(nslcmop_id))
async def kafka_read(self):
self.log.info("Task kafka_read start")
while True:
try:
topics = "pla"
- await self.msgBus.aioread(topics, self.loop, self.handle_kafka_command)
+ await self.msgBus.aioread(topics, self.handle_kafka_command)
except Exception as e:
self.log.error("kafka read error. Exception: {}".format(e))
- await asyncio.sleep(5, loop=self.loop)
+ await asyncio.sleep(5)
def run(self):
- self.loop.run_until_complete(self.kafka_read())
- self.loop.close()
- self.loop = None
+ asyncio.run(self.kafka_read())
if self.msgBus:
self.msgBus.disconnect()