Files
2026-05-26 20:05:13 +08:00

711 lines
23 KiB
Python

from datetime import datetime, timedelta
from flask import Blueprint, request, jsonify
from flask_jwt_extended import create_access_token, jwt_required, get_jwt_identity, get_jwt
from Crypto.PublicKey import RSA
from werkzeug.security import generate_password_hash, check_password_hash
import uuid
from models import db, User, UserRole, PendingPassage, Passage, Paper, PaperPassagesRelation
# Blueprint that every route in this module is registered on.
bp_api_v1 = Blueprint('api/v1', __name__)
# Maximum age of a key-exchange session before cleanup_sessions() evicts it.
SESSION_EXPIRATION = timedelta(minutes=5)
# In-process session store filled by get_key():
# session_id -> {'private_key': <RSA key>, 'created_at': <datetime>}.
sessions = {}
def cleanup_sessions():
    """Evict every key-exchange session older than ``SESSION_EXPIRATION``.

    Entries are read from the module-level ``sessions`` dict and compared
    against the current local time using their ``created_at`` value.
    """
    now = datetime.now()
    # Iterate over a snapshot: request handlers insert into and pop from
    # ``sessions``, and walking the live dict while its size changes raises
    # ``RuntimeError: dictionary changed size during iteration``.
    for sid, info in list(sessions.items()):
        if now - info['created_at'] > SESSION_EXPIRATION:
            sessions.pop(sid, None)
@bp_api_v1.route('/ping')
def ping():
    """Liveness probe: always answers with the plain-text body ``pong``."""
    return 'pong'
@bp_api_v1.route('/get_key')
def get_key():
    """Open a key-exchange session and return its RSA public key.

    A fresh 2048-bit key pair is generated and stored in ``sessions`` under a
    new UUID4 string together with its creation time. Expired sessions are
    evicted first.

    Returns:
        ``200`` with ``{'pkey': <PEM public key>, 'session_id': <uuid>}``.
    """
    cleanup_sessions()
    key_pair = RSA.generate(2048)
    new_session_id = str(uuid.uuid4())
    sessions[new_session_id] = {
        'private_key': key_pair,
        'created_at': datetime.now(),
    }
    pem_text = key_pair.publickey().export_key(format='PEM').decode('utf-8')
    payload = {'pkey': pem_text, 'session_id': new_session_id}
    return jsonify(payload), 200
@bp_api_v1.route('/register', methods=['POST'])
def register():
    """Create a regular (``UserRole.USER``) account.

    Expects a JSON object with ``session_id`` (issued by ``/get_key``),
    ``username``, a hex-encoded RSA-encrypted ``password``, at least one of
    ``phone`` / ``email`` / ``qq``, and optionally ``nickname``.

    Returns:
        ``201`` with a success message, or ``400`` with an ``error`` message
        when the body, the session or any field is invalid.
    """
    cleanup_sessions()
    data = request.get_json()
    # A JSON ``null`` body parses to None and arrays/scalars parse to
    # non-dict values; none of them support the ``.get`` calls below.
    if not isinstance(data, dict):
        return jsonify({'error': 'Request body must be a JSON object'}), 400

    session_id = data.get('session_id')
    # One lookup instead of "in" + subscript: the entry could be evicted in
    # between, and a non-string (unhashable) id must not reach the dict.
    session = sessions.get(session_id) if isinstance(session_id, str) else None
    if session is None:
        return jsonify({'error': 'Invalid session_id'}), 400

    # Every field below is measured with len()/str methods; reject other
    # JSON types up front instead of failing with a TypeError.
    for field in ('username', 'nickname', 'phone', 'email', 'qq', 'password'):
        value = data.get(field)
        if value is not None and not isinstance(value, str):
            return jsonify({'error': f'{field} must be a string'}), 400

    username = data.get('username')
    if not username:
        return jsonify({'error': 'Username is required'}), 400
    if len(username) > 80:
        return jsonify({'error': 'Username is too long'}), 400
    if User.query.filter_by(username=username).first():
        return jsonify({'error': 'Username already exists'}), 400

    nickname = data.get('nickname')
    if nickname and len(nickname) > 80:
        return jsonify({'error': 'Nickname is too long'}), 400

    phone = data.get('phone')
    email = data.get('email')
    qq = data.get('qq')
    if phone and (len(phone) != 11 or not phone.isdigit()):
        return jsonify({'error': 'Invalid phone number'}), 400
    if email and ('@' not in email or len(email) > 120):
        return jsonify({'error': 'Invalid email address'}), 400
    if qq and (len(qq) < 5 or len(qq) > 10 or not qq.isdigit()):
        return jsonify({'error': 'Invalid QQ number'}), 400
    if not phone and not email and not qq:
        return jsonify({'error': 'At least one contact method is required'}), 400

    password_encrypted = data.get('password')
    if not password_encrypted:
        return jsonify({'error': 'Password is required'}), 400

    private_key = session['private_key']
    try:
        # NOTE(review): PyCryptodome's RsaKey.decrypt() appears to raise
        # NotImplementedError (it points to Crypto.Cipher.PKCS1_OAEP), which
        # this handler would report as a decrypt failure. Confirm the padding
        # scheme the client uses and switch to the matching cipher object.
        password_decrypted = private_key.decrypt(bytes.fromhex(password_encrypted)).decode('utf-8')
        password_hash = generate_password_hash(password_decrypted)
    except Exception:
        return jsonify({'error': 'Failed to decrypt password'}), 400

    # The session key is single-use once a password was decrypted with it.
    sessions.pop(session_id, None)
    new_user = User(
        username=username,
        nickname=nickname,
        password=password_hash,
        role=UserRole.USER,
        phone=phone,
        email=email,
        qq=qq,
    )
    db.session.add(new_user)
    db.session.commit()
    return jsonify({'message': 'User registered successfully'}), 201
