From: lavado
Date: Sat, 13 Oct 2018 12:08:08 +0000 (+0200)
Subject: Merge "Added LICENSE file to root folder"
X-Git-Tag: v5.0.0~11
X-Git-Url: https://osm.etsi.org/gitweb/?a=commitdiff_plain;h=d8e91c539f8a1657a93c6ed59dd5f9ad489684ef;hp=7d277a2b457923ae72e579701d9d79dab3f4eb57;p=osm%2FPOL.git

Merge "Added LICENSE file to root folder"
---

diff --git a/.gitignore b/.gitignore
index f4d6bb1..88962fe 100644
--- a/.gitignore
+++ b/.gitignore
@@ -76,3 +76,7 @@ ChangeLog
 .settings/
 __pycache__/
 .idea
+
+deb_dist
+*.tar.gz
+*.db
\ No newline at end of file
diff --git a/Dockerfile b/Dockerfile
index 6738633..f3af477 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -22,7 +22,7 @@
 FROM ubuntu:16.04
 
 RUN apt-get update && \
-    DEBIAN_FRONTEND=noninteractive apt-get --yes install git tox make python python-pip python3 python3-pip debhelper && \
-    DEBIAN_FRONTEND=noninteractive apt-get --yes install wget python-dev python-software-properties python-stdeb && \
-    DEBIAN_FRONTEND=noninteractive apt-get --yes install default-jre libmysqlclient-dev && \
-    DEBIAN_FRONTEND=noninteractive apt-get --yes install libmysqlclient-dev libxml2 python3-all
+    DEBIAN_FRONTEND=noninteractive apt-get --yes install git tox make python-all python3 python3-pip debhelper wget && \
+    DEBIAN_FRONTEND=noninteractive apt-get --yes install libmysqlclient-dev libxml2 python3-all && \
+    DEBIAN_FRONTEND=noninteractive pip3 install -U setuptools setuptools-version-command stdeb
+
diff --git a/MANIFEST.in b/MANIFEST.in
index ad2c95a..9dbb8cd 100644
--- a/MANIFEST.in
+++ b/MANIFEST.in
@@ -24,5 +24,4 @@ include requirements.txt
 include test-requirements.txt
 include README.rst
 recursive-include osm_policy_module *.py *.xml *.sh
-recursive-include devops-stages *
-recursive-include test *.py
+recursive-include devops-stages *
\ No newline at end of file
diff --git a/Makefile b/Makefile
new file mode 100644
index 0000000..586b73c
--- /dev/null
+++ b/Makefile
@@ -0,0 +1,9 @@
+all: clean package
+
+clean:
+	rm -rf dist deb_dist osm_policy_module-*.tar.gz osm_policy_module.egg-info .eggs
+
+package:
+	python3 setup.py --command-packages=stdeb.command sdist_dsc
+	cp debian/python3-osm-policy-module.postinst deb_dist/osm-policy-module*/debian
+	cd deb_dist/osm-policy-module*/ && dpkg-buildpackage -rfakeroot -uc -us
\ No newline at end of file
diff --git a/debian/python3-osm-policy-module.postinst b/debian/python3-osm-policy-module.postinst
new file mode 100644
index 0000000..3b24842
--- /dev/null
+++ b/debian/python3-osm-policy-module.postinst
@@ -0,0 +1,9 @@
+#!/bin/bash
+
+echo "Installing python dependencies via pip..."
+pip3 install kafka==1.3.*
+pip3 install peewee==3.1.*
+pip3 install jsonschema==2.6.*
+pip3 install six==1.11.*
+pip3 install pyyaml==3.*
+echo "Installation of python dependencies finished"
\ No newline at end of file
diff --git a/devops-stages/stage-build.sh b/devops-stages/stage-build.sh
index 4251b1c..8a8d332 100755
--- a/devops-stages/stage-build.sh
+++ b/devops-stages/stage-build.sh
@@ -23,7 +23,4 @@
 #__date__ = "14/Sep/2017"
 
 #!/bin/bash
-rm -rf deb_dist
-rm -rf dist
-rm -rf osm_mon.egg-info
-tox -e build
+make
diff --git a/docker/Dockerfile b/docker/Dockerfile
index 4095fa8..ab5b411 100644
--- a/docker/Dockerfile
+++ b/docker/Dockerfile
@@ -47,5 +47,6 @@ ENV OSMPOL_DATABASE_PORT 27017
 ENV OSMPOL_SQL_DATABASE_URI sqlite:///mon_sqlite.db
 ENV OSMPOL_LOG_LEVEL INFO
+ENV OSMPOL_KAFKA_LOG_LEVEL WARN
 
 CMD osm-policy-agent
 
diff --git a/osm_policy_module/cmd/policy_module_agent.py b/osm_policy_module/cmd/policy_module_agent.py
index 24663e5..c82f006 100644
--- a/osm_policy_module/cmd/policy_module_agent.py
+++ b/osm_policy_module/cmd/policy_module_agent.py
@@ -43,13 +43,13 @@ def main():
                         datefmt='%m/%d/%Y %I:%M:%S %p',
                         level=logging.getLevelName(cfg.OSMPOL_LOG_LEVEL))
     kafka_logger = logging.getLogger('kafka')
-    kafka_logger.setLevel(logging.WARN)
+    kafka_logger.setLevel(logging.getLevelName(cfg.OSMPOL_KAFKA_LOG_LEVEL))
     kafka_formatter = logging.Formatter(log_formatter_str)
     kafka_handler = logging.StreamHandler(sys.stdout)
     kafka_handler.setFormatter(kafka_formatter)
     kafka_logger.addHandler(kafka_handler)
     log = logging.getLogger(__name__)
-    log.info("Config: %s", cfg)
+    log.info("Config: %s", vars(cfg))
     log.info("Syncing database...")
     db_manager = DatabaseManager()
     db_manager.create_tables()
diff --git a/osm_policy_module/common/db_client.py b/osm_policy_module/common/db_client.py
new file mode 100644
index 0000000..1eed0f3
--- /dev/null
+++ b/osm_policy_module/common/db_client.py
@@ -0,0 +1,59 @@
+# -*- coding: utf-8 -*-
+
+# Copyright 2018 Whitestack, LLC
+# *************************************************************
+
+# This file is part of OSM Monitoring module
+# All Rights Reserved to Whitestack, LLC
+
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+
+#         http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+# For those usages not covered by the Apache License, Version 2.0 please
+# contact: bdiaz@whitestack.com or glavado@whitestack.com
+##
+from osm_common import dbmongo
+
+from osm_policy_module.core.config import Config
+
+
+class DbClient:
+    def __init__(self):
+        cfg = Config.instance()
+        self.common_db = dbmongo.DbMongo()
+        self.common_db.db_connect({'host': cfg.OSMPOL_DATABASE_HOST,
+                                   'port': int(cfg.OSMPOL_DATABASE_PORT),
+                                   'name': 'osm'})
+
+    def get_vnfr(self, nsr_id: str, member_index: int):
+        vnfr = self.common_db.get_one("vnfrs",
+                                      {"nsr-id-ref": nsr_id, "member-vnf-index-ref": str(member_index)})
+        return vnfr
+
+    def get_vnfrs(self, nsr_id: str):
+        return [self.get_vnfr(nsr_id, member['member-vnf-index']) for member in
+                self.get_nsr(nsr_id)['nsd']['constituent-vnfd']]
+
+    def get_vnfd(self, vnfd_id: str):
+        vnfr = self.common_db.get_one("vnfds",
+                                      {"_id": vnfd_id})
+        return vnfr
+
+    def get_nsr(self, nsr_id: str):
+        nsr = self.common_db.get_one("nsrs",
+                                     {"id": nsr_id})
+        return nsr
+
+    def get_nslcmop(self, nslcmop_id):
+        nslcmop = self.common_db.get_one("nslcmops",
+                                         {"_id": nslcmop_id})
+        return nslcmop
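
The new DbClient above centralizes all reads from the OSM common database. For orientation, a minimal usage sketch (not part of the patch): it assumes a reachable common MongoDB and OSMPOL_DATABASE_HOST/OSMPOL_DATABASE_PORT set in the config, and the nslcmop id is a placeholder.

    from osm_policy_module.common.db_client import DbClient

    db_client = DbClient()
    # 'some-nslcmop-id' is hypothetical; real ids come from "ns" topic messages
    nslcmop = db_client.get_nslcmop('some-nslcmop-id')
    if nslcmop['operationState'] == 'COMPLETED':
        # get_vnfrs resolves every constituent VNF record of the NS instance
        for vnfr in db_client.get_vnfrs(nslcmop['nsInstanceId']):
            print(vnfr['vnfd-id'], vnfr['member-vnf-index-ref'])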
diff --git a/osm_policy_module/common/lcm_client.py b/osm_policy_module/common/lcm_client.py
index 7857d26..34e212f 100644
--- a/osm_policy_module/common/lcm_client.py
+++ b/osm_policy_module/common/lcm_client.py
@@ -49,6 +49,7 @@ class LcmClient:
                                  'name': 'osm'})
 
     def scale(self, nsr_id: str, scaling_group_name: str, vnf_member_index: int, action: str):
+        log.debug("scale %s %s %s %s", nsr_id, scaling_group_name, vnf_member_index, action)
         nslcmop = self._generate_nslcmop(nsr_id, scaling_group_name, vnf_member_index, action)
         self.common_db.create("nslcmops", nslcmop)
         log.info("Sending scale action message: %s", json.dumps(nslcmop))
@@ -56,6 +57,7 @@ class LcmClient:
         self.producer.flush()
 
     def _generate_nslcmop(self, nsr_id: str, scaling_group_name: str, vnf_member_index: int, action: str):
+        log.debug("_generate_nslcmop %s %s %s %s", nsr_id, scaling_group_name, vnf_member_index, action)
         _id = str(uuid.uuid4())
         now = time.time()
         params = {
diff --git a/osm_policy_module/common/mon_client.py b/osm_policy_module/common/mon_client.py
index 724fe83..5d2416e 100644
--- a/osm_policy_module/common/mon_client.py
+++ b/osm_policy_module/common/mon_client.py
@@ -53,7 +53,8 @@ class MonClient:
         consumer = KafkaConsumer(bootstrap_servers=self.kafka_server,
                                  key_deserializer=bytes.decode,
                                  value_deserializer=bytes.decode,
-                                 consumer_timeout_ms=10000)
+                                 consumer_timeout_ms=10000,
+                                 group_id='mon-client-' + str(uuid.uuid4()))
         consumer.subscribe(['alarm_response'])
         for message in consumer:
             if message.key == 'create_alarm_response':
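
The group_id added to MonClient is the interesting part: because each instance joins a freshly generated consumer group, every MonClient sees all alarm_response messages instead of competing for them within a shared group, so concurrent create_alarm() calls cannot steal each other's replies. A minimal sketch of that consumer setup, using the kafka-python API as above (the broker address is illustrative):

    import uuid
    from kafka import KafkaConsumer

    # Unique group per client: broadcast semantics for alarm responses.
    consumer = KafkaConsumer(bootstrap_servers='kafka:9092',
                             key_deserializer=bytes.decode,
                             value_deserializer=bytes.decode,
                             consumer_timeout_ms=10000,
                             group_id='mon-client-' + str(uuid.uuid4()))
    consumer.subscribe(['alarm_response'])

By contrast, the agent below uses the fixed group_id 'pol-consumer', so multiple agent replicas would share the "ns" and "alarm_response" partitions rather than all processing every message.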
diff --git a/osm_policy_module/core/agent.py b/osm_policy_module/core/agent.py
index 8309da1..bbefadd 100644
--- a/osm_policy_module/core/agent.py
+++ b/osm_policy_module/core/agent.py
@@ -21,6 +21,7 @@
 # For those usages not covered by the Apache License, Version 2.0 please
 # contact: bdiaz@whitestack.com or glavado@whitestack.com
 ##
+import datetime
 import json
 import logging
 import threading
@@ -28,36 +29,33 @@ from json import JSONDecodeError
 import yaml
 from kafka import KafkaConsumer
 
-from osm_common import dbmongo
+from osm_policy_module.common.db_client import DbClient
 
 from osm_policy_module.common.lcm_client import LcmClient
 from osm_policy_module.common.mon_client import MonClient
 from osm_policy_module.core import database
 from osm_policy_module.core.config import Config
-from osm_policy_module.core.database import ScalingRecord, ScalingAlarm
+from osm_policy_module.core.database import ScalingGroup, ScalingAlarm, ScalingPolicy, ScalingCriteria
 
 log = logging.getLogger(__name__)
 
+ALLOWED_KAFKA_KEYS = ['instantiated', 'scaled', 'notify_alarm']
+
 
 class PolicyModuleAgent:
     def __init__(self):
         cfg = Config.instance()
-        self.common_db = dbmongo.DbMongo()
-        self.common_db.db_connect({'host': cfg.OSMPOL_DATABASE_HOST,
-                                   'port': int(cfg.OSMPOL_DATABASE_PORT),
-                                   'name': 'osm'})
+        self.db_client = DbClient()
         self.mon_client = MonClient()
+        self.lcm_client = LcmClient()
         self.kafka_server = '{}:{}'.format(cfg.OSMPOL_MESSAGE_HOST,
                                            cfg.OSMPOL_MESSAGE_PORT)
 
     def run(self):
-        cfg = Config.instance()
-        cfg.read_environ()
-
         consumer = KafkaConsumer(bootstrap_servers=self.kafka_server,
                                  key_deserializer=bytes.decode,
                                  value_deserializer=bytes.decode,
-                                 consumer_timeout_ms=10000)
+                                 group_id='pol-consumer')
         consumer.subscribe(["ns", "alarm_response"])
 
         for message in consumer:
@@ -65,141 +63,204 @@ class PolicyModuleAgent:
             t.start()
 
     def _process_msg(self, topic, key, msg):
+        log.debug("_process_msg topic=%s key=%s msg=%s", topic, key, msg)
         try:
-            # Check for ns instantiation
-            if key == 'instantiated':
+            if key in ALLOWED_KAFKA_KEYS:
                 try:
                     content = json.loads(msg)
                 except JSONDecodeError:
                     content = yaml.safe_load(msg)
-                log.info("Message arrived with topic: %s, key: %s, msg: %s", topic, key, content)
-                nslcmop_id = content['nslcmop_id']
-                nslcmop = self.common_db.get_one(table="nslcmops",
-                                                 filter={"_id": nslcmop_id})
-                if nslcmop['operationState'] == 'COMPLETED' or nslcmop['operationState'] == 'PARTIALLY_COMPLETED':
-                    nsr_id = nslcmop['nsInstanceId']
-                    log.info("Configuring scaling groups for network service with nsr_id: %s", nsr_id)
-                    self._configure_scaling_groups(nsr_id)
-                else:
-                    log.info(
-                        "Network service is not in COMPLETED or PARTIALLY_COMPLETED state. "
-                        "Current state is %s. Skipping...",
Skipping...", - nslcmop['operationState']) - - if key == 'notify_alarm': - try: - content = json.loads(msg) - except JSONDecodeError: - content = yaml.safe_load(msg) - log.info("Message arrived with topic: %s, key: %s, msg: %s", topic, key, content) - alarm_id = content['notify_details']['alarm_uuid'] - metric_name = content['notify_details']['metric_name'] - operation = content['notify_details']['operation'] - threshold = content['notify_details']['threshold_value'] - vdu_name = content['notify_details']['vdu_name'] - vnf_member_index = content['notify_details']['vnf_member_index'] - ns_id = content['notify_details']['ns_id'] - log.info( - "Received alarm notification for alarm %s, \ - metric %s, \ - operation %s, \ - threshold %s, \ - vdu_name %s, \ - vnf_member_index %s, \ - ns_id %s ", - alarm_id, metric_name, operation, threshold, vdu_name, vnf_member_index, ns_id) - try: - alarm = ScalingAlarm.select().where(ScalingAlarm.alarm_id == alarm_id).get() - lcm_client = LcmClient() - log.info("Sending scaling action message for ns: %s", alarm_id) - lcm_client.scale(alarm.scaling_record.nsr_id, alarm.scaling_record.name, alarm.vnf_member_index, - alarm.action) - except ScalingAlarm.DoesNotExist: - log.info("There is no action configured for alarm %s.", alarm_id) - except Exception: - log.exception("Error consuming message: ") - def _get_vnfr(self, nsr_id: str, member_index: int): - vnfr = self.common_db.get_one(table="vnfrs", - filter={"nsr-id-ref": nsr_id, "member-vnf-index-ref": str(member_index)}) - return vnfr + if key == 'instantiated' or key == 'scaled': + self._handle_instantiated_or_scaled(content) - def _get_vnfrs(self, nsr_id: str): - return [self._get_vnfr(nsr_id, member['member-vnf-index']) for member in - self._get_nsr(nsr_id)['nsd']['constituent-vnfd']] + if key == 'notify_alarm': + self._handle_alarm_notification(content) + else: + log.debug("Key %s is not in ALLOWED_KAFKA_KEYS", key) + except Exception: + log.exception("Error consuming message: ") - def _get_vnfd(self, vnfd_id: str): - vnfr = self.common_db.get_one(table="vnfds", - filter={"_id": vnfd_id}) - return vnfr + def _handle_alarm_notification(self, content): + log.debug("_handle_alarm_notification: %s", content) + alarm_id = content['notify_details']['alarm_uuid'] + metric_name = content['notify_details']['metric_name'] + operation = content['notify_details']['operation'] + threshold = content['notify_details']['threshold_value'] + vdu_name = content['notify_details']['vdu_name'] + vnf_member_index = content['notify_details']['vnf_member_index'] + ns_id = content['notify_details']['ns_id'] + log.info( + "Received alarm notification for alarm %s, \ + metric %s, \ + operation %s, \ + threshold %s, \ + vdu_name %s, \ + vnf_member_index %s, \ + ns_id %s ", + alarm_id, metric_name, operation, threshold, vdu_name, vnf_member_index, ns_id) + try: + alarm = ScalingAlarm.select().where(ScalingAlarm.alarm_id == alarm_id).get() + delta = datetime.datetime.now() - alarm.scaling_criteria.scaling_policy.last_scale + log.debug("last_scale: %s", alarm.scaling_criteria.scaling_policy.last_scale) + log.debug("now: %s", datetime.datetime.now()) + log.debug("delta: %s", delta) + if delta.total_seconds() < alarm.scaling_criteria.scaling_policy.cooldown_time: + log.info("Time between last scale and now is less than cooldown time. 
Skipping.") + return + log.info("Sending scaling action message for ns: %s", alarm_id) + self.lcm_client.scale(alarm.scaling_criteria.scaling_policy.scaling_group.nsr_id, + alarm.scaling_criteria.scaling_policy.scaling_group.name, + alarm.vnf_member_index, + alarm.action) + alarm.scaling_criteria.scaling_policy.last_scale = datetime.datetime.now() + alarm.scaling_criteria.scaling_policy.save() + except ScalingAlarm.DoesNotExist: + log.info("There is no action configured for alarm %s.", alarm_id) - def _get_nsr(self, nsr_id: str): - nsr = self.common_db.get_one(table="nsrs", - filter={"id": nsr_id}) - return nsr + def _handle_instantiated_or_scaled(self, content): + log.debug("_handle_instantiated_or_scaled: %s", content) + nslcmop_id = content['nslcmop_id'] + nslcmop = self.db_client.get_nslcmop(nslcmop_id) + if nslcmop['operationState'] == 'COMPLETED' or nslcmop['operationState'] == 'PARTIALLY_COMPLETED': + nsr_id = nslcmop['nsInstanceId'] + log.info("Configuring scaling groups for network service with nsr_id: %s", nsr_id) + self._configure_scaling_groups(nsr_id) + else: + log.info( + "Network service is not in COMPLETED or PARTIALLY_COMPLETED state. " + "Current state is %s. Skipping...", + nslcmop['operationState']) def _configure_scaling_groups(self, nsr_id: str): + log.debug("_configure_scaling_groups: %s", nsr_id) # TODO(diazb): Check for alarm creation on exception and clean resources if needed. + # TODO: Add support for non-nfvi metrics with database.db.atomic(): - vnfrs = self._get_vnfrs(nsr_id) - log.info("Checking %s vnfrs...", len(vnfrs)) + vnfrs = self.db_client.get_vnfrs(nsr_id) + log.info("Found %s vnfrs", len(vnfrs)) for vnfr in vnfrs: - vnfd = self._get_vnfd(vnfr['vnfd-id']) + vnfd = self.db_client.get_vnfd(vnfr['vnfd-id']) log.info("Looking for vnfd %s", vnfr['vnfd-id']) scaling_groups = vnfd['scaling-group-descriptor'] vnf_monitoring_params = vnfd['monitoring-param'] for scaling_group in scaling_groups: - log.info("Creating scaling record in DB...") - scaling_record = ScalingRecord.create( - nsr_id=nsr_id, - name=scaling_group['name'], - content=json.dumps(scaling_group) - ) - log.info("Created scaling record in DB : nsr_id=%s, name=%s, content=%s", - scaling_record.nsr_id, - scaling_record.name, - scaling_record.content) + try: + scaling_group_record = ScalingGroup.select().where( + ScalingGroup.nsr_id == nsr_id, + ScalingGroup.vnf_member_index == vnfr['member-vnf-index-ref'], + ScalingGroup.name == scaling_group['name'] + ).get() + log.info("Found existing scaling group record in DB...") + except ScalingGroup.DoesNotExist: + log.info("Creating scaling group record in DB...") + scaling_group_record = ScalingGroup.create( + nsr_id=nsr_id, + vnf_member_index=vnfr['member-vnf-index-ref'], + name=scaling_group['name'], + content=json.dumps(scaling_group) + ) + log.info( + "Created scaling group record in DB : nsr_id=%s, vnf_member_index=%d, name=%s, content=%s", + scaling_group_record.nsr_id, + scaling_group_record.vnf_member_index, + scaling_group_record.name, + scaling_group_record.content) for scaling_policy in scaling_group['scaling-policy']: - for vdur in vnfd['vdu']: - vdu_monitoring_params = vdur['monitoring-param'] - for scaling_criteria in scaling_policy['scaling-criteria']: + if scaling_policy['scaling-type'] != 'automatic': + continue + try: + scaling_policy_record = ScalingPolicy.select().join(ScalingGroup).where( + ScalingPolicy.name == scaling_policy['name'], + ScalingGroup.id == scaling_group_record.id + ).get() + log.info("Found existing scaling policy 
+                        except ScalingPolicy.DoesNotExist:
+                            log.info("Creating scaling policy record in DB...")
+                            scaling_policy_record = ScalingPolicy.create(
+                                nsr_id=nsr_id,
+                                name=scaling_policy['name'],
+                                cooldown_time=scaling_policy['cooldown-time'],
+                                scaling_group=scaling_group_record
+                            )
+                            log.info("Created scaling policy record in DB : name=%s, scaling_group.name=%s",
+                                     scaling_policy_record.name,
+                                     scaling_policy_record.scaling_group.name)
+
+                        for scaling_criteria in scaling_policy['scaling-criteria']:
+                            try:
+                                scaling_criteria_record = ScalingCriteria.select().join(ScalingPolicy).where(
+                                    ScalingPolicy.id == scaling_policy_record.id,
+                                    ScalingCriteria.name == scaling_criteria['name']
+                                ).get()
+                                log.info("Found existing scaling criteria record in DB...")
+                            except ScalingCriteria.DoesNotExist:
+                                log.info("Creating scaling criteria record in DB...")
+                                scaling_criteria_record = ScalingCriteria.create(
+                                    nsr_id=nsr_id,
+                                    name=scaling_criteria['name'],
+                                    scaling_policy=scaling_policy_record
+                                )
+                                log.info(
+                                    "Created scaling criteria record in DB : name=%s, scaling_policy.name=%s",
+                                    scaling_criteria_record.name,
+                                    scaling_criteria_record.scaling_policy.name)
+
+                            for vdu_ref in scaling_group['vdu']:
                                 vnf_monitoring_param = next(
                                     filter(lambda param: param['id'] == scaling_criteria['vnf-monitoring-param-ref'],
                                            vnf_monitoring_params))
-                                # TODO: Add support for non-nfvi metrics
+                                if not vdu_ref['vdu-id-ref'] == vnf_monitoring_param['vdu-ref']:
+                                    continue
+                                vdu = next(
+                                    filter(lambda vdu: vdu['id'] == vdu_ref['vdu-id-ref'], vnfd['vdu'])
+                                )
+                                vdu_monitoring_params = vdu['monitoring-param']
                                 vdu_monitoring_param = next(
                                     filter(
                                         lambda param: param['id'] == vnf_monitoring_param['vdu-monitoring-param-ref'],
                                         vdu_monitoring_params))
-                                alarm_uuid = self.mon_client.create_alarm(
-                                    metric_name=vdu_monitoring_param['nfvi-metric'],
-                                    ns_id=nsr_id,
-                                    vdu_name=vdur['name'],
-                                    vnf_member_index=vnfr['member-vnf-index-ref'],
-                                    threshold=scaling_criteria['scale-in-threshold'],
-                                    operation=scaling_criteria['scale-in-relational-operation'],
-                                    statistic=vnf_monitoring_param['aggregation-type']
-                                )
-                                ScalingAlarm.create(
-                                    alarm_id=alarm_uuid,
-                                    action='scale_in',
-                                    vnf_member_index=int(vnfr['member-vnf-index-ref']),
-                                    vdu_name=vdur['name'],
-                                    scaling_record=scaling_record
-                                )
-                                alarm_uuid = self.mon_client.create_alarm(
-                                    metric_name=vdu_monitoring_param['nfvi-metric'],
-                                    ns_id=nsr_id,
-                                    vdu_name=vdur['name'],
-                                    vnf_member_index=vnfr['member-vnf-index-ref'],
-                                    threshold=scaling_criteria['scale-out-threshold'],
-                                    operation=scaling_criteria['scale-out-relational-operation'],
-                                    statistic=vnf_monitoring_param['aggregation-type']
-                                )
-                                ScalingAlarm.create(
-                                    alarm_id=alarm_uuid,
-                                    action='scale_out',
-                                    vnf_member_index=int(vnfr['member-vnf-index-ref']),
-                                    vdu_name=vdur['name'],
-                                    scaling_record=scaling_record
-                                )
+                                vdurs = list(filter(lambda vdur: vdur['vdu-id-ref'] == vnf_monitoring_param['vdu-ref'],
+                                                    vnfr['vdur']))
+                                for vdur in vdurs:
+                                    try:
+                                        ScalingAlarm.select().join(ScalingCriteria).where(
+                                            ScalingAlarm.vdu_name == vdur['name'],
+                                            ScalingCriteria.name == scaling_criteria['name']
+                                        ).get()
+                                        log.debug("vdu %s already has an alarm configured", vdur['name'])
+                                        continue
+                                    except ScalingAlarm.DoesNotExist:
+                                        pass
+                                    alarm_uuid = self.mon_client.create_alarm(
+                                        metric_name=vdu_monitoring_param['nfvi-metric'],
+                                        ns_id=nsr_id,
+                                        vdu_name=vdur['name'],
+                                        vnf_member_index=vnfr['member-vnf-index-ref'],
+                                        threshold=scaling_criteria['scale-in-threshold'],
+                                        operation=scaling_criteria['scale-in-relational-operation'],
+                                        statistic=vnf_monitoring_param['aggregation-type']
+                                    )
+                                    ScalingAlarm.create(
+                                        alarm_id=alarm_uuid,
+                                        action='scale_in',
+                                        vnf_member_index=int(vnfr['member-vnf-index-ref']),
+                                        vdu_name=vdur['name'],
+                                        scaling_criteria=scaling_criteria_record
+                                    )
+                                    alarm_uuid = self.mon_client.create_alarm(
+                                        metric_name=vdu_monitoring_param['nfvi-metric'],
+                                        ns_id=nsr_id,
+                                        vdu_name=vdur['name'],
+                                        vnf_member_index=vnfr['member-vnf-index-ref'],
+                                        threshold=scaling_criteria['scale-out-threshold'],
+                                        operation=scaling_criteria['scale-out-relational-operation'],
+                                        statistic=vnf_monitoring_param['aggregation-type']
+                                    )
+                                    ScalingAlarm.create(
+                                        alarm_id=alarm_uuid,
+                                        action='scale_out',
+                                        vnf_member_index=int(vnfr['member-vnf-index-ref']),
+                                        vdu_name=vdur['name'],
+                                        scaling_criteria=scaling_criteria_record
+                                    )
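
The cooldown logic added in _handle_alarm_notification above is the core behavioral change of this patch: a scale request arriving while the policy is still inside its cooldown window is dropped. A standalone sketch of that check, using the same datetime arithmetic as the agent (the timestamps and cooldown value are illustrative):

    import datetime

    # last_scale would come from alarm.scaling_criteria.scaling_policy.last_scale
    last_scale = datetime.datetime.now() - datetime.timedelta(seconds=30)
    cooldown_time = 120  # seconds, from the descriptor's cooldown-time field

    delta = datetime.datetime.now() - last_scale
    if delta.total_seconds() < cooldown_time:
        print("Inside cooldown window, skipping scale request")
    else:
        print("Cooldown expired, scaling allowed")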
diff --git a/osm_policy_module/core/config.py b/osm_policy_module/core/config.py
index dab409b..84a1f57 100644
--- a/osm_policy_module/core/config.py
+++ b/osm_policy_module/core/config.py
@@ -67,6 +67,7 @@ class Config(object):
         CfgParam('OSMPOL_DATABASE_PORT', 27017, int),
         CfgParam('OSMPOL_SQL_DATABASE_URI', "sqlite:///mon_sqlite.db", six.text_type),
         CfgParam('OSMPOL_LOG_LEVEL', "INFO", six.text_type),
+        CfgParam('OSMPOL_KAFKA_LOG_LEVEL', "WARN", six.text_type),
     ]
 
     _config_dict = {cfg.key: cfg for cfg in _configuration}
diff --git a/osm_policy_module/core/database.py b/osm_policy_module/core/database.py
index f9a37c2..8ad19f2 100644
--- a/osm_policy_module/core/database.py
+++ b/osm_policy_module/core/database.py
@@ -21,9 +21,10 @@
 # For those usages not covered by the Apache License, Version 2.0 please
 # contact: bdiaz@whitestack.com or glavado@whitestack.com
 ##
+import datetime
 import logging
 
-from peewee import CharField, IntegerField, ForeignKeyField, Model, TextField
+from peewee import CharField, IntegerField, ForeignKeyField, Model, TextField, AutoField, DateTimeField
 from playhouse.sqlite_ext import SqliteExtDatabase
 
 from osm_policy_module.core.config import Config
@@ -35,29 +36,44 @@ db = SqliteExtDatabase('policy_module.db')
 
 class BaseModel(Model):
+    id = AutoField(primary_key=True)
+
     class Meta:
         database = db
 
 
-class ScalingRecord(BaseModel):
+class ScalingGroup(BaseModel):
     nsr_id = CharField()
+    vnf_member_index = IntegerField()
     name = CharField()
     content = TextField()
 
 
+class ScalingPolicy(BaseModel):
+    name = CharField()
+    cooldown_time = IntegerField()
+    last_scale = DateTimeField(default=datetime.datetime.now)
+    scaling_group = ForeignKeyField(ScalingGroup, related_name='scaling_policies')
+
+
+class ScalingCriteria(BaseModel):
+    name = CharField()
+    scaling_policy = ForeignKeyField(ScalingPolicy, related_name='scaling_criterias')
+
+
 class ScalingAlarm(BaseModel):
     alarm_id = CharField()
     action = CharField()
     vnf_member_index = IntegerField()
     vdu_name = CharField()
-    scaling_record = ForeignKeyField(ScalingRecord, related_name='scaling_alarms')
+    scaling_criteria = ForeignKeyField(ScalingCriteria, related_name='scaling_alarms')
 
 
 class DatabaseManager:
     def create_tables(self):
         try:
             db.connect()
-            db.create_tables([ScalingRecord, ScalingAlarm])
+            db.create_tables([ScalingGroup, ScalingPolicy, ScalingCriteria, ScalingAlarm])
             db.close()
         except Exception as e:
             log.exception("Error creating tables: ")
 
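
The reworked schema above replaces the flat ScalingRecord with a ScalingGroup → ScalingPolicy → ScalingCriteria → ScalingAlarm chain, which is what lets an incoming alarm be traced back to its policy and cooldown. A minimal sketch of creating and traversing that chain with peewee (not part of the patch; field values are examples only):

    from osm_policy_module.core.database import (
        DatabaseManager, ScalingGroup, ScalingPolicy, ScalingCriteria, ScalingAlarm)

    DatabaseManager().create_tables()  # uses the module's SQLite database

    group = ScalingGroup.create(nsr_id='example-nsr', vnf_member_index=1,
                                name='scale-vdu', content='{}')
    policy = ScalingPolicy.create(name='cpu-policy', cooldown_time=120,
                                  scaling_group=group)
    criteria = ScalingCriteria.create(name='cpu-above-80', scaling_policy=policy)
    alarm = ScalingAlarm.create(alarm_id='example-alarm-uuid', action='scale_out',
                                vnf_member_index=1, vdu_name='vdu-1',
                                scaling_criteria=criteria)

    # An alarm notification can now be resolved to its policy's cooldown:
    print(alarm.scaling_criteria.scaling_policy.cooldown_time)  # -> 120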
diff --git a/osm_policy_module/tests/integration/test_policy_agent.py b/osm_policy_module/tests/integration/test_policy_agent.py
index 3a82ea7..bed6eb5 100644
--- a/osm_policy_module/tests/integration/test_policy_agent.py
+++ b/osm_policy_module/tests/integration/test_policy_agent.py
@@ -31,10 +31,11 @@
 from kafka import KafkaProducer
 from osm_common.dbmongo import DbMongo
 from peewee import SqliteDatabase
+from osm_policy_module.common.db_client import DbClient
 from osm_policy_module.common.mon_client import MonClient
 from osm_policy_module.core import database
 from osm_policy_module.core.agent import PolicyModuleAgent
-from osm_policy_module.core.database import ScalingRecord, ScalingAlarm
+from osm_policy_module.core.database import ScalingGroup, ScalingAlarm, ScalingPolicy, ScalingCriteria
 
 log = logging.getLogger()
 log.level = logging.INFO
@@ -401,7 +402,7 @@ vnfd_record_mock = {
 
 test_db = SqliteDatabase(':memory:')
 
-MODELS = [ScalingRecord, ScalingAlarm]
+MODELS = [ScalingGroup, ScalingPolicy, ScalingCriteria, ScalingAlarm]
 
 
 class PolicyModuleAgentTest(unittest.TestCase):
@@ -419,9 +420,9 @@ class PolicyModuleAgentTest(unittest.TestCase):
     @patch.object(DbMongo, 'db_connect', Mock())
     @patch.object(KafkaProducer, '__init__')
     @patch.object(MonClient, 'create_alarm')
-    @patch.object(PolicyModuleAgent, '_get_vnfd')
-    @patch.object(PolicyModuleAgent, '_get_nsr')
-    @patch.object(PolicyModuleAgent, '_get_vnfr')
+    @patch.object(DbClient, 'get_vnfd')
+    @patch.object(DbClient, 'get_nsr')
+    @patch.object(DbClient, 'get_vnfr')
     def test_configure_scaling_groups(self, get_vnfr, get_nsr, get_vnfd, create_alarm, kafka_producer_init):
         def _test_configure_scaling_groups_get_vnfr(*args, **kwargs):
             if '1' in args[1]:
@@ -444,12 +445,32 @@ class PolicyModuleAgentTest(unittest.TestCase):
                                     operation='GT',
                                     statistic='AVERAGE',
                                     threshold=80,
-                                    vdu_name='cirros_vnfd-VM',
+                                    vdu_name='cirros_ns-1-cirros_vnfd-VM-1',
                                     vnf_member_index='1')
-        scaling_record = ScalingRecord.get()
+        create_alarm.assert_any_call(metric_name='average_memory_utilization',
+                                     ns_id='test_nsr_id',
+                                     operation='LT',
+                                     statistic='AVERAGE',
+                                     threshold=20,
+                                     vdu_name='cirros_ns-1-cirros_vnfd-VM-1',
+                                     vnf_member_index='1')
+        create_alarm.assert_any_call(metric_name='average_memory_utilization',
+                                     ns_id='test_nsr_id',
+                                     operation='GT',
+                                     statistic='AVERAGE',
+                                     threshold=80,
+                                     vdu_name='cirros_ns-2-cirros_vnfd-VM-1',
+                                     vnf_member_index='2')
+        create_alarm.assert_any_call(metric_name='average_memory_utilization',
+                                     ns_id='test_nsr_id',
+                                     operation='LT',
+                                     statistic='AVERAGE',
+                                     threshold=20,
+                                     vdu_name='cirros_ns-2-cirros_vnfd-VM-1',
+                                     vnf_member_index='2')
+        scaling_record = ScalingGroup.get()
         self.assertEqual(scaling_record.name, 'scale_cirros_vnfd-VM')
         self.assertEqual(scaling_record.nsr_id, 'test_nsr_id')
-        self.assertIsNotNone(scaling_record)
 
 
 if __name__ == '__main__':
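
With the data-access helpers moved out of the agent, the test above now patches DbClient reads instead of the old agent-private _get_* methods. A condensed sketch of that pattern (not part of the patch; the stub values are hypothetical and only unittest.mock is used):

    from unittest.mock import patch

    from osm_policy_module.common.db_client import DbClient

    with patch.object(DbClient, 'get_nsr') as get_nsr:
        # Minimal NSR stub: no constituent VNFDs, so get_vnfrs returns []
        get_nsr.return_value = {'id': 'test_nsr_id',
                                'nsd': {'constituent-vnfd': []}}
        client = DbClient.__new__(DbClient)  # bypass __init__, no MongoDB needed
        assert client.get_vnfrs('test_nsr_id') == []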
diff --git a/setup.py b/setup.py
index c3c735d..86dacc4 100644
--- a/setup.py
+++ b/setup.py
@@ -24,7 +24,7 @@ from setuptools import setup
 
 _name = 'osm_policy_module'
-_version_command = ('git describe --match v* --tags --long --dirty', 'pep440-git')
+_version_command = ('git describe --match v* --tags --long --dirty', 'pep440-git-full')
 _author = "Benjamín Díaz"
 _author_email = 'bdiaz@whitestack.com'
 _description = 'OSM Policy Module'
 
diff --git a/stdeb.cfg b/stdeb.cfg
new file mode 100644
index 0000000..021202c
--- /dev/null
+++ b/stdeb.cfg
@@ -0,0 +1,3 @@
+[DEFAULT]
+X-Python3-Version : >= 3.4
+Depends3 : libmysqlclient-dev, python3-pip, python3-osm-common
\ No newline at end of file
diff --git a/tox.ini b/tox.ini
index 4ef6a64..584efa7 100644
--- a/tox.ini
+++ b/tox.ini
@@ -25,7 +25,7 @@
 # test suite on all supported python versions. To use it, "pip install tox"
 # and then run "tox" from this directory.
 
 [tox]
-envlist = py3
+envlist = py3, flake8
 toxworkdir={homedir}/.tox
 [testenv]
@@ -40,12 +40,6 @@
 deps = flake8
 commands = flake8 osm_policy_module
 
-[testenv:build]
-basepython = python3
-deps = stdeb
-       setuptools-version-command
-commands = python3 setup.py --command-packages=stdeb.command bdist_deb
-
 [flake8]
 # E123, E125 skipped as they are invalid PEP-8.
 max-line-length = 120
@@ -54,4 +48,8 @@
 ignore = E123,E125,E241
 builtins = _
 exclude=.venv,.git,.tox,dist,doc,*lib/python*,*egg,build,devops_stages/*,.rst
-
+[testenv:build]
+basepython = python3
+deps = stdeb
+       setuptools-version-command
+commands = python3 setup.py --command-packages=stdeb.command bdist_deb