-
Notifications
You must be signed in to change notification settings - Fork 0
/
api-bd.py
124 lines (101 loc) · 4.14 KB
/
api-bd.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
import datetime
import decimal
import hmac
import json
import os
from functools import wraps

from flask import Flask, request, render_template, jsonify, current_app
from jwt import encode, decode, ExpiredSignatureError, InvalidTokenError
from werkzeug.security import safe_str_cmp

import database as db
# Flask application object; every route in this module is registered on it.
APP = Flask(__name__)

# Key used to sign and verify the JWTs handled by login() and token_required().
# Read from the JWT_SECRET_KEY environment variable.
# NOTE(review): when the variable is unset the fixed string below is used as
# the signing key; anyone who knows it can forge tokens, so JWT_SECRET_KEY
# should be set for any non-local deployment.
APP.config.update(
    SECRET_KEY=os.environ.get('JWT_SECRET_KEY', 'chave_super_secreta'),
)
@APP.route('/login', methods=['POST'])
def login():
    """Authenticate a user and issue a JWT valid for 30 minutes.

    Expects a JSON object body with string fields ``username`` and
    ``password``.

    Returns:
        ``{'token': <jwt>}`` with status 200 when the credentials match, or
        ``{'message': 'Invalid credentials', 'authenticated': False}`` with
        status 401 when the body is missing/malformed or the credentials do
        not match.
    """
    # silent=True yields None instead of raising on a missing or invalid JSON
    # body, so a bad request is reported as a failed login rather than a 500.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}
    username = data.get("username")
    password = data.get("password")

    # Only look the user up when both fields are real strings; a missing field
    # is a failed login, not a lookup of some placeholder user name.
    user = None
    if isinstance(username, str) and isinstance(password, str):
        user = db.get_user_by_name(username)  # CHANGE

    # NOTE(review): this compares the submitted password with user.password as
    # returned by the database module. If that value is stored in plain text,
    # hashing should be introduced in the database layer.
    # hmac.compare_digest is the constant-time comparison from the standard
    # library (werkzeug's safe_str_cmp was removed from newer releases).
    if user and hmac.compare_digest(
            user.password.encode('utf-8'), password.encode('utf-8')):
        # Take the time once so 'iat' and 'exp' are exactly 30 minutes apart.
        now = datetime.datetime.now(datetime.timezone.utc)
        token = encode(
            {
                'sub': user.username,
                'iat': now,
                'exp': now + datetime.timedelta(minutes=30),
            },
            current_app.config['SECRET_KEY'],
            algorithm='HS256',
        )
        # PyJWT 1.x returns bytes, PyJWT 2.x returns str; accept both.
        if isinstance(token, bytes):
            token = token.decode('utf-8')
        return jsonify({'token': token})

    # Failed authentication is reported with 401, matching token_required.
    response = jsonify({'message': 'Invalid credentials', 'authenticated': False})
    return response, 401
def token_required(f):
    """Decorator that only runs the view when the request carries a valid JWT.

    The token is read from the ``Authorization`` header, which must consist
    of exactly two whitespace-separated parts (scheme and token); the scheme
    itself is not checked. The token is verified with the application's
    ``SECRET_KEY`` and its ``sub`` claim must name a user known to the
    database module.

    Args:
        f: The view function to protect.

    Returns:
        A wrapper that calls ``f`` with the original arguments when the token
        is valid, and otherwise returns a JSON error body with status 401.
    """
    invalid_msg = {
        'message': 'Invalid token. Registeration and / or authentication required',
        'authenticated': False
    }
    expired_msg = {
        'message': 'Expired token. Reauthentication required.',
        'authenticated': False
    }

    def _unauthorized(body):
        """Build the 401 (Unauthorized) response shared by every failure path."""
        response = jsonify(body)
        response.headers.add('Access-Control-Allow-Origin', '*')
        return response, 401

    @wraps(f)
    def _verify(*args, **kwargs):
        # The request headers are deliberately not printed or logged here:
        # they contain the bearer token itself.
        auth_headers = request.headers.get('Authorization', '').split()
        if len(auth_headers) != 2:
            return _unauthorized(invalid_msg)

        token = auth_headers[1]
        try:
            # Pin the accepted algorithm: PyJWT 2.x requires `algorithms`, and
            # it stops the token header from choosing the verification method.
            payload = decode(
                token, current_app.config['SECRET_KEY'], algorithms=['HS256'])
            subject = payload['sub']
        except ExpiredSignatureError:
            return _unauthorized(expired_msg)
        except (InvalidTokenError, KeyError) as err:
            current_app.logger.info('Rejected token: %r', err)
            return _unauthorized(invalid_msg)

        if not db.get_user_by_name(subject):  # CHANGE
            current_app.logger.info('Rejected token: user not found')
            return _unauthorized(invalid_msg)

        # The view runs outside the try block so that an exception raised by
        # the view itself is not misreported as an invalid token.
        return f(*args, **kwargs)

    return _verify