@bp_api_v1.route('/login', methods=['POST'])
def login():
    """Authenticate a user and issue a JWT access token.

    Expects a JSON object with ``session_id`` (issued by ``/get_key``),
    ``username`` and a hex-encoded RSA-encrypted ``password``.

    Returns:
        ``200`` with ``{'access_token': ...}``; ``400`` for a malformed body,
        unknown session or undecryptable password; ``401`` for an unknown
        user or a wrong password.
    """
    cleanup_sessions()
    data = request.get_json()
    # A JSON ``null`` body parses to None and arrays/scalars parse to
    # non-dict values; none of them support the ``.get`` calls below.
    if not isinstance(data, dict):
        return jsonify({'error': 'Request body must be a JSON object'}), 400

    session_id = data.get('session_id')
    # One lookup instead of "in" + subscript: the entry could be evicted in
    # between, and a non-string (unhashable) id must not reach the dict.
    session = sessions.get(session_id) if isinstance(session_id, str) else None
    if session is None:
        return jsonify({'error': 'Invalid session_id'}), 400

    username = data.get('username')
    password_encrypted = data.get('password')
    if not username or not password_encrypted:
        return jsonify({'error': 'Username and password are required'}), 400
    if not isinstance(username, str) or not isinstance(password_encrypted, str):
        return jsonify({'error': 'Username and password must be strings'}), 400

    user = User.query.filter_by(username=username).first()
    if not user:
        return jsonify({'error': 'User has not been registered'}), 401

    private_key = session['private_key']
    try:
        # NOTE(review): PyCryptodome's RsaKey.decrypt() appears to raise
        # NotImplementedError (it points to Crypto.Cipher.PKCS1_OAEP), which
        # this handler would report as a decrypt failure. Confirm the padding
        # scheme the client uses and switch to the matching cipher object.
        password_decrypted = private_key.decrypt(bytes.fromhex(password_encrypted)).decode('utf-8')
    except Exception:
        return jsonify({'error': 'Failed to decrypt password'}), 400

    if not check_password_hash(user.password, password_decrypted):
        return jsonify({'error': 'Incorrect username or password'}), 401

    # The session key is single-use once a login succeeded with it.
    sessions.pop(session_id, None)
    # NOTE(review): recent PyJWT releases reportedly require the ``sub`` claim
    # to be a string; ``user.id`` is passed as-is here and other handlers
    # compare get_jwt_identity() with integer ids -- confirm before changing.
    access_token = create_access_token(
        identity=user.id,
        expires_delta=timedelta(days=30 * 6),
        additional_claims={
            'role': user.role.value,
            'username': user.username,
            'phone': user.phone,
            'email': user.email,
            'qq': user.qq,
        },
    )
    return jsonify({'access_token': access_token}), 200
@bp_api_v1.route('/profile')
@jwt_required()
def profile():
    """Return the account details of the user identified by the JWT.

    Returns:
        ``200`` with username, role, creation time and contact fields, or
        ``404`` when the token's identity no longer matches a user.
    """
    account = User.query.get(get_jwt_identity())
    if not account:
        return jsonify({'error': 'User not found'}), 404
    body = {
        'username': account.username,
        'role': account.role.value,
        'created_at': account.created_at.isoformat(),
        'phone': account.phone,
        'email': account.email,
        'qq': account.qq,
    }
    return jsonify(body), 200
@bp_api_v1.route('/passages/<int:passage_id>', methods=['GET'])
@jwt_required(optional=True)
def get_passage(passage_id):
    """Return one passage; unposted passages are visible to admins only.

    Args:
        passage_id: Primary key of the passage, taken from the URL.

    Returns:
        ``200`` with the passage fields and its sender, ``404`` when the
        passage does not exist, or ``403`` when it is not posted and the
        caller carries no admin token.
    """
    found = Passage.query.get(passage_id)
    if not found:
        return jsonify({'error': 'Passage not found'}), 404

    if not found.is_posted:
        claims = get_jwt()
        caller_is_admin = bool(claims) and claims.get('role') == UserRole.ADMIN.value
        if not caller_is_admin:
            return jsonify({'error': 'Passage is not posted yet'}), 403

    sender_info = {
        'id': found.sender.id,
        'username': found.sender.username,
    }
    body = {
        'id': found.id,
        'title': found.title,
        'content': found.content,
        'image_url': found.image_url,
        'authors': found.authors,
        'created_at': found.created_at.isoformat(),
        'is_outlink': found.is_outlink,
        'sender': sender_info,
    }
    return jsonify(body), 200
