Implements filebased config, config override through env vars, use of osm 17/7217/8
authorBenjamin Diaz <bdiaz@whitestack.com>
Wed, 6 Feb 2019 14:58:00 +0000 (11:58 -0300)
committerBenjamin Diaz <bdiaz@whitestack.com>
Fri, 8 Feb 2019 22:25:18 +0000 (19:25 -0300)
common msg bus drivers

Change-Id: I2ae26408f03a7faf86d5621efda50df948c55951
Signed-off-by: Benjamin Diaz <bdiaz@whitestack.com>
38 files changed:
MANIFEST.in
Makefile
README.rst
debian/python3-osm-mon.postinst
docker/Dockerfile
osm_mon/cmd/mon_collector.py
osm_mon/cmd/mon_evaluator.py
osm_mon/cmd/mon_healthcheck.py
osm_mon/cmd/mon_server.py
osm_mon/collector/collector.py
osm_mon/collector/infra_collectors/base_vim.py
osm_mon/collector/infra_collectors/openstack.py
osm_mon/collector/vnf_collectors/base.py
osm_mon/collector/vnf_collectors/base_vim.py
osm_mon/collector/vnf_collectors/juju.py
osm_mon/collector/vnf_collectors/openstack.py
osm_mon/collector/vnf_collectors/vmware.py
osm_mon/core/auth.py
osm_mon/core/common_db.py
osm_mon/core/config.py [new file with mode: 0644]
osm_mon/core/database.py
osm_mon/core/message_bus/__init__.py [deleted file]
osm_mon/core/message_bus/consumer.py [deleted file]
osm_mon/core/message_bus/producer.py [deleted file]
osm_mon/core/message_bus_client.py [new file with mode: 0644]
osm_mon/core/mon.yaml [new file with mode: 0644]
osm_mon/core/settings.py [deleted file]
osm_mon/core/singleton.py [deleted file]
osm_mon/evaluator/evaluator.py
osm_mon/server/__init__.py
osm_mon/server/server.py
osm_mon/tests/collector/test_collector.py
osm_mon/tests/common/__init__.py [deleted file]
osm_mon/tests/common/test_common_db_client.py [deleted file]
osm_mon/tests/core/test_common_db_client.py [new file with mode: 0644]
osm_mon/tests/core/test_database.py
osm_mon/tests/core/test_message_bus_client.py [new file with mode: 0644]
requirements.txt

index e63d839..6e7058b 100644 (file)
@@ -21,6 +21,6 @@
 
 include requirements.txt
 include README.rst
-recursive-include osm_mon *.py *.xml *.sh
+recursive-include osm_mon *.py *.xml *.sh *.yaml
 recursive-include devops-stages *
 recursive-include test *.py
index 99de6f4..902909a 100644 (file)
--- a/Makefile
+++ b/Makefile
@@ -1,3 +1,24 @@
+# Copyright 2018 Whitestack, LLC
+# *************************************************************
+
+# This file is part of OSM Monitoring module
+# All Rights Reserved to Whitestack, LLC
+
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+
+#         http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+# For those usages not covered by the Apache License, Version 2.0 please
+# contact: bdiaz@whitestack.com or glavado@whitestack.com
+##
+
 all: clean package
 
 clean:
index ef34a3d..ffec126 100644 (file)
@@ -39,8 +39,8 @@ MON module has the following components:
 Supported Collector Plugins
 ***************************
 
- - OpenStack: Requires Aodh and Gnocchi to be enabled.
- - VROPS: TBD
+ - OpenStack: Requires Gnocchi to be enabled.
+ - VROPS
  - AWS: TBD
 
 Developers
index 06e6781..8ab760d 100644 (file)
@@ -1,5 +1,27 @@
 #!/bin/bash
 
+# Copyright 2018 Whitestack, LLC
+# *************************************************************
+
+# This file is part of OSM Monitoring module
+# All Rights Reserved to Whitestack, LLC
+
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+
+#         http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+# For those usages not covered by the Apache License, Version 2.0 please
+# contact: bdiaz@whitestack.com or glavado@whitestack.com
+##
+
 echo "Installing python dependencies via pip..."
 pip3 install aiokafka==0.4.*
 pip3 install requests==2.18.*
index 8079f23..dca983c 100644 (file)
@@ -36,14 +36,17 @@ COPY . /mon
 
 RUN pip3 install /mon
 
-ENV BROKER_URI kafka:9092
-ENV MONGO_URI mongodb://mongo:27017
-ENV DATABASE sqlite:///mon_sqlite.db
-ENV OS_NOTIFIER_URI localhost:8662
-ENV OS_DEFAULT_GRANULARITY 300
-ENV OSMMON_REQUEST_TIMEOUT 10
-ENV OSMMON_LOG_LEVEL INFO
-ENV OSMMON_KAFKA_LOG_LEVEL INFO
+ENV OSMMON_MESSAGE_DRIVER kafka
+ENV OSMMON_MESSAGE_HOST kafka
+ENV OSMMON_MESSAGE_PORT 9092
+
+ENV OSMMON_DATABASE_DRIVER mongo
+ENV OSMMON_DATABASE_URI mongodb://mongo:27017
+
+ENV OSMMON_SQL_DATABASE_URI sqlite:///mon_sqlite.db
+ENV OSMMON_OPENSTACK_DEFAULT_GRANULARITY 300
+ENV OSMMON_GLOBAL_REQUEST_TIMEOUT 10
+ENV OSMMON_GLOBAL_LOGLEVEL INFO
 ENV OSMMON_VCA_HOST localhost
 ENV OSMMON_VCA_SECRET secret
 ENV OSMMON_VCA_USER admin
index c4e2969..3e493de 100644 (file)
 # For those usages not covered by the Apache License, Version 2.0 please
 # contact: bdiaz@whitestack.com or glavado@whitestack.com
 ##
+import argparse
 import logging
 import sys
 
 from osm_mon.collector.collector import Collector
+from osm_mon.core.config import Config
 from osm_mon.core.database import DatabaseManager
-from osm_mon.core.settings import Config
 
 
 def main():
-    cfg = Config.instance()
+    parser = argparse.ArgumentParser(prog='osm-policy-agent')
+    parser.add_argument('--config-file', nargs='?', help='POL configuration file')
+    args = parser.parse_args()
+    cfg = Config(args.config_file)
 
     root = logging.getLogger()
-    root.setLevel(logging.getLevelName(cfg.OSMMON_LOG_LEVEL))
+    root.setLevel(logging.getLevelName(cfg.get('global', 'loglevel')))
     ch = logging.StreamHandler(sys.stdout)
-    ch.setLevel(logging.getLevelName(cfg.OSMMON_LOG_LEVEL))
+    ch.setLevel(logging.getLevelName(cfg.get('global', 'loglevel')))
     formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s', '%m/%d/%Y %I:%M:%S %p')
     ch.setFormatter(formatter)
     root.addHandler(ch)
 
-    kafka_logger = logging.getLogger('kafka')
-    kafka_logger.setLevel(logging.getLevelName(cfg.OSMMON_KAFKA_LOG_LEVEL))
-
     log = logging.getLogger(__name__)
     log.info("Starting MON Collector...")
-    log.debug("Config: %s", vars(cfg))
+    log.debug("Config: %s", cfg.conf)
     log.info("Initializing database...")
-    db_manager = DatabaseManager()
+    db_manager = DatabaseManager(cfg)
     db_manager.create_tables()
     log.info("Database initialized correctly.")
-    collector = Collector()
+    collector = Collector(cfg)
     collector.collect_forever()
 
 
index 79adabd..3835d7e 100644 (file)
 # For those usages not covered by the Apache License, Version 2.0 please
 # contact: bdiaz@whitestack.com or glavado@whitestack.com
 ##
+import argparse
 import logging
 import sys
 
+from osm_mon.core.config import Config
 from osm_mon.core.database import DatabaseManager
-from osm_mon.core.settings import Config
 from osm_mon.evaluator.evaluator import Evaluator
 
 
 def main():
