Update db_upgrade.py to append osm to metric_name to be BWC
[osm/devops.git] / installers / charm / osm-update-db-operator / src / db_upgrade.py
1 # Copyright 2022 Canonical Ltd.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License"); you may
4 # not use this file except in compliance with the License. You may obtain
5 # a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 # License for the specific language governing permissions and limitations
13 # under the License.
14
15 """Upgrade DB charm module."""
16
17 import json
18 import logging
19
20 from pymongo import MongoClient
21 from uuid import uuid4
22
23 logger = logging.getLogger(__name__)
24
25
26 class MongoUpgrade1214:
27 """Upgrade MongoDB Database from OSM v12 to v14."""
28
29 @staticmethod
30 def gather_vnfr_healing_alerts(vnfr, vnfd):
31 alerts = []
32 nsr_id = vnfr["nsr-id-ref"]
33 df = vnfd.get("df", [{}])[0]
34 # Checking for auto-healing configuration
35 if "healing-aspect" in df:
36 healing_aspects = df["healing-aspect"]
37 for healing in healing_aspects:
38 for healing_policy in healing.get("healing-policy", ()):
39 vdu_id = healing_policy["vdu-id"]
40 vdur = next(
41 (
42 vdur
43 for vdur in vnfr["vdur"]
44 if vdu_id == vdur["vdu-id-ref"]
45 ),
46 {},
47 )
48 if not vdur:
49 continue
50 metric_name = "vm_status"
51 vdu_name = vdur.get("name")
52 vnf_member_index = vnfr["member-vnf-index-ref"]
53 uuid = str(uuid4())
54 name = f"healing_{uuid}"
55 action = healing_policy
56 # action_on_recovery = healing.get("action-on-recovery")
57 # cooldown_time = healing.get("cooldown-time")
58 # day1 = healing.get("day1")
59 alert = {
60 "uuid": uuid,
61 "name": name,
62 "metric": metric_name,
63 "tags": {
64 "ns_id": nsr_id,
65 "vnf_member_index": vnf_member_index,
66 "vdu_name": vdu_name,
67 },
68 "alarm_status": "ok",
69 "action_type": "healing",
70 "action": action,
71 }
72 alerts.append(alert)
73 return alerts
74
75 @staticmethod
76 def gather_vnfr_scaling_alerts(vnfr, vnfd):
77 alerts = []
78 nsr_id = vnfr["nsr-id-ref"]
79 df = vnfd.get("df", [{}])[0]
80 # Checking for auto-scaling configuration
81 if "scaling-aspect" in df:
82 rel_operation_types = {
83 "GE": ">=",
84 "LE": "<=",
85 "GT": ">",
86 "LT": "<",
87 "EQ": "==",
88 "NE": "!=",
89 }
90 scaling_aspects = df["scaling-aspect"]
91 all_vnfd_monitoring_params = {}
92 for ivld in vnfd.get("int-virtual-link-desc", ()):
93 for mp in ivld.get("monitoring-parameters", ()):
94 all_vnfd_monitoring_params[mp.get("id")] = mp
95 for vdu in vnfd.get("vdu", ()):
96 for mp in vdu.get("monitoring-parameter", ()):
97 all_vnfd_monitoring_params[mp.get("id")] = mp
98 for df in vnfd.get("df", ()):
99 for mp in df.get("monitoring-parameter", ()):
100 all_vnfd_monitoring_params[mp.get("id")] = mp
101 for scaling_aspect in scaling_aspects:
102 scaling_group_name = scaling_aspect.get("name", "")
103 # Get monitored VDUs
104 all_monitored_vdus = set()
105 for delta in scaling_aspect.get(
106 "aspect-delta-details", {}
107 ).get("deltas", ()):
108 for vdu_delta in delta.get("vdu-delta", ()):
109 all_monitored_vdus.add(vdu_delta.get("id"))
110 monitored_vdurs = list(
111 filter(
112 lambda vdur: vdur["vdu-id-ref"]
113 in all_monitored_vdus,
114 vnfr["vdur"],
115 )
116 )
117 if not monitored_vdurs:
118 logger.error("Scaling criteria is referring to a vnf-monitoring-param that does not contain a reference to a vdu or vnf metric")
119 continue
120 for scaling_policy in scaling_aspect.get(
121 "scaling-policy", ()
122 ):
123 if scaling_policy["scaling-type"] != "automatic":
124 continue
125 threshold_time = scaling_policy.get(
126 "threshold-time", "1"
127 )
128 cooldown_time = scaling_policy.get("cooldown-time", "0")
129 for scaling_criteria in scaling_policy["scaling-criteria"]:
130 monitoring_param_ref = scaling_criteria.get(
131 "vnf-monitoring-param-ref"
132 )
133 vnf_monitoring_param = all_vnfd_monitoring_params[
134 monitoring_param_ref
135 ]
136 for vdur in monitored_vdurs:
137 vdu_id = vdur["vdu-id-ref"]
138 metric_name = vnf_monitoring_param.get("performance-metric")
139 metric_name = f"osm_{metric_name}"
140 vdu_name = vdur["name"]
141 vnf_member_index = vnfr["member-vnf-index-ref"]
142 scalein_threshold = scaling_criteria.get("scale-in-threshold")
143 # Looking for min/max-number-of-instances
144 instances_min_number = 1
145 instances_max_number = 1
146 vdu_profile = df["vdu-profile"]
147 if vdu_profile:
148 profile = next(
149 item
150 for item in vdu_profile
151 if item["id"] == vdu_id
152 )
153 instances_min_number = profile.get("min-number-of-instances", 1)
154 instances_max_number = profile.get("max-number-of-instances", 1)
155
156 if scalein_threshold:
157 uuid = str(uuid4())
158 name = f"scalein_{uuid}"
159 operation = scaling_criteria["scale-in-relational-operation"]
160 rel_operator = rel_operation_types.get(operation, "<=")
161 metric_selector = f'{metric_name}{{ns_id="{nsr_id}", vnf_member_index="{vnf_member_index}", vdu_id="{vdu_id}"}}'
162 expression = f"(count ({metric_selector}) > {instances_min_number}) and (avg({metric_selector}) {rel_operator} {scalein_threshold})"
163 labels = {
164 "ns_id": nsr_id,
165 "vnf_member_index": vnf_member_index,
166 "vdu_id": vdu_id,
167 }
168 prom_cfg = {
169 "alert": name,
170 "expr": expression,
171 "for": str(threshold_time) + "m",
172 "labels": labels,
173 }
174 action = scaling_policy
175 action = {
176 "scaling-group": scaling_group_name,
177 "cooldown-time": cooldown_time,
178 }
179 alert = {
180 "uuid": uuid,
181 "name": name,
182 "metric": metric_name,
183 "tags": {
184 "ns_id": nsr_id,
185 "vnf_member_index": vnf_member_index,
186 "vdu_id": vdu_id,
187 },
188 "alarm_status": "ok",
189 "action_type": "scale_in",
190 "action": action,
191 "prometheus_config": prom_cfg,
192 }
193 alerts.append(alert)
194
195 scaleout_threshold = scaling_criteria.get("scale-out-threshold")
196 if scaleout_threshold:
197 uuid = str(uuid4())
198 name = f"scaleout_{uuid}"
199 operation = scaling_criteria["scale-out-relational-operation"]
200 rel_operator = rel_operation_types.get(operation, "<=")
201 metric_selector = f'{metric_name}{{ns_id="{nsr_id}", vnf_member_index="{vnf_member_index}", vdu_id="{vdu_id}"}}'
202 expression = f"(count ({metric_selector}) < {instances_max_number}) and (avg({metric_selector}) {rel_operator} {scaleout_threshold})"
203 labels = {
204 "ns_id": nsr_id,
205 "vnf_member_index": vnf_member_index,
206 "vdu_id": vdu_id,
207 }
208 prom_cfg = {
209 "alert": name,
210 "expr": expression,
211 "for": str(threshold_time) + "m",
212 "labels": labels,
213 }
214 action = scaling_policy
215 action = {
216 "scaling-group": scaling_group_name,
217 "cooldown-time": cooldown_time,
218 }
219 alert = {
220 "uuid": uuid,
221 "name": name,
222 "metric": metric_name,
223 "tags": {
224 "ns_id": nsr_id,
225 "vnf_member_index": vnf_member_index,
226 "vdu_id": vdu_id,
227 },
228 "alarm_status": "ok",
229 "action_type": "scale_out",
230 "action": action,
231 "prometheus_config": prom_cfg,
232 }
233 alerts.append(alert)
234 return alerts
235
236 @staticmethod
237 def _migrate_alerts(osm_db):
238 """Create new alerts collection.
239 """
240 if "alerts" in osm_db.list_collection_names():
241 return
242 logger.info("Entering in MongoUpgrade1214._migrate_alerts function")
243
244 # Get vnfds from MongoDB
245 logger.info("Reading VNF descriptors:")
246 vnfds = osm_db["vnfds"]
247 db_vnfds = []
248 for vnfd in vnfds.find():
249 logger.info(f' {vnfd["_id"]}: {vnfd["description"]}')
250 db_vnfds.append(vnfd)
251
252 # Get vnfrs from MongoDB
253 logger.info("Reading VNFRs")
254 vnfrs = osm_db["vnfrs"]
255
256 # Gather healing and scaling alerts for each vnfr
257 healing_alerts = []
258 scaling_alerts = []
259 for vnfr in vnfrs.find():
260 logger.info(f' vnfr {vnfr["_id"]}')
261 vnfd = next((sub for sub in db_vnfds if sub["_id"] == vnfr["vnfd-id"]), None)
262 healing_alerts.extend(MongoUpgrade1214.gather_vnfr_healing_alerts(vnfr, vnfd))
263 scaling_alerts.extend(MongoUpgrade1214.gather_vnfr_scaling_alerts(vnfr, vnfd))
264
265 # Add new alerts in MongoDB
266 alerts = osm_db["alerts"]
267 for alert in healing_alerts:
268 logger.info(f"Storing healing alert in MongoDB: {alert}")
269 alerts.insert_one(alert)
270 for alert in scaling_alerts:
271 logger.info(f"Storing scaling alert in MongoDB: {alert}")
272 alerts.insert_one(alert)
273
274 # Delete old alarms collections
275 logger.info("Deleting alarms and alarms_action collections")
276 alarms = osm_db["alarms"]
277 alarms.drop()
278 alarms_action = osm_db["alarms_action"]
279 alarms_action.drop()
280
281
282 @staticmethod
283 def upgrade(mongo_uri):
284 """Upgrade alerts in MongoDB."""
285 logger.info("Entering in MongoUpgrade1214.upgrade function")
286 myclient = MongoClient(mongo_uri)
287 osm_db = myclient["osm"]
288 MongoUpgrade1214._migrate_alerts(osm_db)
289
290
291 class MongoUpgrade1012:
292 """Upgrade MongoDB Database from OSM v10 to v12."""
293
294 @staticmethod
295 def _remove_namespace_from_k8s(nsrs, nsr):
296 namespace = "kube-system:"
297 if nsr["_admin"].get("deployed"):
298 k8s_list = []
299 for k8s in nsr["_admin"]["deployed"].get("K8s"):
300 if k8s.get("k8scluster-uuid"):
301 k8s["k8scluster-uuid"] = k8s["k8scluster-uuid"].replace(namespace, "", 1)
302 k8s_list.append(k8s)
303 myquery = {"_id": nsr["_id"]}
304 nsrs.update_one(myquery, {"$set": {"_admin.deployed.K8s": k8s_list}})
305
306 @staticmethod
307 def _update_nsr(osm_db):
308 """Update nsr.
309
310 Add vim_message = None if it does not exist.
311 Remove "namespace:" from k8scluster-uuid.
312 """
313 if "nsrs" not in osm_db.list_collection_names():
314 return
315 logger.info("Entering in MongoUpgrade1012._update_nsr function")
316
317 nsrs = osm_db["nsrs"]
318 for nsr in nsrs.find():
319 logger.debug(f"Updating {nsr['_id']} nsr")
320 for key, values in nsr.items():
321 if isinstance(values, list):
322 item_list = []
323 for value in values:
324 if isinstance(value, dict) and value.get("vim_info"):
325 index = list(value["vim_info"].keys())[0]
326 if not value["vim_info"][index].get("vim_message"):
327 value["vim_info"][index]["vim_message"] = None
328 item_list.append(value)
329 myquery = {"_id": nsr["_id"]}
330 nsrs.update_one(myquery, {"$set": {key: item_list}})
331 MongoUpgrade1012._remove_namespace_from_k8s(nsrs, nsr)
332
333 @staticmethod
334 def _update_vnfr(osm_db):
335 """Update vnfr.
336
337 Add vim_message to vdur if it does not exist.
338 Copy content of interfaces into interfaces_backup.
339 """
340 if "vnfrs" not in osm_db.list_collection_names():
341 return
342 logger.info("Entering in MongoUpgrade1012._update_vnfr function")
343 mycol = osm_db["vnfrs"]
344 for vnfr in mycol.find():
345 logger.debug(f"Updating {vnfr['_id']} vnfr")
346 vdur_list = []
347 for vdur in vnfr["vdur"]:
348 if vdur.get("vim_info"):
349 index = list(vdur["vim_info"].keys())[0]
350 if not vdur["vim_info"][index].get("vim_message"):
351 vdur["vim_info"][index]["vim_message"] = None
352 if vdur["vim_info"][index].get(
353 "interfaces", "Not found"
354 ) != "Not found" and not vdur["vim_info"][index].get("interfaces_backup"):
355 vdur["vim_info"][index]["interfaces_backup"] = vdur["vim_info"][index][
356 "interfaces"
357 ]
358 vdur_list.append(vdur)
359 myquery = {"_id": vnfr["_id"]}
360 mycol.update_one(myquery, {"$set": {"vdur": vdur_list}})
361
362 @staticmethod
363 def _update_k8scluster(osm_db):
364 """Remove namespace from helm-chart and helm-chart-v3 id."""
365 if "k8sclusters" not in osm_db.list_collection_names():
366 return
367 logger.info("Entering in MongoUpgrade1012._update_k8scluster function")
368 namespace = "kube-system:"
369 k8sclusters = osm_db["k8sclusters"]
370 for k8scluster in k8sclusters.find():
371 if k8scluster["_admin"].get("helm-chart") and k8scluster["_admin"]["helm-chart"].get(
372 "id"
373 ):
374 if k8scluster["_admin"]["helm-chart"]["id"].startswith(namespace):
375 k8scluster["_admin"]["helm-chart"]["id"] = k8scluster["_admin"]["helm-chart"][
376 "id"
377 ].replace(namespace, "", 1)
378 if k8scluster["_admin"].get("helm-chart-v3") and k8scluster["_admin"][
379 "helm-chart-v3"
380 ].get("id"):
381 if k8scluster["_admin"]["helm-chart-v3"]["id"].startswith(namespace):
382 k8scluster["_admin"]["helm-chart-v3"]["id"] = k8scluster["_admin"][
383 "helm-chart-v3"
384 ]["id"].replace(namespace, "", 1)
385 myquery = {"_id": k8scluster["_id"]}
386 k8sclusters.update_one(myquery, {"$set": k8scluster})
387
388 @staticmethod
389 def upgrade(mongo_uri):
390 """Upgrade nsr, vnfr and k8scluster in DB."""
391 logger.info("Entering in MongoUpgrade1012.upgrade function")
392 myclient = MongoClient(mongo_uri)
393 osm_db = myclient["osm"]
394 MongoUpgrade1012._update_nsr(osm_db)
395 MongoUpgrade1012._update_vnfr(osm_db)
396 MongoUpgrade1012._update_k8scluster(osm_db)
397
398
399 class MongoUpgrade910:
400 """Upgrade MongoDB Database from OSM v9 to v10."""
401
402 @staticmethod
403 def upgrade(mongo_uri):
404 """Add parameter alarm status = OK if not found in alarms collection."""
405 myclient = MongoClient(mongo_uri)
406 osm_db = myclient["osm"]
407 collist = osm_db.list_collection_names()
408
409 if "alarms" in collist:
410 mycol = osm_db["alarms"]
411 for x in mycol.find():
412 if not x.get("alarm_status"):
413 myquery = {"_id": x["_id"]}
414 mycol.update_one(myquery, {"$set": {"alarm_status": "ok"}})
415
416
417 class MongoPatch1837:
418 """Patch Bug 1837 on MongoDB."""
419
420 @staticmethod
421 def _update_nslcmops_params(osm_db):
422 """Updates the nslcmops collection to change the additional params to a string."""
423 logger.info("Entering in MongoPatch1837._update_nslcmops_params function")
424 if "nslcmops" in osm_db.list_collection_names():
425 nslcmops = osm_db["nslcmops"]
426 for nslcmop in nslcmops.find():
427 if nslcmop.get("operationParams"):
428 if nslcmop["operationParams"].get("additionalParamsForVnf") and isinstance(
429 nslcmop["operationParams"].get("additionalParamsForVnf"), list
430 ):
431 string_param = json.dumps(
432 nslcmop["operationParams"]["additionalParamsForVnf"]
433 )
434 myquery = {"_id": nslcmop["_id"]}
435 nslcmops.update_one(
436 myquery,
437 {
438 "$set": {
439 "operationParams": {"additionalParamsForVnf": string_param}
440 }
441 },
442 )
443 elif nslcmop["operationParams"].get("primitive_params") and isinstance(
444 nslcmop["operationParams"].get("primitive_params"), dict
445 ):
446 string_param = json.dumps(nslcmop["operationParams"]["primitive_params"])
447 myquery = {"_id": nslcmop["_id"]}
448 nslcmops.update_one(
449 myquery,
450 {"$set": {"operationParams": {"primitive_params": string_param}}},
451 )
452
453 @staticmethod
454 def _update_vnfrs_params(osm_db):
455 """Updates the vnfrs collection to change the additional params to a string."""
456 logger.info("Entering in MongoPatch1837._update_vnfrs_params function")
457 if "vnfrs" in osm_db.list_collection_names():
458 mycol = osm_db["vnfrs"]
459 for vnfr in mycol.find():
460 if vnfr.get("kdur"):
461 kdur_list = []
462 for kdur in vnfr["kdur"]:
463 if kdur.get("additionalParams") and not isinstance(
464 kdur["additionalParams"], str
465 ):
466 kdur["additionalParams"] = json.dumps(kdur["additionalParams"])
467 kdur_list.append(kdur)
468 myquery = {"_id": vnfr["_id"]}
469 mycol.update_one(
470 myquery,
471 {"$set": {"kdur": kdur_list}},
472 )
473 vnfr["kdur"] = kdur_list
474
475 @staticmethod
476 def patch(mongo_uri):
477 """Updates the database to change the additional params from dict to a string."""
478 logger.info("Entering in MongoPatch1837.patch function")
479 myclient = MongoClient(mongo_uri)
480 osm_db = myclient["osm"]
481 MongoPatch1837._update_nslcmops_params(osm_db)
482 MongoPatch1837._update_vnfrs_params(osm_db)
483
484
485 MONGODB_UPGRADE_FUNCTIONS = {
486 "9": {"10": [MongoUpgrade910.upgrade]},
487 "10": {"12": [MongoUpgrade1012.upgrade]},
488 "12": {"14": [MongoUpgrade1214.upgrade]},
489 }
490 MYSQL_UPGRADE_FUNCTIONS = {}
491 BUG_FIXES = {
492 1837: MongoPatch1837.patch,
493 }
494
495
496 class MongoUpgrade:
497 """Upgrade MongoDB Database."""
498
499 def __init__(self, mongo_uri):
500 self.mongo_uri = mongo_uri
501
502 def upgrade(self, current, target):
503 """Validates the upgrading path and upgrades the DB."""
504 self._validate_upgrade(current, target)
505 for function in MONGODB_UPGRADE_FUNCTIONS.get(current)[target]:
506 function(self.mongo_uri)
507
508 def _validate_upgrade(self, current, target):
509 """Check if the upgrade path chosen is possible."""
510 logger.info("Validating the upgrade path")
511 if current not in MONGODB_UPGRADE_FUNCTIONS:
512 raise Exception(f"cannot upgrade from {current} version.")
513 if target not in MONGODB_UPGRADE_FUNCTIONS[current]:
514 raise Exception(f"cannot upgrade from version {current} to {target}.")
515
516 def apply_patch(self, bug_number: int) -> None:
517 """Checks the bug-number and applies the fix in the database."""
518 if bug_number not in BUG_FIXES:
519 raise Exception(f"There is no patch for bug {bug_number}")
520 patch_function = BUG_FIXES[bug_number]
521 patch_function(self.mongo_uri)
522
523
524 class MysqlUpgrade:
525 """Upgrade Mysql Database."""
526
527 def __init__(self, mysql_uri):
528 self.mysql_uri = mysql_uri
529
530 def upgrade(self, current, target):
531 """Validates the upgrading path and upgrades the DB."""
532 self._validate_upgrade(current, target)
533 for function in MYSQL_UPGRADE_FUNCTIONS[current][target]:
534 function(self.mysql_uri)
535
536 def _validate_upgrade(self, current, target):
537 """Check if the upgrade path chosen is possible."""
538 logger.info("Validating the upgrade path")
539 if current not in MYSQL_UPGRADE_FUNCTIONS:
540 raise Exception(f"cannot upgrade from {current} version.")
541 if target not in MYSQL_UPGRADE_FUNCTIONS[current]:
542 raise Exception(f"cannot upgrade from version {current} to {target}.")