@bp_api_v1.route('/passages', methods=['GET'])
@jwt_required()
def list_passages():
    """Return one page of passages (admin only).

    Query parameters ``page`` (default 1) and ``per_page`` (default 10) must
    parse as integers.

    Returns:
        ``200`` with a JSON list in which every entry also carries the page's
        ``has_next`` flag, ``403`` for non-admin callers, or ``400`` for
        non-integer pagination parameters.
    """
    if get_jwt()['role'] != UserRole.ADMIN.value:
        return jsonify({'error': 'Admin access required'}), 403

    raw_page = request.args.get('page', 1)
    raw_per_page = request.args.get('per_page', 10)
    try:
        page_number = int(raw_page)
        page_size = int(raw_per_page)
    except ValueError:
        return jsonify({'error': 'Invalid pagination parameters'}), 400

    pagination = Passage.query.paginate(page=page_number, per_page=page_size, error_out=False)
    more_pages = pagination.has_next
    rows = []
    for item in pagination.items:
        rows.append({
            'id': item.id,
            'title': item.title,
            'image_url': item.image_url,
            'authors': item.authors,
            'created_at': item.created_at.isoformat(),
            'is_outlink': item.is_outlink,
            'sender': {
                'id': item.sender.id,
                'username': item.sender.username,
            },
            'has_next': more_pages,
        })
    return jsonify(rows), 200
@bp_api_v1.route('/papers/<int:paper_id>', methods=['GET'])
def get_paper(paper_id):
    """Return one paper together with the ids of its passages.

    Args:
        paper_id: Primary key of the paper, taken from the URL.

    Returns:
        ``200`` with the paper fields, or ``404`` when it does not exist.
    """
    found = Paper.query.get(paper_id)
    if not found:
        return jsonify({'error': 'Paper not found'}), 404

    member_ids = []
    for member in found.passages:
        member_ids.append(member.id)
    body = {
        'id': found.id,
        'title': found.title,
        'description': found.description,
        'image_url': found.image_url,
        'created_at': found.created_at.isoformat(),
        'passages': member_ids,
    }
    return jsonify(body), 200
@bp_api_v1.route('/papers', methods=['GET'])
def list_papers():
    """Return one page of papers.

    Query parameters ``page`` (default 1) and ``per_page`` (default 10) must
    parse as integers.

    Returns:
        ``200`` with a JSON list in which every entry also carries the page's
        ``has_next`` flag, or ``400`` for non-integer pagination parameters.
    """
    raw_page = request.args.get('page', 1)
    raw_per_page = request.args.get('per_page', 10)
    try:
        page_number = int(raw_page)
        page_size = int(raw_per_page)
    except ValueError:
        return jsonify({'error': 'Invalid pagination parameters'}), 400

    pagination = Paper.query.paginate(page=page_number, per_page=page_size, error_out=False)
    more_pages = pagination.has_next
    rows = []
    for item in pagination.items:
        rows.append({
            'id': item.id,
            'title': item.title,
            'description': item.description,
            'image_url': item.image_url,
            'created_at': item.created_at.isoformat(),
            'has_next': more_pages,
        })
    return jsonify(rows), 200
@bp_api_v1.route('/create_passage', methods=['POST'])
@jwt_required()
def create_passage():
    """Submit a passage for review as a ``PendingPassage``.

    Expects a JSON object with ``title`` (1-200 chars), ``content``
    (non-empty), ``authors`` (1-60 chars) and optionally ``image_url``
    (at most 500 chars) and ``is_outlink`` (defaults to ``False``).

    Returns:
        ``201`` on success, ``404`` when the token's user no longer exists,
        or ``400`` with an ``error`` message for an invalid body or field.
    """
    user = User.query.get(get_jwt_identity())
    if not user:
        return jsonify({'error': 'User not found'}), 404

    data = request.get_json()
    # A JSON ``null`` body parses to None and arrays/scalars parse to
    # non-dict values; none of them support the ``.get`` calls below.
    if not isinstance(data, dict):
        return jsonify({'error': 'Request body must be a JSON object'}), 400
    # The checks below rely on len(); reject non-string JSON values up front
    # instead of failing with a TypeError or storing a wrong type.
    for field in ('title', 'content', 'image_url', 'authors'):
        value = data.get(field)
        if value is not None and not isinstance(value, str):
            return jsonify({'error': f'{field} must be a string'}), 400

    title = data.get('title')
    content = data.get('content')
    image_url = data.get('image_url')
    authors = data.get('authors')
    is_outlink = data.get('is_outlink', False)

    if not title or len(title) > 200:
        return jsonify({'error': 'Invalid title'}), 400
    if not content:
        return jsonify({'error': 'Invalid content'}), 400
    if image_url and len(image_url) > 500:
        return jsonify({'error': 'Image URL is too long'}), 400
    if not authors or len(authors) > 60:
        return jsonify({'error': 'Invalid authors'}), 400

    new_passage = PendingPassage(
        sender_id=user.id,
        title=title,
        content=content,
        image_url=image_url,
        authors=authors,
        is_outlink=is_outlink,
    )
    db.session.add(new_passage)
    db.session.commit()
    return jsonify({'message': 'Passage submitted successfully'}), 201
