6e9e7184b758fa2cee001d9c598d78ccc0ba63d3
[osm/NG-SA.git] / osm_webhook_translator / src / osm_webhook_translator / main.py
1 #######################################################################################
2 # Copyright ETSI Contributors and Others.
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 # http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
13 # implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 #######################################################################################
17 from datetime import datetime
18 import logging
19 import os
20 from random import randint
21
22 from fastapi import FastAPI
23 import requests
24
25
# Root logging configuration: each record shows the timestamp, the level and
# the file/line that emitted it, followed by the message.
logging.basicConfig(
    datefmt="%Y/%m/%d %H:%M:%S",
    format="%(asctime)s %(levelname)s %(filename)s:%(lineno)s %(message)s",
)

# Module logger, emitting INFO and above.
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

# FastAPI application object on which the webhook route is registered.
app = FastAPI()
33
34
def send_to_airflow(output_endpoint, content, timeout=30):
    """Trigger a run of the Airflow DAG named ``output_endpoint``.

    Sends an HTTP POST to ``/api/v1/dags/<output_endpoint>/dagRuns`` on the
    Airflow server, using basic authentication, with ``content`` as the
    ``conf`` of the new DAG run. The connection parameters are read from the
    environment variables ``AIRFLOW_HOST``, ``AIRFLOW_PORT``, ``AIRFLOW_USER``
    and ``AIRFLOW_PASS``.

    Args:
        output_endpoint: DAG identifier; used in the URL and as the prefix of
            the generated ``dag_run_id``.
        content: JSON-serialisable payload forwarded as the DAG run ``conf``.
        timeout: Seconds to wait for Airflow before giving up. Without a
            timeout ``requests`` would block indefinitely on a stalled server.

    Raises:
        fastapi.HTTPException: With status code 403 when anything in the
            process fails (missing environment variable, connection error,
            timeout, ...). ``detail`` holds the ``repr`` of the underlying
            exception.
    """
    # HTTPException lives in FastAPI, not in requests; imported here so the
    # function does not depend on the module-level import list.
    from fastapi import HTTPException

    try:
        # Airflow params should come from env variables from configmaps and secrets
        airflow_host = os.environ["AIRFLOW_HOST"]
        airflow_port = os.environ["AIRFLOW_PORT"]
        airflow_user = os.environ["AIRFLOW_USER"]
        airflow_pass = os.environ["AIRFLOW_PASS"]
        url = f"http://{airflow_host}:{airflow_port}/api/v1/dags/{output_endpoint}/dagRuns"
        # The run id combines the DAG name, a local timestamp and a zero-padded
        # 6-digit random suffix so that runs created in the same second
        # are unlikely to collide.
        rnd = str(randint(0, 999999)).rjust(6, "0")
        timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
        dag_run_id = output_endpoint + "_" + timestamp + "_" + rnd
        logger.info(f"HTTP POST {url}")
        req = requests.post(
            url=url,
            auth=(airflow_user, airflow_pass),
            json={"dag_run_id": dag_run_id, "conf": content},
            timeout=timeout,
        )
        logger.info(f"Response: {req.text}")
        # TODO: retries
    except Exception as e:
        logger.error(f"HTTP error: {repr(e)}")
        raise HTTPException(status_code=403, detail=repr(e)) from e
58
59
@app.post("/{input_endpoint}")
async def webhook(input_endpoint: str, content: dict):
    """Forward the JSON body of an incoming webhook to Airflow.

    Args:
        input_endpoint: Path segment of the request URL; passed to
            ``send_to_airflow`` as the name of the DAG to trigger.
        content: Parsed JSON body of the request, forwarded unchanged.

    Returns:
        An empty dict (serialised as an empty JSON object).

    Raises:
        Whatever ``send_to_airflow`` raises is propagated to FastAPI.
    """
    # send_to_airflow performs a blocking HTTP call; running it directly in
    # this coroutine would stall the event loop (and every other request)
    # until Airflow answers, so it is executed in a worker thread instead.
    from fastapi.concurrency import run_in_threadpool

    await run_in_threadpool(send_to_airflow, input_endpoint, content)
    return {}