Merge "Added LICENSE file to root folder" netslice
authorlavado <glavado@whitestack.com>
Sat, 13 Oct 2018 12:08:08 +0000 (14:08 +0200)
committerGerrit Code Review <root@osm.etsi.org>
Sat, 13 Oct 2018 12:08:08 +0000 (14:08 +0200)
18 files changed:
.gitignore
Dockerfile
MANIFEST.in
Makefile [new file with mode: 0644]
debian/python3-osm-policy-module.postinst [new file with mode: 0644]
devops-stages/stage-build.sh
docker/Dockerfile
osm_policy_module/cmd/policy_module_agent.py
osm_policy_module/common/db_client.py [new file with mode: 0644]
osm_policy_module/common/lcm_client.py
osm_policy_module/common/mon_client.py
osm_policy_module/core/agent.py
osm_policy_module/core/config.py
osm_policy_module/core/database.py
osm_policy_module/tests/integration/test_policy_agent.py
setup.py
stdeb.cfg [new file with mode: 0644]
tox.ini

index f4d6bb1..88962fe 100644 (file)
@@ -76,3 +76,7 @@ ChangeLog
 .settings/
 __pycache__/
 .idea
+
+deb_dist
+*.tar.gz
+*.db
\ No newline at end of file
index 6738633..f3af477 100644 (file)
@@ -22,7 +22,7 @@
 
 FROM ubuntu:16.04
 RUN  apt-get update && \
-  DEBIAN_FRONTEND=noninteractive apt-get --yes install git tox make python python-pip python3 python3-pip debhelper && \
-  DEBIAN_FRONTEND=noninteractive apt-get --yes install wget python-dev python-software-properties python-stdeb && \
-  DEBIAN_FRONTEND=noninteractive apt-get --yes install default-jre libmysqlclient-dev && \
-  DEBIAN_FRONTEND=noninteractive apt-get --yes install libmysqlclient-dev libxml2 python3-all
+  DEBIAN_FRONTEND=noninteractive apt-get --yes install git tox make python-all python3 python3-pip debhelper wget && \
+  DEBIAN_FRONTEND=noninteractive apt-get --yes install libmysqlclient-dev libxml2 python3-all && \
+  DEBIAN_FRONTEND=noninteractive pip3 install -U setuptools setuptools-version-command stdeb
+
index ad2c95a..9dbb8cd 100644 (file)
@@ -24,5 +24,4 @@ include requirements.txt
 include test-requirements.txt
 include README.rst
 recursive-include osm_policy_module *.py *.xml *.sh
-recursive-include devops-stages *
-recursive-include test *.py
+recursive-include devops-stages *
\ No newline at end of file
diff --git a/Makefile b/Makefile
new file mode 100644 (file)
index 0000000..586b73c
--- /dev/null
+++ b/Makefile
@@ -0,0 +1,9 @@
+all: clean package
+
+clean:
+       rm -rf dist deb_dist osm_policy_module-*.tar.gz osm_policy_module.egg-info .eggs
+
+package:
+       python3 setup.py --command-packages=stdeb.command sdist_dsc
+       cp debian/python3-osm-policy-module.postinst deb_dist/osm-policy-module*/debian
+       cd deb_dist/osm-policy-module*/  && dpkg-buildpackage -rfakeroot -uc -us
\ No newline at end of file
diff --git a/debian/python3-osm-policy-module.postinst b/debian/python3-osm-policy-module.postinst
new file mode 100644 (file)
index 0000000..3b24842
--- /dev/null
@@ -0,0 +1,9 @@
+#!/bin/bash
+
+echo "Installing python dependencies via pip..."
+pip3 install kafka==1.3.*
+pip3 install peewee==3.1.*
+pip3 install jsonschema==2.6.*
+pip3 install six==1.11.*
+pip3 install pyyaml==3.*
+echo "Installation of python dependencies finished"
\ No newline at end of file
index 4251b1c..8a8d332 100755 (executable)
@@ -23,7 +23,4 @@
 #__date__   = "14/Sep/2017"
 
 #!/bin/bash
-rm -rf deb_dist
-rm -rf dist
-rm -rf osm_mon.egg-info
-tox -e build
+make
index 4095fa8..ab5b411 100644 (file)
@@ -47,5 +47,6 @@ ENV OSMPOL_DATABASE_PORT 27017
 ENV OSMPOL_SQL_DATABASE_URI sqlite:///mon_sqlite.db
 
 ENV OSMPOL_LOG_LEVEL INFO
