X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=juju%2Futils.py;h=f4db66e723c850770b24ccfb90ce17484f72a517;hb=5cf8d29d1ec5ff9360920dd831ff6ead145e3d11;hp=c0a500c252e6bb1ba494a1737d8e56d722768f4f;hpb=896db9ff2ab6afeb9756948c771bddae942e2723;p=osm%2FN2VC.git diff --git a/juju/utils.py b/juju/utils.py index c0a500c..f4db66e 100644 --- a/juju/utils.py +++ b/juju/utils.py @@ -1,5 +1,7 @@ import asyncio import os +from collections import defaultdict +from functools import partial from pathlib import Path @@ -45,3 +47,25 @@ async def read_ssh_key(loop): ''' return await loop.run_in_executor(None, _read_ssh_key) + + +class IdQueue: + """ + Wrapper around asyncio.Queue that maintains a separate queue for each ID. + """ + def __init__(self, maxsize=0, *, loop=None): + self._queues = defaultdict(partial(asyncio.Queue, maxsize, loop=loop)) + + async def get(self, id): + value = await self._queues[id].get() + del self._queues[id] + if isinstance(value, Exception): + raise value + return value + + async def put(self, id, value): + await self._queues[id].put(value) + + async def put_all(self, value): + for queue in self._queues.values(): + await queue.put(value)