-    cfg = Config.instance()
+    parser = argparse.ArgumentParser(prog='osm-policy-agent')
+    parser.add_argument('--config-file', nargs='?', help='POL configuration file')
+    args = parser.parse_args()
+    cfg = Config(args.config_file)
 
     root = logging.getLogger()
-    root.setLevel(logging.getLevelName(cfg.OSMMON_LOG_LEVEL))
+    root.setLevel(logging.getLevelName(cfg.get('global', 'loglevel')))
     ch = logging.StreamHandler(sys.stdout)
-    ch.setLevel(logging.getLevelName(cfg.OSMMON_LOG_LEVEL))
+    ch.setLevel(logging.getLevelName(cfg.get('global', 'loglevel')))
     formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s', '%m/%d/%Y %I:%M:%S %p')
     ch.setFormatter(formatter)
     root.addHandler(ch)
 
-    kafka_logger = logging.getLogger('kafka')
-    kafka_logger.setLevel(logging.getLevelName(cfg.OSMMON_KAFKA_LOG_LEVEL))
-
     log = logging.getLogger(__name__)
     log.info("Starting MON Evaluator...")
-    log.debug("Config: %s", vars(cfg))
+    log.debug("Config: %s", cfg.conf)
     log.info("Initializing database...")
-    db_manager = DatabaseManager()
+    db_manager = DatabaseManager(cfg)
     db_manager.create_tables()
     log.info("Database initialized correctly.")
-    evaluator = Evaluator()
+    evaluator = Evaluator(cfg)
     evaluator.evaluate_forever()
 
 
index 1fa2c2b..cc6fd8f 100644 (file)
@@ -19,6 +19,7 @@
 # For those usages not covered by the Apache License, Version 2.0 please
 # contact: bdiaz@whitestack.com or glavado@whitestack.com
 ##
+import argparse
 import asyncio
 import logging
 import subprocess
@@ -27,16 +28,20 @@ import sys
 import requests
 from aiokafka import AIOKafkaConsumer
 
-from osm_mon.core.settings import Config
+from osm_mon.core.config import Config
 
 log = logging.getLogger(__name__)
 
 
 def main():
-    # Check Kafka
+    parser = argparse.ArgumentParser(prog='osm-mon-healthcheck')
+    parser.add_argument('--config-file', nargs='?', help='MON configuration file')
+    args = parser.parse_args()
+    cfg = Config(args.config_file)
+
     if not _processes_running():
         sys.exit(1)
-    if not _is_kafka_ok():
+    if not _is_kafka_ok(cfg.get('message', 'host'), cfg.get('message', 'port')):
         sys.exit(1)
     if not _is_prometheus_exporter_ok():
         sys.exit(1)
@@ -49,6 +54,7 @@ def _processes_running():
             if process_name in row:
                 return True
         return False
+
     processes_to_check = ['osm-mon-collector', 'osm-mon-evaluator', 'osm-mon-server']
     ps = subprocess.Popen(['ps', 'aux'], stdout=subprocess.PIPE).communicate()[0]
     processes_running = ps.decode().split('\n')
@@ -68,12 +74,11 @@ def _is_prometheus_exporter_ok():
         return False
 
 
-def _is_kafka_ok():
+def _is_kafka_ok(host, port):
     async def _test_kafka(loop):
-        cfg = Config.instance()
         consumer = AIOKafkaConsumer(
             'healthcheck',
-            loop=loop, bootstrap_servers=cfg.BROKER_URI)
+            loop=loop, bootstrap_servers='{}:{}'.format(host, port))
         await consumer.start()
         await consumer.stop()
 
index 34fe0b1..b23aa20 100644 (file)
 # For those usages not covered by the Apache License, Version 2.0 please
 # contact: bdiaz@whitestack.com or glavado@whitestack.com
 ##
+import argparse
+import asyncio
 import logging
 import sys
 
+from osm_mon.core.config import Config
 from osm_mon.core.database import DatabaseManager
-from osm_mon.core.settings import Config
 from osm_mon.server.server import Server
 
 
 def main():
-    cfg = Config.instance()
+    parser = argparse.ArgumentParser(prog='osm-policy-agent')
+    parser.add_argument('--config-file', nargs='?', help='POL configuration file')
+    args = parser.parse_args()
+    cfg = Config(args.config_file)
 
     root = logging.getLogger()
-    root.setLevel(logging.getLevelName(cfg.OSMMON_LOG_LEVEL))
+    root.setLevel(logging.getLevelName(cfg.get('global', 'loglevel')))
     ch = logging.StreamHandler(sys.stdout)
-    ch.setLevel(logging.getLevelName(cfg.OSMMON_LOG_LEVEL))
+    ch.setLevel(logging.getLevelName(cfg.get('global', 'loglevel')))
     formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s', '%m/%d/%Y %I:%M:%S %p')
     ch.setFormatter(formatter)
     root.addHandler(ch)
 
-    kafka_logger = logging.getLogger('kafka')
-    kafka_logger.setLevel(logging.getLevelName(cfg.OSMMON_KAFKA_LOG_LEVEL))
-
     log = logging.getLogger(__name__)
-    log.info("Starting MON Server...")
-    log.debug("Config: %s", vars(cfg))
+    log.debug("Config: %s", cfg.conf)
     log.info("Initializing database...")
-    db_manager = DatabaseManager()
+    db_manager = DatabaseManager(cfg)
     db_manager.create_tables()
+    log.info("Starting MON Server...")
     log.info("Database initialized correctly.")
-    server = Server()
+    loop = asyncio.get_event_loop()
+    server = Server(cfg, loop)
     server.run()
 
 
index 36ce1b0..aad395a 100644 (file)
@@ -27,14 +27,14 @@ import time
 import peewee
 
 from osm_mon.collector.backends.prometheus import PrometheusBackend
-from osm_mon.collector.vnf_collectors.vmware import VMwareCollector
 from osm_mon.collector.infra_collectors.openstack import OpenstackInfraCollector
 from osm_mon.collector.metric import Metric
 from osm_mon.collector.vnf_collectors.juju import VCACollector
 from osm_mon.collector.vnf_collectors.openstack import OpenstackCollector
+from osm_mon.collector.vnf_collectors.vmware import VMwareCollector
 from osm_mon.core.common_db import CommonDbClient
+from osm_mon.core.config import Config
 from osm_mon.core.database import DatabaseManager
-from osm_mon.core.settings import Config
 
 log = logging.getLogger(__name__)
 
