This repository was archived by the owner on Mar 12, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 24
Expand file tree
/
Copy pathmain.py
More file actions
174 lines (145 loc) · 5.24 KB
/
main.py
File metadata and controls
174 lines (145 loc) · 5.24 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
# Name: main.py
# Author: Yiyuan Wang
# Description:
# a simple Flask backend
from flask import Flask, json, jsonify, request, render_template
from flask_cors import CORS
from flask_mysqldb import MySQL
from flask_jwt import JWT, jwt_required, current_identity
from werkzeug.security import safe_str_cmp
############################ Login Classes ############################
# from dbms.json_db.model import Model
# from dbms.dict_db.model import Model
class User(object):
    """A login account held in memory.

    Attributes are plain values assigned from the constructor arguments:
    ``id`` (the account identifier), ``username`` and ``password``.
    """

    def __init__(self, id, username, password):
        """Store the identifier, user name and password on the instance."""
        self.id = id
        self.username = username
        self.password = password

    def __str__(self):
        """Return ``User(id='<id>')``."""
        # The operand is wrapped in a 1-tuple so that an id which is itself
        # a tuple is formatted as a single value instead of being unpacked
        # as the argument list of the % operator.
        return "User(id='%s')" % (self.id,)
# Accounts known to the login layer.
# NOTE(review): credentials are hard-coded in plain text here; confirm this
# is only intended for local demos.
users = [User(1, 'test', 'testLogin'), User(2, 'yiyuan', 'yiyuanwang')]

# Lookup tables over the same User objects, keyed by user name and by id.
username_table = dict((account.username, account) for account in users)
userid_table = dict((account.id, account) for account in users)
def authenticate(username, password):
    """Return the user whose name and password match, otherwise ``None``.

    Args:
        username: Name to look up in ``username_table``.
        password: Candidate password as text; it is UTF-8 encoded before
            the comparison.

    Returns:
        The matching user object, or ``None`` when the name is unknown, the
        password is missing, or the password does not match.
    """
    # Function-scope import keeps this block self-contained.
    # hmac.compare_digest is the standard-library constant-time comparison;
    # werkzeug.security.safe_str_cmp was removed in Werkzeug 2.1.
    import hmac

    user = username_table.get(username)
    if user is None or password is None:
        return None

    expected = user.password.encode('utf-8')
    supplied = password.encode('utf-8')
    if hmac.compare_digest(expected, supplied):
        return user
    return None
def identity(payload):
    """Return the user registered under ``payload['identity']``.

    The value stored under the ``'identity'`` key is looked up in
    ``userid_table``; ``None`` is returned when no user has that id.
    """
    return userid_table.get(payload['identity'])
############################ Initialization ############################
app = Flask(__name__)

# Cross-Origin Resource Sharing is essential for the React frontend to call
# this backend.
# https://flask-cors.readthedocs.io/en/latest/
CORS(app)

# Database connection settings, set before the MySQL extension is created.
# NOTE(review): the credentials and the secret key below are hard-coded;
# confirm that is acceptable outside local development.
app.config.update(
    MYSQL_HOST='localhost',
    MYSQL_USER='root',
    MYSQL_PASSWORD='123456',
    MYSQL_DB='FinalProj',
)
mysql = MySQL(app)

# The secret key is configured before the JWT extension is attached, which
# is given the authenticate and identity callbacks.
app.config.update(SECRET_KEY='super-secret')
jwt = JWT(app, authenticate, identity)
############################ Login ############################
@app.route('/')
def index():
    """Handle ``/`` by rendering the template named ``App.js``."""
    page = render_template("App.js")
    return page
@app.route('/protected')
@jwt_required()
def protected():
    """Handle ``/protected`` by returning ``current_identity`` as text.

    The view is wrapped in ``jwt_required()``.
    """
    identity_text = '%s' % current_identity
    return identity_text
########################## API Implementation #########################
# https://github.com/Jiangyiqun/fullstack_tutorial/tree/master/documentation
############################## create name #############################
@app.route('/customer', methods = ["POST"])
def create_customer():
    """Insert a customer row built from the JSON request body.

    The body must be a JSON object containing the keys ``customerID`` and
    ``name``; they are written to the ``key_`` and ``name`` columns of the
    ``Customers`` table.

    Returns:
        ``("SUCCESS", 200)`` after the insert has been committed, or
        ``("BAD REQUEST", 400)`` when the body is not a JSON object with
        both keys or the database raises while executing the statement.
    """
    # force=True parses the body regardless of Content-Type, and
    # silent=True yields None instead of raising on malformed JSON.
    data_dict = request.get_json(force=True, silent=True)
    if not isinstance(data_dict, dict):
        return "BAD REQUEST", 400
    try:
        customerID = data_dict['customerID']
        cname = data_dict['name']
    except KeyError:
        return "BAD REQUEST", 400

    cur = mysql.connection.cursor()
    try:
        # The column list must match the number of placeholders. The
        # previous statement named three columns (including lastName) but
        # supplied two values, so it could never succeed.
        # NOTE(review): lastName is not part of the request payload and is
        # therefore left to the column default - confirm against the schema.
        cur.execute(
            "INSERT INTO Customers(key_, name) VALUES (%s, %s)",
            (customerID, cname),
        )
        mysql.connection.commit()
    except Exception as e:
        # Request boundary: report the failure as a 400 rather than a 500.
        print(e)
        return "BAD REQUEST", 400
    finally:
        # Runs on both the success and the error path.
        cur.close()
    return "SUCCESS", 200
