Fix pylint issues that appeared with version 3.2.0 of pylint
[osm/POL.git] / osm_policy_module / core / database.py
1 # -*- coding: utf-8 -*-
2
3 # Copyright 2018 Whitestack, LLC
4 # *************************************************************
5
6 # This file is part of OSM Monitoring module
7 # All Rights Reserved to Whitestack, LLC
8
9 # Licensed under the Apache License, Version 2.0 (the "License"); you may
10 # not use this file except in compliance with the License. You may obtain
11 # a copy of the License at
12
13 # http://www.apache.org/licenses/LICENSE-2.0
14
15 # Unless required by applicable law or agreed to in writing, software
16 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
17 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
18 # License for the specific language governing permissions and limitations
19 # under the License.
20
21 # For those usages not covered by the Apache License, Version 2.0 please
22 # contact: bdiaz@whitestack.com or glavado@whitestack.com
23 ##
24 import datetime
25 import logging
26 import os
27 from typing import Iterable, List
28
29 from peewee import (
30 CharField,
31 IntegerField,
32 ForeignKeyField,
33 Model,
34 TextField,
35 AutoField,
36 DateTimeField,
37 Proxy,
38 BooleanField,
39 )
40 from peewee_migrate import Router
41 from playhouse.db_url import connect
42
43 from osm_policy_module import migrations
44 from osm_policy_module.core.config import Config
45
# Module-level logger, named after this module.
log = logging.getLogger(__name__)

# Deferred database handle: a peewee Proxy that DatabaseManager.__init__
# binds to a concrete database through db.initialize().
db = Proxy()
49
50
class BaseModel(Model):
    """Common base class for the peewee models declared in this module.

    Declares the ``id`` primary key and binds every subclass to the
    module-level ``db`` proxy through ``Meta.database``.
    """

    # Primary key column shared by all subclasses.
    id = AutoField(primary_key=True)

    class Meta:
        # ``db`` is a peewee Proxy; it must be initialized before any query.
        database = db
56
57
class ScalingGroup(BaseModel):
    """Peewee model for a scaling group record.

    ``nsr_id``, ``vnf_member_index`` and ``name`` are character columns;
    ``content`` is a free-text column.
    """

    nsr_id = CharField()
    vnf_member_index = CharField()
    name = CharField()
    content = TextField()
63
64
class ScalingPolicy(BaseModel):
    """Peewee model for a scaling policy that belongs to a ScalingGroup.

    The foreign key is declared with ``on_delete="CASCADE"`` and the parent
    group exposes its policies through the ``scaling_policies``
    back-reference.
    """

    name = CharField()
    cooldown_time = IntegerField()
    scale_in_operation = CharField(default="AND")
    scale_out_operation = CharField(default="OR")
    enabled = BooleanField(default=True)
    # The default is the callable itself, so it is invoked per new instance
    # rather than once at import time.
    last_scale = DateTimeField(default=datetime.datetime.now)
    # ``backref`` replaces ``related_name``, which peewee 3 only accepts as a
    # deprecated alias.
    scaling_group = ForeignKeyField(
        ScalingGroup, backref="scaling_policies", on_delete="CASCADE"
    )
75
76
class ScalingCriteria(BaseModel):
    """Peewee model for a scaling criteria that belongs to a ScalingPolicy.

    The foreign key is declared with ``on_delete="CASCADE"`` and the parent
    policy exposes its criteria through the ``scaling_criterias``
    back-reference.
    """

    name = CharField()
    # ``backref`` replaces ``related_name``, which peewee 3 only accepts as a
    # deprecated alias.
    scaling_policy = ForeignKeyField(
        ScalingPolicy, backref="scaling_criterias", on_delete="CASCADE"
    )
82
83
class ScalingAlarm(BaseModel):
    """Peewee model for an alarm that belongs to a ScalingCriteria.

    ``alarm_uuid`` carries a unique constraint. The foreign key is declared
    with ``on_delete="CASCADE"`` and the parent criteria exposes its alarms
    through the ``scaling_alarms`` back-reference.
    """

    alarm_uuid = CharField(unique=True)
    action = CharField()
    vnf_member_index = CharField()
    vdu_name = CharField()
    # ``backref`` replaces ``related_name``, which peewee 3 only accepts as a
    # deprecated alias.
    scaling_criteria = ForeignKeyField(
        ScalingCriteria, backref="scaling_alarms", on_delete="CASCADE"
    )
    # New rows start in the "insufficient-data" status.
    last_status = CharField(default="insufficient-data")
93
94
class VnfAlarm(BaseModel):
    """Peewee model for a VNF alarm record.

    ``alarm_uuid`` carries a unique constraint; ``last_action`` defaults to
    ``"insufficient-data"`` and both acknowledgement flags default to False.
    """

    alarm_id = CharField()
    alarm_uuid = CharField(unique=True)
    nsr_id = CharField()
    vnf_member_index = CharField()
    vdu_name = CharField()
    last_action = CharField(default="insufficient-data")
    id_suffix = IntegerField()
    # Acknowledgement flags, both unset on creation.
    ok_ack = BooleanField(default=False)
    alarm_ack = BooleanField(default=False)
