- await asyncio.sleep(10, loop=self.loop)
- self.aiomain_task = asyncio.ensure_future(self.msg.aioread(("ns", "nsi"), loop=self.loop,
- callback=self._msg_callback),
- loop=self.loop)
- await asyncio.wait_for(self.aiomain_task, timeout=None, loop=self.loop)
+ if not self.aiomain_task_admin:
+ await asyncio.sleep(10, loop=self.loop)
+ self.logger.debug("Starting admin subscription task")
+ self.aiomain_task_admin = asyncio.ensure_future(
+ self.msg.aioread(
+ ("admin",),
+ loop=self.loop,
+ group_id=False,
+ aiocallback=self._msg_callback,
+ ),
+ loop=self.loop,
+ )
+ if not self.aiomain_task:
+ await asyncio.sleep(10, loop=self.loop)
+ self.logger.debug("Starting non-admin subscription task")
+ self.aiomain_task = asyncio.ensure_future(
+ self.msg.aioread(
+ ("ns", "nsi"),
+ loop=self.loop,
+ aiocallback=self._msg_callback,
+ ),
+ loop=self.loop,
+ )
+ done, _ = await asyncio.wait(
+ [self.aiomain_task, self.aiomain_task_admin],
+ timeout=None,
+ loop=self.loop,
+ return_when=asyncio.FIRST_COMPLETED,
+ )
+ try:
+ if self.aiomain_task_admin in done:
+ exc = self.aiomain_task_admin.exception()
+ self.logger.error(
+ "admin subscription task exception: {}".format(exc)
+ )
+ self.aiomain_task_admin = None
+ if self.aiomain_task in done:
+ exc = self.aiomain_task.exception()
+ self.logger.error(
+ "non-admin subscription task exception: {}".format(exc)
+ )
+ self.aiomain_task = None
+ except asyncio.CancelledError:
+ pass