+ENV OSMPOL_KAFKA_LOG_LEVEL WARN
 
 CMD osm-policy-agent
index 24663e5..c82f006 100644 (file)
@@ -43,13 +43,13 @@ def main():
                         datefmt='%m/%d/%Y %I:%M:%S %p',
                         level=logging.getLevelName(cfg.OSMPOL_LOG_LEVEL))
     kafka_logger = logging.getLogger('kafka')
-    kafka_logger.setLevel(logging.WARN)
+    kafka_logger.setLevel(logging.getLevelName(cfg.OSMPOL_KAFKA_LOG_LEVEL))
     kafka_formatter = logging.Formatter(log_formatter_str)
     kafka_handler = logging.StreamHandler(sys.stdout)
     kafka_handler.setFormatter(kafka_formatter)
     kafka_logger.addHandler(kafka_handler)
     log = logging.getLogger(__name__)
-    log.info("Config: %s", cfg)
+    log.info("Config: %s", vars(cfg))
     log.info("Syncing database...")
     db_manager = DatabaseManager()
     db_manager.create_tables()
diff --git a/osm_policy_module/common/db_client.py b/osm_policy_module/common/db_client.py
new file mode 100644 (file)
index 0000000..1eed0f3
--- /dev/null
@@ -0,0 +1,59 @@
+# -*- coding: utf-8 -*-
+
+# Copyright 2018 Whitestack, LLC
+# *************************************************************
+
+# This file is part of OSM Monitoring module
+# All Rights Reserved to Whitestack, LLC
+
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+
+#         http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+# For those usages not covered by the Apache License, Version 2.0 please
+# contact: bdiaz@whitestack.com or glavado@whitestack.com
+##
+from osm_common import dbmongo
+
+from osm_policy_module.core.config import Config
+
+
+class DbClient:
+    def __init__(self):
+        cfg = Config.instance()
+        self.common_db = dbmongo.DbMongo()
+        self.common_db.db_connect({'host': cfg.OSMPOL_DATABASE_HOST,
+                                   'port': int(cfg.OSMPOL_DATABASE_PORT),
+                                   'name': 'osm'})
+
+    def get_vnfr(self, nsr_id: str, member_index: int):
+        vnfr = self.common_db.get_one("vnfrs",
+                                      {"nsr-id-ref": nsr_id, "member-vnf-index-ref": str(member_index)})
+        return vnfr
+
+    def get_vnfrs(self, nsr_id: str):
+        return [self.get_vnfr(nsr_id, member['member-vnf-index']) for member in
+                self.get_nsr(nsr_id)['nsd']['constituent-vnfd']]
+
+    def get_vnfd(self, vnfd_id: str):
+        vnfr = self.common_db.get_one("vnfds",
+                                      {"_id": vnfd_id})
+        return vnfr
+
+    def get_nsr(self, nsr_id: str):
+        nsr = self.common_db.get_one("nsrs",
+                                     {"id": nsr_id})
+        return nsr
+
+    def get_nslcmop(self, nslcmop_id):
+        nslcmop = self.common_db.get_one("nslcmops",
+                                         {"_id": nslcmop_id})
+        return nslcmop
index 7857d26..34e212f 100644 (file)
@@ -49,6 +49,7 @@ class LcmClient:
                                    'name': 'osm'})
 
     def scale(self, nsr_id: str, scaling_group_name: str, vnf_member_index: int, action: str):
+        log.debug("scale %s %s %s %s", nsr_id, scaling_group_name, vnf_member_index, action)
         nslcmop = self._generate_nslcmop(nsr_id, scaling_group_name, vnf_member_index, action)
         self.common_db.create("nslcmops", nslcmop)
         log.info("Sending scale action message: %s", json.dumps(nslcmop))
@@ -56,6 +57,7 @@ class LcmClient:
         self.producer.flush()
 
     def _generate_nslcmop(self, nsr_id: str, scaling_group_name: str, vnf_member_index: int, action: str):
+        log.debug("_generate_nslcmop %s %s %s %s", nsr_id, scaling_group_name, vnf_member_index, action)
         _id = str(uuid.uuid4())
         now = time.time()
         params = {
index 724fe83..5d2416e 100644 (file)
@@ -53,7 +53,8 @@ class MonClient:
         consumer = KafkaConsumer(bootstrap_servers=self.kafka_server,
                                  key_deserializer=bytes.decode,
                                  value_deserializer=bytes.decode,
-                                 consumer_timeout_ms=10000)
+                                 consumer_timeout_ms=10000,
+                                 group_id='mon-client-' + str(uuid.uuid4()))
         consumer.subscribe(['alarm_response'])
         for message in consumer:
             if message.key == 'create_alarm_response':
