From 5ac7c081ca13495185ecf6bdf302c16c25a4b759 Mon Sep 17 00:00:00 2001 From: Benjamin Diaz Date: Wed, 6 Feb 2019 11:58:00 -0300 Subject: [PATCH] Implements filebased config, config override through env vars, use of osm common msg bus drivers Change-Id: I2ae26408f03a7faf86d5621efda50df948c55951 Signed-off-by: Benjamin Diaz --- MANIFEST.in | 2 +- Makefile | 21 ++++ README.rst | 4 +- debian/python3-osm-mon.postinst | 22 +++++ docker/Dockerfile | 19 ++-- osm_mon/cmd/mon_collector.py | 21 ++-- osm_mon/cmd/mon_evaluator.py | 21 ++-- osm_mon/cmd/mon_healthcheck.py | 17 ++-- osm_mon/cmd/mon_server.py | 25 ++--- osm_mon/collector/collector.py | 26 ++--- .../collector/infra_collectors/base_vim.py | 4 +- .../collector/infra_collectors/openstack.py | 7 +- osm_mon/collector/vnf_collectors/base.py | 4 + osm_mon/collector/vnf_collectors/base_vim.py | 5 +- osm_mon/collector/vnf_collectors/juju.py | 11 ++- osm_mon/collector/vnf_collectors/openstack.py | 14 +-- osm_mon/collector/vnf_collectors/vmware.py | 22 ++--- osm_mon/core/auth.py | 6 +- osm_mon/core/common_db.py | 18 ++-- osm_mon/core/config.py | 73 ++++++++++++++ osm_mon/core/database.py | 8 +- osm_mon/core/message_bus/__init__.py | 21 ---- osm_mon/core/message_bus/consumer.py | 15 --- osm_mon/core/message_bus_client.py | 72 ++++++++++++++ .../{message_bus/producer.py => mon.yaml} | 42 ++++++-- osm_mon/core/settings.py | 95 ------------------- osm_mon/core/singleton.py | 40 -------- osm_mon/evaluator/evaluator.py | 32 +++---- osm_mon/server/__init__.py | 22 +++++ osm_mon/server/server.py | 72 ++++---------- osm_mon/tests/collector/test_collector.py | 14 +-- osm_mon/tests/common/__init__.py | 0 .../{common => core}/test_common_db_client.py | 6 +- osm_mon/tests/core/test_database.py | 5 +- osm_mon/tests/core/test_message_bus_client.py | 73 ++++++++++++++ requirements.txt | 7 +- 36 files changed, 497 insertions(+), 369 deletions(-) create mode 100644 osm_mon/core/config.py delete mode 100755 osm_mon/core/message_bus/__init__.py delete mode 100644 osm_mon/core/message_bus/consumer.py create mode 100644 osm_mon/core/message_bus_client.py rename osm_mon/core/{message_bus/producer.py => mon.yaml} (64%) delete mode 100644 osm_mon/core/settings.py delete mode 100644 osm_mon/core/singleton.py delete mode 100644 osm_mon/tests/common/__init__.py rename osm_mon/tests/{common => core}/test_common_db_client.py (96%) create mode 100644 osm_mon/tests/core/test_message_bus_client.py diff --git a/MANIFEST.in b/MANIFEST.in index e63d839..6e7058b 100644 --- a/MANIFEST.in +++ b/MANIFEST.in @@ -21,6 +21,6 @@ include requirements.txt include README.rst -recursive-include osm_mon *.py *.xml *.sh +recursive-include osm_mon *.py *.xml *.sh *.yaml recursive-include devops-stages * recursive-include test *.py diff --git a/Makefile b/Makefile index 99de6f4..902909a 100644 --- a/Makefile +++ b/Makefile @@ -1,3 +1,24 @@ +# Copyright 2018 Whitestack, LLC +# ************************************************************* + +# This file is part of OSM Monitoring module +# All Rights Reserved to Whitestack, LLC + +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# For those usages not covered by the Apache License, Version 2.0 please +# contact: bdiaz@whitestack.com or glavado@whitestack.com +## + all: clean package clean: diff --git a/README.rst b/README.rst index ef34a3d..ffec126 100644 --- a/README.rst +++ b/README.rst @@ -39,8 +39,8 @@ MON module has the following components: Supported Collector Plugins *************************** - - OpenStack: Requires Aodh and Gnocchi to be enabled. - - VROPS: TBD + - OpenStack: Requires Gnocchi to be enabled. + - VROPS - AWS: TBD Developers diff --git a/debian/python3-osm-mon.postinst b/debian/python3-osm-mon.postinst index 06e6781..8ab760d 100644 --- a/debian/python3-osm-mon.postinst +++ b/debian/python3-osm-mon.postinst @@ -1,5 +1,27 @@ #!/bin/bash +# Copyright 2018 Whitestack, LLC +# ************************************************************* + +# This file is part of OSM Monitoring module +# All Rights Reserved to Whitestack, LLC + +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +# For those usages not covered by the Apache License, Version 2.0 please +# contact: bdiaz@whitestack.com or glavado@whitestack.com +## + echo "Installing python dependencies via pip..." pip3 install aiokafka==0.4.* pip3 install requests==2.18.* diff --git a/docker/Dockerfile b/docker/Dockerfile index 8079f23..dca983c 100644 --- a/docker/Dockerfile +++ b/docker/Dockerfile @@ -36,14 +36,17 @@ COPY . /mon RUN pip3 install /mon -ENV BROKER_URI kafka:9092 -ENV MONGO_URI mongodb://mongo:27017 -ENV DATABASE sqlite:///mon_sqlite.db -ENV OS_NOTIFIER_URI localhost:8662 -ENV OS_DEFAULT_GRANULARITY 300 -ENV OSMMON_REQUEST_TIMEOUT 10 -ENV OSMMON_LOG_LEVEL INFO -ENV OSMMON_KAFKA_LOG_LEVEL INFO +ENV OSMMON_MESSAGE_DRIVER kafka +ENV OSMMON_MESSAGE_HOST kafka +ENV OSMMON_MESSAGE_PORT 9092 + +ENV OSMMON_DATABASE_DRIVER mongo +ENV OSMMON_DATABASE_URI mongodb://mongo:27017 + +ENV OSMMON_SQL_DATABASE_URI sqlite:///mon_sqlite.db +ENV OSMMON_OPENSTACK_DEFAULT_GRANULARITY 300 +ENV OSMMON_GLOBAL_REQUEST_TIMEOUT 10 +ENV OSMMON_GLOBAL_LOGLEVEL INFO ENV OSMMON_VCA_HOST localhost ENV OSMMON_VCA_SECRET secret ENV OSMMON_VCA_USER admin diff --git a/osm_mon/cmd/mon_collector.py b/osm_mon/cmd/mon_collector.py index c4e2969..3e493de 100644 --- a/osm_mon/cmd/mon_collector.py +++ b/osm_mon/cmd/mon_collector.py @@ -21,36 +21,37 @@ # For those usages not covered by the Apache License, Version 2.0 please # contact: bdiaz@whitestack.com or glavado@whitestack.com ## +import argparse import logging import sys from osm_mon.collector.collector import Collector +from osm_mon.core.config import Config from osm_mon.core.database import DatabaseManager -from osm_mon.core.settings import Config def main(): - cfg = Config.instance() + parser = argparse.ArgumentParser(prog='osm-policy-agent') + parser.add_argument('--config-file', nargs='?', help='POL configuration file') + args = parser.parse_args() + cfg = Config(args.config_file) root = logging.getLogger() - root.setLevel(logging.getLevelName(cfg.OSMMON_LOG_LEVEL)) + root.setLevel(logging.getLevelName(cfg.get('global', 'loglevel'))) ch = logging.StreamHandler(sys.stdout) - ch.setLevel(logging.getLevelName(cfg.OSMMON_LOG_LEVEL)) + ch.setLevel(logging.getLevelName(cfg.get('global', 'loglevel'))) formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s', '%m/%d/%Y %I:%M:%S %p') ch.setFormatter(formatter) root.addHandler(ch) - kafka_logger = logging.getLogger('kafka') - kafka_logger.setLevel(logging.getLevelName(cfg.OSMMON_KAFKA_LOG_LEVEL)) - log = logging.getLogger(__name__) log.info("Starting MON Collector...") - log.debug("Config: %s", vars(cfg)) + log.debug("Config: %s", cfg.conf) log.info("Initializing database...") - db_manager = DatabaseManager() + db_manager = DatabaseManager(cfg) db_manager.create_tables() log.info("Database initialized correctly.") - collector = Collector() + collector = Collector(cfg) collector.collect_forever() diff --git a/osm_mon/cmd/mon_evaluator.py b/osm_mon/cmd/mon_evaluator.py index 79adabd..3835d7e 100644 --- a/osm_mon/cmd/mon_evaluator.py +++ b/osm_mon/cmd/mon_evaluator.py @@ -21,36 +21,37 @@ # For those usages not covered by the Apache License, Version 2.0 please # contact: bdiaz@whitestack.com or glavado@whitestack.com ## +import argparse import logging import sys +from osm_mon.core.config import Config from osm_mon.core.database import DatabaseManager -from osm_mon.core.settings import Config from osm_mon.evaluator.evaluator import Evaluator def main(): - cfg = Config.instance() + parser = argparse.ArgumentParser(prog='osm-policy-agent') + parser.add_argument('--config-file', nargs='?', help='POL configuration file') + args = parser.parse_args() + cfg = Config(args.config_file) root = logging.getLogger() - root.setLevel(logging.getLevelName(cfg.OSMMON_LOG_LEVEL)) + root.setLevel(logging.getLevelName(cfg.get('global', 'loglevel'))) ch = logging.StreamHandler(sys.stdout) - ch.setLevel(logging.getLevelName(cfg.OSMMON_LOG_LEVEL)) + ch.setLevel(logging.getLevelName(cfg.get('global', 'loglevel'))) formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s', '%m/%d/%Y %I:%M:%S %p') ch.setFormatter(formatter) root.addHandler(ch) - kafka_logger = logging.getLogger('kafka') - kafka_logger.setLevel(logging.getLevelName(cfg.OSMMON_KAFKA_LOG_LEVEL)) - log = logging.getLogger(__name__) log.info("Starting MON Evaluator...") - log.debug("Config: %s", vars(cfg)) + log.debug("Config: %s", cfg.conf) log.info("Initializing database...") - db_manager = DatabaseManager() + db_manager = DatabaseManager(cfg) db_manager.create_tables() log.info("Database initialized correctly.") - evaluator = Evaluator() + evaluator = Evaluator(cfg) evaluator.evaluate_forever() diff --git a/osm_mon/cmd/mon_healthcheck.py b/osm_mon/cmd/mon_healthcheck.py index 1fa2c2b..cc6fd8f 100644 --- a/osm_mon/cmd/mon_healthcheck.py +++ b/osm_mon/cmd/mon_healthcheck.py @@ -19,6 +19,7 @@ # For those usages not covered by the Apache License, Version 2.0 please # contact: bdiaz@whitestack.com or glavado@whitestack.com ## +import argparse import asyncio import logging import subprocess @@ -27,16 +28,20 @@ import sys import requests from aiokafka import AIOKafkaConsumer -from osm_mon.core.settings import Config +from osm_mon.core.config import Config log = logging.getLogger(__name__) def main(): - # Check Kafka + parser = argparse.ArgumentParser(prog='osm-mon-healthcheck') + parser.add_argument('--config-file', nargs='?', help='MON configuration file') + args = parser.parse_args() + cfg = Config(args.config_file) + if not _processes_running(): sys.exit(1) - if not _is_kafka_ok(): + if not _is_kafka_ok(cfg.get('message', 'host'), cfg.get('message', 'port')): sys.exit(1) if not _is_prometheus_exporter_ok(): sys.exit(1) @@ -49,6 +54,7 @@ def _processes_running(): if process_name in row: return True return False + processes_to_check = ['osm-mon-collector', 'osm-mon-evaluator', 'osm-mon-server'] ps = subprocess.Popen(['ps', 'aux'], stdout=subprocess.PIPE).communicate()[0] processes_running = ps.decode().split('\n') @@ -68,12 +74,11 @@ def _is_prometheus_exporter_ok(): return False -def _is_kafka_ok(): +def _is_kafka_ok(host, port): async def _test_kafka(loop): - cfg = Config.instance() consumer = AIOKafkaConsumer( 'healthcheck', - loop=loop, bootstrap_servers=cfg.BROKER_URI) + loop=loop, bootstrap_servers='{}:{}'.format(host, port)) await consumer.start() await consumer.stop() diff --git a/osm_mon/cmd/mon_server.py b/osm_mon/cmd/mon_server.py index 34fe0b1..b23aa20 100644 --- a/osm_mon/cmd/mon_server.py +++ b/osm_mon/cmd/mon_server.py @@ -21,36 +21,39 @@ # For those usages not covered by the Apache License, Version 2.0 please # contact: bdiaz@whitestack.com or glavado@whitestack.com ## +import argparse +import asyncio import logging import sys +from osm_mon.core.config import Config from osm_mon.core.database import DatabaseManager -from osm_mon.core.settings import Config from osm_mon.server.server import Server def main(): - cfg = Config.instance() + parser = argparse.ArgumentParser(prog='osm-policy-agent') + parser.add_argument('--config-file', nargs='?', help='POL configuration file') + args = parser.parse_args() + cfg = Config(args.config_file) root = logging.getLogger() - root.setLevel(logging.getLevelName(cfg.OSMMON_LOG_LEVEL)) + root.setLevel(logging.getLevelName(cfg.get('global', 'loglevel'))) ch = logging.StreamHandler(sys.stdout) - ch.setLevel(logging.getLevelName(cfg.OSMMON_LOG_LEVEL)) + ch.setLevel(logging.getLevelName(cfg.get('global', 'loglevel'))) formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s', '%m/%d/%Y %I:%M:%S %p') ch.setFormatter(formatter) root.addHandler(ch) - kafka_logger = logging.getLogger('kafka') - kafka_logger.setLevel(logging.getLevelName(cfg.OSMMON_KAFKA_LOG_LEVEL)) - log = logging.getLogger(__name__) - log.info("Starting MON Server...") - log.debug("Config: %s", vars(cfg)) + log.debug("Config: %s", cfg.conf) log.info("Initializing database...") - db_manager = DatabaseManager() + db_manager = DatabaseManager(cfg) db_manager.create_tables() + log.info("Starting MON Server...") log.info("Database initialized correctly.") - server = Server() + loop = asyncio.get_event_loop() + server = Server(cfg, loop) server.run() diff --git a/osm_mon/collector/collector.py b/osm_mon/collector/collector.py index 36ce1b0..aad395a 100644 --- a/osm_mon/collector/collector.py +++ b/osm_mon/collector/collector.py @@ -27,14 +27,14 @@ import time import peewee from osm_mon.collector.backends.prometheus import PrometheusBackend -from osm_mon.collector.vnf_collectors.vmware import VMwareCollector from osm_mon.collector.infra_collectors.openstack import OpenstackInfraCollector from osm_mon.collector.metric import Metric from osm_mon.collector.vnf_collectors.juju import VCACollector from osm_mon.collector.vnf_collectors.openstack import OpenstackCollector +from osm_mon.collector.vnf_collectors.vmware import VMwareCollector from osm_mon.core.common_db import CommonDbClient +from osm_mon.core.config import Config from osm_mon.core.database import DatabaseManager -from osm_mon.core.settings import Config log = logging.getLogger(__name__) @@ -51,21 +51,21 @@ METRIC_BACKENDS = [ class Collector: - def __init__(self): - self.common_db = CommonDbClient() + def __init__(self, config: Config): + self.conf = config + self.common_db = CommonDbClient(self.conf) self.plugins = [] - self.database_manager = DatabaseManager() + self.database_manager = DatabaseManager(self.conf) self.database_manager.create_tables() self.queue = multiprocessing.Queue() self._init_backends() def collect_forever(self): log.debug('collect_forever') - cfg = Config.instance() while True: try: self.collect_metrics() - time.sleep(cfg.OSMMON_COLLECTOR_INTERVAL) + time.sleep(int(self.conf.get('collector', 'interval'))) except peewee.PeeweeException: log.exception("Database error consuming message: ") raise @@ -74,10 +74,10 @@ class Collector: def _collect_vim_metrics(self, vnfr: dict, vim_account_id: str): # TODO(diazb) Add support for vrops and aws - database_manager = DatabaseManager() + database_manager = DatabaseManager(self.conf) vim_type = database_manager.get_vim_type(vim_account_id) if vim_type in VIM_COLLECTORS: - collector = VIM_COLLECTORS[vim_type](vim_account_id) + collector = VIM_COLLECTORS[vim_type](self.conf, vim_account_id) metrics = collector.collect(vnfr) for metric in metrics: self.queue.put(metric) @@ -85,10 +85,10 @@ class Collector: log.debug("vimtype %s is not supported.", vim_type) def _collect_vim_infra_metrics(self, vim_account_id: str): - database_manager = DatabaseManager() + database_manager = DatabaseManager(self.conf) vim_type = database_manager.get_vim_type(vim_account_id) if vim_type in VIM_INFRA_COLLECTORS: - collector = VIM_INFRA_COLLECTORS[vim_type](vim_account_id) + collector = VIM_INFRA_COLLECTORS[vim_type](self.conf, vim_account_id) status = collector.is_vim_ok() status_metric = Metric({'vim_id': vim_account_id}, 'vim_status', status) self.queue.put(status_metric) @@ -98,7 +98,7 @@ class Collector: def _collect_vca_metrics(self, vnfr: dict): log.debug('_collect_vca_metrics') log.debug('vnfr: %s', vnfr) - vca_collector = VCACollector() + vca_collector = VCACollector(self.conf) metrics = vca_collector.collect(vnfr) for metric in metrics: self.queue.put(metric) @@ -125,7 +125,7 @@ class Collector: processes.append(p) p.start() for process in processes: - process.join() + process.join(timeout=10) metrics = [] while not self.queue.empty(): metrics.append(self.queue.get()) diff --git a/osm_mon/collector/infra_collectors/base_vim.py b/osm_mon/collector/infra_collectors/base_vim.py index 0a075a1..2f5954a 100644 --- a/osm_mon/collector/infra_collectors/base_vim.py +++ b/osm_mon/collector/infra_collectors/base_vim.py @@ -19,11 +19,13 @@ # For those usages not covered by the Apache License, Version 2.0 please # contact: bdiaz@whitestack.com or glavado@whitestack.com ## +from osm_mon.core.config import Config + from osm_mon.collector.infra_collectors.base import BaseInfraCollector class BaseVimInfraCollector(BaseInfraCollector): - def __init__(self, vim_account_id: str): + def __init__(self, config: Config, vim_account_id: str): pass def is_vim_ok(self) -> bool: diff --git a/osm_mon/collector/infra_collectors/openstack.py b/osm_mon/collector/infra_collectors/openstack.py index 4237e4f..5f62edf 100644 --- a/osm_mon/collector/infra_collectors/openstack.py +++ b/osm_mon/collector/infra_collectors/openstack.py @@ -27,14 +27,15 @@ from keystoneclient.v3 import client from osm_mon.collector.infra_collectors.base_vim import BaseVimInfraCollector from osm_mon.core.auth import AuthManager +from osm_mon.core.config import Config log = logging.getLogger(__name__) class OpenstackInfraCollector(BaseVimInfraCollector): - def __init__(self, vim_account_id: str): - super().__init__(vim_account_id) - self.auth_manager = AuthManager() + def __init__(self, config: Config, vim_account_id: str): + super().__init__(config, vim_account_id) + self.auth_manager = AuthManager(config) self.keystone_client = self._build_keystone_client(vim_account_id) def is_vim_ok(self) -> bool: diff --git a/osm_mon/collector/vnf_collectors/base.py b/osm_mon/collector/vnf_collectors/base.py index 824e106..0ec12b5 100644 --- a/osm_mon/collector/vnf_collectors/base.py +++ b/osm_mon/collector/vnf_collectors/base.py @@ -22,8 +22,12 @@ from typing import List from osm_mon.collector.metric import Metric +from osm_mon.core.config import Config class BaseCollector: + def __init__(self, config: Config): + pass + def collect(self, vnfr: dict) -> List[Metric]: pass diff --git a/osm_mon/collector/vnf_collectors/base_vim.py b/osm_mon/collector/vnf_collectors/base_vim.py index 29a348f..6c270f4 100644 --- a/osm_mon/collector/vnf_collectors/base_vim.py +++ b/osm_mon/collector/vnf_collectors/base_vim.py @@ -21,8 +21,9 @@ ## from osm_mon.collector.vnf_collectors.base import BaseCollector +from osm_mon.core.config import Config class BaseVimCollector(BaseCollector): - def __init__(self, vim_account_id: str): - pass + def __init__(self, config: Config, vim_account_id: str): + super().__init__(config) diff --git a/osm_mon/collector/vnf_collectors/juju.py b/osm_mon/collector/vnf_collectors/juju.py index 928b35a..6c5e314 100644 --- a/osm_mon/collector/vnf_collectors/juju.py +++ b/osm_mon/collector/vnf_collectors/juju.py @@ -29,18 +29,19 @@ from osm_mon.collector.metric import Metric from osm_mon.collector.vnf_collectors.base import BaseCollector from osm_mon.collector.vnf_metric import VnfMetric from osm_mon.core.common_db import CommonDbClient +from osm_mon.core.config import Config from osm_mon.core.exceptions import VcaDeploymentInfoNotFound -from osm_mon.core.settings import Config log = logging.getLogger(__name__) class VCACollector(BaseCollector): - def __init__(self): - cfg = Config.instance() - self.common_db = CommonDbClient() + def __init__(self, config: Config): + super().__init__(config) + self.common_db = CommonDbClient(config) self.loop = asyncio.get_event_loop() - self.n2vc = N2VC(server=cfg.OSMMON_VCA_HOST, user=cfg.OSMMON_VCA_USER, secret=cfg.OSMMON_VCA_SECRET) + self.n2vc = N2VC(server=config.get('vca', 'host'), user=config.get('vca', 'user'), + secret=config.get('vca', 'secret')) def collect(self, vnfr: dict) -> List[Metric]: nsr_id = vnfr['nsr-id-ref'] diff --git a/osm_mon/collector/vnf_collectors/openstack.py b/osm_mon/collector/vnf_collectors/openstack.py index 8dbab5c..ba7097e 100644 --- a/osm_mon/collector/vnf_collectors/openstack.py +++ b/osm_mon/collector/vnf_collectors/openstack.py @@ -34,7 +34,7 @@ from osm_mon.collector.vnf_collectors.base_vim import BaseVimCollector from osm_mon.collector.vnf_metric import VnfMetric from osm_mon.core.auth import AuthManager from osm_mon.core.common_db import CommonDbClient -from osm_mon.core.settings import Config +from osm_mon.core.config import Config log = logging.getLogger(__name__) @@ -52,10 +52,11 @@ METRIC_MAPPINGS = { class OpenstackCollector(BaseVimCollector): - def __init__(self, vim_account_id: str): - super().__init__(vim_account_id) - self.common_db = CommonDbClient() - self.auth_manager = AuthManager() + def __init__(self, config: Config, vim_account_id: str): + super().__init__(config, vim_account_id) + self.conf = config + self.common_db = CommonDbClient(config) + self.auth_manager = AuthManager(config) self.granularity = self._get_granularity(vim_account_id) self.gnocchi_client = self._build_gnocchi_client(vim_account_id) @@ -81,8 +82,7 @@ class OpenstackCollector(BaseVimCollector): if 'granularity' in vim_config: return int(vim_config['granularity']) else: - cfg = Config.instance() - return cfg.OS_DEFAULT_GRANULARITY + return int(self.conf.get('openstack', 'default_granularity')) def collect(self, vnfr: dict) -> List[Metric]: nsr_id = vnfr['nsr-id-ref'] diff --git a/osm_mon/collector/vnf_collectors/vmware.py b/osm_mon/collector/vnf_collectors/vmware.py index 7402afb..ba499c7 100644 --- a/osm_mon/collector/vnf_collectors/vmware.py +++ b/osm_mon/collector/vnf_collectors/vmware.py @@ -36,7 +36,7 @@ from osm_mon.collector.vnf_collectors.base_vim import BaseVimCollector from osm_mon.collector.vnf_metric import VnfMetric from osm_mon.core.auth import AuthManager from osm_mon.core.common_db import CommonDbClient -from osm_mon.core.settings import Config +from osm_mon.core.config import Config log = logging.getLogger(__name__) @@ -67,11 +67,10 @@ requests.packages.urllib3.disable_warnings() class VMwareCollector(BaseVimCollector): - def __init__(self, vim_account_id: str): - super().__init__(vim_account_id) - self.common_db = CommonDbClient() - self.auth_manager = AuthManager() - self.granularity = self._get_granularity(vim_account_id) + def __init__(self, config: Config, vim_account_id: str): + super().__init__(config, vim_account_id) + self.common_db = CommonDbClient(config) + self.auth_manager = AuthManager(config) vim_account = self.get_vim_account(vim_account_id) self.vrops_site = vim_account['vrops_site'] self.vrops_user = vim_account['vrops_user'] @@ -92,6 +91,7 @@ class VMwareCollector(BaseVimCollector): log.info("Logging into vCD org as admin.") + admin_user = None try: host = self.vcloud_site admin_user = self.admin_username @@ -151,21 +151,13 @@ class VMwareCollector(BaseVimCollector): return vim_account - def _get_granularity(self, vim_account_id: str): - creds = self.auth_manager.get_credentials(vim_account_id) - vim_config = json.loads(creds.config) - if 'granularity' in vim_config: - return int(vim_config['granularity']) - else: - cfg = Config.instance() - return cfg.OS_DEFAULT_GRANULARITY - def get_vm_moref_id(self, vapp_uuid): """ Method to get the moref_id of given VM arg - vapp_uuid return - VM mored_id """ + vm_moref_id = None try: if vapp_uuid: vm_details = self.get_vapp_details_rest(vapp_uuid) diff --git a/osm_mon/core/auth.py b/osm_mon/core/auth.py index 71a817e..4627a30 100644 --- a/osm_mon/core/auth.py +++ b/osm_mon/core/auth.py @@ -25,14 +25,16 @@ import json import logging +from osm_mon.core.config import Config + from osm_mon.core.database import VimCredentials, DatabaseManager log = logging.getLogger(__name__) class AuthManager: - def __init__(self): - self.database_manager = DatabaseManager() + def __init__(self, config: Config): + self.database_manager = DatabaseManager(config) def store_auth_credentials(self, creds_dict): log.info(creds_dict) diff --git a/osm_mon/core/common_db.py b/osm_mon/core/common_db.py index 9cc9c06..5922290 100644 --- a/osm_mon/core/common_db.py +++ b/osm_mon/core/common_db.py @@ -21,18 +21,20 @@ # For those usages not covered by the Apache License, Version 2.0 please # contact: bdiaz@whitestack.com or glavado@whitestack.com ## -from osm_common import dbmongo +from osm_common import dbmongo, dbmemory -from osm_mon.core.settings import Config +from osm_mon.core.config import Config class CommonDbClient: - def __init__(self): - cfg = Config.instance() - self.common_db = dbmongo.DbMongo() - self.common_db.db_connect({'uri': cfg.MONGO_URI, - 'name': 'osm', - 'commonkey': cfg.OSMMON_DATABASE_COMMONKEY}) + def __init__(self, config: Config): + if config.get('database', 'driver') == "mongo": + self.common_db = dbmongo.DbMongo() + elif config.get('database', 'driver') == "memory": + self.common_db = dbmemory.DbMemory() + else: + raise Exception("Unknown database driver {}".format(config.get('section', 'driver'))) + self.common_db.db_connect(config.get("database")) def get_vnfr(self, nsr_id: str, member_index: int): vnfr = self.common_db.get_one("vnfrs", diff --git a/osm_mon/core/config.py b/osm_mon/core/config.py new file mode 100644 index 0000000..c4c3972 --- /dev/null +++ b/osm_mon/core/config.py @@ -0,0 +1,73 @@ +# -*- coding: utf-8 -*- + +# Copyright 2018 Whitestack, LLC +# ************************************************************* + +# This file is part of OSM Monitoring module +# All Rights Reserved to Whitestack, LLC + +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +# For those usages not covered by the Apache License, Version 2.0 please +# contact: bdiaz@whitestack.com or glavado@whitestack.com +## +"""Global configuration managed by environment variables.""" + +import logging +import os + +import pkg_resources +import yaml + +logger = logging.getLogger(__name__) + + +class Config: + def __init__(self, config_file: str = ''): + self.conf = {} + self._read_config_file(config_file) + self._read_env() + + def _read_config_file(self, config_file): + if not config_file: + path = 'mon.yaml' + config_file = pkg_resources.resource_filename(__name__, path) + with open(config_file) as f: + self.conf = yaml.load(f) + + def get(self, section, field=None): + if not field: + return self.conf[section] + return self.conf[section][field] + + def set(self, section, field, value): + if section not in self.conf: + self.conf[section] = {} + self.conf[section][field] = value + + def _read_env(self): + for env in os.environ: + if not env.startswith("OSMMON_"): + continue + elements = env.lower().split("_") + if len(elements) < 3: + logger.warning( + "Environment variable %s=%s does not comply with required format. Section and/or field missing.", + env, os.getenv(env)) + continue + section = elements[1] + field = '_'.join(elements[2:]) + value = os.getenv(env) + if section not in self.conf: + self.conf[section] = {} + self.conf[section][field] = value diff --git a/osm_mon/core/database.py b/osm_mon/core/database.py index 4cbd75f..eca08e9 100644 --- a/osm_mon/core/database.py +++ b/osm_mon/core/database.py @@ -28,7 +28,7 @@ import uuid from peewee import CharField, TextField, FloatField, Model, AutoField, Proxy from playhouse.db_url import connect -from osm_mon.core.settings import Config +from osm_mon.core.config import Config log = logging.getLogger(__name__) @@ -67,10 +67,8 @@ class Alarm(BaseModel): class DatabaseManager: - def __init__(self): - cfg = Config.instance() - cfg.read_environ() - db.initialize(connect(cfg.DATABASE)) + def __init__(self, config: Config): + db.initialize(connect(config.get('sql', 'database_uri'))) def create_tables(self) -> None: with db.atomic(): diff --git a/osm_mon/core/message_bus/__init__.py b/osm_mon/core/message_bus/__init__.py deleted file mode 100755 index 32eb94e..0000000 --- a/osm_mon/core/message_bus/__init__.py +++ /dev/null @@ -1,21 +0,0 @@ -# Copyright 2017 Intel Research and Development Ireland Limited -# ************************************************************* - -# This file is part of OSM Monitoring module -# All Rights Reserved to Intel Corporation - -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at - -# http://www.apache.org/licenses/LICENSE-2.0 - -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -# For those usages not covered by the Apache License, Version 2.0 please -# contact: helena.mcgough@intel.com or adrian.hoban@intel.com -## diff --git a/osm_mon/core/message_bus/consumer.py b/osm_mon/core/message_bus/consumer.py deleted file mode 100644 index c0a9dd0..0000000 --- a/osm_mon/core/message_bus/consumer.py +++ /dev/null @@ -1,15 +0,0 @@ -from kafka import KafkaConsumer - -from osm_mon.core.settings import Config - - -# noinspection PyAbstractClass -class Consumer(KafkaConsumer): - def __init__(self, group_id, **kwargs): - cfg = Config.instance() - super().__init__(bootstrap_servers=cfg.BROKER_URI, - key_deserializer=bytes.decode, - value_deserializer=bytes.decode, - max_poll_interval_ms=180000, - group_id=group_id, - **kwargs) diff --git a/osm_mon/core/message_bus_client.py b/osm_mon/core/message_bus_client.py new file mode 100644 index 0000000..6a7ef60 --- /dev/null +++ b/osm_mon/core/message_bus_client.py @@ -0,0 +1,72 @@ +# -*- coding: utf-8 -*- + +# Copyright 2018 Whitestack, LLC +# ************************************************************* + +# This file is part of OSM Monitoring module +# All Rights Reserved to Whitestack, LLC + +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +# For those usages not covered by the Apache License, Version 2.0 please +# contact: bdiaz@whitestack.com or glavado@whitestack.com +## +import asyncio +from typing import List, Callable + +from osm_common import msgkafka, msglocal + +from osm_mon.core.config import Config + + +class MessageBusClient: + def __init__(self, config: Config, loop=None): + if config.get('message', 'driver') == "local": + self.msg_bus = msglocal.MsgLocal() + elif config.get('message', 'driver') == "kafka": + self.msg_bus = msgkafka.MsgKafka() + else: + raise Exception("Unknown message bug driver {}".format(config.get('section', 'driver'))) + self.msg_bus.connect(config.get('message')) + if not loop: + loop = asyncio.get_event_loop() + self.loop = loop + + async def aioread(self, topics: List[str], callback: Callable = None, **kwargs): + """ + Retrieves messages continuously from bus and executes callback for each message consumed. + :param topics: List of message bus topics to consume from. + :param callback: Async callback function to be called for each message received. + :param kwargs: Keyword arguments to be passed to callback function. + :return: None + """ + await self.msg_bus.aioread(topics, self.loop, aiocallback=callback, **kwargs) + + async def aiowrite(self, topic: str, key: str, msg: dict): + """ + Writes message to bus. + :param topic: Topic to write to. + :param key: Key to write to. + :param msg: Dictionary containing message to be written. + :return: None + """ + await self.msg_bus.aiowrite(topic, key, msg, self.loop) + + async def aioread_once(self, topic: str): + """ + Retrieves last message from bus. + :param topic: topic to retrieve message from. + :return: tuple(topic, key, message) + """ + result = await self.msg_bus.aioread(topic, self.loop) + return result diff --git a/osm_mon/core/message_bus/producer.py b/osm_mon/core/mon.yaml similarity index 64% rename from osm_mon/core/message_bus/producer.py rename to osm_mon/core/mon.yaml index 573e332..b1607ec 100644 --- a/osm_mon/core/message_bus/producer.py +++ b/osm_mon/core/mon.yaml @@ -19,17 +19,39 @@ # For those usages not covered by the Apache License, Version 2.0 please # contact: bdiaz@whitestack.com or glavado@whitestack.com ## -from kafka import KafkaProducer -from osm_mon.core.settings import Config +global: + loglevel: INFO + request_timeout: 10 +database: + driver: mongo + uri: mongodb://mongo:27017 + name: osm + commonkey: changeme -class Producer(KafkaProducer): - def __init__(self): - cfg = Config.instance() - super().__init__(bootstrap_servers=cfg.BROKER_URI, - key_serializer=str.encode, - value_serializer=str.encode) +message: + driver: kafka + host: kafka + port: 9092 + group_id: mon-consumer - def send(self, topic, value=None, key=None, partition=None, timestamp_ms=None): - return super().send(topic, value, key, partition, timestamp_ms) +sql: + database_uri: sqlite:///mon_sqlite.db + +collector: + interval: 30 + +evaluator: + interval: 30 + +prometheus: + url: http://prometheus:9090 + +vca: + host: localhost + secret: secret + user: admin + +openstack: + default_granularity: 300 \ No newline at end of file diff --git a/osm_mon/core/settings.py b/osm_mon/core/settings.py deleted file mode 100644 index a5d352b..0000000 --- a/osm_mon/core/settings.py +++ /dev/null @@ -1,95 +0,0 @@ -# Copyright 2017 Intel Research and Development Ireland Limited -# ************************************************************* - -# This file is part of OSM Monitoring module -# All Rights Reserved to Intel Corporation - -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at - -# http://www.apache.org/licenses/LICENSE-2.0 - -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -# For those usages not covered by the Apache License, Version 2.0 please -# contact: helena.mcgough@intel.com or adrian.hoban@intel.com -## -"""Global configuration managed by environment variables.""" - -import logging -import os - -from collections import namedtuple - -from osm_mon.core.singleton import Singleton - -import six - -__author__ = "Helena McGough" - -log = logging.getLogger(__name__) - - -class BadConfigError(Exception): - """Configuration exception.""" - - pass - - -class CfgParam(namedtuple('CfgParam', ['key', 'default', 'data_type'])): - """Configuration parameter definition.""" - - def value(self, data): - """Convert a string to the parameter type.""" - try: - return self.data_type(data) - except (ValueError, TypeError): - raise BadConfigError( - 'Invalid value "%s" for configuration parameter "%s"' % ( - data, self.key)) - - -@Singleton -class Config(object): - """Configuration object.""" - - _configuration = [ - CfgParam('BROKER_URI', "localhost:9092", six.text_type), - CfgParam('MONGO_URI', "mongodb://mongo:27017", six.text_type), - CfgParam('DATABASE', "sqlite:///mon_sqlite.db", six.text_type), - CfgParam('OS_DEFAULT_GRANULARITY', 300, int), - CfgParam('OSMMON_REQUEST_TIMEOUT', 10, int), - CfgParam('OSMMON_LOG_LEVEL', "INFO", six.text_type), - CfgParam('OSMMON_KAFKA_LOG_LEVEL', "WARN", six.text_type), - CfgParam('OSMMON_COLLECTOR_INTERVAL', 30, int), - CfgParam('OSMMON_EVALUATOR_INTERVAL', 30, int), - CfgParam('OSMMON_VCA_HOST', "localhost", six.text_type), - CfgParam('OSMMON_VCA_SECRET', "secret", six.text_type), - CfgParam('OSMMON_VCA_USER', "admin", six.text_type), - CfgParam('OSMMON_DATABASE_COMMONKEY', "changeme", six.text_type), - CfgParam('OSMMON_PROMETHEUS_URL', "http://prometheus:9090", six.text_type), - ] - - _config_dict = {cfg.key: cfg for cfg in _configuration} - _config_keys = _config_dict.keys() - - def __init__(self): - """Set the default values.""" - for cfg in self._configuration: - setattr(self, cfg.key, cfg.default) - self.read_environ() - - def read_environ(self): - """Check the appropriate environment variables and update defaults.""" - for key in self._config_keys: - try: - val = self._config_dict[key].data_type(os.environ[key]) - setattr(self, key, val) - except KeyError as exc: - log.debug("Environment variable not present: %s", exc) - return diff --git a/osm_mon/core/singleton.py b/osm_mon/core/singleton.py deleted file mode 100644 index 59c5ee5..0000000 --- a/osm_mon/core/singleton.py +++ /dev/null @@ -1,40 +0,0 @@ -# Copyright 2017 Intel Research and Development Ireland Limited -# ************************************************************* - -# This file is part of OSM Monitoring module -# All Rights Reserved to Intel Corporation - -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at - -# http://www.apache.org/licenses/LICENSE-2.0 - -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -# For those usages not covered by the Apache License, Version 2.0 please -# contact: helena.mcgough@intel.com or adrian.hoban@intel.com -## -"""Simple singleton class.""" - -from __future__ import unicode_literals - - -class Singleton(object): - """Simple singleton class.""" - - def __init__(self, decorated): - """Initialize singleton instance.""" - self._decorated = decorated - - def instance(self): - """Return singleton instance.""" - try: - return self._instance - except AttributeError: - self._instance = self._decorated() - return self._instance diff --git a/osm_mon/evaluator/evaluator.py b/osm_mon/evaluator/evaluator.py index 9dc8c48..76881b9 100644 --- a/osm_mon/evaluator/evaluator.py +++ b/osm_mon/evaluator/evaluator.py @@ -20,7 +20,7 @@ # For those usages not covered by the Apache License, Version 2.0 please # contact: bdiaz@whitestack.com or glavado@whitestack.com ## -import json +import asyncio import logging import multiprocessing import time @@ -31,21 +31,26 @@ from osm_common.dbbase import DbException from osm_mon.collector.backends.prometheus import OSM_METRIC_PREFIX from osm_mon.core.common_db import CommonDbClient +from osm_mon.core.config import Config from osm_mon.core.database import DatabaseManager, Alarm -from osm_mon.core.message_bus.producer import Producer +from osm_mon.core.message_bus_client import MessageBusClient from osm_mon.core.response import ResponseBuilder -from osm_mon.core.settings import Config log = logging.getLogger(__name__) class Evaluator: - def __init__(self): - self.common_db = CommonDbClient() + def __init__(self, config: Config, loop=None): + self.conf = config + if not loop: + loop = asyncio.get_event_loop() + self.loop = loop + self.common_db = CommonDbClient(self.conf) self.plugins = [] - self.database_manager = DatabaseManager() + self.database_manager = DatabaseManager(self.conf) self.database_manager.create_tables() self.queue = multiprocessing.Queue() + self.msg_bus = MessageBusClient(config) def _evaluate_metric(self, nsr_id: str, @@ -55,12 +60,11 @@ class Evaluator: alarm: Alarm): log.debug("_evaluate_metric") # TODO: Refactor to fit backend plugin model - cfg = Config.instance() query_section = "query={0}{{ns_id=\"{1}\",vdu_name=\"{2}\",vnf_member_index=\"{3}\"}}".format( OSM_METRIC_PREFIX + metric_name, nsr_id, vdur_name, vnf_member_index) - request_url = cfg.OSMMON_PROMETHEUS_URL + "/api/v1/query?" + query_section + request_url = self.conf.get('prometheus', 'url') + "/api/v1/query?" + query_section log.info("Querying Prometheus: %s", request_url) - r = requests.get(request_url, timeout=cfg.OSMMON_REQUEST_TIMEOUT) + r = requests.get(request_url, timeout=int(self.conf.get('global', 'request_timeout'))) if r.status_code == 200: json_response = r.json() if json_response['status'] == 'success': @@ -83,11 +87,10 @@ class Evaluator: def evaluate_forever(self): log.debug('evaluate_forever') - cfg = Config.instance() while True: try: self.evaluate() - time.sleep(cfg.OSMMON_EVALUATOR_INTERVAL) + time.sleep(int(self.conf.get('evaluator', 'interval'))) except peewee.PeeweeException: log.exception("Database error evaluating alarms: ") raise @@ -151,12 +154,11 @@ class Evaluator: p.start() for process in processes: - process.join() + process.join(timeout=10) triggered_alarms = [] while not self.queue.empty(): triggered_alarms.append(self.queue.get()) for alarm in triggered_alarms: - self.notify_alarm(alarm) p = multiprocessing.Process(target=self.notify_alarm, args=(alarm,)) p.start() @@ -178,7 +180,5 @@ class Evaluator: sev=alarm.severity, status='alarm', date=now) - producer = Producer() - producer.send(topic='alarm_response', key='notify_alarm', value=json.dumps(resp_message)) - producer.flush() + self.loop.run_until_complete(self.msg_bus.aiowrite('alarm_response', 'notify_alarm', resp_message)) log.info("Sent alarm notification: %s", resp_message) diff --git a/osm_mon/server/__init__.py b/osm_mon/server/__init__.py index e69de29..4450364 100644 --- a/osm_mon/server/__init__.py +++ b/osm_mon/server/__init__.py @@ -0,0 +1,22 @@ +# -*- coding: utf-8 -*- + +# Copyright 2018 Whitestack, LLC +# ************************************************************* + +# This file is part of OSM Monitoring module +# All Rights Reserved to Whitestack, LLC + +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# For those usages not covered by the Apache License, Version 2.0 please +# contact: bdiaz@whitestack.com or glavado@whitestack.com +## diff --git a/osm_mon/server/server.py b/osm_mon/server/server.py index 0f17d99..0011618 100755 --- a/osm_mon/server/server.py +++ b/osm_mon/server/server.py @@ -24,63 +24,45 @@ import asyncio import json import logging -from json import JSONDecodeError - -import yaml -from aiokafka import AIOKafkaConsumer, AIOKafkaProducer from osm_mon.core.auth import AuthManager from osm_mon.core.common_db import CommonDbClient +from osm_mon.core.config import Config from osm_mon.core.database import DatabaseManager +from osm_mon.core.message_bus_client import MessageBusClient from osm_mon.core.response import ResponseBuilder -from osm_mon.core.settings import Config log = logging.getLogger(__name__) class Server: - def __init__(self, loop=None): - cfg = Config.instance() + def __init__(self, config: Config, loop=None): + self.conf = config if not loop: loop = asyncio.get_event_loop() self.loop = loop - self.auth_manager = AuthManager() - self.database_manager = DatabaseManager() + self.auth_manager = AuthManager(config) + self.database_manager = DatabaseManager(config) self.database_manager.create_tables() - self.common_db = CommonDbClient() - self.kafka_server = cfg.BROKER_URI + self.common_db = CommonDbClient(config) + self.msg_bus = MessageBusClient(config) def run(self): self.loop.run_until_complete(self.start()) async def start(self): - consumer = AIOKafkaConsumer( + topics = [ "vim_account", - "alarm_request", - loop=self.loop, - bootstrap_servers=self.kafka_server, - group_id="mon-server", - key_deserializer=bytes.decode, - value_deserializer=bytes.decode, - ) - await consumer.start() - try: - async for message in consumer: - log.info("Message arrived: %s", message) - await self.consume_message(message) - finally: - await consumer.stop() + "alarm_request" + ] + await self.msg_bus.aioread(topics, self._process_msg) - async def consume_message(self, message): + async def _process_msg(self, topic, key, values): + log.info("Message arrived: %s", values) try: - try: - values = json.loads(message.value) - except JSONDecodeError: - values = yaml.safe_load(message.value) - - if message.topic == "vim_account": - if message.key == "create" or message.key == "edit": + if topic == "vim_account": + if key == "create" or key == "edit": values['vim_password'] = self.common_db.decrypt_vim_password(values['vim_password'], values['schema_version'], values['_id']) @@ -94,11 +76,11 @@ class Server: values['_id']) self.auth_manager.store_auth_credentials(values) - if message.key == "delete": + if key == "delete": self.auth_manager.delete_auth_credentials(values) - elif message.topic == "alarm_request": - if message.key == "create_alarm_request": + elif topic == "alarm_request": + if key == "create_alarm_request": alarm_details = values['alarm_create_request'] cor_id = alarm_details['correlation_id'] response_builder = ResponseBuilder() @@ -126,7 +108,7 @@ class Server: alarm_id=None) await self._publish_response('alarm_response_' + str(cor_id), 'create_alarm_response', response) - if message.key == "delete_alarm_request": + if key == "delete_alarm_request": alarm_details = values['alarm_delete_request'] alarm_uuid = alarm_details['alarm_uuid'] response_builder = ResponseBuilder() @@ -149,17 +131,5 @@ class Server: log.exception("Exception processing message: ") async def _publish_response(self, topic: str, key: str, msg: dict): - producer = AIOKafkaProducer(loop=self.loop, - bootstrap_servers=self.kafka_server, - key_serializer=str.encode, - value_serializer=str.encode) - await producer.start() log.info("Sending response %s to topic %s with key %s", json.dumps(msg), topic, key) - try: - await producer.send_and_wait(topic, key=key, value=json.dumps(msg)) - finally: - await producer.stop() - - -if __name__ == '__main__': - Server().run() + await self.msg_bus.aiowrite(topic, key, msg) diff --git a/osm_mon/tests/collector/test_collector.py b/osm_mon/tests/collector/test_collector.py index c386ed2..4bbe10e 100644 --- a/osm_mon/tests/collector/test_collector.py +++ b/osm_mon/tests/collector/test_collector.py @@ -26,14 +26,16 @@ from unittest import mock from osm_mon.collector.collector import Collector from osm_mon.collector.vnf_collectors.openstack import OpenstackCollector +from osm_mon.core.config import Config from osm_mon.core.database import DatabaseManager, db class CollectorTest(unittest.TestCase): def setUp(self): super().setUp() - os.environ["DATABASE"] = "sqlite:///:memory:" - db_manager = DatabaseManager() + os.environ["OSMMON_SQL_DATABASE_URI"] = "sqlite:///:memory:" + self.config = Config() + db_manager = DatabaseManager(self.config) db_manager.create_tables() def tearDown(self): @@ -47,7 +49,7 @@ class CollectorTest(unittest.TestCase): @mock.patch.object(DatabaseManager, "get_vim_type") def test_init_vim_collector_and_collect_openstack(self, _get_vim_type, collect): _get_vim_type.return_value = 'openstack' - collector = Collector() + collector = Collector(self.config) collector._collect_vim_metrics({}, 'test_vim_account_id') collect.assert_called_once_with({}) @@ -57,14 +59,14 @@ class CollectorTest(unittest.TestCase): @mock.patch.object(DatabaseManager, "get_vim_type") def test_init_vim_collector_and_collect_unknown(self, _get_vim_type, openstack_collect): _get_vim_type.return_value = 'unknown' - collector = Collector() + collector = Collector(self.config) collector._collect_vim_metrics({}, 'test_vim_account_id') openstack_collect.assert_not_called() @mock.patch("osm_mon.collector.collector.CommonDbClient", mock.Mock()) @mock.patch("osm_mon.collector.collector.VCACollector", autospec=True) def test_collect_vca_metrics(self, vca_collector): - collector = Collector() + collector = Collector(self.config) collector._collect_vca_metrics({}) - vca_collector.assert_called_once_with() + vca_collector.assert_called_once_with(self.config) vca_collector.return_value.collect.assert_called_once_with({}) diff --git a/osm_mon/tests/common/__init__.py b/osm_mon/tests/common/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/osm_mon/tests/common/test_common_db_client.py b/osm_mon/tests/core/test_common_db_client.py similarity index 96% rename from osm_mon/tests/common/test_common_db_client.py rename to osm_mon/tests/core/test_common_db_client.py index 7d66426..81159ee 100644 --- a/osm_mon/tests/common/test_common_db_client.py +++ b/osm_mon/tests/core/test_common_db_client.py @@ -26,11 +26,13 @@ from unittest import mock from osm_common import dbmongo from osm_mon.core.common_db import CommonDbClient +from osm_mon.core.config import Config class CommonDbClientTest(unittest.TestCase): def setUp(self): super().setUp() + self.config = Config() @mock.patch.object(dbmongo.DbMongo, "db_connect", mock.Mock()) @mock.patch.object(CommonDbClient, "get_vnfr") @@ -56,7 +58,7 @@ class CommonDbClientTest(unittest.TestCase): 'created-time': 1526044312.0999322, 'vnfd-id': 'a314c865-aee7-4d9b-9c9d-079d7f857f01', 'id': 'a314c865-aee7-4d9b-9c9d-079d7f857f01'} - common_db_client = CommonDbClient() + common_db_client = CommonDbClient(self.config) vim_account_id = common_db_client.get_vim_account_id('5ec3f571-d540-4cb0-9992-971d1b08312e', 1) self.assertEqual(vim_account_id, 'c1740601-7287-48c8-a2c9-bce8fee459eb') @@ -84,7 +86,7 @@ class CommonDbClientTest(unittest.TestCase): 'created-time': 1526044312.0999322, 'vnfd-id': 'a314c865-aee7-4d9b-9c9d-079d7f857f01', 'id': 'a314c865-aee7-4d9b-9c9d-079d7f857f01'} - common_db_client = CommonDbClient() + common_db_client = CommonDbClient(self.config) vdur = common_db_client.get_vdur('5ec3f571-d540-4cb0-9992-971d1b08312e', '1', 'ubuntuvnf_vnfd-VM') expected_vdur = { 'internal-connection-point': [], diff --git a/osm_mon/tests/core/test_database.py b/osm_mon/tests/core/test_database.py index 482e58b..0329c74 100644 --- a/osm_mon/tests/core/test_database.py +++ b/osm_mon/tests/core/test_database.py @@ -23,12 +23,15 @@ import unittest from unittest import mock +from osm_mon.core.config import Config + from osm_mon.core.database import VimCredentials, DatabaseManager class DatbaseManagerTest(unittest.TestCase): def setUp(self): super().setUp() + self.config = Config() @mock.patch.object(DatabaseManager, "get_credentials") def test_get_vim_type(self, get_credentials): @@ -41,6 +44,6 @@ class DatbaseManagerTest(unittest.TestCase): mock_creds.type = 'openstack' get_credentials.return_value = mock_creds - database_manager = DatabaseManager() + database_manager = DatabaseManager(self.config) vim_type = database_manager.get_vim_type('test_id') self.assertEqual(vim_type, 'openstack') diff --git a/osm_mon/tests/core/test_message_bus_client.py b/osm_mon/tests/core/test_message_bus_client.py new file mode 100644 index 0000000..292fbe3 --- /dev/null +++ b/osm_mon/tests/core/test_message_bus_client.py @@ -0,0 +1,73 @@ +# -*- coding: utf-8 -*- + +# Copyright 2018 Whitestack, LLC +# ************************************************************* + +# This file is part of OSM Monitoring module +# All Rights Reserved to Whitestack, LLC + +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +# For those usages not covered by the Apache License, Version 2.0 please +# contact: bdiaz@whitestack.com or glavado@whitestack.com +## +import asyncio +from unittest import TestCase, mock + +from osm_common.msgkafka import MsgKafka + +from osm_mon.core.message_bus_client import MessageBusClient +from osm_mon.core.config import Config + + +class TestMessageBusClient(TestCase): + + def setUp(self): + self.config = Config() + self.config.set('message', 'driver', 'kafka') + self.loop = asyncio.new_event_loop() + + @mock.patch.object(MsgKafka, 'aioread') + def test_aioread(self, aioread): + async def mock_callback(): + pass + + future = asyncio.Future(loop=self.loop) + future.set_result('mock') + aioread.return_value = future + msg_bus = MessageBusClient(self.config, loop=self.loop) + topic = 'test_topic' + self.loop.run_until_complete(msg_bus.aioread([topic], mock_callback)) + aioread.assert_called_with(['test_topic'], self.loop, aiocallback=mock_callback) + + @mock.patch.object(MsgKafka, 'aiowrite') + def test_aiowrite(self, aiowrite): + future = asyncio.Future(loop=self.loop) + future.set_result('mock') + aiowrite.return_value = future + msg_bus = MessageBusClient(self.config, loop=self.loop) + topic = 'test_topic' + key = 'test_key' + msg = {'test': 'test_msg'} + self.loop.run_until_complete(msg_bus.aiowrite(topic, key, msg)) + aiowrite.assert_called_with(topic, key, msg, self.loop) + + @mock.patch.object(MsgKafka, 'aioread') + def test_aioread_once(self, aioread): + future = asyncio.Future(loop=self.loop) + future.set_result('mock') + aioread.return_value = future + msg_bus = MessageBusClient(self.config, loop=self.loop) + topic = 'test_topic' + self.loop.run_until_complete(msg_bus.aioread_once(topic)) + aioread.assert_called_with('test_topic', self.loop) diff --git a/requirements.txt b/requirements.txt index 5c6d6ca..1ebe677 100644 --- a/requirements.txt +++ b/requirements.txt @@ -29,6 +29,7 @@ peewee==3.1.* pyyaml==3.* prometheus_client==0.4.* gnocchiclient==7.0.* -pyvcloud==19.1.1 -git+https://osm.etsi.org/gerrit/osm/common.git#egg=osm-common -git+https://osm.etsi.org/gerrit/osm/N2VC.git#egg=n2vc +pymysql==0.9.* +pyvcloud==19.1.* +git+https://osm.etsi.org/gerrit/osm/common.git@v5.0#egg=osm-common +git+https://osm.etsi.org/gerrit/osm/N2VC.git@v5.0#egg=n2vc -- 2.17.1