1 #######################################################################################
2 # Copyright ETSI Contributors and Others.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 #######################################################################################
import logging
import uuid
from typing import Any, Optional

from temporalio.client import Client
class WFTemporal:
    """Small asynchronous helper for starting and running Temporal workflows.

    Connected clients are cached on the class, keyed by the Temporal API
    address, so every instance that targets the same address reuses one
    client instead of connecting again.
    """

    # Shared cache: Temporal API address -> connected client.
    clients = {}

    def __init__(self, temporal_api=None, logger_name="temporal.client"):
        """Remember the target address and set up the logger.

        Args:
            temporal_api: Value handed to ``Client.connect`` and used as the
                key of the client cache.
            logger_name: Name of the logger that receives all messages.
        """
        self.logger = logging.getLogger(logger_name)
        self.temporal_api = temporal_api

    async def execute_workflow(
        self,
        task_queue: str,
        workflow_name: str,
        workflow_data: Any,
        id: Optional[str] = None,
    ):
        """Start a workflow and wait until it has finished.

        Args:
            task_queue: Task queue the workflow is started on.
            workflow_name: Name of the workflow to run.
            workflow_data: Single argument passed to the workflow.
            id: Workflow id; a random UUID is generated when omitted.
                (The name shadows the builtin but is part of the public
                keyword interface, so it is kept.)

        Returns:
            The value produced by awaiting ``handle.result()``.
        """
        handle = await self.start_workflow(
            task_queue=task_queue,
            workflow_name=workflow_name,
            workflow_data=workflow_data,
            id=id,
        )
        result = await handle.result()
        # Report the id carried by the handle: the ``id`` argument is still
        # None here whenever start_workflow had to generate one.
        self.logger.info(f"Completed workflow {workflow_name}, id {handle.id}")
        return result

    async def start_workflow(
        self,
        task_queue: str,
        workflow_name: str,
        workflow_data: Any,
        id: Optional[str] = None,
    ):
        """Start a workflow without waiting for its result.

        Args:
            task_queue: Task queue the workflow is started on.
            workflow_name: Name of the workflow to run.
            workflow_data: Single argument passed to the workflow.
            id: Workflow id; a random UUID is generated when omitted.

        Returns:
            The handle returned by the client's ``start_workflow`` call.
        """
        client = await self.get_client()
        # Only invent an id when the caller did not supply one.
        if id is None:
            id = str(uuid.uuid4())
        self.logger.info(f"Starting workflow {workflow_name}, id {id}")
        handle = await client.start_workflow(
            workflow=workflow_name,
            arg=workflow_data,
            id=id,
            task_queue=task_queue,
        )
        return handle

    async def get_client(self):
        """Return the client for ``self.temporal_api``, connecting on first use.

        Returns:
            The cached client when one exists for this address, otherwise a
            newly connected client, which is stored in the cache first.
        """
        if self.temporal_api in WFTemporal.clients:
            client = WFTemporal.clients[self.temporal_api]
        else:
            self.logger.debug(
                f"No cached client found, connecting to {self.temporal_api}"
            )
            client = await Client.connect(self.temporal_api)
            WFTemporal.clients[self.temporal_api] = client

        self.logger.debug(f"Using client {client} for {self.temporal_api}")
        return client