index 8309da1..bbefadd 100644 (file)
@@ -21,6 +21,7 @@
 # For those usages not covered by the Apache License, Version 2.0 please
 # contact: bdiaz@whitestack.com or glavado@whitestack.com
 ##
+import datetime
 import json
 import logging
 import threading
@@ -28,36 +29,33 @@ from json import JSONDecodeError
 
 import yaml
 from kafka import KafkaConsumer
-from osm_common import dbmongo
 
+from osm_policy_module.common.db_client import DbClient
 from osm_policy_module.common.lcm_client import LcmClient
 from osm_policy_module.common.mon_client import MonClient
 from osm_policy_module.core import database
 from osm_policy_module.core.config import Config
-from osm_policy_module.core.database import ScalingRecord, ScalingAlarm
+from osm_policy_module.core.database import ScalingGroup, ScalingAlarm, ScalingPolicy, ScalingCriteria
 
 log = logging.getLogger(__name__)
 
+ALLOWED_KAFKA_KEYS = ['instantiated', 'scaled', 'notify_alarm']
+
 
 class PolicyModuleAgent:
     def __init__(self):
         cfg = Config.instance()
-        self.common_db = dbmongo.DbMongo()
-        self.common_db.db_connect({'host': cfg.OSMPOL_DATABASE_HOST,
-                                   'port': int(cfg.OSMPOL_DATABASE_PORT),
-                                   'name': 'osm'})
+        self.db_client = DbClient()
         self.mon_client = MonClient()
+        self.lcm_client = LcmClient()
         self.kafka_server = '{}:{}'.format(cfg.OSMPOL_MESSAGE_HOST,
                                            cfg.OSMPOL_MESSAGE_PORT)
 
     def run(self):
-        cfg = Config.instance()
-        cfg.read_environ()
-
         consumer = KafkaConsumer(bootstrap_servers=self.kafka_server,
                                  key_deserializer=bytes.decode,
                                  value_deserializer=bytes.decode,
-                                 consumer_timeout_ms=10000)
+                                 group_id='pol-consumer')
         consumer.subscribe(["ns", "alarm_response"])
 
         for message in consumer:
@@ -65,141 +63,204 @@ class PolicyModuleAgent:
             t.start()
 
     def _process_msg(self, topic, key, msg):
+        log.debug("_process_msg topic=%s key=%s msg=%s", topic, key, msg)
         try:
-            # Check for ns instantiation
-            if key == 'instantiated':
+            if key in ALLOWED_KAFKA_KEYS:
                 try:
                     content = json.loads(msg)
                 except JSONDecodeError:
                     content = yaml.safe_load(msg)
-                log.info("Message arrived with topic: %s, key: %s, msg: %s", topic, key, content)
-                nslcmop_id = content['nslcmop_id']
-                nslcmop = self.common_db.get_one(table="nslcmops",
-                                                 filter={"_id": nslcmop_id})
-                if nslcmop['operationState'] == 'COMPLETED' or nslcmop['operationState'] == 'PARTIALLY_COMPLETED':
-                    nsr_id = nslcmop['nsInstanceId']
-                    log.info("Configuring scaling groups for network service with nsr_id: %s", nsr_id)
-                    self._configure_scaling_groups(nsr_id)
-                else:
-                    log.info(
-                        "Network service is not in COMPLETED or PARTIALLY_COMPLETED state. "
-                        "Current state is %s. Skipping...",
-                        nslcmop['operationState'])
-
-            if key == 'notify_alarm':
-                try:
-                    content = json.loads(msg)
-                except JSONDecodeError:
-                    content = yaml.safe_load(msg)
-                log.info("Message arrived with topic: %s, key: %s, msg: %s", topic, key, content)
-                alarm_id = content['notify_details']['alarm_uuid']
-                metric_name = content['notify_details']['metric_name']
-                operation = content['notify_details']['operation']
-                threshold = content['notify_details']['threshold_value']
-                vdu_name = content['notify_details']['vdu_name']
-                vnf_member_index = content['notify_details']['vnf_member_index']
-                ns_id = content['notify_details']['ns_id']
-                log.info(
-                    "Received alarm notification for alarm %s, \
-                    metric %s, \
-                    operation %s, \
-                    threshold %s, \
-                    vdu_name %s, \
-                    vnf_member_index %s, \
-                    ns_id %s ",
-                    alarm_id, metric_name, operation, threshold, vdu_name, vnf_member_index, ns_id)
-                try:
-                    alarm = ScalingAlarm.select().where(ScalingAlarm.alarm_id == alarm_id).get()
-                    lcm_client = LcmClient()
-                    log.info("Sending scaling action message for ns: %s", alarm_id)
-                    lcm_client.scale(alarm.scaling_record.nsr_id, alarm.scaling_record.name, alarm.vnf_member_index,
-                                     alarm.action)
-                except ScalingAlarm.DoesNotExist:
-                    log.info("There is no action configured for alarm %s.", alarm_id)
-        except Exception:
-            log.exception("Error consuming message: ")
 
