include requirements.txt
include README.rst
-recursive-include osm_mon *.py *.xml *.sh
+recursive-include osm_mon *.py *.xml *.sh *.yaml
recursive-include devops-stages *
recursive-include test *.py
+# Copyright 2018 Whitestack, LLC
+# *************************************************************
+
+# This file is part of OSM Monitoring module
+# All Rights Reserved to Whitestack, LLC
+
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+
+# http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+# For those usages not covered by the Apache License, Version 2.0 please
+# contact: bdiaz@whitestack.com or glavado@whitestack.com
+##
+
all: clean package
clean:
Supported Collector Plugins
***************************
- - OpenStack: Requires Aodh and Gnocchi to be enabled.
- - VROPS: TBD
+ - OpenStack: Requires Gnocchi to be enabled.
+ - VROPS
- AWS: TBD
Developers
#!/bin/bash
+# Copyright 2018 Whitestack, LLC
+# *************************************************************
+
+# This file is part of OSM Monitoring module
+# All Rights Reserved to Whitestack, LLC
+
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+
+# http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+# For those usages not covered by the Apache License, Version 2.0 please
+# contact: bdiaz@whitestack.com or glavado@whitestack.com
+##
+
echo "Installing python dependencies via pip..."
pip3 install aiokafka==0.4.*
pip3 install requests==2.18.*
RUN pip3 install /mon
-ENV BROKER_URI kafka:9092
-ENV MONGO_URI mongodb://mongo:27017
-ENV DATABASE sqlite:///mon_sqlite.db
-ENV OS_NOTIFIER_URI localhost:8662
-ENV OS_DEFAULT_GRANULARITY 300
-ENV OSMMON_REQUEST_TIMEOUT 10
-ENV OSMMON_LOG_LEVEL INFO
-ENV OSMMON_KAFKA_LOG_LEVEL INFO
+ENV OSMMON_MESSAGE_DRIVER kafka
+ENV OSMMON_MESSAGE_HOST kafka
+ENV OSMMON_MESSAGE_PORT 9092
+
+ENV OSMMON_DATABASE_DRIVER mongo
+ENV OSMMON_DATABASE_URI mongodb://mongo:27017
+
+ENV OSMMON_SQL_DATABASE_URI sqlite:///mon_sqlite.db
+ENV OSMMON_OPENSTACK_DEFAULT_GRANULARITY 300
+ENV OSMMON_GLOBAL_REQUEST_TIMEOUT 10
+ENV OSMMON_GLOBAL_LOGLEVEL INFO
ENV OSMMON_VCA_HOST localhost
ENV OSMMON_VCA_SECRET secret
ENV OSMMON_VCA_USER admin
# For those usages not covered by the Apache License, Version 2.0 please
# contact: bdiaz@whitestack.com or glavado@whitestack.com
##
+import argparse
import logging
import sys
from osm_mon.collector.collector import Collector
+from osm_mon.core.config import Config
from osm_mon.core.database import DatabaseManager
-from osm_mon.core.settings import Config
def main():
- cfg = Config.instance()
+ parser = argparse.ArgumentParser(prog='osm-policy-agent')
+ parser.add_argument('--config-file', nargs='?', help='POL configuration file')
+ args = parser.parse_args()
+ cfg = Config(args.config_file)
root = logging.getLogger()
- root.setLevel(logging.getLevelName(cfg.OSMMON_LOG_LEVEL))
+ root.setLevel(logging.getLevelName(cfg.get('global', 'loglevel')))
ch = logging.StreamHandler(sys.stdout)
- ch.setLevel(logging.getLevelName(cfg.OSMMON_LOG_LEVEL))
+ ch.setLevel(logging.getLevelName(cfg.get('global', 'loglevel')))
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s', '%m/%d/%Y %I:%M:%S %p')
ch.setFormatter(formatter)
root.addHandler(ch)
- kafka_logger = logging.getLogger('kafka')
- kafka_logger.setLevel(logging.getLevelName(cfg.OSMMON_KAFKA_LOG_LEVEL))
-
log = logging.getLogger(__name__)
log.info("Starting MON Collector...")
- log.debug("Config: %s", vars(cfg))
+ log.debug("Config: %s", cfg.conf)
log.info("Initializing database...")
- db_manager = DatabaseManager()
+ db_manager = DatabaseManager(cfg)
db_manager.create_tables()
log.info("Database initialized correctly.")
- collector = Collector()
+ collector = Collector(cfg)
collector.collect_forever()
# For those usages not covered by the Apache License, Version 2.0 please
# contact: bdiaz@whitestack.com or glavado@whitestack.com
##
+import argparse
import logging
import sys
+from osm_mon.core.config import Config
from osm_mon.core.database import DatabaseManager
-from osm_mon.core.settings import Config
from osm_mon.evaluator.evaluator import Evaluator
def main():
- cfg = Config.instance()
+ parser = argparse.ArgumentParser(prog='osm-policy-agent')
+ parser.add_argument('--config-file', nargs='?', help='POL configuration file')
+ args = parser.parse_args()
+ cfg = Config(args.config_file)
root = logging.getLogger()
- root.setLevel(logging.getLevelName(cfg.OSMMON_LOG_LEVEL))
+ root.setLevel(logging.getLevelName(cfg.get('global', 'loglevel')))
ch = logging.StreamHandler(sys.stdout)
- ch.setLevel(logging.getLevelName(cfg.OSMMON_LOG_LEVEL))
+ ch.setLevel(logging.getLevelName(cfg.get('global', 'loglevel')))
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s', '%m/%d/%Y %I:%M:%S %p')
ch.setFormatter(formatter)
root.addHandler(ch)
- kafka_logger = logging.getLogger('kafka')
- kafka_logger.setLevel(logging.getLevelName(cfg.OSMMON_KAFKA_LOG_LEVEL))
-
log = logging.getLogger(__name__)
log.info("Starting MON Evaluator...")
- log.debug("Config: %s", vars(cfg))
+ log.debug("Config: %s", cfg.conf)
log.info("Initializing database...")
- db_manager = DatabaseManager()
+ db_manager = DatabaseManager(cfg)
db_manager.create_tables()
log.info("Database initialized correctly.")
- evaluator = Evaluator()
+ evaluator = Evaluator(cfg)
evaluator.evaluate_forever()
# For those usages not covered by the Apache License, Version 2.0 please
# contact: bdiaz@whitestack.com or glavado@whitestack.com
##
+import argparse
import asyncio
import logging
import subprocess
import requests
from aiokafka import AIOKafkaConsumer
-from osm_mon.core.settings import Config
+from osm_mon.core.config import Config
log = logging.getLogger(__name__)
def main():
- # Check Kafka
+ parser = argparse.ArgumentParser(prog='osm-mon-healthcheck')
+ parser.add_argument('--config-file', nargs='?', help='MON configuration file')
+ args = parser.parse_args()
+ cfg = Config(args.config_file)
+
if not _processes_running():
sys.exit(1)
- if not _is_kafka_ok():
+ if not _is_kafka_ok(cfg.get('message', 'host'), cfg.get('message', 'port')):
sys.exit(1)
if not _is_prometheus_exporter_ok():
sys.exit(1)
if process_name in row:
return True
return False
+
processes_to_check = ['osm-mon-collector', 'osm-mon-evaluator', 'osm-mon-server']
ps = subprocess.Popen(['ps', 'aux'], stdout=subprocess.PIPE).communicate()[0]
processes_running = ps.decode().split('\n')
return False
-def _is_kafka_ok():
+def _is_kafka_ok(host, port):
async def _test_kafka(loop):
- cfg = Config.instance()
consumer = AIOKafkaConsumer(
'healthcheck',
- loop=loop, bootstrap_servers=cfg.BROKER_URI)
+ loop=loop, bootstrap_servers='{}:{}'.format(host, port))
await consumer.start()
await consumer.stop()
# For those usages not covered by the Apache License, Version 2.0 please
# contact: bdiaz@whitestack.com or glavado@whitestack.com
##
+import argparse
+import asyncio
import logging
import sys
+from osm_mon.core.config import Config
from osm_mon.core.database import DatabaseManager
-from osm_mon.core.settings import Config
from osm_mon.server.server import Server
def main():
- cfg = Config.instance()
+ parser = argparse.ArgumentParser(prog='osm-policy-agent')
+ parser.add_argument('--config-file', nargs='?', help='POL configuration file')
+ args = parser.parse_args()
+ cfg = Config(args.config_file)
root = logging.getLogger()
- root.setLevel(logging.getLevelName(cfg.OSMMON_LOG_LEVEL))
+ root.setLevel(logging.getLevelName(cfg.get('global', 'loglevel')))
ch = logging.StreamHandler(sys.stdout)
- ch.setLevel(logging.getLevelName(cfg.OSMMON_LOG_LEVEL))
+ ch.setLevel(logging.getLevelName(cfg.get('global', 'loglevel')))
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s', '%m/%d/%Y %I:%M:%S %p')
ch.setFormatter(formatter)
root.addHandler(ch)
- kafka_logger = logging.getLogger('kafka')
- kafka_logger.setLevel(logging.getLevelName(cfg.OSMMON_KAFKA_LOG_LEVEL))
-
log = logging.getLogger(__name__)
- log.info("Starting MON Server...")
- log.debug("Config: %s", vars(cfg))
+ log.debug("Config: %s", cfg.conf)
log.info("Initializing database...")
- db_manager = DatabaseManager()
+ db_manager = DatabaseManager(cfg)
db_manager.create_tables()
+ log.info("Starting MON Server...")
log.info("Database initialized correctly.")
- server = Server()
+ loop = asyncio.get_event_loop()
+ server = Server(cfg, loop)
server.run()
import peewee
from osm_mon.collector.backends.prometheus import PrometheusBackend
-from osm_mon.collector.vnf_collectors.vmware import VMwareCollector
from osm_mon.collector.infra_collectors.openstack import OpenstackInfraCollector
from osm_mon.collector.metric import Metric
from osm_mon.collector.vnf_collectors.juju import VCACollector
from osm_mon.collector.vnf_collectors.openstack import OpenstackCollector
+from osm_mon.collector.vnf_collectors.vmware import VMwareCollector
from osm_mon.core.common_db import CommonDbClient
+from osm_mon.core.config import Config
from osm_mon.core.database import DatabaseManager
-from osm_mon.core.settings import Config
log = logging.getLogger(__name__)
class Collector:
- def __init__(self):
- self.common_db = CommonDbClient()
+ def __init__(self, config: Config):
+ self.conf = config
+ self.common_db = CommonDbClient(self.conf)
self.plugins = []
- self.database_manager = DatabaseManager()
+ self.database_manager = DatabaseManager(self.conf)
self.database_manager.create_tables()
self.queue = multiprocessing.Queue()
self._init_backends()
def collect_forever(self):
log.debug('collect_forever')
- cfg = Config.instance()
while True:
try:
self.collect_metrics()
- time.sleep(cfg.OSMMON_COLLECTOR_INTERVAL)
+ time.sleep(int(self.conf.get('collector', 'interval')))
except peewee.PeeweeException:
log.exception("Database error consuming message: ")
raise
def _collect_vim_metrics(self, vnfr: dict, vim_account_id: str):
# TODO(diazb) Add support for vrops and aws
- database_manager = DatabaseManager()
+ database_manager = DatabaseManager(self.conf)
vim_type = database_manager.get_vim_type(vim_account_id)
if vim_type in VIM_COLLECTORS:
- collector = VIM_COLLECTORS[vim_type](vim_account_id)
+ collector = VIM_COLLECTORS[vim_type](self.conf, vim_account_id)
metrics = collector.collect(vnfr)
for metric in metrics:
self.queue.put(metric)
log.debug("vimtype %s is not supported.", vim_type)
def _collect_vim_infra_metrics(self, vim_account_id: str):
- database_manager = DatabaseManager()
+ database_manager = DatabaseManager(self.conf)
vim_type = database_manager.get_vim_type(vim_account_id)
if vim_type in VIM_INFRA_COLLECTORS:
- collector = VIM_INFRA_COLLECTORS[vim_type](vim_account_id)
+ collector = VIM_INFRA_COLLECTORS[vim_type](self.conf, vim_account_id)
status = collector.is_vim_ok()
status_metric = Metric({'vim_id': vim_account_id}, 'vim_status', status)
self.queue.put(status_metric)
def _collect_vca_metrics(self, vnfr: dict):
log.debug('_collect_vca_metrics')
log.debug('vnfr: %s', vnfr)
- vca_collector = VCACollector()
+ vca_collector = VCACollector(self.conf)
metrics = vca_collector.collect(vnfr)
for metric in metrics:
self.queue.put(metric)
processes.append(p)
p.start()
for process in processes:
- process.join()
+ process.join(timeout=10)
metrics = []
while not self.queue.empty():
metrics.append(self.queue.get())
# For those usages not covered by the Apache License, Version 2.0 please
# contact: bdiaz@whitestack.com or glavado@whitestack.com
##
+from osm_mon.core.config import Config
+
from osm_mon.collector.infra_collectors.base import BaseInfraCollector
class BaseVimInfraCollector(BaseInfraCollector):
- def __init__(self, vim_account_id: str):
+ def __init__(self, config: Config, vim_account_id: str):
pass
def is_vim_ok(self) -> bool:
from osm_mon.collector.infra_collectors.base_vim import BaseVimInfraCollector
from osm_mon.core.auth import AuthManager
+from osm_mon.core.config import Config
log = logging.getLogger(__name__)
class OpenstackInfraCollector(BaseVimInfraCollector):
- def __init__(self, vim_account_id: str):
- super().__init__(vim_account_id)
- self.auth_manager = AuthManager()
+ def __init__(self, config: Config, vim_account_id: str):
+ super().__init__(config, vim_account_id)
+ self.auth_manager = AuthManager(config)
self.keystone_client = self._build_keystone_client(vim_account_id)
def is_vim_ok(self) -> bool:
from typing import List
from osm_mon.collector.metric import Metric
+from osm_mon.core.config import Config
class BaseCollector:
+ def __init__(self, config: Config):
+ pass
+
def collect(self, vnfr: dict) -> List[Metric]:
pass
##
from osm_mon.collector.vnf_collectors.base import BaseCollector
+from osm_mon.core.config import Config
class BaseVimCollector(BaseCollector):
- def __init__(self, vim_account_id: str):
- pass
+ def __init__(self, config: Config, vim_account_id: str):
+ super().__init__(config)
from osm_mon.collector.vnf_collectors.base import BaseCollector
from osm_mon.collector.vnf_metric import VnfMetric
from osm_mon.core.common_db import CommonDbClient
+from osm_mon.core.config import Config
from osm_mon.core.exceptions import VcaDeploymentInfoNotFound
-from osm_mon.core.settings import Config
log = logging.getLogger(__name__)
class VCACollector(BaseCollector):
- def __init__(self):
- cfg = Config.instance()
- self.common_db = CommonDbClient()
+ def __init__(self, config: Config):
+ super().__init__(config)
+ self.common_db = CommonDbClient(config)
self.loop = asyncio.get_event_loop()
- self.n2vc = N2VC(server=cfg.OSMMON_VCA_HOST, user=cfg.OSMMON_VCA_USER, secret=cfg.OSMMON_VCA_SECRET)
+ self.n2vc = N2VC(server=config.get('vca', 'host'), user=config.get('vca', 'user'),
+ secret=config.get('vca', 'secret'))
def collect(self, vnfr: dict) -> List[Metric]:
nsr_id = vnfr['nsr-id-ref']
from osm_mon.collector.vnf_metric import VnfMetric
from osm_mon.core.auth import AuthManager
from osm_mon.core.common_db import CommonDbClient
-from osm_mon.core.settings import Config
+from osm_mon.core.config import Config
log = logging.getLogger(__name__)
class OpenstackCollector(BaseVimCollector):
- def __init__(self, vim_account_id: str):
- super().__init__(vim_account_id)
- self.common_db = CommonDbClient()
- self.auth_manager = AuthManager()
+ def __init__(self, config: Config, vim_account_id: str):
+ super().__init__(config, vim_account_id)
+ self.conf = config
+ self.common_db = CommonDbClient(config)
+ self.auth_manager = AuthManager(config)
self.granularity = self._get_granularity(vim_account_id)
self.gnocchi_client = self._build_gnocchi_client(vim_account_id)
if 'granularity' in vim_config:
return int(vim_config['granularity'])
else:
- cfg = Config.instance()
- return cfg.OS_DEFAULT_GRANULARITY
+ return int(self.conf.get('openstack', 'default_granularity'))
def collect(self, vnfr: dict) -> List[Metric]:
nsr_id = vnfr['nsr-id-ref']
from osm_mon.collector.vnf_metric import VnfMetric
from osm_mon.core.auth import AuthManager
from osm_mon.core.common_db import CommonDbClient
-from osm_mon.core.settings import Config
+from osm_mon.core.config import Config
log = logging.getLogger(__name__)
class VMwareCollector(BaseVimCollector):
- def __init__(self, vim_account_id: str):
- super().__init__(vim_account_id)
- self.common_db = CommonDbClient()
- self.auth_manager = AuthManager()
- self.granularity = self._get_granularity(vim_account_id)
+ def __init__(self, config: Config, vim_account_id: str):
+ super().__init__(config, vim_account_id)
+ self.common_db = CommonDbClient(config)
+ self.auth_manager = AuthManager(config)
vim_account = self.get_vim_account(vim_account_id)
self.vrops_site = vim_account['vrops_site']
self.vrops_user = vim_account['vrops_user']
log.info("Logging into vCD org as admin.")
+ admin_user = None
try:
host = self.vcloud_site
admin_user = self.admin_username
return vim_account
- def _get_granularity(self, vim_account_id: str):
- creds = self.auth_manager.get_credentials(vim_account_id)
- vim_config = json.loads(creds.config)
- if 'granularity' in vim_config:
- return int(vim_config['granularity'])
- else:
- cfg = Config.instance()
- return cfg.OS_DEFAULT_GRANULARITY
-
def get_vm_moref_id(self, vapp_uuid):
"""
Method to get the moref_id of given VM
arg - vapp_uuid
return - VM mored_id
"""
+ vm_moref_id = None
try:
if vapp_uuid:
vm_details = self.get_vapp_details_rest(vapp_uuid)
import json
import logging
+from osm_mon.core.config import Config
+
from osm_mon.core.database import VimCredentials, DatabaseManager
log = logging.getLogger(__name__)
class AuthManager:
- def __init__(self):
- self.database_manager = DatabaseManager()
+ def __init__(self, config: Config):
+ self.database_manager = DatabaseManager(config)
def store_auth_credentials(self, creds_dict):
log.info(creds_dict)
# For those usages not covered by the Apache License, Version 2.0 please
# contact: bdiaz@whitestack.com or glavado@whitestack.com
##
-from osm_common import dbmongo
+from osm_common import dbmongo, dbmemory
-from osm_mon.core.settings import Config
+from osm_mon.core.config import Config
class CommonDbClient:
- def __init__(self):
- cfg = Config.instance()
- self.common_db = dbmongo.DbMongo()
- self.common_db.db_connect({'uri': cfg.MONGO_URI,
- 'name': 'osm',
- 'commonkey': cfg.OSMMON_DATABASE_COMMONKEY})
+ def __init__(self, config: Config):
+ if config.get('database', 'driver') == "mongo":
+ self.common_db = dbmongo.DbMongo()
+ elif config.get('database', 'driver') == "memory":
+ self.common_db = dbmemory.DbMemory()
+ else:
+ raise Exception("Unknown database driver {}".format(config.get('section', 'driver')))
+ self.common_db.db_connect(config.get("database"))
def get_vnfr(self, nsr_id: str, member_index: int):
vnfr = self.common_db.get_one("vnfrs",
--- /dev/null
+# -*- coding: utf-8 -*-
+
+# Copyright 2018 Whitestack, LLC
+# *************************************************************
+
+# This file is part of OSM Monitoring module
+# All Rights Reserved to Whitestack, LLC
+
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+
+# http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+# For those usages not covered by the Apache License, Version 2.0 please
+# contact: bdiaz@whitestack.com or glavado@whitestack.com
+##
+"""Global configuration managed by environment variables."""
+
+import logging
+import os
+
+import pkg_resources
+import yaml
+
+logger = logging.getLogger(__name__)
+
+
+class Config:
+ def __init__(self, config_file: str = ''):
+ self.conf = {}
+ self._read_config_file(config_file)
+ self._read_env()
+
+ def _read_config_file(self, config_file):
+ if not config_file:
+ path = 'mon.yaml'
+ config_file = pkg_resources.resource_filename(__name__, path)
+ with open(config_file) as f:
+ self.conf = yaml.load(f)
+
+ def get(self, section, field=None):
+ if not field:
+ return self.conf[section]
+ return self.conf[section][field]
+
+ def set(self, section, field, value):
+ if section not in self.conf:
+ self.conf[section] = {}
+ self.conf[section][field] = value
+
+ def _read_env(self):
+ for env in os.environ:
+ if not env.startswith("OSMMON_"):
+ continue
+ elements = env.lower().split("_")
+ if len(elements) < 3:
+ logger.warning(
+ "Environment variable %s=%s does not comply with required format. Section and/or field missing.",
+ env, os.getenv(env))
+ continue
+ section = elements[1]
+ field = '_'.join(elements[2:])
+ value = os.getenv(env)
+ if section not in self.conf:
+ self.conf[section] = {}
+ self.conf[section][field] = value
from peewee import CharField, TextField, FloatField, Model, AutoField, Proxy
from playhouse.db_url import connect
-from osm_mon.core.settings import Config
+from osm_mon.core.config import Config
log = logging.getLogger(__name__)
class DatabaseManager:
- def __init__(self):
- cfg = Config.instance()
- cfg.read_environ()
- db.initialize(connect(cfg.DATABASE))
+ def __init__(self, config: Config):
+ db.initialize(connect(config.get('sql', 'database_uri')))
def create_tables(self) -> None:
with db.atomic():
+++ /dev/null
-# Copyright 2017 Intel Research and Development Ireland Limited
-# *************************************************************
-
-# This file is part of OSM Monitoring module
-# All Rights Reserved to Intel Corporation
-
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-
-# http://www.apache.org/licenses/LICENSE-2.0
-
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-# For those usages not covered by the Apache License, Version 2.0 please
-# contact: helena.mcgough@intel.com or adrian.hoban@intel.com
-##
+++ /dev/null
-from kafka import KafkaConsumer
-
-from osm_mon.core.settings import Config
-
-
-# noinspection PyAbstractClass
-class Consumer(KafkaConsumer):
- def __init__(self, group_id, **kwargs):
- cfg = Config.instance()
- super().__init__(bootstrap_servers=cfg.BROKER_URI,
- key_deserializer=bytes.decode,
- value_deserializer=bytes.decode,
- max_poll_interval_ms=180000,
- group_id=group_id,
- **kwargs)
+++ /dev/null
-# Copyright 2018 Whitestack, LLC
-# *************************************************************
-
-# This file is part of OSM Monitoring module
-# All Rights Reserved to Whitestack, LLC
-
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-
-# http://www.apache.org/licenses/LICENSE-2.0
-
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-# For those usages not covered by the Apache License, Version 2.0 please
-# contact: bdiaz@whitestack.com or glavado@whitestack.com
-##
-from kafka import KafkaProducer
-
-from osm_mon.core.settings import Config
-
-
-class Producer(KafkaProducer):
- def __init__(self):
- cfg = Config.instance()
- super().__init__(bootstrap_servers=cfg.BROKER_URI,
- key_serializer=str.encode,
- value_serializer=str.encode)
-
- def send(self, topic, value=None, key=None, partition=None, timestamp_ms=None):
- return super().send(topic, value, key, partition, timestamp_ms)
--- /dev/null
+# -*- coding: utf-8 -*-
+
+# Copyright 2018 Whitestack, LLC
+# *************************************************************
+
+# This file is part of OSM Monitoring module
+# All Rights Reserved to Whitestack, LLC
+
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+
+# http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+# For those usages not covered by the Apache License, Version 2.0 please
+# contact: bdiaz@whitestack.com or glavado@whitestack.com
+##
+import asyncio
+from typing import List, Callable
+
+from osm_common import msgkafka, msglocal
+
+from osm_mon.core.config import Config
+
+
+class MessageBusClient:
+ def __init__(self, config: Config, loop=None):
+ if config.get('message', 'driver') == "local":
+ self.msg_bus = msglocal.MsgLocal()
+ elif config.get('message', 'driver') == "kafka":
+ self.msg_bus = msgkafka.MsgKafka()
+ else:
+ raise Exception("Unknown message bug driver {}".format(config.get('section', 'driver')))
+ self.msg_bus.connect(config.get('message'))
+ if not loop:
+ loop = asyncio.get_event_loop()
+ self.loop = loop
+
+ async def aioread(self, topics: List[str], callback: Callable = None, **kwargs):
+ """
+ Retrieves messages continuously from bus and executes callback for each message consumed.
+ :param topics: List of message bus topics to consume from.
+ :param callback: Async callback function to be called for each message received.
+ :param kwargs: Keyword arguments to be passed to callback function.
+ :return: None
+ """
+ await self.msg_bus.aioread(topics, self.loop, aiocallback=callback, **kwargs)
+
+ async def aiowrite(self, topic: str, key: str, msg: dict):
+ """
+ Writes message to bus.
+ :param topic: Topic to write to.
+ :param key: Key to write to.
+ :param msg: Dictionary containing message to be written.
+ :return: None
+ """
+ await self.msg_bus.aiowrite(topic, key, msg, self.loop)
+
+ async def aioread_once(self, topic: str):
+ """
+ Retrieves last message from bus.
+ :param topic: topic to retrieve message from.
+ :return: tuple(topic, key, message)
+ """
+ result = await self.msg_bus.aioread(topic, self.loop)
+ return result
--- /dev/null
+# Copyright 2018 Whitestack, LLC
+# *************************************************************
+
+# This file is part of OSM Monitoring module
+# All Rights Reserved to Whitestack, LLC
+
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+
+# http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+# For those usages not covered by the Apache License, Version 2.0 please
+# contact: bdiaz@whitestack.com or glavado@whitestack.com
+##
+
+global:
+ loglevel: INFO
+ request_timeout: 10
+
+database:
+ driver: mongo
+ uri: mongodb://mongo:27017
+ name: osm
+ commonkey: changeme
+
+message:
+ driver: kafka
+ host: kafka
+ port: 9092
+ group_id: mon-consumer
+
+sql:
+ database_uri: sqlite:///mon_sqlite.db
+
+collector:
+ interval: 30
+
+evaluator:
+ interval: 30
+
+prometheus:
+ url: http://prometheus:9090
+
+vca:
+ host: localhost
+ secret: secret
+ user: admin
+
+openstack:
+ default_granularity: 300
\ No newline at end of file
+++ /dev/null
-# Copyright 2017 Intel Research and Development Ireland Limited
-# *************************************************************
-
-# This file is part of OSM Monitoring module
-# All Rights Reserved to Intel Corporation
-
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-
-# http://www.apache.org/licenses/LICENSE-2.0
-
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-# For those usages not covered by the Apache License, Version 2.0 please
-# contact: helena.mcgough@intel.com or adrian.hoban@intel.com
-##
-"""Global configuration managed by environment variables."""
-
-import logging
-import os
-
-from collections import namedtuple
-
-from osm_mon.core.singleton import Singleton
-
-import six
-
-__author__ = "Helena McGough"
-
-log = logging.getLogger(__name__)
-
-
-class BadConfigError(Exception):
- """Configuration exception."""
-
- pass
-
-
-class CfgParam(namedtuple('CfgParam', ['key', 'default', 'data_type'])):
- """Configuration parameter definition."""
-
- def value(self, data):
- """Convert a string to the parameter type."""
- try:
- return self.data_type(data)
- except (ValueError, TypeError):
- raise BadConfigError(
- 'Invalid value "%s" for configuration parameter "%s"' % (
- data, self.key))
-
-
-@Singleton
-class Config(object):
- """Configuration object."""
-
- _configuration = [
- CfgParam('BROKER_URI', "localhost:9092", six.text_type),
- CfgParam('MONGO_URI', "mongodb://mongo:27017", six.text_type),
- CfgParam('DATABASE', "sqlite:///mon_sqlite.db", six.text_type),
- CfgParam('OS_DEFAULT_GRANULARITY', 300, int),
- CfgParam('OSMMON_REQUEST_TIMEOUT', 10, int),
- CfgParam('OSMMON_LOG_LEVEL', "INFO", six.text_type),
- CfgParam('OSMMON_KAFKA_LOG_LEVEL', "WARN", six.text_type),
- CfgParam('OSMMON_COLLECTOR_INTERVAL', 30, int),
- CfgParam('OSMMON_EVALUATOR_INTERVAL', 30, int),
- CfgParam('OSMMON_VCA_HOST', "localhost", six.text_type),
- CfgParam('OSMMON_VCA_SECRET', "secret", six.text_type),
- CfgParam('OSMMON_VCA_USER', "admin", six.text_type),
- CfgParam('OSMMON_DATABASE_COMMONKEY', "changeme", six.text_type),
- CfgParam('OSMMON_PROMETHEUS_URL', "http://prometheus:9090", six.text_type),
- ]
-
- _config_dict = {cfg.key: cfg for cfg in _configuration}
- _config_keys = _config_dict.keys()
-
- def __init__(self):
- """Set the default values."""
- for cfg in self._configuration:
- setattr(self, cfg.key, cfg.default)
- self.read_environ()
-
- def read_environ(self):
- """Check the appropriate environment variables and update defaults."""
- for key in self._config_keys:
- try:
- val = self._config_dict[key].data_type(os.environ[key])
- setattr(self, key, val)
- except KeyError as exc:
- log.debug("Environment variable not present: %s", exc)
- return
+++ /dev/null
-# Copyright 2017 Intel Research and Development Ireland Limited
-# *************************************************************
-
-# This file is part of OSM Monitoring module
-# All Rights Reserved to Intel Corporation
-
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-
-# http://www.apache.org/licenses/LICENSE-2.0
-
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-# For those usages not covered by the Apache License, Version 2.0 please
-# contact: helena.mcgough@intel.com or adrian.hoban@intel.com
-##
-"""Simple singleton class."""
-
-from __future__ import unicode_literals
-
-
-class Singleton(object):
- """Simple singleton class."""
-
- def __init__(self, decorated):
- """Initialize singleton instance."""
- self._decorated = decorated
-
- def instance(self):
- """Return singleton instance."""
- try:
- return self._instance
- except AttributeError:
- self._instance = self._decorated()
- return self._instance
# For those usages not covered by the Apache License, Version 2.0 please
# contact: bdiaz@whitestack.com or glavado@whitestack.com
##
-import json
+import asyncio
import logging
import multiprocessing
import time
from osm_mon.collector.backends.prometheus import OSM_METRIC_PREFIX
from osm_mon.core.common_db import CommonDbClient
+from osm_mon.core.config import Config
from osm_mon.core.database import DatabaseManager, Alarm
-from osm_mon.core.message_bus.producer import Producer
+from osm_mon.core.message_bus_client import MessageBusClient
from osm_mon.core.response import ResponseBuilder
-from osm_mon.core.settings import Config
log = logging.getLogger(__name__)
class Evaluator:
- def __init__(self):
- self.common_db = CommonDbClient()
+ def __init__(self, config: Config, loop=None):
+ self.conf = config
+ if not loop:
+ loop = asyncio.get_event_loop()
+ self.loop = loop
+ self.common_db = CommonDbClient(self.conf)
self.plugins = []
- self.database_manager = DatabaseManager()
+ self.database_manager = DatabaseManager(self.conf)
self.database_manager.create_tables()
self.queue = multiprocessing.Queue()
+ self.msg_bus = MessageBusClient(config)
def _evaluate_metric(self,
nsr_id: str,
alarm: Alarm):
log.debug("_evaluate_metric")
# TODO: Refactor to fit backend plugin model
- cfg = Config.instance()
query_section = "query={0}{{ns_id=\"{1}\",vdu_name=\"{2}\",vnf_member_index=\"{3}\"}}".format(
OSM_METRIC_PREFIX + metric_name, nsr_id, vdur_name, vnf_member_index)
- request_url = cfg.OSMMON_PROMETHEUS_URL + "/api/v1/query?" + query_section
+ request_url = self.conf.get('prometheus', 'url') + "/api/v1/query?" + query_section
log.info("Querying Prometheus: %s", request_url)
- r = requests.get(request_url, timeout=cfg.OSMMON_REQUEST_TIMEOUT)
+ r = requests.get(request_url, timeout=int(self.conf.get('global', 'request_timeout')))
if r.status_code == 200:
json_response = r.json()
if json_response['status'] == 'success':
def evaluate_forever(self):
log.debug('evaluate_forever')
- cfg = Config.instance()
while True:
try:
self.evaluate()
- time.sleep(cfg.OSMMON_EVALUATOR_INTERVAL)
+ time.sleep(int(self.conf.get('evaluator', 'interval')))
except peewee.PeeweeException:
log.exception("Database error evaluating alarms: ")
raise
p.start()
for process in processes:
- process.join()
+ process.join(timeout=10)
triggered_alarms = []
while not self.queue.empty():
triggered_alarms.append(self.queue.get())
for alarm in triggered_alarms:
- self.notify_alarm(alarm)
p = multiprocessing.Process(target=self.notify_alarm,
args=(alarm,))
p.start()
sev=alarm.severity,
status='alarm',
date=now)
- producer = Producer()
- producer.send(topic='alarm_response', key='notify_alarm', value=json.dumps(resp_message))
- producer.flush()
+ self.loop.run_until_complete(self.msg_bus.aiowrite('alarm_response', 'notify_alarm', resp_message))
log.info("Sent alarm notification: %s", resp_message)
+# -*- coding: utf-8 -*-
+
+# Copyright 2018 Whitestack, LLC
+# *************************************************************
+
+# This file is part of OSM Monitoring module
+# All Rights Reserved to Whitestack, LLC
+
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+
+# http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+# For those usages not covered by the Apache License, Version 2.0 please
+# contact: bdiaz@whitestack.com or glavado@whitestack.com
+##
import asyncio
import json
import logging
-from json import JSONDecodeError
-
-import yaml
-from aiokafka import AIOKafkaConsumer, AIOKafkaProducer
from osm_mon.core.auth import AuthManager
from osm_mon.core.common_db import CommonDbClient
+from osm_mon.core.config import Config
from osm_mon.core.database import DatabaseManager
+from osm_mon.core.message_bus_client import MessageBusClient
from osm_mon.core.response import ResponseBuilder
-from osm_mon.core.settings import Config
log = logging.getLogger(__name__)
class Server:
- def __init__(self, loop=None):
- cfg = Config.instance()
+ def __init__(self, config: Config, loop=None):
+ self.conf = config
if not loop:
loop = asyncio.get_event_loop()
self.loop = loop
- self.auth_manager = AuthManager()
- self.database_manager = DatabaseManager()
+ self.auth_manager = AuthManager(config)
+ self.database_manager = DatabaseManager(config)
self.database_manager.create_tables()
- self.common_db = CommonDbClient()
- self.kafka_server = cfg.BROKER_URI
+ self.common_db = CommonDbClient(config)
+ self.msg_bus = MessageBusClient(config)
def run(self):
self.loop.run_until_complete(self.start())
async def start(self):
- consumer = AIOKafkaConsumer(
+ topics = [
"vim_account",
- "alarm_request",
- loop=self.loop,
- bootstrap_servers=self.kafka_server,
- group_id="mon-server",
- key_deserializer=bytes.decode,
- value_deserializer=bytes.decode,
- )
- await consumer.start()
- try:
- async for message in consumer:
- log.info("Message arrived: %s", message)
- await self.consume_message(message)
- finally:
- await consumer.stop()
+ "alarm_request"
+ ]
+ await self.msg_bus.aioread(topics, self._process_msg)
- async def consume_message(self, message):
+ async def _process_msg(self, topic, key, values):
+ log.info("Message arrived: %s", values)
try:
- try:
- values = json.loads(message.value)
- except JSONDecodeError:
- values = yaml.safe_load(message.value)
-
- if message.topic == "vim_account":
- if message.key == "create" or message.key == "edit":
+ if topic == "vim_account":
+ if key == "create" or key == "edit":
values['vim_password'] = self.common_db.decrypt_vim_password(values['vim_password'],
values['schema_version'],
values['_id'])
values['_id'])
self.auth_manager.store_auth_credentials(values)
- if message.key == "delete":
+ if key == "delete":
self.auth_manager.delete_auth_credentials(values)
- elif message.topic == "alarm_request":
- if message.key == "create_alarm_request":
+ elif topic == "alarm_request":
+ if key == "create_alarm_request":
alarm_details = values['alarm_create_request']
cor_id = alarm_details['correlation_id']
response_builder = ResponseBuilder()
alarm_id=None)
await self._publish_response('alarm_response_' + str(cor_id), 'create_alarm_response', response)
- if message.key == "delete_alarm_request":
+ if key == "delete_alarm_request":
alarm_details = values['alarm_delete_request']
alarm_uuid = alarm_details['alarm_uuid']
response_builder = ResponseBuilder()
log.exception("Exception processing message: ")
async def _publish_response(self, topic: str, key: str, msg: dict):
- producer = AIOKafkaProducer(loop=self.loop,
- bootstrap_servers=self.kafka_server,
- key_serializer=str.encode,
- value_serializer=str.encode)
- await producer.start()
log.info("Sending response %s to topic %s with key %s", json.dumps(msg), topic, key)
- try:
- await producer.send_and_wait(topic, key=key, value=json.dumps(msg))
- finally:
- await producer.stop()
-
-
-if __name__ == '__main__':
- Server().run()
+ await self.msg_bus.aiowrite(topic, key, msg)
from osm_mon.collector.collector import Collector
from osm_mon.collector.vnf_collectors.openstack import OpenstackCollector
+from osm_mon.core.config import Config
from osm_mon.core.database import DatabaseManager, db
class CollectorTest(unittest.TestCase):
def setUp(self):
super().setUp()
- os.environ["DATABASE"] = "sqlite:///:memory:"
- db_manager = DatabaseManager()
+ os.environ["OSMMON_SQL_DATABASE_URI"] = "sqlite:///:memory:"
+ self.config = Config()
+ db_manager = DatabaseManager(self.config)
db_manager.create_tables()
def tearDown(self):
@mock.patch.object(DatabaseManager, "get_vim_type")
def test_init_vim_collector_and_collect_openstack(self, _get_vim_type, collect):
_get_vim_type.return_value = 'openstack'
- collector = Collector()
+ collector = Collector(self.config)
collector._collect_vim_metrics({}, 'test_vim_account_id')
collect.assert_called_once_with({})
@mock.patch.object(DatabaseManager, "get_vim_type")
def test_init_vim_collector_and_collect_unknown(self, _get_vim_type, openstack_collect):
_get_vim_type.return_value = 'unknown'
- collector = Collector()
+ collector = Collector(self.config)
collector._collect_vim_metrics({}, 'test_vim_account_id')
openstack_collect.assert_not_called()
@mock.patch("osm_mon.collector.collector.CommonDbClient", mock.Mock())
@mock.patch("osm_mon.collector.collector.VCACollector", autospec=True)
def test_collect_vca_metrics(self, vca_collector):
- collector = Collector()
+ collector = Collector(self.config)
collector._collect_vca_metrics({})
- vca_collector.assert_called_once_with()
+ vca_collector.assert_called_once_with(self.config)
vca_collector.return_value.collect.assert_called_once_with({})
+++ /dev/null
-# -*- coding: utf-8 -*-
-
-# Copyright 2018 Whitestack, LLC
-# *************************************************************
-
-# This file is part of OSM Monitoring module
-# All Rights Reserved to Whitestack, LLC
-
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-
-# http://www.apache.org/licenses/LICENSE-2.0
-
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-# For those usages not covered by the Apache License, Version 2.0 please
-# contact: bdiaz@whitestack.com or glavado@whitestack.com
-##
-import unittest
-from unittest import mock
-
-from osm_common import dbmongo
-
-from osm_mon.core.common_db import CommonDbClient
-
-
-class CommonDbClientTest(unittest.TestCase):
- def setUp(self):
- super().setUp()
-
- @mock.patch.object(dbmongo.DbMongo, "db_connect", mock.Mock())
- @mock.patch.object(CommonDbClient, "get_vnfr")
- def test_get_vim_id(self, get_vnfr):
- get_vnfr.return_value = {'_id': 'a314c865-aee7-4d9b-9c9d-079d7f857f01',
- '_admin': {
- 'projects_read': ['admin'], 'created': 1526044312.102287,
- 'modified': 1526044312.102287, 'projects_write': ['admin']
- },
- 'vim-account-id': 'c1740601-7287-48c8-a2c9-bce8fee459eb',
- 'nsr-id-ref': '5ec3f571-d540-4cb0-9992-971d1b08312e',
- 'vdur': [
- {
- 'internal-connection-point': [],
- 'vdu-id-ref': 'ubuntuvnf_vnfd-VM',
- 'id': 'ffd73f33-c8bb-4541-a977-44dcc3cbe28d',
- 'vim-id': '27042672-5190-4209-b844-95bbaeea7ea7',
- 'name': 'ubuntuvnf_vnfd-VM'
- }
- ],
- 'vnfd-ref': 'ubuntuvnf_vnfd',
- 'member-vnf-index-ref': '1',
- 'created-time': 1526044312.0999322,
- 'vnfd-id': 'a314c865-aee7-4d9b-9c9d-079d7f857f01',
- 'id': 'a314c865-aee7-4d9b-9c9d-079d7f857f01'}
- common_db_client = CommonDbClient()
- vim_account_id = common_db_client.get_vim_account_id('5ec3f571-d540-4cb0-9992-971d1b08312e', 1)
- self.assertEqual(vim_account_id, 'c1740601-7287-48c8-a2c9-bce8fee459eb')
-
- @mock.patch.object(dbmongo.DbMongo, "db_connect", mock.Mock())
- @mock.patch.object(dbmongo.DbMongo, "get_one")
- def test_get_vdur(self, get_one):
- get_one.return_value = {'_id': 'a314c865-aee7-4d9b-9c9d-079d7f857f01',
- '_admin': {
- 'projects_read': ['admin'], 'created': 1526044312.102287,
- 'modified': 1526044312.102287, 'projects_write': ['admin']
- },
- 'vim-account-id': 'c1740601-7287-48c8-a2c9-bce8fee459eb',
- 'nsr-id-ref': '5ec3f571-d540-4cb0-9992-971d1b08312e',
- 'vdur': [
- {
- 'internal-connection-point': [],
- 'vdu-id-ref': 'ubuntuvnf_vnfd-VM',
- 'id': 'ffd73f33-c8bb-4541-a977-44dcc3cbe28d',
- 'vim-id': '27042672-5190-4209-b844-95bbaeea7ea7',
- 'name': 'ubuntuvnf_vnfd-VM'
- }
- ],
- 'vnfd-ref': 'ubuntuvnf_vnfd',
- 'member-vnf-index-ref': '1',
- 'created-time': 1526044312.0999322,
- 'vnfd-id': 'a314c865-aee7-4d9b-9c9d-079d7f857f01',
- 'id': 'a314c865-aee7-4d9b-9c9d-079d7f857f01'}
- common_db_client = CommonDbClient()
- vdur = common_db_client.get_vdur('5ec3f571-d540-4cb0-9992-971d1b08312e', '1', 'ubuntuvnf_vnfd-VM')
- expected_vdur = {
- 'internal-connection-point': [],
- 'vdu-id-ref': 'ubuntuvnf_vnfd-VM',
- 'id': 'ffd73f33-c8bb-4541-a977-44dcc3cbe28d',
- 'vim-id': '27042672-5190-4209-b844-95bbaeea7ea7',
- 'name': 'ubuntuvnf_vnfd-VM'
- }
-
- self.assertDictEqual(vdur, expected_vdur)
--- /dev/null
+# -*- coding: utf-8 -*-
+
+# Copyright 2018 Whitestack, LLC
+# *************************************************************
+
+# This file is part of OSM Monitoring module
+# All Rights Reserved to Whitestack, LLC
+
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+
+# http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+# For those usages not covered by the Apache License, Version 2.0 please
+# contact: bdiaz@whitestack.com or glavado@whitestack.com
+##
+import unittest
+from unittest import mock
+
+from osm_common import dbmongo
+
+from osm_mon.core.common_db import CommonDbClient
+from osm_mon.core.config import Config
+
+
+class CommonDbClientTest(unittest.TestCase):
+ def setUp(self):
+ super().setUp()
+ self.config = Config()
+
+ @mock.patch.object(dbmongo.DbMongo, "db_connect", mock.Mock())
+ @mock.patch.object(CommonDbClient, "get_vnfr")
+ def test_get_vim_id(self, get_vnfr):
+ get_vnfr.return_value = {'_id': 'a314c865-aee7-4d9b-9c9d-079d7f857f01',
+ '_admin': {
+ 'projects_read': ['admin'], 'created': 1526044312.102287,
+ 'modified': 1526044312.102287, 'projects_write': ['admin']
+ },
+ 'vim-account-id': 'c1740601-7287-48c8-a2c9-bce8fee459eb',
+ 'nsr-id-ref': '5ec3f571-d540-4cb0-9992-971d1b08312e',
+ 'vdur': [
+ {
+ 'internal-connection-point': [],
+ 'vdu-id-ref': 'ubuntuvnf_vnfd-VM',
+ 'id': 'ffd73f33-c8bb-4541-a977-44dcc3cbe28d',
+ 'vim-id': '27042672-5190-4209-b844-95bbaeea7ea7',
+ 'name': 'ubuntuvnf_vnfd-VM'
+ }
+ ],
+ 'vnfd-ref': 'ubuntuvnf_vnfd',
+ 'member-vnf-index-ref': '1',
+ 'created-time': 1526044312.0999322,
+ 'vnfd-id': 'a314c865-aee7-4d9b-9c9d-079d7f857f01',
+ 'id': 'a314c865-aee7-4d9b-9c9d-079d7f857f01'}
+ common_db_client = CommonDbClient(self.config)
+ vim_account_id = common_db_client.get_vim_account_id('5ec3f571-d540-4cb0-9992-971d1b08312e', 1)
+ self.assertEqual(vim_account_id, 'c1740601-7287-48c8-a2c9-bce8fee459eb')
+
+ @mock.patch.object(dbmongo.DbMongo, "db_connect", mock.Mock())
+ @mock.patch.object(dbmongo.DbMongo, "get_one")
+ def test_get_vdur(self, get_one):
+ get_one.return_value = {'_id': 'a314c865-aee7-4d9b-9c9d-079d7f857f01',
+ '_admin': {
+ 'projects_read': ['admin'], 'created': 1526044312.102287,
+ 'modified': 1526044312.102287, 'projects_write': ['admin']
+ },
+ 'vim-account-id': 'c1740601-7287-48c8-a2c9-bce8fee459eb',
+ 'nsr-id-ref': '5ec3f571-d540-4cb0-9992-971d1b08312e',
+ 'vdur': [
+ {
+ 'internal-connection-point': [],
+ 'vdu-id-ref': 'ubuntuvnf_vnfd-VM',
+ 'id': 'ffd73f33-c8bb-4541-a977-44dcc3cbe28d',
+ 'vim-id': '27042672-5190-4209-b844-95bbaeea7ea7',
+ 'name': 'ubuntuvnf_vnfd-VM'
+ }
+ ],
+ 'vnfd-ref': 'ubuntuvnf_vnfd',
+ 'member-vnf-index-ref': '1',
+ 'created-time': 1526044312.0999322,
+ 'vnfd-id': 'a314c865-aee7-4d9b-9c9d-079d7f857f01',
+ 'id': 'a314c865-aee7-4d9b-9c9d-079d7f857f01'}
+ common_db_client = CommonDbClient(self.config)
+ vdur = common_db_client.get_vdur('5ec3f571-d540-4cb0-9992-971d1b08312e', '1', 'ubuntuvnf_vnfd-VM')
+ expected_vdur = {
+ 'internal-connection-point': [],
+ 'vdu-id-ref': 'ubuntuvnf_vnfd-VM',
+ 'id': 'ffd73f33-c8bb-4541-a977-44dcc3cbe28d',
+ 'vim-id': '27042672-5190-4209-b844-95bbaeea7ea7',
+ 'name': 'ubuntuvnf_vnfd-VM'
+ }
+
+ self.assertDictEqual(vdur, expected_vdur)
import unittest
from unittest import mock
+from osm_mon.core.config import Config
+
from osm_mon.core.database import VimCredentials, DatabaseManager
class DatbaseManagerTest(unittest.TestCase):
def setUp(self):
super().setUp()
+ self.config = Config()
@mock.patch.object(DatabaseManager, "get_credentials")
def test_get_vim_type(self, get_credentials):
mock_creds.type = 'openstack'
get_credentials.return_value = mock_creds
- database_manager = DatabaseManager()
+ database_manager = DatabaseManager(self.config)
vim_type = database_manager.get_vim_type('test_id')
self.assertEqual(vim_type, 'openstack')
--- /dev/null
+# -*- coding: utf-8 -*-
+
+# Copyright 2018 Whitestack, LLC
+# *************************************************************
+
+# This file is part of OSM Monitoring module
+# All Rights Reserved to Whitestack, LLC
+
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+
+# http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+# For those usages not covered by the Apache License, Version 2.0 please
+# contact: bdiaz@whitestack.com or glavado@whitestack.com
+##
+import asyncio
+from unittest import TestCase, mock
+
+from osm_common.msgkafka import MsgKafka
+
+from osm_mon.core.message_bus_client import MessageBusClient
+from osm_mon.core.config import Config
+
+
+class TestMessageBusClient(TestCase):
+
+ def setUp(self):
+ self.config = Config()
+ self.config.set('message', 'driver', 'kafka')
+ self.loop = asyncio.new_event_loop()
+
+ @mock.patch.object(MsgKafka, 'aioread')
+ def test_aioread(self, aioread):
+ async def mock_callback():
+ pass
+
+ future = asyncio.Future(loop=self.loop)
+ future.set_result('mock')
+ aioread.return_value = future
+ msg_bus = MessageBusClient(self.config, loop=self.loop)
+ topic = 'test_topic'
+ self.loop.run_until_complete(msg_bus.aioread([topic], mock_callback))
+ aioread.assert_called_with(['test_topic'], self.loop, aiocallback=mock_callback)
+
+ @mock.patch.object(MsgKafka, 'aiowrite')
+ def test_aiowrite(self, aiowrite):
+ future = asyncio.Future(loop=self.loop)
+ future.set_result('mock')
+ aiowrite.return_value = future
+ msg_bus = MessageBusClient(self.config, loop=self.loop)
+ topic = 'test_topic'
+ key = 'test_key'
+ msg = {'test': 'test_msg'}
+ self.loop.run_until_complete(msg_bus.aiowrite(topic, key, msg))
+ aiowrite.assert_called_with(topic, key, msg, self.loop)
+
+ @mock.patch.object(MsgKafka, 'aioread')
+ def test_aioread_once(self, aioread):
+ future = asyncio.Future(loop=self.loop)
+ future.set_result('mock')
+ aioread.return_value = future
+ msg_bus = MessageBusClient(self.config, loop=self.loop)
+ topic = 'test_topic'
+ self.loop.run_until_complete(msg_bus.aioread_once(topic))
+ aioread.assert_called_with('test_topic', self.loop)
pyyaml==3.*
prometheus_client==0.4.*
gnocchiclient==7.0.*
-pyvcloud==19.1.1
-git+https://osm.etsi.org/gerrit/osm/common.git#egg=osm-common
-git+https://osm.etsi.org/gerrit/osm/N2VC.git#egg=n2vc
+pymysql==0.9.*
+pyvcloud==19.1.*
+git+https://osm.etsi.org/gerrit/osm/common.git@v5.0#egg=osm-common
+git+https://osm.etsi.org/gerrit/osm/N2VC.git@v5.0#egg=n2vc