@bp_api_v1.route('/pending_passages', methods=['GET'])
@jwt_required()
def list_pending_passages():
    """Return one page of passages awaiting review (admin only).

    Query parameters ``page`` (default 1) and ``per_page`` (default 10) must
    parse as integers.

    Returns:
        ``200`` with a JSON list in which every entry also carries the page's
        ``has_next`` flag, ``403`` for non-admin callers, or ``400`` for
        non-integer pagination parameters.
    """
    if get_jwt()['role'] != UserRole.ADMIN.value:
        return jsonify({'error': 'Admin access required'}), 403

    raw_page = request.args.get('page', 1)
    raw_per_page = request.args.get('per_page', 10)
    try:
        page_number = int(raw_page)
        page_size = int(raw_per_page)
    except ValueError:
        return jsonify({'error': 'Invalid pagination parameters'}), 400

    pagination = PendingPassage.query.paginate(page=page_number, per_page=page_size, error_out=False)
    more_pages = pagination.has_next
    rows = []
    for item in pagination.items:
        rows.append({
            'id': item.id,
            'title': item.title,
            'content': item.content,
            'image_url': item.image_url,
            'authors': item.authors,
            'created_at': item.created_at.isoformat(),
            'sender': {
                'id': item.sender.id,
                'username': item.sender.username,
            },
            'has_next': more_pages,
        })
    return jsonify(rows), 200
@bp_api_v1.route('/pending_passages/approve/<int:passage_id>', methods=['POST'])
@jwt_required()
def approve_passage(passage_id):
    """Turn a pending passage into a ``Passage`` (admin only).

    The pending row's fields are copied into a new ``Passage`` whose
    ``processor_id`` is the calling admin, and the pending row is deleted in
    the same commit.

    Args:
        passage_id: Primary key of the pending passage, taken from the URL.

    Returns:
        ``200`` on success, ``403`` for non-admin callers, or ``404`` when
        the pending passage does not exist.
    """
    if get_jwt()['role'] != UserRole.ADMIN.value:
        return jsonify({'error': 'Admin access required'}), 403

    pending = PendingPassage.query.get(passage_id)
    if not pending:
        return jsonify({'error': 'Pending passage not found'}), 404

    copied_fields = ('sender_id', 'title', 'content', 'image_url', 'authors', 'is_outlink')
    carried_over = {field: getattr(pending, field) for field in copied_fields}
    approved = Passage(processor_id=get_jwt_identity(), **carried_over)

    db.session.add(approved)
    db.session.delete(pending)
    db.session.commit()
    return jsonify({'message': 'Passage approved successfully'}), 200
@bp_api_v1.route('/pending_passages/reject/<int:passage_id>', methods=['POST'])
@jwt_required()
def reject_passage(passage_id):
    """Discard a pending passage (admin only).

    Args:
        passage_id: Primary key of the pending passage, taken from the URL.

    Returns:
        ``200`` on success, ``403`` for non-admin callers, or ``404`` when
        the pending passage does not exist.
    """
    caller_role = get_jwt()['role']
    if caller_role != UserRole.ADMIN.value:
        return jsonify({'error': 'Admin access required'}), 403

    pending = PendingPassage.query.get(passage_id)
    if not pending:
        return jsonify({'error': 'Pending passage not found'}), 404

    db.session.delete(pending)
    db.session.commit()
    return jsonify({'message': 'Passage rejected successfully'}), 200
@bp_api_v1.route('/update_passage/<int:passage_id>', methods=['POST'])
@jwt_required()
def update_passage(passage_id):
    """Edit an approved passage (admin only).

    Expects a JSON object; every key is optional. ``title``, ``content`` and
    ``authors`` are only changed when non-empty (an empty string leaves the
    stored value untouched), while ``image_url`` and ``is_outlink`` are
    changed whenever they are present and not ``null``.

    Args:
        passage_id: Primary key of the passage, taken from the URL.

    Returns:
        ``200`` on success, ``403`` for non-admin callers, ``404`` when the
        passage does not exist, or ``400`` for an invalid body or field.
    """
    if get_jwt()['role'] != UserRole.ADMIN.value:
        return jsonify({'error': 'Admin access required'}), 403
    passage = Passage.query.get(passage_id)
    if not passage:
        return jsonify({'error': 'Passage not found'}), 404

    data = request.get_json()
    # A JSON ``null`` body parses to None and arrays/scalars parse to
    # non-dict values; none of them support the ``.get`` calls below.
    if not isinstance(data, dict):
        return jsonify({'error': 'Request body must be a JSON object'}), 400
    # The checks below rely on len(); reject non-string JSON values up front.
    for field in ('title', 'content', 'image_url', 'authors'):
        value = data.get(field)
        if value is not None and not isinstance(value, str):
            return jsonify({'error': f'{field} must be a string'}), 400

    title = data.get('title')
    content = data.get('content')
    image_url = data.get('image_url')
    authors = data.get('authors')
    is_outlink = data.get('is_outlink')

    # Only upper bounds are checked: an empty value is falsy and therefore
    # skipped below, so a "length == 0" test could never trigger.
    if title and len(title) > 200:
        return jsonify({'error': 'Invalid title'}), 400
    if image_url and len(image_url) > 500:
        return jsonify({'error': 'Image URL is too long'}), 400
    if authors and len(authors) > 60:
        return jsonify({'error': 'Invalid authors'}), 400

    if title:
        passage.title = title
    if content:
        passage.content = content
    if image_url is not None:
        passage.image_url = image_url
    if authors:
        passage.authors = authors
    if is_outlink is not None:
        passage.is_outlink = is_outlink
    db.session.commit()
    return jsonify({'message': 'Passage updated successfully'}), 200
