X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=juju%2Futils.py;h=1d1b24ecd647089ebb335a706350fa61dc29a52a;hb=d6348ef405d6e87a4f72e7842ea4376678456283;hp=9f5d63d1465232ad13dfa51db88324ed1ad098ac;hpb=f3e0df690919bb6413aee809bd9d6d295daa7cc8;p=osm%2FN2VC.git diff --git a/juju/utils.py b/juju/utils.py index 9f5d63d..1d1b24e 100644 --- a/juju/utils.py +++ b/juju/utils.py @@ -59,7 +59,40 @@ class IdQueue: async def get(self, id): value = await self._queues[id].get() del self._queues[id] + if isinstance(value, Exception): + raise value return value async def put(self, id, value): await self._queues[id].put(value) + + async def put_all(self, value): + for queue in self._queues.values(): + await queue.put(value) + + +async def run_with_interrupt(task, event, loop=None): + """ + Awaits a task while allowing it to be interrupted by an `asyncio.Event`. + + If the task finishes without the event becoming set, the results of the + task will be returned. If the event becomes set, the task will be + cancelled ``None`` will be returned. + + :param task: Task to run + :param event: An `asyncio.Event` which, if set, will interrupt `task` + and cause it to be cancelled. + :param loop: Optional event loop to use other than the default. + """ + loop = loop or asyncio.get_event_loop() + event_task = loop.create_task(event.wait()) + done, pending = await asyncio.wait([task, event_task], + loop=loop, + return_when=asyncio.FIRST_COMPLETED) + for f in pending: + f.cancel() + result = [f.result() for f in done if f is not event_task] + if result: + return result[0] + else: + return None