From 2e2dca0ad0f237ab5e30c77beee3d20a2a0a7dd0 Mon Sep 17 00:00:00 2001 From: aguilard Date: Wed, 5 Apr 2023 10:09:18 +0000 Subject: [PATCH] Feature 10981: Added main for webhook-translator Change-Id: Idcc45514261eeb645becc56c0aee5f681b49fb0a Signed-off-by: aguilard --- .../src/osm_webhook_translator/main.py | 56 +++++++++++++++++++ 1 file changed, 56 insertions(+) create mode 100644 osm_webhook_translator/src/osm_webhook_translator/main.py diff --git a/osm_webhook_translator/src/osm_webhook_translator/main.py b/osm_webhook_translator/src/osm_webhook_translator/main.py new file mode 100644 index 0000000..587fa9c --- /dev/null +++ b/osm_webhook_translator/src/osm_webhook_translator/main.py @@ -0,0 +1,56 @@ +####################################################################################### +# Copyright ETSI Contributors and Others. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. +####################################################################################### +from datetime import datetime +import os +from random import randint + +from fastapi import FastAPI +import requests + + +app = FastAPI() + + +def send_to_airflow(output_endpoint, content): + try: + requests.Session() + # Airflow params should come from env variables from configmaps and secrets + airflow_host = os.environ["AIRFLOW_HOST"] + airflow_port = os.environ["AIRFLOW_PORT"] + airflow_user = os.environ["AIRFLOW_USER"] + airflow_pass = os.environ["AIRFLOW_PASS"] + url = f"http://{airflow_host}:{airflow_port}/api/v1/dags/{output_endpoint}/dagRuns" + rnd = str(randint(0, 999999)).rjust(6, "0") + timestamp = datetime.now().strftime("%Y%m%d%H%M%S") + dag_run_id = output_endpoint + "_" + timestamp + "_" + rnd + print(f"HTTP POST {url}...") + req = requests.post( + url=url, + auth=(airflow_user, airflow_pass), + json={"dag_run_id": dag_run_id, "conf": content}, + ) + print(req.text) + # timeout and retries + except Exception as e: + print(f"HTTP error: {repr(e)}") + raise requests.HTTPException(status_code=403, detail=repr(e)) + + +@app.post("/{input_endpoint}") +async def webhook(input_endpoint: str, content: dict): + send_to_airflow(input_endpoint, content) + return {} -- 2.25.1