Merge "Added LICENSE file to root folder"
diff --git a/.gitignore b/.gitignore
index f4d6bb1..88962fe 100644
--- a/.gitignore
+++ b/.gitignore
@@ -76,3 +76,7 @@
.settings/
__pycache__/
.idea
+
+deb_dist
+*.tar.gz
+*.db
\ No newline at end of file
diff --git a/Dockerfile b/Dockerfile
index 6738633..f3af477 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -22,7 +22,7 @@
FROM ubuntu:16.04
RUN apt-get update && \
- DEBIAN_FRONTEND=noninteractive apt-get --yes install git tox make python python-pip python3 python3-pip debhelper && \
- DEBIAN_FRONTEND=noninteractive apt-get --yes install wget python-dev python-software-properties python-stdeb && \
- DEBIAN_FRONTEND=noninteractive apt-get --yes install default-jre libmysqlclient-dev && \
- DEBIAN_FRONTEND=noninteractive apt-get --yes install libmysqlclient-dev libxml2 python3-all
+ DEBIAN_FRONTEND=noninteractive apt-get --yes install git tox make python-all python3 python3-pip debhelper wget && \
+ DEBIAN_FRONTEND=noninteractive apt-get --yes install libmysqlclient-dev libxml2 python3-all && \
+ DEBIAN_FRONTEND=noninteractive pip3 install -U setuptools setuptools-version-command stdeb
+
diff --git a/MANIFEST.in b/MANIFEST.in
index ad2c95a..9dbb8cd 100644
--- a/MANIFEST.in
+++ b/MANIFEST.in
@@ -24,5 +24,4 @@
include test-requirements.txt
include README.rst
recursive-include osm_policy_module *.py *.xml *.sh
-recursive-include devops-stages *
-recursive-include test *.py
+recursive-include devops-stages *
\ No newline at end of file
diff --git a/Makefile b/Makefile
new file mode 100644
index 0000000..586b73c
--- /dev/null
+++ b/Makefile
@@ -0,0 +1,9 @@
+all: clean package
+
+clean:
+ rm -rf dist deb_dist osm_policy_module-*.tar.gz osm_policy_module.egg-info .eggs
+
+package:
+ python3 setup.py --command-packages=stdeb.command sdist_dsc
+ cp debian/python3-osm-policy-module.postinst deb_dist/osm-policy-module*/debian
+ cd deb_dist/osm-policy-module*/ && dpkg-buildpackage -rfakeroot -uc -us
\ No newline at end of file
diff --git a/debian/python3-osm-policy-module.postinst b/debian/python3-osm-policy-module.postinst
new file mode 100644
index 0000000..3b24842
--- /dev/null
+++ b/debian/python3-osm-policy-module.postinst
@@ -0,0 +1,9 @@
+#!/bin/bash
+
+echo "Installing python dependencies via pip..."
+pip3 install kafka==1.3.*
+pip3 install peewee==3.1.*
+pip3 install jsonschema==2.6.*
+pip3 install six==1.11.*
+pip3 install pyyaml==3.*
+echo "Installation of python dependencies finished"
\ No newline at end of file
diff --git a/devops-stages/stage-build.sh b/devops-stages/stage-build.sh
index 4251b1c..8a8d332 100755
--- a/devops-stages/stage-build.sh
+++ b/devops-stages/stage-build.sh
@@ -23,7 +23,4 @@
#__date__ = "14/Sep/2017"
#!/bin/bash
-rm -rf deb_dist
-rm -rf dist
-rm -rf osm_mon.egg-info
-tox -e build
+make
diff --git a/docker/Dockerfile b/docker/Dockerfile
index 4095fa8..ab5b411 100644
--- a/docker/Dockerfile
+++ b/docker/Dockerfile
@@ -47,5 +47,6 @@
ENV OSMPOL_SQL_DATABASE_URI sqlite:///mon_sqlite.db
ENV OSMPOL_LOG_LEVEL INFO
+ENV OSMPOL_KAFKA_LOG_LEVEL WARN
CMD osm-policy-agent
diff --git a/osm_policy_module/cmd/policy_module_agent.py b/osm_policy_module/cmd/policy_module_agent.py
index 24663e5..c82f006 100644
--- a/osm_policy_module/cmd/policy_module_agent.py
+++ b/osm_policy_module/cmd/policy_module_agent.py
@@ -43,13 +43,13 @@
datefmt='%m/%d/%Y %I:%M:%S %p',
level=logging.getLevelName(cfg.OSMPOL_LOG_LEVEL))
kafka_logger = logging.getLogger('kafka')
- kafka_logger.setLevel(logging.WARN)
+ kafka_logger.setLevel(logging.getLevelName(cfg.OSMPOL_KAFKA_LOG_LEVEL))
kafka_formatter = logging.Formatter(log_formatter_str)
kafka_handler = logging.StreamHandler(sys.stdout)
kafka_handler.setFormatter(kafka_formatter)
kafka_logger.addHandler(kafka_handler)
log = logging.getLogger(__name__)
- log.info("Config: %s", cfg)
+ log.info("Config: %s", vars(cfg))
log.info("Syncing database...")
db_manager = DatabaseManager()
db_manager.create_tables()
diff --git a/osm_policy_module/common/db_client.py b/osm_policy_module/common/db_client.py
new file mode 100644
index 0000000..1eed0f3
--- /dev/null
+++ b/osm_policy_module/common/db_client.py
@@ -0,0 +1,59 @@
+# -*- coding: utf-8 -*-
+
+# Copyright 2018 Whitestack, LLC
+# *************************************************************
+
+# This file is part of OSM Monitoring module
+# All Rights Reserved to Whitestack, LLC
+
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+
+# http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+# For those usages not covered by the Apache License, Version 2.0 please
+# contact: bdiaz@whitestack.com or glavado@whitestack.com
+##
+from osm_common import dbmongo
+
+from osm_policy_module.core.config import Config
+
+
+class DbClient:
+ def __init__(self):
+ cfg = Config.instance()
+ self.common_db = dbmongo.DbMongo()
+ self.common_db.db_connect({'host': cfg.OSMPOL_DATABASE_HOST,
+ 'port': int(cfg.OSMPOL_DATABASE_PORT),
+ 'name': 'osm'})
+
+ def get_vnfr(self, nsr_id: str, member_index: int):
+ vnfr = self.common_db.get_one("vnfrs",
+ {"nsr-id-ref": nsr_id, "member-vnf-index-ref": str(member_index)})
+ return vnfr
+
+ def get_vnfrs(self, nsr_id: str):
+ return [self.get_vnfr(nsr_id, member['member-vnf-index']) for member in
+ self.get_nsr(nsr_id)['nsd']['constituent-vnfd']]
+
+ def get_vnfd(self, vnfd_id: str):
+ vnfr = self.common_db.get_one("vnfds",
+ {"_id": vnfd_id})
+ return vnfr
+
+ def get_nsr(self, nsr_id: str):
+ nsr = self.common_db.get_one("nsrs",
+ {"id": nsr_id})
+ return nsr
+
+ def get_nslcmop(self, nslcmop_id):
+ nslcmop = self.common_db.get_one("nslcmops",
+ {"_id": nslcmop_id})
+ return nslcmop
diff --git a/osm_policy_module/common/lcm_client.py b/osm_policy_module/common/lcm_client.py
index 7857d26..34e212f 100644
--- a/osm_policy_module/common/lcm_client.py
+++ b/osm_policy_module/common/lcm_client.py
@@ -49,6 +49,7 @@
'name': 'osm'})
def scale(self, nsr_id: str, scaling_group_name: str, vnf_member_index: int, action: str):
+ log.debug("scale %s %s %s %s", nsr_id, scaling_group_name, vnf_member_index, action)
nslcmop = self._generate_nslcmop(nsr_id, scaling_group_name, vnf_member_index, action)
self.common_db.create("nslcmops", nslcmop)
log.info("Sending scale action message: %s", json.dumps(nslcmop))
@@ -56,6 +57,7 @@
self.producer.flush()
def _generate_nslcmop(self, nsr_id: str, scaling_group_name: str, vnf_member_index: int, action: str):
+ log.debug("_generate_nslcmop %s %s %s %s", nsr_id, scaling_group_name, vnf_member_index, action)
_id = str(uuid.uuid4())
now = time.time()
params = {
diff --git a/osm_policy_module/common/mon_client.py b/osm_policy_module/common/mon_client.py
index 724fe83..5d2416e 100644
--- a/osm_policy_module/common/mon_client.py
+++ b/osm_policy_module/common/mon_client.py
@@ -53,7 +53,8 @@
consumer = KafkaConsumer(bootstrap_servers=self.kafka_server,
key_deserializer=bytes.decode,
value_deserializer=bytes.decode,
- consumer_timeout_ms=10000)
+ consumer_timeout_ms=10000,
+ group_id='mon-client-' + str(uuid.uuid4()))
consumer.subscribe(['alarm_response'])
for message in consumer:
if message.key == 'create_alarm_response':
diff --git a/osm_policy_module/core/agent.py b/osm_policy_module/core/agent.py
index 8309da1..bbefadd 100644
--- a/osm_policy_module/core/agent.py
+++ b/osm_policy_module/core/agent.py
@@ -21,6 +21,7 @@
# For those usages not covered by the Apache License, Version 2.0 please
# contact: bdiaz@whitestack.com or glavado@whitestack.com
##
+import datetime
import json
import logging
import threading
@@ -28,36 +29,33 @@
import yaml
from kafka import KafkaConsumer
-from osm_common import dbmongo
+from osm_policy_module.common.db_client import DbClient
from osm_policy_module.common.lcm_client import LcmClient
from osm_policy_module.common.mon_client import MonClient
from osm_policy_module.core import database
from osm_policy_module.core.config import Config
-from osm_policy_module.core.database import ScalingRecord, ScalingAlarm
+from osm_policy_module.core.database import ScalingGroup, ScalingAlarm, ScalingPolicy, ScalingCriteria
log = logging.getLogger(__name__)
+ALLOWED_KAFKA_KEYS = ['instantiated', 'scaled', 'notify_alarm']
+
class PolicyModuleAgent:
def __init__(self):
cfg = Config.instance()
- self.common_db = dbmongo.DbMongo()
- self.common_db.db_connect({'host': cfg.OSMPOL_DATABASE_HOST,
- 'port': int(cfg.OSMPOL_DATABASE_PORT),
- 'name': 'osm'})
+ self.db_client = DbClient()
self.mon_client = MonClient()
+ self.lcm_client = LcmClient()
self.kafka_server = '{}:{}'.format(cfg.OSMPOL_MESSAGE_HOST,
cfg.OSMPOL_MESSAGE_PORT)
def run(self):
- cfg = Config.instance()
- cfg.read_environ()
-
consumer = KafkaConsumer(bootstrap_servers=self.kafka_server,
key_deserializer=bytes.decode,
value_deserializer=bytes.decode,
- consumer_timeout_ms=10000)
+ group_id='pol-consumer')
consumer.subscribe(["ns", "alarm_response"])
for message in consumer:
@@ -65,141 +63,204 @@
t.start()
def _process_msg(self, topic, key, msg):
+ log.debug("_process_msg topic=%s key=%s msg=%s", topic, key, msg)
try:
- # Check for ns instantiation
- if key == 'instantiated':
+ if key in ALLOWED_KAFKA_KEYS:
try:
content = json.loads(msg)
except JSONDecodeError:
content = yaml.safe_load(msg)
- log.info("Message arrived with topic: %s, key: %s, msg: %s", topic, key, content)
- nslcmop_id = content['nslcmop_id']
- nslcmop = self.common_db.get_one(table="nslcmops",
- filter={"_id": nslcmop_id})
- if nslcmop['operationState'] == 'COMPLETED' or nslcmop['operationState'] == 'PARTIALLY_COMPLETED':
- nsr_id = nslcmop['nsInstanceId']
- log.info("Configuring scaling groups for network service with nsr_id: %s", nsr_id)
- self._configure_scaling_groups(nsr_id)
- else:
- log.info(
- "Network service is not in COMPLETED or PARTIALLY_COMPLETED state. "
- "Current state is %s. Skipping...",
- nslcmop['operationState'])
- if key == 'notify_alarm':
- try:
- content = json.loads(msg)
- except JSONDecodeError:
- content = yaml.safe_load(msg)
- log.info("Message arrived with topic: %s, key: %s, msg: %s", topic, key, content)
- alarm_id = content['notify_details']['alarm_uuid']
- metric_name = content['notify_details']['metric_name']
- operation = content['notify_details']['operation']
- threshold = content['notify_details']['threshold_value']
- vdu_name = content['notify_details']['vdu_name']
- vnf_member_index = content['notify_details']['vnf_member_index']
- ns_id = content['notify_details']['ns_id']
- log.info(
- "Received alarm notification for alarm %s, \
- metric %s, \
- operation %s, \
- threshold %s, \
- vdu_name %s, \
- vnf_member_index %s, \
- ns_id %s ",
- alarm_id, metric_name, operation, threshold, vdu_name, vnf_member_index, ns_id)
- try:
- alarm = ScalingAlarm.select().where(ScalingAlarm.alarm_id == alarm_id).get()
- lcm_client = LcmClient()
- log.info("Sending scaling action message for ns: %s", alarm_id)
- lcm_client.scale(alarm.scaling_record.nsr_id, alarm.scaling_record.name, alarm.vnf_member_index,
- alarm.action)
- except ScalingAlarm.DoesNotExist:
- log.info("There is no action configured for alarm %s.", alarm_id)
+ if key == 'instantiated' or key == 'scaled':
+ self._handle_instantiated_or_scaled(content)
+
+ if key == 'notify_alarm':
+ self._handle_alarm_notification(content)
+ else:
+ log.debug("Key %s is not in ALLOWED_KAFKA_KEYS", key)
except Exception:
log.exception("Error consuming message: ")
- def _get_vnfr(self, nsr_id: str, member_index: int):
- vnfr = self.common_db.get_one(table="vnfrs",
- filter={"nsr-id-ref": nsr_id, "member-vnf-index-ref": str(member_index)})
- return vnfr
+ def _handle_alarm_notification(self, content):
+ log.debug("_handle_alarm_notification: %s", content)
+ alarm_id = content['notify_details']['alarm_uuid']
+ metric_name = content['notify_details']['metric_name']
+ operation = content['notify_details']['operation']
+ threshold = content['notify_details']['threshold_value']
+ vdu_name = content['notify_details']['vdu_name']
+ vnf_member_index = content['notify_details']['vnf_member_index']
+ ns_id = content['notify_details']['ns_id']
+ log.info(
+ "Received alarm notification for alarm %s, \
+ metric %s, \
+ operation %s, \
+ threshold %s, \
+ vdu_name %s, \
+ vnf_member_index %s, \
+ ns_id %s ",
+ alarm_id, metric_name, operation, threshold, vdu_name, vnf_member_index, ns_id)
+ try:
+ alarm = ScalingAlarm.select().where(ScalingAlarm.alarm_id == alarm_id).get()
+ delta = datetime.datetime.now() - alarm.scaling_criteria.scaling_policy.last_scale
+ log.debug("last_scale: %s", alarm.scaling_criteria.scaling_policy.last_scale)
+ log.debug("now: %s", datetime.datetime.now())
+ log.debug("delta: %s", delta)
+ if delta.total_seconds() < alarm.scaling_criteria.scaling_policy.cooldown_time:
+ log.info("Time between last scale and now is less than cooldown time. Skipping.")
+ return
+ log.info("Sending scaling action message for ns: %s", alarm_id)
+ self.lcm_client.scale(alarm.scaling_criteria.scaling_policy.scaling_group.nsr_id,
+ alarm.scaling_criteria.scaling_policy.scaling_group.name,
+ alarm.vnf_member_index,
+ alarm.action)
+ alarm.scaling_criteria.scaling_policy.last_scale = datetime.datetime.now()
+ alarm.scaling_criteria.scaling_policy.save()
+ except ScalingAlarm.DoesNotExist:
+ log.info("There is no action configured for alarm %s.", alarm_id)
- def _get_vnfrs(self, nsr_id: str):
- return [self._get_vnfr(nsr_id, member['member-vnf-index']) for member in
- self._get_nsr(nsr_id)['nsd']['constituent-vnfd']]
-
- def _get_vnfd(self, vnfd_id: str):
- vnfr = self.common_db.get_one(table="vnfds",
- filter={"_id": vnfd_id})
- return vnfr
-
- def _get_nsr(self, nsr_id: str):
- nsr = self.common_db.get_one(table="nsrs",
- filter={"id": nsr_id})
- return nsr
+ def _handle_instantiated_or_scaled(self, content):
+ log.debug("_handle_instantiated_or_scaled: %s", content)
+ nslcmop_id = content['nslcmop_id']
+ nslcmop = self.db_client.get_nslcmop(nslcmop_id)
+ if nslcmop['operationState'] == 'COMPLETED' or nslcmop['operationState'] == 'PARTIALLY_COMPLETED':
+ nsr_id = nslcmop['nsInstanceId']
+ log.info("Configuring scaling groups for network service with nsr_id: %s", nsr_id)
+ self._configure_scaling_groups(nsr_id)
+ else:
+ log.info(
+ "Network service is not in COMPLETED or PARTIALLY_COMPLETED state. "
+ "Current state is %s. Skipping...",
+ nslcmop['operationState'])
def _configure_scaling_groups(self, nsr_id: str):
+ log.debug("_configure_scaling_groups: %s", nsr_id)
# TODO(diazb): Check for alarm creation on exception and clean resources if needed.
+ # TODO: Add support for non-nfvi metrics
with database.db.atomic():
- vnfrs = self._get_vnfrs(nsr_id)
- log.info("Checking %s vnfrs...", len(vnfrs))
+ vnfrs = self.db_client.get_vnfrs(nsr_id)
+ log.info("Found %s vnfrs", len(vnfrs))
for vnfr in vnfrs:
- vnfd = self._get_vnfd(vnfr['vnfd-id'])
+ vnfd = self.db_client.get_vnfd(vnfr['vnfd-id'])
log.info("Looking for vnfd %s", vnfr['vnfd-id'])
scaling_groups = vnfd['scaling-group-descriptor']
vnf_monitoring_params = vnfd['monitoring-param']
for scaling_group in scaling_groups:
- log.info("Creating scaling record in DB...")
- scaling_record = ScalingRecord.create(
- nsr_id=nsr_id,
- name=scaling_group['name'],
- content=json.dumps(scaling_group)
- )
- log.info("Created scaling record in DB : nsr_id=%s, name=%s, content=%s",
- scaling_record.nsr_id,
- scaling_record.name,
- scaling_record.content)
+ try:
+ scaling_group_record = ScalingGroup.select().where(
+ ScalingGroup.nsr_id == nsr_id,
+ ScalingGroup.vnf_member_index == vnfr['member-vnf-index-ref'],
+ ScalingGroup.name == scaling_group['name']
+ ).get()
+ log.info("Found existing scaling group record in DB...")
+ except ScalingGroup.DoesNotExist:
+ log.info("Creating scaling group record in DB...")
+ scaling_group_record = ScalingGroup.create(
+ nsr_id=nsr_id,
+ vnf_member_index=vnfr['member-vnf-index-ref'],
+ name=scaling_group['name'],
+ content=json.dumps(scaling_group)
+ )
+ log.info(
+ "Created scaling group record in DB : nsr_id=%s, vnf_member_index=%d, name=%s, content=%s",
+ scaling_group_record.nsr_id,
+ scaling_group_record.vnf_member_index,
+ scaling_group_record.name,
+ scaling_group_record.content)
for scaling_policy in scaling_group['scaling-policy']:
- for vdur in vnfd['vdu']:
- vdu_monitoring_params = vdur['monitoring-param']
- for scaling_criteria in scaling_policy['scaling-criteria']:
+ if scaling_policy['scaling-type'] != 'automatic':
+ continue
+ try:
+ scaling_policy_record = ScalingPolicy.select().join(ScalingGroup).where(
+ ScalingPolicy.name == scaling_policy['name'],
+ ScalingGroup.id == scaling_group_record.id
+ ).get()
+ log.info("Found existing scaling policy record in DB...")
+ except ScalingPolicy.DoesNotExist:
+ log.info("Creating scaling policy record in DB...")
+ scaling_policy_record = ScalingPolicy.create(
+ nsr_id=nsr_id,
+ name=scaling_policy['name'],
+ cooldown_time=scaling_policy['cooldown-time'],
+ scaling_group=scaling_group_record
+ )
+ log.info("Created scaling policy record in DB : name=%s, scaling_group.name=%s",
+ scaling_policy_record.name,
+ scaling_policy_record.scaling_group.name)
+
+ for scaling_criteria in scaling_policy['scaling-criteria']:
+ try:
+ scaling_criteria_record = ScalingCriteria.select().join(ScalingPolicy).where(
+ ScalingPolicy.id == scaling_policy_record.id,
+ ScalingCriteria.name == scaling_criteria['name']
+ ).get()
+ log.info("Found existing scaling criteria record in DB...")
+ except ScalingCriteria.DoesNotExist:
+ log.info("Creating scaling criteria record in DB...")
+ scaling_criteria_record = ScalingCriteria.create(
+ nsr_id=nsr_id,
+ name=scaling_criteria['name'],
+ scaling_policy=scaling_policy_record
+ )
+ log.info(
+ "Created scaling criteria record in DB : name=%s, scaling_policy.name=%s",
+ scaling_criteria_record.name,
+ scaling_criteria_record.scaling_policy.name)
+
+ for vdu_ref in scaling_group['vdu']:
vnf_monitoring_param = next(
filter(lambda param: param['id'] == scaling_criteria['vnf-monitoring-param-ref'],
vnf_monitoring_params))
- # TODO: Add support for non-nfvi metrics
+ if not vdu_ref['vdu-id-ref'] == vnf_monitoring_param['vdu-ref']:
+ continue
+ vdu = next(
+ filter(lambda vdu: vdu['id'] == vdu_ref['vdu-id-ref'], vnfd['vdu'])
+ )
+ vdu_monitoring_params = vdu['monitoring-param']
vdu_monitoring_param = next(
filter(
lambda param: param['id'] == vnf_monitoring_param['vdu-monitoring-param-ref'],
vdu_monitoring_params))
- alarm_uuid = self.mon_client.create_alarm(
- metric_name=vdu_monitoring_param['nfvi-metric'],
- ns_id=nsr_id,
- vdu_name=vdur['name'],
- vnf_member_index=vnfr['member-vnf-index-ref'],
- threshold=scaling_criteria['scale-in-threshold'],
- operation=scaling_criteria['scale-in-relational-operation'],
- statistic=vnf_monitoring_param['aggregation-type']
- )
- ScalingAlarm.create(
- alarm_id=alarm_uuid,
- action='scale_in',
- vnf_member_index=int(vnfr['member-vnf-index-ref']),
- vdu_name=vdur['name'],
- scaling_record=scaling_record
- )
- alarm_uuid = self.mon_client.create_alarm(
- metric_name=vdu_monitoring_param['nfvi-metric'],
- ns_id=nsr_id,
- vdu_name=vdur['name'],
- vnf_member_index=vnfr['member-vnf-index-ref'],
- threshold=scaling_criteria['scale-out-threshold'],
- operation=scaling_criteria['scale-out-relational-operation'],
- statistic=vnf_monitoring_param['aggregation-type']
- )
- ScalingAlarm.create(
- alarm_id=alarm_uuid,
- action='scale_out',
- vnf_member_index=int(vnfr['member-vnf-index-ref']),
- vdu_name=vdur['name'],
- scaling_record=scaling_record
- )
+ vdurs = list(filter(lambda vdur: vdur['vdu-id-ref'] == vnf_monitoring_param['vdu-ref'],
+ vnfr['vdur']))
+ for vdur in vdurs:
+ try:
+ ScalingAlarm.select().join(ScalingCriteria).where(
+ ScalingAlarm.vdu_name == vdur['name'],
+ ScalingCriteria.name == scaling_criteria['name']
+ ).get()
+ log.debug("vdu %s already has an alarm configured", vdur['name'])
+ continue
+ except ScalingAlarm.DoesNotExist:
+ pass
+ alarm_uuid = self.mon_client.create_alarm(
+ metric_name=vdu_monitoring_param['nfvi-metric'],
+ ns_id=nsr_id,
+ vdu_name=vdur['name'],
+ vnf_member_index=vnfr['member-vnf-index-ref'],
+ threshold=scaling_criteria['scale-in-threshold'],
+ operation=scaling_criteria['scale-in-relational-operation'],
+ statistic=vnf_monitoring_param['aggregation-type']
+ )
+ ScalingAlarm.create(
+ alarm_id=alarm_uuid,
+ action='scale_in',
+ vnf_member_index=int(vnfr['member-vnf-index-ref']),
+ vdu_name=vdur['name'],
+ scaling_criteria=scaling_criteria_record
+ )
+ alarm_uuid = self.mon_client.create_alarm(
+ metric_name=vdu_monitoring_param['nfvi-metric'],
+ ns_id=nsr_id,
+ vdu_name=vdur['name'],
+ vnf_member_index=vnfr['member-vnf-index-ref'],
+ threshold=scaling_criteria['scale-out-threshold'],
+ operation=scaling_criteria['scale-out-relational-operation'],
+ statistic=vnf_monitoring_param['aggregation-type']
+ )
+ ScalingAlarm.create(
+ alarm_id=alarm_uuid,
+ action='scale_out',
+ vnf_member_index=int(vnfr['member-vnf-index-ref']),
+ vdu_name=vdur['name'],
+ scaling_criteria=scaling_criteria_record
+ )
diff --git a/osm_policy_module/core/config.py b/osm_policy_module/core/config.py
index dab409b..84a1f57 100644
--- a/osm_policy_module/core/config.py
+++ b/osm_policy_module/core/config.py
@@ -67,6 +67,7 @@
CfgParam('OSMPOL_DATABASE_PORT', 27017, int),
CfgParam('OSMPOL_SQL_DATABASE_URI', "sqlite:///mon_sqlite.db", six.text_type),
CfgParam('OSMPOL_LOG_LEVEL', "INFO", six.text_type),
+ CfgParam('OSMPOL_KAFKA_LOG_LEVEL', "WARN", six.text_type),
]
_config_dict = {cfg.key: cfg for cfg in _configuration}
diff --git a/osm_policy_module/core/database.py b/osm_policy_module/core/database.py
index f9a37c2..8ad19f2 100644
--- a/osm_policy_module/core/database.py
+++ b/osm_policy_module/core/database.py
@@ -21,9 +21,10 @@
# For those usages not covered by the Apache License, Version 2.0 please
# contact: bdiaz@whitestack.com or glavado@whitestack.com
##
+import datetime
import logging
-from peewee import CharField, IntegerField, ForeignKeyField, Model, TextField
+from peewee import CharField, IntegerField, ForeignKeyField, Model, TextField, AutoField, DateTimeField
from playhouse.sqlite_ext import SqliteExtDatabase
from osm_policy_module.core.config import Config
@@ -35,29 +36,44 @@
class BaseModel(Model):
+ id = AutoField(primary_key=True)
+
class Meta:
database = db
-class ScalingRecord(BaseModel):
+class ScalingGroup(BaseModel):
nsr_id = CharField()
+ vnf_member_index = IntegerField()
name = CharField()
content = TextField()
+class ScalingPolicy(BaseModel):
+ name = CharField()
+ cooldown_time = IntegerField()
+ last_scale = DateTimeField(default=datetime.datetime.now)
+ scaling_group = ForeignKeyField(ScalingGroup, related_name='scaling_policies')
+
+
+class ScalingCriteria(BaseModel):
+ name = CharField()
+ scaling_policy = ForeignKeyField(ScalingPolicy, related_name='scaling_criterias')
+
+
class ScalingAlarm(BaseModel):
alarm_id = CharField()
action = CharField()
vnf_member_index = IntegerField()
vdu_name = CharField()
- scaling_record = ForeignKeyField(ScalingRecord, related_name='scaling_alarms')
+ scaling_criteria = ForeignKeyField(ScalingCriteria, related_name='scaling_alarms')
class DatabaseManager:
def create_tables(self):
try:
db.connect()
- db.create_tables([ScalingRecord, ScalingAlarm])
+ db.create_tables([ScalingGroup, ScalingPolicy, ScalingCriteria, ScalingAlarm])
db.close()
except Exception as e:
log.exception("Error creating tables: ")
diff --git a/osm_policy_module/tests/integration/test_policy_agent.py b/osm_policy_module/tests/integration/test_policy_agent.py
index 3a82ea7..bed6eb5 100644
--- a/osm_policy_module/tests/integration/test_policy_agent.py
+++ b/osm_policy_module/tests/integration/test_policy_agent.py
@@ -31,10 +31,11 @@
from osm_common.dbmongo import DbMongo
from peewee import SqliteDatabase
+from osm_policy_module.common.db_client import DbClient
from osm_policy_module.common.mon_client import MonClient
from osm_policy_module.core import database
from osm_policy_module.core.agent import PolicyModuleAgent
-from osm_policy_module.core.database import ScalingRecord, ScalingAlarm
+from osm_policy_module.core.database import ScalingGroup, ScalingAlarm, ScalingPolicy, ScalingCriteria
log = logging.getLogger()
log.level = logging.INFO
@@ -401,7 +402,7 @@
test_db = SqliteDatabase(':memory:')
-MODELS = [ScalingRecord, ScalingAlarm]
+MODELS = [ScalingGroup, ScalingPolicy, ScalingCriteria, ScalingAlarm]
class PolicyModuleAgentTest(unittest.TestCase):
@@ -419,9 +420,9 @@
@patch.object(DbMongo, 'db_connect', Mock())
@patch.object(KafkaProducer, '__init__')
@patch.object(MonClient, 'create_alarm')
- @patch.object(PolicyModuleAgent, '_get_vnfd')
- @patch.object(PolicyModuleAgent, '_get_nsr')
- @patch.object(PolicyModuleAgent, '_get_vnfr')
+ @patch.object(DbClient, 'get_vnfd')
+ @patch.object(DbClient, 'get_nsr')
+ @patch.object(DbClient, 'get_vnfr')
def test_configure_scaling_groups(self, get_vnfr, get_nsr, get_vnfd, create_alarm, kafka_producer_init):
def _test_configure_scaling_groups_get_vnfr(*args, **kwargs):
if '1' in args[1]:
@@ -444,12 +445,32 @@
operation='GT',
statistic='AVERAGE',
threshold=80,
- vdu_name='cirros_vnfd-VM',
+ vdu_name='cirros_ns-1-cirros_vnfd-VM-1',
vnf_member_index='1')
- scaling_record = ScalingRecord.get()
+ create_alarm.assert_any_call(metric_name='average_memory_utilization',
+ ns_id='test_nsr_id',
+ operation='LT',
+ statistic='AVERAGE',
+ threshold=20,
+ vdu_name='cirros_ns-1-cirros_vnfd-VM-1',
+ vnf_member_index='1')
+ create_alarm.assert_any_call(metric_name='average_memory_utilization',
+ ns_id='test_nsr_id',
+ operation='GT',
+ statistic='AVERAGE',
+ threshold=80,
+ vdu_name='cirros_ns-2-cirros_vnfd-VM-1',
+ vnf_member_index='2')
+ create_alarm.assert_any_call(metric_name='average_memory_utilization',
+ ns_id='test_nsr_id',
+ operation='LT',
+ statistic='AVERAGE',
+ threshold=20,
+ vdu_name='cirros_ns-2-cirros_vnfd-VM-1',
+ vnf_member_index='2')
+ scaling_record = ScalingGroup.get()
self.assertEqual(scaling_record.name, 'scale_cirros_vnfd-VM')
self.assertEqual(scaling_record.nsr_id, 'test_nsr_id')
- self.assertIsNotNone(scaling_record)
if __name__ == '__main__':
diff --git a/setup.py b/setup.py
index c3c735d..86dacc4 100644
--- a/setup.py
+++ b/setup.py
@@ -24,7 +24,7 @@
from setuptools import setup
_name = 'osm_policy_module'
-_version_command = ('git describe --match v* --tags --long --dirty', 'pep440-git')
+_version_command = ('git describe --match v* --tags --long --dirty', 'pep440-git-full')
_author = "Benjamín Díaz"
_author_email = 'bdiaz@whitestack.com'
_description = 'OSM Policy Module'
diff --git a/stdeb.cfg b/stdeb.cfg
new file mode 100644
index 0000000..021202c
--- /dev/null
+++ b/stdeb.cfg
@@ -0,0 +1,3 @@
+[DEFAULT]
+X-Python3-Version : >= 3.4
+Depends3 : libmysqlclient-dev, python3-pip, python3-osm-common
\ No newline at end of file
diff --git a/tox.ini b/tox.ini
index 4ef6a64..584efa7 100644
--- a/tox.ini
+++ b/tox.ini
@@ -25,7 +25,7 @@
# test suite on all supported python versions. To use it, "pip install tox"
# and then run "tox" from this directory.
[tox]
-envlist = py3
+envlist = py3, flake8
toxworkdir={homedir}/.tox
[testenv]
@@ -40,12 +40,6 @@
commands =
flake8 osm_policy_module
-[testenv:build]
-basepython = python3
-deps = stdeb
- setuptools-version-command
-commands = python3 setup.py --command-packages=stdeb.command bdist_deb
-
[flake8]
# E123, E125 skipped as they are invalid PEP-8.
max-line-length = 120
@@ -54,4 +48,8 @@
builtins = _
exclude=.venv,.git,.tox,dist,doc,*lib/python*,*egg,build,devops_stages/*,.rst
-
+[testenv:build]
+basepython = python3
+deps = stdeb
+ setuptools-version-command
+commands = python3 setup.py --command-packages=stdeb.command bdist_deb