-    def _get_vnfr(self, nsr_id: str, member_index: int):
-        vnfr = self.common_db.get_one(table="vnfrs",
-                                      filter={"nsr-id-ref": nsr_id, "member-vnf-index-ref": str(member_index)})
-        return vnfr
+                if key == 'instantiated' or key == 'scaled':
+                    self._handle_instantiated_or_scaled(content)
 
-    def _get_vnfrs(self, nsr_id: str):
-        return [self._get_vnfr(nsr_id, member['member-vnf-index']) for member in
-                self._get_nsr(nsr_id)['nsd']['constituent-vnfd']]
+                if key == 'notify_alarm':
+                    self._handle_alarm_notification(content)
+            else:
+                log.debug("Key %s is not in ALLOWED_KAFKA_KEYS", key)
+        except Exception:
+            log.exception("Error consuming message: ")
 
-    def _get_vnfd(self, vnfd_id: str):
-        vnfr = self.common_db.get_one(table="vnfds",
-                                      filter={"_id": vnfd_id})
-        return vnfr
+    def _handle_alarm_notification(self, content):
+        log.debug("_handle_alarm_notification: %s", content)
+        alarm_id = content['notify_details']['alarm_uuid']
+        metric_name = content['notify_details']['metric_name']
+        operation = content['notify_details']['operation']
+        threshold = content['notify_details']['threshold_value']
+        vdu_name = content['notify_details']['vdu_name']
+        vnf_member_index = content['notify_details']['vnf_member_index']
+        ns_id = content['notify_details']['ns_id']
+        log.info(
+            "Received alarm notification for alarm %s, \
+            metric %s, \
+            operation %s, \
+            threshold %s, \
+            vdu_name %s, \
+            vnf_member_index %s, \
+            ns_id %s ",
+            alarm_id, metric_name, operation, threshold, vdu_name, vnf_member_index, ns_id)
+        try:
+            alarm = ScalingAlarm.select().where(ScalingAlarm.alarm_id == alarm_id).get()
+            delta = datetime.datetime.now() - alarm.scaling_criteria.scaling_policy.last_scale
+            log.debug("last_scale: %s", alarm.scaling_criteria.scaling_policy.last_scale)
+            log.debug("now: %s", datetime.datetime.now())
+            log.debug("delta: %s", delta)
+            if delta.total_seconds() < alarm.scaling_criteria.scaling_policy.cooldown_time:
+                log.info("Time between last scale and now is less than cooldown time. Skipping.")
+                return
+            log.info("Sending scaling action message for ns: %s", alarm_id)
+            self.lcm_client.scale(alarm.scaling_criteria.scaling_policy.scaling_group.nsr_id,
+                                  alarm.scaling_criteria.scaling_policy.scaling_group.name,
+                                  alarm.vnf_member_index,
+                                  alarm.action)
+            alarm.scaling_criteria.scaling_policy.last_scale = datetime.datetime.now()
+            alarm.scaling_criteria.scaling_policy.save()
+        except ScalingAlarm.DoesNotExist:
+            log.info("There is no action configured for alarm %s.", alarm_id)
 
-    def _get_nsr(self, nsr_id: str):
-        nsr = self.common_db.get_one(table="nsrs",
-                                     filter={"id": nsr_id})
-        return nsr
+    def _handle_instantiated_or_scaled(self, content):
+        log.debug("_handle_instantiated_or_scaled: %s", content)
+        nslcmop_id = content['nslcmop_id']
+        nslcmop = self.db_client.get_nslcmop(nslcmop_id)
+        if nslcmop['operationState'] == 'COMPLETED' or nslcmop['operationState'] == 'PARTIALLY_COMPLETED':
+            nsr_id = nslcmop['nsInstanceId']
+            log.info("Configuring scaling groups for network service with nsr_id: %s", nsr_id)
+            self._configure_scaling_groups(nsr_id)
+        else:
+            log.info(
+                "Network service is not in COMPLETED or PARTIALLY_COMPLETED state. "
+                "Current state is %s. Skipping...",
+                nslcmop['operationState'])
 
     def _configure_scaling_groups(self, nsr_id: str):
