Code Coverage

Cobertura Coverage Report > osm_nbi >

auth.py

Trend

Classes0%
 
Lines0%
 
Conditionals100%
 

File Coverage summary

NameClassesLinesConditionals
auth.py
0%
0/1
0%
0/375
100%
0/0

Coverage Breakdown by Class

NameLinesConditionals
auth.py
0%
0/375
N/A

Source

osm_nbi/auth.py
1 # -*- coding: utf-8 -*-
2
3 # Copyright 2018 Whitestack, LLC
4 # Copyright 2018 Telefonica S.A.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
9 #
10 #         http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
16 # under the License.
17 #
18 # For those usages not covered by the Apache License, Version 2.0 please
19 # contact: esousa@whitestack.com or alfonso.tiernosepulveda@telefonica.com
20 ##
21
22
23 0 """
24 Authenticator is responsible for authenticating the users,
25 create the tokens unscoped and scoped, retrieve the role
26 list inside the projects that they are inserted
27 """
28
29 0 __author__ = "Eduardo Sousa <esousa@whitestack.com>; Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
30 0 __date__ = "$27-jul-2018 23:59:59$"
31
32 0 import cherrypy
33 0 import logging
34 0 import yaml
35 0 from base64 import standard_b64decode
36 0 from copy import deepcopy
37 # from functools import reduce
38 0 from http import HTTPStatus
39 0 from time import time
40 0 from os import path
41
42 0 from osm_nbi.authconn import AuthException, AuthconnException, AuthExceptionUnauthorized
43 0 from osm_nbi.authconn_keystone import AuthconnKeystone
44 0 from osm_nbi.authconn_internal import AuthconnInternal
45 0 from osm_common import dbmemory, dbmongo, msglocal, msgkafka
46 0 from osm_common.dbbase import DbException
47 0 from osm_nbi.validation import is_valid_uuid
48 0 from itertools import chain
49 0 from uuid import uuid4
50
51
class Authenticator:
    """
    This class should hold all the mechanisms for User Authentication and
    Authorization. Initially it should support Openstack Keystone as a
    backend through a plugin model where more backends can be added and a
    RBAC model to manage permissions on operations.
    This class must be threading safe
    """

    periodin_db_pruning = 60 * 30  # for the internal backend only. every 30 minutes expired tokens will be pruned
    token_limit = 500   # when reached, the token cache will be cleared

    def __init__(self, valid_methods, valid_query_string):
        """
        Authenticator initializer. Setup the initial state of the object,
        while it waits for the config dictionary and database initialization.

        :param valid_methods: nested dictionary walked at start() to collect the "ROLE_PERMISSION" entries
        :param valid_query_string: iterable of query string names; each one generates "<name>:<method>" permissions
        """
        self.backend = None
        self.config = None
        self.db = None
        self.msg = None
        self.tokens_cache = dict()
        self.next_db_prune_time = 0  # time when next cleaning of expired tokens must be done
        self.roles_to_operations_file = None
        # self.roles_to_operations_table = None
        self.resources_to_operations_mapping = {}
        self.operation_to_allowed_roles = {}
        self.logger = logging.getLogger("nbi.authenticator")
        self.role_permissions = []
        self.valid_methods = valid_methods
        self.valid_query_string = valid_query_string
        self.system_admin_role_id = None   # system_role id
        self.test_project_id = None  # test_project_id

    def start(self, config):
        """
        Method to configure the Authenticator object. This method should be called
        after object creation. It is responsible by initializing the selected backend,
        as well as the initialization of the database connection.

        :param config: dictionary containing the relevant parameters for this object.
        :raises AuthException: on any error; the original exception is chained as the cause.
        """
        self.config = config

        try:
            if not self.db:
                if config["database"]["driver"] == "mongo":
                    self.db = dbmongo.DbMongo()
                    self.db.db_connect(config["database"])
                elif config["database"]["driver"] == "memory":
                    self.db = dbmemory.DbMemory()
                    self.db.db_connect(config["database"])
                else:
                    raise AuthException("Invalid configuration param '{}' at '[database]':'driver'"
                                        .format(config["database"]["driver"]))
            if not self.msg:
                if config["message"]["driver"] == "local":
                    self.msg = msglocal.MsgLocal()
                    self.msg.connect(config["message"])
                elif config["message"]["driver"] == "kafka":
                    self.msg = msgkafka.MsgKafka()
                    self.msg.connect(config["message"])
                else:
                    raise AuthException("Invalid configuration param '{}' at '[message]':'driver'"
                                        .format(config["message"]["driver"]))
            if not self.backend:
                if config["authentication"]["backend"] == "keystone":
                    self.backend = AuthconnKeystone(self.config["authentication"], self.db, self.role_permissions)
                elif config["authentication"]["backend"] == "internal":
                    self.backend = AuthconnInternal(self.config["authentication"], self.db, self.role_permissions)
                    self._internal_tokens_prune()
                else:
                    raise AuthException("Unknown authentication backend: {}"
                                        .format(config["authentication"]["backend"]))

            if not self.roles_to_operations_file:
                if "roles_to_operations" in config["rbac"]:
                    self.roles_to_operations_file = config["rbac"]["roles_to_operations"]
                else:
                    # look next to this module first, then at the current working directory
                    possible_paths = (
                        __file__[:__file__.rfind("auth.py")] + "roles_to_operations.yml",
                        "./roles_to_operations.yml"
                    )
                    for config_file in possible_paths:
                        if path.isfile(config_file):
                            self.roles_to_operations_file = config_file
                            break
                if not self.roles_to_operations_file:
                    raise AuthException("Invalid permission configuration: roles_to_operations file missing")

            # load role_permissions
            def load_role_permissions(method_dict):
                # Recursively walks the nested dictionary; every "ROLE_PERMISSION" prefix is combined with
                # the lower-cased names found under "METHODS" and "TODO" of the same level
                for k in method_dict:
                    if k == "ROLE_PERMISSION":
                        for method in chain(method_dict.get("METHODS", ()), method_dict.get("TODO", ())):
                            permission = method_dict["ROLE_PERMISSION"] + method.lower()
                            if permission not in self.role_permissions:
                                self.role_permissions.append(permission)
                    elif k in ("TODO", "METHODS"):
                        continue
                    elif method_dict[k]:
                        load_role_permissions(method_dict[k])

            load_role_permissions(self.valid_methods)
            for query_string in self.valid_query_string:
                for method in ("get", "put", "patch", "post", "delete"):
                    permission = query_string.lower() + ":" + method
                    if permission not in self.role_permissions:
                        self.role_permissions.append(permission)

            # get ids of role system_admin and test project
            role_system_admin = self.db.get_one("roles", {"name": "system_admin"}, fail_on_empty=False)
            if role_system_admin:
                self.system_admin_role_id = role_system_admin["_id"]
            test_project_name = self.config["authentication"].get("project_not_authorized", "admin")
            test_project = self.db.get_one("projects", {"name": test_project_name}, fail_on_empty=False)
            if test_project:
                self.test_project_id = test_project["_id"]

        except Exception as e:
            # keep the original exception as __cause__ so the real traceback is not lost
            raise AuthException(str(e)) from e

    def stop(self):
        """
        Disconnects from the database, if connected.

        :raises AuthException: if the database driver raises DbException (same http_code is propagated).
        """
        try:
            if self.db:
                self.db.db_disconnect()
        except DbException as e:
            raise AuthException(str(e), http_code=e.http_code)

    def create_admin_project(self):
        """
        Creates a new project 'admin' into database if it doesn't exist. Useful for initialization.
        :return: _id identity of the 'admin' project
        """

        # projects = self.db.get_one("projects", fail_on_empty=False, fail_on_more=False)
        project_desc = {"name": "admin"}
        projects = self.backend.get_project_list(project_desc)
        if projects:
            return projects[0]["_id"]
        now = time()
        project_desc["_id"] = str(uuid4())
        project_desc["_admin"] = {"created": now, "modified": now}
        pid = self.backend.create_project(project_desc)
        self.logger.info("Project '{}' created at database".format(project_desc["name"]))
        return pid

    def create_admin_user(self, project_id):
        """
        Creates a new user admin/admin into database if database is empty. Useful for initialization

        :param project_id: project to map the new user to with role system_admin. If empty, the project
            named 'admin' is looked up at the backend.
        :return: _id identity of the inserted data, or None
        """
        # users = self.db.get_one("users", fail_on_empty=False, fail_on_more=False)
        users = self.backend.get_user_list()
        if users:
            return None
        # user_desc = {"username": "admin", "password": "admin", "projects": [project_id]}
        now = time()
        user_desc = {"username": "admin", "password": "admin", "_admin": {"created": now, "modified": now}}
        if project_id:
            pid = project_id
        else:
            # proj = self.db.get_one("projects", {"name": "admin"}, fail_on_empty=False, fail_on_more=False)
            proj = self.backend.get_project_list({"name": "admin"})
            pid = proj[0]["_id"] if proj else None
        # role = self.db.get_one("roles", {"name": "system_admin"}, fail_on_empty=False, fail_on_more=False)
        roles = self.backend.get_role_list({"name": "system_admin"})
        if pid and roles:
            user_desc["project_role_mappings"] = [{"project": pid, "role": roles[0]["_id"]}]
        uid = self.backend.create_user(user_desc)
        self.logger.info("User '{}' created at database".format(user_desc["username"]))
        return uid

    def init_db(self, target_version='1.0'):
        """
        Check if the database has been initialized, with at least one user. If not, create the required tables
        and insert the predefined mappings between roles and permissions.

        :param target_version: schema version that should be present in the database.
        :return: None if OK, exception if error or version is different.
        """

        records = self.backend.get_role_list()

        # Loading permissions to AUTH. At least system_admin must be present.
        if not records or not next((r for r in records if r["name"] == "system_admin"), None):
            with open(self.roles_to_operations_file, "r") as stream:
                # NOTE(review): yaml.Loader can build arbitrary python objects. It is kept because this file is
                # local configuration; consider yaml.safe_load if the file can ever come from an untrusted source
                roles_to_operations_yaml = yaml.load(stream, Loader=yaml.Loader)

            role_names = []
            for role_with_operations in roles_to_operations_yaml["roles"]:
                # Verifying if role already exists. If it does, raise exception
                if role_with_operations["name"] not in role_names:
                    role_names.append(role_with_operations["name"])
                else:
                    raise AuthException("Duplicated role name '{}' at file '{}''"
                                        .format(role_with_operations["name"], self.roles_to_operations_file))

                if not role_with_operations["permissions"]:
                    continue

                for permission, is_allowed in role_with_operations["permissions"].items():
                    if not isinstance(is_allowed, bool):
                        raise AuthException("Invalid value for permission '{}' at role '{}'; at file '{}'"
                                            .format(permission, role_with_operations["name"],
                                                    self.roles_to_operations_file))

                    # TODO check permission is ok
                    # endswith (instead of permission[-1]) does not fail on an empty permission key
                    if permission.endswith(":"):
                        raise AuthException("Invalid permission '{}' terminated in ':' for role '{}'; at file {}"
                                            .format(permission, role_with_operations["name"],
                                                    self.roles_to_operations_file))

                if "default" not in role_with_operations["permissions"]:
                    role_with_operations["permissions"]["default"] = False
                if "admin" not in role_with_operations["permissions"]:
                    role_with_operations["permissions"]["admin"] = False

                now = time()
                role_with_operations["_admin"] = {
                    "created": now,
                    "modified": now,
                }

                # self.db.create(self.roles_to_operations_table, role_with_operations)
                try:
                    self.backend.create_role(role_with_operations)
                    self.logger.info("Role '{}' created".format(role_with_operations["name"]))
                except (AuthException, AuthconnException) as e:
                    # only a failure creating system_admin is fatal; other roles are logged and skipped
                    if role_with_operations["name"] == "system_admin":
                        raise
                    self.logger.error("Role '{}' cannot be created: {}".format(role_with_operations["name"], e))

        # Create admin project&user if required
        pid = self.create_admin_project()
        user_id = self.create_admin_user(pid)

        # try to assign system_admin role to user admin if not any user has this role
        if not user_id:
            try:
                users = self.backend.get_user_list()
                roles = self.backend.get_role_list({"name": "system_admin"})
                role_id = roles[0]["_id"]
                user_with_system_admin = False
                user_admin_id = None
                for user in users:
                    # fall back to the first listed user, but prefer the one called "admin"
                    if not user_admin_id:
                        user_admin_id = user["_id"]
                    if user["username"] == "admin":
                        user_admin_id = user["_id"]
                    for prm in user.get("project_role_mappings", ()):
                        if prm["role"] == role_id:
                            user_with_system_admin = True
                            break
                    if user_with_system_admin:
                        break
                if not user_with_system_admin:
                    self.backend.update_user({"_id": user_admin_id,
                                              "add_project_role_mappings": [{"project": pid, "role": role_id}]})
                    self.logger.info("Added role system admin to user='{}' project=admin".format(user_admin_id))
            except Exception as e:
                # best effort: initialization continues even if the role cannot be assigned
                self.logger.error("Error in Authorization DataBase initialization: {}: {}".format(type(e).__name__, e))

        self.load_operation_to_allowed_roles()

    def load_operation_to_allowed_roles(self):
        """
        Fills the internal self.operation_to_allowed_roles based on database role content and self.role_permissions
        It works in a shadow copy and replace at the end to allow other threads working with the old copy
        :return: None
        """

        permissions = {oper: [] for oper in self.role_permissions}
        # records = self.db.get_list(self.roles_to_operations_table)
        records = self.backend.get_role_list()

        ignore_fields = ["_id", "_admin", "name", "default"]
        for record in records:
            if not record.get("permissions"):
                continue
            record_permissions = {oper: record["permissions"].get("default", False) for oper in self.role_permissions}
            operations_joined = [(oper, value) for oper, value in record["permissions"].items()
                                 if oper not in ignore_fields]
            # fewer ':' first, so that more specific entries are applied later and override the generic ones
            operations_joined.sort(key=lambda x: x[0].count(":"))

            for oper_prefix, oper_value in operations_joined:
                for known_permission in record_permissions:
                    if known_permission.startswith(oper_prefix):
                        record_permissions[known_permission] = oper_value

            allowed_operations = [k for k, v in record_permissions.items() if v is True]

            for allowed_op in allowed_operations:
                permissions[allowed_op].append(record["name"])

        self.operation_to_allowed_roles = permissions

    def authorize(self, role_permission=None, query_string_operations=None, item_id=None):
        """
        Obtains and validates the token of the current cherrypy request and, if role_permission is provided,
        checks the RBAC permissions for the request method.
        The token is taken from the 'Authorization: Bearer' header, then from the cherrypy session, and finally
        generated from 'Authorization: Basic' credentials when "auth.allow_basic_authentication" is set at the
        request config.

        :param role_permission: role permission name of the operation required. If empty, RBAC is not checked
        :param query_string_operations: list of possible admin query strings provided by user
        :param item_id: item identifier if included in the URL, None otherwise
        :return: token_info dictionary. If config "authentication"/"user_not_authorized" is set, a fixed testing
            token is returned instead of raising on failure
        :raises AuthException: if there is no valid token or access is denied
        """
        token = None
        user_passwd64 = None
        try:
            # 1. Get token Authorization bearer
            auth = cherrypy.request.headers.get("Authorization")
            if auth:
                auth_list = auth.split(" ")
                if auth_list[0].lower() == "bearer":
                    token = auth_list[-1]
                elif auth_list[0].lower() == "basic":
                    user_passwd64 = auth_list[-1]
            if not token:
                if cherrypy.session.get("Authorization"):
                    # 2. Try using session before request a new token. If not, basic authentication will generate
                    token = cherrypy.session.get("Authorization")
                    if token == "logout":
                        token = None  # force Unauthorized response to insert user password again
                elif user_passwd64 and cherrypy.request.config.get("auth.allow_basic_authentication"):
                    # 3. Get new token from user password
                    user = None
                    passwd = None
                    try:
                        user_passwd = standard_b64decode(user_passwd64).decode()
                        user, _, passwd = user_passwd.partition(":")
                    except ValueError:
                        # binascii.Error and UnicodeDecodeError are both ValueError. Malformed credentials are
                        # left as None so that the backend rejects them
                        pass
                    # new_token needs the remote peer; it was missing here, causing a TypeError
                    outdata = self.new_token(None, {"username": user, "password": passwd}, cherrypy.request.remote)
                    token = outdata["_id"]
                    cherrypy.session['Authorization'] = token

            if not token:
                raise AuthException("Needed a token or Authorization http header",
                                    http_code=HTTPStatus.UNAUTHORIZED)

            # try to get from cache first
            now = time()
            token_info = self.tokens_cache.get(token)
            if token_info and token_info["expires"] < now:
                # delete token. MUST be done with care, as another thread maybe already delete it. Do not use del
                self.tokens_cache.pop(token, None)
                token_info = None

            # get from database if not in cache
            if not token_info:
                token_info = self.backend.validate_token(token)
                # Clear cache if token limit reached
                if len(self.tokens_cache) > self.token_limit:
                    self.tokens_cache.clear()
                self.tokens_cache[token] = token_info
            # TODO add to token info remote host, port

            if role_permission:
                rbac_auth = self.check_permissions(token_info, cherrypy.request.method, role_permission,
                                                   query_string_operations, item_id)
                token_info["allow_show_user_project_role"] = rbac_auth

            return token_info
        except AuthException as e:
            if not isinstance(e, AuthExceptionUnauthorized):
                if cherrypy.session.get('Authorization'):
                    del cherrypy.session['Authorization']
                cherrypy.response.headers["WWW-Authenticate"] = 'Bearer realm="{}"'.format(e)
            if self.config["authentication"].get("user_not_authorized"):
                return {"id": "testing-token", "_id": "testing-token",
                        "project_id": self.test_project_id,
                        "username": self.config["authentication"]["user_not_authorized"],
                        "roles": [self.system_admin_role_id],
                        "admin": True, "allow_show_user_project_role": True}
            raise

    def new_token(self, token_info, indata, remote=None):
        """
        Asks the backend for a new token and completes the returned information.

        :param token_info: current token information, passed to the backend as 'token_info'. Can be None
        :param indata: credentials, passed to the backend as 'credentials'
        :param remote: object with attributes 'name', 'ip' and 'port' of the remote peer (e.g.
            cherrypy.request.remote). If None, no remote_host/remote_port is stored
        :return: a deep copy of the new token information. "expires" defaults to one hour from now; "admin"
            defaults to True only if "project_name" is "admin"
        """
        new_token_info = self.backend.authenticate(
            credentials=indata,
            token_info=token_info,
        )

        if not new_token_info.get("expires"):
            new_token_info["expires"] = time() + 3600
        if not new_token_info.get("admin"):
            new_token_info["admin"] = True if new_token_info.get("project_name") == "admin" else False
            # TODO put admin in RBAC

        if remote is not None:
            new_token_info["remote_port"] = remote.port
            if remote.name:
                new_token_info["remote_host"] = remote.name
            elif remote.ip:
                new_token_info["remote_host"] = remote.ip

        # TODO call self._internal_tokens_prune(now) ?
        return deepcopy(new_token_info)

    def get_token_list(self, token_info):
        """
        Obtains the tokens of the user that owns token_info.

        :param token_info: dictionary with at least "username"
        :return: list of tokens. Read from database for the internal backend, otherwise copies of the cached ones
        """
        if self.config["authentication"]["backend"] == "internal":
            return self._internal_get_token_list(token_info)
        else:
            # TODO: check if this can be avoided. Backend may provide enough information
            return [deepcopy(token) for token in self.tokens_cache.values()
                    if token["username"] == token_info["username"]]

    def get_token(self, token_info, token):
        """
        Obtains the information of one token.

        :param token_info: dictionary of the requester with at least "username" and "admin"
        :param token: identity of the token to look for
        :return: token information
        :raises AuthException: NOT_FOUND if it does not exist; UNAUTHORIZED if it belongs to another user and the
            requester is not admin
        """
        if self.config["authentication"]["backend"] == "internal":
            return self._internal_get_token(token_info, token)
        else:
            # TODO: check if this can be avoided. Backend may provide enough information
            token_value = self.tokens_cache.get(token)
            if not token_value:
                raise AuthException("token not found", http_code=HTTPStatus.NOT_FOUND)
            if token_value["username"] != token_info["username"] and not token_info["admin"]:
                raise AuthException("needed admin privileges", http_code=HTTPStatus.UNAUTHORIZED)
            return token_value

    def del_token(self, token):
        """
        Revokes a token at the backend and removes it from the cache.

        :param token: identity of the token
        :return: text with the result
        :raises AuthException: NOT_FOUND if a KeyError is raised while revoking
        """
        try:
            self.backend.revoke_token(token)
            # self.tokens_cache.pop(token, None)
            self.remove_token_from_cache(token)
            return "token '{}' deleted".format(token)
        except KeyError:
            raise AuthException("Token '{}' not found".format(token), http_code=HTTPStatus.NOT_FOUND)

    def check_permissions(self, token_info, method, role_permission=None, query_string_operations=None, item_id=None):
        """
        Checks that operation has permissions to be done, base on the assigned roles to this user project
        :param token_info: Dictionary that contains "roles" with a list of assigned roles.
            This method fills the token_info["admin"] with True or False based on assigned tokens, if any allows admin
            This will be used among others to hide or not the _admin content of topics
        :param method: GET,PUT, POST, ...
        :param role_permission: role permission name of the operation required
        :param query_string_operations: list of possible admin query strings provided by user. It is checked that the
            assigned role allows this query string for this method
        :param item_id: item identifier if included in the URL, None otherwise
        :return: True if access granted by permission rules, False if access granted by default rules (Bug 853)
        :raises: AuthExceptionUnauthorized if access denied
        """

        roles_required = self.operation_to_allowed_roles[role_permission]
        roles_allowed = [role["name"] for role in token_info["roles"]]

        # fills token_info["admin"] if some roles allows it
        token_info["admin"] = False
        for role in roles_allowed:
            if role in self.operation_to_allowed_roles["admin:" + method.lower()]:
                token_info["admin"] = True
                break

        if "anonymous" in roles_required:
            return True
        operation_allowed = False
        for role in roles_allowed:
            if role in roles_required:
                operation_allowed = True
                # if query_string operations, check if this role allows it
                if not query_string_operations:
                    return True
                for query_string_operation in query_string_operations:
                    if role not in self.operation_to_allowed_roles[query_string_operation]:
                        break
                else:
                    # no break: this role allows every requested query string
                    return True

        # Bug 853 - Final Solution
        # User/Project/Role whole listings are filtered elsewhere
        # uid, pid, rid = ("user_id", "project_id", "id") if is_valid_uuid(id) else ("username", "project_name", "name")
        uid = "user_id" if is_valid_uuid(item_id) else "username"
        if (role_permission in ["projects:get", "projects:id:get", "roles:get", "roles:id:get", "users:get"]) \
                or (role_permission == "users:id:get" and item_id == token_info[uid]):
            # or (role_permission == "projects:id:get" and item_id == token_info[pid]) \
            # or (role_permission == "roles:id:get" and item_id in [role[rid] for role in token_info["roles"]]):
            return False

        if not operation_allowed:
            raise AuthExceptionUnauthorized("Access denied: lack of permissions.")
        else:
            raise AuthExceptionUnauthorized("Access denied: You have not permissions to use these admin query string")

    def get_user_list(self):
        """
        :return: the list of users as provided by the backend
        """
        return self.backend.get_user_list()

    def _normalize_url(self, url, method):
        """
        DEPRECATED. Finds the single key of self.resources_to_operations_mapping ("<METHOD> <url template>")
        that matches the url and extracts the values of the "<param>" path segments.

        :param url: requested URL; the query string, if any, is discarded
        :param method: HTTP method, matched against the first word of each key
        :return: tuple (matching key, dictionary of parameter name to value)
        :raises AuthException: if none or more than one key matches
        """
        # DEPRECATED !!!
        # Removing query strings
        normalized_url = url if '?' not in url else url[:url.find("?")]
        normalized_url_splitted = normalized_url.split("/")
        parameters = {}

        filtered_keys = [key for key in self.resources_to_operations_mapping.keys()
                         if method in key.split()[0]]

        # discard candidates segment by segment
        for idx, path_part in enumerate(normalized_url_splitted):
            tmp_keys = []
            for tmp_key in filtered_keys:
                splitted = tmp_key.split()[1].split("/")
                if idx >= len(splitted):
                    continue
                elif "<" in splitted[idx] and ">" in splitted[idx]:
                    if splitted[idx] == "<artifactPath>":
                        tmp_keys.append(tmp_key)
                        continue
                    elif idx == len(normalized_url_splitted) - 1 and \
                            len(normalized_url_splitted) != len(splitted):
                        continue
                    else:
                        tmp_keys.append(tmp_key)
                elif splitted[idx] == path_part:
                    if idx == len(normalized_url_splitted) - 1 and \
                            len(normalized_url_splitted) != len(splitted):
                        continue
                    else:
                        tmp_keys.append(tmp_key)
            filtered_keys = tmp_keys
            if len(filtered_keys) == 1 and \
                    filtered_keys[0].split("/")[-1] == "<artifactPath>":
                # "<artifactPath>" takes the rest of the url, so stop matching here
                break

        if len(filtered_keys) == 0:
            raise AuthException("Cannot make an authorization decision. URL not found. URL: {0}".format(url))
        elif len(filtered_keys) > 1:
            raise AuthException("Cannot make an authorization decision. Multiple URLs found. URL: {0}".format(url))

        filtered_key = filtered_keys[0]

        for idx, path_part in enumerate(filtered_key.split()[1].split("/")):
            if "<" in path_part and ">" in path_part:
                if path_part == "<artifactPath>":
                    parameters[path_part[1:-1]] = "/".join(normalized_url_splitted[idx:])
                else:
                    parameters[path_part[1:-1]] = normalized_url_splitted[idx]

        return filtered_key, parameters

    def _internal_get_token_list(self, token_info):
        """
        :param token_info: dictionary with at least "username"
        :return: list of non expired tokens of this user stored at database table "tokens"
        """
        now = time()
        token_list = self.db.get_list("tokens", {"username": token_info["username"], "expires.gt": now})
        return token_list

    def _internal_get_token(self, token_info, token_id):
        """
        Obtains one token from database table "tokens".

        :param token_info: dictionary of the requester with at least "username" and "admin"
        :param token_id: identity of the token to look for
        :return: token information
        :raises AuthException: NOT_FOUND if it does not exist; UNAUTHORIZED if it belongs to another user and the
            requester is not admin
        """
        token_value = self.db.get_one("tokens", {"_id": token_id}, fail_on_empty=False)
        if not token_value:
            raise AuthException("token not found", http_code=HTTPStatus.NOT_FOUND)
        if token_value["username"] != token_info["username"] and not token_info["admin"]:
            raise AuthException("needed admin privileges", http_code=HTTPStatus.UNAUTHORIZED)
        return token_value

    def _internal_tokens_prune(self, now=None):
        """
        Deletes the expired tokens from database table "tokens", at most once every self.periodin_db_pruning
        seconds.

        :param now: current time in seconds. If empty, time() is used
        :return: None
        """
        now = now or time()
        # prune only when the scheduled time has been reached. The comparison was inverted, which pruned (and
        # re-scheduled) at every call
        if not self.next_db_prune_time or now >= self.next_db_prune_time:
            self.db.del_list("tokens", {"expires.lt": now})
            self.next_db_prune_time = self.periodin_db_pruning + now
            # self.tokens_cache.clear()  # not required any more

    def remove_token_from_cache(self, token=None):
        """
        Removes one token from the cache, or clears the whole cache, and writes a "revoke_token" message at
        topic "admin".

        :param token: identity of the token to remove. If empty, the whole cache is cleared
        :return: None
        """
        if token:
            self.tokens_cache.pop(token, None)
        else:
            self.tokens_cache.clear()
        self.msg.write("admin", "revoke_token", {"_id": token} if token else None)