##
import datetime
import logging
+import os
+from typing import Iterable, List
-from peewee import CharField, IntegerField, ForeignKeyField, Model, TextField, AutoField, DateTimeField
+from peewee import (
+ CharField,
+ IntegerField,
+ ForeignKeyField,
+ Model,
+ TextField,
+ AutoField,
+ DateTimeField,
+ Proxy,
+ BooleanField,
+)
+from peewee_migrate import Router
from playhouse.db_url import connect
+from osm_policy_module import migrations
from osm_policy_module.core.config import Config
log = logging.getLogger(__name__)
-cfg = Config.instance()
-db = connect(cfg.OSMPOL_SQL_DATABASE_URI)
+db = Proxy()
class BaseModel(Model):
class ScalingGroup(BaseModel):
nsr_id = CharField()
- vnf_member_index = IntegerField()
+ vnf_member_index = CharField()
name = CharField()
content = TextField()
class ScalingPolicy(BaseModel):
name = CharField()
cooldown_time = IntegerField()
+ scale_in_operation = CharField(default="AND")
+ scale_out_operation = CharField(default="OR")
+ enabled = BooleanField(default=True)
last_scale = DateTimeField(default=datetime.datetime.now)
- scaling_group = ForeignKeyField(ScalingGroup, related_name='scaling_policies')
+ scaling_group = ForeignKeyField(
+ ScalingGroup, related_name="scaling_policies", on_delete="CASCADE"
+ )
class ScalingCriteria(BaseModel):
name = CharField()
- scaling_policy = ForeignKeyField(ScalingPolicy, related_name='scaling_criterias')
+ scaling_policy = ForeignKeyField(
+ ScalingPolicy, related_name="scaling_criterias", on_delete="CASCADE"
+ )
class ScalingAlarm(BaseModel):
- alarm_id = CharField()
+ alarm_uuid = CharField(unique=True)
action = CharField()
- vnf_member_index = IntegerField()
+ vnf_member_index = CharField()
+ vdu_name = CharField()
+ scaling_criteria = ForeignKeyField(
+ ScalingCriteria, related_name="scaling_alarms", on_delete="CASCADE"
+ )
+ last_status = CharField(default="insufficient-data")
+
+
+class VnfAlarm(BaseModel):
+ alarm_id = CharField()
+ alarm_uuid = CharField(unique=True)
+ nsr_id = CharField()
+ vnf_member_index = CharField()
vdu_name = CharField()
- scaling_criteria = ForeignKeyField(ScalingCriteria, related_name='scaling_alarms')
+ last_action = CharField(default="insufficient-data")
+ id_suffix = IntegerField()
+ ok_ack = BooleanField(default=False)
+ alarm_ack = BooleanField(default=False)
+
+
+class AlarmAction(BaseModel):
+ type = CharField()
+ url = TextField()
+ alarm = ForeignKeyField(VnfAlarm, related_name="actions", on_delete="CASCADE")
+
+
+class HealingAction(BaseModel):
+ alarm_id = CharField()
+ recovery_action = CharField()
+ alarm_uuid = CharField(unique=True)
+ nsr_id = CharField()
+ vnfinstance_id = CharField()
+ vnf_member_index = CharField()
+ vdur_name = CharField()
+ vdu_id = CharField()
+ cooldown_time = IntegerField()
+ count_index = IntegerField()
+ last_heal = DateTimeField(default=datetime.datetime.now)
+ last_status = CharField(default="insufficient-data")
+ day1 = BooleanField(default=False)
class DatabaseManager:
- def create_tables(self):
- try:
- db.connect()
- db.create_tables([ScalingGroup, ScalingPolicy, ScalingCriteria, ScalingAlarm])
- db.close()
- except Exception as e:
- log.exception("Error creating tables: ")
+ def __init__(self, config: Config):
+ db.initialize(connect(config.get("sql", "database_uri")))
+
+ def create_tables(self) -> None:
+ db.connect()
+ with db.atomic():
+ router = Router(db, os.path.dirname(migrations.__file__))
+ router.run()
+ db.close()
+
+
+class ScalingAlarmRepository:
+ @staticmethod
+ def list(*expressions) -> Iterable[ScalingAlarm]:
+ return ScalingAlarm.select().where(*expressions).__iter__()
+
+ @staticmethod
+ def get(*expressions, join_classes: List = None) -> ScalingAlarm:
+ query = ScalingAlarm.select()
+ if join_classes:
+ for join_class in join_classes:
+ query = query.join(join_class)
+ return query.where(*expressions).get()
+
+ @staticmethod
+ def create(**query) -> ScalingAlarm:
+ return ScalingAlarm.create(**query)
+
+
+class ScalingGroupRepository:
+ @staticmethod
+ def list(*expressions) -> Iterable[ScalingGroup]:
+ return ScalingGroup.select().where(*expressions).__iter__()
+
+ @staticmethod
+ def get(*expressions) -> ScalingGroup:
+ return ScalingGroup.select().where(*expressions).get()
+
+ @staticmethod
+ def create(**query) -> ScalingGroup:
+ return ScalingGroup.create(**query)
+
+
+class ScalingPolicyRepository:
+ @staticmethod
+ def list(*expressions, join_classes: List = None) -> Iterable[ScalingPolicy]:
+ query = ScalingPolicy.select()
+ if join_classes:
+ for join_class in join_classes:
+ query = query.join(join_class)
+ return query.where(*expressions).__iter__()
+
+ @staticmethod
+ def get(*expressions, join_classes: List = None) -> ScalingPolicy:
+ query = ScalingPolicy.select()
+ if join_classes:
+ for join_class in join_classes:
+ query = query.join(join_class)
+ return query.where(*expressions).get()
+
+ @staticmethod
+ def create(**query) -> ScalingPolicy:
+ return ScalingPolicy.create(**query)
+
+
+class ScalingCriteriaRepository:
+ @staticmethod
+ def list(*expressions, join_classes: List = None) -> Iterable[ScalingCriteria]:
+ query = ScalingCriteria.select()
+ if join_classes:
+ for join_class in join_classes:
+ query = query.join(join_class)
+ return query.where(*expressions).__iter__()
+
+ @staticmethod
+ def get(*expressions, join_classes: List = None) -> ScalingCriteria:
+ query = ScalingCriteria.select()
+ if join_classes:
+ for join_class in join_classes:
+ query = query.join(join_class)
+ return query.where(*expressions).get()
+
+ @staticmethod
+ def create(**query) -> ScalingCriteria:
+ return ScalingCriteria.create(**query)
+
+
+class VnfAlarmRepository:
+ @staticmethod
+ def list(*expressions) -> Iterable[VnfAlarm]:
+ return VnfAlarm.select().where(*expressions).__iter__()
+
+ @staticmethod
+ def get(*expressions) -> VnfAlarm:
+ return VnfAlarm.select().where(*expressions).get()
+
+ @staticmethod
+ def create(**query) -> VnfAlarm:
+ return VnfAlarm.create(**query)
+
+
+class AlarmActionRepository:
+ @staticmethod
+ def list(*expressions) -> Iterable[AlarmAction]:
+ return AlarmAction.select().where(*expressions).__iter__()
+
+ @staticmethod
+ def get(*expressions) -> AlarmAction:
+ return AlarmAction.select().where(*expressions).get()
+
+ @staticmethod
+ def create(**query) -> AlarmAction:
+ return AlarmAction.create(**query)
+
+
+class HealingActionRepository:
+ @staticmethod
+ def list(*expressions) -> Iterable[HealingAction]:
+ log.info(
+ "### Printing healing action db alarm {}".format(
+ HealingAction.select().where(*expressions)
+ )
+ )
+ return HealingAction.select().where(*expressions).__iter__()
+
+ @staticmethod
+ def get(*expressions) -> HealingAction:
+ return HealingAction.select().where(*expressions).get()
+
+ @staticmethod
+ def create(**query) -> HealingAction:
+ return HealingAction.create(**query)