Extracting auth methods into a new class
[osm/NBI.git] / osm_nbi / auth.py
1 import cherrypy
2 from base64 import standard_b64decode
3 from http import HTTPStatus
4
5
6 from engine import EngineException
7
8 __author__ = "Eduardo Sousa <eduardosousa@av.it.pt>"
9
10
class AuthenticatorException(Exception):
    """Error raised by the authenticator, carrying the HTTP status to report.

    :param message: human-readable description, stored as the exception message.
    :param http_code: HTTP status associated with the failure; defaults to
        ``HTTPStatus.UNAUTHORIZED``.
    """

    def __init__(self, message, http_code=HTTPStatus.UNAUTHORIZED):
        super().__init__(message)
        # Kept as a plain attribute so handlers can read the status from the exception.
        self.http_code = http_code
15
16
class Authenticator(object):
    """Resolve the token of the current CherryPy request and delegate token handling to an engine.

    All token operations (creation, listing, lookup, deletion and authorization)
    are forwarded to the ``engine`` object supplied at construction time.
    """

    def __init__(self, engine):
        """Store the engine that every token operation is delegated to.

        :param engine: object providing ``authorize``, ``new_token``,
            ``get_token_list``, ``get_token`` and ``del_token``.
        """
        super().__init__()

        self.engine = engine

    @staticmethod
    def _decode_basic_credentials(user_passwd64):
        """Split a base64 ``user:password`` payload into ``(user, password)``.

        Returns ``(None, None)`` when the payload cannot be decoded, so that the
        engine is the one rejecting the request instead of this helper raising.
        """
        try:
            user_passwd = standard_b64decode(user_passwd64).decode()
        except ValueError:
            # binascii.Error (malformed base64) and UnicodeDecodeError (payload is
            # not valid text) are both ValueError subclasses.
            return None, None
        user, _, passwd = user_passwd.partition(":")
        return user, passwd

    def authorize(self):
        """Find the token for the current request and have the engine authorize it.

        The token is looked up in this order:

        1. ``Authorization: Bearer <token>`` request header.
        2. ``Authorization`` entry of the CherryPy session; the special value
           ``"logout"`` is treated as "no token".
        3. ``Authorization: Basic <base64>`` request header, only when the request
           config enables ``auth.allow_basic_authentication`` and the session holds
           no token. A new token is requested from the engine and stored in the
           session.

        :return: whatever ``self.engine.authorize(token)`` returns.
        :raises AuthenticatorException: when the engine raises ``EngineException``.
            In that case the session token is dropped and a ``WWW-Authenticate``
            response header is set.
        """
        token = None
        user_passwd64 = None
        try:
            # 1. Get token Authorization bearer
            auth = cherrypy.request.headers.get("Authorization")
            if auth:
                auth_list = auth.split(" ")
                scheme = auth_list[0].lower()
                if scheme == "bearer":
                    token = auth_list[-1]
                elif scheme == "basic":
                    user_passwd64 = auth_list[-1]
            if not token:
                session_token = cherrypy.session.get("Authorization")
                if session_token:
                    # 2. Try using session before requesting a new token. If not, basic authentication
                    # will generate one
                    token = session_token
                    if token == "logout":
                        token = None  # force Unauthorized response to insert user password again
                elif user_passwd64 and cherrypy.request.config.get("auth.allow_basic_authentication"):
                    # 3. Get new token from user password
                    user, passwd = self._decode_basic_credentials(user_passwd64)
                    # NOTE(review): this call passes two arguments while the new_token() wrapper of
                    # this class forwards three (session, indata, remote) - confirm that the engine's
                    # "remote" parameter is optional.
                    outdata = self.engine.new_token(None, {"username": user, "password": passwd})
                    token = outdata["id"]
                    cherrypy.session['Authorization'] = token
            # 4. Get token from cookie
            # if not token:
            #     auth_cookie = cherrypy.request.cookie.get("Authorization")
            #     if auth_cookie:
            #         token = auth_cookie.value
            return self.engine.authorize(token)
        except EngineException as e:
            # Drop the stored token so that it is not tried again on the next request.
            if cherrypy.session.get('Authorization'):
                del cherrypy.session['Authorization']
            cherrypy.response.headers["WWW-Authenticate"] = 'Bearer realm="{}"'.format(e)
            # Chain the engine error so that the original traceback is preserved.
            raise AuthenticatorException(str(e)) from e

    def new_token(self, session, indata, remote):
        """Forward to ``engine.new_token(session, indata, remote)`` and return its result."""
        return self.engine.new_token(session, indata, remote)

    def get_token_list(self, session):
        """Forward to ``engine.get_token_list(session)`` and return its result."""
        return self.engine.get_token_list(session)

    def get_token(self, session, token_id):
        """Forward to ``engine.get_token(session, token_id)`` and return its result."""
        return self.engine.get_token(session, token_id)

    def del_token(self, token_id):
        """Forward to ``engine.del_token(token_id)`` and return its result."""
        return self.engine.del_token(token_id)