Update to have temporal in common 24/13324/1
author Mark Beierl <mark.beierl@canonical.com>
Wed, 1 Mar 2023 18:51:34 +0000 (18:51 +0000)
committer Mark Beierl <mark.beierl@canonical.com>
Fri, 28 Apr 2023 12:53:57 +0000 (12:53 +0000)
Change-Id: I5a144ed22b65ed5337e0e870ab7281b80c5e0b62
Signed-off-by: Mark Beierl <mark.beierl@canonical.com>
Add Python logging to DAGs

Change-Id: I2f8dd3b351ceb9a7da8e8b28d392e6fdef73f663
Signed-off-by: aguilard <e.dah.tid@telefonica.com>
Change in tox.ini to use allowlist_externals instead of whitelist_externals

Change-Id: Id457e368adffcc81d3e7451015b2c0905b9cb7ea
Signed-off-by: garciadeblas <gerardo.garciadeblas@telefonica.com>
Minor change in tox.ini related to flake8 env

Change-Id: I7d83cce754dc73f515af771855ba682783109569
Signed-off-by: garciadeblas <gerardo.garciadeblas@telefonica.com>
Fix black and flake errors in setup.py

Change-Id: I9522836f89e660c280c49a75ad5fd853454401e3
Signed-off-by: garciadeblas <gerardo.garciadeblas@telefonica.com>
Update ns_topology and multivim_vm_status DAGs to disable overlapped execution

Change-Id: Idbfa27879a3604e45cf4c92270c4c86de48bca93
Signed-off-by: garciadeblas <gerardo.garciadeblas@telefonica.com>
Feature 10981: skeleton of osm_webhook_translator package

Change-Id: I80217214941241e1e97dee80b978e0f1e55f4d1a
Signed-off-by: garciadeblas <gerardo.garciadeblas@telefonica.com>
Feature 10981: Added main for webhook-translator

Change-Id: Idcc45514261eeb645becc56c0aee5f681b49fb0a
Signed-off-by: aguilard <e.dah.tid@telefonica.com>
Feature 10981: added autohealing DAG and updated requirements

Change-Id: Ib1ed56c220969d54480ddd2382beae03e536b72b
Signed-off-by: aguilard <e.dah.tid@telefonica.com>
Fix bug in multivim_vm_status DAG when there are no VMs

Change-Id: Idd67bb9f59a61edbe15012ca05df8c83d920d04e
Signed-off-by: aguilard <e.dah.tid@telefonica.com>
Add MANIFEST.in to osm_webhook_translator to include README and requirements

Change-Id: I7f47826c5438348894ff525c591cfd93415ace04
Signed-off-by: garciadeblas <gerardo.garciadeblas@telefonica.com>
Clean stage-archive.sh

Change-Id: Ib42096edb6bcbb29031749291fe5de410cb93755
Signed-off-by: garciadeblas <gerardo.garciadeblas@telefonica.com>
Feature 10981: use Python library for logging in Webhook

Change-Id: Ib60ef2005618f94da34da1910fb26f95d77bb7a2
Signed-off-by: aguilard <e.dah.tid@telefonica.com>
Signed-off-by: Mark Beierl <mark.beierl@canonical.com>
23 files changed:
.gitignore
devops-stages/stage-archive.sh
devops-stages/stage-build.sh
osm_webhook_translator/MANIFEST.in [new file with mode: 0644]
osm_webhook_translator/README.rst [new file with mode: 0644]
osm_webhook_translator/requirements-dist.in [new file with mode: 0644]
osm_webhook_translator/requirements-dist.txt [new file with mode: 0644]
osm_webhook_translator/requirements.in [new file with mode: 0644]
osm_webhook_translator/requirements.txt [new file with mode: 0644]
osm_webhook_translator/setup.py [new file with mode: 0644]
osm_webhook_translator/src/osm_webhook_translator/__init__.py [new file with mode: 0644]
osm_webhook_translator/src/osm_webhook_translator/main.py [new file with mode: 0644]
requirements.in
requirements.txt
setup.py
src/osm_ngsa/dags/alert_vdu.py [new file with mode: 0644]
src/osm_ngsa/dags/multivim_vim_status.py
src/osm_ngsa/dags/multivim_vm_status.py
src/osm_ngsa/dags/ns_topology.py
src/osm_ngsa/osm_mon/core/common_db.py
src/osm_ngsa/osm_mon/core/config.yaml
src/osm_ngsa/osm_mon/core/message_bus_client.py [new file with mode: 0644]
tox.ini

index fb6947e..c6f5743 100644 (file)
@@ -36,4 +36,6 @@ dist
 *.local
 local
 
+# Version files
 src/osm_ngsa/_version.py
+osm_webhook_translator/src/osm_webhook_translator/_version.py
index e70b172..dbae44a 100755 (executable)
@@ -21,8 +21,4 @@ rm -rf pool
 rm -rf dists
 mkdir -p pool/$MDG
 mv deb_dist/*.deb pool/$MDG/
-mkdir -p dists/unstable/$MDG/binary-amd64/
-apt-ftparchive packages pool/$MDG > dists/unstable/$MDG/binary-amd64/Packages
-gzip -9fk dists/unstable/$MDG/binary-amd64/Packages
-echo "dists/**,pool/$MDG/*.deb"
 
index 7b483a8..780d6c7 100755 (executable)
 # limitations under the License.
 #
 
+set -ex
+
 rm -rf dist deb_dist osm*.tar.gz *.egg-info .eggs
 
-tox -e dist
+for p in "osm_webhook_translator"; do 
+    rm -rf $p/dist $p/deb_dist $p/osm*.tar.gz $p/*.egg-info $p/.eggs
+done
+
+mkdir -p deb_dist 
+mkdir -p osm_webhook_translator/deb_dist
+
+PACKAGES="
+dist_ng_sa
+dist_webhook_translator"
+
+TOX_ENV_LIST="$(echo $PACKAGES | sed "s/ /,/g")"
+
+tox -e $TOX_ENV_LIST
+
+# Copying packages
+# Webhook Translator
+cp osm_webhook_translator/deb_dist/python3-osm-webhook-translator_*.deb deb_dist/
+
diff --git a/osm_webhook_translator/MANIFEST.in b/osm_webhook_translator/MANIFEST.in
new file mode 100644 (file)
index 0000000..5f44b2f
--- /dev/null
@@ -0,0 +1,20 @@
+#######################################################################################
+# Copyright ETSI Contributors and Others.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#######################################################################################
+
+include src/osm_webhook_translator/requirements.txt
+include src/osm_webhook_translator/README.rst
+
diff --git a/osm_webhook_translator/README.rst b/osm_webhook_translator/README.rst
new file mode 100644 (file)
index 0000000..a84b409
--- /dev/null
@@ -0,0 +1,33 @@
+..
+  #######################################################################################
+  # Copyright ETSI Contributors and Others.
+  #
+  # Licensed under the Apache License, Version 2.0 (the "License");
+  # you may not use this file except in compliance with the License.
+  # You may obtain a copy of the License at
+  #
+  #    http://www.apache.org/licenses/LICENSE-2.0
+  #
+  # Unless required by applicable law or agreed to in writing, software
+  # distributed under the License is distributed on an "AS IS" BASIS,
+  # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+  # implied.
+  # See the License for the specific language governing permissions and
+  # limitations under the License.
+  #######################################################################################
+
+==================
+webhook-translator
+==================
+
+webhook-translator is a component in the Service Assurance architecture for OSM.
+
+Its role is to receive alerts from entities such as Prometheus AlertManager or external systems and to translate them into a format that can be consumed by Airflow DAGs. In essence, it receives HTTP POST messages and forwards them to an Airflow webhook.
+
+The main characteristics are:
+
+* Lightweight: a very small number of lines of code does the work.
+* Stateless: it only translates HTTP requests and keeps no state for those translations. When running as a Kubernetes deployment, native scaling is achieved by means of Kubernetes services.
+* Simple: based on `FastAPI <https://fastapi.tiangolo.com/>`_.
+* Independent from the source of the alert: no maintenance is required to incorporate new alert sources.
+
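
For illustration of the translation described in this README: a POST to the translator at path /alert_vdu is forwarded to the Airflow REST API roughly as sketched below (host, port and credentials are placeholders; the real values come from the translator's environment).

    import requests

    # What a client (e.g. Prometheus AlertManager) sends to the translator:
    #   POST http://webhook-translator/alert_vdu    body = alert (JSON)
    alert = {"alerts": [{"status": "firing", "labels": {"alertname": "vdu_down"}}]}

    # What the translator forwards to Airflow (see osm_webhook_translator/main.py below):
    requests.post(
        "http://airflow-webserver:8080/api/v1/dags/alert_vdu/dagRuns",  # placeholder host/port
        auth=("admin", "admin"),                                        # placeholder credentials
        json={"dag_run_id": "alert_vdu_20230301185134_000042", "conf": alert},
    )
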
diff --git a/osm_webhook_translator/requirements-dist.in b/osm_webhook_translator/requirements-dist.in
new file mode 100644 (file)
index 0000000..03ff6e9
--- /dev/null
@@ -0,0 +1,18 @@
+# Copyright ETSI Contributors and Others.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+stdeb
+setuptools-scm
+setuptools<60
diff --git a/osm_webhook_translator/requirements-dist.txt b/osm_webhook_translator/requirements-dist.txt
new file mode 100644 (file)
index 0000000..6ddded6
--- /dev/null
@@ -0,0 +1,32 @@
+#######################################################################################
+# Copyright ETSI Contributors and Others.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#######################################################################################
+packaging==23.0
+    # via setuptools-scm
+setuptools-scm==7.1.0
+    # via -r osm_webhook_translator/requirements-dist.in
+stdeb==0.10.0
+    # via -r osm_webhook_translator/requirements-dist.in
+tomli==2.0.1
+    # via setuptools-scm
+typing-extensions==4.5.0
+    # via setuptools-scm
+
+# The following packages are considered to be unsafe in a requirements file:
+setuptools==59.8.0
+    # via
+    #   -r osm_webhook_translator/requirements-dist.in
+    #   setuptools-scm
diff --git a/osm_webhook_translator/requirements.in b/osm_webhook_translator/requirements.in
new file mode 100644 (file)
index 0000000..33c6214
--- /dev/null
@@ -0,0 +1,20 @@
+#######################################################################################
+# Copyright ETSI Contributors and Others.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#######################################################################################
+
+fastapi
+requests
+uvicorn
diff --git a/osm_webhook_translator/requirements.txt b/osm_webhook_translator/requirements.txt
new file mode 100644 (file)
index 0000000..6cf7f42
--- /dev/null
@@ -0,0 +1,48 @@
+#######################################################################################
+# Copyright ETSI Contributors and Others.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#######################################################################################
+anyio==3.6.2
+    # via starlette
+certifi==2022.12.7
+    # via requests
+charset-normalizer==3.1.0
+    # via requests
+click==8.1.3
+    # via uvicorn
+fastapi==0.95.0
+    # via -r osm_webhook_translator/requirements.in
+h11==0.14.0
+    # via uvicorn
+idna==3.4
+    # via
+    #   anyio
+    #   requests
+pydantic==1.10.7
+    # via fastapi
+requests==2.28.2
+    # via -r osm_webhook_translator/requirements.in
+sniffio==1.3.0
+    # via anyio
+starlette==0.26.1
+    # via fastapi
+typing-extensions==4.5.0
+    # via
+    #   pydantic
+    #   starlette
+urllib3==1.26.15
+    # via requests
+uvicorn==0.21.1
+    # via -r osm_webhook_translator/requirements.in
diff --git a/osm_webhook_translator/setup.py b/osm_webhook_translator/setup.py
new file mode 100644 (file)
index 0000000..3323757
--- /dev/null
@@ -0,0 +1,43 @@
+#!/usr/bin/env python3
+#
+# Copyright ETSI Contributors and Others.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+
+from setuptools import find_namespace_packages, setup
+
+exec(open("src/osm_webhook_translator/_version.py").read())
+
+_name = "osm_webhook_translator"
+_description = "OSM Webhook Translator"
+with open(os.path.join(".", "README.rst")) as readme_file:
+    README = readme_file.read()
+
+setup(
+    name=_name,
+    description=_description,
+    long_description=README,
+    version=__version__,  # noqa: F821
+    author="ETSI OSM",
+    author_email="osmsupport@etsi.org",
+    maintainer="ETSI OSM",
+    maintainer_email="osmsupport@etsi.org",
+    url="https://osm.etsi.org/gitweb/?p=osm/NG-SA.git;a=summary",
+    license="Apache 2.0",
+    package_dir={"": "src"},
+    packages=find_namespace_packages(where="src"),
+    include_package_data=True,
+)
diff --git a/osm_webhook_translator/src/osm_webhook_translator/__init__.py b/osm_webhook_translator/src/osm_webhook_translator/__init__.py
new file mode 100644 (file)
index 0000000..d5daf3e
--- /dev/null
@@ -0,0 +1,17 @@
+#######################################################################################
+# Copyright ETSI Contributors and Others.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#######################################################################################
+from osm_webhook_translator._version import __version__  # noqa: F401
diff --git a/osm_webhook_translator/src/osm_webhook_translator/main.py b/osm_webhook_translator/src/osm_webhook_translator/main.py
new file mode 100644 (file)
index 0000000..6e9e718
--- /dev/null
@@ -0,0 +1,63 @@
+#######################################################################################
+# Copyright ETSI Contributors and Others.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#######################################################################################
+from datetime import datetime
+import logging
+import os
+from random import randint
+
+from fastapi import FastAPI, HTTPException
+import requests
+
+
+logging.basicConfig(
+    format="%(asctime)s %(levelname)s %(filename)s:%(lineno)s %(message)s",
+    datefmt="%Y/%m/%d %H:%M:%S",
+)
+logger = logging.getLogger(__name__)
+logger.setLevel(logging.INFO)
+app = FastAPI()
+
+
+def send_to_airflow(output_endpoint, content):
+    try:
+        requests.Session()
+        # Airflow connection parameters come from environment variables (ConfigMaps and Secrets)
+        airflow_host = os.environ["AIRFLOW_HOST"]
+        airflow_port = os.environ["AIRFLOW_PORT"]
+        airflow_user = os.environ["AIRFLOW_USER"]
+        airflow_pass = os.environ["AIRFLOW_PASS"]
+        url = f"http://{airflow_host}:{airflow_port}/api/v1/dags/{output_endpoint}/dagRuns"
+        rnd = str(randint(0, 999999)).rjust(6, "0")
+        timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
+        dag_run_id = output_endpoint + "_" + timestamp + "_" + rnd
+        logger.info(f"HTTP POST {url}")
+        req = requests.post(
+            url=url,
+            auth=(airflow_user, airflow_pass),
+            json={"dag_run_id": dag_run_id, "conf": content},
+        )
+        logger.info(f"Response: {req.text}")
+        # TODO: add request timeout and retries
+    except Exception as e:
+        logger.error(f"HTTP error: {repr(e)}")
+        raise HTTPException(status_code=403, detail=repr(e))
+
+
+@app.post("/{input_endpoint}")
+async def webhook(input_endpoint: str, content: dict):
+    send_to_airflow(input_endpoint, content)
+    return {}
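
A rough way to exercise the service above locally (a sketch only; the port, credentials and install layout are assumptions, and in OSM the AIRFLOW_* variables are injected from ConfigMaps and Secrets):

    # Start the app first, e.g.:
    #   AIRFLOW_HOST=localhost AIRFLOW_PORT=8080 AIRFLOW_USER=admin AIRFLOW_PASS=admin \
    #       uvicorn osm_webhook_translator.main:app --port 9998
    import requests

    # The path segment ("alert_vdu") selects the Airflow DAG that will be triggered
    resp = requests.post("http://localhost:9998/alert_vdu", json={"alerts": []})
    print(resp.status_code, resp.text)
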
index 8c8e640..6779aef 100644 (file)
 azure-common
 azure-identity
 azure-mgmt-compute
+gnocchiclient
 google-api-python-client
 google-auth
 prometheus-client
-protobuf<4 # Required by common
+python-ceilometerclient
 python-keystoneclient
 python-novaclient
 pyyaml==5.4.1
index c461743..989acbd 100644 (file)
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #######################################################################################
+attrs==22.2.0
+    # via cmd2
+autopage==0.5.1
+    # via cliff
 azure-common==1.1.28
     # via
     #   -r requirements.in
     #   azure-mgmt-compute
-azure-core==1.26.3
+azure-core==1.26.4
     # via
     #   azure-identity
     #   azure-mgmt-core
@@ -27,7 +31,7 @@ azure-identity==1.12.0
     # via -r requirements.in
 azure-mgmt-compute==29.1.0
     # via -r requirements.in
-azure-mgmt-core==1.3.2
+azure-mgmt-core==1.4.0
     # via azure-mgmt-compute
 cachetools==5.3.0
     # via google-auth
@@ -37,23 +41,32 @@ certifi==2022.12.7
     #   requests
 cffi==1.15.1
     # via cryptography
-charset-normalizer==3.0.1
+charset-normalizer==3.1.0
     # via requests
-cryptography==39.0.1
+cliff==4.2.0
+    # via gnocchiclient
+cmd2==2.4.3
+    # via cliff
+cryptography==40.0.1
     # via
     #   azure-identity
     #   msal
     #   pyjwt
 debtcollector==2.5.0
     # via
+    #   gnocchiclient
     #   oslo-config
     #   oslo-utils
     #   python-keystoneclient
+futurist==2.4.1
+    # via gnocchiclient
+gnocchiclient==7.0.8
+    # via -r requirements.in
 google-api-core==2.11.0
     # via google-api-python-client
-google-api-python-client==2.80.0
+google-api-python-client==2.84.0
     # via -r requirements.in
-google-auth==2.16.1
+google-auth==2.17.2
     # via
     #   -r requirements.in
     #   google-api-core
@@ -61,23 +74,29 @@ google-auth==2.16.1
     #   google-auth-httplib2
 google-auth-httplib2==0.1.0
     # via google-api-python-client
-googleapis-common-protos==1.58.0
+googleapis-common-protos==1.59.0
     # via google-api-core
-httplib2==0.21.0
+httplib2==0.22.0
     # via
     #   google-api-python-client
     #   google-auth-httplib2
 idna==3.4
     # via requests
+importlib-metadata==6.3.0
+    # via cliff
 iso8601==1.1.0
     # via
+    #   gnocchiclient
     #   keystoneauth1
     #   oslo-utils
+    #   python-ceilometerclient
     #   python-novaclient
 isodate==0.6.1
     # via msrest
 keystoneauth1==5.1.2
     # via
+    #   gnocchiclient
+    #   python-ceilometerclient
     #   python-keystoneclient
     #   python-novaclient
 msal==1.21.0
@@ -86,7 +105,7 @@ msal==1.21.0
     #   msal-extensions
 msal-extensions==1.0.0
     # via azure-identity
-msgpack==1.0.4
+msgpack==1.0.5
     # via oslo-serialization
 msrest==0.7.1
     # via azure-mgmt-compute
@@ -106,15 +125,18 @@ oslo-i18n==6.0.0
     # via
     #   oslo-config
     #   oslo-utils
+    #   python-ceilometerclient
     #   python-keystoneclient
     #   python-novaclient
 oslo-serialization==5.1.1
     # via
+    #   python-ceilometerclient
     #   python-keystoneclient
     #   python-novaclient
 oslo-utils==6.1.0
     # via
     #   oslo-serialization
+    #   python-ceilometerclient
     #   python-keystoneclient
     #   python-novaclient
 packaging==23.0
@@ -127,16 +149,20 @@ pbr==5.11.1
     #   os-service-types
     #   oslo-i18n
     #   oslo-serialization
+    #   python-ceilometerclient
     #   python-keystoneclient
     #   python-novaclient
     #   stevedore
 portalocker==2.7.0
     # via msal-extensions
-prettytable==3.6.0
-    # via python-novaclient
+prettytable==0.7.2
+    # via
+    #   cliff
+    #   python-ceilometerclient
+    #   python-novaclient
 prometheus-client==0.16.0
     # via -r requirements.in
-protobuf==3.20.3
+protobuf==4.22.1
     # via
     #   -r requirements.in
     #   google-api-core
@@ -155,17 +181,24 @@ pyparsing==3.0.9
     # via
     #   httplib2
     #   oslo-utils
+pyperclip==1.8.2
+    # via cmd2
+python-ceilometerclient==2.9.0
+    # via -r requirements.in
+python-dateutil==2.8.2
+    # via gnocchiclient
 python-keystoneclient==5.1.0
     # via -r requirements.in
 python-novaclient==18.3.0
     # via -r requirements.in
-pytz==2022.7.1
+pytz==2023.3
     # via
     #   oslo-serialization
     #   oslo-utils
 pyyaml==5.4.1
     # via
     #   -r requirements.in
+    #   cliff
     #   oslo-config
 requests==2.28.2
     # via
@@ -175,6 +208,7 @@ requests==2.28.2
     #   msal
     #   msrest
     #   oslo-config
+    #   python-ceilometerclient
     #   python-keystoneclient
     #   requests-oauthlib
 requests-oauthlib==1.3.1
@@ -187,24 +221,33 @@ six==1.16.0
     # via
     #   azure-core
     #   azure-identity
+    #   gnocchiclient
     #   google-auth
     #   google-auth-httplib2
     #   isodate
     #   keystoneauth1
+    #   python-ceilometerclient
+    #   python-dateutil
     #   python-keystoneclient
 stevedore==5.0.0
     # via
+    #   cliff
     #   keystoneauth1
     #   oslo-config
+    #   python-ceilometerclient
     #   python-keystoneclient
     #   python-novaclient
 typing-extensions==4.5.0
     # via azure-core
+ujson==5.7.0
+    # via gnocchiclient
 uritemplate==4.1.1
     # via google-api-python-client
-urllib3==1.26.14
+urllib3==1.26.15
     # via requests
 wcwidth==0.2.6
-    # via prettytable
+    # via cmd2
 wrapt==1.15.0
     # via debtcollector
+zipp==3.15.0
+    # via importlib-metadata
index 844bb93..3f4ddac 100644 (file)
--- a/setup.py
+++ b/setup.py
@@ -16,7 +16,8 @@
 # limitations under the License.
 
 import os
-from setuptools import setup, find_packages, find_namespace_packages
+
+from setuptools import find_namespace_packages, setup
 
 _name = "osm_ngsa"
 _description = "OSM Service Assurance Airflow DAGs and libraries"
@@ -27,9 +28,7 @@ setup(
     name=_name,
     description=_description,
     long_description=README,
-    use_scm_version={
-        "write_to": "src/osm_ngsa/_version.py"
-    },
+    use_scm_version={"write_to": "src/osm_ngsa/_version.py"},
     author="ETSI OSM",
     author_email="osmsupport@etsi.org",
     maintainer="ETSI OSM",
@@ -37,8 +36,7 @@ setup(
     url="https://osm.etsi.org/gitweb/?p=osm/NG-SA.git;a=summary",
     license="Apache 2.0",
     package_dir={"": "src"},
-    packages=find_namespace_packages(where='src'),
+    packages=find_namespace_packages(where="src"),
     include_package_data=True,
     setup_requires=["setuptools-scm"],
 )
-
diff --git a/src/osm_ngsa/dags/alert_vdu.py b/src/osm_ngsa/dags/alert_vdu.py
new file mode 100644 (file)
index 0000000..390460a
--- /dev/null
@@ -0,0 +1,179 @@
+#######################################################################################
+# Copyright ETSI Contributors and Others.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#######################################################################################
+import asyncio
+from datetime import datetime, timedelta
+import logging
+import time
+import uuid
+
+from airflow.decorators import dag, task
+from airflow.operators.python import get_current_context
+from osm_mon.core.common_db import CommonDbClient
+from osm_mon.core.config import Config
+from osm_mon.core.message_bus_client import MessageBusClient
+
+# Logging
+logger = logging.getLogger("airflow.task")
+
+
+@dag(
+    catchup=False,
+    default_args={
+        "depends_on_past": False,
+        "retries": 1,
+        "retry_delay": timedelta(seconds=5),
+    },
+    description="Webhook callback for VDU alarm from Prometheus AlertManager",
+    is_paused_upon_creation=False,
+    schedule_interval=None,
+    start_date=datetime(2022, 1, 1),
+    tags=["osm", "webhook"],
+)
+def alert_vdu():
+    @task(task_id="main_task")
+    def main_task():
+        logger.debug("Running main task...")
+        context = get_current_context()
+        conf = context["dag_run"].conf
+        for alarm in conf["alerts"]:
+            logger.info("VDU alarm:")
+            status = alarm["status"]
+            logger.info(f"  status: {status}")
+            logger.info(f'  annotations: {alarm["annotations"]}')
+            logger.info(f'  startsAt: {alarm["startsAt"]}')
+            logger.info(f'  endsAt: {alarm["endsAt"]}')
+            logger.info(f'  labels: {alarm["labels"]}')
+            # vdu_down alert type
+            if alarm["labels"]["alertname"] != "vdu_down":
+                continue
+            config = Config()
+            common_db = CommonDbClient(config)
+            ns_id = alarm["labels"]["ns_id"]
+            vdu_name = alarm["labels"]["vdu_name"]
+            vnf_member_index = alarm["labels"]["vnf_member_index"]
+            vm_id = alarm["labels"]["vm_id"]
+            if status == "firing":
+                # Searching alerting rule in MongoDB
+                logger.info(
+                    f"Searching alert rule in MongoDB: ns_id {ns_id}, "
+                    f"vnf_member_index {vnf_member_index}, "
+                    f"vdu_name {vdu_name}, "
+                    f"vm_id {vm_id}"
+                )
+                alert = common_db.get_alert(
+                    nsr_id=ns_id, vnf_member_index=vnf_member_index, vdu_name=vdu_name
+                )
+                if alert and alert["action_type"] == "healing":
+                    logger.info("Found an alert rule:")
+                    logger.info(alert)
+                    # Update alert status
+                    common_db.update_alert_status(
+                        uuid=alert["uuid"], alarm_status="alarm"
+                    )
+                    # Get VNFR from MongoDB
+                    vnfr = common_db.get_vnfr(
+                        nsr_id=ns_id, member_index=vnf_member_index
+                    )
+                    logger.info(
+                        f"Found VNFR ns_id: {ns_id}, vnf_member_index: {vnf_member_index}"
+                    )
+                    count_index = None
+                    for vdu in vnfr.get("vdur", []):
+                        if vdu["vim-id"] == vm_id:
+                            count_index = vdu["count-index"]
+                            break
+                    if count_index is None:
+                        logger.error(f"VDU {vm_id} not found in VNFR")
+                        break
+                    # Auto-healing type rule
+                    vnf_id = alarm["labels"]["vnf_id"]
+                    msg_bus = MessageBusClient(config)
+                    loop = asyncio.get_event_loop()
+                    _id = str(uuid.uuid4())
+                    now = time.time()
+                    vdu_id = alert["action"]["vdu-id"]
+                    day1 = alert["action"]["day1"]
+                    projects_read = vnfr["_admin"]["projects_read"]
+                    projects_write = vnfr["_admin"]["projects_write"]
+                    params = {
+                        "lcmOperationType": "heal",
+                        "nsInstanceId": ns_id,
+                        "healVnfData": [
+                            {
+                                "vnfInstanceId": vnf_id,
+                                "cause": "default",
+                                "additionalParams": {
+                                    "run-day1": day1,
+                                    "vdu": [
+                                        {
+                                            "run-day1": day1,
+                                            "count-index": count_index,
+                                            "vdu-id": vdu_id,
+                                        }
+                                    ],
+                                },
+                            }
+                        ],
+                    }
+                    nslcmop = {
+                        "id": _id,
+                        "_id": _id,
+                        "operationState": "PROCESSING",
+                        "statusEnteredTime": now,
+                        "nsInstanceId": ns_id,
+                        "member-vnf-index": vnf_member_index,
+                        "lcmOperationType": "heal",
+                        "startTime": now,
+                        "location": "default",
+                        "isAutomaticInvocation": True,
+                        "operationParams": params,
+                        "isCancelPending": False,
+                        "links": {
+                            "self": "/osm/nslcm/v1/ns_lcm_op_occs/" + _id,
+                            "nsInstance": "/osm/nslcm/v1/ns_instances/" + ns_id,
+                        },
+                        "_admin": {
+                            "projects_read": projects_read,
+                            "projects_write": projects_write,
+                        },
+                    }
+                    common_db.create_nslcmop(nslcmop)
+                    logger.info("Sending heal action message:")
+                    logger.info(nslcmop)
+                    loop.run_until_complete(msg_bus.aiowrite("ns", "heal", nslcmop))
+                else:
+                    logger.info("No alert rule was found")
+            elif status == "resolved":
+                # Searching alerting rule in MongoDB
+                logger.info(
+                    f"Searching alert rule in MongoDB: ns_id {ns_id}, "
+                    f"vnf_member_index {vnf_member_index}, "
+                    f"vdu_name {vdu_name}, "
+                    f"vm_id {vm_id}"
+                )
+                alert = common_db.get_alert(
+                    nsr_id=ns_id, vnf_member_index=vnf_member_index, vdu_name=vdu_name
+                )
+                if alert:
+                    logger.info("Found an alert rule, updating status")
+                    # Update alert status
+                    common_db.update_alert_status(uuid=alert["uuid"], alarm_status="ok")
+
+    main_task()
+
+
+dag = alert_vdu()
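
The DAG above expects the run conf to carry an AlertManager-style webhook payload; a minimal illustration of the fields main_task actually reads (all values are placeholders):

    # Illustrative conf for an alert_vdu DAG run; only these fields are consumed
    conf = {
        "alerts": [
            {
                "status": "firing",  # or "resolved"
                "startsAt": "2023-03-01T18:51:34Z",
                "endsAt": "0001-01-01T00:00:00Z",
                "annotations": {"summary": "VDU is down"},
                "labels": {
                    "alertname": "vdu_down",  # other alert names are skipped
                    "ns_id": "placeholder-ns-id",
                    "vnf_id": "placeholder-vnf-id",
                    "vnf_member_index": "1",
                    "vdu_name": "placeholder-vdu-name",
                    "vm_id": "placeholder-vim-id-of-the-vm",
                },
            }
        ]
    }
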
index 93894b1..f63ab4f 100644 (file)
@@ -15,6 +15,7 @@
 # limitations under the License.
 #######################################################################################
 from datetime import datetime, timedelta
+import logging
 
 from airflow import DAG
 from airflow.decorators import task
@@ -33,24 +34,27 @@ PROMETHEUS_METRIC = "vim_status"
 PROMETHEUS_METRIC_DESCRIPTION = "VIM status"
 SCHEDULE_INTERVAL = 1
 
+# Logging
+logger = logging.getLogger("airflow.task")
+
 
 def get_all_vim():
     """Get VIMs from MongoDB"""
-    print("Getting VIM list")
+    logger.info("Getting VIM list")
 
     cfg = Config()
-    print(cfg.conf)
+    logger.info(cfg.conf)
     common_db = CommonDbClient(cfg)
     vim_accounts = common_db.get_vim_accounts()
     vim_list = []
     for vim in vim_accounts:
-        print(f'Read VIM {vim["_id"]} ({vim["name"]})')
+        logger.info(f'Read VIM {vim["_id"]} ({vim["name"]})')
         vim_list.append(
             {"_id": vim["_id"], "name": vim["name"], "vim_type": vim["vim_type"]}
         )
 
-    print(vim_list)
-    print("Getting VIM list OK")
+    logger.info(vim_list)
+    logger.info("Getting VIM list OK")
     return vim_list
 
 
@@ -87,7 +91,7 @@ def create_dag(dag_id, dag_number, dag_description, vim_id):
                 return GcpCollector(vim_account)
             if vim_type == "azure":
                 return AzureCollector(vim_account)
-            print(f"VIM type '{vim_type}' not supported")
+            logger.info(f"VIM type '{vim_type}' not supported")
             return None
 
         @task(task_id="get_vim_status_and_send_to_prometheus")
@@ -95,11 +99,11 @@ def create_dag(dag_id, dag_number, dag_description, vim_id):
             """Authenticate against VIM and check status"""
 
             # Get VIM account info from MongoDB
-            print(f"Reading VIM info, id: {vim_id}")
+            logger.info(f"Reading VIM info, id: {vim_id}")
             cfg = Config()
             common_db = CommonDbClient(cfg)
             vim_account = common_db.get_vim_account(vim_account_id=vim_id)
-            print(vim_account)
+            logger.info(vim_account)
 
             # Define Prometheus Metric for NS topology
             registry = CollectorRegistry()
@@ -117,10 +121,10 @@ def create_dag(dag_id, dag_number, dag_description, vim_id):
             collector = get_vim_collector(vim_account)
             if collector:
                 status = collector.is_vim_ok()
-                print(f"VIM status: {status}")
+                logger.info(f"VIM status: {status}")
                 metric.labels(vim_id).set(1)
             else:
-                print("Error creating VIM collector")
+                logger.info("Error creating VIM collector")
             # Push to Prometheus
             push_to_gateway(
                 gateway=PROMETHEUS_PUSHGW,
@@ -142,7 +146,7 @@ for index, vim in enumerate(vim_list):
         vim_name = vim["name"]
         dag_description = f"Dag for VIM {vim_name} status"
         dag_id = f"vim_status_{vim_id}"
-        print(f"Creating DAG {dag_id}")
+        logger.info(f"Creating DAG {dag_id}")
         globals()[dag_id] = create_dag(
             dag_id=dag_id,
             dag_number=index,
@@ -150,4 +154,4 @@ for index, vim in enumerate(vim_list):
             vim_id=vim_id,
         )
     else:
-        print(f"VIM type '{vim_type}' not supported for monitoring VIM status")
+        logger.info(f"VIM type '{vim_type}' not supported for monitoring VIM status")
index dbdbbc0..18a02e1 100644 (file)
@@ -15,6 +15,7 @@
 # limitations under the License.
 #######################################################################################
 from datetime import datetime, timedelta
+import logging
 
 from airflow import DAG
 from airflow.decorators import task
@@ -33,24 +34,27 @@ PROMETHEUS_METRIC = "vm_status"
 PROMETHEUS_METRIC_DESCRIPTION = "VM Status from VIM"
 SCHEDULE_INTERVAL = 1
 
+# Logging
+logger = logging.getLogger("airflow.task")
+
 
 def get_all_vim():
     """Get VIMs from MongoDB"""
-    print("Getting VIM list")
+    logger.info("Getting VIM list")
 
     cfg = Config()
-    print(cfg.conf)
+    logger.info(cfg.conf)
     common_db = CommonDbClient(cfg)
     vim_accounts = common_db.get_vim_accounts()
     vim_list = []
     for vim in vim_accounts:
-        print(f'Read VIM {vim["_id"]} ({vim["name"]})')
+        logger.info(f'Read VIM {vim["_id"]} ({vim["name"]})')
         vim_list.append(
             {"_id": vim["_id"], "name": vim["name"], "vim_type": vim["vim_type"]}
         )
 
-    print(vim_list)
-    print("Getting VIM list OK")
+    logger.info(vim_list)
+    logger.info("Getting VIM list OK")
     return vim_list
 
 
@@ -66,6 +70,7 @@ def create_dag(dag_id, dag_number, dag_description, vim_id):
         },
         description=dag_description,
         is_paused_upon_creation=False,
+        max_active_runs=1,
         # schedule_interval=timedelta(minutes=SCHEDULE_INTERVAL),
         schedule_interval=f"*/{SCHEDULE_INTERVAL} * * * *",
         start_date=datetime(2022, 1, 1),
@@ -87,7 +92,7 @@ def create_dag(dag_id, dag_number, dag_description, vim_id):
                 return GcpCollector(vim_account)
             if vim_type == "azure":
                 return AzureCollector(vim_account)
-            print(f"VIM type '{vim_type}' not supported")
+            logger.info(f"VIM type '{vim_type}' not supported")
             return None
 
         def get_all_vm_status(vim_account):
@@ -95,22 +100,23 @@ def create_dag(dag_id, dag_number, dag_description, vim_id):
             collector = get_vim_collector(vim_account)
             if collector:
                 status = collector.is_vim_ok()
-                print(f"VIM status: {status}")
+                logger.info(f"VIM status: {status}")
                 vm_status_list = collector.collect_servers_status()
                 return vm_status_list
             else:
-                return None
+                logger.error("No collector for VIM")
+                return []
 
         @task(task_id="get_all_vm_status_and_send_to_prometheus")
         def get_all_vm_status_and_send_to_prometheus(vim_id: str):
             """Authenticate against VIM, collect servers status and send to prometheus"""
 
             # Get VIM account info from MongoDB
-            print(f"Reading VIM info, id: {vim_id}")
+            logger.info(f"Reading VIM info, id: {vim_id}")
             cfg = Config()
             common_db = CommonDbClient(cfg)
             vim_account = common_db.get_vim_account(vim_account_id=vim_id)
-            print(vim_account)
+            logger.info(vim_account)
 
             # Define Prometheus Metric for NS topology
             registry = CollectorRegistry()
@@ -126,20 +132,19 @@ def create_dag(dag_id, dag_number, dag_description, vim_id):
 
             # Get status of all VM from VIM
             all_vm_status = get_all_vm_status(vim_account)
-            print(f"Got {len(all_vm_status)} VMs with their status:")
-            if all_vm_status:
-                for vm in all_vm_status:
-                    vm_id = vm["id"]
-                    vm_status = vm["status"]
-                    vm_name = vm.get("name", "")
-                    print(f"    {vm_name} ({vm_id}) {vm_status}")
-                    metric.labels(vm_id, vim_id).set(vm_status)
-                # Push to Prometheus only if there are VM
-                push_to_gateway(
-                    gateway=PROMETHEUS_PUSHGW,
-                    job=f"{PROMETHEUS_JOB_PREFIX}{vim_id}",
-                    registry=registry,
-                )
+            logger.info(f"Got {len(all_vm_status)} VMs with their status:")
+            for vm in all_vm_status:
+                vm_id = vm["id"]
+                vm_status = vm["status"]
+                vm_name = vm.get("name", "")
+                logger.info(f"    {vm_name} ({vm_id}) {vm_status}")
+                metric.labels(vm_id, vim_id).set(vm_status)
+            # Push to Prometheus
+            push_to_gateway(
+                gateway=PROMETHEUS_PUSHGW,
+                job=f"{PROMETHEUS_JOB_PREFIX}{vim_id}",
+                registry=registry,
+            )
             return
 
         get_all_vm_status_and_send_to_prometheus(vim_id)
@@ -155,7 +160,7 @@ for index, vim in enumerate(vim_list):
         vim_name = vim["name"]
         dag_description = f"Dag for vim {vim_name}"
         dag_id = f"vm_status_vim_{vim_id}"
-        print(f"Creating DAG {dag_id}")
+        logger.info(f"Creating DAG {dag_id}")
         globals()[dag_id] = create_dag(
             dag_id=dag_id,
             dag_number=index,
@@ -163,4 +168,4 @@ for index, vim in enumerate(vim_list):
             vim_id=vim_id,
         )
     else:
-        print(f"VIM type '{vim_type}' not supported for collecting VM status")
+        logger.info(f"VIM type '{vim_type}' not supported for collecting VM status")
index d3fb504..7d2af07 100644 (file)
@@ -15,6 +15,7 @@
 # limitations under the License.
 #######################################################################################
 from datetime import datetime, timedelta
+import logging
 
 from airflow.decorators import dag, task
 from osm_mon.core.common_db import CommonDbClient
@@ -28,6 +29,9 @@ PROMETHEUS_METRIC = "ns_topology"
 PROMETHEUS_METRIC_DESCRIPTION = "Network services topology"
 SCHEDULE_INTERVAL = 2
 
+# Logging
+logger = logging.getLogger("airflow.task")
+
 
 @dag(
     catchup=False,
@@ -38,6 +42,7 @@ SCHEDULE_INTERVAL = 2
     },
     description="NS topology",
     is_paused_upon_creation=False,
+    max_active_runs=1,
     # schedule_interval=timedelta(minutes=SCHEDULE_INTERVAL),
     schedule_interval=f"*/{SCHEDULE_INTERVAL} * * * *",
     start_date=datetime(2022, 1, 1),
@@ -70,9 +75,9 @@ def ns_topology():
         )
 
         # Getting VNFR list from MongoDB
-        print("Getting VNFR list from MongoDB")
+        logger.info("Getting VNFR list from MongoDB")
         cfg = Config()
-        print(cfg.conf)
+        # logger.debug(cfg.conf)
         common_db = CommonDbClient(cfg)
         vnfr_list = common_db.get_vnfrs()
 
@@ -93,45 +98,48 @@ def ns_topology():
             project_id = "None"
             if project_list:
                 project_id = project_list[0]
-            # TODO: use logger with loglevels instead of print
             # Other info
             ns_state = vnfr["_admin"]["nsState"]
             vnf_membex_index = vnfr["member-vnf-index-ref"]
-            print(
-                f"Read VNFR: id: {vnf_id}, ns_id: {ns_id}, ",
-                f"state: {ns_state}, vnfd_id: {vnfd_id}, ",
-                f"vnf_membex_index: {vnf_membex_index}, ",
-                f"project_id: {project_id}",
+            logger.info(
+                f"Read VNFR: id: {vnf_id}, ns_id: {ns_id}, "
+                f"state: {ns_state}, vnfd_id: {vnfd_id}, "
+                f"vnf_membex_index: {vnf_membex_index}, "
+                f"project_id: {project_id}"
             )
             # Only send topology if ns State is one of the nsAllowedStatesSet
             if ns_state not in nsAllowedStatesSet:
                 continue
 
-            print("VDU list:")
+            logger.debug("VDU list:")
             for vdu in vnfr.get("vdur", []):
                 # Label vdu_id
                 vdu_id = vdu["_id"]
                 # Label vim_id
                 vim_info = vdu.get("vim_info")
                 if not vim_info:
-                    print("Error: vim_info not available in vdur")
+                    logger.info("Error: vim_info not available in vdur")
                     continue
                 if len(vim_info) != 1:
-                    print("Error: more than one vim_info in vdur")
+                    logger.info("Error: more than one vim_info in vdur")
                     continue
                 vim_id = next(iter(vim_info))[4:]
+                # TODO: check if it makes sense to use vnfr.vim-account-id as vim_id instead of the vim_info key
                 # Label vm_id
-                vm_id = vdu["vim-id"]
+                vm_id = vdu.get("vim-id")
+                if not vm_id:
+                    logger.info("Error: vim-id not available in vdur")
+                    continue
                 # Other VDU info
                 vdu_name = vdu.get("name", "UNKNOWN")
-                print(
-                    f"    id: {vdu_id}, name: {vdu_name}, "
+                logger.debug(
+                    f"  VDU id: {vdu_id}, name: {vdu_name}, "
                     f"vim_id: {vim_id}, vm_id: {vm_id}"
                 )
-                print(
-                    f"METRIC SAMPLE: ns_id: {ns_id}, ",
-                    f"project_id: {project_id}, vnf_id: {vnf_id}, ",
-                    f"vdu_id: {vdu_id}, vm_id: {vm_id}, vim_id: {vim_id}",
+                logger.info(
+                    f"METRIC SAMPLE: ns_id: {ns_id}, "
+                    f"project_id: {project_id}, vnf_id: {vnf_id}, "
+                    f"vdu_id: {vdu_id}, vm_id: {vm_id}, vim_id: {vim_id}"
                 )
                 metric.labels(
                     ns_id,
@@ -144,7 +152,6 @@ def ns_topology():
                     vnf_membex_index,
                 ).set(1)
 
-        # print("Push to gateway")
         push_to_gateway(
             gateway=PROMETHEUS_PUSHGW, job=PROMETHEUS_JOB, registry=registry
         )
index 7c579c3..93254b1 100644 (file)
@@ -30,9 +30,9 @@ class CommonDbClient:
             )
         self.common_db.db_connect(config.get("database"))
 
-    def get_vnfr(self, nsr_id: str, member_index: int):
+    def get_vnfr(self, nsr_id: str, member_index: str):
         vnfr = self.common_db.get_one(
-            "vnfrs", {"nsr-id-ref": nsr_id, "member-vnf-index-ref": str(member_index)}
+            "vnfrs", {"nsr-id-ref": nsr_id, "member-vnf-index-ref": member_index}
         )
         return vnfr
 
@@ -88,3 +88,23 @@ class CommonDbClient:
                         vim_account_id,
                     )
         return vim_account
+
+    def get_alert(self, nsr_id: str, vnf_member_index: str, vdu_name: str):
+        alert = self.common_db.get_one(
+            "alerts",
+            {
+                "tags.ns_id": nsr_id,
+                "tags.vnf_member_index": vnf_member_index,
+                "tags.vdu_name": vdu_name,
+            },
+        )
+        return alert
+
+    def update_alert_status(self, uuid: str, alarm_status: str):
+        modified_count = self.common_db.set_one(
+            "alerts", {"uuid": uuid}, {"alarm_status": alarm_status}
+        )
+        return modified_count
+
+    def create_nslcmop(self, nslcmop: dict):
+        self.common_db.create("nslcmops", nslcmop)
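
A short sketch of how the new helpers are used together (mirroring alert_vdu.py above; the identifiers are placeholders):

    from osm_mon.core.common_db import CommonDbClient
    from osm_mon.core.config import Config

    common_db = CommonDbClient(Config())

    # Look up the alerting rule stored for a VDU and update its status
    alert = common_db.get_alert(
        nsr_id="placeholder-ns-id", vnf_member_index="1", vdu_name="placeholder-vdu-name"
    )
    if alert and alert["action_type"] == "healing":
        common_db.update_alert_status(uuid=alert["uuid"], alarm_status="alarm")
        # ... build the heal operation dict as in alert_vdu.py, then:
        # common_db.create_nslcmop(nslcmop)
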
index 197c818..4bff5a4 100644 (file)
@@ -25,3 +25,7 @@ database:
   name: osm
   commonkey: gj7LmbCexbmII7memwbGRRdfbYuT3nvy
 
+message:
+  driver: kafka
+  host: kafka
+  port: 9092
diff --git a/src/osm_ngsa/osm_mon/core/message_bus_client.py b/src/osm_ngsa/osm_mon/core/message_bus_client.py
new file mode 100644 (file)
index 0000000..2ae895c
--- /dev/null
@@ -0,0 +1,66 @@
+#######################################################################################
+# Copyright ETSI Contributors and Others.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#######################################################################################
+import asyncio
+from typing import Callable, List
+
+from osm_common import msgkafka, msglocal
+from osm_mon.core.config import Config
+
+
+class MessageBusClient:
+    def __init__(self, config: Config, loop=None):
+        if config.get("message", "driver") == "local":
+            self.msg_bus = msglocal.MsgLocal()
+        elif config.get("message", "driver") == "kafka":
+            self.msg_bus = msgkafka.MsgKafka()
+        else:
+            raise Exception(
+                "Unknown message bus driver {}".format(config.get("message", "driver"))
+            )
+        self.msg_bus.connect(config.get("message"))
+        if not loop:
+            loop = asyncio.get_event_loop()
+        self.loop = loop
+
+    async def aioread(self, topics: List[str], callback: Callable = None, **kwargs):
+        """
+        Retrieves messages continuously from bus and executes callback for each message consumed.
+        :param topics: List of message bus topics to consume from.
+        :param callback: Async callback function to be called for each message received.
+        :param kwargs: Keyword arguments to be passed to callback function.
+        :return: None
+        """
+        await self.msg_bus.aioread(topics, self.loop, aiocallback=callback, **kwargs)
+
+    async def aiowrite(self, topic: str, key: str, msg: dict):
+        """
+        Writes message to bus.
+        :param topic: Topic to write to.
+        :param key: Key to write to.
+        :param msg: Dictionary containing message to be written.
+        :return: None
+        """
+        await self.msg_bus.aiowrite(topic, key, msg, self.loop)
+
+    async def aioread_once(self, topic: str):
+        """
+        Retrieves last message from bus.
+        :param topic: topic to retrieve message from.
+        :return: tuple(topic, key, message)
+        """
+        result = await self.msg_bus.aioread(topic, self.loop)
+        return result
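
A minimal sketch of publishing with this client (this is how alert_vdu.py sends the heal operation; the message content is a placeholder):

    import asyncio

    from osm_mon.core.config import Config
    from osm_mon.core.message_bus_client import MessageBusClient

    msg_bus = MessageBusClient(Config())
    loop = asyncio.get_event_loop()

    # Write one message to topic "ns" with key "heal"
    nslcmop = {"id": "placeholder-op-id", "lcmOperationType": "heal"}
    loop.run_until_complete(msg_bus.aiowrite("ns", "heal", nslcmop))
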
diff --git a/tox.ini b/tox.ini
index 4a9dcbf..6c750c9 100644 (file)
--- a/tox.ini
+++ b/tox.ini
@@ -36,7 +36,8 @@ parallel_show_output = true
 deps = black
 skip_install = true
 commands =
-        black --check --diff src
+        black --check --diff src setup.py
+        black --check --diff osm_webhook_translator
 
 
 #######################################################################################
@@ -44,12 +45,14 @@ commands =
 deps =  {[testenv]deps}
         -r{toxinidir}/requirements-dev.txt
         -r{toxinidir}/requirements-test.txt
-whitelist_externals = sh
+allowlist_externals = sh
 commands =
         sh -c 'rm -f nosetests.xml'
         coverage erase
         nose2 -C --coverage src -s src
-        sh -c 'mv .coverage .coverage_mon'
+        sh -c 'mv .coverage .coverage_ngsa'
+        nose2 -C --coverage osm_webhook_translator -s osm_webhook_translator
+        sh -c 'mv .coverage .coverage_webhook_translator'
         coverage report --omit='*tests*'
         coverage html -d ./cover --omit='*tests*'
         coverage xml -o coverage.xml --omit='*tests*'
@@ -59,9 +62,9 @@ commands =
 [testenv:flake8]
 deps =  flake8==5.0.4
         flake8-import-order
-skip_install = true
 commands =
-        flake8 src/
+        flake8 src setup.py
+        flake8 osm_webhook_translator
 
 
 #######################################################################################
@@ -73,6 +76,7 @@ deps =  {[testenv]deps}
 skip_install = true
 commands =
         pylint -E src
+        pylint -E osm_webhook_translator
 
 
 #######################################################################################
@@ -90,7 +94,7 @@ commands =
 [testenv:pip-compile]
 deps =  pip-tools==6.6.2
 skip_install = true
-whitelist_externals =
+allowlist_externals =
         bash
         [
 commands =
@@ -101,15 +105,20 @@ commands =
         out=`echo $file | sed 's/.in/.txt/'` ; \
         sed -i -e '1 e head -16 tox.ini' $out ;\
         done"
+        bash -c "for file in osm_webhook_translator/requirements*.in ; do \
+        UNSAFE="" ; \
+        if [[ $file =~ 'dist' ]] ; then UNSAFE='--allow-unsafe' ; fi ; \
+        pip-compile -rU --no-header $UNSAFE $file ;\
+        out=`echo $file | sed 's/.in/.txt/'` ; \
+        sed -i -e '1 e head -16 tox.ini' $out ;\
+        done"
 
 
 #######################################################################################
-[testenv:dist]
+[testenv:dist_ng_sa]
 deps =  {[testenv]deps}
         -r{toxinidir}/requirements-dist.txt
-
-# In the commands, we copy the requirements.txt to be presented as a source file (.py)
-# so it gets included in the .deb package for others to consume
+allowlist_externals = sh
 commands =
         sh -c 'cp requirements.txt src/osm_ngsa/requirements.txt'
         sh -c 'cp README.rst src/osm_ngsa/README.rst'
@@ -117,7 +126,21 @@ commands =
         sh -c 'cd deb_dist/osm-ngsa*/ && dpkg-buildpackage -rfakeroot -uc -us'
         sh -c 'rm src/osm_ngsa/requirements.txt'
         sh -c 'rm src/osm_ngsa/README.rst'
-whitelist_externals = sh
+
+
+#######################################################################################
+[testenv:dist_webhook_translator]
+deps =  -r{toxinidir}/osm_webhook_translator/requirements.txt
+        -r{toxinidir}/osm_webhook_translator/requirements-dist.txt
+allowlist_externals = sh
+commands =
+        sh -c 'cp src/osm_ngsa/_version.py osm_webhook_translator/src/osm_webhook_translator/_version.py'
+        sh -c 'cd osm_webhook_translator && cp requirements.txt src/osm_webhook_translator/requirements.txt'
+        sh -c 'cd osm_webhook_translator && cp README.rst src/osm_webhook_translator/README.rst'
+        sh -c 'cd osm_webhook_translator && python3 setup.py --command-packages=stdeb.command sdist_dsc'
+        sh -c 'cd osm_webhook_translator/deb_dist/osm-webhook-translator*/ && dpkg-buildpackage -rfakeroot -uc -us'
+        sh -c 'rm osm_webhook_translator/src/osm_webhook_translator/requirements.txt'
+        sh -c 'rm osm_webhook_translator/src/osm_webhook_translator/README.rst'
 
 
 #######################################################################################