@@ -51,21 +51,21 @@ METRIC_BACKENDS = [
 
 
 class Collector:
-    def __init__(self):
-        self.common_db = CommonDbClient()
+    def __init__(self, config: Config):
+        self.conf = config
+        self.common_db = CommonDbClient(self.conf)
         self.plugins = []
-        self.database_manager = DatabaseManager()
+        self.database_manager = DatabaseManager(self.conf)
         self.database_manager.create_tables()
         self.queue = multiprocessing.Queue()
         self._init_backends()
 
     def collect_forever(self):
         log.debug('collect_forever')
-        cfg = Config.instance()
         while True:
             try:
                 self.collect_metrics()
-                time.sleep(cfg.OSMMON_COLLECTOR_INTERVAL)
+                time.sleep(int(self.conf.get('collector', 'interval')))
             except peewee.PeeweeException:
                 log.exception("Database error consuming message: ")
                 raise
@@ -74,10 +74,10 @@ class Collector:
 
     def _collect_vim_metrics(self, vnfr: dict, vim_account_id: str):
         # TODO(diazb) Add support for vrops and aws
-        database_manager = DatabaseManager()
+        database_manager = DatabaseManager(self.conf)
         vim_type = database_manager.get_vim_type(vim_account_id)
         if vim_type in VIM_COLLECTORS:
-            collector = VIM_COLLECTORS[vim_type](vim_account_id)
+            collector = VIM_COLLECTORS[vim_type](self.conf, vim_account_id)
             metrics = collector.collect(vnfr)
             for metric in metrics:
                 self.queue.put(metric)
@@ -85,10 +85,10 @@ class Collector:
             log.debug("vimtype %s is not supported.", vim_type)
 
     def _collect_vim_infra_metrics(self, vim_account_id: str):
-        database_manager = DatabaseManager()
+        database_manager = DatabaseManager(self.conf)
         vim_type = database_manager.get_vim_type(vim_account_id)
         if vim_type in VIM_INFRA_COLLECTORS:
-            collector = VIM_INFRA_COLLECTORS[vim_type](vim_account_id)
+            collector = VIM_INFRA_COLLECTORS[vim_type](self.conf, vim_account_id)
             status = collector.is_vim_ok()
             status_metric = Metric({'vim_id': vim_account_id}, 'vim_status', status)
             self.queue.put(status_metric)
@@ -98,7 +98,7 @@ class Collector:
     def _collect_vca_metrics(self, vnfr: dict):
         log.debug('_collect_vca_metrics')
         log.debug('vnfr: %s', vnfr)
-        vca_collector = VCACollector()
+        vca_collector = VCACollector(self.conf)
         metrics = vca_collector.collect(vnfr)
         for metric in metrics:
             self.queue.put(metric)
@@ -125,7 +125,7 @@ class Collector:
             processes.append(p)
             p.start()
         for process in processes:
-            process.join()
+            process.join(timeout=10)
         metrics = []
         while not self.queue.empty():
             metrics.append(self.queue.get())
index 0a075a1..2f5954a 100644 (file)
 # For those usages not covered by the Apache License, Version 2.0 please
 # contact: bdiaz@whitestack.com or glavado@whitestack.com
 ##
+from osm_mon.core.config import Config
+
 from osm_mon.collector.infra_collectors.base import BaseInfraCollector
 
 
 class BaseVimInfraCollector(BaseInfraCollector):
-    def __init__(self, vim_account_id: str):
+    def __init__(self, config: Config, vim_account_id: str):
         pass
 
     def is_vim_ok(self) -> bool:
index 4237e4f..5f62edf 100644 (file)
@@ -27,14 +27,15 @@ from keystoneclient.v3 import client
 
 from osm_mon.collector.infra_collectors.base_vim import BaseVimInfraCollector
 from osm_mon.core.auth import AuthManager
+from osm_mon.core.config import Config
 
 log = logging.getLogger(__name__)
 
 
 class OpenstackInfraCollector(BaseVimInfraCollector):
-    def __init__(self, vim_account_id: str):
-        super().__init__(vim_account_id)
-        self.auth_manager = AuthManager()
+    def __init__(self, config: Config, vim_account_id: str):
+        super().__init__(config, vim_account_id)
+        self.auth_manager = AuthManager(config)
         self.keystone_client = self._build_keystone_client(vim_account_id)
 
     def is_vim_ok(self) -> bool:
index 824e106..0ec12b5 100644 (file)
 from typing import List
 
 from osm_mon.collector.metric import Metric
+from osm_mon.core.config import Config
 
 
 class BaseCollector:
+    def __init__(self, config: Config):
+        pass
+
     def collect(self, vnfr: dict) -> List[Metric]:
         pass
index 29a348f..6c270f4 100644 (file)
@@ -21,8 +21,9 @@
 ##
 
 from osm_mon.collector.vnf_collectors.base import BaseCollector
+from osm_mon.core.config import Config
 
 
 class BaseVimCollector(BaseCollector):
-    def __init__(self, vim_account_id: str):
-        pass
+    def __init__(self, config: Config, vim_account_id: str):
+        super().__init__(config)
index 928b35a..6c5e314 100644 (file)
@@ -29,18 +29,19 @@ from osm_mon.collector.metric import Metric
 from osm_mon.collector.vnf_collectors.base import BaseCollector
 from osm_mon.collector.vnf_metric import VnfMetric
 from osm_mon.core.common_db import CommonDbClient
+from osm_mon.core.config import Config
 from osm_mon.core.exceptions import VcaDeploymentInfoNotFound
-from osm_mon.core.settings import Config
 
 log = logging.getLogger(__name__)
 
 
 class VCACollector(BaseCollector):
-    def __init__(self):
-        cfg = Config.instance()
-        self.common_db = CommonDbClient()
+    def __init__(self, config: Config):
+        super().__init__(config)
+        self.common_db = CommonDbClient(config)
         self.loop = asyncio.get_event_loop()
-        self.n2vc = N2VC(server=cfg.OSMMON_VCA_HOST, user=cfg.OSMMON_VCA_USER, secret=cfg.OSMMON_VCA_SECRET)
+        self.n2vc = N2VC(server=config.get('vca', 'host'), user=config.get('vca', 'user'),
+                         secret=config.get('vca', 'secret'))
 
     def collect(self, vnfr: dict) -> List[Metric]:
         nsr_id = vnfr['nsr-id-ref']
index 8dbab5c..ba7097e 100644 (file)
@@ -34,7 +34,7 @@ from osm_mon.collector.vnf_collectors.base_vim import BaseVimCollector
 from osm_mon.collector.vnf_metric import VnfMetric
 from osm_mon.core.auth import AuthManager
 from osm_mon.core.common_db import CommonDbClient
-from osm_mon.core.settings import Config
+from osm_mon.core.config import Config
 
 log = logging.getLogger(__name__)
 
@@ -52,10 +52,11 @@ METRIC_MAPPINGS = {
 
 
 class OpenstackCollector(BaseVimCollector):
-    def __init__(self, vim_account_id: str):
-        super().__init__(vim_account_id)
-        self.common_db = CommonDbClient()
-        self.auth_manager = AuthManager()
+    def __init__(self, config: Config, vim_account_id: str):
+        super().__init__(config, vim_account_id)
+        self.conf = config
+        self.common_db = CommonDbClient(config)
+        self.auth_manager = AuthManager(config)
         self.granularity = self._get_granularity(vim_account_id)
         self.gnocchi_client = self._build_gnocchi_client(vim_account_id)
 
@@ -81,8 +82,7 @@ class OpenstackCollector(BaseVimCollector):
         if 'granularity' in vim_config:
             return int(vim_config['granularity'])
         else:
-            cfg = Config.instance()
-            return cfg.OS_DEFAULT_GRANULARITY
+            return int(self.conf.get('openstack', 'default_granularity'))
 
     def collect(self, vnfr: dict) -> List[Metric]:
         nsr_id = vnfr['nsr-id-ref']
index 7402afb..ba499c7 100644 (file)
@@ -36,7 +36,7 @@ from osm_mon.collector.vnf_collectors.base_vim import BaseVimCollector
 from osm_mon.collector.vnf_metric import VnfMetric
 from osm_mon.core.auth import AuthManager
 from osm_mon.core.common_db import CommonDbClient
-from osm_mon.core.settings import Config
+from osm_mon.core.config import Config
 
 log = logging.getLogger(__name__)
 
@@ -67,11 +67,10 @@ requests.packages.urllib3.disable_warnings()
 
 
 class VMwareCollector(BaseVimCollector):
-    def __init__(self, vim_account_id: str):
-        super().__init__(vim_account_id)
-        self.common_db = CommonDbClient()
-        self.auth_manager = AuthManager()
-        self.granularity = self._get_granularity(vim_account_id)
+    def __init__(self, config: Config, vim_account_id: str):
+        super().__init__(config, vim_account_id)
+        self.common_db = CommonDbClient(config)
+        self.auth_manager = AuthManager(config)
         vim_account = self.get_vim_account(vim_account_id)
         self.vrops_site = vim_account['vrops_site']
         self.vrops_user = vim_account['vrops_user']
@@ -92,6 +91,7 @@ class VMwareCollector(BaseVimCollector):
 
         log.info("Logging into vCD org as admin.")
 
+        admin_user = None
         try:
             host = self.vcloud_site
             admin_user = self.admin_username
@@ -151,21 +151,13 @@ class VMwareCollector(BaseVimCollector):
 
         return vim_account
 
-    def _get_granularity(self, vim_account_id: str):
-        creds = self.auth_manager.get_credentials(vim_account_id)
-        vim_config = json.loads(creds.config)
-        if 'granularity' in vim_config:
-            return int(vim_config['granularity'])
-        else:
-            cfg = Config.instance()
-            return cfg.OS_DEFAULT_GRANULARITY
-
     def get_vm_moref_id(self, vapp_uuid):
         """
            Method to get the moref_id of given VM
            arg - vapp_uuid
            return - VM mored_id
         """
+        vm_moref_id = None
         try:
             if vapp_uuid:
                 vm_details = self.get_vapp_details_rest(vapp_uuid)
index 71a817e..4627a30 100644 (file)
 import json
 import logging
 
+from osm_mon.core.config import Config
+
 from osm_mon.core.database import VimCredentials, DatabaseManager
 
 log = logging.getLogger(__name__)
 
 
 class AuthManager:
-    def __init__(self):
-        self.database_manager = DatabaseManager()
+    def __init__(self, config: Config):
+        self.database_manager = DatabaseManager(config)
 
     def store_auth_credentials(self, creds_dict):
         log.info(creds_dict)
index 9cc9c06..5922290 100644 (file)
 # For those usages not covered by the Apache License, Version 2.0 please
 # contact: bdiaz@whitestack.com or glavado@whitestack.com
 ##
-from osm_common import dbmongo
+from osm_common import dbmongo, dbmemory
 
-from osm_mon.core.settings import Config
+from osm_mon.core.config import Config
 
 
 class CommonDbClient:
-    def __init__(self):
-        cfg = Config.instance()
-        self.common_db = dbmongo.DbMongo()
-        self.common_db.db_connect({'uri': cfg.MONGO_URI,
-                                   'name': 'osm',
-                                   'commonkey': cfg.OSMMON_DATABASE_COMMONKEY})
+    def __init__(self, config: Config):
+        if config.get('database', 'driver') == "mongo":
+            self.common_db = dbmongo.DbMongo()
+        elif config.get('database', 'driver') == "memory":
+            self.common_db = dbmemory.DbMemory()
+        else:
+            raise Exception("Unknown database driver {}".format(config.get('section', 'driver')))
+        self.common_db.db_connect(config.get("database"))
 
     def get_vnfr(self, nsr_id: str, member_index: int):
         vnfr = self.common_db.get_one("vnfrs",
diff --git a/osm_mon/core/config.py b/osm_mon/core/config.py
new file mode 100644 (file)
index 0000000..c4c3972
--- /dev/null
@@ -0,0 +1,73 @@
+# -*- coding: utf-8 -*-
+
+# Copyright 2018 Whitestack, LLC
+# *************************************************************
+
+# This file is part of OSM Monitoring module
+# All Rights Reserved to Whitestack, LLC
+
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+
+#         http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+# For those usages not covered by the Apache License, Version 2.0 please
+# contact: bdiaz@whitestack.com or glavado@whitestack.com
+##
+"""Global configuration managed by environment variables."""
+
+import logging
+import os
+
+import pkg_resources
+import yaml
+
+logger = logging.getLogger(__name__)
+
+
+class Config:
+    def __init__(self, config_file: str = ''):
+        self.conf = {}
+        self._read_config_file(config_file)
+        self._read_env()
+
+    def _read_config_file(self, config_file):
+        if not config_file:
+            path = 'mon.yaml'
+            config_file = pkg_resources.resource_filename(__name__, path)
+        with open(config_file) as f:
+            self.conf = yaml.load(f)
+
+    def get(self, section, field=None):
+        if not field:
+            return self.conf[section]
+        return self.conf[section][field]
+
+    def set(self, section, field, value):
+        if section not in self.conf:
+            self.conf[section] = {}
+        self.conf[section][field] = value
+
+    def _read_env(self):
+        for env in os.environ:
+            if not env.startswith("OSMMON_"):
+                continue
+            elements = env.lower().split("_")
+            if len(elements) < 3:
+                logger.warning(
+                    "Environment variable %s=%s does not comply with required format. Section and/or field missing.",
+                    env, os.getenv(env))
+                continue
+            section = elements[1]
+            field = '_'.join(elements[2:])
+            value = os.getenv(env)
+            if section not in self.conf:
+                self.conf[section] = {}
+            self.conf[section][field] = value
index 4cbd75f..eca08e9 100644 (file)
@@ -28,7 +28,7 @@ import uuid
 from peewee import CharField, TextField, FloatField, Model, AutoField, Proxy
 from playhouse.db_url import connect
 
-from osm_mon.core.settings import Config
+from osm_mon.core.config import Config
 
 log = logging.getLogger(__name__)
 
@@ -67,10 +67,8 @@ class Alarm(BaseModel):
 
 
 class DatabaseManager:
-    def __init__(self):
-        cfg = Config.instance()
-        cfg.read_environ()
-        db.initialize(connect(cfg.DATABASE))
+    def __init__(self, config: Config):
+        db.initialize(connect(config.get('sql', 'database_uri')))
 
     def create_tables(self) -> None:
         with db.atomic():
diff --git a/osm_mon/core/message_bus/__init__.py b/osm_mon/core/message_bus/__init__.py
deleted file mode 100755 (executable)
index 32eb94e..0000000
+++ /dev/null
@@ -1,21 +0,0 @@
-# Copyright 2017 Intel Research and Development Ireland Limited
-# *************************************************************
-
-# This file is part of OSM Monitoring module
-# All Rights Reserved to Intel Corporation
-
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-
-#         http://www.apache.org/licenses/LICENSE-2.0
-
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-# For those usages not covered by the Apache License, Version 2.0 please
-# contact: helena.mcgough@intel.com or adrian.hoban@intel.com
-##
diff --git a/osm_mon/core/message_bus/consumer.py b/osm_mon/core/message_bus/consumer.py
deleted file mode 100644 (file)
index c0a9dd0..0000000
+++ /dev/null
@@ -1,15 +0,0 @@
-from kafka import KafkaConsumer
-
-from osm_mon.core.settings import Config
-
-
-# noinspection PyAbstractClass
-class Consumer(KafkaConsumer):
-    def __init__(self, group_id, **kwargs):
-        cfg = Config.instance()
-        super().__init__(bootstrap_servers=cfg.BROKER_URI,
-                         key_deserializer=bytes.decode,
-                         value_deserializer=bytes.decode,
-                         max_poll_interval_ms=180000,
-                         group_id=group_id,
-                         **kwargs)
diff --git a/osm_mon/core/message_bus/producer.py b/osm_mon/core/message_bus/producer.py
deleted file mode 100644 (file)
index 573e332..0000000
+++ /dev/null
@@ -1,35 +0,0 @@
-# Copyright 2018 Whitestack, LLC
-# *************************************************************
-
-# This file is part of OSM Monitoring module
-# All Rights Reserved to Whitestack, LLC
-
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-
-#         http://www.apache.org/licenses/LICENSE-2.0
-
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-# For those usages not covered by the Apache License, Version 2.0 please
-# contact: bdiaz@whitestack.com or glavado@whitestack.com
-##
-from kafka import KafkaProducer
-
-from osm_mon.core.settings import Config
-
-
-class Producer(KafkaProducer):
-    def __init__(self):
-        cfg = Config.instance()
-        super().__init__(bootstrap_servers=cfg.BROKER_URI,
-                         key_serializer=str.encode,
-                         value_serializer=str.encode)
-
-    def send(self, topic, value=None, key=None, partition=None, timestamp_ms=None):
-        return super().send(topic, value, key, partition, timestamp_ms)
diff --git a/osm_mon/core/message_bus_client.py b/osm_mon/core/message_bus_client.py
new file mode 100644 (file)
index 0000000..6a7ef60
--- /dev/null
@@ -0,0 +1,72 @@
+# -*- coding: utf-8 -*-
+
+# Copyright 2018 Whitestack, LLC
+# *************************************************************
+
+# This file is part of OSM Monitoring module
+# All Rights Reserved to Whitestack, LLC
+
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+
+#         http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+# For those usages not covered by the Apache License, Version 2.0 please
+# contact: bdiaz@whitestack.com or glavado@whitestack.com
+##
+import asyncio
+from typing import List, Callable
+
+from osm_common import msgkafka, msglocal
+
+from osm_mon.core.config import Config
+
+
+class MessageBusClient:
+    def __init__(self, config: Config, loop=None):
+        if config.get('message', 'driver') == "local":
+            self.msg_bus = msglocal.MsgLocal()
+        elif config.get('message', 'driver') == "kafka":
+            self.msg_bus = msgkafka.MsgKafka()
+        else:
+            raise Exception("Unknown message bug driver {}".format(config.get('section', 'driver')))
+        self.msg_bus.connect(config.get('message'))
+        if not loop:
+            loop = asyncio.get_event_loop()
+        self.loop = loop
+
+    async def aioread(self, topics: List[str], callback: Callable = None, **kwargs):
+        """
+        Retrieves messages continuously from bus and executes callback for each message consumed.
+        :param topics: List of message bus topics to consume from.
+        :param callback: Async callback function to be called for each message received.
+        :param kwargs: Keyword arguments to be passed to callback function.
+        :return: None
+        """
+        await self.msg_bus.aioread(topics, self.loop, aiocallback=callback, **kwargs)
+
+    async def aiowrite(self, topic: str, key: str, msg: dict):
+        """
+        Writes message to bus.
+        :param topic: Topic to write to.
+        :param key: Key to write to.
+        :param msg: Dictionary containing message to be written.
+        :return: None
+        """
+        await self.msg_bus.aiowrite(topic, key, msg, self.loop)
+
+    async def aioread_once(self, topic: str):
+        """
+        Retrieves last message from bus.
+        :param topic: topic to retrieve message from.
+        :return: tuple(topic, key, message)
+        """
+        result = await self.msg_bus.aioread(topic, self.loop)
+        return result
diff --git a/osm_mon/core/mon.yaml b/osm_mon/core/mon.yaml
new file mode 100644 (file)
index 0000000..b1607ec
--- /dev/null
@@ -0,0 +1,57 @@
+# Copyright 2018 Whitestack, LLC
+# *************************************************************
+
+# This file is part of OSM Monitoring module
+# All Rights Reserved to Whitestack, LLC
+
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+
+#         http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+# For those usages not covered by the Apache License, Version 2.0 please
+# contact: bdiaz@whitestack.com or glavado@whitestack.com
+##
+
+global:
+  loglevel: INFO
+  request_timeout: 10
+
+database:
+  driver: mongo
+  uri: mongodb://mongo:27017
+  name: osm
+  commonkey: changeme
+
+message:
+  driver: kafka
+  host: kafka
+  port: 9092
+  group_id: mon-consumer
+
+sql:
+  database_uri: sqlite:///mon_sqlite.db
+
+collector:
+  interval: 30
+
+evaluator:
+  interval: 30
+
+prometheus:
+  url: http://prometheus:9090
+
+vca:
+  host: localhost
+  secret: secret
+  user: admin
+
+openstack:
+  default_granularity: 300
\ No newline at end of file
diff --git a/osm_mon/core/settings.py b/osm_mon/core/settings.py
deleted file mode 100644 (file)
index a5d352b..0000000
+++ /dev/null
@@ -1,95 +0,0 @@
-# Copyright 2017 Intel Research and Development Ireland Limited
-# *************************************************************
-
-# This file is part of OSM Monitoring module
-# All Rights Reserved to Intel Corporation
-
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-
-#         http://www.apache.org/licenses/LICENSE-2.0
-
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-# For those usages not covered by the Apache License, Version 2.0 please
-# contact: helena.mcgough@intel.com or adrian.hoban@intel.com
-##
-"""Global configuration managed by environment variables."""
-
-import logging
-import os
-
-from collections import namedtuple
-
-from osm_mon.core.singleton import Singleton
-
-import six
-
-__author__ = "Helena McGough"
-
-log = logging.getLogger(__name__)
-
-
-class BadConfigError(Exception):
-    """Configuration exception."""
-
-    pass
-
-
-class CfgParam(namedtuple('CfgParam', ['key', 'default', 'data_type'])):
-    """Configuration parameter definition."""
-
-    def value(self, data):
-        """Convert a string to the parameter type."""
-        try:
-            return self.data_type(data)
-        except (ValueError, TypeError):
-            raise BadConfigError(
-                'Invalid value "%s" for configuration parameter "%s"' % (
-                    data, self.key))
-
-
-@Singleton
-class Config(object):
-    """Configuration object."""
-
-    _configuration = [
-        CfgParam('BROKER_URI', "localhost:9092", six.text_type),
-        CfgParam('MONGO_URI', "mongodb://mongo:27017", six.text_type),
-        CfgParam('DATABASE', "sqlite:///mon_sqlite.db", six.text_type),
-        CfgParam('OS_DEFAULT_GRANULARITY', 300, int),
-        CfgParam('OSMMON_REQUEST_TIMEOUT', 10, int),
-        CfgParam('OSMMON_LOG_LEVEL', "INFO", six.text_type),
-        CfgParam('OSMMON_KAFKA_LOG_LEVEL', "WARN", six.text_type),
-        CfgParam('OSMMON_COLLECTOR_INTERVAL', 30, int),
-        CfgParam('OSMMON_EVALUATOR_INTERVAL', 30, int),
-        CfgParam('OSMMON_VCA_HOST', "localhost", six.text_type),
-        CfgParam('OSMMON_VCA_SECRET', "secret", six.text_type),
-        CfgParam('OSMMON_VCA_USER', "admin", six.text_type),
-        CfgParam('OSMMON_DATABASE_COMMONKEY', "changeme", six.text_type),
-        CfgParam('OSMMON_PROMETHEUS_URL', "http://prometheus:9090", six.text_type),
-    ]
-
-    _config_dict = {cfg.key: cfg for cfg in _configuration}
-    _config_keys = _config_dict.keys()
-
-    def __init__(self):
-        """Set the default values."""
-        for cfg in self._configuration:
-            setattr(self, cfg.key, cfg.default)
-        self.read_environ()
-
-    def read_environ(self):
-        """Check the appropriate environment variables and update defaults."""
-        for key in self._config_keys:
-            try:
-                val = self._config_dict[key].data_type(os.environ[key])
-                setattr(self, key, val)
-            except KeyError as exc:
-                log.debug("Environment variable not present: %s", exc)
-        return
diff --git a/osm_mon/core/singleton.py b/osm_mon/core/singleton.py
deleted file mode 100644 (file)
index 59c5ee5..0000000
+++ /dev/null
@@ -1,40 +0,0 @@
-# Copyright 2017 Intel Research and Development Ireland Limited
-# *************************************************************
-
-# This file is part of OSM Monitoring module
-# All Rights Reserved to Intel Corporation
-
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-
-#         http://www.apache.org/licenses/LICENSE-2.0
-
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-# For those usages not covered by the Apache License, Version 2.0 please
-# contact: helena.mcgough@intel.com or adrian.hoban@intel.com
-##
-"""Simple singleton class."""
-
-from __future__ import unicode_literals
-
-
-class Singleton(object):
-    """Simple singleton class."""
-
-    def __init__(self, decorated):
-        """Initialize singleton instance."""
-        self._decorated = decorated
-
-    def instance(self):
-        """Return singleton instance."""
-        try:
-            return self._instance
-        except AttributeError:
-            self._instance = self._decorated()
-            return self._instance
index 9dc8c48..76881b9 100644 (file)
@@ -20,7 +20,7 @@
 # For those usages not covered by the Apache License, Version 2.0 please
 # contact: bdiaz@whitestack.com or glavado@whitestack.com
 ##
-import json
+import asyncio
 import logging
 import multiprocessing
 import time
@@ -31,21 +31,26 @@ from osm_common.dbbase import DbException
 
 from osm_mon.collector.backends.prometheus import OSM_METRIC_PREFIX
 from osm_mon.core.common_db import CommonDbClient
+from osm_mon.core.config import Config
 from osm_mon.core.database import DatabaseManager, Alarm
-from osm_mon.core.message_bus.producer import Producer
+from osm_mon.core.message_bus_client import MessageBusClient
 from osm_mon.core.response import ResponseBuilder
-from osm_mon.core.settings import Config
 
 log = logging.getLogger(__name__)
 
 
 class Evaluator:
-    def __init__(self):
-        self.common_db = CommonDbClient()
+    def __init__(self, config: Config, loop=None):
+        self.conf = config
+        if not loop:
+            loop = asyncio.get_event_loop()
+        self.loop = loop
+        self.common_db = CommonDbClient(self.conf)
         self.plugins = []
-        self.database_manager = DatabaseManager()
+        self.database_manager = DatabaseManager(self.conf)
         self.database_manager.create_tables()
         self.queue = multiprocessing.Queue()
+        self.msg_bus = MessageBusClient(config)
 
     def _evaluate_metric(self,
                          nsr_id: str,
@@ -55,12 +60,11 @@ class Evaluator:
                          alarm: Alarm):
         log.debug("_evaluate_metric")
         # TODO: Refactor to fit backend plugin model
-        cfg = Config.instance()
         query_section = "query={0}{{ns_id=\"{1}\",vdu_name=\"{2}\",vnf_member_index=\"{3}\"}}".format(
             OSM_METRIC_PREFIX + metric_name, nsr_id, vdur_name, vnf_member_index)
-        request_url = cfg.OSMMON_PROMETHEUS_URL + "/api/v1/query?" + query_section
+        request_url = self.conf.get('prometheus', 'url') + "/api/v1/query?" + query_section
         log.info("Querying Prometheus: %s", request_url)
-        r = requests.get(request_url, timeout=cfg.OSMMON_REQUEST_TIMEOUT)
+        r = requests.get(request_url, timeout=int(self.conf.get('global', 'request_timeout')))
         if r.status_code == 200:
             json_response = r.json()
             if json_response['status'] == 'success':
@@ -83,11 +87,10 @@ class Evaluator:
 
     def evaluate_forever(self):
         log.debug('evaluate_forever')
-        cfg = Config.instance()
         while True:
             try:
                 self.evaluate()
-                time.sleep(cfg.OSMMON_EVALUATOR_INTERVAL)
+                time.sleep(int(self.conf.get('evaluator', 'interval')))
             except peewee.PeeweeException:
                 log.exception("Database error evaluating alarms: ")
                 raise
@@ -151,12 +154,11 @@ class Evaluator:
                 p.start()
 
         for process in processes:
-            process.join()
+            process.join(timeout=10)
         triggered_alarms = []
         while not self.queue.empty():
             triggered_alarms.append(self.queue.get())
         for alarm in triggered_alarms:
-            self.notify_alarm(alarm)
             p = multiprocessing.Process(target=self.notify_alarm,
                                         args=(alarm,))
             p.start()
@@ -178,7 +180,5 @@ class Evaluator:
             sev=alarm.severity,
             status='alarm',
             date=now)
-        producer = Producer()
-        producer.send(topic='alarm_response', key='notify_alarm', value=json.dumps(resp_message))
-        producer.flush()
+        self.loop.run_until_complete(self.msg_bus.aiowrite('alarm_response', 'notify_alarm', resp_message))
         log.info("Sent alarm notification: %s", resp_message)
index e69de29..4450364 100644 (file)
@@ -0,0 +1,22 @@
+# -*- coding: utf-8 -*-
+
+# Copyright 2018 Whitestack, LLC
+# *************************************************************
+
+# This file is part of OSM Monitoring module
+# All Rights Reserved to Whitestack, LLC
+
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+
+#         http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+# For those usages not covered by the Apache License, Version 2.0 please
+# contact: bdiaz@whitestack.com or glavado@whitestack.com
+##
index 0f17d99..0011618 100755 (executable)
 import asyncio
 import json
 import logging
-from json import JSONDecodeError
-
-import yaml
-from aiokafka import AIOKafkaConsumer, AIOKafkaProducer
 
 from osm_mon.core.auth import AuthManager
 from osm_mon.core.common_db import CommonDbClient
+from osm_mon.core.config import Config
 from osm_mon.core.database import DatabaseManager
+from osm_mon.core.message_bus_client import MessageBusClient
 from osm_mon.core.response import ResponseBuilder
-from osm_mon.core.settings import Config
 
 log = logging.getLogger(__name__)
 
 
 class Server:
 
-    def __init__(self, loop=None):
-        cfg = Config.instance()
+    def __init__(self, config: Config, loop=None):
+        self.conf = config
         if not loop:
             loop = asyncio.get_event_loop()
         self.loop = loop
-        self.auth_manager = AuthManager()
-        self.database_manager = DatabaseManager()
+        self.auth_manager = AuthManager(config)
+        self.database_manager = DatabaseManager(config)
         self.database_manager.create_tables()
-        self.common_db = CommonDbClient()
-        self.kafka_server = cfg.BROKER_URI
+        self.common_db = CommonDbClient(config)
+        self.msg_bus = MessageBusClient(config)
 
     def run(self):
         self.loop.run_until_complete(self.start())
 
     async def start(self):
-        consumer = AIOKafkaConsumer(
+        topics = [
             "vim_account",
-            "alarm_request",
-            loop=self.loop,
-            bootstrap_servers=self.kafka_server,
-            group_id="mon-server",
-            key_deserializer=bytes.decode,
-            value_deserializer=bytes.decode,
-        )
-        await consumer.start()
-        try:
-            async for message in consumer:
-                log.info("Message arrived: %s", message)
-                await self.consume_message(message)
-        finally:
-            await consumer.stop()
+            "alarm_request"
+        ]
+        await self.msg_bus.aioread(topics, self._process_msg)
 
-    async def consume_message(self, message):
+    async def _process_msg(self, topic, key, values):
+        log.info("Message arrived: %s", values)
         try:
-            try:
-                values = json.loads(message.value)
-            except JSONDecodeError:
-                values = yaml.safe_load(message.value)
-
-            if message.topic == "vim_account":
-                if message.key == "create" or message.key == "edit":
+            if topic == "vim_account":
+                if key == "create" or key == "edit":
                     values['vim_password'] = self.common_db.decrypt_vim_password(values['vim_password'],
                                                                                  values['schema_version'],
                                                                                  values['_id'])
@@ -94,11 +76,11 @@ class Server:
                                                                                             values['_id'])
                     self.auth_manager.store_auth_credentials(values)
 
-                if message.key == "delete":
+                if key == "delete":
                     self.auth_manager.delete_auth_credentials(values)
 
-            elif message.topic == "alarm_request":
-                if message.key == "create_alarm_request":
+            elif topic == "alarm_request":
+                if key == "create_alarm_request":
                     alarm_details = values['alarm_create_request']
                     cor_id = alarm_details['correlation_id']
                     response_builder = ResponseBuilder()
@@ -126,7 +108,7 @@ class Server:
                                                                       alarm_id=None)
                     await self._publish_response('alarm_response_' + str(cor_id), 'create_alarm_response', response)
 