@bp_api_v1.route('/delete_passage/<int:passage_id>', methods=['POST'])
@jwt_required()
def delete_passage(passage_id):
    """Delete an approved passage (admin only).

    Args:
        passage_id: Primary key of the passage, taken from the URL.

    Returns:
        ``200`` on success, ``403`` for non-admin callers, or ``404`` when
        the passage does not exist.
    """
    caller_role = get_jwt()['role']
    if caller_role != UserRole.ADMIN.value:
        return jsonify({'error': 'Admin access required'}), 403

    doomed = Passage.query.get(passage_id)
    if not doomed:
        return jsonify({'error': 'Passage not found'}), 404

    db.session.delete(doomed)
    db.session.commit()
    return jsonify({'message': 'Passage deleted successfully'}), 200
@bp_api_v1.route('/update_pending_passage/<int:passage_id>', methods=['POST'])
@jwt_required()
def update_pending_passage(passage_id):
    """Edit a pending passage; allowed for admins and for its sender.

    Expects a JSON object; every key is optional. ``title``, ``content`` and
    ``authors`` are only changed when non-empty (an empty string leaves the
    stored value untouched), while ``image_url`` and ``is_outlink`` are
    changed whenever they are present and not ``null``.

    Args:
        passage_id: Primary key of the pending passage, taken from the URL.

    Returns:
        ``200`` on success, ``404`` when the pending passage does not exist,
        ``403`` when the caller is neither admin nor its sender, or ``400``
        for an invalid body or field.
    """
    # Load the row before the ownership test: dereferencing ``.sender_id`` on
    # the result of a failed lookup raised AttributeError for non-admins.
    passage = PendingPassage.query.get(passage_id)
    if not passage:
        return jsonify({'error': 'Pending passage not found'}), 404
    role = get_jwt()['role']
    if role != UserRole.ADMIN.value and get_jwt_identity() != passage.sender_id:
        return jsonify({'error': 'Permission denied'}), 403

    data = request.get_json()
    # A JSON ``null`` body parses to None and arrays/scalars parse to
    # non-dict values; none of them support the ``.get`` calls below.
    if not isinstance(data, dict):
        return jsonify({'error': 'Request body must be a JSON object'}), 400
    # The checks below rely on len(); reject non-string JSON values up front.
    for field in ('title', 'content', 'image_url', 'authors'):
        value = data.get(field)
        if value is not None and not isinstance(value, str):
            return jsonify({'error': f'{field} must be a string'}), 400

    title = data.get('title')
    content = data.get('content')
    image_url = data.get('image_url')
    authors = data.get('authors')
    is_outlink = data.get('is_outlink')

    # Only upper bounds are checked: an empty value is falsy and therefore
    # skipped below, so a "length == 0" test could never trigger.
    if title and len(title) > 200:
        return jsonify({'error': 'Invalid title'}), 400
    if image_url and len(image_url) > 500:
        return jsonify({'error': 'Image URL is too long'}), 400
    if authors and len(authors) > 60:
        return jsonify({'error': 'Invalid authors'}), 400

    if title:
        passage.title = title
    if content:
        passage.content = content
    if image_url is not None:
        passage.image_url = image_url
    if authors:
        passage.authors = authors
    if is_outlink is not None:
        passage.is_outlink = is_outlink
    db.session.commit()
    return jsonify({'message': 'Pending passage updated successfully'}), 200
@bp_api_v1.route('/delete_pending_passage/<int:passage_id>', methods=['POST'])
@jwt_required()
def delete_pending_passage(passage_id):
    """Delete a pending passage; allowed for admins and for its sender.

    Args:
        passage_id: Primary key of the pending passage, taken from the URL.

    Returns:
        ``200`` on success, ``404`` when the pending passage does not exist,
        or ``403`` when the caller is neither admin nor its sender.
    """
    # Load the row before the ownership test: dereferencing ``.sender_id`` on
    # the result of a failed lookup raised AttributeError for non-admins.
    passage = PendingPassage.query.get(passage_id)
    if not passage:
        return jsonify({'error': 'Pending passage not found'}), 404
    role = get_jwt()['role']
    if role != UserRole.ADMIN.value and get_jwt_identity() != passage.sender_id:
        return jsonify({'error': 'Permission denied'}), 403

    db.session.delete(passage)
    db.session.commit()
    return jsonify({'message': 'Pending passage deleted successfully'}), 200
