Replaces direct use of aiokafka with osm_common message bus in agent and 76/7176/4
authorBenjamin Diaz <bdiaz@whitestack.com>
Fri, 1 Feb 2019 16:31:47 +0000 (13:31 -0300)
committerBenjamin Diaz <bdiaz@whitestack.com>
Mon, 4 Feb 2019 19:45:11 +0000 (16:45 -0300)
lcmclient

Changes config handling to comply with the way it is handled in other modules,
by using a config file and overriding it with env vars.

Adds unit tests for message_bus_client.

Mon client remains using aiokafka directly, as there is no support yet for
auto_offset_reset configuration in osm_common.

Change-Id: I99615287cc934ce310105e86544a6bfe26bc0673
Signed-off-by: Benjamin Diaz <bdiaz@whitestack.com>
19 files changed:
MANIFEST.in
debian/python3-osm-policy-module.postinst
docker/Dockerfile
osm_policy_module/cmd/policy_module_agent.py
osm_policy_module/common/common_db_client.py
osm_policy_module/common/lcm_client.py
osm_policy_module/common/message_bus_client.py [new file with mode: 0644]
osm_policy_module/common/mon_client.py
osm_policy_module/core/agent.py
osm_policy_module/core/config.py
osm_policy_module/core/database.py
osm_policy_module/core/pol.yaml [new file with mode: 0644]
osm_policy_module/core/singleton.py [deleted file]
osm_policy_module/tests/integration/test_kafka_messages.py
osm_policy_module/tests/integration/test_policy_agent.py
osm_policy_module/tests/unit/common/__init__.py [new file with mode: 0644]
osm_policy_module/tests/unit/common/test_message_bus_client.py [new file with mode: 0644]
osm_policy_module/tests/unit/core/test_policy_agent.py
setup.py

index 9dbb8cd..a2e97ec 100644 (file)
@@ -23,5 +23,5 @@
 include requirements.txt
 include test-requirements.txt
 include README.rst
-recursive-include osm_policy_module *.py *.xml *.sh
+recursive-include osm_policy_module *.py *.xml *.sh *.yaml
 recursive-include devops-stages *
\ No newline at end of file
index cd57d53..253eaaa 100644 (file)
@@ -1,9 +1,32 @@
 #!/bin/bash
 
+# Copyright 2018 Whitestack, LLC
+# *************************************************************
+
+# This file is part of OSM Monitoring module
+# All Rights Reserved to Whitestack, LLC
+
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+
+#         http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+# For those usages not covered by the Apache License, Version 2.0 please
+# contact: bdiaz@whitestack.com or glavado@whitestack.com
+##
+
 echo "Installing python dependencies via pip..."
 pip3 install aiokafka==0.4.*
 pip3 install peewee==3.1.*
 pip3 install jsonschema==2.6.*
 pip3 install six==1.11.*
 pip3 install pyyaml==3.*
+pip3 install pymysql
 echo "Installation of python dependencies finished"
\ No newline at end of file
index 91bc5f6..7f57036 100644 (file)
@@ -45,7 +45,6 @@ ENV OSMPOL_DATABASE_URI mongodb://mongo:27017
 
 ENV OSMPOL_SQL_DATABASE_URI sqlite:///policy_module.db
 
-ENV OSMPOL_LOG_LEVEL INFO
-ENV OSMPOL_KAFKA_LOG_LEVEL WARN
+ENV OSMPOL_GLOBAL_LOG_LEVEL INFO
 
 CMD osm-policy-agent
index 49c7d3a..050cf32 100644 (file)
@@ -21,6 +21,7 @@
 # For those usages not covered by the Apache License, Version 2.0 please
 # contact: bdiaz@whitestack.com or glavado@whitestack.com
 ##
+import argparse
 import asyncio
 import logging
 import sys
@@ -31,27 +32,28 @@ from osm_policy_module.core.database import DatabaseManager
 
 
 def main():
