Clean stage-archive.sh
[osm/NG-SA.git] / osm_webhook_translator / src / osm_webhook_translator / main.py
1 #######################################################################################
2 # Copyright ETSI Contributors and Others.
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 # http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
13 # implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 #######################################################################################
17 from datetime import datetime
18 import os
19 from random import randint
20
21 from fastapi import FastAPI
22 import requests
23
24
25 app = FastAPI()
26
27
28 def send_to_airflow(output_endpoint, content):
29 try:
30 requests.Session()
31 # Airflow params should come from env variables from configmaps and secrets
32 airflow_host = os.environ["AIRFLOW_HOST"]
33 airflow_port = os.environ["AIRFLOW_PORT"]
34 airflow_user = os.environ["AIRFLOW_USER"]
35 airflow_pass = os.environ["AIRFLOW_PASS"]
36 url = f"http://{airflow_host}:{airflow_port}/api/v1/dags/{output_endpoint}/dagRuns"
37 rnd = str(randint(0, 999999)).rjust(6, "0")
38 timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
39 dag_run_id = output_endpoint + "_" + timestamp + "_" + rnd
40 print(f"HTTP POST {url}...")
41 req = requests.post(
42 url=url,
43 auth=(airflow_user, airflow_pass),
44 json={"dag_run_id": dag_run_id, "conf": content},
45 )
46 print(req.text)
47 # timeout and retries
48 except Exception as e:
49 print(f"HTTP error: {repr(e)}")
50 raise requests.HTTPException(status_code=403, detail=repr(e))
51
52
53 @app.post("/{input_endpoint}")
54 async def webhook(input_endpoint: str, content: dict):
55 send_to_airflow(input_endpoint, content)
56 return {}