+ elif topic == "admin":
+ self.logger.debug("received {} {} {}".format(topic, command, params))
+ if command in ["echo", "ping"]: # ignored commands
+ pass
+ elif command == "revoke_token":
+ if params:
+ if isinstance(params, dict) and "_id" in params:
+ tid = params.get("_id")
+ self.engine.authenticator.tokens_cache.pop(tid, None)
+ self.logger.debug("token '{}' removed from token_cache".format(tid))
+ else:
+ self.logger.debug("unrecognized params in command '{} {}': {}"
+ .format(topic, command, params))
+ else:
+ self.engine.authenticator.tokens_cache.clear()
+ self.logger.debug("token_cache cleared")
+ else:
+ self.logger.debug("unrecognized command '{} {}'".format(topic, command))
+ # writing to kafka must be done with our own loop. For this reason it is not allowed Engine to do that,
+ # but content to be written is stored at msg_to_send
+ for msg in msg_to_send:
+ await self.msg.aiowrite(*msg, loop=self.loop)