+        log.debug("_configure_scaling_groups: %s", nsr_id)
         # TODO(diazb): Check for alarm creation on exception and clean resources if needed.
+        # TODO: Add support for non-nfvi metrics
         with database.db.atomic():
-            vnfrs = self._get_vnfrs(nsr_id)
-            log.info("Checking %s vnfrs...", len(vnfrs))
+            vnfrs = self.db_client.get_vnfrs(nsr_id)
+            log.info("Found %s vnfrs", len(vnfrs))
             for vnfr in vnfrs:
-                vnfd = self._get_vnfd(vnfr['vnfd-id'])
+                vnfd = self.db_client.get_vnfd(vnfr['vnfd-id'])
                 log.info("Looking for vnfd %s", vnfr['vnfd-id'])
                 scaling_groups = vnfd['scaling-group-descriptor']
                 vnf_monitoring_params = vnfd['monitoring-param']
                 for scaling_group in scaling_groups:
-                    log.info("Creating scaling record in DB...")
-                    scaling_record = ScalingRecord.create(
-                        nsr_id=nsr_id,
-                        name=scaling_group['name'],
-                        content=json.dumps(scaling_group)
-                    )
-                    log.info("Created scaling record in DB : nsr_id=%s, name=%s, content=%s",
-                             scaling_record.nsr_id,
-                             scaling_record.name,
-                             scaling_record.content)
+                    try:
+                        scaling_group_record = ScalingGroup.select().where(
+                            ScalingGroup.nsr_id == nsr_id,
+                            ScalingGroup.vnf_member_index == vnfr['member-vnf-index-ref'],
+                            ScalingGroup.name == scaling_group['name']
+                        ).get()
+                        log.info("Found existing scaling group record in DB...")
+                    except ScalingGroup.DoesNotExist:
+                        log.info("Creating scaling group record in DB...")
+                        scaling_group_record = ScalingGroup.create(
+                            nsr_id=nsr_id,
+                            vnf_member_index=vnfr['member-vnf-index-ref'],
+                            name=scaling_group['name'],
+                            content=json.dumps(scaling_group)
+                        )
+                        log.info(
+                            "Created scaling group record in DB : nsr_id=%s, vnf_member_index=%d, name=%s, content=%s",
+                            scaling_group_record.nsr_id,
+                            scaling_group_record.vnf_member_index,
+                            scaling_group_record.name,
+                            scaling_group_record.content)
                     for scaling_policy in scaling_group['scaling-policy']:
-                        for vdur in vnfd['vdu']:
-                            vdu_monitoring_params = vdur['monitoring-param']
-                            for scaling_criteria in scaling_policy['scaling-criteria']:
+                        if scaling_policy['scaling-type'] != 'automatic':
+                            continue
+                        try:
+                            scaling_policy_record = ScalingPolicy.select().join(ScalingGroup).where(
+                                ScalingPolicy.name == scaling_policy['name'],
+                                ScalingGroup.id == scaling_group_record.id
+                            ).get()
+                            log.info("Found existing scaling policy record in DB...")
+                        except ScalingPolicy.DoesNotExist:
+                            log.info("Creating scaling policy record in DB...")
+                            scaling_policy_record = ScalingPolicy.create(
+                                nsr_id=nsr_id,
+                                name=scaling_policy['name'],
+                                cooldown_time=scaling_policy['cooldown-time'],
+                                scaling_group=scaling_group_record
+                            )
+                            log.info("Created scaling policy record in DB : name=%s, scaling_group.name=%s",
+                                     scaling_policy_record.name,
+                                     scaling_policy_record.scaling_group.name)
+
+                        for scaling_criteria in scaling_policy['scaling-criteria']:
+                            try:
+                                scaling_criteria_record = ScalingCriteria.select().join(ScalingPolicy).where(
+                                    ScalingPolicy.id == scaling_policy_record.id,
+                                    ScalingCriteria.name == scaling_criteria['name']
+                                ).get()
+                                log.info("Found existing scaling criteria record in DB...")
+                            except ScalingCriteria.DoesNotExist:
+                                log.info("Creating scaling criteria record in DB...")
+                                scaling_criteria_record = ScalingCriteria.create(
+                                    nsr_id=nsr_id,
+                                    name=scaling_criteria['name'],
+                                    scaling_policy=scaling_policy_record
+                                )
+                                log.info(
+                                    "Created scaling criteria record in DB : name=%s, scaling_policy.name=%s",
+                                    scaling_criteria_record.name,
+                                    scaling_criteria_record.scaling_policy.name)
+
+                            for vdu_ref in scaling_group['vdu']:
                                 vnf_monitoring_param = next(
                                     filter(lambda param: param['id'] == scaling_criteria['vnf-monitoring-param-ref'],
                                            vnf_monitoring_params))