@bp_api_v1.route('/create_paper', methods=['POST'])
@jwt_required()
def create_paper():
    """Create a paper and attach the listed passages to it (admin only).

    Expects a JSON object with ``title`` (1-200 chars) and optionally
    ``description`` (at most 500 chars), ``image_url`` (at most 500 chars)
    and ``passage_ids`` (list of integer passage ids, defaults to empty).
    Every attached passage is marked ``is_posted``. The paper, its relations
    and the flag changes are written in a single commit.

    Returns:
        ``201`` on success, ``403`` for non-admin callers, or ``400`` for an
        invalid body or field or an unknown passage id.
    """
    if get_jwt()['role'] != UserRole.ADMIN.value:
        return jsonify({'error': 'Admin access required'}), 403

    data = request.get_json()
    # A JSON ``null`` body parses to None and arrays/scalars parse to
    # non-dict values; none of them support the ``.get`` calls below.
    if not isinstance(data, dict):
        return jsonify({'error': 'Request body must be a JSON object'}), 400
    # The checks below rely on len(); reject non-string JSON values up front.
    for field in ('title', 'description', 'image_url'):
        value = data.get(field)
        if value is not None and not isinstance(value, str):
            return jsonify({'error': f'{field} must be a string'}), 400

    title = data.get('title')
    description = data.get('description')
    image_url = data.get('image_url')
    passage_ids = data.get('passage_ids', [])

    if not title or len(title) > 200:
        return jsonify({'error': 'Invalid title'}), 400
    if description and len(description) > 500:
        return jsonify({'error': 'Description is too long'}), 400
    if image_url and len(image_url) > 500:
        return jsonify({'error': 'Image URL is too long'}), 400

    if passage_ids is None:
        passage_ids = []
    if not isinstance(passage_ids, list) or not all(
        isinstance(pid, int) and not isinstance(pid, bool) for pid in passage_ids
    ):
        return jsonify({'error': 'Invalid passage_ids'}), 400

    # Resolve every passage before anything is written so that an unknown id
    # aborts the request without leaving partial changes in the session.
    # dict.fromkeys drops duplicate ids while keeping the requested order.
    passages = []
    for pid in dict.fromkeys(passage_ids):
        passage = Passage.query.get(pid)
        if not passage:
            return jsonify({'error': f'Passage with id {pid} not found'}), 400
        passages.append(passage)

    new_paper = Paper(
        title=title,
        description=description,
        image_url=image_url,
    )
    db.session.add(new_paper)
    # The relations need the paper's primary key, which only exists once the
    # INSERT was sent; flush does that without ending the transaction.
    # (Assumes Paper.id is database-generated -- confirm against the model.)
    db.session.flush()

    for passage in passages:
        db.session.add(PaperPassagesRelation(paper_id=new_paper.id, passage_id=passage.id))
        passage.is_posted = True
    db.session.commit()
    return jsonify({'message': 'Paper created successfully'}), 201
@bp_api_v1.route('/delete_paper/<int:paper_id>', methods=['POST'])
@jwt_required()
def delete_paper(paper_id):
    """Delete a paper and its passage relations (admin only).

    A passage whose only relation is the one being removed gets its
    ``is_posted`` flag cleared.

    Args:
        paper_id: Primary key of the paper, taken from the URL.

    Returns:
        ``200`` on success, ``403`` for non-admin callers, or ``404`` when
        the paper does not exist.
    """
    if get_jwt()['role'] != UserRole.ADMIN.value:
        return jsonify({'error': 'Admin access required'}), 403

    doomed = Paper.query.get(paper_id)
    if not doomed:
        return jsonify({'error': 'Paper not found'}), 404

    for link in PaperPassagesRelation.query.filter_by(paper_id=paper_id).all():
        member = Passage.query.get(link.passage_id)
        # The count is taken before this link is deleted, so "1" means the
        # passage belongs to no other paper.
        if member and PaperPassagesRelation.query.filter_by(passage_id=member.id).count() == 1:
            member.is_posted = False
        db.session.delete(link)

    db.session.delete(doomed)
    db.session.commit()
    return jsonify({'message': 'Paper deleted successfully'}), 200
