-
Notifications
You must be signed in to change notification settings - Fork 0
/
api-autenticacao.py
155 lines (128 loc) · 4.82 KB
/
api-autenticacao.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
import json
import decimal
import os
from flask import Flask, request, render_template, jsonify, current_app
from werkzeug.security import safe_str_cmp
from jwt import encode, decode, ExpiredSignatureError, InvalidTokenError
from functools import wraps
import datetime
class User(object):
"""
Class User
used for API authentication
"""
def __init__(self, id, username, password):
"""
Contructor for User class
:param id: id of the user
:param username: username for the user
:param password: password for the user
"""
self.id = id
self.username = username
self.password = password
self.favorite_movie = 'Padrinho'
def __str__(self):
return "User(id='%s')" % self.id
USERS = [
User(1, os.getenv('API_ADMIN_USER', 'admin_user').strip().strip('"'), os.getenv('API_ADMIN_PASS', 'admin_pass').strip().strip('"')),
User(2, "readonly_user", "readonly_pass"),
User(3, "readwrite_user", "readwrite_pass"),
]
USERNAME_TABLE = {u.username: u for u in USERS}
USERID_TABLE = {u.id: u for u in USERS}
APP = Flask(__name__)
APP.config['SECRET_KEY'] = os.getenv('JWT_SECRET_KEY', 'chave_super_secreta')
@APP.route('/login', methods=['POST'])
def login():
data = request.get_json()
username = data.get("username", "NULL")
password = data.get("password", "NULL")
user = USERNAME_TABLE.get(username, None)
if user and safe_str_cmp(user.password.encode('utf-8'), password.encode('utf-8')):
token = encode({
'sub': user.username,
'iat': datetime.datetime.utcnow(),
'exp': datetime.datetime.utcnow() + datetime.timedelta(minutes=30)},
current_app.config['SECRET_KEY'])
response = jsonify({'token': token.decode('UTF-8')})
return response
response = jsonify({'message': 'Invalid credentials', 'authenticated': False})
return response
def token_required(f):
@wraps(f)
def _verify(*args, **kwargs):
print(request.headers)
auth_headers = request.headers.get('Authorization', '').split()
invalid_msg = {
'message': 'Invalid token. Registeration and / or authentication required',
'authenticated': False
}
expired_msg = {
'message': 'Expired token. Reauthentication required.',
'authenticated': False
}
if len(auth_headers) != 2:
response = jsonify(invalid_msg)
response.headers.add('Access-Control-Allow-Origin', '*')
return response, 401
try:
token = auth_headers[1]
data = decode(token, current_app.config['SECRET_KEY'])
user = USERNAME_TABLE.get(data['sub'], None)
if not user:
raise RuntimeError('User not found')
return f(*args, **kwargs)
except ExpiredSignatureError:
response = jsonify(expired_msg)
response.headers.add('Access-Control-Allow-Origin', '*')
return response, 401 # 401 is Unauthorized HTTP status code
except (InvalidTokenError, Exception) as e:
print(e)
response = jsonify(invalid_msg)
response.headers.add('Access-Control-Allow-Origin', '*')
return response, 401
return _verify
@APP.route('/user_info', methods=['GET'])
def user_info():
user_list = [{'id': u.id, 'username': u.username} for u in USERS]
print(user_list)
response = jsonify({'users': user_list})
return response, 200
@APP.route('/user_info/<int:user_id>', methods=['GET', 'POST'])
@token_required
def user(user_id):
def get_user_info(user):
"""
Get the axis configured values
:return: json
"""
response = jsonify({'user': user.username, 'movie': user.favorite_movie})
return response
def post_user_info(user):
"""
Update the axis configured values
:return: json
"""
try:
req_json = request.get_json()
new_favorite = req_json['movie']
user.favorite_movie = new_favorite
response = jsonify({'status': 'success'})
return response
except:
response = jsonify({'status': 'failed', 'fail reason': 'Wrong arguments'})
return response, 500
user = USERID_TABLE.get(user_id, None)
if user is None:
response = jsonify({'status': 'failed', 'fail reason': 'User not found'})
return response, 404
if request.method == 'GET':
return get_user_info(user)
elif request.method == 'POST':
return post_user_info(user)
else:
response = jsonify({'status': 'failed', 'fail reason': 'Wrong method, only POST and GET available'})
return response, 401
if __name__ == '__main__':
APP.run(port='5050')