Feature 10981: update alerts collections in Mongo from OSM v12 to v14
[osm/devops.git] / installers / charm / osm-update-db-operator / src / db_upgrade.py
1 # Copyright 2022 Canonical Ltd.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License"); you may
4 # not use this file except in compliance with the License. You may obtain
5 # a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 # License for the specific language governing permissions and limitations
13 # under the License.
14
15 """Upgrade DB charm module."""
16
17 import json
18 import logging
19
20 from pymongo import MongoClient
21 from uuid import uuid4
22
23 logger = logging.getLogger(__name__)
24
25
26 class MongoUpgrade1214:
27 """Upgrade MongoDB Database from OSM v12 to v14."""
28
29 @staticmethod
30 def _migrate_alerts(osm_db):
31 """Create new alerts collection.
32 """
33 if "alerts" in osm_db.list_collection_names():
34 return
35 logger.info("Entering in MongoUpgrade1214._migrate_alerts function")
36 logger.info("Reading VNF descriptors:")
37 alerts = osm_db["alerts"]
38 vnfds = osm_db["vnfds"]
39 db_vnfds = []
40 for vnfd in vnfds.find():
41 logger.info(f' {vnfd["_id"]}: {vnfd["description"]}')
42 db_vnfds.append(vnfd)
43 logger.info("Reading VNFRs")
44 vnfrs = osm_db["vnfrs"]
45 for vnfr in vnfrs.find():
46 logger.info(f' vnfr {vnfr["_id"]}')
47 vnfd = next((sub for sub in db_vnfds if sub["_id"] == vnfr["vnfd-id"]), None)
48 nsr_id = vnfr["nsr-id-ref"]
49 # Checking for auto-healing configuration
50 df = vnfd.get("df", [{}])[0]
51 if "healing-aspect" in df:
52 healing_aspects = df["healing-aspect"]
53 for healing in healing_aspects:
54 for healing_policy in healing.get("healing-policy", ()):
55 vdu_id = healing_policy["vdu-id"]
56 vdur = next(
57 (
58 vdur
59 for vdur in vnfr["vdur"]
60 if vdu_id == vdur["vdu-id-ref"]
61 ),
62 {},
63 )
64 if not vdur:
65 continue
66 metric_name = "vm_status"
67 vdu_name = vdur.get("name")
68 vnf_member_index = vnfr["member-vnf-index-ref"]
69 uuid = str(uuid4())
70 name = f"healing_{uuid}"
71 action = healing_policy
72 # action_on_recovery = healing.get("action-on-recovery")
73 # cooldown_time = healing.get("cooldown-time")
74 # day1 = healing.get("day1")
75 alert = {
76 "uuid": uuid,
77 "name": name,
78 "metric": metric_name,
79 "tags": {
80 "ns_id": nsr_id,
81 "vnf_member_index": vnf_member_index,
82 "vdu_name": vdu_name,
83 },
84 "alarm_status": "ok",
85 "action_type": "healing",
86 "action": action,
87 }
88 logger.info(f"Storing in MongoDB alert {alert}")
89 alerts.insert_one(alert)
90
91 # Checking for auto-scaling configuration
92 if "scaling-aspect" in df:
93 rel_operation_types = {
94 "GE": ">=",
95 "LE": "<=",
96 "GT": ">",
97 "LT": "<",
98 "EQ": "==",
99 "NE": "!=",
100 }
101 scaling_aspects = df["scaling-aspect"]
102 all_vnfd_monitoring_params = {}
103 for ivld in vnfd.get("int-virtual-link-desc", ()):
104 for mp in ivld.get("monitoring-parameters", ()):
105 all_vnfd_monitoring_params[mp.get("id")] = mp
106 for vdu in vnfd.get("vdu", ()):
107 for mp in vdu.get("monitoring-parameter", ()):
108 all_vnfd_monitoring_params[mp.get("id")] = mp
109 for df in vnfd.get("df", ()):
110 for mp in df.get("monitoring-parameter", ()):
111 all_vnfd_monitoring_params[mp.get("id")] = mp
112 for scaling_aspect in scaling_aspects:
113 scaling_group_name = scaling_aspect.get("name", "")
114 # Get monitored VDUs
115 all_monitored_vdus = set()
116 for delta in scaling_aspect.get(
117 "aspect-delta-details", {}
118 ).get("deltas", ()):
119 for vdu_delta in delta.get("vdu-delta", ()):
120 all_monitored_vdus.add(vdu_delta.get("id"))
121 monitored_vdurs = list(
122 filter(
123 lambda vdur: vdur["vdu-id-ref"]
124 in all_monitored_vdus,
125 vnfr["vdur"],
126 )
127 )
128 if not monitored_vdurs:
129 logger.error("Scaling criteria is referring to a vnf-monitoring-param that does not contain a reference to a vdu or vnf metric")
130 continue
131 for scaling_policy in scaling_aspect.get(
132 "scaling-policy", ()
133 ):
134 if scaling_policy["scaling-type"] != "automatic":
135 continue
136 threshold_time = scaling_policy.get(
137 "threshold-time", "1"
138 )
139 cooldown_time = scaling_policy.get("cooldown-time", "0")
140 for scaling_criteria in scaling_policy["scaling-criteria"]:
141 monitoring_param_ref = scaling_criteria.get(
142 "vnf-monitoring-param-ref"
143 )
144 vnf_monitoring_param = all_vnfd_monitoring_params[
145 monitoring_param_ref
146 ]
147 for vdur in monitored_vdurs:
148 vdu_id = vdur["vdu-id-ref"]
149 metric_name = vnf_monitoring_param.get("performance-metric")
150 vdu_name = vdur["name"]
151 vnf_member_index = vnfr["member-vnf-index-ref"]
152 scalein_threshold = scaling_criteria["scale-in-threshold"]
153 # Looking for min/max-number-of-instances
154 instances_min_number = 1
155 instances_max_number = 1
156 vdu_profile = df["vdu-profile"]
157 if vdu_profile:
158 profile = next(
159 item
160 for item in vdu_profile
161 if item["id"] == vdu_id
162 )
163 instances_min_number = profile.get("min-number-of-instances", 1)
164 instances_max_number = profile.get("max-number-of-instances", 1)
165
166 if scalein_threshold:
167 uuid = str(uuid4())
168 name = f"scalein_{uuid}"
169 operation = scaling_criteria["scale-in-relational-operation"]
170 rel_operator = rel_operation_types.get(operation, "<=")
171 metric_selector = f'{metric_name}{{ns_id="{nsr_id}", vnf_member_index="{vnf_member_index}", vdu_id="{vdu_id}"}}'
172 expression = f"(count ({metric_selector}) > {instances_min_number}) and (avg({metric_selector}) {rel_operator} {scalein_threshold})"
173 labels = {
174 "ns_id": nsr_id,
175 "vnf_member_index": vnf_member_index,
176 "vdu_id": vdu_id,
177 }
178 prom_cfg = {
179 "alert": name,
180 "expr": expression,
181 "for": str(threshold_time) + "m",
182 "labels": labels,
183 }
184 action = scaling_policy
185 action = {
186 "scaling-group": scaling_group_name,
187 "cooldown-time": cooldown_time,
188 }
189 alert = {
190 "uuid": uuid,
191 "name": name,
192 "metric": metric_name,
193 "tags": {
194 "ns_id": nsr_id,
195 "vnf_member_index": vnf_member_index,
196 "vdu_id": vdu_id,
197 },
198 "alarm_status": "ok",
199 "action_type": "scale_in",
200 "action": action,
201 "prometheus_config": prom_cfg,
202 }
203 logger.info(f"Storing in MongoDB alert {alert}")
204 alerts.insert_one(alert)
205
206 scaleout_threshold = scaling_criteria["scale-out-threshold"]
207 if scaleout_threshold:
208 uuid = str(uuid4())
209 name = f"scaleout_{uuid}"
210 operation = scaling_criteria["scale-out-relational-operation"]
211 rel_operator = rel_operation_types.get(operation, "<=")
212 metric_selector = f'{metric_name}{{ns_id="{nsr_id}", vnf_member_index="{vnf_member_index}", vdu_id="{vdu_id}"}}'
213 expression = f"(count ({metric_selector}) < {instances_max_number}) and (avg({metric_selector}) {rel_operator} {scaleout_threshold})"
214 labels = {
215 "ns_id": nsr_id,
216 "vnf_member_index": vnf_member_index,
217 "vdu_id": vdu_id,
218 }
219 prom_cfg = {
220 "alert": name,
221 "expr": expression,
222 "for": str(threshold_time) + "m",
223 "labels": labels,
224 }
225 action = scaling_policy
226 action = {
227 "scaling-group": scaling_group_name,
228 "cooldown-time": cooldown_time,
229 }
230 alert = {
231 "uuid": uuid,
232 "name": name,
233 "metric": metric_name,
234 "tags": {
235 "ns_id": nsr_id,
236 "vnf_member_index": vnf_member_index,
237 "vdu_id": vdu_id,
238 },
239 "alarm_status": "ok",
240 "action_type": "scale_out",
241 "action": action,
242 "prometheus_config": prom_cfg,
243 }
244 logger.info(f"Storing in MongoDB alert {alert}")
245 alerts.insert_one(alert)
246 # Delete old alarms collections
247 logger.info("Deleting alarms and alarms_action collections")
248 alarms = osm_db["alarms"]
249 alarms.drop()
250 alarms_action = osm_db["alarms_action"]
251 alarms_action.drop()
252
253
254 @staticmethod
255 def upgrade(mongo_uri):
256 """Upgrade alerts in MongoDB."""
257 logger.info("Entering in MongoUpgrade1214.upgrade function")
258 myclient = MongoClient(mongo_uri)
259 osm_db = myclient["osm"]
260 MongoUpgrade1214._migrate_alerts(osm_db)
261
262
263 class MongoUpgrade1012:
264 """Upgrade MongoDB Database from OSM v10 to v12."""
265
266 @staticmethod
267 def _remove_namespace_from_k8s(nsrs, nsr):
268 namespace = "kube-system:"
269 if nsr["_admin"].get("deployed"):
270 k8s_list = []
271 for k8s in nsr["_admin"]["deployed"].get("K8s"):
272 if k8s.get("k8scluster-uuid"):
273 k8s["k8scluster-uuid"] = k8s["k8scluster-uuid"].replace(namespace, "", 1)
274 k8s_list.append(k8s)
275 myquery = {"_id": nsr["_id"]}
276 nsrs.update_one(myquery, {"$set": {"_admin.deployed.K8s": k8s_list}})
277
278 @staticmethod
279 def _update_nsr(osm_db):
280 """Update nsr.
281
282 Add vim_message = None if it does not exist.
283 Remove "namespace:" from k8scluster-uuid.
284 """
285 if "nsrs" not in osm_db.list_collection_names():
286 return
287 logger.info("Entering in MongoUpgrade1012._update_nsr function")
288
289 nsrs = osm_db["nsrs"]
290 for nsr in nsrs.find():
291 logger.debug(f"Updating {nsr['_id']} nsr")
292 for key, values in nsr.items():
293 if isinstance(values, list):
294 item_list = []
295 for value in values:
296 if isinstance(value, dict) and value.get("vim_info"):
297 index = list(value["vim_info"].keys())[0]
298 if not value["vim_info"][index].get("vim_message"):
299 value["vim_info"][index]["vim_message"] = None
300 item_list.append(value)
301 myquery = {"_id": nsr["_id"]}
302 nsrs.update_one(myquery, {"$set": {key: item_list}})
303 MongoUpgrade1012._remove_namespace_from_k8s(nsrs, nsr)
304
305 @staticmethod
306 def _update_vnfr(osm_db):
307 """Update vnfr.
308
309 Add vim_message to vdur if it does not exist.
310 Copy content of interfaces into interfaces_backup.
311 """
312 if "vnfrs" not in osm_db.list_collection_names():
313 return
314 logger.info("Entering in MongoUpgrade1012._update_vnfr function")
315 mycol = osm_db["vnfrs"]
316 for vnfr in mycol.find():
317 logger.debug(f"Updating {vnfr['_id']} vnfr")
318 vdur_list = []
319 for vdur in vnfr["vdur"]:
320 if vdur.get("vim_info"):
321 index = list(vdur["vim_info"].keys())[0]
322 if not vdur["vim_info"][index].get("vim_message"):
323 vdur["vim_info"][index]["vim_message"] = None
324 if vdur["vim_info"][index].get(
325 "interfaces", "Not found"
326 ) != "Not found" and not vdur["vim_info"][index].get("interfaces_backup"):
327 vdur["vim_info"][index]["interfaces_backup"] = vdur["vim_info"][index][
328 "interfaces"
329 ]
330 vdur_list.append(vdur)
331 myquery = {"_id": vnfr["_id"]}
332 mycol.update_one(myquery, {"$set": {"vdur": vdur_list}})
333
334 @staticmethod
335 def _update_k8scluster(osm_db):
336 """Remove namespace from helm-chart and helm-chart-v3 id."""
337 if "k8sclusters" not in osm_db.list_collection_names():
338 return
339 logger.info("Entering in MongoUpgrade1012._update_k8scluster function")
340 namespace = "kube-system:"
341 k8sclusters = osm_db["k8sclusters"]
342 for k8scluster in k8sclusters.find():
343 if k8scluster["_admin"].get("helm-chart") and k8scluster["_admin"]["helm-chart"].get(
344 "id"
345 ):
346 if k8scluster["_admin"]["helm-chart"]["id"].startswith(namespace):
347 k8scluster["_admin"]["helm-chart"]["id"] = k8scluster["_admin"]["helm-chart"][
348 "id"
349 ].replace(namespace, "", 1)
350 if k8scluster["_admin"].get("helm-chart-v3") and k8scluster["_admin"][
351 "helm-chart-v3"
352 ].get("id"):
353 if k8scluster["_admin"]["helm-chart-v3"]["id"].startswith(namespace):
354 k8scluster["_admin"]["helm-chart-v3"]["id"] = k8scluster["_admin"][
355 "helm-chart-v3"
356 ]["id"].replace(namespace, "", 1)
357 myquery = {"_id": k8scluster["_id"]}
358 k8sclusters.update_one(myquery, {"$set": k8scluster})
359
360 @staticmethod
361 def upgrade(mongo_uri):
362 """Upgrade nsr, vnfr and k8scluster in DB."""
363 logger.info("Entering in MongoUpgrade1012.upgrade function")
364 myclient = MongoClient(mongo_uri)
365 osm_db = myclient["osm"]
366 MongoUpgrade1012._update_nsr(osm_db)
367 MongoUpgrade1012._update_vnfr(osm_db)
368 MongoUpgrade1012._update_k8scluster(osm_db)
369
370
371 class MongoUpgrade910:
372 """Upgrade MongoDB Database from OSM v9 to v10."""
373
374 @staticmethod
375 def upgrade(mongo_uri):
376 """Add parameter alarm status = OK if not found in alarms collection."""
377 myclient = MongoClient(mongo_uri)
378 osm_db = myclient["osm"]
379 collist = osm_db.list_collection_names()
380
381 if "alarms" in collist:
382 mycol = osm_db["alarms"]
383 for x in mycol.find():
384 if not x.get("alarm_status"):
385 myquery = {"_id": x["_id"]}
386 mycol.update_one(myquery, {"$set": {"alarm_status": "ok"}})
387
388
389 class MongoPatch1837:
390 """Patch Bug 1837 on MongoDB."""
391
392 @staticmethod
393 def _update_nslcmops_params(osm_db):
394 """Updates the nslcmops collection to change the additional params to a string."""
395 logger.info("Entering in MongoPatch1837._update_nslcmops_params function")
396 if "nslcmops" in osm_db.list_collection_names():
397 nslcmops = osm_db["nslcmops"]
398 for nslcmop in nslcmops.find():
399 if nslcmop.get("operationParams"):
400 if nslcmop["operationParams"].get("additionalParamsForVnf") and isinstance(
401 nslcmop["operationParams"].get("additionalParamsForVnf"), list
402 ):
403 string_param = json.dumps(
404 nslcmop["operationParams"]["additionalParamsForVnf"]
405 )
406 myquery = {"_id": nslcmop["_id"]}
407 nslcmops.update_one(
408 myquery,
409 {
410 "$set": {
411 "operationParams": {"additionalParamsForVnf": string_param}
412 }
413 },
414 )
415 elif nslcmop["operationParams"].get("primitive_params") and isinstance(
416 nslcmop["operationParams"].get("primitive_params"), dict
417 ):
418 string_param = json.dumps(nslcmop["operationParams"]["primitive_params"])
419 myquery = {"_id": nslcmop["_id"]}
420 nslcmops.update_one(
421 myquery,
422 {"$set": {"operationParams": {"primitive_params": string_param}}},
423 )
424
425 @staticmethod
426 def _update_vnfrs_params(osm_db):
427 """Updates the vnfrs collection to change the additional params to a string."""
428 logger.info("Entering in MongoPatch1837._update_vnfrs_params function")
429 if "vnfrs" in osm_db.list_collection_names():
430 mycol = osm_db["vnfrs"]
431 for vnfr in mycol.find():
432 if vnfr.get("kdur"):
433 kdur_list = []
434 for kdur in vnfr["kdur"]:
435 if kdur.get("additionalParams") and not isinstance(
436 kdur["additionalParams"], str
437 ):
438 kdur["additionalParams"] = json.dumps(kdur["additionalParams"])
439 kdur_list.append(kdur)
440 myquery = {"_id": vnfr["_id"]}
441 mycol.update_one(
442 myquery,
443 {"$set": {"kdur": kdur_list}},
444 )
445 vnfr["kdur"] = kdur_list
446
447 @staticmethod
448 def patch(mongo_uri):
449 """Updates the database to change the additional params from dict to a string."""
450 logger.info("Entering in MongoPatch1837.patch function")
451 myclient = MongoClient(mongo_uri)
452 osm_db = myclient["osm"]
453 MongoPatch1837._update_nslcmops_params(osm_db)
454 MongoPatch1837._update_vnfrs_params(osm_db)
455
456
457 MONGODB_UPGRADE_FUNCTIONS = {
458 "9": {"10": [MongoUpgrade910.upgrade]},
459 "10": {"12": [MongoUpgrade1012.upgrade]},
460 "12": {"14": [MongoUpgrade1214.upgrade]},
461 }
462 MYSQL_UPGRADE_FUNCTIONS = {}
463 BUG_FIXES = {
464 1837: MongoPatch1837.patch,
465 }
466
467
468 class MongoUpgrade:
469 """Upgrade MongoDB Database."""
470
471 def __init__(self, mongo_uri):
472 self.mongo_uri = mongo_uri
473
474 def upgrade(self, current, target):
475 """Validates the upgrading path and upgrades the DB."""
476 self._validate_upgrade(current, target)
477 for function in MONGODB_UPGRADE_FUNCTIONS.get(current)[target]:
478 function(self.mongo_uri)
479
480 def _validate_upgrade(self, current, target):
481 """Check if the upgrade path chosen is possible."""
482 logger.info("Validating the upgrade path")
483 if current not in MONGODB_UPGRADE_FUNCTIONS:
484 raise Exception(f"cannot upgrade from {current} version.")
485 if target not in MONGODB_UPGRADE_FUNCTIONS[current]:
486 raise Exception(f"cannot upgrade from version {current} to {target}.")
487
488 def apply_patch(self, bug_number: int) -> None:
489 """Checks the bug-number and applies the fix in the database."""
490 if bug_number not in BUG_FIXES:
491 raise Exception(f"There is no patch for bug {bug_number}")
492 patch_function = BUG_FIXES[bug_number]
493 patch_function(self.mongo_uri)
494
495
496 class MysqlUpgrade:
497 """Upgrade Mysql Database."""
498
499 def __init__(self, mysql_uri):
500 self.mysql_uri = mysql_uri
501
502 def upgrade(self, current, target):
503 """Validates the upgrading path and upgrades the DB."""
504 self._validate_upgrade(current, target)
505 for function in MYSQL_UPGRADE_FUNCTIONS[current][target]:
506 function(self.mysql_uri)
507
508 def _validate_upgrade(self, current, target):
509 """Check if the upgrade path chosen is possible."""
510 logger.info("Validating the upgrade path")
511 if current not in MYSQL_UPGRADE_FUNCTIONS:
512 raise Exception(f"cannot upgrade from {current} version.")
513 if target not in MYSQL_UPGRADE_FUNCTIONS[current]:
514 raise Exception(f"cannot upgrade from version {current} to {target}.")