105
106
class AlarmAction(BaseModel):
    """Peewee model for an action attached to a VnfAlarm.

    The foreign key is declared with ``on_delete="CASCADE"`` and the parent
    alarm exposes its actions through the ``actions`` back-reference.
    """

    type = CharField()
    url = TextField()
    # ``backref`` replaces ``related_name``, which peewee 3 only accepts as a
    # deprecated alias.
    alarm = ForeignKeyField(VnfAlarm, backref="actions", on_delete="CASCADE")
111
112
class HealingAction(BaseModel):
    """Peewee model for a healing action record.

    ``alarm_uuid`` carries a unique constraint; ``last_status`` defaults to
    ``"insufficient-data"`` and ``day1`` defaults to False.
    """

    alarm_id = CharField()
    recovery_action = CharField()
    alarm_uuid = CharField(unique=True)
    nsr_id = CharField()
    vnfinstance_id = CharField()
    vnf_member_index = CharField()
    vdur_name = CharField()
    vdu_id = CharField()
    cooldown_time = IntegerField()
    count_index = IntegerField()
    # The default is the callable itself, so it is invoked per new instance
    # rather than once at import time.
    last_heal = DateTimeField(default=datetime.datetime.now)
    last_status = CharField(default="insufficient-data")
    day1 = BooleanField(default=False)
127
128
class DatabaseManager:
    """Bind the module-level ``db`` proxy to a database and run migrations."""

    def __init__(self, config: Config):
        """Initialize ``db`` from the configured database URI.

        Args:
            config: Configuration object; the value returned by
                ``config.get("sql", "database_uri")`` is passed to
                ``playhouse.db_url.connect``.
        """
        database_uri = config.get("sql", "database_uri")
        db.initialize(connect(database_uri))

    def create_tables(self) -> None:
        """Run the migrations stored in the ``migrations`` package directory.

        The connection is opened for the duration of the run and is closed
        afterwards, including when a migration raises; the exception still
        propagates to the caller.
        """
        db.connect()
        try:
            with db.atomic():
                migrations_dir = os.path.dirname(migrations.__file__)
                Router(db, migrations_dir).run()
        finally:
            # Close on every path so a failing migration does not leak the
            # connection opened above.
            db.close()
139
140
class ScalingAlarmRepository:
    """Static query helpers for the ScalingAlarm model."""

    @staticmethod
    def list(*expressions) -> Iterable[ScalingAlarm]:
        """Return an iterator over the alarms matching ``expressions``.

        Args:
            *expressions: Filter expressions forwarded to ``where()``.
        """
        # iter() is the idiomatic spelling of an explicit __iter__() call.
        return iter(ScalingAlarm.select().where(*expressions))

    @staticmethod
    def get(*expressions, join_classes: List = None) -> ScalingAlarm:
        """Return a single alarm matching ``expressions``.

        Args:
            *expressions: Filter expressions forwarded to ``where()``.
            join_classes: Optional models to join, in the given order,
                before the filter is applied.
        """
        query = ScalingAlarm.select()
        # ``or ()`` makes a missing/empty join list a no-op loop.
        for join_class in join_classes or ():
            query = query.join(join_class)
        return query.where(*expressions).get()

    @staticmethod
    def create(**query) -> ScalingAlarm:
        """Create and return a ScalingAlarm row from the given field values."""
        return ScalingAlarm.create(**query)
157
158
class ScalingGroupRepository:
    """Static query helpers for the ScalingGroup model."""

    @staticmethod
    def list(*expressions) -> Iterable[ScalingGroup]:
        """Return an iterator over the groups matching ``expressions``.

        Args:
            *expressions: Filter expressions forwarded to ``where()``.
        """
        # iter() is the idiomatic spelling of an explicit __iter__() call.
        return iter(ScalingGroup.select().where(*expressions))

    @staticmethod
    def get(*expressions) -> ScalingGroup:
        """Return a single group matching ``expressions``.

        Args:
            *expressions: Filter expressions forwarded to ``where()``.
        """
        return ScalingGroup.select().where(*expressions).get()

    @staticmethod
    def create(**query) -> ScalingGroup:
        """Create and return a ScalingGroup row from the given field values."""
        return ScalingGroup.create(**query)