@bp_api_v1.route('/update_paper/<int:paper_id>', methods=['POST'])
@jwt_required()
def update_paper(paper_id):
    """Edit a paper and, optionally, replace its set of passages (admin only).

    Expects a JSON object; every key is optional. ``title`` is only changed
    when non-empty; ``description`` and ``image_url`` are changed whenever
    present and not ``null``. When ``passage_ids`` is given it becomes the
    paper's full passage set: missing relations are added (marking the
    passage ``is_posted``) and surplus ones removed (clearing ``is_posted``
    when that was the passage's only relation).

    Args:
        paper_id: Primary key of the paper, taken from the URL.

    Returns:
        ``200`` on success, ``403`` for non-admin callers, ``404`` when the
        paper does not exist, or ``400`` for an invalid body or field or an
        unknown passage id.
    """
    if get_jwt()['role'] != UserRole.ADMIN.value:
        return jsonify({'error': 'Admin access required'}), 403
    paper = Paper.query.get(paper_id)
    if not paper:
        return jsonify({'error': 'Paper not found'}), 404

    data = request.get_json()
    # A JSON ``null`` body parses to None and arrays/scalars parse to
    # non-dict values; none of them support the ``.get`` calls below.
    if not isinstance(data, dict):
        return jsonify({'error': 'Request body must be a JSON object'}), 400
    # The checks below rely on len(); reject non-string JSON values up front.
    for field in ('title', 'description', 'image_url'):
        value = data.get(field)
        if value is not None and not isinstance(value, str):
            return jsonify({'error': f'{field} must be a string'}), 400

    title = data.get('title')
    description = data.get('description')
    image_url = data.get('image_url')
    passage_ids = data.get('passage_ids')

    # An empty title is falsy and skipped below, so only the upper bound
    # can ever be violated.
    if title and len(title) > 200:
        return jsonify({'error': 'Invalid title'}), 400
    if description and len(description) > 500:
        return jsonify({'error': 'Description is too long'}), 400
    if image_url and len(image_url) > 500:
        return jsonify({'error': 'Image URL is too long'}), 400
    # set() below needs an iterable of hashable ids; anything else used to
    # surface as an unhandled TypeError.
    if passage_ids is not None and (
        not isinstance(passage_ids, list)
        or not all(isinstance(pid, int) and not isinstance(pid, bool) for pid in passage_ids)
    ):
        return jsonify({'error': 'Invalid passage_ids'}), 400

    add_relations = []
    remove_relations = []
    if passage_ids is not None:
        existing_relations = PaperPassagesRelation.query.filter_by(paper_id=paper_id).all()
        existing_passage_ids = {r.passage_id for r in existing_relations}
        new_passage_ids = set(passage_ids)

        for pid in new_passage_ids - existing_passage_ids:
            passage = Passage.query.get(pid)
            if not passage:
                # Earlier iterations may already have flipped is_posted.
                db.session.rollback()
                return jsonify({'error': f'Passage with id {pid} not found'}), 400
            add_relations.append(PaperPassagesRelation(paper_id=paper_id, passage_id=pid))
            passage.is_posted = True

        for relation in existing_relations:
            if relation.passage_id in new_passage_ids:
                continue
            remove_relations.append(relation)
            passage = Passage.query.get(relation.passage_id)
            # Counted before the relation is deleted, so "1" means the
            # passage belongs to no other paper.
            if passage and PaperPassagesRelation.query.filter_by(passage_id=passage.id).count() == 1:
                passage.is_posted = False

    if title:
        paper.title = title
    if description is not None:
        paper.description = description
    if image_url is not None:
        paper.image_url = image_url
    for relation in add_relations:
        db.session.add(relation)
    for relation in remove_relations:
        db.session.delete(relation)
    db.session.commit()
    return jsonify({'message': 'Paper updated successfully'}), 200
@bp_api_v1.route('/update_profile', methods=['POST'])
@jwt_required()
def update_profile():
    """Update the calling user's nickname and contact details.

    Expects a JSON object with any of ``nickname``, ``phone``, ``email`` and
    ``qq``; keys that are absent or ``null`` are left unchanged. Contact
    values follow the same rules as registration (11-digit phone, email
    containing ``@`` and at most 120 chars, 5-10 digit QQ), and a request
    that touches a contact field must leave at least one of them set.

    Returns:
        ``200`` on success, ``404`` when the token's user no longer exists,
        or ``400`` with an ``error`` message for an invalid body or field.
    """
    # The user is loaded from the caller's own identity, so no separate
    # ownership check is needed (a former "user.id != identity" test could
    # never be true).
    user = User.query.get(get_jwt_identity())
    if not user:
        return jsonify({'error': 'User not found'}), 404

    data = request.get_json()
    # A JSON ``null`` body parses to None and arrays/scalars parse to
    # non-dict values; none of them support the ``.get`` calls below.
    if not isinstance(data, dict):
        return jsonify({'error': 'Request body must be a JSON object'}), 400
    # The checks below rely on len()/str methods; reject other JSON types.
    for field in ('nickname', 'phone', 'email', 'qq'):
        value = data.get(field)
        if value is not None and not isinstance(value, str):
            return jsonify({'error': f'{field} must be a string'}), 400

    nickname = data.get('nickname')
    phone = data.get('phone')
    email = data.get('email')
    qq = data.get('qq')

    # Validate everything before mutating the user so that a rejected
    # request leaves no half-applied changes on the session object.
    if nickname is not None and len(nickname) > 80:
        return jsonify({'error': 'Nickname is too long'}), 400
    if phone and (len(phone) != 11 or not phone.isdigit()):
        return jsonify({'error': 'Invalid phone number'}), 400
    if email and ('@' not in email or len(email) > 120):
        return jsonify({'error': 'Invalid email address'}), 400
    if qq and (len(qq) < 5 or len(qq) > 10 or not qq.isdigit()):
        return jsonify({'error': 'Invalid QQ number'}), 400

    contact_updates = {'phone': phone, 'email': email, 'qq': qq}
    if any(value is not None for value in contact_updates.values()):
        # Only enforced when contacts are being edited, so accounts can
        # still change their nickname regardless of their stored contacts.
        resulting = [
            getattr(user, name) if value is None else value
            for name, value in contact_updates.items()
        ]
        if not any(resulting):
            return jsonify({'error': 'At least one contact method is required'}), 400

    if nickname is not None:
        user.nickname = nickname
    if phone is not None:
        user.phone = phone
    if email is not None:
        user.email = email
    if qq is not None:
        user.qq = qq
    db.session.commit()
    return jsonify({'message': 'Profile updated successfully'}), 200