############################## read name ###############################
@app.route('/customer/<customerID>', methods = ["GET"])
def read_customers(customerID):
    """Query the ``Customers`` table for the row whose ``key_`` is *customerID*.

    Args:
        customerID: Customer key taken from the URL path.

    Returns:
        ``("SUCCESS", 200)`` when the query executes, or
        ``("BAD REQUEST", 400)`` when the database raises.
    """
    cur = mysql.connection.cursor()
    try:
        # Parameters must be a sequence, so a single value is passed as a
        # 1-tuple. "(customerID)" without the trailing comma is just the
        # string itself.
        cur.execute("SELECT * FROM Customers WHERE key_ = %s", (customerID,))
        # A SELECT changes nothing, so no commit is issued.
        # NOTE(review): the selected row is not sent back to the client; the
        # response body is kept as before. Confirm whether the endpoint is
        # meant to return the customer data.
    except Exception as e:
        # Request boundary: report the failure as a 400 rather than a 500.
        print(e)
        return "BAD REQUEST", 400
    finally:
        # Runs on both the success and the error path.
        cur.close()
    return "SUCCESS", 200
############################## update name #############################
@app.route('/customer/<customerID>', methods = ["PUT"])
def update_name(customerID):
    """Change the ``name`` of an existing customer.

    The body must be a JSON object containing ``name``. A ``customerID`` key
    in the body, when present, selects the row (as before); otherwise the
    id from the URL path is used.

    Args:
        customerID: Customer key taken from the URL path.

    Returns:
        ``("SUCCESS", 200)`` after the update has been committed, or
        ``("BAD REQUEST", 400)`` when the body is not a JSON object with a
        ``name`` key or the database raises.
    """
    # force=True parses the body regardless of Content-Type, and
    # silent=True yields None instead of raising on malformed JSON.
    data_dict = request.get_json(force=True, silent=True)
    if not isinstance(data_dict, dict) or 'name' not in data_dict:
        return "BAD REQUEST", 400
    cname = data_dict['name']
    customerID = data_dict.get('customerID', customerID)

    cur = mysql.connection.cursor()
    try:
        # UPDATE takes "SET column = value ... WHERE ...", not the
        # column-list/VALUES form used by INSERT.
        cur.execute(
            "UPDATE Customers SET name = %s WHERE key_ = %s",
            (cname, customerID),
        )
        mysql.connection.commit()
    except Exception as e:
        # Request boundary: report the failure as a 400 rather than a 500.
        print(e)
        return "BAD REQUEST", 400
    finally:
        # Runs on both the success and the error path.
        cur.close()
    return "SUCCESS", 200
############################## delete name #############################
@app.route('/customer/<customerID>', methods = ["DELETE"])
def delete_name(customerID=None, key=None):
    """Delete the customer row identified by *customerID*.

    A ``customerID`` key in a JSON request body, when present, selects the
    row (as before); otherwise the id from the URL path is used, so a
    request without a body also works.

    Args:
        customerID: Customer key taken from the URL path. The name matches
            the ``<customerID>`` route variable, which Flask passes by
            keyword.
        key: Former name of the parameter, still accepted as an alias.

    Returns:
        ``("SUCCESS", 200)`` after the delete has been committed, or
        ``("BAD REQUEST", 400)`` when no customer id is available or the
        database raises.
    """
    # force=True parses the body regardless of Content-Type, and
    # silent=True yields None when the body is absent or malformed.
    payload = request.get_json(force=True, silent=True)
    if isinstance(payload, dict) and 'customerID' in payload:
        customerID = payload['customerID']
    elif customerID is None:
        customerID = key
    if customerID is None:
        return "BAD REQUEST", 400

    cur = mysql.connection.cursor()
    try:
        # The filter uses key_, the column the insert and select statements
        # in this file use for the customer id.
        # NOTE(review): confirm key_ (not CustomerID) against the schema.
        # NOTE(review): deleting a key that does not exist still reports
        # SUCCESS - confirm that is intended.
        cur.execute("DELETE FROM Customers WHERE key_ = %s", (customerID,))
        mysql.connection.commit()
    except Exception as e:
        # Request boundary: report the failure as a 400 rather than a 500.
        print(e)
        return "BAD REQUEST", 400
    finally:
        # Runs on both the success and the error path.
        cur.close()
    return "SUCCESS", 200
############################# Debug Method #############################
# print database
@app.route('/debug', methods = ["GET"])
def print_database():
    """Placeholder for ``GET /debug``: replies with an empty body and 200."""
    # An earlier version, now disabled, called model.debug(), printed the
    # result and returned it as JSON (or an errorMsg payload saying the
    # debug method was not implemented when the result was None).
    body, status = "", 200
    return body, status
############################ Main Function #############################
if __name__ == "__main__":
    # When executed as a script, serve the backend at http://localhost:5000/
    # with debug mode switched on.
    app.run(debug=True, host='localhost', port=5000)