-                                # TODO: Add support for non-nfvi metrics
+                                if not vdu_ref['vdu-id-ref'] == vnf_monitoring_param['vdu-ref']:
+                                    continue
+                                vdu = next(
+                                    filter(lambda vdu: vdu['id'] == vdu_ref['vdu-id-ref'], vnfd['vdu'])
+                                )
+                                vdu_monitoring_params = vdu['monitoring-param']
                                 vdu_monitoring_param = next(
                                     filter(
                                         lambda param: param['id'] == vnf_monitoring_param['vdu-monitoring-param-ref'],
                                         vdu_monitoring_params))
-                                alarm_uuid = self.mon_client.create_alarm(
-                                    metric_name=vdu_monitoring_param['nfvi-metric'],
-                                    ns_id=nsr_id,
-                                    vdu_name=vdur['name'],
-                                    vnf_member_index=vnfr['member-vnf-index-ref'],
-                                    threshold=scaling_criteria['scale-in-threshold'],
-                                    operation=scaling_criteria['scale-in-relational-operation'],
-                                    statistic=vnf_monitoring_param['aggregation-type']
-                                )
-                                ScalingAlarm.create(
-                                    alarm_id=alarm_uuid,
-                                    action='scale_in',
-                                    vnf_member_index=int(vnfr['member-vnf-index-ref']),
-                                    vdu_name=vdur['name'],
-                                    scaling_record=scaling_record
-                                )
-                                alarm_uuid = self.mon_client.create_alarm(
-                                    metric_name=vdu_monitoring_param['nfvi-metric'],
-                                    ns_id=nsr_id,
-                                    vdu_name=vdur['name'],
-                                    vnf_member_index=vnfr['member-vnf-index-ref'],
-                                    threshold=scaling_criteria['scale-out-threshold'],
-                                    operation=scaling_criteria['scale-out-relational-operation'],
-                                    statistic=vnf_monitoring_param['aggregation-type']
-                                )
-                                ScalingAlarm.create(
-                                    alarm_id=alarm_uuid,
-                                    action='scale_out',
-                                    vnf_member_index=int(vnfr['member-vnf-index-ref']),
-                                    vdu_name=vdur['name'],
-                                    scaling_record=scaling_record
-                                )
+                                vdurs = list(filter(lambda vdur: vdur['vdu-id-ref'] == vnf_monitoring_param['vdu-ref'],
+                                                    vnfr['vdur']))
+                                for vdur in vdurs:
+                                    try:
+                                        ScalingAlarm.select().join(ScalingCriteria).where(
+                                            ScalingAlarm.vdu_name == vdur['name'],
+                                            ScalingCriteria.name == scaling_criteria['name']
+                                        ).get()
+                                        log.debug("vdu %s already has an alarm configured", vdur['name'])
+                                        continue
+                                    except ScalingAlarm.DoesNotExist:
+                                        pass
+                                    alarm_uuid = self.mon_client.create_alarm(
+                                        metric_name=vdu_monitoring_param['nfvi-metric'],
+                                        ns_id=nsr_id,
+                                        vdu_name=vdur['name'],
+                                        vnf_member_index=vnfr['member-vnf-index-ref'],
+                                        threshold=scaling_criteria['scale-in-threshold'],
+                                        operation=scaling_criteria['scale-in-relational-operation'],
+                                        statistic=vnf_monitoring_param['aggregation-type']
+                                    )
+                                    ScalingAlarm.create(
+                                        alarm_id=alarm_uuid,
+                                        action='scale_in',
+                                        vnf_member_index=int(vnfr['member-vnf-index-ref']),
+                                        vdu_name=vdur['name'],
+                                        scaling_criteria=scaling_criteria_record
+                                    )
+                                    alarm_uuid = self.mon_client.create_alarm(
+                                        metric_name=vdu_monitoring_param['nfvi-metric'],
+                                        ns_id=nsr_id,
+                                        vdu_name=vdur['name'],
+                                        vnf_member_index=vnfr['member-vnf-index-ref'],
+                                        threshold=scaling_criteria['scale-out-threshold'],
+                                        operation=scaling_criteria['scale-out-relational-operation'],
+                                        statistic=vnf_monitoring_param['aggregation-type']
+                                    )
+                                    ScalingAlarm.create(
+                                        alarm_id=alarm_uuid,
+                                        action='scale_out',
+                                        vnf_member_index=int(vnfr['member-vnf-index-ref']),
+                                        vdu_name=vdur['name'],
+                                        scaling_criteria=scaling_criteria_record
+                                    )
index dab409b..84a1f57 100644 (file)
@@ -67,6 +67,7 @@ class Config(object):
         CfgParam('OSMPOL_DATABASE_PORT', 27017, int),
         CfgParam('OSMPOL_SQL_DATABASE_URI', "sqlite:///mon_sqlite.db", six.text_type),
         CfgParam('OSMPOL_LOG_LEVEL', "INFO", six.text_type),