@bp_api_v1.route('/change_password', methods=['POST'])
@jwt_required()
def change_password():
    """Change the calling user's password after verifying the old one.

    Expects a JSON object with ``session_id`` (issued by ``/get_key``) and
    hex-encoded RSA-encrypted ``old_password`` and ``new_password``.

    Returns:
        ``200`` on success, ``404`` when the token's user no longer exists,
        ``400`` for a malformed body, unknown session or undecryptable
        password, or ``401`` when the old password does not match.
    """
    cleanup_sessions()
    user = User.query.get(get_jwt_identity())
    if not user:
        return jsonify({'error': 'User not found'}), 404

    data = request.get_json()
    # A JSON ``null`` body parses to None and arrays/scalars parse to
    # non-dict values; none of them support the ``.get`` calls below.
    if not isinstance(data, dict):
        return jsonify({'error': 'Request body must be a JSON object'}), 400

    session_id = data.get('session_id')
    # One lookup instead of "in" + subscript: the entry could be evicted in
    # between, and a non-string (unhashable) id must not reach the dict.
    session = sessions.get(session_id) if isinstance(session_id, str) else None
    if session is None:
        return jsonify({'error': 'Invalid session_id'}), 400

    old_password_encrypted = data.get('old_password')
    new_password_encrypted = data.get('new_password')
    if not old_password_encrypted or not new_password_encrypted:
        return jsonify({'error': 'Old and new passwords are required'}), 400

    private_key = session['private_key']
    try:
        # NOTE(review): PyCryptodome's RsaKey.decrypt() appears to raise
        # NotImplementedError (it points to Crypto.Cipher.PKCS1_OAEP), which
        # this handler would report as a decrypt failure. Confirm the padding
        # scheme the client uses and switch to the matching cipher object.
        old_password_decrypted = private_key.decrypt(bytes.fromhex(old_password_encrypted)).decode('utf-8')
        new_password_decrypted = private_key.decrypt(bytes.fromhex(new_password_encrypted)).decode('utf-8')
    except Exception:
        return jsonify({'error': 'Failed to decrypt passwords'}), 400

    if not check_password_hash(user.password, old_password_decrypted):
        return jsonify({'error': 'Incorrect old password'}), 401

    # The session key is single-use once the change is accepted.
    sessions.pop(session_id, None)
    user.password = generate_password_hash(new_password_decrypted)
    db.session.commit()
    return jsonify({'message': 'Password changed successfully'}), 200
@bp_api_v1.route('/admin/reset_password/<int:user_id>', methods=['POST'])
@jwt_required()
def admin_reset_password(user_id):
    """Set a new password for any user (admin only).

    Expects a JSON object with ``session_id`` (issued by ``/get_key``) and a
    hex-encoded RSA-encrypted ``new_password``.

    Args:
        user_id: Primary key of the target user, taken from the URL.

    Returns:
        ``200`` on success, ``403`` for non-admin callers, ``404`` when the
        target user does not exist, or ``400`` for a malformed body, unknown
        session or undecryptable password.
    """
    cleanup_sessions()
    if get_jwt()['role'] != UserRole.ADMIN.value:
        return jsonify({'error': 'Admin access required'}), 403
    user = User.query.get(user_id)
    if not user:
        return jsonify({'error': 'User not found'}), 404

    data = request.get_json()
    # A JSON ``null`` body parses to None and arrays/scalars parse to
    # non-dict values; none of them support the ``.get`` calls below.
    if not isinstance(data, dict):
        return jsonify({'error': 'Request body must be a JSON object'}), 400

    session_id = data.get('session_id')
    # One lookup instead of "in" + subscript: the entry could be evicted in
    # between, and a non-string (unhashable) id must not reach the dict.
    session = sessions.get(session_id) if isinstance(session_id, str) else None
    if session is None:
        return jsonify({'error': 'Invalid session_id'}), 400

    new_password_encrypted = data.get('new_password')
    if not new_password_encrypted:
        return jsonify({'error': 'New password is required'}), 400

    private_key = session['private_key']
    try:
        # NOTE(review): PyCryptodome's RsaKey.decrypt() appears to raise
        # NotImplementedError (it points to Crypto.Cipher.PKCS1_OAEP), which
        # this handler would report as a decrypt failure. Confirm the padding
        # scheme the client uses and switch to the matching cipher object.
        new_password_decrypted = private_key.decrypt(bytes.fromhex(new_password_encrypted)).decode('utf-8')
    except Exception:
        return jsonify({'error': 'Failed to decrypt password'}), 400

    # The session key is single-use once the reset is accepted.
    sessions.pop(session_id, None)
    user.password = generate_password_hash(new_password_decrypted)
    db.session.commit()
    return jsonify({'message': 'Password reset successfully'}), 200