1 #######################################################################################
2 # Copyright ETSI Contributors and Others.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 #######################################################################################
17 from datetime
import datetime
19 from random
import randint
21 from fastapi
import FastAPI
def send_to_airflow(output_endpoint: str, content: dict, *, timeout: float = 30.0):
    """Request a new DAG run from Airflow over its REST API.

    The Airflow location and credentials are read from the environment
    variables ``AIRFLOW_HOST``, ``AIRFLOW_PORT``, ``AIRFLOW_USER`` and
    ``AIRFLOW_PASS``. The request is an HTTP POST to
    ``/api/v1/dags/{output_endpoint}/dagRuns`` carrying a generated
    ``dag_run_id`` and ``content`` as the run's ``conf``.

    Args:
        output_endpoint: Identifier placed in the ``dags/<id>/dagRuns`` URL
            and used as the prefix of the generated run id.
        content: JSON-serialisable payload sent as the ``conf`` field.
        timeout: Seconds to wait for Airflow before giving up. Without a
            timeout ``requests`` waits indefinitely on a stalled server.

    Raises:
        fastapi.HTTPException: With status 403 when anything fails while
            preparing or sending the request (including a missing
            environment variable). The HTTP status of Airflow's response is
            not inspected here.
    """
    # `requests` has no HTTPException attribute; the status_code/detail pair
    # belongs to FastAPI's exception. Imported locally so this function does
    # not depend on the module's import block.
    from fastapi import HTTPException

    try:
        # Airflow params should come from env variables from configmaps and secrets
        airflow_host = os.environ["AIRFLOW_HOST"]
        airflow_port = os.environ["AIRFLOW_PORT"]
        airflow_user = os.environ["AIRFLOW_USER"]
        airflow_pass = os.environ["AIRFLOW_PASS"]
        url = f"http://{airflow_host}:{airflow_port}/api/v1/dags/{output_endpoint}/dagRuns"

        # Run id: <endpoint>_<local timestamp>_<6-digit zero-padded random
        # suffix>; the suffix separates runs created within the same second.
        rnd = f"{randint(0, 999999):06d}"
        timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
        dag_run_id = f"{output_endpoint}_{timestamp}_{rnd}"

        print(f"HTTP POST {url}...")
        requests.post(
            url,
            auth=(airflow_user, airflow_pass),
            json={"dag_run_id": dag_run_id, "conf": content},
            timeout=timeout,
        )
    except Exception as e:
        # Boundary handler: report the failure and surface it to the HTTP
        # client, keeping the original cause attached for tracebacks.
        print(f"HTTP error: {repr(e)}")
        raise HTTPException(status_code=403, detail=repr(e)) from e
@app.post("/{input_endpoint}")
async def webhook(input_endpoint: str, content: dict):
    """Relay a POSTed payload to ``send_to_airflow``.

    Args:
        input_endpoint: The single path segment of the request URL; handed
            to ``send_to_airflow`` as its first argument.
        content: The request body as a dict; handed to ``send_to_airflow``
            unchanged as its second argument.
    """
    # send_to_airflow is a synchronous function; calling it directly from a
    # coroutine would block the event loop for every other request while it
    # runs. Run it in a worker thread instead. Exceptions raised by the
    # worker propagate through the await unchanged.
    from fastapi.concurrency import run_in_threadpool

    await run_in_threadpool(send_to_airflow, input_endpoint, content)