+        CfgParam('OSMPOL_KAFKA_LOG_LEVEL', "WARN", six.text_type),
     ]
 
     _config_dict = {cfg.key: cfg for cfg in _configuration}
index f9a37c2..8ad19f2 100644 (file)
 # For those usages not covered by the Apache License, Version 2.0 please
 # contact: bdiaz@whitestack.com or glavado@whitestack.com
 ##
+import datetime
 import logging
 
-from peewee import CharField, IntegerField, ForeignKeyField, Model, TextField
+from peewee import CharField, IntegerField, ForeignKeyField, Model, TextField, AutoField, DateTimeField
 from playhouse.sqlite_ext import SqliteExtDatabase
 
 from osm_policy_module.core.config import Config
@@ -35,29 +36,44 @@ db = SqliteExtDatabase('policy_module.db')
 
 
 class BaseModel(Model):
+    id = AutoField(primary_key=True)
+
     class Meta:
         database = db
 
 
-class ScalingRecord(BaseModel):
+class ScalingGroup(BaseModel):
     nsr_id = CharField()
+    vnf_member_index = IntegerField()
     name = CharField()
     content = TextField()
 
 
+class ScalingPolicy(BaseModel):
+    name = CharField()
+    cooldown_time = IntegerField()
+    last_scale = DateTimeField(default=datetime.datetime.now)
+    scaling_group = ForeignKeyField(ScalingGroup, related_name='scaling_policies')
+
+
+class ScalingCriteria(BaseModel):
+    name = CharField()
+    scaling_policy = ForeignKeyField(ScalingPolicy, related_name='scaling_criterias')
+
+
 class ScalingAlarm(BaseModel):
     alarm_id = CharField()
     action = CharField()
     vnf_member_index = IntegerField()
     vdu_name = CharField()
-    scaling_record = ForeignKeyField(ScalingRecord, related_name='scaling_alarms')
+    scaling_criteria = ForeignKeyField(ScalingCriteria, related_name='scaling_alarms')
 
 
 class DatabaseManager:
     def create_tables(self):
         try:
             db.connect()
-            db.create_tables([ScalingRecord, ScalingAlarm])
+            db.create_tables([ScalingGroup, ScalingPolicy, ScalingCriteria, ScalingAlarm])
             db.close()
         except Exception as e:
             log.exception("Error creating tables: ")
index 3a82ea7..bed6eb5 100644 (file)
@@ -31,10 +31,11 @@ from kafka import KafkaProducer
 from osm_common.dbmongo import DbMongo
 from peewee import SqliteDatabase
 
+from osm_policy_module.common.db_client import DbClient
 from osm_policy_module.common.mon_client import MonClient
 from osm_policy_module.core import database
 from osm_policy_module.core.agent import PolicyModuleAgent
-from osm_policy_module.core.database import ScalingRecord, ScalingAlarm
+from osm_policy_module.core.database import ScalingGroup, ScalingAlarm, ScalingPolicy, ScalingCriteria
 
 log = logging.getLogger()
 log.level = logging.INFO