-    cfg = Config.instance()
-    log_formatter_str = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
-    logging.basicConfig(stream=sys.stdout,
-                        format=log_formatter_str,
-                        datefmt='%m/%d/%Y %I:%M:%S %p',
-                        level=logging.getLevelName(cfg.OSMPOL_LOG_LEVEL))
-    kafka_logger = logging.getLogger('kafka')
-    kafka_logger.setLevel(logging.getLevelName(cfg.OSMPOL_KAFKA_LOG_LEVEL))
-    kafka_formatter = logging.Formatter(log_formatter_str)
-    kafka_handler = logging.StreamHandler(sys.stdout)
-    kafka_handler.setFormatter(kafka_formatter)
-    kafka_logger.addHandler(kafka_handler)
+    parser = argparse.ArgumentParser(prog='osm-policy-agent')
+    parser.add_argument('--config-file', nargs='?', help='POL configuration file')
+    args = parser.parse_args()
+    cfg = Config(args.config_file)
+
+    root = logging.getLogger()
+    root.setLevel(logging.getLevelName(cfg.get('global', 'loglevel')))
+    ch = logging.StreamHandler(sys.stdout)
+    ch.setLevel(logging.getLevelName(cfg.get('global', 'loglevel')))
+    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s', '%m/%d/%Y %I:%M:%S %p')
+    ch.setFormatter(formatter)
+    root.addHandler(ch)
+
     log = logging.getLogger(__name__)
-    log.info("Config: %s", vars(cfg))
-    log.info("Syncing database...")
+    log.debug("Config: %s", cfg.conf)
+    log.info("Initializing database...")
     db_manager = DatabaseManager()
-    db_manager.create_tables()
-    log.info("Database synced correctly.")
+    db_manager.init_db(cfg)
+    log.info("Database initialized correctly.")
     log.info("Starting policy module agent...")
     loop = asyncio.get_event_loop()
-    agent = PolicyModuleAgent(loop)
+    agent = PolicyModuleAgent(cfg, loop)
     agent.run()
 
 
index 2be3693..5731155 100644 (file)
 # For those usages not covered by the Apache License, Version 2.0 please
 # contact: bdiaz@whitestack.com or glavado@whitestack.com
 ##
-from osm_common import dbmongo
+from osm_common import dbmongo, dbmemory
 
 from osm_policy_module.core.config import Config
 from osm_policy_module.core.exceptions import VdurNotFound
 
 
 class CommonDbClient:
