1 #######################################################################################
2 # Copyright ETSI Contributors and Others.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 #######################################################################################
17 from datetime
import datetime
20 from random
import randint
22 from fastapi
import FastAPI
27 format
="%(asctime)s %(levelname)s %(filename)s:%(lineno)s %(message)s",
28 datefmt
="%Y/%m/%d %H:%M:%S",
30 logger
= logging
.getLogger(__name__
)
31 logger
.setLevel(logging
.INFO
)
35 def send_to_airflow(output_endpoint
, content
):
38 # Airflow params should come from env variables from configmaps and secrets
39 airflow_host
= os
.environ
["AIRFLOW_HOST"]
40 airflow_port
= os
.environ
["AIRFLOW_PORT"]
41 airflow_user
= os
.environ
["AIRFLOW_USER"]
42 airflow_pass
= os
.environ
["AIRFLOW_PASS"]
43 url
= f
"http://{airflow_host}:{airflow_port}/api/v1/dags/{output_endpoint}/dagRuns"
44 rnd
= str(randint(0, 999999)).rjust(6, "0")
45 timestamp
= datetime
.now().strftime("%Y%m%d%H%M%S")
46 dag_run_id
= output_endpoint
+ "_" + timestamp
+ "_" + rnd
47 logger
.info(f
"HTTP POST {url}")
50 auth
=(airflow_user
, airflow_pass
),
51 json
={"dag_run_id": dag_run_id
, "conf": content
},
53 logger
.info(f
"Response: {req.text}")
55 except Exception as e
:
56 logger
.error(f
"HTTP error: {repr(e)}")
57 raise requests
.HTTPException(status_code
=403, detail
=repr(e
))
60 @app.post("/{input_endpoint}")
61 async def webhook(input_endpoint
: str, content
: dict):
62 send_to_airflow(input_endpoint
, content
)