@@ -401,7 +402,7 @@ vnfd_record_mock = {
 
 test_db = SqliteDatabase(':memory:')
 
-MODELS = [ScalingRecord, ScalingAlarm]
+MODELS = [ScalingGroup, ScalingPolicy, ScalingCriteria, ScalingAlarm]
 
 
 class PolicyModuleAgentTest(unittest.TestCase):
@@ -419,9 +420,9 @@ class PolicyModuleAgentTest(unittest.TestCase):
     @patch.object(DbMongo, 'db_connect', Mock())
     @patch.object(KafkaProducer, '__init__')
     @patch.object(MonClient, 'create_alarm')
-    @patch.object(PolicyModuleAgent, '_get_vnfd')
-    @patch.object(PolicyModuleAgent, '_get_nsr')
-    @patch.object(PolicyModuleAgent, '_get_vnfr')
+    @patch.object(DbClient, 'get_vnfd')
+    @patch.object(DbClient, 'get_nsr')
+    @patch.object(DbClient, 'get_vnfr')
     def test_configure_scaling_groups(self, get_vnfr, get_nsr, get_vnfd, create_alarm, kafka_producer_init):
         def _test_configure_scaling_groups_get_vnfr(*args, **kwargs):
             if '1' in args[1]:
@@ -444,12 +445,32 @@ class PolicyModuleAgentTest(unittest.TestCase):
                                      operation='GT',
                                      statistic='AVERAGE',
                                      threshold=80,
-                                     vdu_name='cirros_vnfd-VM',
+                                     vdu_name='cirros_ns-1-cirros_vnfd-VM-1',
                                      vnf_member_index='1')
-        scaling_record = ScalingRecord.get()
+        create_alarm.assert_any_call(metric_name='average_memory_utilization',
+                                     ns_id='test_nsr_id',
+                                     operation='LT',
+                                     statistic='AVERAGE',
+                                     threshold=20,
+                                     vdu_name='cirros_ns-1-cirros_vnfd-VM-1',
+                                     vnf_member_index='1')
+        create_alarm.assert_any_call(metric_name='average_memory_utilization',
+                                     ns_id='test_nsr_id',
+                                     operation='GT',
+                                     statistic='AVERAGE',
+                                     threshold=80,
+                                     vdu_name='cirros_ns-2-cirros_vnfd-VM-1',
+                                     vnf_member_index='2')
+        create_alarm.assert_any_call(metric_name='average_memory_utilization',
+                                     ns_id='test_nsr_id',
+                                     operation='LT',
+                                     statistic='AVERAGE',
+                                     threshold=20,
+                                     vdu_name='cirros_ns-2-cirros_vnfd-VM-1',
+                                     vnf_member_index='2')
+        scaling_record = ScalingGroup.get()
         self.assertEqual(scaling_record.name, 'scale_cirros_vnfd-VM')
         self.assertEqual(scaling_record.nsr_id, 'test_nsr_id')
-        self.assertIsNotNone(scaling_record)
 
 
 if __name__ == '__main__':
index c3c735d..86dacc4 100644 (file)
--- a/setup.py
+++ b/setup.py
@@ -24,7 +24,7 @@
 from setuptools import setup
 
 _name = 'osm_policy_module'
-_version_command = ('git describe --match v* --tags --long --dirty', 'pep440-git')
+_version_command = ('git describe --match v* --tags --long --dirty', 'pep440-git-full')
 _author = "Benjamín Díaz"
 _author_email = 'bdiaz@whitestack.com'
 _description = 'OSM Policy Module'
diff --git a/stdeb.cfg b/stdeb.cfg
new file mode 100644 (file)
index 0000000..021202c
--- /dev/null
+++ b/stdeb.cfg
@@ -0,0 +1,3 @@
+[DEFAULT]
+X-Python3-Version : >= 3.4
+Depends3 : libmysqlclient-dev, python3-pip, python3-osm-common
\ No newline at end of file
diff --git a/tox.ini b/tox.ini
index 4ef6a64..584efa7 100644 (file)
--- a/tox.ini
+++ b/tox.ini
@@ -25,7 +25,7 @@
 # test suite on all supported python versions. To use it, "pip install tox"
 # and then run "tox" from this directory.
 [tox]
-envlist = py3
+envlist = py3, flake8
 toxworkdir={homedir}/.tox
 
 [testenv]
@@ -40,12 +40,6 @@ deps = flake8
 commands =
     flake8 osm_policy_module
 
-[testenv:build]
-basepython = python3
-deps = stdeb
-       setuptools-version-command
-commands = python3 setup.py --command-packages=stdeb.command bdist_deb
-
 [flake8]
 # E123, E125 skipped as they are invalid PEP-8.
 max-line-length = 120
@@ -54,4 +48,8 @@ ignore = E123,E125,E241
 builtins = _
 exclude=.venv,.git,.tox,dist,doc,*lib/python*,*egg,build,devops_stages/*,.rst
 
-
+[testenv:build]
+basepython = python3
+deps = stdeb
+       setuptools-version-command
+commands = python3 setup.py --command-packages=stdeb.command bdist_deb