class WFTemporal(object):
- clients = {}
+ _client = None
+ temporal_api = None
- def __init__(self, temporal_api=None, logger_name="temporal.client"):
+ def __init__(self, logger_name="temporal.client"):
self.logger = logging.getLogger(logger_name)
- self.temporal_api = temporal_api
async def execute_workflow(
self, task_queue: str, workflow_name: str, workflow_data: any, id: str = None
async def start_workflow(
self, task_queue: str, workflow_name: str, workflow_data: any, id: str = None
):
- client = await self.get_client()
+ if WFTemporal._client is None:
+ self.logger.debug(
+ f"No cached client found, connecting to {WFTemporal.temporal_api}"
+ )
+ WFTemporal._client = await Client.connect(WFTemporal.temporal_api)
+
if id is None:
id = str(uuid.uuid4())
self.logger.info(f"Starting workflow {workflow_name}, id {id}")
- handle = await client.start_workflow(
+ handle = await WFTemporal._client.start_workflow(
workflow=workflow_name, arg=workflow_data, id=id, task_queue=task_queue
)
return handle
-
- async def get_client(self):
- if self.temporal_api in WFTemporal.clients:
- client = WFTemporal.clients[self.temporal_api]
- else:
- self.logger.debug(
- f"No cached client found, connecting to {self.temporal_api}"
- )
- client = await Client.connect(self.temporal_api)
- WFTemporal.clients[self.temporal_api] = client
-
- self.logger.debug(f"Using client {client} for {self.temporal_api}")
- return client