From a14cf16181c8b39f12c872c486e0b292c0068944 Mon Sep 17 00:00:00 2001 From: Benjamin Diaz Date: Fri, 1 Feb 2019 13:31:47 -0300 Subject: [PATCH] Replaces direct use of aiokafka with osm_common message bus in agent and lcmclient Changes config handling to comply with the way it is handled in other modules, by using a config file and overriding it with env vars. Adds unit tests for message_bus_client. Mon client remains using aiokafka directly, as there is no support yet for auto_offset_reset configuration in osm_common. Change-Id: I99615287cc934ce310105e86544a6bfe26bc0673 Signed-off-by: Benjamin Diaz --- MANIFEST.in | 2 +- debian/python3-osm-policy-module.postinst | 23 ++++ docker/Dockerfile | 3 +- osm_policy_module/cmd/policy_module_agent.py | 36 +++--- osm_policy_module/common/common_db_client.py | 18 ++- osm_policy_module/common/lcm_client.py | 29 ++--- .../common/message_bus_client.py | 72 ++++++++++++ osm_policy_module/common/mon_client.py | 7 +- osm_policy_module/core/agent.py | 70 ++++++------ osm_policy_module/core/config.py | 106 ++++++++---------- osm_policy_module/core/database.py | 17 +-- osm_policy_module/core/pol.yaml | 38 +++++++ .../tests/integration/test_kafka_messages.py | 7 +- .../tests/integration/test_policy_agent.py | 5 +- .../unit/common/__init__.py} | 19 ---- .../unit/common/test_message_bus_client.py | 73 ++++++++++++ .../tests/unit/core/test_policy_agent.py | 20 +++- setup.py | 4 +- 18 files changed, 362 insertions(+), 187 deletions(-) create mode 100644 osm_policy_module/common/message_bus_client.py create mode 100644 osm_policy_module/core/pol.yaml rename osm_policy_module/{core/singleton.py => tests/unit/common/__init__.py} (65%) create mode 100644 osm_policy_module/tests/unit/common/test_message_bus_client.py diff --git a/MANIFEST.in b/MANIFEST.in index 9dbb8cd..a2e97ec 100644 --- a/MANIFEST.in +++ b/MANIFEST.in @@ -23,5 +23,5 @@ include requirements.txt include test-requirements.txt include README.rst -recursive-include osm_policy_module *.py *.xml *.sh +recursive-include osm_policy_module *.py *.xml *.sh *.yaml recursive-include devops-stages * \ No newline at end of file diff --git a/debian/python3-osm-policy-module.postinst b/debian/python3-osm-policy-module.postinst index cd57d53..253eaaa 100644 --- a/debian/python3-osm-policy-module.postinst +++ b/debian/python3-osm-policy-module.postinst @@ -1,9 +1,32 @@ #!/bin/bash +# Copyright 2018 Whitestack, LLC +# ************************************************************* + +# This file is part of OSM Monitoring module +# All Rights Reserved to Whitestack, LLC + +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +# For those usages not covered by the Apache License, Version 2.0 please +# contact: bdiaz@whitestack.com or glavado@whitestack.com +## + echo "Installing python dependencies via pip..." pip3 install aiokafka==0.4.* pip3 install peewee==3.1.* pip3 install jsonschema==2.6.* pip3 install six==1.11.* pip3 install pyyaml==3.* +pip3 install pymysql echo "Installation of python dependencies finished" \ No newline at end of file diff --git a/docker/Dockerfile b/docker/Dockerfile index 91bc5f6..7f57036 100644 --- a/docker/Dockerfile +++ b/docker/Dockerfile @@ -45,7 +45,6 @@ ENV OSMPOL_DATABASE_URI mongodb://mongo:27017 ENV OSMPOL_SQL_DATABASE_URI sqlite:///policy_module.db -ENV OSMPOL_LOG_LEVEL INFO -ENV OSMPOL_KAFKA_LOG_LEVEL WARN +ENV OSMPOL_GLOBAL_LOG_LEVEL INFO CMD osm-policy-agent diff --git a/osm_policy_module/cmd/policy_module_agent.py b/osm_policy_module/cmd/policy_module_agent.py index 49c7d3a..050cf32 100644 --- a/osm_policy_module/cmd/policy_module_agent.py +++ b/osm_policy_module/cmd/policy_module_agent.py @@ -21,6 +21,7 @@ # For those usages not covered by the Apache License, Version 2.0 please # contact: bdiaz@whitestack.com or glavado@whitestack.com ## +import argparse import asyncio import logging import sys @@ -31,27 +32,28 @@ from osm_policy_module.core.database import DatabaseManager def main(): - cfg = Config.instance() - log_formatter_str = '%(asctime)s - %(name)s - %(levelname)s - %(message)s' - logging.basicConfig(stream=sys.stdout, - format=log_formatter_str, - datefmt='%m/%d/%Y %I:%M:%S %p', - level=logging.getLevelName(cfg.OSMPOL_LOG_LEVEL)) - kafka_logger = logging.getLogger('kafka') - kafka_logger.setLevel(logging.getLevelName(cfg.OSMPOL_KAFKA_LOG_LEVEL)) - kafka_formatter = logging.Formatter(log_formatter_str) - kafka_handler = logging.StreamHandler(sys.stdout) - kafka_handler.setFormatter(kafka_formatter) - kafka_logger.addHandler(kafka_handler) + parser = argparse.ArgumentParser(prog='osm-policy-agent') + parser.add_argument('--config-file', nargs='?', help='POL configuration file') + args = parser.parse_args() + cfg = Config(args.config_file) + + root = logging.getLogger() + root.setLevel(logging.getLevelName(cfg.get('global', 'loglevel'))) + ch = logging.StreamHandler(sys.stdout) + ch.setLevel(logging.getLevelName(cfg.get('global', 'loglevel'))) + formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s', '%m/%d/%Y %I:%M:%S %p') + ch.setFormatter(formatter) + root.addHandler(ch) + log = logging.getLogger(__name__) - log.info("Config: %s", vars(cfg)) - log.info("Syncing database...") + log.debug("Config: %s", cfg.conf) + log.info("Initializing database...") db_manager = DatabaseManager() - db_manager.create_tables() - log.info("Database synced correctly.") + db_manager.init_db(cfg) + log.info("Database initialized correctly.") log.info("Starting policy module agent...") loop = asyncio.get_event_loop() - agent = PolicyModuleAgent(loop) + agent = PolicyModuleAgent(cfg, loop) agent.run() diff --git a/osm_policy_module/common/common_db_client.py b/osm_policy_module/common/common_db_client.py index 2be3693..5731155 100644 --- a/osm_policy_module/common/common_db_client.py +++ b/osm_policy_module/common/common_db_client.py @@ -21,18 +21,21 @@ # For those usages not covered by the Apache License, Version 2.0 please # contact: bdiaz@whitestack.com or glavado@whitestack.com ## -from osm_common import dbmongo +from osm_common import dbmongo, dbmemory from osm_policy_module.core.config import Config from osm_policy_module.core.exceptions import VdurNotFound class CommonDbClient: - def __init__(self): - cfg = Config.instance() - self.common_db = dbmongo.DbMongo() - self.common_db.db_connect({'uri': cfg.OSMPOL_DATABASE_URI, - 'name': 'osm'}) + def __init__(self, config: Config): + if config.get('database', 'driver') == "mongo": + self.common_db = dbmongo.DbMongo() + elif config.get('database', 'driver') == "memory": + self.common_db = dbmemory.DbMemory() + else: + raise Exception("Unknown database driver {}".format(config.get('section', 'driver'))) + self.common_db.db_connect(config.get("database")) def get_vnfr(self, nsr_id: str, member_index: int): vnfr = self.common_db.get_one("vnfrs", @@ -65,3 +68,6 @@ class CommonDbClient: return vdur raise VdurNotFound('vdur not found for nsr-id %s, member_index %s and vdur_name %s', nsr_id, member_index, vdur_name) + + def create_nslcmop(self, nslcmop): + self.common_db.create("nslcmops", nslcmop) diff --git a/osm_policy_module/common/lcm_client.py b/osm_policy_module/common/lcm_client.py index 05378d7..2efa241 100644 --- a/osm_policy_module/common/lcm_client.py +++ b/osm_policy_module/common/lcm_client.py @@ -28,22 +28,17 @@ import logging import time import uuid -from aiokafka import AIOKafkaProducer -from osm_common import dbmongo - +from osm_policy_module.common.common_db_client import CommonDbClient +from osm_policy_module.common.message_bus_client import MessageBusClient from osm_policy_module.core.config import Config log = logging.getLogger(__name__) class LcmClient: - def __init__(self, loop=None): - cfg = Config.instance() - self.kafka_server = '{}:{}'.format(cfg.OSMPOL_MESSAGE_HOST, - cfg.OSMPOL_MESSAGE_PORT) - self.common_db = dbmongo.DbMongo() - self.common_db.db_connect({'uri': cfg.OSMPOL_DATABASE_URI, - 'name': 'osm'}) + def __init__(self, config: Config, loop=None): + self.db_client = CommonDbClient(config) + self.msg_bus = MessageBusClient(config) if not loop: loop = asyncio.get_event_loop() self.loop = loop @@ -51,19 +46,9 @@ class LcmClient: async def scale(self, nsr_id: str, scaling_group_name: str, vnf_member_index: int, action: str): log.debug("scale %s %s %s %s", nsr_id, scaling_group_name, vnf_member_index, action) nslcmop = self._generate_nslcmop(nsr_id, scaling_group_name, vnf_member_index, action) - self.common_db.create("nslcmops", nslcmop) + self.db_client.create_nslcmop(nslcmop) log.info("Sending scale action message: %s", json.dumps(nslcmop)) - producer = AIOKafkaProducer(loop=self.loop, - bootstrap_servers=self.kafka_server, - key_serializer=str.encode, - value_serializer=str.encode) - await producer.start() - try: - # Produce message - await producer.send_and_wait("ns", key="scale", value=json.dumps(nslcmop)) - finally: - # Wait for all pending messages to be delivered or expire. - await producer.stop() + await self.msg_bus.aiowrite("ns", "scale", nslcmop) def _generate_nslcmop(self, nsr_id: str, scaling_group_name: str, vnf_member_index: int, action: str): log.debug("_generate_nslcmop %s %s %s %s", nsr_id, scaling_group_name, vnf_member_index, action) diff --git a/osm_policy_module/common/message_bus_client.py b/osm_policy_module/common/message_bus_client.py new file mode 100644 index 0000000..ea5095d --- /dev/null +++ b/osm_policy_module/common/message_bus_client.py @@ -0,0 +1,72 @@ +# -*- coding: utf-8 -*- + +# Copyright 2018 Whitestack, LLC +# ************************************************************* + +# This file is part of OSM Monitoring module +# All Rights Reserved to Whitestack, LLC + +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +# For those usages not covered by the Apache License, Version 2.0 please +# contact: bdiaz@whitestack.com or glavado@whitestack.com +## +import asyncio +from typing import List, Callable + +from osm_common import msgkafka, msglocal + +from osm_policy_module.core.config import Config + + +class MessageBusClient: + def __init__(self, config: Config, loop=None): + if config.get('message', 'driver') == "local": + self.msg_bus = msglocal.MsgLocal() + elif config.get('message', 'driver') == "kafka": + self.msg_bus = msgkafka.MsgKafka() + else: + raise Exception("Unknown message bug driver {}".format(config.get('section', 'driver'))) + self.msg_bus.connect(config.get('message')) + if not loop: + loop = asyncio.get_event_loop() + self.loop = loop + + async def aioread(self, topics: List[str], callback: Callable = None, **kwargs): + """ + Retrieves messages continuously from bus and executes callback for each message consumed. + :param topics: List of message bus topics to consume from. + :param callback: Async callback function to be called for each message received. + :param kwargs: Keyword arguments to be passed to callback function. + :return: None + """ + await self.msg_bus.aioread(topics, self.loop, aiocallback=callback, **kwargs) + + async def aiowrite(self, topic: str, key: str, msg: dict): + """ + Writes message to bus. + :param topic: Topic to write to. + :param key: Key to write to. + :param msg: Dictionary containing message to be written. + :return: None + """ + await self.msg_bus.aiowrite(topic, key, msg, self.loop) + + async def aioread_once(self, topic: str): + """ + Retrieves last message from bus. + :param topic: topic to retrieve message from. + :return: tuple(topic, key, message) + """ + result = await self.msg_bus.aioread(topic, self.loop) + return result diff --git a/osm_policy_module/common/mon_client.py b/osm_policy_module/common/mon_client.py index 5124ed5..76a6f52 100644 --- a/osm_policy_module/common/mon_client.py +++ b/osm_policy_module/common/mon_client.py @@ -36,10 +36,9 @@ log = logging.getLogger(__name__) class MonClient: - def __init__(self, loop=None): - cfg = Config.instance() - self.kafka_server = '{}:{}'.format(cfg.OSMPOL_MESSAGE_HOST, - cfg.OSMPOL_MESSAGE_PORT) + def __init__(self, config: Config, loop=None): + self.kafka_server = '{}:{}'.format(config.get('message', 'host'), + config.get('message', 'port')) if not loop: loop = asyncio.get_event_loop() self.loop = loop diff --git a/osm_policy_module/core/agent.py b/osm_policy_module/core/agent.py index 205b98c..482f911 100644 --- a/osm_policy_module/core/agent.py +++ b/osm_policy_module/core/agent.py @@ -25,14 +25,12 @@ import asyncio import datetime import json import logging -from json import JSONDecodeError import peewee -import yaml -from aiokafka import AIOKafkaConsumer from osm_policy_module.common.common_db_client import CommonDbClient from osm_policy_module.common.lcm_client import LcmClient +from osm_policy_module.common.message_bus_client import MessageBusClient from osm_policy_module.common.mon_client import MonClient from osm_policy_module.core import database from osm_policy_module.core.config import Config @@ -46,57 +44,45 @@ ALLOWED_KAFKA_KEYS = ['instantiated', 'scaled', 'terminated', 'notify_alarm'] class PolicyModuleAgent: - def __init__(self, loop=None): - cfg = Config.instance() + def __init__(self, config: Config, loop=None): + self.conf = config if not loop: loop = asyncio.get_event_loop() self.loop = loop - self.db_client = CommonDbClient() - self.mon_client = MonClient(loop=self.loop) - self.lcm_client = LcmClient(loop=self.loop) - self.kafka_server = '{}:{}'.format(cfg.OSMPOL_MESSAGE_HOST, - cfg.OSMPOL_MESSAGE_PORT) + self.db_client = CommonDbClient(config) + self.mon_client = MonClient(config, loop=self.loop) + self.lcm_client = LcmClient(config, loop=self.loop) self.database_manager = DatabaseManager() + self.msg_bus = MessageBusClient(config) def run(self): self.loop.run_until_complete(self.start()) async def start(self): - consumer = AIOKafkaConsumer( + topics = [ "ns", - "alarm_response", - loop=self.loop, - bootstrap_servers=self.kafka_server, - group_id="pol-consumer", - key_deserializer=bytes.decode, - value_deserializer=bytes.decode, - ) - await consumer.start() - try: - async for msg in consumer: - log.info("Message arrived: %s", msg) - await self._process_msg(msg.topic, msg.key, msg.value) - finally: - await consumer.stop() + "alarm_response" + ] + await self.msg_bus.aioread(topics, self._process_msg) log.critical("Exiting...") async def _process_msg(self, topic, key, msg): log.debug("_process_msg topic=%s key=%s msg=%s", topic, key, msg) + log.info("Message arrived: %s", msg) try: if key in ALLOWED_KAFKA_KEYS: - try: - content = json.loads(msg) - except JSONDecodeError: - content = yaml.safe_load(msg) - if key == 'instantiated' or key == 'scaled': - await self._handle_instantiated_or_scaled(content) + if key == 'instantiated': + await self._handle_instantiated(msg) + + if key == 'scaled': + await self._handle_scaled(msg) if key == 'terminated': - await self._handle_terminated(content) + await self._handle_terminated(msg) if key == 'notify_alarm': - await self._handle_alarm_notification(content) + await self._handle_alarm_notification(msg) else: log.debug("Key %s is not in ALLOWED_KAFKA_KEYS", key) except peewee.PeeweeException: @@ -142,8 +128,22 @@ class PolicyModuleAgent: except ScalingAlarm.DoesNotExist: log.info("There is no action configured for alarm %s.", alarm_uuid) - async def _handle_instantiated_or_scaled(self, content): - log.debug("_handle_instantiated_or_scaled: %s", content) + async def _handle_instantiated(self, content): + log.debug("_handle_instantiated: %s", content) + nslcmop_id = content['nslcmop_id'] + nslcmop = self.db_client.get_nslcmop(nslcmop_id) + if nslcmop['operationState'] == 'COMPLETED' or nslcmop['operationState'] == 'PARTIALLY_COMPLETED': + nsr_id = nslcmop['nsInstanceId'] + log.info("Configuring scaling groups for network service with nsr_id: %s", nsr_id) + await self._configure_scaling_groups(nsr_id) + else: + log.info( + "Network service is not in COMPLETED or PARTIALLY_COMPLETED state. " + "Current state is %s. Skipping...", + nslcmop['operationState']) + + async def _handle_scaled(self, content): + log.debug("_handle_scaled: %s", content) nslcmop_id = content['nslcmop_id'] nslcmop = self.db_client.get_nslcmop(nslcmop_id) if nslcmop['operationState'] == 'COMPLETED' or nslcmop['operationState'] == 'PARTIALLY_COMPLETED': diff --git a/osm_policy_module/core/config.py b/osm_policy_module/core/config.py index 55287cc..e482ec8 100644 --- a/osm_policy_module/core/config.py +++ b/osm_policy_module/core/config.py @@ -26,64 +26,48 @@ import logging import os -from collections import namedtuple - -import six - -from osm_policy_module.core.singleton import Singleton - -log = logging.getLogger(__name__) - - -class BadConfigError(Exception): - """Configuration exception.""" - - pass - - -class CfgParam(namedtuple('CfgParam', ['key', 'default', 'data_type'])): - """Configuration parameter definition.""" - - def value(self, data): - """Convert a string to the parameter type.""" - try: - return self.data_type(data) - except (ValueError, TypeError): - raise BadConfigError( - 'Invalid value "%s" for configuration parameter "%s"' % ( - data, self.key)) - - -@Singleton -class Config(object): - """Configuration object.""" - - _configuration = [ - CfgParam('OSMPOL_MESSAGE_DRIVER', "kafka", six.text_type), - CfgParam('OSMPOL_MESSAGE_HOST', "localhost", six.text_type), - CfgParam('OSMPOL_MESSAGE_PORT', 9092, int), - CfgParam('OSMPOL_DATABASE_DRIVER', "mongo", six.text_type), - CfgParam('OSMPOL_DATABASE_URI', "mongodb://mongo:27017", six.text_type), - CfgParam('OSMPOL_SQL_DATABASE_URI', "sqlite:///policy_module.db", six.text_type), - CfgParam('OSMPOL_LOG_LEVEL', "INFO", six.text_type), - CfgParam('OSMPOL_KAFKA_LOG_LEVEL', "WARN", six.text_type), - ] - - _config_dict = {cfg.key: cfg for cfg in _configuration} - _config_keys = _config_dict.keys() - - def __init__(self): - """Set the default values.""" - for cfg in self._configuration: - setattr(self, cfg.key, cfg.default) - self.read_environ() - - def read_environ(self): - """Check the appropriate environment variables and update defaults.""" - for key in self._config_keys: - try: - val = self._config_dict[key].data_type(os.environ[key]) - setattr(self, key, val) - except KeyError as exc: - log.debug("Environment variable not present: %s", exc) - return +import pkg_resources +import yaml + +logger = logging.getLogger(__name__) + + +class Config: + def __init__(self, config_file: str = ''): + self.conf = {} + self._read_config_file(config_file) + self._read_env() + + def _read_config_file(self, config_file): + if not config_file: + path = 'pol.yaml' + config_file = pkg_resources.resource_filename(__name__, path) + with open(config_file) as f: + self.conf = yaml.load(f) + + def get(self, section, field=None): + if not field: + return self.conf[section] + return self.conf[section][field] + + def set(self, section, field, value): + if section not in self.conf: + self.conf[section] = {} + self.conf[section][field] = value + + def _read_env(self): + for env in os.environ: + if not env.startswith("OSMPOL_"): + continue + elements = env.lower().split("_") + if len(elements) < 3: + logger.warning( + "Environment variable %s=%s does not comply with required format. Section and/or field missing.", + env, os.getenv(env)) + continue + section = elements[1] + field = '_'.join(elements[2:]) + value = os.getenv(env) + if section not in self.conf: + self.conf[section] = {} + self.conf[section][field] = value diff --git a/osm_policy_module/core/database.py b/osm_policy_module/core/database.py index db8cf28..330d8c5 100644 --- a/osm_policy_module/core/database.py +++ b/osm_policy_module/core/database.py @@ -24,15 +24,14 @@ import datetime import logging -from peewee import CharField, IntegerField, ForeignKeyField, Model, TextField, AutoField, DateTimeField +from peewee import CharField, IntegerField, ForeignKeyField, Model, TextField, AutoField, DateTimeField, Proxy from playhouse.db_url import connect from osm_policy_module.core.config import Config log = logging.getLogger(__name__) -cfg = Config.instance() -db = connect(cfg.OSMPOL_SQL_DATABASE_URI) +db = Proxy() class BaseModel(Model): @@ -70,10 +69,14 @@ class ScalingAlarm(BaseModel): class DatabaseManager: + def init_db(self, config: Config): + db.initialize(connect(config.get('sql', 'database_uri'))) + self.create_tables() + def create_tables(self): - db.connect() - db.create_tables([ScalingGroup, ScalingPolicy, ScalingCriteria, ScalingAlarm]) - db.close() + with db.atomic(): + db.create_tables([ScalingGroup, ScalingPolicy, ScalingCriteria, ScalingAlarm]) def get_alarm(self, alarm_uuid: str): - return ScalingAlarm.select().where(ScalingAlarm.alarm_uuid == alarm_uuid).get() + with db.atomic(): + return ScalingAlarm.select().where(ScalingAlarm.alarm_uuid == alarm_uuid).get() diff --git a/osm_policy_module/core/pol.yaml b/osm_policy_module/core/pol.yaml new file mode 100644 index 0000000..4e07cf8 --- /dev/null +++ b/osm_policy_module/core/pol.yaml @@ -0,0 +1,38 @@ +# Copyright 2018 Whitestack, LLC +# ************************************************************* + +# This file is part of OSM Monitoring module +# All Rights Reserved to Whitestack, LLC + +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +# For those usages not covered by the Apache License, Version 2.0 please +# contact: bdiaz@whitestack.com or glavado@whitestack.com +## + +global: + loglevel: INFO + +database: + driver: mongo + uri: mongodb://mongo:27017 + name: osm + +message: + driver: kafka + host: kafka + port: 9092 + group_id: pol-consumer + +sql: + database_uri: sqlite:///policy_module.db \ No newline at end of file diff --git a/osm_policy_module/tests/integration/test_kafka_messages.py b/osm_policy_module/tests/integration/test_kafka_messages.py index 29d88e2..28b2e0f 100644 --- a/osm_policy_module/tests/integration/test_kafka_messages.py +++ b/osm_policy_module/tests/integration/test_kafka_messages.py @@ -42,11 +42,10 @@ log.addHandler(stream_handler) class KafkaMessagesTest(unittest.TestCase): def setUp(self): super() - cfg = Config.instance() - self.kafka_server = '{}:{}'.format(cfg.OSMPOL_MESSAGE_HOST, - cfg.OSMPOL_MESSAGE_PORT) + cfg = Config() + self.kafka_server = '{}:{}'.format(cfg.get('message', 'host'), + cfg.get('message', 'port')) self.loop = asyncio.new_event_loop() - asyncio.set_event_loop(None) def tearDown(self): super() diff --git a/osm_policy_module/tests/integration/test_policy_agent.py b/osm_policy_module/tests/integration/test_policy_agent.py index c175697..68ab1f4 100644 --- a/osm_policy_module/tests/integration/test_policy_agent.py +++ b/osm_policy_module/tests/integration/test_policy_agent.py @@ -36,6 +36,7 @@ from osm_policy_module.common.common_db_client import CommonDbClient from osm_policy_module.common.mon_client import MonClient from osm_policy_module.core import database from osm_policy_module.core.agent import PolicyModuleAgent +from osm_policy_module.core.config import Config from osm_policy_module.core.database import ScalingGroup, ScalingAlarm, ScalingPolicy, ScalingCriteria log = logging.getLogger() @@ -433,7 +434,6 @@ class PolicyModuleAgentTest(unittest.TestCase): test_db.drop_tables(MODELS) test_db.create_tables(MODELS) self.loop = asyncio.new_event_loop() - asyncio.set_event_loop(None) def tearDown(self): super() @@ -459,7 +459,8 @@ class PolicyModuleAgentTest(unittest.TestCase): get_nsr.return_value = nsr_record_mock get_vnfd.return_value = vnfd_record_mock create_alarm.side_effect = _test_configure_scaling_groups_create_alarm - agent = PolicyModuleAgent(self.loop) + config = Config() + agent = PolicyModuleAgent(config, self.loop) self.loop.run_until_complete(agent._configure_scaling_groups("test_nsr_id")) create_alarm.assert_any_call(metric_name='cirros_vnf_memory_util', ns_id='test_nsr_id', diff --git a/osm_policy_module/core/singleton.py b/osm_policy_module/tests/unit/common/__init__.py similarity index 65% rename from osm_policy_module/core/singleton.py rename to osm_policy_module/tests/unit/common/__init__.py index 9b8db5d..d81308a 100644 --- a/osm_policy_module/core/singleton.py +++ b/osm_policy_module/tests/unit/common/__init__.py @@ -21,22 +21,3 @@ # For those usages not covered by the Apache License, Version 2.0 please # contact: bdiaz@whitestack.com or glavado@whitestack.com ## -"""Simple singleton class.""" - -from __future__ import unicode_literals - - -class Singleton(object): - """Simple singleton class.""" - - def __init__(self, decorated): - """Initialize singleton instance.""" - self._decorated = decorated - - def instance(self): - """Return singleton instance.""" - try: - return self._instance - except AttributeError: - self._instance = self._decorated() - return self._instance diff --git a/osm_policy_module/tests/unit/common/test_message_bus_client.py b/osm_policy_module/tests/unit/common/test_message_bus_client.py new file mode 100644 index 0000000..0b97de7 --- /dev/null +++ b/osm_policy_module/tests/unit/common/test_message_bus_client.py @@ -0,0 +1,73 @@ +# -*- coding: utf-8 -*- + +# Copyright 2018 Whitestack, LLC +# ************************************************************* + +# This file is part of OSM Monitoring module +# All Rights Reserved to Whitestack, LLC + +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +# For those usages not covered by the Apache License, Version 2.0 please +# contact: bdiaz@whitestack.com or glavado@whitestack.com +## +import asyncio +from unittest import TestCase, mock + +from osm_common.msgkafka import MsgKafka + +from osm_policy_module.common.message_bus_client import MessageBusClient +from osm_policy_module.core.config import Config + + +class TestMessageBusClient(TestCase): + + def setUp(self): + self.config = Config() + self.config.set('message', 'driver', 'kafka') + self.loop = asyncio.new_event_loop() + + @mock.patch.object(MsgKafka, 'aioread') + def test_aioread(self, aioread): + async def mock_callback(): + pass + + future = asyncio.Future(loop=self.loop) + future.set_result('mock') + aioread.return_value = future + msg_bus = MessageBusClient(self.config, loop=self.loop) + topic = 'test_topic' + self.loop.run_until_complete(msg_bus.aioread([topic], mock_callback)) + aioread.assert_called_with(['test_topic'], self.loop, aiocallback=mock_callback) + + @mock.patch.object(MsgKafka, 'aiowrite') + def test_aiowrite(self, aiowrite): + future = asyncio.Future(loop=self.loop) + future.set_result('mock') + aiowrite.return_value = future + msg_bus = MessageBusClient(self.config, loop=self.loop) + topic = 'test_topic' + key = 'test_key' + msg = {'test': 'test_msg'} + self.loop.run_until_complete(msg_bus.aiowrite(topic, key, msg)) + aiowrite.assert_called_with(topic, key, msg, self.loop) + + @mock.patch.object(MsgKafka, 'aioread') + def test_aioread_once(self, aioread): + future = asyncio.Future(loop=self.loop) + future.set_result('mock') + aioread.return_value = future + msg_bus = MessageBusClient(self.config, loop=self.loop) + topic = 'test_topic' + self.loop.run_until_complete(msg_bus.aioread_once(topic)) + aioread.assert_called_with('test_topic', self.loop) diff --git a/osm_policy_module/tests/unit/core/test_policy_agent.py b/osm_policy_module/tests/unit/core/test_policy_agent.py index 932adf4..7fc2dc9 100644 --- a/osm_policy_module/tests/unit/core/test_policy_agent.py +++ b/osm_policy_module/tests/unit/core/test_policy_agent.py @@ -28,23 +28,29 @@ from unittest import mock from unittest.mock import Mock from osm_policy_module.core.agent import PolicyModuleAgent +from osm_policy_module.core.config import Config from osm_policy_module.core.database import DatabaseManager class PolicyAgentTest(unittest.TestCase): def setUp(self): self.loop = asyncio.new_event_loop() - asyncio.set_event_loop(None) @mock.patch('osm_policy_module.core.agent.CommonDbClient') @mock.patch('osm_policy_module.core.agent.MonClient') @mock.patch('osm_policy_module.core.agent.LcmClient') @mock.patch.object(PolicyModuleAgent, '_configure_scaling_groups') - def test_handle_instantiated_or_scaled(self, configure_scaling_groups, lcm_client, mon_client, db_client): + @mock.patch.object(PolicyModuleAgent, '_delete_orphaned_alarms') + def test_handle_instantiated(self, delete_orphaned_alarms, configure_scaling_groups, lcm_client, + mon_client, db_client): async def mock_configure_scaling_groups(nsr_id): pass - agent = PolicyModuleAgent(self.loop) + async def mock_delete_orphaned_alarms(nsr_id): + pass + + config = Config() + agent = PolicyModuleAgent(config, self.loop) assert lcm_client.called assert mon_client.called assert db_client.called @@ -60,14 +66,15 @@ class PolicyAgentTest(unittest.TestCase): 'nsInstanceId': 'test_nsr_id' } configure_scaling_groups.side_effect = mock_configure_scaling_groups + delete_orphaned_alarms.side_effect = mock_delete_orphaned_alarms db_client.return_value.get_nslcmop.return_value = nslcmop_completed - self.loop.run_until_complete(agent._handle_instantiated_or_scaled(content)) + self.loop.run_until_complete(agent._handle_instantiated(content)) configure_scaling_groups.assert_called_with('test_nsr_id') configure_scaling_groups.reset_mock() db_client.return_value.get_nslcmop.return_value = nslcmop_failed - self.loop.run_until_complete(agent._handle_instantiated_or_scaled(content)) + self.loop.run_until_complete(agent._handle_instantiated(content)) configure_scaling_groups.assert_not_called() @mock.patch('osm_policy_module.core.agent.CommonDbClient') @@ -78,7 +85,8 @@ class PolicyAgentTest(unittest.TestCase): async def mock_scale(nsr_id, scaling_group_name, vnf_member_index, action): pass - agent = PolicyModuleAgent(self.loop) + config = Config() + agent = PolicyModuleAgent(config, self.loop) assert lcm_client.called assert mon_client.called assert db_client.called diff --git a/setup.py b/setup.py index 2a85f83..1d42c16 100644 --- a/setup.py +++ b/setup.py @@ -45,6 +45,7 @@ setup( url=_url, license=_license, packages=[_name], + package_dir={_name: _name}, include_package_data=True, install_requires=[ "aiokafka==0.4.*", @@ -52,7 +53,8 @@ setup( "jsonschema==2.6.*", "six==1.11.*", "pyyaml==3.*", - "osm-common" + "pymysql", + "osm-common", ], entry_points={ "console_scripts": [ -- 2.17.1