from osm_nbi.authconn_keystone import AuthconnKeystone
from osm_nbi.authconn_internal import AuthconnInternal
from osm_nbi.authconn_tacacs import AuthconnTacacs
+from osm_nbi.utils import cef_event, cef_event_builder
from osm_common import dbmemory, dbmongo, msglocal, msgkafka
from osm_common.dbbase import DbException
from osm_nbi.validation import is_valid_uuid
self.valid_query_string = valid_query_string
self.system_admin_role_id = None # system_role id
self.test_project_id = None # test_project_id
+ self.cef_logger = None
def start(self, config):
"""
:param config: dictionary containing the relevant parameters for this object.
"""
self.config = config
+ self.cef_logger = cef_event_builder(config["authentication"])
try:
if not self.db:
user_desc = {
"username": "admin",
"password": "admin",
- "_admin": {"created": now, "modified": now},
+ "_admin": {"created": now, "modified": now, "user_status": "always-active"},
}
if project_id:
pid = project_id
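Review bot: The bootstrap `admin` user built here now carries `"user_status": "always-active"` next to the fixed default password `"admin"`, so whatever enforcement keys on `user_status` will never apply to the one account whose credentials are known in advance. That may be deliberate for a fresh deployment, but nothing in the hunk says so; I would add `# bootstrap admin is intentionally exempt from user_status enforcement; it still uses the default password until changed` above the `user_desc` literal. The enclosing `start` method begins before and continues past this hunk.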
(r for r in records if r["name"] == "system_admin"), None
):
with open(self.roles_to_operations_file, "r") as stream:
- roles_to_operations_yaml = yaml.load(stream, Loader=yaml.Loader)
+ roles_to_operations_yaml = yaml.safe_load(stream)
role_names = []
for role_with_operations in roles_to_operations_yaml["roles"]:
elif auth_list[0].lower() == "basic":
user_passwd64 = auth_list[-1]
if not token:
- if cherrypy.session.get("Authorization"):
+ if cherrypy.session.get("Authorization"): # pylint: disable=E1101
# 2. Try using session before request a new token. If not, basic authentication will generate
- token = cherrypy.session.get("Authorization")
+ token = cherrypy.session.get( # pylint: disable=E1101
+ "Authorization"
+ )
if token == "logout":
token = None # force Unauthorized response to insert user password again
elif user_passwd64 and cherrypy.request.config.get(
except Exception:
pass
outdata = self.new_token(
- None, {"username": user, "password": passwd}
+ None, {"username": user, "password": passwd}, None
)
token = outdata["_id"]
- cherrypy.session["Authorization"] = token
+ cherrypy.session["Authorization"] = token # pylint: disable=E1101
if not token:
raise AuthException(
item_id,
)
self.logger.info("RBAC_auth: {}".format(RBAC_auth))
+ if RBAC_auth:
+ cef_event(
+ self.cef_logger,
+ {
+ "name": "System Access",
+ "sourceUserName": token_info.get("username"),
+ "message": "Accessing account with system privileges, Project={}".format(
+ token_info.get("project_name")
+ ),
+ },
+ )
+ self.logger.info("{}".format(self.cef_logger))
token_info["allow_show_user_project_role"] = RBAC_auth
return token_info
except AuthException as e:
if not isinstance(e, AuthExceptionUnauthorized):
- if cherrypy.session.get("Authorization"):
- del cherrypy.session["Authorization"]
+ if cherrypy.session.get("Authorization"): # pylint: disable=E1101
+ del cherrypy.session["Authorization"] # pylint: disable=E1101
cherrypy.response.headers[
"WWW-Authenticate"
] = 'Bearer realm="{}"'.format(e)
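Review bot: `self.logger.info("{}".format(self.cef_logger))` directly after the `cef_event(...)` call logs the logger object itself rather than the event, on every privileged access; it reads like a leftover debug line and should be dropped, or replaced with something that names the event, e.g. `self.logger.info("CEF event emitted for user %s", token_info.get("username"))`. The CEF `message` also interpolates `token_info.get("project_name")`, a DB-sourced string; unless `cef_event` escapes the CEF-special characters (`|`, `=`, `\`, newlines) — not visible here — a project name containing them can break or spoof fields in the emitted record, which is worth either a comment stating that `cef_event` escapes, or a check at this call site. `self.cef_logger` is only assigned in `start()`, so a call reaching this branch before `start()` passes `None` to `cef_event`; whether that is tolerated is not visible either. The enclosing method (presumably `authenticate`) starts and ends outside the hunk.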
This method will check for password expiry of the user
:param outdata: user token information
"""
- user_content = None
- detail = {}
+ user_list = None
present_time = time()
user = outdata["username"]
- if self.config["authentication"].get("pwd_expiry_check"):
- user_content = self.db.get_list("users", {"username": user})[0]
- if not user_content.get("username") == "admin":
- user_content["_admin"]["modified_time"] = present_time
- if user_content.get("_admin").get("expire_time"):
- expire_time = user_content["_admin"]["expire_time"]
- else:
- expire_time = present_time
- uid = user_content["_id"]
- self.db.set_one("users", {"_id": uid}, user_content)
- if not present_time < expire_time:
- return True
+ if self.config["authentication"].get("user_management"):
+ user_list = self.db.get_list("users", {"username": user})
+ if user_list:
+ user_content = user_list[0]
+ if not user_content.get("username") == "admin":
+ user_content["_admin"]["modified"] = present_time
+ if user_content.get("_admin").get("password_expire_time"):
+ password_expire_time = user_content["_admin"][
+ "password_expire_time"
+ ]
+ else:
+ password_expire_time = present_time
+ uid = user_content["_id"]
+ self.db.set_one("users", {"_id": uid}, user_content)
+ if not present_time < password_expire_time:
+ return True
else:
pass
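Review bot: When `_admin.password_expire_time` is absent, the fallback `password_expire_time = present_time` makes `not present_time < password_expire_time` true, so a user record without that field is reported as expired; if that is the intended "force a password change" default it deserves a comment such as `# no expiry recorded: treat as expired so the user must set a new password`, otherwise the fallback should be a far-future timestamp. `self.db.set_one("users", {"_id": uid}, user_content)` writes the whole loaded document back just to update `_admin.modified`, which can silently overwrite concurrent changes to the same user (lost update); a targeted update of only `_admin.modified` would be narrower. The trailing `else: pass` is dead and can be removed. The non-expired path returns an implicit `None`; whether the caller distinguishes `None` from `False` is outside the hunk, and the method may continue past the last line shown.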