from osm_nbi.authconn_keystone import AuthconnKeystone
from osm_nbi.authconn_internal import AuthconnInternal
from osm_nbi.authconn_tacacs import AuthconnTacacs
+from osm_nbi.utils import cef_event, cef_event_builder
from osm_common import dbmemory, dbmongo, msglocal, msgkafka
from osm_common.dbbase import DbException
from osm_nbi.validation import is_valid_uuid
self.valid_query_string = valid_query_string
self.system_admin_role_id = None # system_role id
self.test_project_id = None # test_project_id
+ self.cef_logger = None
def start(self, config):
"""
:param config: dictionary containing the relevant parameters for this object.
"""
self.config = config
+ self.cef_logger = cef_event_builder(config["authentication"])
try:
if not self.db:
(r for r in records if r["name"] == "system_admin"), None
):
with open(self.roles_to_operations_file, "r") as stream:
- roles_to_operations_yaml = yaml.load(stream, Loader=yaml.Loader)
+ roles_to_operations_yaml = yaml.safe_load(stream)
role_names = []
for role_with_operations in roles_to_operations_yaml["roles"]:
item_id,
)
self.logger.info("RBAC_auth: {}".format(RBAC_auth))
+ if RBAC_auth:
+ cef_event(
+ self.cef_logger,
+ {
+ "name": "System Access",
+ "sourceUserName": token_info.get("username"),
+ "message": "Accessing account with system privileges, Project={}".format(
+ token_info.get("project_name")
+ ),
+ },
+ )
+ self.logger.info("{}".format(self.cef_logger))
token_info["allow_show_user_project_role"] = RBAC_auth
return token_info