@APP.route('/user_info', methods=['GET'])
def user_info():
    """Return the id and username of every user known to the database module.

    This route is not wrapped by ``token_required``, so no token is needed.

    Returns:
        ``{'users': [{'id': ..., 'username': ...}, ...]}`` with status 200.
    """
    user_list = []
    for account in db.get_users():  # CHANGE
        user_list.append({'id': account.u_id, 'username': account.username})
    print(user_list)
    return jsonify({'users': user_list}), 200
@APP.route('/user_info/<int:user_id>', methods=['GET', 'POST'])
@token_required
def user(user_id):
    """Read or update the favorite movie of the user with id ``user_id``.

    GET returns ``{'user': <username>, 'movie': <favorite_movie>}``.
    POST expects a JSON object with a ``movie`` field, assigns it to the
    user's ``favorite_movie`` and returns ``{'status': 'success'}``.

    Returns:
        The JSON responses above, or a ``{'status': 'failed', ...}`` body with
        status 404 (unknown user), 400 (POST body missing or without
        ``movie``) or 405 (any other method).

    NOTE(review): the view never compares the authenticated caller with
    ``user_id``, so any valid token can read or change any user's entry.
    Confirm whether that is intended.
    """
    target = db.get_user_by_id(user_id)  # CHANGE
    if target is None:
        response = jsonify({'status': 'failed', 'fail reason': 'User not found'})
        return response, 404

    if request.method == 'POST':
        # silent=True returns None for a missing or invalid JSON body, so the
        # problem is reported as a client error instead of surfacing as a 500.
        payload = request.get_json(silent=True)
        if not isinstance(payload, dict) or 'movie' not in payload:
            response = jsonify({'status': 'failed', 'fail reason': 'Wrong arguments'})
            return response, 400
        # NOTE(review): this only assigns the attribute; whether the change is
        # persisted depends on the database module. Confirm.
        target.favorite_movie = payload['movie']
        return jsonify({'status': 'success'})

    # Flask also routes HEAD requests to views registered for GET, so HEAD is
    # answered like GET instead of falling through to the error branch.
    if request.method in ('GET', 'HEAD'):
        return jsonify({'user': target.username, 'movie': target.favorite_movie})

    # Defensive fallback: the route only registers GET and POST.
    response = jsonify({'status': 'failed', 'fail reason': 'Wrong method, only POST and GET available'})
    response.headers['Allow'] = 'GET, HEAD, POST'
    return response, 405
if __name__ == '__main__':
    # When run as a script: seed three demo users if the database module
    # reports no users yet, then start the Flask development server.
    users = db.get_users()  # CHANGE
    if not users:
        # NOTE(review): these are fixed demo credentials; do not keep them in
        # a real deployment.
        db.create_user('admin_user', 'admin_pass', 'Padrinho')
        db.create_user('readonly_user', 'readonly_pass', 'Jurassic World')
        db.create_user('readwrite_user', 'readwrite_pass', 'Jurassic Park')
    # Flask.run documents `port` as an int; the string '5050' only works on
    # Flask versions that happen to coerce it.
    APP.run(port=5050)