-                if message.key == "delete_alarm_request":
+                if key == "delete_alarm_request":
                     alarm_details = values['alarm_delete_request']
                     alarm_uuid = alarm_details['alarm_uuid']
                     response_builder = ResponseBuilder()
@@ -149,17 +131,5 @@ class Server:
             log.exception("Exception processing message: ")
 
     async def _publish_response(self, topic: str, key: str, msg: dict):
-        producer = AIOKafkaProducer(loop=self.loop,
-                                    bootstrap_servers=self.kafka_server,
-                                    key_serializer=str.encode,
-                                    value_serializer=str.encode)
-        await producer.start()
         log.info("Sending response %s to topic %s with key %s", json.dumps(msg), topic, key)
-        try:
-            await producer.send_and_wait(topic, key=key, value=json.dumps(msg))
-        finally:
-            await producer.stop()
-
-
-if __name__ == '__main__':
-    Server().run()
+        await self.msg_bus.aiowrite(topic, key, msg)
index c386ed2..4bbe10e 100644 (file)
@@ -26,14 +26,16 @@ from unittest import mock
 
 from osm_mon.collector.collector import Collector
 from osm_mon.collector.vnf_collectors.openstack import OpenstackCollector
+from osm_mon.core.config import Config
 from osm_mon.core.database import DatabaseManager, db
 
 
 class CollectorTest(unittest.TestCase):
     def setUp(self):
         super().setUp()
