# devops-stages/stage-build.sh
#
-FROM ubuntu:20.04
+FROM ubuntu:22.04
ARG APT_PROXY
RUN if [ ! -z "$APT_PROXY" ] ; then \
python3 \
python3-all \
python3-dev \
- python3-setuptools
+ python3-setuptools \
+ python3-pip \
+ tox
-RUN python3 -m easy_install pip==22.3
-RUN pip install tox==3.24.5
+ENV LC_ALL C.UTF-8
+ENV LANG C.UTF-8
# See the License for the specific language governing permissions and
# limitations under the License.
#######################################################################################
-packaging==23.0
+packaging==23.1
# via setuptools-scm
setuptools-scm==7.1.0
# via -r osm_webhook_translator/requirements-dist.in
# via -r osm_webhook_translator/requirements-dist.in
tomli==2.0.1
# via setuptools-scm
-typing-extensions==4.5.0
+typing-extensions==4.6.0
# via setuptools-scm
# The following packages are considered to be unsafe in a requirements file:
#######################################################################################
anyio==3.6.2
# via starlette
-certifi==2022.12.7
+certifi==2023.5.7
# via requests
charset-normalizer==3.1.0
# via requests
click==8.1.3
# via uvicorn
-fastapi==0.95.0
+fastapi==0.95.2
# via -r osm_webhook_translator/requirements.in
h11==0.14.0
# via uvicorn
# via
# anyio
# requests
-pydantic==1.10.7
+pydantic==1.10.8
# via fastapi
-requests==2.28.2
+requests==2.31.0
# via -r osm_webhook_translator/requirements.in
sniffio==1.3.0
# via anyio
-starlette==0.26.1
+starlette==0.27.0
# via fastapi
-typing-extensions==4.5.0
+typing-extensions==4.6.0
# via
# pydantic
# starlette
-urllib3==1.26.15
+urllib3==2.0.2
# via requests
-uvicorn==0.21.1
+uvicorn==0.22.0
# via -r osm_webhook_translator/requirements.in
name=_name,
description=_description,
long_description=README,
- version=__version__, # noqa: F821
+ version=__version__, # noqa: F821 # pylint: disable=E0602
author="ETSI OSM",
author_email="osmsupport@etsi.org",
maintainer="ETSI OSM",
--- /dev/null
+#######################################################################################
+# Copyright ETSI Contributors and Others.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#######################################################################################
+__version__ = version = '12.0.4.dev21+g4e36505'
+__version_tuple__ = version_tuple = (12, 0, 4, 'dev21', 'g4e36505')
# timeout and retries
except Exception as e:
logger.error(f"HTTP error: {repr(e)}")
- raise requests.HTTPException(status_code=403, detail=repr(e))
+ raise requests.HTTPError(status_code=403, detail=repr(e))
@app.post("/{input_endpoint}")
git+https://osm.etsi.org/gerrit/osm/common.git@paas#egg=osm-common
-r https://osm.etsi.org/gitweb/?p=osm/common.git;a=blob_plain;f=requirements.txt;hb=paas
-apache-airflow==2.4.*
+apache-airflow==2.5.3
+urllib3==1.26.16
# via -r https://osm.etsi.org/gitweb/?p=osm/common.git;a=blob_plain;f=requirements.txt;hb=paas
aiosignal==1.3.1
# via aiohttp
-alembic==1.9.4
+alembic==1.11.1
# via apache-airflow
anyio==3.6.2
# via httpcore
-apache-airflow==2.4.3
+apache-airflow==2.5.3
# via -r requirements-dev.in
-apache-airflow-providers-common-sql==1.3.3
+apache-airflow-providers-common-sql==1.4.0
# via
# apache-airflow
# apache-airflow-providers-sqlite
apache-airflow-providers-ftp==3.3.1
# via apache-airflow
-apache-airflow-providers-http==4.2.0
+apache-airflow-providers-http==4.3.0
# via apache-airflow
apache-airflow-providers-imap==3.1.1
# via apache-airflow
-apache-airflow-providers-sqlite==3.3.1
+apache-airflow-providers-sqlite==3.3.2
# via apache-airflow
apispec[yaml]==3.3.2
# via flask-appbuilder
-argcomplete==2.0.0
+argcomplete==3.0.8
# via apache-airflow
-asgiref==3.6.0
+asgiref==3.7.0
# via apache-airflow-providers-http
async-timeout==4.0.2
# via
# -r https://osm.etsi.org/gitweb/?p=osm/common.git;a=blob_plain;f=requirements.txt;hb=paas
# aiohttp
# aiokafka
-attrs==22.2.0
+attrs==23.1.0
# via
# aiohttp
# apache-airflow
# jsonschema
babel==2.12.1
# via flask-babel
-blinker==1.5
+blinker==1.6.2
# via apache-airflow
cachelib==0.9.0
# via
# flask-session
cattrs==22.2.0
# via apache-airflow
-certifi==2022.12.7
+certifi==2023.5.7
# via
# httpcore
# httpx
# requests
cffi==1.15.1
# via cryptography
-charset-normalizer==3.0.1
+charset-normalizer==3.1.0
# via
# -r https://osm.etsi.org/gitweb/?p=osm/common.git;a=blob_plain;f=requirements.txt;hb=paas
# aiohttp
# via apache-airflow
configupdater==3.1.1
# via apache-airflow
-connexion[flask,swagger-ui]==2.14.2
+connexion[flask]==2.14.2
# via apache-airflow
-cron-descriptor==1.2.35
+cron-descriptor==1.4.0
# via apache-airflow
-croniter==1.3.8
+croniter==1.3.14
# via apache-airflow
-cryptography==39.0.1
+cryptography==40.0.2
# via apache-airflow
dataclasses==0.6
# via -r https://osm.etsi.org/gitweb/?p=osm/common.git;a=blob_plain;f=requirements.txt;hb=paas
dill==0.3.6
# via apache-airflow
dnspython==2.3.0
- # via email-validator
-docutils==0.19
+ # via
+ # -r https://osm.etsi.org/gitweb/?p=osm/common.git;a=blob_plain;f=requirements.txt;hb=paas
+ # email-validator
+ # pymongo
+docutils==0.20.1
# via python-daemon
email-validator==1.3.1
# via flask-appbuilder
-exceptiongroup==1.1.0
+exceptiongroup==1.1.1
# via cattrs
-flask==2.2.3
+flask==2.2.5
# via
# apache-airflow
# connexion
# via
# apache-airflow
# flask-appbuilder
-flask-session==0.4.0
+flask-session==0.5.0
# via apache-airflow
flask-sqlalchemy==2.5.1
# via flask-appbuilder
# via apache-airflow
h11==0.14.0
# via httpcore
-httpcore==0.16.3
+httpcore==0.17.2
# via httpx
-httpx==0.23.3
+httpx==0.24.1
# via apache-airflow
idna==3.4
# via
# anyio
# email-validator
+ # httpx
# requests
- # rfc3986
# yarl
importlib-metadata==6.0.0
# via
# flask
# flask-babel
# python-nvd3
- # swagger-ui-bundle
jsonschema==4.17.3
# via
# apache-airflow
# aiokafka
lazy-object-proxy==1.9.0
# via apache-airflow
-linkify-it-py==2.0.0
+linkify-it-py==2.0.2
# via apache-airflow
lockfile==0.12.2
# via
# python-daemon
mako==1.2.4
# via alembic
-markdown==3.4.1
+markdown==3.4.3
# via apache-airflow
markdown-it-py==2.2.0
# via
# via apache-airflow
marshmallow-sqlalchemy==0.26.1
# via flask-appbuilder
-mdit-py-plugins==0.3.4
+mdit-py-plugins==0.3.5
# via apache-airflow
mdurl==0.1.2
# via markdown-it-py
-motor==1.3.1
+motor==3.1.2
# via -r https://osm.etsi.org/gitweb/?p=osm/common.git;a=blob_plain;f=requirements.txt;hb=paas
multidict==6.0.4
# via
# yarl
osm-common @ git+https://osm.etsi.org/gerrit/osm/common.git@paas
# via -r requirements-dev.in
-packaging==23.0
+packaging==23.1
# via
# -r https://osm.etsi.org/gitweb/?p=osm/common.git;a=blob_plain;f=requirements.txt;hb=paas
# aiokafka
# via apache-airflow
pendulum==2.1.2
# via apache-airflow
-pkgutil-resolve-name==1.3.10
- # via jsonschema
pluggy==1.0.0
# via apache-airflow
prison==0.2.1
# via
# -r https://osm.etsi.org/gitweb/?p=osm/common.git;a=blob_plain;f=requirements.txt;hb=paas
# temporalio
-psutil==5.9.4
+psutil==5.9.5
# via apache-airflow
pycparser==2.21
# via cffi
pycryptodome==3.17
# via -r https://osm.etsi.org/gitweb/?p=osm/common.git;a=blob_plain;f=requirements.txt;hb=paas
-pygments==2.14.0
+pygments==2.15.1
# via
# apache-airflow
# rich
-pyjwt==2.6.0
+pyjwt==2.7.0
# via
# apache-airflow
# flask-appbuilder
# flask-jwt-extended
-pymongo==3.13.0
+pymongo==4.3.3
# via
# -r https://osm.etsi.org/gitweb/?p=osm/common.git;a=blob_plain;f=requirements.txt;hb=paas
# motor
pyrsistent==0.19.3
# via jsonschema
-python-daemon==2.3.2
+python-daemon==3.0.1
# via apache-airflow
python-dateutil==2.8.2
# via
# via
# apache-airflow
# python-nvd3
-pytz==2022.7.1
- # via
- # babel
- # flask-babel
+pytz==2023.3
+ # via flask-babel
pytzdata==2020.1
# via pendulum
pyyaml==5.4.1
# apispec
# clickclick
# connexion
-requests==2.28.2
+requests==2.31.0
# via
# apache-airflow-providers-http
# connexion
# requests-toolbelt
-requests-toolbelt==0.10.1
+requests-toolbelt==1.0.0
# via apache-airflow-providers-http
-rfc3986[idna2008]==1.5.0
- # via httpx
-rich==13.3.1
+rfc3339-validator==0.1.4
+ # via apache-airflow
+rich==13.3.5
# via apache-airflow
setproctitle==1.3.2
# via apache-airflow
# -r https://osm.etsi.org/gitweb/?p=osm/common.git;a=blob_plain;f=requirements.txt;hb=paas
# prison
# python-dateutil
+ # rfc3339-validator
sniffio==1.3.0
# via
# anyio
# httpcore
# httpx
-sqlalchemy==1.4.46
+sqlalchemy==1.4.48
# via
# alembic
# apache-airflow
# sqlalchemy-utils
sqlalchemy-jsonfield==1.0.1.post0
# via apache-airflow
-sqlalchemy-utils==0.40.0
+sqlalchemy-utils==0.41.1
# via flask-appbuilder
-sqlparse==0.4.3
+sqlparse==0.4.4
# via apache-airflow-providers-common-sql
-swagger-ui-bundle==0.0.9
- # via connexion
tabulate==0.9.0
# via apache-airflow
temporalio==1.1.0
# via -r https://osm.etsi.org/gitweb/?p=osm/common.git;a=blob_plain;f=requirements.txt;hb=paas
tenacity==8.2.2
# via apache-airflow
-termcolor==2.2.0
+termcolor==2.3.0
# via apache-airflow
text-unidecode==1.3
# via python-slugify
# apache-airflow
# rich
# temporalio
-uc-micro-py==1.0.1
+uc-micro-py==1.0.2
# via linkify-it-py
unicodecsv==0.14.1
# via apache-airflow
-urllib3==1.26.14
- # via requests
+urllib3==1.26.16
+ # via
+ # -r requirements-dev.in
+ # requests
werkzeug==2.2.3
# via
# apache-airflow
# via
# flask-appbuilder
# flask-wtf
-yarl==1.8.2
+yarl==1.9.2
# via aiohttp
zipp==3.15.0
# via
# See the License for the specific language governing permissions and
# limitations under the License.
#######################################################################################
-packaging==23.0
+packaging==23.1
# via setuptools-scm
setuptools-scm==7.1.0
# via -r requirements-dist.in
# See the License for the specific language governing permissions and
# limitations under the License.
#######################################################################################
-coverage==7.2.1
+coverage==7.2.5
# via -r requirements-test.in
-mock==5.0.1
+mock==5.0.2
# via -r requirements-test.in
-nose2==0.12.0
+nose2==0.13.0
# via -r requirements-test.in
azure-common
azure-identity
azure-mgmt-compute
+azure-mgmt-monitor
gnocchiclient
google-api-python-client
google-auth
+prometheus-api-client
prometheus-client
python-ceilometerclient
python-keystoneclient
# See the License for the specific language governing permissions and
# limitations under the License.
#######################################################################################
-attrs==22.2.0
+attrs==23.1.0
# via cmd2
autopage==0.5.1
# via cliff
# via
# -r requirements.in
# azure-mgmt-compute
+ # azure-mgmt-monitor
azure-core==1.26.4
# via
# azure-identity
# azure-mgmt-core
# msrest
-azure-identity==1.12.0
+azure-identity==1.13.0
# via -r requirements.in
azure-mgmt-compute==29.1.0
# via -r requirements.in
azure-mgmt-core==1.4.0
- # via azure-mgmt-compute
+ # via
+ # azure-mgmt-compute
+ # azure-mgmt-monitor
+azure-mgmt-monitor==6.0.1
+ # via -r requirements.in
cachetools==5.3.0
# via google-auth
-certifi==2022.12.7
+certifi==2023.5.7
# via
# msrest
# requests
cffi==1.15.1
# via cryptography
-charset-normalizer==3.0.1
+charset-normalizer==3.1.0
# via requests
-cliff==4.2.0
+cliff==4.3.0
# via gnocchiclient
cmd2==2.4.3
# via cliff
-cryptography==40.0.1
+contourpy==1.0.7
+ # via matplotlib
+cryptography==40.0.2
# via
# azure-identity
# msal
# pyjwt
+cycler==0.11.0
+ # via matplotlib
+dateparser==1.1.8
+ # via prometheus-api-client
debtcollector==2.5.0
# via
# gnocchiclient
# oslo-config
# oslo-utils
# python-keystoneclient
+fonttools==4.39.4
+ # via matplotlib
futurist==2.4.1
# via gnocchiclient
gnocchiclient==7.0.8
# via -r requirements.in
google-api-core==2.11.0
# via google-api-python-client
-google-api-python-client==2.84.0
+google-api-python-client==2.86.0
# via -r requirements.in
-google-auth==2.17.2
+google-auth==2.18.1
# via
# -r requirements.in
# google-api-core
# via google-api-python-client
googleapis-common-protos==1.59.0
# via google-api-core
+httmock==1.4.0
+ # via prometheus-api-client
httplib2==0.22.0
# via
# google-api-python-client
# google-auth-httplib2
idna==3.4
# via requests
-importlib-metadata==6.3.0
+importlib-metadata==6.6.0
# via cliff
iso8601==1.1.0
# via
# python-ceilometerclient
# python-novaclient
isodate==0.6.1
- # via msrest
-keystoneauth1==5.1.2
+ # via
+ # azure-mgmt-monitor
+ # msrest
+keystoneauth1==5.2.0
# via
# gnocchiclient
# python-ceilometerclient
# python-keystoneclient
# python-novaclient
-msal==1.21.0
+kiwisolver==1.4.4
+ # via matplotlib
+matplotlib==3.7.1
+ # via prometheus-api-client
+msal==1.22.0
# via
# azure-identity
# msal-extensions
# oslo-utils
netifaces==0.11.0
# via oslo-utils
+numpy==1.24.3
+ # via
+ # contourpy
+ # matplotlib
+ # pandas
+ # prometheus-api-client
oauthlib==3.2.2
# via requests-oauthlib
os-service-types==1.7.0
# python-ceilometerclient
# python-keystoneclient
# python-novaclient
-packaging==23.0
+packaging==23.1
# via
+ # matplotlib
# oslo-utils
# python-keystoneclient
+pandas==2.0.1
+ # via prometheus-api-client
pbr==5.11.1
# via
# keystoneauth1
# python-keystoneclient
# python-novaclient
# stevedore
+pillow==9.5.0
+ # via matplotlib
portalocker==2.7.0
# via msal-extensions
prettytable==0.7.2
# cliff
# python-ceilometerclient
# python-novaclient
+prometheus-api-client==0.5.3
+ # via -r requirements.in
prometheus-client==0.16.0
# via -r requirements.in
protobuf==3.20.3
# -r requirements.in
# google-api-core
# googleapis-common-protos
-pyasn1==0.4.8
+pyasn1==0.5.0
# via
# pyasn1-modules
# rsa
-pyasn1-modules==0.2.8
+pyasn1-modules==0.3.0
# via google-auth
pycparser==2.21
# via cffi
-pyjwt[crypto]==2.6.0
+pyjwt[crypto]==2.7.0
# via msal
pyparsing==3.0.9
# via
# httplib2
+ # matplotlib
# oslo-utils
pyperclip==1.8.2
# via cmd2
python-ceilometerclient==2.9.0
# via -r requirements.in
python-dateutil==2.8.2
- # via gnocchiclient
+ # via
+ # dateparser
+ # gnocchiclient
+ # matplotlib
+ # pandas
python-keystoneclient==5.1.0
# via -r requirements.in
python-novaclient==18.3.0
# via -r requirements.in
pytz==2023.3
# via
+ # dateparser
# oslo-serialization
# oslo-utils
+ # pandas
pyyaml==5.4.1
# via
# -r requirements.in
# cliff
# oslo-config
-requests==2.28.2
+regex==2023.5.5
+ # via dateparser
+requests==2.31.0
# via
# azure-core
# google-api-core
+ # httmock
# keystoneauth1
# msal
# msrest
# oslo-config
+ # prometheus-api-client
# python-ceilometerclient
# python-keystoneclient
# requests-oauthlib
# google-auth
# google-auth-httplib2
# isodate
- # keystoneauth1
# python-ceilometerclient
# python-dateutil
# python-keystoneclient
-stevedore==5.0.0
+stevedore==5.1.0
# via
# cliff
# keystoneauth1
# python-novaclient
typing-extensions==4.5.0
# via azure-core
+tzdata==2023.3
+ # via pandas
+tzlocal==5.0.1
+ # via dateparser
ujson==5.7.0
# via gnocchiclient
uritemplate==4.1.1
# via google-api-python-client
-urllib3==1.26.15
- # via requests
+urllib3==1.26.16
+ # via
+ # google-auth
+ # requests
wcwidth==0.2.6
# via cmd2
wrapt==1.15.0
if status == "firing":
# Searching alerting rule in MongoDB
logger.info(
- f"Searching alert rule in MongoDB: ns_id {ns_id}, "
+ f"Searching healing alert rule in MongoDB: ns_id {ns_id}, "
f"vnf_member_index {vnf_member_index}, "
f"vdu_name {vdu_name}, "
f"vm_id {vm_id}"
)
alert = common_db.get_alert(
- nsr_id=ns_id, vnf_member_index=vnf_member_index, vdu_name=vdu_name
+ nsr_id=ns_id,
+ vnf_member_index=vnf_member_index,
+ vdu_id=None,
+ vdu_name=vdu_name,
+ action_type="healing",
)
- if alert and alert["action_type"] == "healing":
+ if alert:
logger.info("Found an alert rule:")
logger.info(alert)
# Update alert status
f"vm_id {vm_id}"
)
alert = common_db.get_alert(
- nsr_id=ns_id, vnf_member_index=vnf_member_index, vdu_name=vdu_name
+ nsr_id=ns_id,
+ vnf_member_index=vnf_member_index,
+ vdu_id=None,
+ vdu_name=vdu_name,
+ action_type="healing",
)
if alert:
logger.info("Found an alert rule, updating status")
--- /dev/null
+#######################################################################################
+# Copyright ETSI Contributors and Others.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#######################################################################################
+from datetime import datetime, timedelta
+import logging
+
+from airflow import DAG
+from airflow.decorators import task
+from osm_mon.core.common_db import CommonDbClient
+from osm_mon.core.config import Config
+from osm_mon.sdnc_connectors.onos import OnosInfraCollector
+from prometheus_client import CollectorRegistry, Gauge, push_to_gateway
+
+
+SUPPORTED_SDNC_TYPES = ["onos_vpls"]
+PROMETHEUS_PUSHGW = "pushgateway-prometheus-pushgateway:9091"
+PROMETHEUS_JOB_PREFIX = "airflow_osm_sdnc_status_"
+PROMETHEUS_METRIC = "osm_sdnc_status"
+PROMETHEUS_METRIC_DESCRIPTION = "SDN Controller status"
+SCHEDULE_INTERVAL = 1
+
+# Logging
+logger = logging.getLogger("airflow.task")
+
+
+def get_all_sdnc():
+ """Get SDNCs from MongoDB"""
+ logger.info("Getting SDNC list")
+
+ cfg = Config()
+ logger.info(cfg.conf)
+ common_db = CommonDbClient(cfg)
+ sdnc_accounts = common_db.get_sdnc_accounts()
+ sdnc_list = []
+ for sdnc in sdnc_accounts:
+ logger.info(f'Read SDNC {sdnc["_id"]} ({sdnc["name"]})')
+ sdnc_list.append(
+ {"_id": sdnc["_id"], "name": sdnc["name"], "type": sdnc["type"]}
+ )
+
+ logger.info(sdnc_list)
+ logger.info("Getting SDNC list OK")
+ return sdnc_list
+
+
+def create_dag(dag_id, dag_number, dag_description, sdnc_id):
+ dag = DAG(
+ dag_id,
+ catchup=False,
+ default_args={
+ "depends_on_past": False,
+ "retries": 1,
+ # "retry_delay": timedelta(minutes=1),
+ "retry_delay": timedelta(seconds=10),
+ },
+ description=dag_description,
+ is_paused_upon_creation=False,
+ # schedule_interval=timedelta(minutes=SCHEDULE_INTERVAL),
+ schedule_interval=f"*/{SCHEDULE_INTERVAL} * * * *",
+ start_date=datetime(2022, 1, 1),
+ tags=["osm", "sdnc"],
+ )
+
+ with dag:
+
+ def get_sdnc_collector(sdnc_account):
+ """Return a SDNC collector for the sdnc_account"""
+ sdnc_type = sdnc_account["type"]
+ if sdnc_type == "onos_vpls":
+ return OnosInfraCollector(sdnc_account)
+ logger.info(f"SDNC type '{sdnc_type}' not supported")
+ return None
+
+ @task(task_id="get_sdnc_status_and_send_to_prometheus")
+ def get_sdnc_status_and_send_to_prometheus(sdnc_id: str):
+ """Authenticate against SDN controller and check status"""
+
+ # Get SDNC account info from MongoDB
+ logger.info(f"Reading SDNC info, id: {sdnc_id}")
+ cfg = Config()
+ common_db = CommonDbClient(cfg)
+ sdnc_account = common_db.get_sdnc_account(sdnc_account_id=sdnc_id)
+ logger.info(sdnc_account)
+
+            # Define Prometheus metric for SDNC status
+            registry = CollectorRegistry()
+            metric = Gauge(
+                PROMETHEUS_METRIC,
+                PROMETHEUS_METRIC_DESCRIPTION,
+                labelnames=[
+                    "sdnc_id",
+                ],
+                registry=registry,
+            )
+            metric.labels(sdnc_id).set(0)
+
+            # Get status of SDNC
+            collector = get_sdnc_collector(sdnc_account)
+            if collector:
+                status = collector.is_sdnc_ok()
+                logger.info(f"SDNC status: {status}")
+                metric.labels(sdnc_id).set(1 if status else 0)
+            else:
+                logger.info("Error creating SDNC collector")
+            # Push to Prometheus
+            push_to_gateway(
+                gateway=PROMETHEUS_PUSHGW,
+                job=f"{PROMETHEUS_JOB_PREFIX}{sdnc_id}",
+                registry=registry,
+            )
+            return
+
+ get_sdnc_status_and_send_to_prometheus(sdnc_id)
+
+ return dag
+
+
+sdnc_list = get_all_sdnc()
+for index, sdnc in enumerate(sdnc_list):
+ sdnc_type = sdnc["type"]
+ if sdnc_type in SUPPORTED_SDNC_TYPES:
+ sdnc_id = sdnc["_id"]
+ sdnc_name = sdnc["name"]
+ dag_description = f"Dag for SDNC {sdnc_name} status"
+ dag_id = f"sdnc_status_{sdnc_id}"
+ logger.info(f"Creating DAG {dag_id}")
+ globals()[dag_id] = create_dag(
+ dag_id=dag_id,
+ dag_number=index,
+ dag_description=dag_description,
+ sdnc_id=sdnc_id,
+ )
+ else:
+ logger.info(f"SDNC type '{sdnc_type}' not supported for monitoring SDNC status")
SUPPORTED_VIM_TYPES = ["openstack", "vio", "gcp", "azure"]
PROMETHEUS_PUSHGW = "pushgateway-prometheus-pushgateway:9091"
PROMETHEUS_JOB_PREFIX = "airflow_osm_vim_status_"
-PROMETHEUS_METRIC = "vim_status"
+PROMETHEUS_METRIC = "osm_vim_status"
PROMETHEUS_METRIC_DESCRIPTION = "VIM status"
SCHEDULE_INTERVAL = 1
PROMETHEUS_METRIC,
PROMETHEUS_METRIC_DESCRIPTION,
labelnames=[
- "vim_id",
+ "vim_account_id",
],
registry=registry,
)
--- /dev/null
+#######################################################################################
+# Copyright ETSI Contributors and Others.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#######################################################################################
+from datetime import datetime, timedelta
+import logging
+from math import ceil
+from typing import Dict, List
+
+from airflow import DAG
+from airflow.decorators import task
+from osm_mon.core.common_db import CommonDbClient
+from osm_mon.core.config import Config
+from osm_mon.vim_connectors.azure import AzureCollector
+from osm_mon.vim_connectors.openstack import OpenStackCollector
+from prometheus_client import CollectorRegistry, Gauge, push_to_gateway
+
+
+SCHEDULE_INTERVAL = 5
+COLLECTOR_MAX_METRICS_PER_TASK = 100
+SUPPORTED_VIM_TYPES = ["openstack", "vio", "azure"]
+PROMETHEUS_PUSHGW = "pushgateway-prometheus-pushgateway:9091"
+PROMETHEUS_JOB_PREFIX = "airflow_osm_vm_metrics_"
+PROMETHEUS_METRICS = {
+ "cpu_utilization": {
+ "metric_name": "osm_cpu_utilization",
+ "metric_descr": "CPU usage percentage",
+ },
+ "average_memory_utilization": {
+ "metric_name": "osm_average_memory_utilization",
+ "metric_descr": "Volume of RAM in MB used by the VM",
+ },
+ "disk_read_ops": {
+ "metric_name": "osm_disk_read_ops",
+ "metric_descr": "Number of read requests",
+ },
+ "disk_write_ops": {
+ "metric_name": "osm_disk_write_ops",
+ "metric_descr": "Number of write requests",
+ },
+ "disk_read_bytes": {
+ "metric_name": "osm_disk_read_bytes",
+ "metric_descr": "Volume of reads in bytes",
+ },
+ "disk_write_bytes": {
+ "metric_name": "osm_disk_write_bytes",
+ "metric_descr": "Volume of writes in bytes",
+ },
+ "packets_received": {
+ "metric_name": "osm_packets_received",
+ "metric_descr": "Number of incoming packets",
+ },
+ "packets_sent": {
+ "metric_name": "osm_packets_sent",
+ "metric_descr": "Number of outgoing packets",
+ },
+ "packets_in_dropped": {
+ "metric_name": "osm_packets_in_dropped",
+ "metric_descr": "Number of incoming dropped packets",
+ },
+ "packets_out_dropped": {
+ "metric_name": "osm_packets_out_dropped",
+ "metric_descr": "Number of outgoing dropped packets",
+ },
+}
+
+# Logging
+logger = logging.getLogger("airflow.task")
+
+
+def get_all_vim():
+ """Get VIMs from MongoDB"""
+ logger.info("Getting VIM list")
+
+ cfg = Config()
+ logger.info(cfg.conf)
+ common_db = CommonDbClient(cfg)
+ vim_accounts = common_db.get_vim_accounts()
+ vim_list = []
+ for vim in vim_accounts:
+ logger.info(f'Read VIM {vim["_id"]} ({vim["name"]})')
+ vim_list.append(
+ {"_id": vim["_id"], "name": vim["name"], "vim_type": vim["vim_type"]}
+ )
+
+ logger.info(vim_list)
+ logger.info("Getting VIM list OK")
+ return vim_list
+
+
+def create_dag(dag_id, dag_number, dag_description, vim_id):
+ dag = DAG(
+ dag_id,
+ catchup=False,
+ default_args={
+ "depends_on_past": False,
+ "retries": 1,
+ # "retry_delay": timedelta(minutes=1),
+ "retry_delay": timedelta(seconds=10),
+ },
+ description=dag_description,
+ is_paused_upon_creation=False,
+ schedule_interval=f"*/{SCHEDULE_INTERVAL} * * * *",
+ start_date=datetime(2022, 1, 1),
+ tags=["osm", "vdu"],
+ )
+
+ with dag:
+
+ @task(task_id="extract_metrics_from_vnfrs")
+ def extract_metrics_from_vnfrs(vim_id: str):
+ """Get metric list to collect from VIM based on VNFDs and VNFRs stored in MongoDB"""
+
+ # Get VNFDs that include "monitoring-parameter" from MongoDB
+ cfg = Config()
+ common_db = CommonDbClient(cfg)
+ logger.info("Getting VNFDs with monitoring parameters from MongoDB")
+ vnfd_list = common_db.get_monitoring_vnfds()
+ # Get VNFR list from MongoDB
+ logger.info("Getting VNFR list from MongoDB")
+ vnfr_list = common_db.get_vnfrs(vim_account_id=vim_id)
+ # Only read metrics if ns state is one of the nsAllowedStatesSet
+ nsAllowedStatesSet = {"INSTANTIATED"}
+ metric_list = []
+ for vnfr in vnfr_list:
+ if vnfr["_admin"]["nsState"] not in nsAllowedStatesSet:
+ continue
+ # Check if VNFR is in "monitoring-parameter" VNFDs list
+ vnfd_id = vnfr["vnfd-id"]
+ vnfd = next(
+ (item for item in vnfd_list if item["_id"] == vnfd_id), None
+ )
+ if not vnfd:
+ continue
+ ns_id = vnfr["nsr-id-ref"]
+ vnf_index = vnfr["member-vnf-index-ref"]
+ logger.info(
+ f"NS {ns_id}, vnf-index {vnf_index} has monitoring-parameter"
+ )
+ project_list = vnfr.get("_admin", {}).get("projects_read", [])
+ project_id = "None"
+ if project_list:
+ project_id = project_list[0]
+ for vdur in vnfr.get("vdur", []):
+ vim_info = vdur.get("vim_info")
+ if not vim_info:
+ logger.error("Error: vim_info not available in vdur")
+ continue
+ if len(vim_info) != 1:
+ logger.error("Error: more than one vim_info in vdur")
+ continue
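+                    # vim_info keys are expected to be of the form
+                    # "vim:<vim_account_id>"; strip the "vim:" prefix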
+ vim_id = next(iter(vim_info))[4:]
+ vm_id = vdur.get("vim-id")
+ if not vm_id:
+ logger.error("Error: vim-id not available in vdur")
+ continue
+ vdu_name = vdur.get("name", "UNKNOWN")
+ vdu = next(
+ filter(lambda vdu: vdu["id"] == vdur["vdu-id-ref"], vnfd["vdu"])
+ )
+ if "monitoring-parameter" not in vdu:
+ logger.error("Error: no monitoring-parameter in descriptor")
+ continue
+ for param in vdu["monitoring-parameter"]:
+ metric_name = param["performance-metric"]
+ metric_id = param["id"]
+ metric = {
+ "metric": metric_name,
+ "metric_id": metric_id,
+ "vm_id": vm_id,
+ "ns_id": ns_id,
+ "project_id": project_id,
+ "vdu_name": vdu_name,
+ "vnf_member_index": vnf_index,
+ "vdu_id": vdu["id"],
+ }
+ metric_list.append(metric)
+
+ logger.info(f"Metrics to collect: {len(metric_list)}")
+ return metric_list
+
+ @task(task_id="split_metrics")
+ def split_metrics(metric_list: List[Dict]):
+ """Split metric list based on COLLECTOR_MAX_METRICS_PER_TASK"""
+ n_metrics = len(metric_list)
+ if n_metrics <= COLLECTOR_MAX_METRICS_PER_TASK:
+ return [metric_list]
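+            # Compute the number of chunks needed and spread the metrics
+            # evenly so that no chunk exceeds COLLECTOR_MAX_METRICS_PER_TASK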
+ metrics_per_chunk = ceil(
+ n_metrics / ceil(n_metrics / COLLECTOR_MAX_METRICS_PER_TASK)
+ )
+ logger.info(
+ f"Splitting {n_metrics} metrics into chunks of {metrics_per_chunk} items"
+ )
+ chunks = []
+ for i in range(0, n_metrics, metrics_per_chunk):
+ chunks.append(metric_list[i : i + metrics_per_chunk])
+ return chunks
+
+ @task(task_id="collect_metrics")
+ def collect_metrics(vim_id: str, metric_list: List[Dict]):
+ """Collect servers metrics"""
+ if not metric_list:
+ return []
+
+ # Get VIM account info from MongoDB
+ logger.info(f"Reading VIM info, id: {vim_id}")
+ cfg = Config()
+ common_db = CommonDbClient(cfg)
+ vim_account = common_db.get_vim_account(vim_account_id=vim_id)
+ # Create VIM metrics collector
+ vim_type = vim_account["vim_type"]
+ if "config" in vim_account and "vim_type" in vim_account["config"]:
+ vim_type = vim_account["config"]["vim_type"].lower()
+ if vim_type == "vio" and "vrops_site" not in vim_account["config"]:
+ vim_type = "openstack"
+ if vim_type == "openstack":
+ collector = OpenStackCollector(vim_account)
+ elif vim_type == "azure":
+ collector = AzureCollector(vim_account)
+ else:
+ logger.error(f"VIM type '{vim_type}' not supported")
+ return None
+ # Get metrics
+ results = []
+ if collector:
+ results = collector.collect_metrics(metric_list)
+ logger.info(results)
+ return results
+
+ @task(task_id="send_prometheus")
+ def send_prometheus(metric_lists: List[List[Dict]]):
+ """Send servers metrics to Prometheus Push Gateway"""
+ logger.info(metric_lists)
+
+ # Define Prometheus metrics
+ registry = CollectorRegistry()
+ prom_metrics = {}
+ prom_metrics_keys = PROMETHEUS_METRICS.keys()
+ for key in prom_metrics_keys:
+ prom_metrics[key] = Gauge(
+ PROMETHEUS_METRICS[key]["metric_name"],
+ PROMETHEUS_METRICS[key]["metric_descr"],
+ labelnames=[
+ "metric_id",
+ "ns_id",
+ "project_id",
+ "vnf_member_index",
+ "vm_id",
+ "vim_id",
+ "vdu_name",
+ "vdu_id",
+ ],
+ registry=registry,
+ )
+
+ for metric_list in metric_lists:
+ for metric in metric_list:
+ metric_name = metric["metric"]
+ metric_id = metric["metric_id"]
+ value = metric["value"]
+ vm_id = metric["vm_id"]
+ vm_name = metric.get("vdu_name", "")
+ ns_id = metric["ns_id"]
+ project_id = metric["project_id"]
+ vnf_index = metric["vnf_member_index"]
+ vdu_id = metric["vdu_id"]
+ logger.info(
+ f" metric: {metric_name}, metric_id: {metric_id} value: {value}, vm_id: {vm_id}, vm_name: {vm_name}, ns_id: {ns_id}, vnf_index: {vnf_index}, project_id: {project_id}, vdu_id: {vdu_id} "
+ )
+ if metric_name in prom_metrics_keys:
+ prom_metrics[metric_name].labels(
+ metric_id,
+ ns_id,
+ project_id,
+ vnf_index,
+ vm_id,
+ vim_id,
+ vm_name,
+ vdu_id,
+ ).set(value)
+
+ # Push to Prometheus
+ push_to_gateway(
+ gateway=PROMETHEUS_PUSHGW,
+ job=f"{PROMETHEUS_JOB_PREFIX}{vim_id}",
+ registry=registry,
+ )
+ return
+
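+        # Task chaining: extract the metric list, split it into chunks, fan
+        # out one collect_metrics task per chunk (Airflow dynamic task
+        # mapping) and push the aggregated results to Prometheus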
+ chunks = split_metrics(extract_metrics_from_vnfrs(vim_id))
+ send_prometheus(
+ collect_metrics.partial(vim_id=vim_id).expand(metric_list=chunks)
+ )
+
+ return dag
+
+
+vim_list = get_all_vim()
+for index, vim in enumerate(vim_list):
+ vim_type = vim["vim_type"]
+ if vim_type in SUPPORTED_VIM_TYPES:
+ vim_id = vim["_id"]
+ vim_name = vim["name"]
+ dag_description = f"Dag for collecting VM metrics from VIM {vim_name}"
+ dag_id = f"vm_metrics_vim_{vim_id}"
+ logger.info(f"Creating DAG {dag_id}")
+ globals()[dag_id] = create_dag(
+ dag_id=dag_id,
+ dag_number=index,
+ dag_description=dag_description,
+ vim_id=vim_id,
+ )
+ else:
+ logger.info(f"VIM type '{vim_type}' not supported for collecting VM metrics")
--- /dev/null
+#######################################################################################
+# Copyright ETSI Contributors and Others.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#######################################################################################
+import asyncio
+from datetime import datetime, timedelta
+import logging
+import time
+import uuid
+
+from airflow.decorators import dag, task
+from airflow.operators.python import get_current_context
+from osm_mon.core.common_db import CommonDbClient
+from osm_mon.core.config import Config
+from osm_mon.core.message_bus_client import MessageBusClient
+
+# Logging
+logger = logging.getLogger("airflow.task")
+
+
+@dag(
+ catchup=False,
+ default_args={
+ "depends_on_past": False,
+ "retries": 1,
+ "retry_delay": timedelta(seconds=15),
+ },
+ description="Webhook callback for scale-in alarm from Prometheus AlertManager",
+ is_paused_upon_creation=False,
+ schedule_interval=None,
+ start_date=datetime(2022, 1, 1),
+ tags=["osm", "webhook"],
+)
+def scalein_vdu():
+ @task(task_id="main_task")
+ def main_task():
+ logger.debug("Running main task...")
+ # Read input parameters
+ context = get_current_context()
+ conf = context["dag_run"].conf
+ for alarm in conf["alerts"]:
+ logger.info("Scale-in alarm:")
+ status = alarm["status"]
+ logger.info(f" status: {status}")
+ logger.info(f' annotations: {alarm["annotations"]}')
+ logger.info(f' startsAt: {alarm["startsAt"]}')
+ logger.info(f' endsAt: {alarm["endsAt"]}')
+ logger.info(f' labels: {alarm["labels"]}')
+            alertname = alarm["labels"].get("alertname", "")
+ if not alertname.startswith("scalein_"):
+ continue
+ # scalein_vdu alert type
+ config = Config()
+ common_db = CommonDbClient(config)
+ ns_id = alarm["labels"]["ns_id"]
+ vdu_id = alarm["labels"]["vdu_id"]
+ vnf_member_index = alarm["labels"]["vnf_member_index"]
+ if status == "firing":
+ # Searching alerting rule in MongoDB
+ logger.info(
+ f"Searching scale-in alert rule in MongoDB: ns_id {ns_id}, "
+ f"vnf_member_index {vnf_member_index}, "
+ f"vdu_id {vdu_id}, "
+ )
+ alert = common_db.get_alert(
+ nsr_id=ns_id,
+ vnf_member_index=vnf_member_index,
+ vdu_id=vdu_id,
+ vdu_name=None,
+ action_type="scale_in",
+ )
+ if alert:
+ logger.info("Found an alert rule:")
+ logger.info(alert)
+ # Update alert status
+ common_db.update_alert_status(
+ uuid=alert["uuid"], alarm_status="alarm"
+ )
+ # Get VNFR from MongoDB
+ vnfr = common_db.get_vnfr(
+ nsr_id=ns_id, member_index=vnf_member_index
+ )
+ logger.info(
+ f"Found VNFR ns_id: {ns_id}, vnf_member_index: {vnf_member_index}"
+ )
+ # Check cooldown-time before scale-in
+ send_lcm = 1
+ if "cooldown-time" in alert["action"]:
+ cooldown_time = alert["action"]["cooldown-time"]
+ cooldown_time = cooldown_time * 60
+ now = time.time()
+ since = now - cooldown_time
+ logger.info(
+ f"Looking for scale operations in cooldown interval ({cooldown_time} s)"
+ )
+ nslcmops = common_db.get_nslcmop(
+ nsr_id=ns_id, operation_type="scale", since=since
+ )
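+                        # Skip launching a new scale-in if a scale operation
+                        # for the same VNF member index already ran within
+                        # the cooldown interval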
+ op = next(
+ (
+ sub
+ for sub in nslcmops
+ if ("scaleVnfData" in sub["operationParams"])
+ and (
+ "scaleByStepData"
+ in sub["operationParams"]["scaleVnfData"]
+ )
+ and (
+ "member-vnf-index"
+ in sub["operationParams"]["scaleVnfData"][
+ "scaleByStepData"
+ ]
+ )
+ and (
+ sub["operationParams"]["scaleVnfData"][
+ "scaleByStepData"
+ ]["member-vnf-index"]
+ == vnf_member_index
+ )
+ ),
+ None,
+ )
+ if op:
+ logger.info(
+ f"No scale-in will be launched, found a previous scale operation in cooldown interval: {op}"
+ )
+ send_lcm = 0
+
+ if send_lcm:
+ # Save nslcmop object in MongoDB
+ msg_bus = MessageBusClient(config)
+ loop = asyncio.get_event_loop()
+ _id = str(uuid.uuid4())
+ now = time.time()
+ projects_read = vnfr["_admin"]["projects_read"]
+ projects_write = vnfr["_admin"]["projects_write"]
+ scaling_group = alert["action"]["scaling-group"]
+ params = {
+ "scaleType": "SCALE_VNF",
+ "scaleVnfData": {
+ "scaleVnfType": "SCALE_IN",
+ "scaleByStepData": {
+ "scaling-group-descriptor": scaling_group,
+ "member-vnf-index": vnf_member_index,
+ },
+ },
+ "scaleTime": "{}Z".format(datetime.utcnow().isoformat()),
+ }
+ nslcmop = {
+ "id": _id,
+ "_id": _id,
+ "operationState": "PROCESSING",
+ "statusEnteredTime": now,
+ "nsInstanceId": ns_id,
+ "lcmOperationType": "scale",
+ "startTime": now,
+ "isAutomaticInvocation": True,
+ "operationParams": params,
+ "isCancelPending": False,
+ "links": {
+ "self": "/osm/nslcm/v1/ns_lcm_op_occs/" + _id,
+ "nsInstance": "/osm/nslcm/v1/ns_instances/" + ns_id,
+ },
+ "_admin": {
+ "projects_read": projects_read,
+ "projects_write": projects_write,
+ },
+ }
+ common_db.create_nslcmop(nslcmop)
+ # Send Kafka message to LCM
+ logger.info("Sending scale-in action message:")
+ logger.info(nslcmop)
+ loop.run_until_complete(
+ msg_bus.aiowrite("ns", "scale", nslcmop)
+ )
+ else:
+ logger.info("No alert rule was found")
+ elif status == "resolved":
+ # Searching alerting rule in MongoDB
+ logger.info(
+ f"Searching alert rule in MongoDB: ns_id {ns_id}, "
+ f"vnf_member_index {vnf_member_index}, "
+ )
+ alert = common_db.get_alert(
+ nsr_id=ns_id,
+ vnf_member_index=vnf_member_index,
+ vdu_id=vdu_id,
+ vdu_name=None,
+ action_type="scale_in",
+ )
+ if alert:
+ logger.info("Found an alert rule, updating status")
+ # Update alert status
+ common_db.update_alert_status(uuid=alert["uuid"], alarm_status="ok")
+
+ main_task()
+
+
+dag = scalein_vdu()
--- /dev/null
+#######################################################################################
+# Copyright ETSI Contributors and Others.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#######################################################################################
+import asyncio
+from datetime import datetime, timedelta
+import logging
+import time
+import uuid
+
+from airflow.decorators import dag, task
+from airflow.operators.python import get_current_context
+from osm_mon.core.common_db import CommonDbClient
+from osm_mon.core.config import Config
+from osm_mon.core.message_bus_client import MessageBusClient
+
+# Logging
+logger = logging.getLogger("airflow.task")
+
+
+@dag(
+ catchup=False,
+ default_args={
+ "depends_on_past": False,
+ "retries": 1,
+ "retry_delay": timedelta(seconds=15),
+ },
+ description="Webhook callback for scale-out alarm from Prometheus AlertManager",
+ is_paused_upon_creation=False,
+ schedule_interval=None,
+ start_date=datetime(2022, 1, 1),
+ tags=["osm", "webhook"],
+)
+def scaleout_vdu():
+ @task(task_id="main_task")
+ def main_task():
+ logger.debug("Running main task...")
+ # Read input parameters
+ context = get_current_context()
+ conf = context["dag_run"].conf
+ for alarm in conf["alerts"]:
+ logger.info("Scale-out alarm:")
+ status = alarm["status"]
+ logger.info(f" status: {status}")
+ logger.info(f' annotations: {alarm["annotations"]}')
+ logger.info(f' startsAt: {alarm["startsAt"]}')
+ logger.info(f' endsAt: {alarm["endsAt"]}')
+ logger.info(f' labels: {alarm["labels"]}')
+            alertname = alarm["labels"].get("alertname", "")
+ if not alertname.startswith("scaleout_"):
+ continue
+ # scaleout_vdu alert type
+ config = Config()
+ common_db = CommonDbClient(config)
+ ns_id = alarm["labels"]["ns_id"]
+ vdu_id = alarm["labels"]["vdu_id"]
+ vnf_member_index = alarm["labels"]["vnf_member_index"]
+ if status == "firing":
+ # Searching alerting rule in MongoDB
+ logger.info(
+ f"Searching scale-out alert rule in MongoDB: ns_id {ns_id}, "
+ f"vnf_member_index {vnf_member_index}, "
+ f"vdu_id {vdu_id}, "
+ )
+ alert = common_db.get_alert(
+ nsr_id=ns_id,
+ vnf_member_index=vnf_member_index,
+ vdu_id=vdu_id,
+ vdu_name=None,
+ action_type="scale_out",
+ )
+ if alert:
+ logger.info("Found an alert rule:")
+ logger.info(alert)
+ # Update alert status
+ common_db.update_alert_status(
+ uuid=alert["uuid"], alarm_status="alarm"
+ )
+ # Get VNFR from MongoDB
+ vnfr = common_db.get_vnfr(
+ nsr_id=ns_id, member_index=vnf_member_index
+ )
+ logger.info(
+ f"Found VNFR ns_id: {ns_id}, vnf_member_index: {vnf_member_index}"
+ )
+ # Check cooldown-time before scale-out
+ send_lcm = 1
+ if "cooldown-time" in alert["action"]:
+ cooldown_time = alert["action"]["cooldown-time"]
+ cooldown_time = cooldown_time * 60
+ now = time.time()
+ since = now - cooldown_time
+ logger.info(
+ f"Looking for scale operations in cooldown interval ({cooldown_time} s)"
+ )
+ nslcmops = common_db.get_nslcmop(
+ nsr_id=ns_id, operation_type="scale", since=since
+ )
+ op = next(
+ (
+ sub
+ for sub in nslcmops
+ if ("scaleVnfData" in sub["operationParams"])
+ and (
+ "scaleByStepData"
+ in sub["operationParams"]["scaleVnfData"]
+ )
+ and (
+ "member-vnf-index"
+ in sub["operationParams"]["scaleVnfData"][
+ "scaleByStepData"
+ ]
+ )
+ and (
+ sub["operationParams"]["scaleVnfData"][
+ "scaleByStepData"
+ ]["member-vnf-index"]
+ == vnf_member_index
+ )
+ ),
+ None,
+ )
+ if op:
+ logger.info(
+ f"No scale-out will be launched, found a previous scale operation in cooldown interval: {op}"
+ )
+ send_lcm = 0
+
+ if send_lcm:
+ # Save nslcmop object in MongoDB
+ msg_bus = MessageBusClient(config)
+ loop = asyncio.get_event_loop()
+                        _id = str(uuid.uuid4())
+                        now = time.time()
+ projects_read = vnfr["_admin"]["projects_read"]
+ projects_write = vnfr["_admin"]["projects_write"]
+ scaling_group = alert["action"]["scaling-group"]
+ params = {
+ "scaleType": "SCALE_VNF",
+ "scaleVnfData": {
+ "scaleVnfType": "SCALE_OUT",
+ "scaleByStepData": {
+ "scaling-group-descriptor": scaling_group,
+ "member-vnf-index": vnf_member_index,
+ },
+ },
+ "scaleTime": "{}Z".format(datetime.utcnow().isoformat()),
+ }
+ nslcmop = {
+ "id": _id,
+ "_id": _id,
+ "operationState": "PROCESSING",
+ "statusEnteredTime": now,
+ "nsInstanceId": ns_id,
+ "lcmOperationType": "scale",
+ "startTime": now,
+ "isAutomaticInvocation": True,
+ "operationParams": params,
+ "isCancelPending": False,
+ "links": {
+ "self": "/osm/nslcm/v1/ns_lcm_op_occs/" + _id,
+ "nsInstance": "/osm/nslcm/v1/ns_instances/" + ns_id,
+ },
+ "_admin": {
+ "projects_read": projects_read,
+ "projects_write": projects_write,
+ },
+ }
+ common_db.create_nslcmop(nslcmop)
+ # Send Kafka message to LCM
+ logger.info("Sending scale-out action message:")
+ logger.info(nslcmop)
+ loop.run_until_complete(
+ msg_bus.aiowrite("ns", "scale", nslcmop)
+ )
+ else:
+ logger.info("No alert rule was found")
+ elif status == "resolved":
+ # Searching alerting rule in MongoDB
+ logger.info(
+ f"Searching alert rule in MongoDB: ns_id {ns_id}, "
+ f"vnf_member_index {vnf_member_index}, "
+ )
+ alert = common_db.get_alert(
+ nsr_id=ns_id,
+ vnf_member_index=vnf_member_index,
+ vdu_id=vdu_id,
+ vdu_name=None,
+ action_type="scale_out",
+ )
+ if alert:
+ logger.info("Found an alert rule, updating status")
+ # Update alert status
+ common_db.update_alert_status(uuid=alert["uuid"], alarm_status="ok")
+
+ main_task()
+
+
+dag = scaleout_vdu()
nsr = self.common_db.get_one("nsrs", {"id": nsr_id})
return nsr
+ def get_vnfds(self):
+ return self.common_db.get_list("vnfds")
+
+ def get_monitoring_vnfds(self):
+ return self.common_db.get_list(
+ "vnfds", {"vdu.monitoring-parameter": {"$exists": "true"}}
+ )
+
def decrypt_vim_password(self, vim_password: str, schema_version: str, vim_id: str):
return self.common_db.decrypt(vim_password, schema_version, vim_id)
+ def decrypt_sdnc_password(
+ self, sdnc_password: str, schema_version: str, sdnc_id: str
+ ):
+ return self.common_db.decrypt(sdnc_password, schema_version, sdnc_id)
+
def get_vim_accounts(self):
return self.common_db.get_list("vim_accounts")
)
return vim_account
- def get_alert(self, nsr_id: str, vnf_member_index: str, vdu_name: str):
+ def get_sdnc_accounts(self):
+ return self.common_db.get_list("sdns")
+
+ def get_sdnc_account(self, sdnc_account_id: str) -> dict:
+ sdnc_account = self.common_db.get_one("sdns", {"_id": sdnc_account_id})
+ sdnc_account["password"] = self.decrypt_vim_password(
+ sdnc_account["password"], sdnc_account["schema_version"], sdnc_account_id
+ )
+ return sdnc_account
+
+ def get_alert(
+ self,
+ nsr_id: str,
+ vnf_member_index: str,
+ vdu_id: str,
+ vdu_name: str,
+ action_type: str,
+ ):
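+        # Build the MongoDB query from whichever identifiers are provided;
+        # unset arguments are simply left out of the filter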
+ q_filter = {"action_type": action_type}
+ if nsr_id:
+ q_filter["tags.ns_id"] = nsr_id
+ if vnf_member_index:
+ q_filter["tags.vnf_member_index"] = vnf_member_index
+ if vdu_id:
+ q_filter["tags.vdu_id"] = vdu_id
+ if vdu_name:
+ q_filter["tags.vdu_name"] = vdu_name
alert = self.common_db.get_one(
- "alerts",
- {
- "tags.ns_id": nsr_id,
- "tags.vnf_member_index": vnf_member_index,
- "tags.vdu_name": vdu_name,
- },
+ table="alerts", q_filter=q_filter, fail_on_empty=False
)
return alert
def create_nslcmop(self, nslcmop: dict):
self.common_db.create("nslcmops", nslcmop)
+
+ def get_nslcmop(self, nsr_id: str, operation_type: str, since: str):
+ q_filter = {}
+ if nsr_id:
+ q_filter["nsInstanceId"] = nsr_id
+ if operation_type:
+ q_filter["lcmOperationType"] = operation_type
+ if since:
+ q_filter["startTime"] = {"$gt": since}
+ ops = self.common_db.get_list(table="nslcmops", q_filter=q_filter)
+ return ops
# See the License for the specific language governing permissions and
# limitations under the License.
#######################################################################################
-import asyncio
from typing import Callable, List
from osm_common import msgkafka, msglocal
class MessageBusClient:
- def __init__(self, config: Config, loop=None):
+ def __init__(self, config: Config):
if config.get("message", "driver") == "local":
self.msg_bus = msglocal.MsgLocal()
elif config.get("message", "driver") == "kafka":
"Unknown message bug driver {}".format(config.get("section", "driver"))
)
self.msg_bus.connect(config.get("message"))
- if not loop:
- loop = asyncio.get_event_loop()
- self.loop = loop
async def aioread(self, topics: List[str], callback: Callable = None, **kwargs):
"""
:param kwargs: Keyword arguments to be passed to callback function.
:return: None
"""
- await self.msg_bus.aioread(topics, self.loop, aiocallback=callback, **kwargs)
+ await self.msg_bus.aioread(topics, aiocallback=callback, **kwargs)
async def aiowrite(self, topic: str, key: str, msg: dict):
"""
:param msg: Dictionary containing message to be written.
:return: None
"""
- await self.msg_bus.aiowrite(topic, key, msg, self.loop)
+ await self.msg_bus.aiowrite(topic, key, msg)
async def aioread_once(self, topic: str):
"""
:param topic: topic to retrieve message from.
:return: tuple(topic, key, message)
"""
- result = await self.msg_bus.aioread(topic, self.loop)
+ result = await self.msg_bus.aioread(topic)
return result
--- /dev/null
+#######################################################################################
+# Copyright ETSI Contributors and Others.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#######################################################################################
+from typing import Dict
+
+
+class SDNCConnector:
+ def __init__(self, sdnc_account: Dict):
+ pass
+
+ def is_sdnc_ok(self) -> bool:
+ pass
--- /dev/null
+# Copyright 2018 Whitestack, LLC
+# *************************************************************
+
+# This file is part of OSM Monitoring module
+# All Rights Reserved to Whitestack, LLC
+
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+
+# http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+# For those usages not covered by the Apache License, Version 2.0 please
+# contact: bdiaz@whitestack.com or glavado@whitestack.com
+##
+import logging
+from typing import Dict
+
+from osm_mon.sdnc_connectors.base_sdnc import SDNCConnector
+import requests
+from requests.auth import HTTPBasicAuth
+
+log = logging.getLogger(__name__)
+
+
+class OnosInfraCollector(SDNCConnector):
+ def __init__(self, sdnc_account: Dict):
+ self.sdnc_account = sdnc_account
+
+ def _obtain_url(self):
+ url = self.sdnc_account.get("url")
+ if url:
+ return url
+ else:
+ if not self.sdnc_account.get("ip") or not self.sdnc_account.get("port"):
+ raise Exception("You must provide a URL to contact the SDN Controller")
+ else:
+ return "http://{}:{}/onos/v1/devices".format(
+ self.sdnc_account["ip"], self.sdnc_account["port"]
+ )
+
+ def is_sdnc_ok(self) -> bool:
+ try:
+ url = self._obtain_url()
+ user = self.sdnc_account["user"]
+ password = self.sdnc_account["password"]
+
+ requests.get(url, auth=HTTPBasicAuth(user, password))
+ return True
+ except Exception:
+ log.exception("SDNC status is not OK!")
+ return False
# See the License for the specific language governing permissions and
# limitations under the License.
#######################################################################################
+import datetime
import logging
from typing import Dict, List
from azure.identity import ClientSecretCredential
from azure.mgmt.compute import ComputeManagementClient
+from azure.mgmt.monitor import MonitorManagementClient
from azure.profiles import ProfileDefinition
from osm_mon.vim_connectors.base_vim import VIMConnector
log = logging.getLogger(__name__)
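+# Map OSM metric names to Azure Monitor metric names and the aggregation
+# applied to the returned datapoints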
+METRIC_MAPPINGS = {
+ "cpu_utilization": {
+ "metricname": "Percentage CPU",
+ "aggregation": "Average",
+ },
+ "disk_read_ops": {
+ "metricname": "Disk Read Operations/Sec",
+ "aggregation": "Average",
+ },
+ "disk_write_ops": {
+ "metricname": "Disk Write Operations/Sec",
+ "aggregation": "Average",
+ },
+ "disk_read_bytes": {
+ "metricname": "Disk Read Bytes",
+ "aggregation": "Total",
+ },
+ "disk_write_bytes": {
+ "metricname": "Disk Write Bytes",
+ "aggregation": "Total",
+ },
+ # "average_memory_utilization": {},
+ # "packets_in_dropped": {},
+ # "packets_out_dropped": {},
+ # "packets_received": {},
+ # "packets_sent": {},
+}
+
+
class AzureCollector(VIMConnector):
# Translate azure provisioning state to OSM provision state.
# The first three ones are the transitional status once a user initiated
def __init__(self, vim_account: Dict):
self.vim_account = vim_account
self.reload_client = True
- logger = logging.getLogger("azure")
- logger.setLevel(logging.ERROR)
+
# Store config to create azure subscription later
self._config = {
"user": vim_account["vim_user"],
self._config["subscription_id"],
profile=self.AZURE_COMPUTE_MGMT_PROFILE,
)
+        # Create the Azure Monitor client used for metrics collection
+ self.conn_monitor = MonitorManagementClient(
+ self.credentials,
+ self._config["subscription_id"],
+ )
# Set to client created
self.reload_client = False
except Exception as e:
except Exception as e:
log.error(e)
return status
+
+ def collect_metrics(self, metric_list: List[Dict]) -> List[Dict]:
+ log.debug("collect_metrics")
+ self._reload_connection()
+
+ metric_results = []
+ log.info(metric_list)
+ for metric in metric_list:
+ server = metric["vm_id"]
+ metric_name = metric["metric"]
+ metric_mapping = METRIC_MAPPINGS.get(metric_name)
+ if not metric_mapping:
+ # log.info(f"Metric {metric_name} not available in Azure")
+ continue
+ azure_metric_name = metric_mapping["metricname"]
+ azure_aggregation = metric_mapping["aggregation"]
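+            # Query the last 5 minutes of data at 1-minute granularity and
+            # average the returned datapoints into a single value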
+ end = datetime.datetime.now()
+ init = end - datetime.timedelta(minutes=5)
+ try:
+ metrics_data = self.conn_monitor.metrics.list(
+ server,
+ timespan="{}/{}".format(init, end),
+ interval="PT1M",
+ metricnames=azure_metric_name,
+ aggregation=azure_aggregation,
+ )
+ except Exception as e:
+ log.error(e)
+ continue
+ total = 0
+ n_metrics = 0
+ for item in metrics_data.value:
+ log.info("{} ({})".format(item.name.localized_value, item.unit))
+ for timeserie in item.timeseries:
+ for data in timeserie.data:
+ if azure_aggregation == "Average":
+ val = data.average
+ elif azure_aggregation == "Total":
+ val = data.total
+ else:
+ val = None
+ log.info("{}: {}".format(data.time_stamp, val))
+ if val is not None:
+ total += val
+ n_metrics += 1
+ if n_metrics > 0:
+ value = total / n_metrics
+ log.info(f"value = {value}")
+ metric["value"] = value
+ metric_results.append(metric)
+ else:
+ log.info("No metric available")
+
+ return metric_results
# See the License for the specific language governing permissions and
# limitations under the License.
#######################################################################################
+from enum import Enum
import logging
+import time
from typing import Dict, List
+from ceilometerclient import client as ceilometer_client
+from ceilometerclient.exc import HTTPException
+import gnocchiclient.exceptions
+from gnocchiclient.v1 import client as gnocchi_client
from keystoneauth1 import session
+from keystoneauth1.exceptions.catalog import EndpointNotFound
from keystoneauth1.identity import v3
from novaclient import client as nova_client
from osm_mon.vim_connectors.base_vim import VIMConnector
+from prometheus_api_client import PrometheusConnect as prometheus_client
log = logging.getLogger(__name__)
+METRIC_MULTIPLIERS = {"cpu": 0.0000001}
+
+METRIC_AGGREGATORS = {"cpu": "rate:mean"}
+
+INTERFACE_METRICS = [
+ "packets_in_dropped",
+ "packets_out_dropped",
+ "packets_received",
+ "packets_sent",
+]
+
+INSTANCE_DISK = [
+ "disk_read_ops",
+ "disk_write_ops",
+ "disk_read_bytes",
+ "disk_write_bytes",
+]
+
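+# Map OSM metric names to Gnocchi/Ceilometer metric names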
+METRIC_MAPPINGS = {
+ "average_memory_utilization": "memory.usage",
+ "disk_read_ops": "disk.device.read.requests",
+ "disk_write_ops": "disk.device.write.requests",
+ "disk_read_bytes": "disk.device.read.bytes",
+ "disk_write_bytes": "disk.device.write.bytes",
+ "packets_in_dropped": "network.outgoing.packets.drop",
+ "packets_out_dropped": "network.incoming.packets.drop",
+ "packets_received": "network.incoming.packets",
+ "packets_sent": "network.outgoing.packets",
+ "cpu_utilization": "cpu",
+}
+
+METRIC_MAPPINGS_FOR_PROMETHEUS_TSBD = {
+ "cpu_utilization": "cpu",
+ "average_memory_utilization": "memory_usage",
+ "disk_read_ops": "disk_device_read_requests",
+ "disk_write_ops": "disk_device_write_requests",
+ "disk_read_bytes": "disk_device_read_bytes",
+ "disk_write_bytes": "disk_device_write_bytes",
+ "packets_in_dropped": "network_incoming_packets_drop",
+ "packets_out_dropped": "network_outgoing_packets_drop",
+ "packets_received": "network_incoming_packets",
+ "packets_sent": "network_outgoing_packets",
+}
+
+
+class MetricType(Enum):
+ INSTANCE = "instance"
+ INTERFACE_ALL = "interface_all"
+ INTERFACE_ONE = "interface_one"
+ INSTANCEDISK = "instancedisk"
+
class CertificateNotCreated(Exception):
pass
class OpenStackCollector(VIMConnector):
def __init__(self, vim_account: Dict):
- log.info("__init__")
+ log.debug("__init__")
self.vim_account = vim_account
self.vim_session = None
self.vim_session = self._get_session(vim_account)
self.nova = self._build_nova_client()
+ # self.gnocchi = self._build_gnocchi_client()
+ self.backend = self._get_backend(vim_account, self.vim_session)
def _get_session(self, creds: Dict):
+ log.debug("_get_session")
verify_ssl = True
project_domain_name = "Default"
user_domain_name = "Default"
except CertificateNotCreated as e:
log.error(e)
+ def _get_backend(self, vim_account: dict, vim_session: object):
+ if vim_account.get("prometheus-config"):
+ # try:
+ # tsbd = PrometheusTSBDBackend(vim_account)
+ # log.debug("Using prometheustsbd backend to collect metric")
+ # return tsbd
+ # except Exception as e:
+ # log.error(f"Can't create prometheus client, {e}")
+ # return None
+ return None
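+        # Prefer Gnocchi; fall back to Ceilometer if the Gnocchi endpoint
+        # is not available in the VIM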
+ try:
+ gnocchi = GnocchiBackend(vim_account, vim_session)
+ gnocchi.client.metric.list(limit=1)
+ log.debug("Using gnocchi backend to collect metric")
+ return gnocchi
+ except (HTTPException, EndpointNotFound):
+ ceilometer = CeilometerBackend(vim_account, vim_session)
+ ceilometer.client.capabilities.get()
+ log.debug("Using ceilometer backend to collect metric")
+ return ceilometer
+
def _build_nova_client(self) -> nova_client.Client:
return nova_client.Client("2", session=self.vim_session, timeout=10)
+ def _build_gnocchi_client(self) -> gnocchi_client.Client:
+ return gnocchi_client.Client(session=self.vim_session)
+
def collect_servers_status(self) -> List[Dict]:
- log.info("collect_servers_status")
+ log.debug("collect_servers_status")
servers = []
for server in self.nova.servers.list(detailed=True):
vm = {
return servers
def is_vim_ok(self) -> bool:
+ log.debug("is_vim_ok")
try:
self.nova.servers.list()
return True
except Exception as e:
log.warning("VIM status is not OK: %s" % e)
return False
+
+ def _get_metric_type(self, metric_name: str) -> MetricType:
+ if metric_name in INTERFACE_METRICS:
+ return MetricType.INTERFACE_ALL
+ if metric_name in INSTANCE_DISK:
+ return MetricType.INSTANCEDISK
+ return MetricType.INSTANCE
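+ # For example: "packets_sent" -> INTERFACE_ALL, "disk_read_ops" -> INSTANCEDISK,
+ # and anything else (e.g. "cpu_utilization") -> INSTANCE.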
+
+ def collect_metrics(self, metric_list: List[Dict]) -> List[Dict]:
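+ # Each item in metric_list is expected to carry at least "vm_id" (the server
+ # UUID in the VIM) and "metric" (an OSM metric name, i.e. a METRIC_MAPPINGS
+ # key); on success the same dict is returned with a "value" field added.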
+ log.debug("collect_metrics")
+ if not self.backend:
+ log.error("Undefined backend")
+ return []
+
+ if isinstance(self.backend, PrometheusTSBDBackend):
+ log.info("Using Prometheus as backend (NOT SUPPORTED)")
+ return []
+
+ metric_results = []
+ for metric in metric_list:
+ server = metric["vm_id"]
+ metric_name = metric["metric"]
+ openstack_metric_name = METRIC_MAPPINGS[metric_name]
+ metric_type = self._get_metric_type(metric_name)
+ log.info(f"Collecting metric {openstack_metric_name} for {server}")
+ try:
+ value = self.backend.collect_metric(
+ metric_type, openstack_metric_name, server
+ )
+ if value is not None:
+ log.info(f"value: {value}")
+ metric["value"] = value
+ metric_results.append(metric)
+ else:
+ log.info("metric value is empty")
+ except Exception as e:
+ log.error("Error in metric collection: %s" % e)
+ return metric_results
+
+
+class OpenstackBackend:
+ def collect_metric(
+ self, metric_type: MetricType, metric_name: str, resource_id: str
+ ):
+ pass
+
+
+class PrometheusTSBDBackend(OpenstackBackend):
+ def __init__(self, vim_account: dict):
+ self.map = self._build_map(vim_account)
+ self.cred = vim_account["prometheus-config"].get("prometheus-cred")
+ self.client = self._build_prometheus_client(
+ vim_account["prometheus-config"]["prometheus-url"]
+ )
+
+ def _build_prometheus_client(self, url: str) -> prometheus_client:
+ return prometheus_client(url, disable_ssl=True)
+
+ def _build_map(self, vim_account: dict) -> dict:
+ # Copy the default map so per-VIM overrides do not mutate the module-level dict
+ custom_map = dict(METRIC_MAPPINGS_FOR_PROMETHEUS_TSBD)
+ if "prometheus-map" in vim_account["prometheus-config"]:
+ custom_map.update(vim_account["prometheus-config"]["prometheus-map"])
+ return custom_map
+
+ def collect_metric(
+ self, metric_type: MetricType, metric_name: str, resource_id: str
+ ):
+ metric = self.query_metric(metric_name, resource_id)
+ return metric["value"][1] if metric else None
+
+ def map_metric(self, metric_name: str):
+ return self.map[metric_name]
+
+ def query_metric(self, metric_name, resource_id=None):
+ metrics = self.client.get_current_metric_value(metric_name=metric_name)
+ if resource_id:
+ # Default to None so an unmatched resource_id does not raise StopIteration
+ metric = next(
+ filter(lambda x: resource_id in x["metric"]["resource_id"], metrics),
+ None,
+ )
+ return metric
+ return metrics
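+ # get_current_metric_value() returns instant-query results in the usual
+ # Prometheus shape, i.e. dicts carrying a "metric" label set and a
+ # "value" = [timestamp, value] pair, which is why collect_metric() above
+ # reads metric["value"][1].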
+
+
+class GnocchiBackend(OpenstackBackend):
+ def __init__(self, vim_account: dict, vim_session: object):
+ self.client = self._build_gnocchi_client(vim_account, vim_session)
+
+ def _build_gnocchi_client(
+ self, vim_account: dict, vim_session: object
+ ) -> gnocchi_client.Client:
+ return gnocchi_client.Client(session=vim_session)
+
+ def collect_metric(
+ self, metric_type: MetricType, metric_name: str, resource_id: str
+ ):
+ if metric_type == MetricType.INTERFACE_ALL:
+ return self._collect_interface_all_metric(metric_name, resource_id)
+
+ elif metric_type == MetricType.INSTANCE:
+ return self._collect_instance_metric(metric_name, resource_id)
+
+ elif metric_type == MetricType.INSTANCEDISK:
+ return self._collect_instance_disk_metric(metric_name, resource_id)
+
+ else:
+ raise Exception("Unknown metric type %s" % metric_type.value)
+
+ def _collect_interface_all_metric(self, openstack_metric_name, resource_id):
+ total_measure = None
+ interfaces = self.client.resource.search(
+ resource_type="instance_network_interface",
+ query={"=": {"instance_id": resource_id}},
+ )
+ for interface in interfaces:
+ try:
+ measures = self.client.metric.get_measures(
+ openstack_metric_name, resource_id=interface["id"], limit=1
+ )
+ if measures:
+ if not total_measure:
+ total_measure = 0.0
+ total_measure += measures[-1][2]
+ except (gnocchiclient.exceptions.NotFound, TypeError) as e:
+ # Gnocchi in some OpenStack versions raises TypeError instead of NotFound
+ log.debug(
+ "No metric %s found for interface %s: %s",
+ openstack_metric_name,
+ interface["id"],
+ e,
+ )
+ return total_measure
+
+ def _collect_instance_disk_metric(self, openstack_metric_name, resource_id):
+ value = None
+ instances = self.client.resource.search(
+ resource_type="instance_disk",
+ query={"=": {"instance_id": resource_id}},
+ )
+ for instance in instances:
+ try:
+ measures = self.client.metric.get_measures(
+ openstack_metric_name, resource_id=instance["id"], limit=1
+ )
+ if measures:
+ value = measures[-1][2]
+
+ except gnocchiclient.exceptions.NotFound as e:
+ log.debug(
+ "No metric %s found for instance disk %s: %s",
+ openstack_metric_name,
+ instance["id"],
+ e,
+ )
+ return value
+
+ def _collect_instance_metric(self, openstack_metric_name, resource_id):
+ value = None
+ try:
+ aggregation = METRIC_AGGREGATORS.get(openstack_metric_name)
+
+ try:
+ measures = self.client.metric.get_measures(
+ openstack_metric_name,
+ aggregation=aggregation,
+ start=time.time() - 1200,
+ resource_id=resource_id,
+ )
+ if measures:
+ value = measures[-1][2]
+ except (
+ gnocchiclient.exceptions.NotFound,
+ gnocchiclient.exceptions.BadRequest,
+ TypeError,
+ ) as e:
+ # The CPU metric in older OpenStack versions does not support the rate:mean aggregation method
+ # Gnocchi in some OpenStack versions raises TypeError instead of NotFound or BadRequest
+ if openstack_metric_name == "cpu":
+ log.debug(
+ "No metric %s found for instance %s: %s",
+ openstack_metric_name,
+ resource_id,
+ e,
+ )
+ log.info(
+ "Retrying to get metric %s for instance %s without aggregation",
+ openstack_metric_name,
+ resource_id,
+ )
+ measures = self.client.metric.get_measures(
+ openstack_metric_name, resource_id=resource_id, limit=1
+ )
+ else:
+ raise e
+ # measures[-1] is the last measure
+ # measures[-2] is the previous measure
+ # measures[x][2] is the value of the metric
+ if measures and len(measures) >= 2:
+ value = measures[-1][2] - measures[-2][2]
+ if value:
+ # measures[-1][0] is the time of the reporting interval
+ # measures[-1][1] is the duration of the reporting interval
+ if aggregation:
+ # If this is an aggregate, divide the total by the reported time period.
+ # This branch runs whenever an aggregation is listed in METRIC_AGGREGATORS,
+ # even if the OpenStack deployment does not support that aggregation method.
+ value = value / measures[-1][1]
+ if openstack_metric_name in METRIC_MULTIPLIERS:
+ value = value * METRIC_MULTIPLIERS[openstack_metric_name]
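+ # Worked example of the arithmetic above: if the last two measures differ by
+ # 6e9 and the reporting interval measures[-1][1] is 60 seconds, then
+ # value = 6e9 / 60 = 1e8, and the "cpu" multiplier gives 1e8 * 0.0000001 = 10
+ # (roughly a 10% figure, assuming the usual convention that "cpu" is
+ # cumulative CPU time in nanoseconds).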
+ except gnocchiclient.exceptions.NotFound as e:
+ log.debug(
+ "No metric %s found for instance %s: %s",
+ openstack_metric_name,
+ resource_id,
+ e,
+ )
+ return value
+
+
+class CeilometerBackend(OpenstackBackend):
+ def __init__(self, vim_account: dict, vim_session: object):
+ self.client = self._build_ceilometer_client(vim_account, vim_session)
+
+ def _build_ceilometer_client(
+ self, vim_account: dict, vim_session: object
+ ) -> ceilometer_client.Client:
+ return ceilometer_client.Client("2", session=vim_session)
+
+ def collect_metric(
+ self, metric_type: MetricType, metric_name: str, resource_id: str
+ ):
+ if metric_type != MetricType.INSTANCE:
+ raise NotImplementedError(
+ "Ceilometer backend only support instance metrics"
+ )
+ measures = self.client.samples.list(
+ meter_name=metric_name,
+ limit=1,
+ q=[{"field": "resource_id", "op": "eq", "value": resource_id}],
+ )
+ return measures[0].counter_volume if measures else None
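+
+# Illustrative usage (hedged sketch only; the vim_account contents and the
+# UUID below are hypothetical):
+#
+#   collector = OpenStackCollector(vim_account)
+#   if collector.is_vim_ok():
+#       results = collector.collect_metrics(
+#           [{"vm_id": "0b7d6b2a-0000-0000-0000-000000000000", "metric": "cpu_utilization"}]
+#       )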
#######################################################################################
[tox]
-envlist = black, flake8
+envlist = black, flake8, pylint, pylint-webhook
[tox:jenkins]
toxworkdir = /tmp/.tox
[testenv]
usedevelop = True
-basepython = python3
+basepython = python3.10
setenv = VIRTUAL_ENV={envdir}
PYTHONDONTWRITEBYTECODE = 1
deps = -r{toxinidir}/requirements.txt
pylint
skip_install = true
commands =
- pylint -E src
- pylint -E osm_webhook_translator
+ pylint -E src setup.py --disable=E0401
+
+
+[testenv:pylint-webhook]
+changedir = {toxinidir}/osm_webhook_translator
+deps = -r{toxinidir}/osm_webhook_translator/requirements.txt
+ pylint
+skip_install = true
+commands =
+ pylint -E src setup.py
#######################################################################################
#######################################################################################
[testenv:pip-compile]
-deps = pip-tools==6.6.2
+deps = pip-tools==6.13.0
skip_install = true
allowlist_externals =
bash
bash -c "for file in requirements*.in ; do \
UNSAFE="" ; \
if [[ $file =~ 'dist' ]] ; then UNSAFE='--allow-unsafe' ; fi ; \
- pip-compile -rU --no-header $UNSAFE $file ;\
+ pip-compile -rU --resolver=backtracking --no-header $UNSAFE $file ;\
+ out=`echo $file | sed 's/.in/.txt/'` ; \
+ sed -i -e '1 e head -16 tox.ini' $out ;\
+ done"
+ bash -c "for file in osm_webhook_translator/requirements*.in ; do \
+ UNSAFE="" ; \
+ if [[ $file =~ 'dist' ]] ; then UNSAFE='--allow-unsafe' ; fi ; \
+ pip-compile --resolver=backtracking -rU --no-header $UNSAFE $file ;\
out=`echo $file | sed 's/.in/.txt/'` ; \
sed -i -e '1 e head -16 tox.ini' $out ;\
done"