X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=osm_mon%2Fcore%2Fcommon_db.py;h=df8db6079d7626985021a1d2d23770be463f5e02;hb=refs%2Fchanges%2F03%2F10803%2F1;hp=aa6538831d251fa806f69123911e4296147f9a7b;hpb=a2eeb474200b8f9ebcaee6fa68fe52b6e1a5e337;p=osm%2FMON.git diff --git a/osm_mon/core/common_db.py b/osm_mon/core/common_db.py index aa65388..df8db60 100644 --- a/osm_mon/core/common_db.py +++ b/osm_mon/core/common_db.py @@ -31,35 +31,38 @@ from osm_mon.core.models import Alarm class CommonDbClient: def __init__(self, config: Config): - if config.get('database', 'driver') == "mongo": + if config.get("database", "driver") == "mongo": self.common_db = dbmongo.DbMongo() - elif config.get('database', 'driver') == "memory": + elif config.get("database", "driver") == "memory": self.common_db = dbmemory.DbMemory() else: - raise Exception("Unknown database driver {}".format(config.get('section', 'driver'))) + raise Exception( + "Unknown database driver {}".format(config.get("section", "driver")) + ) self.common_db.db_connect(config.get("database")) def get_vnfr(self, nsr_id: str, member_index: int): - vnfr = self.common_db.get_one("vnfrs", - {"nsr-id-ref": nsr_id, "member-vnf-index-ref": str(member_index)}) + vnfr = self.common_db.get_one( + "vnfrs", {"nsr-id-ref": nsr_id, "member-vnf-index-ref": str(member_index)} + ) return vnfr def get_vnfrs(self, nsr_id: str = None, vim_account_id: str = None): if nsr_id and vim_account_id: raise NotImplementedError("Only one filter is currently supported") if nsr_id: - vnfrs = [self.get_vnfr(nsr_id, member['member-vnf-index']) for member in - self.get_nsr(nsr_id)['nsd']['constituent-vnfd']] + vnfrs = [ + self.get_vnfr(nsr_id, member["member-vnf-index"]) + for member in self.get_nsr(nsr_id)["nsd"]["constituent-vnfd"] + ] elif vim_account_id: - vnfrs = self.common_db.get_list("vnfrs", - {"vim-account-id": vim_account_id}) + vnfrs = self.common_db.get_list("vnfrs", {"vim-account-id": vim_account_id}) else: - vnfrs = self.common_db.get_list('vnfrs') + vnfrs = self.common_db.get_list("vnfrs") return vnfrs def get_vnfd(self, vnfd_id: str): - vnfd = self.common_db.get_one("vnfds", - {"_id": vnfd_id}) + vnfd = self.common_db.get_one("vnfds", {"_id": vnfd_id}) return vnfd def get_vnfd_by_id(self, vnfd_id: str, filter: dict = {}): @@ -76,92 +79,104 @@ class CommonDbClient: return None def get_nsrs(self): - return self.common_db.get_list('nsrs') + return self.common_db.get_list("nsrs") def get_nsr(self, nsr_id: str): - nsr = self.common_db.get_one("nsrs", - {"id": nsr_id}) + nsr = self.common_db.get_one("nsrs", {"id": nsr_id}) return nsr def get_nslcmop(self, nslcmop_id): - nslcmop = self.common_db.get_one("nslcmops", - {"_id": nslcmop_id}) + nslcmop = self.common_db.get_one("nslcmops", {"_id": nslcmop_id}) return nslcmop def get_vdur(self, nsr_id, member_index, vdur_name): vnfr = self.get_vnfr(nsr_id, member_index) - for vdur in vnfr['vdur']: - if vdur['name'] == vdur_name: + for vdur in vnfr["vdur"]: + if vdur["name"] == vdur_name: return vdur - raise ValueError('vdur not found for nsr-id {}, member_index {} and vdur_name {}'.format(nsr_id, member_index, - vdur_name)) + raise ValueError( + "vdur not found for nsr-id {}, member_index {} and vdur_name {}".format( + nsr_id, member_index, vdur_name + ) + ) def decrypt_vim_password(self, vim_password: str, schema_version: str, vim_id: str): return self.common_db.decrypt(vim_password, schema_version, vim_id) - def decrypt_sdnc_password(self, sdnc_password: str, schema_version: str, sdnc_id: str): + def decrypt_sdnc_password( + self, sdnc_password: str, schema_version: str, sdnc_id: str + ): return self.common_db.decrypt(sdnc_password, schema_version, sdnc_id) def get_vim_account_id(self, nsr_id: str, vnf_member_index: int) -> str: vnfr = self.get_vnfr(nsr_id, vnf_member_index) - return vnfr['vim-account-id'] + return vnfr["vim-account-id"] def get_vim_accounts(self): - return self.common_db.get_list('vim_accounts') + return self.common_db.get_list("vim_accounts") def get_vim_account(self, vim_account_id: str) -> dict: - vim_account = self.common_db.get_one('vim_accounts', {"_id": vim_account_id}) - vim_account['vim_password'] = self.decrypt_vim_password(vim_account['vim_password'], - vim_account['schema_version'], - vim_account_id) + vim_account = self.common_db.get_one("vim_accounts", {"_id": vim_account_id}) + vim_account["vim_password"] = self.decrypt_vim_password( + vim_account["vim_password"], vim_account["schema_version"], vim_account_id + ) vim_config_encrypted_dict = { "1.1": ("admin_password", "nsx_password", "vcenter_password"), - "default": ("admin_password", "nsx_password", "vcenter_password", "vrops_password") + "default": ( + "admin_password", + "nsx_password", + "vcenter_password", + "vrops_password", + ), } - vim_config_encrypted = vim_config_encrypted_dict['default'] - if vim_account['schema_version'] in vim_config_encrypted_dict.keys(): - vim_config_encrypted = vim_config_encrypted_dict[vim_account['schema_version']] - if 'config' in vim_account: - for key in vim_account['config']: + vim_config_encrypted = vim_config_encrypted_dict["default"] + if vim_account["schema_version"] in vim_config_encrypted_dict.keys(): + vim_config_encrypted = vim_config_encrypted_dict[ + vim_account["schema_version"] + ] + if "config" in vim_account: + for key in vim_account["config"]: if key in vim_config_encrypted: - vim_account['config'][key] = self.decrypt_vim_password(vim_account['config'][key], - vim_account['schema_version'], - vim_account_id) + vim_account["config"][key] = self.decrypt_vim_password( + vim_account["config"][key], + vim_account["schema_version"], + vim_account_id, + ) return vim_account def get_sdncs(self): - return self.common_db.get_list('sdns') + return self.common_db.get_list("sdns") def get_sdnc(self, sdnc_id: str): - return self.common_db.get_one('sdns', {'_id': sdnc_id}) + return self.common_db.get_one("sdns", {"_id": sdnc_id}) def get_projects(self): - return self.common_db.get_list('projects') + return self.common_db.get_list("projects") def get_project(self, project_id: str): - return self.common_db.get_one('projects', {'_id': project_id}) + return self.common_db.get_one("projects", {"_id": project_id}) def create_alarm(self, alarm: Alarm): - return self.common_db.create('alarms', alarm.to_dict()) + return self.common_db.create("alarms", alarm.to_dict()) def delete_alarm(self, alarm_uuid: str): - return self.common_db.del_one('alarms', {'uuid': alarm_uuid}) + return self.common_db.del_one("alarms", {"uuid": alarm_uuid}) def get_alarms(self) -> List[Alarm]: alarms = [] - alarm_dicts = self.common_db.get_list('alarms') + alarm_dicts = self.common_db.get_list("alarms") for alarm_dict in alarm_dicts: alarms.append(Alarm.from_dict(alarm_dict)) return alarms def get_user(self, username: str): - return self.common_db.get_one('users', {'username': username}) + return self.common_db.get_one("users", {"username": username}) def get_user_by_id(self, userid: str): - return self.common_db.get_one('users', {'_id': userid}) + return self.common_db.get_one("users", {"_id": userid}) def get_role_by_name(self, name: str): - return self.common_db.get_one('roles', {'name': name}) + return self.common_db.get_one("roles", {"name": name}) def get_role_by_id(self, role_id: str): - return self.common_db.get_one('roles', {'_id': role_id}) + return self.common_db.get_one("roles", {"_id": role_id})