Feature 10981: Added main for webhook-translator 53/13153/3
authoraguilard <e.dah.tid@telefonica.com>
Wed, 5 Apr 2023 10:09:18 +0000 (10:09 +0000)
committeraguilard <e.dah.tid@telefonica.com>
Wed, 5 Apr 2023 10:38:43 +0000 (10:38 +0000)
Change-Id: Idcc45514261eeb645becc56c0aee5f681b49fb0a
Signed-off-by: aguilard <e.dah.tid@telefonica.com>
osm_webhook_translator/src/osm_webhook_translator/main.py [new file with mode: 0644]

diff --git a/osm_webhook_translator/src/osm_webhook_translator/main.py b/osm_webhook_translator/src/osm_webhook_translator/main.py
new file mode 100644 (file)
index 0000000..587fa9c
--- /dev/null
@@ -0,0 +1,56 @@
+#######################################################################################
+# Copyright ETSI Contributors and Others.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#######################################################################################
+from datetime import datetime
+import os
+from random import randint
+
+from fastapi import FastAPI
+import requests
+
+
+app = FastAPI()
+
+
+def send_to_airflow(output_endpoint, content):
+    try:
+        requests.Session()
+        # Airflow params should come from env variables from configmaps and secrets
+        airflow_host = os.environ["AIRFLOW_HOST"]
+        airflow_port = os.environ["AIRFLOW_PORT"]
+        airflow_user = os.environ["AIRFLOW_USER"]
+        airflow_pass = os.environ["AIRFLOW_PASS"]
+        url = f"http://{airflow_host}:{airflow_port}/api/v1/dags/{output_endpoint}/dagRuns"
+        rnd = str(randint(0, 999999)).rjust(6, "0")
+        timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
+        dag_run_id = output_endpoint + "_" + timestamp + "_" + rnd
+        print(f"HTTP POST {url}...")
+        req = requests.post(
+            url=url,
+            auth=(airflow_user, airflow_pass),
+            json={"dag_run_id": dag_run_id, "conf": content},
+        )
+        print(req.text)
+        # timeout and retries
+    except Exception as e:
+        print(f"HTTP error: {repr(e)}")
+        raise requests.HTTPException(status_code=403, detail=repr(e))
+
+
+@app.post("/{input_endpoint}")
+async def webhook(input_endpoint: str, content: dict):
+    send_to_airflow(input_endpoint, content)
+    return {}