-        os.environ["DATABASE"] = "sqlite:///:memory:"
-        db_manager = DatabaseManager()
+        os.environ["OSMMON_SQL_DATABASE_URI"] = "sqlite:///:memory:"
+        self.config = Config()
+        db_manager = DatabaseManager(self.config)
         db_manager.create_tables()
 
     def tearDown(self):
@@ -47,7 +49,7 @@ class CollectorTest(unittest.TestCase):
     @mock.patch.object(DatabaseManager, "get_vim_type")
     def test_init_vim_collector_and_collect_openstack(self, _get_vim_type, collect):
         _get_vim_type.return_value = 'openstack'
-        collector = Collector()
+        collector = Collector(self.config)
         collector._collect_vim_metrics({}, 'test_vim_account_id')
         collect.assert_called_once_with({})
 
@@ -57,14 +59,14 @@ class CollectorTest(unittest.TestCase):
     @mock.patch.object(DatabaseManager, "get_vim_type")
     def test_init_vim_collector_and_collect_unknown(self, _get_vim_type, openstack_collect):
         _get_vim_type.return_value = 'unknown'
-        collector = Collector()
+        collector = Collector(self.config)
         collector._collect_vim_metrics({}, 'test_vim_account_id')
         openstack_collect.assert_not_called()
 
     @mock.patch("osm_mon.collector.collector.CommonDbClient", mock.Mock())
     @mock.patch("osm_mon.collector.collector.VCACollector", autospec=True)
     def test_collect_vca_metrics(self, vca_collector):