-    def __init__(self):
-        cfg = Config.instance()
-        self.common_db = dbmongo.DbMongo()
-        self.common_db.db_connect({'uri': cfg.OSMPOL_DATABASE_URI,
-                                   'name': 'osm'})
+    def __init__(self, config: Config):
+        if config.get('database', 'driver') == "mongo":
+            self.common_db = dbmongo.DbMongo()
+        elif config.get('database', 'driver') == "memory":
+            self.common_db = dbmemory.DbMemory()
+        else:
+            raise Exception("Unknown database driver {}".format(config.get('section', 'driver')))
+        self.common_db.db_connect(config.get("database"))
 
     def get_vnfr(self, nsr_id: str, member_index: int):
         vnfr = self.common_db.get_one("vnfrs",
@@ -65,3 +68,6 @@ class CommonDbClient:
                 return vdur
         raise VdurNotFound('vdur not found for nsr-id %s, member_index %s and vdur_name %s', nsr_id, member_index,
                            vdur_name)
+
+    def create_nslcmop(self, nslcmop):
+        self.common_db.create("nslcmops", nslcmop)
index 05378d7..2efa241 100644 (file)
@@ -28,22 +28,17 @@ import logging
 import time
 import uuid
 
-from aiokafka import AIOKafkaProducer
-from osm_common import dbmongo
-
+from osm_policy_module.common.common_db_client import CommonDbClient
+from osm_policy_module.common.message_bus_client import MessageBusClient
 from osm_policy_module.core.config import Config
 
 log = logging.getLogger(__name__)
 
 
 class LcmClient:
-    def __init__(self, loop=None):
-        cfg = Config.instance()
-        self.kafka_server = '{}:{}'.format(cfg.OSMPOL_MESSAGE_HOST,
-                                           cfg.OSMPOL_MESSAGE_PORT)
-        self.common_db = dbmongo.DbMongo()
-        self.common_db.db_connect({'uri': cfg.OSMPOL_DATABASE_URI,
-                                   'name': 'osm'})
+    def __init__(self, config: Config, loop=None):
+        self.db_client = CommonDbClient(config)
+        self.msg_bus = MessageBusClient(config)
         if not loop:
             loop = asyncio.get_event_loop()
         self.loop = loop
@@ -51,19 +46,9 @@ class LcmClient:
     async def scale(self, nsr_id: str, scaling_group_name: str, vnf_member_index: int, action: str):
         log.debug("scale %s %s %s %s", nsr_id, scaling_group_name, vnf_member_index, action)
         nslcmop = self._generate_nslcmop(nsr_id, scaling_group_name, vnf_member_index, action)
-        self.common_db.create("nslcmops", nslcmop)
+        self.db_client.create_nslcmop(nslcmop)
         log.info("Sending scale action message: %s", json.dumps(nslcmop))
-        producer = AIOKafkaProducer(loop=self.loop,
-                                    bootstrap_servers=self.kafka_server,
-                                    key_serializer=str.encode,
-                                    value_serializer=str.encode)
-        await producer.start()
-        try:
-            # Produce message
-            await producer.send_and_wait("ns", key="scale", value=json.dumps(nslcmop))
-        finally:
-            # Wait for all pending messages to be delivered or expire.
-            await producer.stop()
+        await self.msg_bus.aiowrite("ns", "scale", nslcmop)
 
     def _generate_nslcmop(self, nsr_id: str, scaling_group_name: str, vnf_member_index: int, action: str):
         log.debug("_generate_nslcmop %s %s %s %s", nsr_id, scaling_group_name, vnf_member_index, action)
diff --git a/osm_policy_module/common/message_bus_client.py b/osm_policy_module/common/message_bus_client.py
new file mode 100644 (file)
index 0000000..ea5095d
--- /dev/null
@@ -0,0 +1,72 @@
+# -*- coding: utf-8 -*-
+
+# Copyright 2018 Whitestack, LLC
+# *************************************************************
+
+# This file is part of OSM Monitoring module
+# All Rights Reserved to Whitestack, LLC
+
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+
+#         http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+# For those usages not covered by the Apache License, Version 2.0 please
+# contact: bdiaz@whitestack.com or glavado@whitestack.com
+##
+import asyncio
+from typing import List, Callable
+
+from osm_common import msgkafka, msglocal
+
+from osm_policy_module.core.config import Config
+
+
+class MessageBusClient:
+    def __init__(self, config: Config, loop=None):
+        if config.get('message', 'driver') == "local":
+            self.msg_bus = msglocal.MsgLocal()
+        elif config.get('message', 'driver') == "kafka":
+            self.msg_bus = msgkafka.MsgKafka()
+        else:
+            raise Exception("Unknown message bug driver {}".format(config.get('section', 'driver')))
+        self.msg_bus.connect(config.get('message'))
+        if not loop:
+            loop = asyncio.get_event_loop()
+        self.loop = loop
+
+    async def aioread(self, topics: List[str], callback: Callable = None, **kwargs):
+        """
+        Retrieves messages continuously from bus and executes callback for each message consumed.
+        :param topics: List of message bus topics to consume from.
+        :param callback: Async callback function to be called for each message received.
+        :param kwargs: Keyword arguments to be passed to callback function.
+        :return: None
+        """
+        await self.msg_bus.aioread(topics, self.loop, aiocallback=callback, **kwargs)
+
+    async def aiowrite(self, topic: str, key: str, msg: dict):
+        """
+        Writes message to bus.
+        :param topic: Topic to write to.
+        :param key: Key to write to.
+        :param msg: Dictionary containing message to be written.
+        :return: None
+        """
+        await self.msg_bus.aiowrite(topic, key, msg, self.loop)
+
+    async def aioread_once(self, topic: str):
+        """
+        Retrieves last message from bus.
+        :param topic: topic to retrieve message from.
+        :return: tuple(topic, key, message)
+        """
+        result = await self.msg_bus.aioread(topic, self.loop)
+        return result
index 5124ed5..76a6f52 100644 (file)
@@ -36,10 +36,9 @@ log = logging.getLogger(__name__)
 
 
 class MonClient:
-    def __init__(self, loop=None):
-        cfg = Config.instance()
-        self.kafka_server = '{}:{}'.format(cfg.OSMPOL_MESSAGE_HOST,
-                                           cfg.OSMPOL_MESSAGE_PORT)
+    def __init__(self, config: Config, loop=None):
+        self.kafka_server = '{}:{}'.format(config.get('message', 'host'),
+                                           config.get('message', 'port'))
         if not loop:
             loop = asyncio.get_event_loop()
         self.loop = loop
index 205b98c..482f911 100644 (file)
@@ -25,14 +25,12 @@ import asyncio
 import datetime
 import json
 import logging
-from json import JSONDecodeError
 
 import peewee
-import yaml
-from aiokafka import AIOKafkaConsumer
 
 from osm_policy_module.common.common_db_client import CommonDbClient
 from osm_policy_module.common.lcm_client import LcmClient
+from osm_policy_module.common.message_bus_client import MessageBusClient
 from osm_policy_module.common.mon_client import MonClient
 from osm_policy_module.core import database
 from osm_policy_module.core.config import Config
@@ -46,57 +44,45 @@ ALLOWED_KAFKA_KEYS = ['instantiated', 'scaled', 'terminated', 'notify_alarm']
 
 
 class PolicyModuleAgent:
-    def __init__(self, loop=None):
-        cfg = Config.instance()
+    def __init__(self, config: Config, loop=None):
+        self.conf = config
         if not loop:
             loop = asyncio.get_event_loop()
         self.loop = loop
-        self.db_client = CommonDbClient()
-        self.mon_client = MonClient(loop=self.loop)
-        self.lcm_client = LcmClient(loop=self.loop)
-        self.kafka_server = '{}:{}'.format(cfg.OSMPOL_MESSAGE_HOST,
-                                           cfg.OSMPOL_MESSAGE_PORT)
+        self.db_client = CommonDbClient(config)
+        self.mon_client = MonClient(config, loop=self.loop)
+        self.lcm_client = LcmClient(config, loop=self.loop)
         self.database_manager = DatabaseManager()
+        self.msg_bus = MessageBusClient(config)
 
     def run(self):
         self.loop.run_until_complete(self.start())
 
     async def start(self):
-        consumer = AIOKafkaConsumer(
+        topics = [
             "ns",
-            "alarm_response",
-            loop=self.loop,
-            bootstrap_servers=self.kafka_server,
-            group_id="pol-consumer",
-            key_deserializer=bytes.decode,
-            value_deserializer=bytes.decode,
-        )
-        await consumer.start()
-        try:
-            async for msg in consumer:
-                log.info("Message arrived: %s", msg)
-                await self._process_msg(msg.topic, msg.key, msg.value)
-        finally:
-            await consumer.stop()
+            "alarm_response"
+        ]
+        await self.msg_bus.aioread(topics, self._process_msg)
         log.critical("Exiting...")
 
     async def _process_msg(self, topic, key, msg):
         log.debug("_process_msg topic=%s key=%s msg=%s", topic, key, msg)
+        log.info("Message arrived: %s", msg)
         try:
             if key in ALLOWED_KAFKA_KEYS:
-                try:
-                    content = json.loads(msg)
-                except JSONDecodeError:
-                    content = yaml.safe_load(msg)
 
-                if key == 'instantiated' or key == 'scaled':
-                    await self._handle_instantiated_or_scaled(content)
+                if key == 'instantiated':
+                    await self._handle_instantiated(msg)
+
+                if key == 'scaled':
+                    await self._handle_scaled(msg)
 
                 if key == 'terminated':
-                    await self._handle_terminated(content)
+                    await self._handle_terminated(msg)
 
                 if key == 'notify_alarm':
-                    await self._handle_alarm_notification(content)
+                    await self._handle_alarm_notification(msg)
             else:
                 log.debug("Key %s is not in ALLOWED_KAFKA_KEYS", key)
         except peewee.PeeweeException:
@@ -142,8 +128,22 @@ class PolicyModuleAgent:
         except ScalingAlarm.DoesNotExist:
             log.info("There is no action configured for alarm %s.", alarm_uuid)
 
-    async def _handle_instantiated_or_scaled(self, content):
-        log.debug("_handle_instantiated_or_scaled: %s", content)
+    async def _handle_instantiated(self, content):
+        log.debug("_handle_instantiated: %s", content)
+        nslcmop_id = content['nslcmop_id']
+        nslcmop = self.db_client.get_nslcmop(nslcmop_id)
+        if nslcmop['operationState'] == 'COMPLETED' or nslcmop['operationState'] == 'PARTIALLY_COMPLETED':
+            nsr_id = nslcmop['nsInstanceId']
+            log.info("Configuring scaling groups for network service with nsr_id: %s", nsr_id)
+            await self._configure_scaling_groups(nsr_id)
+        else:
+            log.info(
+                "Network service is not in COMPLETED or PARTIALLY_COMPLETED state. "
+                "Current state is %s. Skipping...",
+                nslcmop['operationState'])
+
+    async def _handle_scaled(self, content):
+        log.debug("_handle_scaled: %s", content)
         nslcmop_id = content['nslcmop_id']
         nslcmop = self.db_client.get_nslcmop(nslcmop_id)
         if nslcmop['operationState'] == 'COMPLETED' or nslcmop['operationState'] == 'PARTIALLY_COMPLETED':
index 55287cc..e482ec8 100644 (file)
 import logging
 import os
 
-from collections import namedtuple
-
-import six
-
-from osm_policy_module.core.singleton import Singleton
-
-log = logging.getLogger(__name__)
-
-
-class BadConfigError(Exception):
-    """Configuration exception."""
-
-    pass
-
-
-class CfgParam(namedtuple('CfgParam', ['key', 'default', 'data_type'])):
-    """Configuration parameter definition."""
-
-    def value(self, data):
-        """Convert a string to the parameter type."""
-        try:
-            return self.data_type(data)
-        except (ValueError, TypeError):
-            raise BadConfigError(
-                'Invalid value "%s" for configuration parameter "%s"' % (
-                    data, self.key))
-
-
-@Singleton
-class Config(object):
-    """Configuration object."""
-
-    _configuration = [
-        CfgParam('OSMPOL_MESSAGE_DRIVER', "kafka", six.text_type),
-        CfgParam('OSMPOL_MESSAGE_HOST', "localhost", six.text_type),
-        CfgParam('OSMPOL_MESSAGE_PORT', 9092, int),
-        CfgParam('OSMPOL_DATABASE_DRIVER', "mongo", six.text_type),
-        CfgParam('OSMPOL_DATABASE_URI', "mongodb://mongo:27017", six.text_type),
-        CfgParam('OSMPOL_SQL_DATABASE_URI', "sqlite:///policy_module.db", six.text_type),
-        CfgParam('OSMPOL_LOG_LEVEL', "INFO", six.text_type),
-        CfgParam('OSMPOL_KAFKA_LOG_LEVEL', "WARN", six.text_type),
-    ]
-
-    _config_dict = {cfg.key: cfg for cfg in _configuration}
-    _config_keys = _config_dict.keys()
-
-    def __init__(self):
-        """Set the default values."""
-        for cfg in self._configuration:
-            setattr(self, cfg.key, cfg.default)
-        self.read_environ()
-
-    def read_environ(self):
-        """Check the appropriate environment variables and update defaults."""
-        for key in self._config_keys:
-            try:
-                val = self._config_dict[key].data_type(os.environ[key])
-                setattr(self, key, val)
-            except KeyError as exc:
-                log.debug("Environment variable not present: %s", exc)
-        return
+import pkg_resources
+import yaml
+
+logger = logging.getLogger(__name__)
+
+
+class Config:
+    def __init__(self, config_file: str = ''):
+        self.conf = {}
+        self._read_config_file(config_file)
+        self._read_env()
+
+    def _read_config_file(self, config_file):
+        if not config_file:
+            path = 'pol.yaml'
+            config_file = pkg_resources.resource_filename(__name__, path)
+        with open(config_file) as f:
+            self.conf = yaml.load(f)
+
+    def get(self, section, field=None):
+        if not field:
+            return self.conf[section]
+        return self.conf[section][field]
+
+    def set(self, section, field, value):
+        if section not in self.conf:
+            self.conf[section] = {}
+        self.conf[section][field] = value
+
+    def _read_env(self):
+        for env in os.environ:
+            if not env.startswith("OSMPOL_"):
+                continue
+            elements = env.lower().split("_")
+            if len(elements) < 3:
+                logger.warning(
+                    "Environment variable %s=%s does not comply with required format. Section and/or field missing.",
+                    env, os.getenv(env))
+                continue
+            section = elements[1]
+            field = '_'.join(elements[2:])
+            value = os.getenv(env)
+            if section not in self.conf:
+                self.conf[section] = {}
+            self.conf[section][field] = value
index db8cf28..330d8c5 100644 (file)
 import datetime
 import logging
 
-from peewee import CharField, IntegerField, ForeignKeyField, Model, TextField, AutoField, DateTimeField
+from peewee import CharField, IntegerField, ForeignKeyField, Model, TextField, AutoField, DateTimeField, Proxy
 from playhouse.db_url import connect
 
 from osm_policy_module.core.config import Config
 
 log = logging.getLogger(__name__)
-cfg = Config.instance()
 
-db = connect(cfg.OSMPOL_SQL_DATABASE_URI)
+db = Proxy()
 
 
 class BaseModel(Model):
@@ -70,10 +69,14 @@ class ScalingAlarm(BaseModel):
 
 
 class DatabaseManager:
+    def init_db(self, config: Config):
+        db.initialize(connect(config.get('sql', 'database_uri')))
+        self.create_tables()
+
     def create_tables(self):
-        db.connect()
-        db.create_tables([ScalingGroup, ScalingPolicy, ScalingCriteria, ScalingAlarm])
-        db.close()
+        with db.atomic():
+            db.create_tables([ScalingGroup, ScalingPolicy, ScalingCriteria, ScalingAlarm])
 
     def get_alarm(self, alarm_uuid: str):
-        return ScalingAlarm.select().where(ScalingAlarm.alarm_uuid == alarm_uuid).get()
+        with db.atomic():
+            return ScalingAlarm.select().where(ScalingAlarm.alarm_uuid == alarm_uuid).get()
diff --git a/osm_policy_module/core/pol.yaml b/osm_policy_module/core/pol.yaml
new file mode 100644 (file)
index 0000000..4e07cf8
--- /dev/null
@@ -0,0 +1,38 @@
+# Copyright 2018 Whitestack, LLC
+# *************************************************************
+
+# This file is part of OSM Monitoring module
+# All Rights Reserved to Whitestack, LLC
+
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+
+#         http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+# For those usages not covered by the Apache License, Version 2.0 please
+# contact: bdiaz@whitestack.com or glavado@whitestack.com
+##
+
+global:
+  loglevel: INFO
+
+database:
+  driver: mongo
+  uri: mongodb://mongo:27017
+  name: osm
+
+message:
+  driver: kafka
+  host: kafka
+  port: 9092
+  group_id: pol-consumer
+
+sql:
+  database_uri: sqlite:///policy_module.db
\ No newline at end of file
diff --git a/osm_policy_module/core/singleton.py b/osm_policy_module/core/singleton.py
deleted file mode 100644 (file)
index 9b8db5d..0000000
+++ /dev/null
@@ -1,42 +0,0 @@
-# -*- coding: utf-8 -*-
-
-# Copyright 2018 Whitestack, LLC
-# *************************************************************
-
-# This file is part of OSM Monitoring module
-# All Rights Reserved to Whitestack, LLC
-
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-
-#         http://www.apache.org/licenses/LICENSE-2.0
-
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-# For those usages not covered by the Apache License, Version 2.0 please
-# contact: bdiaz@whitestack.com or glavado@whitestack.com
-##
-"""Simple singleton class."""
-
-from __future__ import unicode_literals
-
-
-class Singleton(object):
-    """Simple singleton class."""
-
-    def __init__(self, decorated):
-        """Initialize singleton instance."""
-        self._decorated = decorated
-
-    def instance(self):
-        """Return singleton instance."""
-        try:
-            return self._instance
-        except AttributeError:
-            self._instance = self._decorated()
-            return self._instance
index 29d88e2..28b2e0f 100644 (file)
@@ -42,11 +42,10 @@ log.addHandler(stream_handler)
 class KafkaMessagesTest(unittest.TestCase):
     def setUp(self):
         super()
-        cfg = Config.instance()
-        self.kafka_server = '{}:{}'.format(cfg.OSMPOL_MESSAGE_HOST,
-                                           cfg.OSMPOL_MESSAGE_PORT)
+        cfg = Config()
+        self.kafka_server = '{}:{}'.format(cfg.get('message', 'host'),
+                                           cfg.get('message', 'port'))
         self.loop = asyncio.new_event_loop()
-        asyncio.set_event_loop(None)
 
     def tearDown(self):
         super()
index c175697..68ab1f4 100644 (file)
@@ -36,6 +36,7 @@ from osm_policy_module.common.common_db_client import CommonDbClient
 from osm_policy_module.common.mon_client import MonClient
 from osm_policy_module.core import database
 from osm_policy_module.core.agent import PolicyModuleAgent
+from osm_policy_module.core.config import Config
 from osm_policy_module.core.database import ScalingGroup, ScalingAlarm, ScalingPolicy, ScalingCriteria
 
 log = logging.getLogger()
@@ -433,7 +434,6 @@ class PolicyModuleAgentTest(unittest.TestCase):
         test_db.drop_tables(MODELS)
         test_db.create_tables(MODELS)
         self.loop = asyncio.new_event_loop()
-        asyncio.set_event_loop(None)
 
     def tearDown(self):
         super()
@@ -459,7 +459,8 @@ class PolicyModuleAgentTest(unittest.TestCase):
         get_nsr.return_value = nsr_record_mock
         get_vnfd.return_value = vnfd_record_mock
         create_alarm.side_effect = _test_configure_scaling_groups_create_alarm
-        agent = PolicyModuleAgent(self.loop)
+        config = Config()
+        agent = PolicyModuleAgent(config, self.loop)
         self.loop.run_until_complete(agent._configure_scaling_groups("test_nsr_id"))
         create_alarm.assert_any_call(metric_name='cirros_vnf_memory_util',
                                      ns_id='test_nsr_id',
diff --git a/osm_policy_module/tests/unit/common/__init__.py b/osm_policy_module/tests/unit/common/__init__.py
new file mode 100644 (file)
index 0000000..d81308a
--- /dev/null
@@ -0,0 +1,23 @@
+# -*- coding: utf-8 -*-
+
+# Copyright 2018 Whitestack, LLC
+# *************************************************************
+
+# This file is part of OSM Monitoring module
+# All Rights Reserved to Whitestack, LLC
+
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+
+#         http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+# For those usages not covered by the Apache License, Version 2.0 please
+# contact: bdiaz@whitestack.com or glavado@whitestack.com
+##
diff --git a/osm_policy_module/tests/unit/common/test_message_bus_client.py b/osm_policy_module/tests/unit/common/test_message_bus_client.py
new file mode 100644 (file)
index 0000000..0b97de7
--- /dev/null
@@ -0,0 +1,73 @@
+# -*- coding: utf-8 -*-
+
+# Copyright 2018 Whitestack, LLC
+# *************************************************************
+
+# This file is part of OSM Monitoring module
+# All Rights Reserved to Whitestack, LLC
+
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+
+#         http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+# For those usages not covered by the Apache License, Version 2.0 please
+# contact: bdiaz@whitestack.com or glavado@whitestack.com
+##
+import asyncio
+from unittest import TestCase, mock
+
+from osm_common.msgkafka import MsgKafka
+
+from osm_policy_module.common.message_bus_client import MessageBusClient
+from osm_policy_module.core.config import Config
+
+
+class TestMessageBusClient(TestCase):
+
+    def setUp(self):
+        self.config = Config()
+        self.config.set('message', 'driver', 'kafka')
+        self.loop = asyncio.new_event_loop()
+
+    @mock.patch.object(MsgKafka, 'aioread')
+    def test_aioread(self, aioread):
+        async def mock_callback():
+            pass
+
+        future = asyncio.Future(loop=self.loop)
+        future.set_result('mock')
+        aioread.return_value = future
+        msg_bus = MessageBusClient(self.config, loop=self.loop)
+        topic = 'test_topic'
+        self.loop.run_until_complete(msg_bus.aioread([topic], mock_callback))
+        aioread.assert_called_with(['test_topic'], self.loop, aiocallback=mock_callback)
+
+    @mock.patch.object(MsgKafka, 'aiowrite')
+    def test_aiowrite(self, aiowrite):
+        future = asyncio.Future(loop=self.loop)
+        future.set_result('mock')
+        aiowrite.return_value = future
+        msg_bus = MessageBusClient(self.config, loop=self.loop)
+        topic = 'test_topic'
+        key = 'test_key'
+        msg = {'test': 'test_msg'}
+        self.loop.run_until_complete(msg_bus.aiowrite(topic, key, msg))
+        aiowrite.assert_called_with(topic, key, msg, self.loop)
+
+    @mock.patch.object(MsgKafka, 'aioread')
+    def test_aioread_once(self, aioread):
+        future = asyncio.Future(loop=self.loop)
+        future.set_result('mock')
+        aioread.return_value = future
+        msg_bus = MessageBusClient(self.config, loop=self.loop)
+        topic = 'test_topic'
+        self.loop.run_until_complete(msg_bus.aioread_once(topic))
+        aioread.assert_called_with('test_topic', self.loop)
index 932adf4..7fc2dc9 100644 (file)
@@ -28,23 +28,29 @@ from unittest import mock
 from unittest.mock import Mock
 
 from osm_policy_module.core.agent import PolicyModuleAgent
+from osm_policy_module.core.config import Config
 from osm_policy_module.core.database import DatabaseManager
 
 
 class PolicyAgentTest(unittest.TestCase):
     def setUp(self):
         self.loop = asyncio.new_event_loop()
-        asyncio.set_event_loop(None)
 
     @mock.patch('osm_policy_module.core.agent.CommonDbClient')
     @mock.patch('osm_policy_module.core.agent.MonClient')
     @mock.patch('osm_policy_module.core.agent.LcmClient')
     @mock.patch.object(PolicyModuleAgent, '_configure_scaling_groups')
-    def test_handle_instantiated_or_scaled(self, configure_scaling_groups, lcm_client, mon_client, db_client):
+    @mock.patch.object(PolicyModuleAgent, '_delete_orphaned_alarms')
+    def test_handle_instantiated(self, delete_orphaned_alarms, configure_scaling_groups, lcm_client,
+                                 mon_client, db_client):
         async def mock_configure_scaling_groups(nsr_id):
             pass
 
-        agent = PolicyModuleAgent(self.loop)
+        async def mock_delete_orphaned_alarms(nsr_id):
+            pass
+
+        config = Config()
+        agent = PolicyModuleAgent(config, self.loop)
         assert lcm_client.called
         assert mon_client.called
         assert db_client.called
@@ -60,14 +66,15 @@ class PolicyAgentTest(unittest.TestCase):
             'nsInstanceId': 'test_nsr_id'
         }
         configure_scaling_groups.side_effect = mock_configure_scaling_groups
+        delete_orphaned_alarms.side_effect = mock_delete_orphaned_alarms
 
         db_client.return_value.get_nslcmop.return_value = nslcmop_completed
-        self.loop.run_until_complete(agent._handle_instantiated_or_scaled(content))
+        self.loop.run_until_complete(agent._handle_instantiated(content))
         configure_scaling_groups.assert_called_with('test_nsr_id')
         configure_scaling_groups.reset_mock()
 
         db_client.return_value.get_nslcmop.return_value = nslcmop_failed
-        self.loop.run_until_complete(agent._handle_instantiated_or_scaled(content))
+        self.loop.run_until_complete(agent._handle_instantiated(content))
         configure_scaling_groups.assert_not_called()
 
     @mock.patch('osm_policy_module.core.agent.CommonDbClient')
@@ -78,7 +85,8 @@ class PolicyAgentTest(unittest.TestCase):
         async def mock_scale(nsr_id, scaling_group_name, vnf_member_index, action):
             pass
 
-        agent = PolicyModuleAgent(self.loop)
+        config = Config()
+        agent = PolicyModuleAgent(config, self.loop)
         assert lcm_client.called
         assert mon_client.called
         assert db_client.called
index 2a85f83..1d42c16 100644 (file)
--- a/setup.py
+++ b/setup.py
@@ -45,6 +45,7 @@ setup(
     url=_url,
     license=_license,
     packages=[_name],
+    package_dir={_name: _name},
     include_package_data=True,
     install_requires=[
         "aiokafka==0.4.*",
@@ -52,7 +53,8 @@ setup(
         "jsonschema==2.6.*",
         "six==1.11.*",
         "pyyaml==3.*",
-        "osm-common"
+        "pymysql",
+        "osm-common",
     ],
     entry_points={
         "console_scripts": [