include requirements.txt
include test-requirements.txt
include README.rst
-recursive-include osm_policy_module *.py *.xml *.sh
+recursive-include osm_policy_module *.py *.xml *.sh *.yaml
recursive-include devops-stages *
\ No newline at end of file
#!/bin/bash
+# Copyright 2018 Whitestack, LLC
+# *************************************************************
+
+# This file is part of OSM Monitoring module
+# All Rights Reserved to Whitestack, LLC
+
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+
+# http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+# For those usages not covered by the Apache License, Version 2.0 please
+# contact: bdiaz@whitestack.com or glavado@whitestack.com
+##
+
echo "Installing python dependencies via pip..."
pip3 install aiokafka==0.4.*
pip3 install peewee==3.1.*
pip3 install jsonschema==2.6.*
pip3 install six==1.11.*
pip3 install pyyaml==3.*
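+# NOTE: pymysql is presumed to back peewee's MySQL support (via playhouse.db_url);
+# this is an assumption, and the package is deliberately left unpinned here.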
+pip3 install pymysql
echo "Installation of python dependencies finished"
\ No newline at end of file
ENV OSMPOL_SQL_DATABASE_URI sqlite:///policy_module.db
-ENV OSMPOL_LOG_LEVEL INFO
-ENV OSMPOL_KAFKA_LOG_LEVEL WARN
+ENV OSMPOL_GLOBAL_LOG_LEVEL INFO
CMD osm-policy-agent
# For those usages not covered by the Apache License, Version 2.0 please
# contact: bdiaz@whitestack.com or glavado@whitestack.com
##
+import argparse
import asyncio
import logging
import sys
def main():
- cfg = Config.instance()
- log_formatter_str = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
- logging.basicConfig(stream=sys.stdout,
- format=log_formatter_str,
- datefmt='%m/%d/%Y %I:%M:%S %p',
- level=logging.getLevelName(cfg.OSMPOL_LOG_LEVEL))
- kafka_logger = logging.getLogger('kafka')
- kafka_logger.setLevel(logging.getLevelName(cfg.OSMPOL_KAFKA_LOG_LEVEL))
- kafka_formatter = logging.Formatter(log_formatter_str)
- kafka_handler = logging.StreamHandler(sys.stdout)
- kafka_handler.setFormatter(kafka_formatter)
- kafka_logger.addHandler(kafka_handler)
+ parser = argparse.ArgumentParser(prog='osm-policy-agent')
+ parser.add_argument('--config-file', nargs='?', help='POL configuration file')
+ args = parser.parse_args()
+ cfg = Config(args.config_file)
+
+ root = logging.getLogger()
+ root.setLevel(logging.getLevelName(cfg.get('global', 'loglevel')))
+ ch = logging.StreamHandler(sys.stdout)
+ ch.setLevel(logging.getLevelName(cfg.get('global', 'loglevel')))
+ formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s', '%m/%d/%Y %I:%M:%S %p')
+ ch.setFormatter(formatter)
+ root.addHandler(ch)
+
log = logging.getLogger(__name__)
- log.info("Config: %s", vars(cfg))
- log.info("Syncing database...")
+ log.debug("Config: %s", cfg.conf)
+ log.info("Initializing database...")
db_manager = DatabaseManager()
- db_manager.create_tables()
- log.info("Database synced correctly.")
+ db_manager.init_db(cfg)
+ log.info("Database initialized correctly.")
log.info("Starting policy module agent...")
loop = asyncio.get_event_loop()
- agent = PolicyModuleAgent(loop)
+ agent = PolicyModuleAgent(cfg, loop)
agent.run()
# For those usages not covered by the Apache License, Version 2.0 please
# contact: bdiaz@whitestack.com or glavado@whitestack.com
##
-from osm_common import dbmongo
+from osm_common import dbmongo, dbmemory
from osm_policy_module.core.config import Config
from osm_policy_module.core.exceptions import VdurNotFound
class CommonDbClient:
- def __init__(self):
- cfg = Config.instance()
- self.common_db = dbmongo.DbMongo()
- self.common_db.db_connect({'uri': cfg.OSMPOL_DATABASE_URI,
- 'name': 'osm'})
+ def __init__(self, config: Config):
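+        # The backend is selected from the 'database' section of the config
+        # (pol.yaml or OSMPOL_DATABASE_* environment overrides); the whole
+        # section dict is handed to db_connect() below.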
+ if config.get('database', 'driver') == "mongo":
+ self.common_db = dbmongo.DbMongo()
+ elif config.get('database', 'driver') == "memory":
+ self.common_db = dbmemory.DbMemory()
+ else:
+            raise Exception("Unknown database driver {}".format(config.get('database', 'driver')))
+ self.common_db.db_connect(config.get("database"))
def get_vnfr(self, nsr_id: str, member_index: int):
vnfr = self.common_db.get_one("vnfrs",
return vdur
raise VdurNotFound('vdur not found for nsr-id %s, member_index %s and vdur_name %s', nsr_id, member_index,
vdur_name)
+
+ def create_nslcmop(self, nslcmop):
+ self.common_db.create("nslcmops", nslcmop)
import time
import uuid
-from aiokafka import AIOKafkaProducer
-from osm_common import dbmongo
-
+from osm_policy_module.common.common_db_client import CommonDbClient
+from osm_policy_module.common.message_bus_client import MessageBusClient
from osm_policy_module.core.config import Config
log = logging.getLogger(__name__)
class LcmClient:
- def __init__(self, loop=None):
- cfg = Config.instance()
- self.kafka_server = '{}:{}'.format(cfg.OSMPOL_MESSAGE_HOST,
- cfg.OSMPOL_MESSAGE_PORT)
- self.common_db = dbmongo.DbMongo()
- self.common_db.db_connect({'uri': cfg.OSMPOL_DATABASE_URI,
- 'name': 'osm'})
+ def __init__(self, config: Config, loop=None):
+ self.db_client = CommonDbClient(config)
+ self.msg_bus = MessageBusClient(config)
if not loop:
loop = asyncio.get_event_loop()
self.loop = loop
async def scale(self, nsr_id: str, scaling_group_name: str, vnf_member_index: int, action: str):
log.debug("scale %s %s %s %s", nsr_id, scaling_group_name, vnf_member_index, action)
nslcmop = self._generate_nslcmop(nsr_id, scaling_group_name, vnf_member_index, action)
- self.common_db.create("nslcmops", nslcmop)
+ self.db_client.create_nslcmop(nslcmop)
log.info("Sending scale action message: %s", json.dumps(nslcmop))
- producer = AIOKafkaProducer(loop=self.loop,
- bootstrap_servers=self.kafka_server,
- key_serializer=str.encode,
- value_serializer=str.encode)
- await producer.start()
- try:
- # Produce message
- await producer.send_and_wait("ns", key="scale", value=json.dumps(nslcmop))
- finally:
- # Wait for all pending messages to be delivered or expire.
- await producer.stop()
+ await self.msg_bus.aiowrite("ns", "scale", nslcmop)
def _generate_nslcmop(self, nsr_id: str, scaling_group_name: str, vnf_member_index: int, action: str):
log.debug("_generate_nslcmop %s %s %s %s", nsr_id, scaling_group_name, vnf_member_index, action)
--- /dev/null
+# -*- coding: utf-8 -*-
+
+# Copyright 2018 Whitestack, LLC
+# *************************************************************
+
+# This file is part of OSM Monitoring module
+# All Rights Reserved to Whitestack, LLC
+
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+
+# http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+# For those usages not covered by the Apache License, Version 2.0 please
+# contact: bdiaz@whitestack.com or glavado@whitestack.com
+##
+import asyncio
+from typing import List, Callable
+
+from osm_common import msgkafka, msglocal
+
+from osm_policy_module.core.config import Config
+
+
+class MessageBusClient:
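+    """Thin async wrapper over the osm_common message bus drivers (kafka or local).
+
+    Minimal usage sketch (topic, key and payload below are illustrative only):
+
+        bus = MessageBusClient(Config())
+        loop = asyncio.get_event_loop()
+        loop.run_until_complete(bus.aiowrite("ns", "scale", {"nsr_id": "..."}))
+    """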
+ def __init__(self, config: Config, loop=None):
+ if config.get('message', 'driver') == "local":
+ self.msg_bus = msglocal.MsgLocal()
+ elif config.get('message', 'driver') == "kafka":
+ self.msg_bus = msgkafka.MsgKafka()
+ else:
+            raise Exception("Unknown message bus driver {}".format(config.get('message', 'driver')))
+ self.msg_bus.connect(config.get('message'))
+ if not loop:
+ loop = asyncio.get_event_loop()
+ self.loop = loop
+
+ async def aioread(self, topics: List[str], callback: Callable = None, **kwargs):
+ """
+ Retrieves messages continuously from bus and executes callback for each message consumed.
+ :param topics: List of message bus topics to consume from.
+ :param callback: Async callback function to be called for each message received.
+ :param kwargs: Keyword arguments to be passed to callback function.
+ :return: None
+ """
+ await self.msg_bus.aioread(topics, self.loop, aiocallback=callback, **kwargs)
+
+ async def aiowrite(self, topic: str, key: str, msg: dict):
+ """
+ Writes message to bus.
+ :param topic: Topic to write to.
+ :param key: Key to write to.
+ :param msg: Dictionary containing message to be written.
+ :return: None
+ """
+ await self.msg_bus.aiowrite(topic, key, msg, self.loop)
+
+ async def aioread_once(self, topic: str):
+ """
+ Retrieves last message from bus.
+ :param topic: topic to retrieve message from.
+ :return: tuple(topic, key, message)
+ """
+ result = await self.msg_bus.aioread(topic, self.loop)
+ return result
class MonClient:
- def __init__(self, loop=None):
- cfg = Config.instance()
- self.kafka_server = '{}:{}'.format(cfg.OSMPOL_MESSAGE_HOST,
- cfg.OSMPOL_MESSAGE_PORT)
+ def __init__(self, config: Config, loop=None):
+ self.kafka_server = '{}:{}'.format(config.get('message', 'host'),
+ config.get('message', 'port'))
if not loop:
loop = asyncio.get_event_loop()
self.loop = loop
import datetime
import json
import logging
-from json import JSONDecodeError
import peewee
-import yaml
-from aiokafka import AIOKafkaConsumer
from osm_policy_module.common.common_db_client import CommonDbClient
from osm_policy_module.common.lcm_client import LcmClient
+from osm_policy_module.common.message_bus_client import MessageBusClient
from osm_policy_module.common.mon_client import MonClient
from osm_policy_module.core import database
from osm_policy_module.core.config import Config
class PolicyModuleAgent:
- def __init__(self, loop=None):
- cfg = Config.instance()
+ def __init__(self, config: Config, loop=None):
+ self.conf = config
if not loop:
loop = asyncio.get_event_loop()
self.loop = loop
- self.db_client = CommonDbClient()
- self.mon_client = MonClient(loop=self.loop)
- self.lcm_client = LcmClient(loop=self.loop)
- self.kafka_server = '{}:{}'.format(cfg.OSMPOL_MESSAGE_HOST,
- cfg.OSMPOL_MESSAGE_PORT)
+ self.db_client = CommonDbClient(config)
+ self.mon_client = MonClient(config, loop=self.loop)
+ self.lcm_client = LcmClient(config, loop=self.loop)
self.database_manager = DatabaseManager()
+ self.msg_bus = MessageBusClient(config)
def run(self):
self.loop.run_until_complete(self.start())
async def start(self):
- consumer = AIOKafkaConsumer(
+ topics = [
"ns",
- "alarm_response",
- loop=self.loop,
- bootstrap_servers=self.kafka_server,
- group_id="pol-consumer",
- key_deserializer=bytes.decode,
- value_deserializer=bytes.decode,
- )
- await consumer.start()
- try:
- async for msg in consumer:
- log.info("Message arrived: %s", msg)
- await self._process_msg(msg.topic, msg.key, msg.value)
- finally:
- await consumer.stop()
+ "alarm_response"
+ ]
+ await self.msg_bus.aioread(topics, self._process_msg)
log.critical("Exiting...")
async def _process_msg(self, topic, key, msg):
log.debug("_process_msg topic=%s key=%s msg=%s", topic, key, msg)
+ log.info("Message arrived: %s", msg)
try:
if key in ALLOWED_KAFKA_KEYS:
- try:
- content = json.loads(msg)
- except JSONDecodeError:
- content = yaml.safe_load(msg)
- if key == 'instantiated' or key == 'scaled':
- await self._handle_instantiated_or_scaled(content)
+ if key == 'instantiated':
+ await self._handle_instantiated(msg)
+
+ if key == 'scaled':
+ await self._handle_scaled(msg)
if key == 'terminated':
- await self._handle_terminated(content)
+ await self._handle_terminated(msg)
if key == 'notify_alarm':
- await self._handle_alarm_notification(content)
+ await self._handle_alarm_notification(msg)
else:
log.debug("Key %s is not in ALLOWED_KAFKA_KEYS", key)
except peewee.PeeweeException:
except ScalingAlarm.DoesNotExist:
log.info("There is no action configured for alarm %s.", alarm_uuid)
- async def _handle_instantiated_or_scaled(self, content):
- log.debug("_handle_instantiated_or_scaled: %s", content)
+ async def _handle_instantiated(self, content):
+ log.debug("_handle_instantiated: %s", content)
+ nslcmop_id = content['nslcmop_id']
+ nslcmop = self.db_client.get_nslcmop(nslcmop_id)
+ if nslcmop['operationState'] == 'COMPLETED' or nslcmop['operationState'] == 'PARTIALLY_COMPLETED':
+ nsr_id = nslcmop['nsInstanceId']
+ log.info("Configuring scaling groups for network service with nsr_id: %s", nsr_id)
+ await self._configure_scaling_groups(nsr_id)
+ else:
+ log.info(
+ "Network service is not in COMPLETED or PARTIALLY_COMPLETED state. "
+ "Current state is %s. Skipping...",
+ nslcmop['operationState'])
+
+ async def _handle_scaled(self, content):
+ log.debug("_handle_scaled: %s", content)
nslcmop_id = content['nslcmop_id']
nslcmop = self.db_client.get_nslcmop(nslcmop_id)
if nslcmop['operationState'] == 'COMPLETED' or nslcmop['operationState'] == 'PARTIALLY_COMPLETED':
import logging
import os
-from collections import namedtuple
-
-import six
-
-from osm_policy_module.core.singleton import Singleton
-
-log = logging.getLogger(__name__)
-
-
-class BadConfigError(Exception):
- """Configuration exception."""
-
- pass
-
-
-class CfgParam(namedtuple('CfgParam', ['key', 'default', 'data_type'])):
- """Configuration parameter definition."""
-
- def value(self, data):
- """Convert a string to the parameter type."""
- try:
- return self.data_type(data)
- except (ValueError, TypeError):
- raise BadConfigError(
- 'Invalid value "%s" for configuration parameter "%s"' % (
- data, self.key))
-
-
-@Singleton
-class Config(object):
- """Configuration object."""
-
- _configuration = [
- CfgParam('OSMPOL_MESSAGE_DRIVER', "kafka", six.text_type),
- CfgParam('OSMPOL_MESSAGE_HOST', "localhost", six.text_type),
- CfgParam('OSMPOL_MESSAGE_PORT', 9092, int),
- CfgParam('OSMPOL_DATABASE_DRIVER', "mongo", six.text_type),
- CfgParam('OSMPOL_DATABASE_URI', "mongodb://mongo:27017", six.text_type),
- CfgParam('OSMPOL_SQL_DATABASE_URI', "sqlite:///policy_module.db", six.text_type),
- CfgParam('OSMPOL_LOG_LEVEL', "INFO", six.text_type),
- CfgParam('OSMPOL_KAFKA_LOG_LEVEL', "WARN", six.text_type),
- ]
-
- _config_dict = {cfg.key: cfg for cfg in _configuration}
- _config_keys = _config_dict.keys()
-
- def __init__(self):
- """Set the default values."""
- for cfg in self._configuration:
- setattr(self, cfg.key, cfg.default)
- self.read_environ()
-
- def read_environ(self):
- """Check the appropriate environment variables and update defaults."""
- for key in self._config_keys:
- try:
- val = self._config_dict[key].data_type(os.environ[key])
- setattr(self, key, val)
- except KeyError as exc:
- log.debug("Environment variable not present: %s", exc)
- return
+import pkg_resources
+import yaml
+
+logger = logging.getLogger(__name__)
+
+
+class Config:
+ def __init__(self, config_file: str = ''):
+ self.conf = {}
+ self._read_config_file(config_file)
+ self._read_env()
+
+ def _read_config_file(self, config_file):
+ if not config_file:
+ path = 'pol.yaml'
+ config_file = pkg_resources.resource_filename(__name__, path)
+ with open(config_file) as f:
+            self.conf = yaml.safe_load(f)
+
+ def get(self, section, field=None):
+ if not field:
+ return self.conf[section]
+ return self.conf[section][field]
+
+ def set(self, section, field, value):
+ if section not in self.conf:
+ self.conf[section] = {}
+ self.conf[section][field] = value
+
+ def _read_env(self):
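+        # Environment variables of the form OSMPOL_<SECTION>_<FIELD> override the
+        # config file, e.g. (illustrative value) OSMPOL_MESSAGE_HOST=kafka becomes
+        # self.conf['message']['host'] = 'kafka'.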
+ for env in os.environ:
+ if not env.startswith("OSMPOL_"):
+ continue
+ elements = env.lower().split("_")
+ if len(elements) < 3:
+ logger.warning(
+ "Environment variable %s=%s does not comply with required format. Section and/or field missing.",
+ env, os.getenv(env))
+ continue
+ section = elements[1]
+ field = '_'.join(elements[2:])
+ value = os.getenv(env)
+ if section not in self.conf:
+ self.conf[section] = {}
+ self.conf[section][field] = value
import datetime
import logging
-from peewee import CharField, IntegerField, ForeignKeyField, Model, TextField, AutoField, DateTimeField
+from peewee import CharField, IntegerField, ForeignKeyField, Model, TextField, AutoField, DateTimeField, Proxy
from playhouse.db_url import connect
from osm_policy_module.core.config import Config
log = logging.getLogger(__name__)
-cfg = Config.instance()
-db = connect(cfg.OSMPOL_SQL_DATABASE_URI)
+db = Proxy()
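+# 'db' is a peewee Proxy: a placeholder that DatabaseManager.init_db() binds to
+# the real connection built from the 'sql.database_uri' config value.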
class BaseModel(Model):
class DatabaseManager:
+ def init_db(self, config: Config):
+ db.initialize(connect(config.get('sql', 'database_uri')))
+ self.create_tables()
+
def create_tables(self):
- db.connect()
- db.create_tables([ScalingGroup, ScalingPolicy, ScalingCriteria, ScalingAlarm])
- db.close()
+ with db.atomic():
+ db.create_tables([ScalingGroup, ScalingPolicy, ScalingCriteria, ScalingAlarm])
def get_alarm(self, alarm_uuid: str):
- return ScalingAlarm.select().where(ScalingAlarm.alarm_uuid == alarm_uuid).get()
+ with db.atomic():
+ return ScalingAlarm.select().where(ScalingAlarm.alarm_uuid == alarm_uuid).get()
--- /dev/null
+# Copyright 2018 Whitestack, LLC
+# *************************************************************
+
+# This file is part of OSM Monitoring module
+# All Rights Reserved to Whitestack, LLC
+
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+
+# http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+# For those usages not covered by the Apache License, Version 2.0 please
+# contact: bdiaz@whitestack.com or glavado@whitestack.com
+##
+
+global:
+ loglevel: INFO
+
+database:
+ driver: mongo
+ uri: mongodb://mongo:27017
+ name: osm
+
+message:
+ driver: kafka
+ host: kafka
+ port: 9092
+ group_id: pol-consumer
+
+sql:
+ database_uri: sqlite:///policy_module.db
\ No newline at end of file
+++ /dev/null
-# -*- coding: utf-8 -*-
-
-# Copyright 2018 Whitestack, LLC
-# *************************************************************
-
-# This file is part of OSM Monitoring module
-# All Rights Reserved to Whitestack, LLC
-
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-
-# http://www.apache.org/licenses/LICENSE-2.0
-
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-# For those usages not covered by the Apache License, Version 2.0 please
-# contact: bdiaz@whitestack.com or glavado@whitestack.com
-##
-"""Simple singleton class."""
-
-from __future__ import unicode_literals
-
-
-class Singleton(object):
- """Simple singleton class."""
-
- def __init__(self, decorated):
- """Initialize singleton instance."""
- self._decorated = decorated
-
- def instance(self):
- """Return singleton instance."""
- try:
- return self._instance
- except AttributeError:
- self._instance = self._decorated()
- return self._instance
class KafkaMessagesTest(unittest.TestCase):
def setUp(self):
super()
- cfg = Config.instance()
- self.kafka_server = '{}:{}'.format(cfg.OSMPOL_MESSAGE_HOST,
- cfg.OSMPOL_MESSAGE_PORT)
+ cfg = Config()
+ self.kafka_server = '{}:{}'.format(cfg.get('message', 'host'),
+ cfg.get('message', 'port'))
self.loop = asyncio.new_event_loop()
- asyncio.set_event_loop(None)
def tearDown(self):
super()
from osm_policy_module.common.mon_client import MonClient
from osm_policy_module.core import database
from osm_policy_module.core.agent import PolicyModuleAgent
+from osm_policy_module.core.config import Config
from osm_policy_module.core.database import ScalingGroup, ScalingAlarm, ScalingPolicy, ScalingCriteria
log = logging.getLogger()
test_db.drop_tables(MODELS)
test_db.create_tables(MODELS)
self.loop = asyncio.new_event_loop()
- asyncio.set_event_loop(None)
def tearDown(self):
super()
get_nsr.return_value = nsr_record_mock
get_vnfd.return_value = vnfd_record_mock
create_alarm.side_effect = _test_configure_scaling_groups_create_alarm
- agent = PolicyModuleAgent(self.loop)
+ config = Config()
+ agent = PolicyModuleAgent(config, self.loop)
self.loop.run_until_complete(agent._configure_scaling_groups("test_nsr_id"))
create_alarm.assert_any_call(metric_name='cirros_vnf_memory_util',
ns_id='test_nsr_id',
--- /dev/null
+# -*- coding: utf-8 -*-
+
+# Copyright 2018 Whitestack, LLC
+# *************************************************************
+
+# This file is part of OSM Monitoring module
+# All Rights Reserved to Whitestack, LLC
+
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+
+# http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+# For those usages not covered by the Apache License, Version 2.0 please
+# contact: bdiaz@whitestack.com or glavado@whitestack.com
+##
--- /dev/null
+# -*- coding: utf-8 -*-
+
+# Copyright 2018 Whitestack, LLC
+# *************************************************************
+
+# This file is part of OSM Monitoring module
+# All Rights Reserved to Whitestack, LLC
+
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+
+# http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+# For those usages not covered by the Apache License, Version 2.0 please
+# contact: bdiaz@whitestack.com or glavado@whitestack.com
+##
+import asyncio
+from unittest import TestCase, mock
+
+from osm_common.msgkafka import MsgKafka
+
+from osm_policy_module.common.message_bus_client import MessageBusClient
+from osm_policy_module.core.config import Config
+
+
+class TestMessageBusClient(TestCase):
+
+ def setUp(self):
+ self.config = Config()
+ self.config.set('message', 'driver', 'kafka')
+ self.loop = asyncio.new_event_loop()
+
+ @mock.patch.object(MsgKafka, 'aioread')
+ def test_aioread(self, aioread):
+ async def mock_callback():
+ pass
+
+ future = asyncio.Future(loop=self.loop)
+ future.set_result('mock')
+ aioread.return_value = future
+ msg_bus = MessageBusClient(self.config, loop=self.loop)
+ topic = 'test_topic'
+ self.loop.run_until_complete(msg_bus.aioread([topic], mock_callback))
+ aioread.assert_called_with(['test_topic'], self.loop, aiocallback=mock_callback)
+
+ @mock.patch.object(MsgKafka, 'aiowrite')
+ def test_aiowrite(self, aiowrite):
+ future = asyncio.Future(loop=self.loop)
+ future.set_result('mock')
+ aiowrite.return_value = future
+ msg_bus = MessageBusClient(self.config, loop=self.loop)
+ topic = 'test_topic'
+ key = 'test_key'
+ msg = {'test': 'test_msg'}
+ self.loop.run_until_complete(msg_bus.aiowrite(topic, key, msg))
+ aiowrite.assert_called_with(topic, key, msg, self.loop)
+
+ @mock.patch.object(MsgKafka, 'aioread')
+ def test_aioread_once(self, aioread):
+ future = asyncio.Future(loop=self.loop)
+ future.set_result('mock')
+ aioread.return_value = future
+ msg_bus = MessageBusClient(self.config, loop=self.loop)
+ topic = 'test_topic'
+ self.loop.run_until_complete(msg_bus.aioread_once(topic))
+ aioread.assert_called_with('test_topic', self.loop)
from unittest.mock import Mock
from osm_policy_module.core.agent import PolicyModuleAgent
+from osm_policy_module.core.config import Config
from osm_policy_module.core.database import DatabaseManager
class PolicyAgentTest(unittest.TestCase):
def setUp(self):
self.loop = asyncio.new_event_loop()
- asyncio.set_event_loop(None)
@mock.patch('osm_policy_module.core.agent.CommonDbClient')
@mock.patch('osm_policy_module.core.agent.MonClient')
@mock.patch('osm_policy_module.core.agent.LcmClient')
@mock.patch.object(PolicyModuleAgent, '_configure_scaling_groups')
- def test_handle_instantiated_or_scaled(self, configure_scaling_groups, lcm_client, mon_client, db_client):
+ @mock.patch.object(PolicyModuleAgent, '_delete_orphaned_alarms')
+ def test_handle_instantiated(self, delete_orphaned_alarms, configure_scaling_groups, lcm_client,
+ mon_client, db_client):
async def mock_configure_scaling_groups(nsr_id):
pass
- agent = PolicyModuleAgent(self.loop)
+ async def mock_delete_orphaned_alarms(nsr_id):
+ pass
+
+ config = Config()
+ agent = PolicyModuleAgent(config, self.loop)
assert lcm_client.called
assert mon_client.called
assert db_client.called
'nsInstanceId': 'test_nsr_id'
}
configure_scaling_groups.side_effect = mock_configure_scaling_groups
+ delete_orphaned_alarms.side_effect = mock_delete_orphaned_alarms
db_client.return_value.get_nslcmop.return_value = nslcmop_completed
- self.loop.run_until_complete(agent._handle_instantiated_or_scaled(content))
+ self.loop.run_until_complete(agent._handle_instantiated(content))
configure_scaling_groups.assert_called_with('test_nsr_id')
configure_scaling_groups.reset_mock()
db_client.return_value.get_nslcmop.return_value = nslcmop_failed
- self.loop.run_until_complete(agent._handle_instantiated_or_scaled(content))
+ self.loop.run_until_complete(agent._handle_instantiated(content))
configure_scaling_groups.assert_not_called()
@mock.patch('osm_policy_module.core.agent.CommonDbClient')
async def mock_scale(nsr_id, scaling_group_name, vnf_member_index, action):
pass
- agent = PolicyModuleAgent(self.loop)
+ config = Config()
+ agent = PolicyModuleAgent(config, self.loop)
assert lcm_client.called
assert mon_client.called
assert db_client.called
url=_url,
license=_license,
packages=[_name],
+ package_dir={_name: _name},
include_package_data=True,
install_requires=[
"aiokafka==0.4.*",
"jsonschema==2.6.*",
"six==1.11.*",
"pyyaml==3.*",
- "osm-common"
+ "pymysql",
+ "osm-common",
],
entry_points={
"console_scripts": [