-        collector = Collector()
+        collector = Collector(self.config)
         collector._collect_vca_metrics({})
-        vca_collector.assert_called_once_with()
+        vca_collector.assert_called_once_with(self.config)
         vca_collector.return_value.collect.assert_called_once_with({})
diff --git a/osm_mon/tests/common/__init__.py b/osm_mon/tests/common/__init__.py
deleted file mode 100644 (file)
index e69de29..0000000
diff --git a/osm_mon/tests/common/test_common_db_client.py b/osm_mon/tests/common/test_common_db_client.py
deleted file mode 100644 (file)
index 7d66426..0000000
+++ /dev/null
@@ -1,97 +0,0 @@
-# -*- coding: utf-8 -*-
-
-# Copyright 2018 Whitestack, LLC
-# *************************************************************
-
-# This file is part of OSM Monitoring module
-# All Rights Reserved to Whitestack, LLC
-
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-
-#         http://www.apache.org/licenses/LICENSE-2.0
-
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-# For those usages not covered by the Apache License, Version 2.0 please
-# contact: bdiaz@whitestack.com or glavado@whitestack.com
-##
-import unittest
-from unittest import mock
-
-from osm_common import dbmongo
-
-from osm_mon.core.common_db import CommonDbClient
-
-
-class CommonDbClientTest(unittest.TestCase):
-    def setUp(self):
-        super().setUp()
-
-    @mock.patch.object(dbmongo.DbMongo, "db_connect", mock.Mock())
-    @mock.patch.object(CommonDbClient, "get_vnfr")
-    def test_get_vim_id(self, get_vnfr):
-        get_vnfr.return_value = {'_id': 'a314c865-aee7-4d9b-9c9d-079d7f857f01',
-                                 '_admin': {
-                                     'projects_read': ['admin'], 'created': 1526044312.102287,
-                                     'modified': 1526044312.102287, 'projects_write': ['admin']
-                                 },
-                                 'vim-account-id': 'c1740601-7287-48c8-a2c9-bce8fee459eb',
-                                 'nsr-id-ref': '5ec3f571-d540-4cb0-9992-971d1b08312e',
-                                 'vdur': [
-                                     {
-                                         'internal-connection-point': [],
-                                         'vdu-id-ref': 'ubuntuvnf_vnfd-VM',
-                                         'id': 'ffd73f33-c8bb-4541-a977-44dcc3cbe28d',
-                                         'vim-id': '27042672-5190-4209-b844-95bbaeea7ea7',
-                                         'name': 'ubuntuvnf_vnfd-VM'
-                                     }
-                                 ],
-                                 'vnfd-ref': 'ubuntuvnf_vnfd',
-                                 'member-vnf-index-ref': '1',
-                                 'created-time': 1526044312.0999322,
-                                 'vnfd-id': 'a314c865-aee7-4d9b-9c9d-079d7f857f01',
-                                 'id': 'a314c865-aee7-4d9b-9c9d-079d7f857f01'}
-        common_db_client = CommonDbClient()
-        vim_account_id = common_db_client.get_vim_account_id('5ec3f571-d540-4cb0-9992-971d1b08312e', 1)
-        self.assertEqual(vim_account_id, 'c1740601-7287-48c8-a2c9-bce8fee459eb')
-
-    @mock.patch.object(dbmongo.DbMongo, "db_connect", mock.Mock())
-    @mock.patch.object(dbmongo.DbMongo, "get_one")
-    def test_get_vdur(self, get_one):
-        get_one.return_value = {'_id': 'a314c865-aee7-4d9b-9c9d-079d7f857f01',
-                                '_admin': {
-                                    'projects_read': ['admin'], 'created': 1526044312.102287,
-                                    'modified': 1526044312.102287, 'projects_write': ['admin']
-                                },
-                                'vim-account-id': 'c1740601-7287-48c8-a2c9-bce8fee459eb',
-                                'nsr-id-ref': '5ec3f571-d540-4cb0-9992-971d1b08312e',
-                                'vdur': [
-                                    {
-                                        'internal-connection-point': [],
-                                        'vdu-id-ref': 'ubuntuvnf_vnfd-VM',
-                                        'id': 'ffd73f33-c8bb-4541-a977-44dcc3cbe28d',
-                                        'vim-id': '27042672-5190-4209-b844-95bbaeea7ea7',
-                                        'name': 'ubuntuvnf_vnfd-VM'
-                                    }
-                                ],
-                                'vnfd-ref': 'ubuntuvnf_vnfd',
-                                'member-vnf-index-ref': '1',
-                                'created-time': 1526044312.0999322,
-                                'vnfd-id': 'a314c865-aee7-4d9b-9c9d-079d7f857f01',
-                                'id': 'a314c865-aee7-4d9b-9c9d-079d7f857f01'}
-        common_db_client = CommonDbClient()
-        vdur = common_db_client.get_vdur('5ec3f571-d540-4cb0-9992-971d1b08312e', '1', 'ubuntuvnf_vnfd-VM')
-        expected_vdur = {
-            'internal-connection-point': [],
-            'vdu-id-ref': 'ubuntuvnf_vnfd-VM',
-            'id': 'ffd73f33-c8bb-4541-a977-44dcc3cbe28d',
-            'vim-id': '27042672-5190-4209-b844-95bbaeea7ea7',
-            'name': 'ubuntuvnf_vnfd-VM'
-        }
-
-        self.assertDictEqual(vdur, expected_vdur)
diff --git a/osm_mon/tests/core/test_common_db_client.py b/osm_mon/tests/core/test_common_db_client.py
new file mode 100644 (file)
index 0000000..81159ee
--- /dev/null
@@ -0,0 +1,99 @@
+# -*- coding: utf-8 -*-
+
+# Copyright 2018 Whitestack, LLC
+# *************************************************************
+
+# This file is part of OSM Monitoring module
+# All Rights Reserved to Whitestack, LLC
+
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+
+#         http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+# For those usages not covered by the Apache License, Version 2.0 please
+# contact: bdiaz@whitestack.com or glavado@whitestack.com
+##
+import unittest
+from unittest import mock
+
+from osm_common import dbmongo
+
+from osm_mon.core.common_db import CommonDbClient
+from osm_mon.core.config import Config
+
+
+class CommonDbClientTest(unittest.TestCase):
+    def setUp(self):
+        super().setUp()
+        self.config = Config()
+
+    @mock.patch.object(dbmongo.DbMongo, "db_connect", mock.Mock())
+    @mock.patch.object(CommonDbClient, "get_vnfr")
+    def test_get_vim_id(self, get_vnfr):
+        get_vnfr.return_value = {'_id': 'a314c865-aee7-4d9b-9c9d-079d7f857f01',
+                                 '_admin': {
+                                     'projects_read': ['admin'], 'created': 1526044312.102287,
+                                     'modified': 1526044312.102287, 'projects_write': ['admin']
+                                 },
+                                 'vim-account-id': 'c1740601-7287-48c8-a2c9-bce8fee459eb',
+                                 'nsr-id-ref': '5ec3f571-d540-4cb0-9992-971d1b08312e',
+                                 'vdur': [
+                                     {
+                                         'internal-connection-point': [],
+                                         'vdu-id-ref': 'ubuntuvnf_vnfd-VM',
+                                         'id': 'ffd73f33-c8bb-4541-a977-44dcc3cbe28d',
+                                         'vim-id': '27042672-5190-4209-b844-95bbaeea7ea7',
+                                         'name': 'ubuntuvnf_vnfd-VM'
+                                     }
+                                 ],
+                                 'vnfd-ref': 'ubuntuvnf_vnfd',
+                                 'member-vnf-index-ref': '1',
+                                 'created-time': 1526044312.0999322,
+                                 'vnfd-id': 'a314c865-aee7-4d9b-9c9d-079d7f857f01',
+                                 'id': 'a314c865-aee7-4d9b-9c9d-079d7f857f01'}
+        common_db_client = CommonDbClient(self.config)
+        vim_account_id = common_db_client.get_vim_account_id('5ec3f571-d540-4cb0-9992-971d1b08312e', 1)
+        self.assertEqual(vim_account_id, 'c1740601-7287-48c8-a2c9-bce8fee459eb')
+
+    @mock.patch.object(dbmongo.DbMongo, "db_connect", mock.Mock())
+    @mock.patch.object(dbmongo.DbMongo, "get_one")
+    def test_get_vdur(self, get_one):
+        get_one.return_value = {'_id': 'a314c865-aee7-4d9b-9c9d-079d7f857f01',
+                                '_admin': {
+                                    'projects_read': ['admin'], 'created': 1526044312.102287,
+                                    'modified': 1526044312.102287, 'projects_write': ['admin']
+                                },
+                                'vim-account-id': 'c1740601-7287-48c8-a2c9-bce8fee459eb',
+                                'nsr-id-ref': '5ec3f571-d540-4cb0-9992-971d1b08312e',
+                                'vdur': [
+                                    {
+                                        'internal-connection-point': [],
+                                        'vdu-id-ref': 'ubuntuvnf_vnfd-VM',
+                                        'id': 'ffd73f33-c8bb-4541-a977-44dcc3cbe28d',
+                                        'vim-id': '27042672-5190-4209-b844-95bbaeea7ea7',
+                                        'name': 'ubuntuvnf_vnfd-VM'
+                                    }
+                                ],
+                                'vnfd-ref': 'ubuntuvnf_vnfd',
+                                'member-vnf-index-ref': '1',
+                                'created-time': 1526044312.0999322,
+                                'vnfd-id': 'a314c865-aee7-4d9b-9c9d-079d7f857f01',
+                                'id': 'a314c865-aee7-4d9b-9c9d-079d7f857f01'}
+        common_db_client = CommonDbClient(self.config)
+        vdur = common_db_client.get_vdur('5ec3f571-d540-4cb0-9992-971d1b08312e', '1', 'ubuntuvnf_vnfd-VM')
+        expected_vdur = {
+            'internal-connection-point': [],
+            'vdu-id-ref': 'ubuntuvnf_vnfd-VM',
+            'id': 'ffd73f33-c8bb-4541-a977-44dcc3cbe28d',
+            'vim-id': '27042672-5190-4209-b844-95bbaeea7ea7',
+            'name': 'ubuntuvnf_vnfd-VM'
+        }
+
+        self.assertDictEqual(vdur, expected_vdur)
index 482e58b..0329c74 100644 (file)
 import unittest
 from unittest import mock
 