171
172
class ScalingPolicyRepository:
    """Static query helpers for the ScalingPolicy model."""

    @staticmethod
    def list(*expressions, join_classes: List = None) -> Iterable[ScalingPolicy]:
        """Return an iterator over the policies matching ``expressions``.

        Args:
            *expressions: Filter expressions forwarded to ``where()``.
            join_classes: Optional models to join, in the given order,
                before the filter is applied.
        """
        query = ScalingPolicy.select()
        # ``or ()`` makes a missing/empty join list a no-op loop.
        for join_class in join_classes or ():
            query = query.join(join_class)
        # iter() is the idiomatic spelling of an explicit __iter__() call.
        return iter(query.where(*expressions))

    @staticmethod
    def get(*expressions, join_classes: List = None) -> ScalingPolicy:
        """Return a single policy matching ``expressions``.

        Args:
            *expressions: Filter expressions forwarded to ``where()``.
            join_classes: Optional models to join, in the given order,
                before the filter is applied.
        """
        query = ScalingPolicy.select()
        for join_class in join_classes or ():
            query = query.join(join_class)
        return query.where(*expressions).get()

    @staticmethod
    def create(**query) -> ScalingPolicy:
        """Create and return a ScalingPolicy row from the given field values."""
        return ScalingPolicy.create(**query)
193
194
class ScalingCriteriaRepository:
    """Static query helpers for the ScalingCriteria model."""

    @staticmethod
    def list(*expressions, join_classes: List = None) -> Iterable[ScalingCriteria]:
        """Return an iterator over the criteria matching ``expressions``.

        Args:
            *expressions: Filter expressions forwarded to ``where()``.
            join_classes: Optional models to join, in the given order,
                before the filter is applied.
        """
        query = ScalingCriteria.select()
        # ``or ()`` makes a missing/empty join list a no-op loop.
        for join_class in join_classes or ():
            query = query.join(join_class)
        # iter() is the idiomatic spelling of an explicit __iter__() call.
        return iter(query.where(*expressions))

    @staticmethod
    def get(*expressions, join_classes: List = None) -> ScalingCriteria:
        """Return a single criteria matching ``expressions``.

        Args:
            *expressions: Filter expressions forwarded to ``where()``.
            join_classes: Optional models to join, in the given order,
                before the filter is applied.
        """
        query = ScalingCriteria.select()
        for join_class in join_classes or ():
            query = query.join(join_class)
        return query.where(*expressions).get()

    @staticmethod
    def create(**query) -> ScalingCriteria:
        """Create and return a ScalingCriteria row from the given field values."""
        return ScalingCriteria.create(**query)
215
216
class VnfAlarmRepository:
    """Static query helpers for the VnfAlarm model."""

    @staticmethod
    def list(*expressions) -> Iterable[VnfAlarm]:
        """Return an iterator over the alarms matching ``expressions``.

        Args:
            *expressions: Filter expressions forwarded to ``where()``.
        """
        # iter() is the idiomatic spelling of an explicit __iter__() call.
        return iter(VnfAlarm.select().where(*expressions))

    @staticmethod
    def get(*expressions) -> VnfAlarm:
        """Return a single alarm matching ``expressions``.

        Args:
            *expressions: Filter expressions forwarded to ``where()``.
        """
        return VnfAlarm.select().where(*expressions).get()

    @staticmethod
    def create(**query) -> VnfAlarm:
        """Create and return a VnfAlarm row from the given field values."""
        return VnfAlarm.create(**query)
229
230
class AlarmActionRepository:
    """Static query helpers for the AlarmAction model."""

    @staticmethod
    def list(*expressions) -> Iterable[AlarmAction]:
        """Return an iterator over the actions matching ``expressions``.

        Args:
            *expressions: Filter expressions forwarded to ``where()``.
        """
        # iter() is the idiomatic spelling of an explicit __iter__() call.
        return iter(AlarmAction.select().where(*expressions))

    @staticmethod
    def get(*expressions) -> AlarmAction:
        """Return a single action matching ``expressions``.

        Args:
            *expressions: Filter expressions forwarded to ``where()``.
        """
        return AlarmAction.select().where(*expressions).get()

    @staticmethod
    def create(**query) -> AlarmAction:
        """Create and return an AlarmAction row from the given field values."""
        return AlarmAction.create(**query)
243
244
class HealingActionRepository:
    """Static query helpers for the HealingAction model."""

    @staticmethod
    def list(*expressions) -> Iterable[HealingAction]:
        """Log the filtered query and return an iterator over its rows.

        Args:
            *expressions: Filter expressions forwarded to ``where()``.
        """
        # Build the query once and reuse it for both the log line and the
        # result, instead of constructing two identical queries.
        query = HealingAction.select().where(*expressions)
        # Lazy %-style argument: the query is only rendered to text when the
        # INFO level is actually enabled.
        log.info("### Printing healing action db alarm %s", query)
        return iter(query)

    @staticmethod
    def get(*expressions) -> HealingAction:
        """Return a single healing action matching ``expressions``.

        Args:
            *expressions: Filter expressions forwarded to ``where()``.
        """
        return HealingAction.select().where(*expressions).get()

    @staticmethod
    def create(**query) -> HealingAction:
        """Create and return a HealingAction row from the given field values."""
        return HealingAction.create(**query)