nsr = self.common_db.get_one("nsrs", {"id": nsr_id})
return nsr
+ def get_vnfds(self):
+ return self.common_db.get_list("vnfds")
+
+ def get_monitoring_vnfds(self):
+ return self.common_db.get_list(
+ "vnfds", {"vdu.monitoring-parameter": {"$exists": "true"}}
+ )
+
def decrypt_vim_password(self, vim_password: str, schema_version: str, vim_id: str):
return self.common_db.decrypt(vim_password, schema_version, vim_id)
+ def decrypt_sdnc_password(
+ self, sdnc_password: str, schema_version: str, sdnc_id: str
+ ):
+ return self.common_db.decrypt(sdnc_password, schema_version, sdnc_id)
+
def get_vim_accounts(self):
return self.common_db.get_list("vim_accounts")
)
return vim_account
- def get_alert(self, nsr_id: str, vnf_member_index: str, vdu_name: str):
+ def get_sdnc_accounts(self):
+ return self.common_db.get_list("sdns")
+
+ def get_sdnc_account(self, sdnc_account_id: str) -> dict:
+ sdnc_account = self.common_db.get_one("sdns", {"_id": sdnc_account_id})
+ sdnc_account["password"] = self.decrypt_vim_password(
+ sdnc_account["password"], sdnc_account["schema_version"], sdnc_account_id
+ )
+ return sdnc_account
+
+ def get_alert(
+ self,
+ nsr_id: str,
+ vnf_member_index: str,
+ vdu_id: str,
+ vdu_name: str,
+ action_type: str,
+ ):
+ q_filter = {"action_type": action_type}
+ if nsr_id:
+ q_filter["tags.ns_id"] = nsr_id
+ if vnf_member_index:
+ q_filter["tags.vnf_member_index"] = vnf_member_index
+ if vdu_id:
+ q_filter["tags.vdu_id"] = vdu_id
+ if vdu_name:
+ q_filter["tags.vdu_name"] = vdu_name
alert = self.common_db.get_one(
- "alerts",
- {
- "tags.ns_id": nsr_id,
- "tags.vnf_member_index": vnf_member_index,
- "tags.vdu_name": vdu_name,
- },
+ table="alerts", q_filter=q_filter, fail_on_empty=False
)
return alert
def create_nslcmop(self, nslcmop: dict):
self.common_db.create("nslcmops", nslcmop)
+
+ def get_nslcmop(self, nsr_id: str, operation_type: str, since: str):
+ q_filter = {}
+ if nsr_id:
+ q_filter["nsInstanceId"] = nsr_id
+ if operation_type:
+ q_filter["lcmOperationType"] = operation_type
+ if since:
+ q_filter["startTime"] = {"$gt": since}
+ ops = self.common_db.get_list(table="nslcmops", q_filter=q_filter)
+ return ops