Code Coverage

Cobertura Coverage Report > osm_nbi >

auth.py

Trend

Classes: 0%
 
Lines: 0%
 
Conditionals: 100%
 

File Coverage summary

Name | Classes | Lines | Conditionals
auth.py
0%
0/1
0%
0/398
100%
0/0

Coverage Breakdown by Class

Name | Lines | Conditionals
auth.py
0%
0/398
N/A

Source

osm_nbi/auth.py
1 # -*- coding: utf-8 -*-
2
3 # Copyright 2018 Whitestack, LLC
4 # Copyright 2018 Telefonica S.A.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
9 #
10 #         http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
16 # under the License.
17 #
18 # For those usages not covered by the Apache License, Version 2.0 please
19 # contact: esousa@whitestack.com or alfonso.tiernosepulveda@telefonica.com
20 ##
21
22
23 0 """
24 Authenticator is responsible for authenticating the users,
25 create the tokens unscoped and scoped, retrieve the role
26 list inside the projects that they are inserted
27 """
28
29 0 __author__ = "Eduardo Sousa <esousa@whitestack.com>; Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
30 0 __date__ = "$27-jul-2018 23:59:59$"
31
32 0 import cherrypy
33 0 import logging
34 0 import yaml
35 0 from base64 import standard_b64decode
36 0 from copy import deepcopy
37
38 # from functools import reduce
39 0 from http import HTTPStatus
40 0 from time import time
41 0 from os import path
42
43 0 from osm_nbi.authconn import AuthException, AuthconnException, AuthExceptionUnauthorized
44 0 from osm_nbi.authconn_keystone import AuthconnKeystone
45 0 from osm_nbi.authconn_internal import AuthconnInternal
46 0 from osm_nbi.authconn_tacacs import AuthconnTacacs
47 0 from osm_common import dbmemory, dbmongo, msglocal, msgkafka
48 0 from osm_common.dbbase import DbException
49 0 from osm_nbi.validation import is_valid_uuid
50 0 from itertools import chain
51 0 from uuid import uuid4
52
53
54 0 class Authenticator:
55     """
56     This class should hold all the mechanisms for User Authentication and
57     Authorization. Initially it should support Openstack Keystone as a
58     backend through a plugin model where more backends can be added and a
59     RBAC model to manage permissions on operations.
60     This class must be threading safe
61     """
62
63 0     periodin_db_pruning = (
64         60 * 30
65     )  # for the internal backend only. every 30 minutes expired tokens will be pruned
66 0     token_limit = 500  # when reached, the token cache will be cleared
67
68 0     def __init__(self, valid_methods, valid_query_string):
69         """
70         Authenticator initializer. Setup the initial state of the object,
71         while it waits for the config dictionary and database initialization.
72         """
73 0         self.backend = None
74 0         self.config = None
75 0         self.db = None
76 0         self.msg = None
77 0         self.tokens_cache = dict()
78 0         self.next_db_prune_time = (
79             0  # time when next cleaning of expired tokens must be done
80         )
81 0         self.roles_to_operations_file = None
82         # self.roles_to_operations_table = None
83 0         self.resources_to_operations_mapping = {}
84 0         self.operation_to_allowed_roles = {}
85 0         self.logger = logging.getLogger("nbi.authenticator")
86 0         self.role_permissions = []
87 0         self.valid_methods = valid_methods
88 0         self.valid_query_string = valid_query_string
89 0         self.system_admin_role_id = None  # system_role id
90 0         self.test_project_id = None  # test_project_id
91
92 0     def start(self, config):
93         """
94         Method to configure the Authenticator object. This method should be called
95         after object creation. It is responsible by initializing the selected backend,
96         as well as the initialization of the database connection.
97
98         :param config: dictionary containing the relevant parameters for this object.
99         """
100 0         self.config = config
101
102 0         try:
103 0             if not self.db:
104 0                 if config["database"]["driver"] == "mongo":
105 0                     self.db = dbmongo.DbMongo()
106 0                     self.db.db_connect(config["database"])
107 0                 elif config["database"]["driver"] == "memory":
108 0                     self.db = dbmemory.DbMemory()
109 0                     self.db.db_connect(config["database"])
110                 else:
111 0                     raise AuthException(
112                         "Invalid configuration param '{}' at '[database]':'driver'".format(
113                             config["database"]["driver"]
114                         )
115                     )
116 0             if not self.msg:
117 0                 if config["message"]["driver"] == "local":
118 0                     self.msg = msglocal.MsgLocal()
119 0                     self.msg.connect(config["message"])
120 0                 elif config["message"]["driver"] == "kafka":
121 0                     self.msg = msgkafka.MsgKafka()
122 0                     self.msg.connect(config["message"])
123                 else:
124 0                     raise AuthException(
125                         "Invalid configuration param '{}' at '[message]':'driver'".format(
126                             config["message"]["driver"]
127                         )
128                     )
129 0             if not self.backend:
130 0                 if config["authentication"]["backend"] == "keystone":
131 0                     self.backend = AuthconnKeystone(
132                         self.config["authentication"], self.db, self.role_permissions
133                     )
134 0                 elif config["authentication"]["backend"] == "internal":
135 0                     self.backend = AuthconnInternal(
136                         self.config["authentication"], self.db, self.role_permissions
137                     )
138 0                     self._internal_tokens_prune("tokens")
139 0                 elif config["authentication"]["backend"] == "tacacs":
140 0                     self.backend = AuthconnTacacs(
141                         self.config["authentication"], self.db, self.role_permissions
142                     )
143 0                     self._internal_tokens_prune("tokens_tacacs")
144                 else:
145 0                     raise AuthException(
146                         "Unknown authentication backend: {}".format(
147                             config["authentication"]["backend"]
148                         )
149                     )
150
151 0             if not self.roles_to_operations_file:
152 0                 if "roles_to_operations" in config["rbac"]:
153 0                     self.roles_to_operations_file = config["rbac"][
154                         "roles_to_operations"
155                     ]
156                 else:
157 0                     possible_paths = (
158                         __file__[: __file__.rfind("auth.py")]
159                         + "roles_to_operations.yml",
160                         "./roles_to_operations.yml",
161                     )
162 0                     for config_file in possible_paths:
163 0                         if path.isfile(config_file):
164 0                             self.roles_to_operations_file = config_file
165 0                             break
166 0                 if not self.roles_to_operations_file:
167 0                     raise AuthException(
168                         "Invalid permission configuration: roles_to_operations file missing"
169                     )
170
171             # load role_permissions
172 0             def load_role_permissions(method_dict):
173 0                 for k in method_dict:
174 0                     if k == "ROLE_PERMISSION":
175 0                         for method in chain(
176                             method_dict.get("METHODS", ()), method_dict.get("TODO", ())
177                         ):
178 0                             permission = method_dict["ROLE_PERMISSION"] + method.lower()
179 0                             if permission not in self.role_permissions:
180 0                                 self.role_permissions.append(permission)
181 0                     elif k in ("TODO", "METHODS"):
182 0                         continue
183 0                     elif method_dict[k]:
184 0                         load_role_permissions(method_dict[k])
185
186 0             load_role_permissions(self.valid_methods)
187 0             for query_string in self.valid_query_string:
188 0                 for method in ("get", "put", "patch", "post", "delete"):
189 0                     permission = query_string.lower() + ":" + method
190 0                     if permission not in self.role_permissions:
191 0                         self.role_permissions.append(permission)
192
193             # get ids of role system_admin and test project
194 0             role_system_admin = self.db.get_one(
195                 "roles", {"name": "system_admin"}, fail_on_empty=False
196             )
197 0             if role_system_admin:
198 0                 self.system_admin_role_id = role_system_admin["_id"]
199 0             test_project_name = self.config["authentication"].get(
200                 "project_not_authorized", "admin"
201             )
202 0             test_project = self.db.get_one(
203                 "projects", {"name": test_project_name}, fail_on_empty=False
204             )
205 0             if test_project:
206 0                 self.test_project_id = test_project["_id"]
207
208 0         except Exception as e:
209 0             raise AuthException(str(e))
210
211 0     def stop(self):
212 0         try:
213 0             if self.db:
214 0                 self.db.db_disconnect()
215 0         except DbException as e:
216 0             raise AuthException(str(e), http_code=e.http_code)
217
218 0     def create_admin_project(self):
219         """
220         Creates a new project 'admin' into database if it doesn't exist. Useful for initialization.
221         :return: _id identity of the 'admin' project
222         """
223
224         # projects = self.db.get_one("projects", fail_on_empty=False, fail_on_more=False)
225 0         project_desc = {"name": "admin"}
226 0         projects = self.backend.get_project_list(project_desc)
227 0         if projects:
228 0             return projects[0]["_id"]
229 0         now = time()
230 0         project_desc["_id"] = str(uuid4())
231 0         project_desc["_admin"] = {"created": now, "modified": now}
232 0         pid = self.backend.create_project(project_desc)
233 0         self.logger.info(
234             "Project '{}' created at database".format(project_desc["name"])
235         )
236 0         return pid
237
238 0     def create_admin_user(self, project_id):
239         """
240         Creates a new user admin/admin into database if database is empty. Useful for initialization
241         :return: _id identity of the inserted data, or None
242         """
243         # users = self.db.get_one("users", fail_on_empty=False, fail_on_more=False)
244 0         users = self.backend.get_user_list()
245 0         if users:
246 0             return None
247         # user_desc = {"username": "admin", "password": "admin", "projects": [project_id]}
248 0         now = time()
249 0         user_desc = {
250             "username": "admin",
251             "password": "admin",
252             "_admin": {"created": now, "modified": now},
253         }
254 0         if project_id:
255 0             pid = project_id
256         else:
257             # proj = self.db.get_one("projects", {"name": "admin"}, fail_on_empty=False, fail_on_more=False)
258 0             proj = self.backend.get_project_list({"name": "admin"})
259 0             pid = proj[0]["_id"] if proj else None
260         # role = self.db.get_one("roles", {"name": "system_admin"}, fail_on_empty=False, fail_on_more=False)
261 0         roles = self.backend.get_role_list({"name": "system_admin"})
262 0         if pid and roles:
263 0             user_desc["project_role_mappings"] = [
264                 {"project": pid, "role": roles[0]["_id"]}
265             ]
266 0         uid = self.backend.create_user(user_desc)
267 0         self.logger.info("User '{}' created at database".format(user_desc["username"]))
268 0         return uid
269
270 0     def init_db(self, target_version="1.0"):
271         """
272         Check if the database has been initialized, with at least one user. If not, create the required tables
273         and insert the predefined mappings between roles and permissions.
274
275         :param target_version: schema version that should be present in the database.
276         :return: None if OK, exception if error or version is different.
277         """
278
279 0         records = self.backend.get_role_list()
280
281         # Loading permissions to AUTH. At lease system_admin must be present.
282 0         if not records or not next(
283             (r for r in records if r["name"] == "system_admin"), None
284         ):
285 0             with open(self.roles_to_operations_file, "r") as stream:
286 0                 roles_to_operations_yaml = yaml.load(stream, Loader=yaml.Loader)
287
288 0             role_names = []
289 0             for role_with_operations in roles_to_operations_yaml["roles"]:
290                 # Verifying if role already exists. If it does, raise exception
291 0                 if role_with_operations["name"] not in role_names:
292 0                     role_names.append(role_with_operations["name"])
293                 else:
294 0                     raise AuthException(
295                         "Duplicated role name '{}' at file '{}''".format(
296                             role_with_operations["name"], self.roles_to_operations_file
297                         )
298                     )
299
300 0                 if not role_with_operations["permissions"]:
301 0                     continue
302
303 0                 for permission, is_allowed in role_with_operations[
304                     "permissions"
305                 ].items():
306 0                     if not isinstance(is_allowed, bool):
307 0                         raise AuthException(
308                             "Invalid value for permission '{}' at role '{}'; at file '{}'".format(
309                                 permission,
310                                 role_with_operations["name"],
311                                 self.roles_to_operations_file,
312                             )
313                         )
314
315                     # TODO check permission is ok
316 0                     if permission[-1] == ":":
317 0                         raise AuthException(
318                             "Invalid permission '{}' terminated in ':' for role '{}'; at file {}".format(
319                                 permission,
320                                 role_with_operations["name"],
321                                 self.roles_to_operations_file,
322                             )
323                         )
324
325 0                 if "default" not in role_with_operations["permissions"]:
326 0                     role_with_operations["permissions"]["default"] = False
327 0                 if "admin" not in role_with_operations["permissions"]:
328 0                     role_with_operations["permissions"]["admin"] = False
329
330 0                 now = time()
331 0                 role_with_operations["_admin"] = {
332                     "created": now,
333                     "modified": now,
334                 }
335
336                 # self.db.create(self.roles_to_operations_table, role_with_operations)
337 0                 try:
338 0                     self.backend.create_role(role_with_operations)
339 0                     self.logger.info(
340                         "Role '{}' created".format(role_with_operations["name"])
341                     )
342 0                 except (AuthException, AuthconnException) as e:
343 0                     if role_with_operations["name"] == "system_admin":
344 0                         raise
345 0                     self.logger.error(
346                         "Role '{}' cannot be created: {}".format(
347                             role_with_operations["name"], e
348                         )
349                     )
350
351         # Create admin project&user if required
352 0         pid = self.create_admin_project()
353 0         user_id = self.create_admin_user(pid)
354
355         # try to assign system_admin role to user admin if not any user has this role
356 0         if not user_id:
357 0             try:
358 0                 users = self.backend.get_user_list()
359 0                 roles = self.backend.get_role_list({"name": "system_admin"})
360 0                 role_id = roles[0]["_id"]
361 0                 user_with_system_admin = False
362 0                 user_admin_id = None
363 0                 for user in users:
364 0                     if not user_admin_id:
365 0                         user_admin_id = user["_id"]
366 0                     if user["username"] == "admin":
367 0                         user_admin_id = user["_id"]
368 0                     for prm in user.get("project_role_mappings", ()):
369 0                         if prm["role"] == role_id:
370 0                             user_with_system_admin = True
371 0                             break
372 0                     if user_with_system_admin:
373 0                         break
374 0                 if not user_with_system_admin:
375 0                     self.backend.update_user(
376                         {
377                             "_id": user_admin_id,
378                             "add_project_role_mappings": [
379                                 {"project": pid, "role": role_id}
380                             ],
381                         }
382                     )
383 0                     self.logger.info(
384                         "Added role system admin to user='{}' project=admin".format(
385                             user_admin_id
386                         )
387                     )
388 0             except Exception as e:
389 0                 self.logger.error(
390                     "Error in Authorization DataBase initialization: {}: {}".format(
391                         type(e).__name__, e
392                     )
393                 )
394
395 0         self.load_operation_to_allowed_roles()
396
397 0     def load_operation_to_allowed_roles(self):
398         """
399         Fills the internal self.operation_to_allowed_roles based on database role content and self.role_permissions
400         It works in a shadow copy and replace at the end to allow other threads working with the old copy
401         :return: None
402         """
403 0         permissions = {oper: [] for oper in self.role_permissions}
404         # records = self.db.get_list(self.roles_to_operations_table)
405 0         records = self.backend.get_role_list()
406
407 0         ignore_fields = ["_id", "_admin", "name", "default"]
408 0         for record in records:
409 0             if not record.get("permissions"):
410 0                 continue
411 0             record_permissions = {
412                 oper: record["permissions"].get("default", False)
413                 for oper in self.role_permissions
414             }
415 0             operations_joined = [
416                 (oper, value)
417                 for oper, value in record["permissions"].items()
418                 if oper not in ignore_fields
419             ]
420 0             operations_joined.sort(key=lambda x: x[0].count(":"))
421
422 0             for oper in operations_joined:
423 0                 match = list(
424                     filter(lambda x: x.find(oper[0]) == 0, record_permissions.keys())
425                 )
426
427 0                 for m in match:
428 0                     record_permissions[m] = oper[1]
429
430 0             allowed_operations = [k for k, v in record_permissions.items() if v is True]
431
432 0             for allowed_op in allowed_operations:
433 0                 permissions[allowed_op].append(record["name"])
434
435 0         self.operation_to_allowed_roles = permissions
436
437 0     def authorize(
438         self, role_permission=None, query_string_operations=None, item_id=None
439     ):
440 0         token = None
441 0         user_passwd64 = None
442 0         try:
443             # 1. Get token Authorization bearer
444 0             auth = cherrypy.request.headers.get("Authorization")
445 0             if auth:
446 0                 auth_list = auth.split(" ")
447 0                 if auth_list[0].lower() == "bearer":
448 0                     token = auth_list[-1]
449 0                 elif auth_list[0].lower() == "basic":
450 0                     user_passwd64 = auth_list[-1]
451 0             if not token:
452 0                 if cherrypy.session.get("Authorization"):
453                     # 2. Try using session before request a new token. If not, basic authentication will generate
454 0                     token = cherrypy.session.get("Authorization")
455 0                     if token == "logout":
456 0                         token = None  # force Unauthorized response to insert user password again
457 0                 elif user_passwd64 and cherrypy.request.config.get(
458                     "auth.allow_basic_authentication"
459                 ):
460                     # 3. Get new token from user password
461 0                     user = None
462 0                     passwd = None
463 0                     try:
464 0                         user_passwd = standard_b64decode(user_passwd64).decode()
465 0                         user, _, passwd = user_passwd.partition(":")
466 0                     except Exception:
467 0                         pass
468 0                     outdata = self.new_token(
469                         None, {"username": user, "password": passwd}
470                     )
471 0                     token = outdata["_id"]
472 0                     cherrypy.session["Authorization"] = token
473
474 0             if not token:
475 0                 raise AuthException(
476                     "Needed a token or Authorization http header",
477                     http_code=HTTPStatus.UNAUTHORIZED,
478                 )
479
480             # try to get from cache first
481 0             now = time()
482 0             token_info = self.tokens_cache.get(token)
483 0             if token_info and token_info["expires"] < now:
484                 # delete token. MUST be done with care, as another thread maybe already delete it. Do not use del
485 0                 self.tokens_cache.pop(token, None)
486 0                 token_info = None
487
488             # get from database if not in cache
489 0             if not token_info:
490 0                 token_info = self.backend.validate_token(token)
491                 # Clear cache if token limit reached
492 0                 if len(self.tokens_cache) > self.token_limit:
493 0                     self.tokens_cache.clear()
494 0                 self.tokens_cache[token] = token_info
495             # TODO add to token info remote host, port
496
497 0             if role_permission:
498 0                 RBAC_auth = self.check_permissions(
499                     token_info,
500                     cherrypy.request.method,
501                     role_permission,
502                     query_string_operations,
503                     item_id,
504                 )
505 0                 self.logger.info("RBAC_auth: {}".format(RBAC_auth))
506 0                 token_info["allow_show_user_project_role"] = RBAC_auth
507
508 0             return token_info
509 0         except AuthException as e:
510 0             if not isinstance(e, AuthExceptionUnauthorized):
511 0                 if cherrypy.session.get("Authorization"):
512 0                     del cherrypy.session["Authorization"]
513 0                 cherrypy.response.headers[
514                     "WWW-Authenticate"
515                 ] = 'Bearer realm="{}"'.format(e)
516 0             if self.config["authentication"].get("user_not_authorized"):
517 0                 return {
518                     "id": "testing-token",
519                     "_id": "testing-token",
520                     "project_id": self.test_project_id,
521                     "username": self.config["authentication"]["user_not_authorized"],
522                     "roles": [self.system_admin_role_id],
523                     "admin": True,
524                     "allow_show_user_project_role": True,
525                 }
526 0             raise
527
528 0     def new_token(self, token_info, indata, remote):
529 0         new_token_info = self.backend.authenticate(
530             credentials=indata,
531             token_info=token_info,
532         )
533
534 0         new_token_info["remote_port"] = remote.port
535 0         if not new_token_info.get("expires"):
536 0             new_token_info["expires"] = time() + 3600
537 0         if not new_token_info.get("admin"):
538 0             new_token_info["admin"] = (
539                 True if new_token_info.get("project_name") == "admin" else False
540             )
541             # TODO put admin in RBAC
542
543 0         if remote.name:
544 0             new_token_info["remote_host"] = remote.name
545 0         elif remote.ip:
546 0             new_token_info["remote_host"] = remote.ip
547
548         # TODO call self._internal_tokens_prune(now) ?
549 0         return deepcopy(new_token_info)
550
551 0     def get_token_list(self, token_info):
552 0         if self.config["authentication"]["backend"] == "internal":
553 0             return self._internal_get_token_list(token_info)
554         else:
555             # TODO: check if this can be avoided. Backend may provide enough information
556 0             return [
557                 deepcopy(token)
558                 for token in self.tokens_cache.values()
559                 if token["username"] == token_info["username"]
560             ]
561
562 0     def get_token(self, token_info, token):
563 0         if self.config["authentication"]["backend"] == "internal":
564 0             return self._internal_get_token(token_info, token)
565         else:
566             # TODO: check if this can be avoided. Backend may provide enough information
567 0             token_value = self.tokens_cache.get(token)
568 0             if not token_value:
569 0                 raise AuthException("token not found", http_code=HTTPStatus.NOT_FOUND)
570 0             if (
571                 token_value["username"] != token_info["username"]
572                 and not token_info["admin"]
573             ):
574 0                 raise AuthException(
575                     "needed admin privileges", http_code=HTTPStatus.UNAUTHORIZED
576                 )
577 0             return token_value
578
579 0     def del_token(self, token):
580 0         try:
581 0             self.backend.revoke_token(token)
582             # self.tokens_cache.pop(token, None)
583 0             self.remove_token_from_cache(token)
584 0             return "token '{}' deleted".format(token)
585 0         except KeyError:
586 0             raise AuthException(
587                 "Token '{}' not found".format(token), http_code=HTTPStatus.NOT_FOUND
588             )
589
590 0     def check_permissions(
591         self,
592         token_info,
593         method,
594         role_permission=None,
595         query_string_operations=None,
596         item_id=None,
597     ):
598         """
599         Checks that operation has permissions to be done, base on the assigned roles to this user project
600         :param token_info: Dictionary that contains "roles" with a list of assigned roles.
601             This method fills the token_info["admin"] with True or False based on assigned tokens, if any allows admin
602             This will be used among others to hide or not the _admin content of topics
603         :param method: GET,PUT, POST, ...
604         :param role_permission: role permission name of the operation required
605         :param query_string_operations: list of possible admin query strings provided by user. It is checked that the
606             assigned role allows this query string for this method
607         :param item_id: item identifier if included in the URL, None otherwise
608         :return: True if access granted by permission rules, False if access granted by default rules (Bug 853)
609         :raises: AuthExceptionUnauthorized if access denied
610         """
611 0         self.load_operation_to_allowed_roles()
612
613 0         roles_required = self.operation_to_allowed_roles[role_permission]
614 0         roles_allowed = [role["name"] for role in token_info["roles"]]
615
616         # fills token_info["admin"] if some roles allows it
617 0         token_info["admin"] = False
618 0         for role in roles_allowed:
619 0             if role in self.operation_to_allowed_roles["admin:" + method.lower()]:
620 0                 token_info["admin"] = True
621 0                 break
622
623 0         if "anonymous" in roles_required:
624 0             return True
625 0         operation_allowed = False
626 0         for role in roles_allowed:
627 0             if role in roles_required:
628 0                 operation_allowed = True
629                 # if query_string operations, check if this role allows it
630 0                 if not query_string_operations:
631 0                     return True
632 0                 for query_string_operation in query_string_operations:
633 0                     if (
634                         role
635                         not in self.operation_to_allowed_roles[query_string_operation]
636                     ):
637 0                         break
638                 else:
639 0                     return True
640
641         # Bug 853 - Final Solution
642         # User/Project/Role whole listings are filtered elsewhere
643         # uid, pid, rid = ("user_id", "project_id", "id") if is_valid_uuid(id) else ("username", "project_name", "name")
644 0         uid = "user_id" if is_valid_uuid(item_id) else "username"
645 0         if (
646             role_permission
647             in [
648                 "projects:get",
649                 "projects:id:get",
650                 "roles:get",
651                 "roles:id:get",
652                 "users:get",
653             ]
654         ) or (role_permission == "users:id:get" and item_id == token_info[uid]):
655             # or (role_permission == "projects:id:get" and item_id == token_info[pid]) \
656             # or (role_permission == "roles:id:get" and item_id in [role[rid] for role in token_info["roles"]]):
657 0             return False
658
659 0         if not operation_allowed:
660 0             raise AuthExceptionUnauthorized("Access denied: lack of permissions.")
661         else:
662 0             raise AuthExceptionUnauthorized(
663                 "Access denied: You have not permissions to use these admin query string"
664             )
665
666 0     def get_user_list(self):
667 0         return self.backend.get_user_list()
668
669 0     def _normalize_url(self, url, method):
670         # DEPRECATED !!!
671         # Removing query strings
672 0         normalized_url = url if "?" not in url else url[: url.find("?")]
673 0         normalized_url_splitted = normalized_url.split("/")
674 0         parameters = {}
675
676 0         filtered_keys = [
677             key
678             for key in self.resources_to_operations_mapping.keys()
679             if method in key.split()[0]
680         ]
681
682 0         for idx, path_part in enumerate(normalized_url_splitted):
683 0             tmp_keys = []
684 0             for tmp_key in filtered_keys:
685 0                 splitted = tmp_key.split()[1].split("/")
686 0                 if idx >= len(splitted):
687 0                     continue
688 0                 elif "<" in splitted[idx] and ">" in splitted[idx]:
689 0                     if splitted[idx] == "<artifactPath>":
690 0                         tmp_keys.append(tmp_key)
691 0                         continue
692 0                     elif idx == len(normalized_url_splitted) - 1 and len(
693                         normalized_url_splitted
694                     ) != len(splitted):
695 0                         continue
696                     else:
697 0                         tmp_keys.append(tmp_key)
698 0                 elif splitted[idx] == path_part:
699 0                     if idx == len(normalized_url_splitted) - 1 and len(
700                         normalized_url_splitted
701                     ) != len(splitted):
702 0                         continue
703                     else:
704 0                         tmp_keys.append(tmp_key)
705 0             filtered_keys = tmp_keys
706 0             if (
707                 len(filtered_keys) == 1
708                 and filtered_keys[0].split("/")[-1] == "<artifactPath>"
709             ):
710 0                 break
711
712 0         if len(filtered_keys) == 0:
713 0             raise AuthException(
714                 "Cannot make an authorization decision. URL not found. URL: {0}".format(
715                     url
716                 )
717             )
718 0         elif len(filtered_keys) > 1:
719 0             raise AuthException(
720                 "Cannot make an authorization decision. Multiple URLs found. URL: {0}".format(
721                     url
722                 )
723             )
724
725 0         filtered_key = filtered_keys[0]
726
727 0         for idx, path_part in enumerate(filtered_key.split()[1].split("/")):
728 0             if "<" in path_part and ">" in path_part:
729 0                 if path_part == "<artifactPath>":
730 0                     parameters[path_part[1:-1]] = "/".join(
731                         normalized_url_splitted[idx:]
732                     )
733                 else:
734 0                     parameters[path_part[1:-1]] = normalized_url_splitted[idx]
735
736 0         return filtered_key, parameters
737
738 0     def _internal_get_token_list(self, token_info):
739 0         now = time()
740 0         token_list = self.db.get_list(
741             "tokens", {"username": token_info["username"], "expires.gt": now}
742         )
743 0         return token_list
744
745 0     def _internal_get_token(self, token_info, token_id):
746 0         token_value = self.db.get_one("tokens", {"_id": token_id}, fail_on_empty=False)
747 0         if not token_value:
748 0             raise AuthException("token not found", http_code=HTTPStatus.NOT_FOUND)
749 0         if (
750             token_value["username"] != token_info["username"]
751             and not token_info["admin"]
752         ):
753 0             raise AuthException(
754                 "needed admin privileges", http_code=HTTPStatus.UNAUTHORIZED
755             )
756 0         return token_value
757
758 0     def _internal_tokens_prune(self, token_collection, now=None):
759 0         now = now or time()
760 0         if not self.next_db_prune_time or self.next_db_prune_time >= now:
761 0             self.db.del_list(token_collection, {"expires.lt": now})
762 0             self.next_db_prune_time = self.periodin_db_pruning + now
763             # self.tokens_cache.clear()  # not required any more
764
765 0     def remove_token_from_cache(self, token=None):
766 0         if token:
767 0             self.tokens_cache.pop(token, None)
768         else:
769 0             self.tokens_cache.clear()
770 0         self.msg.write("admin", "revoke_token", {"_id": token} if token else None)
771
772 0     def check_password_expiry(self, outdata):
773         """
774         This method will check for password expiry of the user
775         :param outdata: user token information
776         """
777 0         user_content = None
778 0         detail = {}
779 0         present_time = time()
780 0         user = outdata["username"]
781 0         if self.config["authentication"].get("pwd_expiry_check"):
782 0             user_content = self.db.get_list("users", {"username": user})[0]
783 0             if not user_content.get("username") == "admin":
784 0                 user_content["_admin"]["modified_time"] = present_time
785 0                 if user_content.get("_admin").get("expire_time"):
786 0                     expire_time = user_content["_admin"]["expire_time"]
787                 else:
788 0                     expire_time = present_time
789 0                 uid = user_content["_id"]
790 0                 self.db.set_one("users", {"_id": uid}, user_content)
791 0                 if not present_time < expire_time:
792 0                     return True
793         else:
794 0             pass