OSM Internal Authentication Backend and leverages the RBAC model
"""
-__author__ = "Pedro de la Cruz Ramos <pdelacruzramos@altran.com>"
+__author__ = "Pedro de la Cruz Ramos <pdelacruzramos@altran.com>, " \
+ "Alfonso Tierno <alfonso.tiernosepulveda@telefoncia.com"
__date__ = "$06-jun-2019 11:16:08$"
-from authconn import Authconn, AuthException # , AuthconnOperationException
+from osm_nbi.authconn import Authconn, AuthException # , AuthconnOperationException
from osm_common.dbbase import DbException
-from base_topic import BaseTopic
+from osm_nbi.base_topic import BaseTopic
import logging
import re
self.logger = logging.getLogger("nbi.authenticator.internal")
- # Get Configuration
- # self.xxx = config.get("xxx", "default")
-
self.db = db
self.token_cache = token_cache
else:
raise
except AuthException:
- if self.config["global"].get("test.user_not_authorized"):
- return {"id": "fake-token-id-for-test",
- "project_id": self.config["global"].get("test.project_not_authorized", "admin"),
- "username": self.config["global"]["test.user_not_authorized"], "admin": True}
- else:
- raise
+ raise
except Exception:
self.logger.exception("Error during token validation using internal backend")
raise AuthException("Error during token validation using internal backend",
filt["username"] = filt["name"]
del filt["name"]
users = self.db.get_list("users", filt)
+ project_id_name = {}
+ role_id_name = {}
for user in users:
- projects = []
- projs_with_roles = []
- prms = user.get("project_role_mappings", [])
- for prm in prms:
- if prm["project"] not in projects:
- projects.append(prm["project"])
- for project in projects:
- roles = []
- roles_for_proj = []
+ prms = user.get("project_role_mappings")
+ projects = user.get("projects")
+ if prms:
+ projects = []
+ # add project_name and role_name. Generate projects for backward compatibility
for prm in prms:
- if prm["project"] == project and prm["role"] not in roles:
- role = prm["role"]
- roles.append(role)
- rl = self.db.get_one("roles", {BaseTopic.id_field("roles", role): role})
- roles_for_proj.append({"name": rl["name"], "_id": rl["_id"], "id": rl["_id"]})
- try:
- pr = self.db.get_one("projects", {BaseTopic.id_field("projects", project): project})
- projs_with_roles.append({"name": pr["name"], "_id": pr["_id"], "id": pr["_id"],
- "roles": roles_for_proj})
- except Exception as e:
- self.logger.exception("Error during user listing using internal backend: {}".format(e))
- user["projects"] = projs_with_roles
- if "project_role_mappings" in user:
- del user["project_role_mappings"]
+ project_id = prm["project"]
+ if project_id not in project_id_name:
+ pr = self.db.get_one("projects", {BaseTopic.id_field("projects", project_id): project_id},
+ fail_on_empty=False)
+ project_id_name[project_id] = pr["name"] if pr else None
+ prm["project_name"] = project_id_name[project_id]
+ if prm["project_name"] not in projects:
+ projects.append(prm["project_name"])
+
+ role_id = prm["role"]
+ if role_id not in role_id_name:
+ role = self.db.get_one("roles", {BaseTopic.id_field("roles", role_id): role_id},
+ fail_on_empty=False)
+ role_id_name[role_id] = role["name"] if role else None
+ prm["role_name"] = role_id_name[role_id]
+ user["projects"] = projects # for backward compatibility
+ elif projects:
+ # user created with an old version. Create a project_role mapping with role project_admin
+ user["project_role_mappings"] = []
+ role = self.db.get_one("roles", {BaseTopic.id_field("roles", "project_admin"): "project_admin"})
+ for p_id_name in projects:
+ pr = self.db.get_one("projects", {BaseTopic.id_field("projects", p_id_name): p_id_name})
+ prm = {"project": pr["_id"],
+ "project_name": pr["name"],
+ "role_name": "project_admin",
+ "role": role["_id"]
+ }
+ user["project_role_mappings"].append(prm)
+ else:
+ user["projects"] = []
+ user["project_role_mappings"] = []
+
return users
def get_project_list(self, filter_q={}):