+from osm_mon.core.config import Config
+
 from osm_mon.core.database import VimCredentials, DatabaseManager
 
 
 class DatbaseManagerTest(unittest.TestCase):
     def setUp(self):
         super().setUp()
+        self.config = Config()
 
     @mock.patch.object(DatabaseManager, "get_credentials")
     def test_get_vim_type(self, get_credentials):
@@ -41,6 +44,6 @@ class DatbaseManagerTest(unittest.TestCase):
         mock_creds.type = 'openstack'
 
         get_credentials.return_value = mock_creds
-        database_manager = DatabaseManager()
+        database_manager = DatabaseManager(self.config)
         vim_type = database_manager.get_vim_type('test_id')
         self.assertEqual(vim_type, 'openstack')
diff --git a/osm_mon/tests/core/test_message_bus_client.py b/osm_mon/tests/core/test_message_bus_client.py
new file mode 100644 (file)
index 0000000..292fbe3
--- /dev/null
@@ -0,0 +1,73 @@
+# -*- coding: utf-8 -*-
+
+# Copyright 2018 Whitestack, LLC
+# *************************************************************
+
+# This file is part of OSM Monitoring module
+# All Rights Reserved to Whitestack, LLC
+
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+
+#         http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+# For those usages not covered by the Apache License, Version 2.0 please
+# contact: bdiaz@whitestack.com or glavado@whitestack.com
+##
+import asyncio
+from unittest import TestCase, mock
+
+from osm_common.msgkafka import MsgKafka
+
+from osm_mon.core.message_bus_client import MessageBusClient
+from osm_mon.core.config import Config
+
+
+class TestMessageBusClient(TestCase):
+
+    def setUp(self):
+        self.config = Config()
+        self.config.set('message', 'driver', 'kafka')
+        self.loop = asyncio.new_event_loop()
+
+    @mock.patch.object(MsgKafka, 'aioread')
+    def test_aioread(self, aioread):
+        async def mock_callback():
+            pass
+
+        future = asyncio.Future(loop=self.loop)
+        future.set_result('mock')
+        aioread.return_value = future
+        msg_bus = MessageBusClient(self.config, loop=self.loop)
+        topic = 'test_topic'
+        self.loop.run_until_complete(msg_bus.aioread([topic], mock_callback))
+        aioread.assert_called_with(['test_topic'], self.loop, aiocallback=mock_callback)
+
+    @mock.patch.object(MsgKafka, 'aiowrite')
+    def test_aiowrite(self, aiowrite):
+        future = asyncio.Future(loop=self.loop)
+        future.set_result('mock')
+        aiowrite.return_value = future
+        msg_bus = MessageBusClient(self.config, loop=self.loop)
+        topic = 'test_topic'
+        key = 'test_key'
+        msg = {'test': 'test_msg'}
+        self.loop.run_until_complete(msg_bus.aiowrite(topic, key, msg))
+        aiowrite.assert_called_with(topic, key, msg, self.loop)
+
+    @mock.patch.object(MsgKafka, 'aioread')
+    def test_aioread_once(self, aioread):
+        future = asyncio.Future(loop=self.loop)
+        future.set_result('mock')
+        aioread.return_value = future
+        msg_bus = MessageBusClient(self.config, loop=self.loop)
+        topic = 'test_topic'
+        self.loop.run_until_complete(msg_bus.aioread_once(topic))
+        aioread.assert_called_with('test_topic', self.loop)
index 5c6d6ca..1ebe677 100644 (file)
@@ -29,6 +29,7 @@ peewee==3.1.*
 pyyaml==3.*
 prometheus_client==0.4.*
 gnocchiclient==7.0.*
-pyvcloud==19.1.1
-git+https://osm.etsi.org/gerrit/osm/common.git#egg=osm-common
-git+https://osm.etsi.org/gerrit/osm/N2VC.git#egg=n2vc
+pymysql==0.9.*
+pyvcloud==19.1.*
+git+https://osm.etsi.org/gerrit/osm/common.git@v5.0#egg=osm-common
+git+https://osm.etsi.org/gerrit/osm/N2VC.git@v5.0#egg=n2vc