forked from ITStudio_OUC/OucPosts
711 lines
23 KiB
Python
711 lines
23 KiB
Python
from datetime import datetime, timedelta
|
|
from flask import Blueprint, request, jsonify
|
|
from flask_jwt_extended import create_access_token, jwt_required, get_jwt_identity, get_jwt
|
|
from Crypto.PublicKey import RSA
|
|
from werkzeug.security import generate_password_hash, check_password_hash
|
|
import uuid
|
|
from models import db, User, UserRole, PendingPassage, Passage, Paper, PaperPassagesRelation
|
|
|
|
bp_api_v1 = Blueprint('api/v1', __name__)
|
|
SESSION_EXPIRATION = timedelta(minutes=5)
|
|
sessions = {}
|
|
|
|
|
|
def cleanup_sessions():
|
|
now = datetime.now()
|
|
expired_keys = [sid for sid, info in sessions.items() if now - info['created_at'] > SESSION_EXPIRATION]
|
|
for sid in expired_keys:
|
|
sessions.pop(sid, None)
|
|
|
|
|
|
@bp_api_v1.route('/ping')
|
|
def ping():
|
|
return 'pong'
|
|
|
|
|
|
@bp_api_v1.route('/get_key')
|
|
def get_key():
|
|
cleanup_sessions()
|
|
|
|
session_id = str(uuid.uuid4())
|
|
private_key = RSA.generate(2048)
|
|
sessions[session_id] = {
|
|
'private_key': private_key,
|
|
'created_at': datetime.now()
|
|
}
|
|
|
|
public_key_pem = private_key.publickey().export_key(format='PEM').decode('utf-8')
|
|
return jsonify({'pkey': public_key_pem, 'session_id': session_id}), 200
|
|
|
|
|
|
@bp_api_v1.route('/register', methods=['POST'])
|
|
def register():
|
|
cleanup_sessions()
|
|
data = request.get_json()
|
|
|
|
session_id = data.get('session_id')
|
|
if not session_id or session_id not in sessions:
|
|
return jsonify({'error': 'Invalid session_id'}), 400
|
|
|
|
username = data.get('username')
|
|
if not username or len(username) == 0 or len(username) > 80:
|
|
return jsonify({'error': 'Username is required'}), 400
|
|
if User.query.filter_by(username=username).first():
|
|
return jsonify({'error': 'Username already exists'}), 400
|
|
|
|
nickname = data.get('nickname')
|
|
if nickname and len(nickname) > 80:
|
|
return jsonify({'error': 'Nickname is too long'}), 400
|
|
|
|
phone = data.get('phone')
|
|
email = data.get('email')
|
|
qq = data.get('qq')
|
|
if phone and (len(phone) != 11 or not phone.isdigit()):
|
|
return jsonify({'error': 'Invalid phone number'}), 400
|
|
if email and (not '@' in email or len(email) > 120):
|
|
return jsonify({'error': 'Invalid email address'}), 400
|
|
if qq and (len(qq) < 5 or not qq.isdigit() or len(qq) > 10):
|
|
return jsonify({'error': 'Invalid QQ number'}), 400
|
|
if not phone and not email and not qq:
|
|
return jsonify({'error': 'At least one contact method is required'}), 400
|
|
|
|
password_encrypted = data.get('password')
|
|
if not password_encrypted:
|
|
return jsonify({'error': 'Password is required'}), 400
|
|
|
|
private_key = sessions[session_id]['private_key']
|
|
try:
|
|
password_decrypted = private_key.decrypt(bytes.fromhex(password_encrypted)).decode('utf-8')
|
|
password_hash = generate_password_hash(password_decrypted)
|
|
except Exception:
|
|
return jsonify({'error': 'Failed to decrypt password'}), 400
|
|
|
|
sessions.pop(session_id, None)
|
|
new_user = User(username=username, nickname=nickname, password=password_hash, role=UserRole.USER, phone=phone, email=email, qq=qq)
|
|
db.session.add(new_user)
|
|
db.session.commit()
|
|
|
|
return jsonify({'message': 'User registered successfully'}), 201
|
|
|
|
|
|
@bp_api_v1.route('/login', methods=['POST'])
|
|
def login():
|
|
cleanup_sessions()
|
|
data = request.get_json()
|
|
|
|
session_id = data.get('session_id')
|
|
if not session_id or session_id not in sessions:
|
|
return jsonify({'error': 'Invalid session_id'}), 400
|
|
|
|
username = data.get('username')
|
|
password_encrypted = data.get('password')
|
|
if not username or not password_encrypted:
|
|
return jsonify({'error': 'Username and password are required'}), 400
|
|
|
|
user = User.query.filter_by(username=username).first()
|
|
if not user:
|
|
return jsonify({'error': 'User has not been registered'}), 401
|
|
|
|
private_key = sessions[session_id]['private_key']
|
|
try:
|
|
password_decrypted = private_key.decrypt(bytes.fromhex(password_encrypted)).decode('utf-8')
|
|
except Exception:
|
|
return jsonify({'error': 'Failed to decrypt password'}), 400
|
|
|
|
if not check_password_hash(user.password, password_decrypted):
|
|
return jsonify({'error': 'Incorrect username or password'}), 401
|
|
|
|
sessions.pop(session_id, None)
|
|
access_token = create_access_token(identity=user.id, expires_delta=timedelta(days=30 * 6), additional_claims={
|
|
'role': user.role.value,
|
|
'username': user.username,
|
|
'phone': user.phone,
|
|
'email': user.email,
|
|
'qq': user.qq
|
|
})
|
|
return jsonify({'access_token': access_token}), 200
|
|
|
|
|
|
@bp_api_v1.route('/profile')
|
|
@jwt_required()
|
|
def profile():
|
|
user_id = get_jwt_identity()
|
|
user = User.query.get(user_id)
|
|
if not user:
|
|
return jsonify({'error': 'User not found'}), 404
|
|
|
|
return jsonify({
|
|
'username': user.username,
|
|
'role': user.role.value,
|
|
'created_at': user.created_at.isoformat(),
|
|
'phone': user.phone,
|
|
'email': user.email,
|
|
'qq': user.qq
|
|
}), 200
|
|
|
|
|
|
@bp_api_v1.route('/passages/<int:passage_id>', methods=['GET'])
|
|
@jwt_required(optional=True)
|
|
def get_passage(passage_id):
|
|
passage = Passage.query.get(passage_id)
|
|
if not passage:
|
|
return jsonify({'error': 'Passage not found'}), 404
|
|
|
|
if not passage.is_posted:
|
|
jwt_data = get_jwt()
|
|
if not jwt_data or jwt_data.get('role') != UserRole.ADMIN.value:
|
|
return jsonify({'error': 'Passage is not posted yet'}), 403
|
|
|
|
return jsonify({
|
|
'id': passage.id,
|
|
'title': passage.title,
|
|
'content': passage.content,
|
|
'image_url': passage.image_url,
|
|
'authors': passage.authors,
|
|
'created_at': passage.created_at.isoformat(),
|
|
'is_outlink': passage.is_outlink,
|
|
'sender': {
|
|
'id': passage.sender.id,
|
|
'username': passage.sender.username
|
|
}
|
|
}), 200
|
|
|
|
|
|
@bp_api_v1.route('/passages', methods=['GET'])
|
|
@jwt_required()
|
|
def list_passages():
|
|
role = get_jwt()['role']
|
|
if role != UserRole.ADMIN.value:
|
|
return jsonify({'error': 'Admin access required'}), 403
|
|
|
|
try:
|
|
page = int(request.args.get('page', 1))
|
|
per_page = int(request.args.get('per_page', 10))
|
|
except ValueError:
|
|
return jsonify({'error': 'Invalid pagination parameters'}), 400
|
|
|
|
query = Passage.query.paginate(page=page, per_page=per_page, error_out=False)
|
|
passages = query.items
|
|
return jsonify([{
|
|
'id': p.id,
|
|
'title': p.title,
|
|
'image_url': p.image_url,
|
|
'authors': p.authors,
|
|
'created_at': p.created_at.isoformat(),
|
|
'is_outlink': p.is_outlink,
|
|
'sender': {
|
|
'id': p.sender.id,
|
|
'username': p.sender.username
|
|
},
|
|
'has_next': query.has_next
|
|
} for p in passages]), 200
|
|
|
|
|
|
@bp_api_v1.route('/papers/<int:paper_id>', methods=['GET'])
|
|
def get_paper(paper_id):
|
|
paper = Paper.query.get(paper_id)
|
|
if not paper:
|
|
return jsonify({'error': 'Paper not found'}), 404
|
|
|
|
return jsonify({
|
|
'id': paper.id,
|
|
'title': paper.title,
|
|
'description': paper.description,
|
|
'image_url': paper.image_url,
|
|
'created_at': paper.created_at.isoformat(),
|
|
'passages': [p.id for p in paper.passages]
|
|
}), 200
|
|
|
|
|
|
@bp_api_v1.route('/papers', methods=['GET'])
|
|
def list_papers():
|
|
try:
|
|
page = int(request.args.get('page', 1))
|
|
per_page = int(request.args.get('per_page', 10))
|
|
except ValueError:
|
|
return jsonify({'error': 'Invalid pagination parameters'}), 400
|
|
|
|
query = Paper.query.paginate(page=page, per_page=per_page, error_out=False)
|
|
papers = query.items
|
|
return jsonify([{
|
|
'id': p.id,
|
|
'title': p.title,
|
|
'description': p.description,
|
|
'image_url': p.image_url,
|
|
'created_at': p.created_at.isoformat(),
|
|
'has_next': query.has_next
|
|
} for p in papers]), 200
|
|
|
|
|
|
@bp_api_v1.route('/create_passage', methods=['POST'])
|
|
@jwt_required()
|
|
def create_passage():
|
|
user_id = get_jwt_identity()
|
|
user = User.query.get(user_id)
|
|
if not user:
|
|
return jsonify({'error': 'User not found'}), 404
|
|
|
|
data = request.get_json()
|
|
title = data.get('title')
|
|
content = data.get('content')
|
|
image_url = data.get('image_url')
|
|
authors = data.get('authors')
|
|
is_outlink = data.get('is_outlink', False)
|
|
|
|
if not title or len(title) == 0 or len(title) > 200:
|
|
return jsonify({'error': 'Invalid title'}), 400
|
|
if not content or len(content) == 0:
|
|
return jsonify({'error': 'Invalid content'}), 400
|
|
if image_url and len(image_url) > 500:
|
|
return jsonify({'error': 'Image URL is too long'}), 400
|
|
if not authors or len(authors) == 0 or len(authors) > 60:
|
|
return jsonify({'error': 'Invalid authors'}), 400
|
|
|
|
new_passage = PendingPassage(
|
|
sender_id=user.id,
|
|
title=title,
|
|
content=content,
|
|
image_url=image_url,
|
|
authors=authors,
|
|
is_outlink=is_outlink
|
|
)
|
|
db.session.add(new_passage)
|
|
db.session.commit()
|
|
|
|
return jsonify({'message': 'Passage submitted successfully'}), 201
|
|
|
|
|
|
@bp_api_v1.route('/pending_passages', methods=['GET'])
|
|
@jwt_required()
|
|
def list_pending_passages():
|
|
role = get_jwt()['role']
|
|
if role != UserRole.ADMIN.value:
|
|
return jsonify({'error': 'Admin access required'}), 403
|
|
|
|
try:
|
|
page = int(request.args.get('page', 1))
|
|
per_page = int(request.args.get('per_page', 10))
|
|
except ValueError:
|
|
return jsonify({'error': 'Invalid pagination parameters'}), 400
|
|
|
|
query = PendingPassage.query.paginate(page=page, per_page=per_page, error_out=False)
|
|
passages = query.items
|
|
return jsonify([{
|
|
'id': p.id,
|
|
'title': p.title,
|
|
'content': p.content,
|
|
'image_url': p.image_url,
|
|
'authors': p.authors,
|
|
'created_at': p.created_at.isoformat(),
|
|
'sender': {
|
|
'id': p.sender.id,
|
|
'username': p.sender.username
|
|
},
|
|
'has_next': query.has_next
|
|
} for p in passages]), 200
|
|
|
|
|
|
@bp_api_v1.route('/pending_passages/approve/<int:passage_id>', methods=['POST'])
|
|
@jwt_required()
|
|
def approve_passage(passage_id):
|
|
role = get_jwt()['role']
|
|
if role != UserRole.ADMIN.value:
|
|
return jsonify({'error': 'Admin access required'}), 403
|
|
|
|
passage = PendingPassage.query.get(passage_id)
|
|
if not passage:
|
|
return jsonify({'error': 'Pending passage not found'}), 404
|
|
|
|
new_passage = Passage(
|
|
sender_id=passage.sender_id,
|
|
processor_id=get_jwt_identity(),
|
|
title=passage.title,
|
|
content=passage.content,
|
|
image_url=passage.image_url,
|
|
authors=passage.authors,
|
|
is_outlink=passage.is_outlink
|
|
)
|
|
db.session.add(new_passage)
|
|
db.session.delete(passage)
|
|
db.session.commit()
|
|
|
|
return jsonify({'message': 'Passage approved successfully'}), 200
|
|
|
|
|
|
@bp_api_v1.route('/pending_passages/reject/<int:passage_id>', methods=['POST'])
|
|
@jwt_required()
|
|
def reject_passage(passage_id):
|
|
role = get_jwt()['role']
|
|
if role != UserRole.ADMIN.value:
|
|
return jsonify({'error': 'Admin access required'}), 403
|
|
|
|
passage = PendingPassage.query.get(passage_id)
|
|
if not passage:
|
|
return jsonify({'error': 'Pending passage not found'}), 404
|
|
|
|
db.session.delete(passage)
|
|
db.session.commit()
|
|
|
|
return jsonify({'message': 'Passage rejected successfully'}), 200
|
|
|
|
|
|
@bp_api_v1.route('/update_passage/<int:passage_id>', methods=['POST'])
|
|
@jwt_required()
|
|
def update_passage(passage_id):
|
|
role = get_jwt()['role']
|
|
if role != UserRole.ADMIN.value:
|
|
return jsonify({'error': 'Admin access required'}), 403
|
|
|
|
passage = Passage.query.get(passage_id)
|
|
if not passage:
|
|
return jsonify({'error': 'Passage not found'}), 404
|
|
|
|
data = request.get_json()
|
|
title = data.get('title')
|
|
content = data.get('content')
|
|
image_url = data.get('image_url')
|
|
authors = data.get('authors')
|
|
is_outlink = data.get('is_outlink')
|
|
|
|
if title and (len(title) == 0 or len(title) > 200):
|
|
return jsonify({'error': 'Invalid title'}), 400
|
|
if content and len(content) == 0:
|
|
return jsonify({'error': 'Invalid content'}), 400
|
|
if image_url and len(image_url) > 500:
|
|
return jsonify({'error': 'Image URL is too long'}), 400
|
|
if authors and (len(authors) == 0 or len(authors) > 60):
|
|
return jsonify({'error': 'Invalid authors'}), 400
|
|
|
|
if title:
|
|
passage.title = title
|
|
if content:
|
|
passage.content = content
|
|
if image_url is not None:
|
|
passage.image_url = image_url
|
|
if authors:
|
|
passage.authors = authors
|
|
if is_outlink is not None:
|
|
passage.is_outlink = is_outlink
|
|
|
|
db.session.commit()
|
|
|
|
return jsonify({'message': 'Passage updated successfully'}), 200
|
|
|
|
|
|
@bp_api_v1.route('/delete_passage/<int:passage_id>', methods=['POST'])
|
|
@jwt_required()
|
|
def delete_passage(passage_id):
|
|
role = get_jwt()['role']
|
|
if role != UserRole.ADMIN.value:
|
|
return jsonify({'error': 'Admin access required'}), 403
|
|
|
|
passage = Passage.query.get(passage_id)
|
|
if not passage:
|
|
return jsonify({'error': 'Passage not found'}), 404
|
|
|
|
db.session.delete(passage)
|
|
db.session.commit()
|
|
|
|
return jsonify({'message': 'Passage deleted successfully'}), 200
|
|
|
|
@bp_api_v1.route('/update_pending_passage/<int:passage_id>', methods=['POST'])
|
|
@jwt_required()
|
|
def update_pending_passage(passage_id):
|
|
role = get_jwt()['role']
|
|
if role != UserRole.ADMIN.value and (get_jwt_identity() != PendingPassage.query.get(passage_id).sender_id):
|
|
return jsonify({'error': 'Permission denied'}), 403
|
|
|
|
passage = PendingPassage.query.get(passage_id)
|
|
if not passage:
|
|
return jsonify({'error': 'Pending passage not found'}), 404
|
|
|
|
data = request.get_json()
|
|
title = data.get('title')
|
|
content = data.get('content')
|
|
image_url = data.get('image_url')
|
|
authors = data.get('authors')
|
|
is_outlink = data.get('is_outlink')
|
|
|
|
if title and (len(title) == 0 or len(title) > 200):
|
|
return jsonify({'error': 'Invalid title'}), 400
|
|
if content and len(content) == 0:
|
|
return jsonify({'error': 'Invalid content'}), 400
|
|
if image_url and len(image_url) > 500:
|
|
return jsonify({'error': 'Image URL is too long'}), 400
|
|
if authors and (len(authors) == 0 or len(authors) > 60):
|
|
return jsonify({'error': 'Invalid authors'}), 400
|
|
|
|
if title:
|
|
passage.title = title
|
|
if content:
|
|
passage.content = content
|
|
if image_url is not None:
|
|
passage.image_url = image_url
|
|
if authors:
|
|
passage.authors = authors
|
|
if is_outlink is not None:
|
|
passage.is_outlink = is_outlink
|
|
|
|
db.session.commit()
|
|
|
|
return jsonify({'message': 'Pending passage updated successfully'}), 200
|
|
|
|
|
|
@bp_api_v1.route('/delete_pending_passage/<int:passage_id>', methods=['POST'])
|
|
@jwt_required()
|
|
def delete_pending_passage(passage_id):
|
|
role = get_jwt()['role']
|
|
if role != UserRole.ADMIN.value and (get_jwt_identity() != PendingPassage.query.get(passage_id).sender_id):
|
|
return jsonify({'error': 'Permission denied'}), 403
|
|
|
|
passage = PendingPassage.query.get(passage_id)
|
|
if not passage:
|
|
return jsonify({'error': 'Pending passage not found'}), 404
|
|
|
|
db.session.delete(passage)
|
|
db.session.commit()
|
|
|
|
return jsonify({'message': 'Pending passage deleted successfully'}), 200
|
|
|
|
|
|
@bp_api_v1.route('/create_paper', methods=['POST'])
|
|
@jwt_required()
|
|
def create_paper():
|
|
role = get_jwt()['role']
|
|
if role != UserRole.ADMIN.value:
|
|
return jsonify({'error': 'Admin access required'}), 403
|
|
|
|
data = request.get_json()
|
|
title = data.get('title')
|
|
description = data.get('description')
|
|
image_url = data.get('image_url')
|
|
passage_ids = data.get('passage_ids', [])
|
|
|
|
if not title or len(title) == 0 or len(title) > 200:
|
|
return jsonify({'error': 'Invalid title'}), 400
|
|
if description and len(description) > 500:
|
|
return jsonify({'error': 'Description is too long'}), 400
|
|
if image_url and len(image_url) > 500:
|
|
return jsonify({'error': 'Image URL is too long'}), 400
|
|
|
|
relations = []
|
|
for pid in passage_ids:
|
|
passage = Passage.query.get(pid)
|
|
if passage:
|
|
relation = PaperPassagesRelation(paper_id=new_paper.id, passage_id=pid)
|
|
relations.append(relation)
|
|
passage.is_posted = True
|
|
else:
|
|
db.session.rollback()
|
|
return jsonify({'error': f'Passage with id {pid} not found'}), 400
|
|
|
|
|
|
new_paper = Paper(
|
|
title=title,
|
|
description=description,
|
|
image_url=image_url
|
|
)
|
|
db.session.add(new_paper)
|
|
db.session.commit()
|
|
|
|
for relation in relations:
|
|
db.session.add(relation)
|
|
db.session.commit()
|
|
|
|
return jsonify({'message': 'Paper created successfully'}), 201
|
|
|
|
|
|
@bp_api_v1.route('/delete_paper/<int:paper_id>', methods=['POST'])
|
|
@jwt_required()
|
|
def delete_paper(paper_id):
|
|
role = get_jwt()['role']
|
|
if role != UserRole.ADMIN.value:
|
|
return jsonify({'error': 'Admin access required'}), 403
|
|
|
|
paper = Paper.query.get(paper_id)
|
|
if not paper:
|
|
return jsonify({'error': 'Paper not found'}), 404
|
|
|
|
relations = PaperPassagesRelation.query.filter_by(paper_id=paper_id).all()
|
|
for relation in relations:
|
|
passage = Passage.query.get(relation.passage_id)
|
|
if passage:
|
|
if PaperPassagesRelation.query.filter_by(passage_id=passage.id).count() == 1:
|
|
passage.is_posted = False
|
|
db.session.delete(relation)
|
|
|
|
db.session.delete(paper)
|
|
db.session.commit()
|
|
|
|
return jsonify({'message': 'Paper deleted successfully'}), 200
|
|
|
|
|
|
@bp_api_v1.route('/update_paper/<int:paper_id>', methods=['POST'])
|
|
@jwt_required()
|
|
def update_paper(paper_id):
|
|
role = get_jwt()['role']
|
|
if role != UserRole.ADMIN.value:
|
|
return jsonify({'error': 'Admin access required'}), 403
|
|
|
|
paper = Paper.query.get(paper_id)
|
|
if not paper:
|
|
return jsonify({'error': 'Paper not found'}), 404
|
|
|
|
data = request.get_json()
|
|
title = data.get('title')
|
|
description = data.get('description')
|
|
image_url = data.get('image_url')
|
|
passage_ids = data.get('passage_ids')
|
|
|
|
if title and (len(title) == 0 or len(title) > 200):
|
|
return jsonify({'error': 'Invalid title'}), 400
|
|
if description and len(description) > 500:
|
|
return jsonify({'error': 'Description is too long'}), 400
|
|
if image_url and len(image_url) > 500:
|
|
return jsonify({'error': 'Image URL is too long'}), 400
|
|
|
|
add_relations = []
|
|
remove_relations = []
|
|
if passage_ids is not None:
|
|
existing_relations = PaperPassagesRelation.query.filter_by(paper_id=paper_id).all()
|
|
existing_passage_ids = {r.passage_id for r in existing_relations}
|
|
new_passage_ids = set(passage_ids)
|
|
|
|
for pid in new_passage_ids:
|
|
if pid not in existing_passage_ids:
|
|
passage = Passage.query.get(pid)
|
|
if passage:
|
|
add_relations.append(PaperPassagesRelation(paper_id=paper_id, passage_id=pid))
|
|
passage.is_posted = True
|
|
else:
|
|
db.session.rollback()
|
|
return jsonify({'error': f'Passage with id {pid} not found'}), 400
|
|
|
|
for relation in existing_relations:
|
|
if relation.passage_id not in new_passage_ids:
|
|
remove_relations.append(relation)
|
|
passage = Passage.query.get(relation.passage_id)
|
|
if passage and PaperPassagesRelation.query.filter_by(passage_id=passage.id).count() == 1:
|
|
passage.is_posted = False
|
|
|
|
if title:
|
|
paper.title = title
|
|
if description is not None:
|
|
paper.description = description
|
|
if image_url is not None:
|
|
paper.image_url = image_url
|
|
|
|
for relation in add_relations:
|
|
db.session.add(relation)
|
|
for relation in remove_relations:
|
|
db.session.delete(relation)
|
|
|
|
db.session.commit()
|
|
|
|
return jsonify({'message': 'Paper updated successfully'}), 200
|
|
|
|
|
|
@bp_api_v1.route('/update_profile', methods=['POST'])
|
|
@jwt_required()
|
|
def update_profile():
|
|
current_user_id = get_jwt_identity()
|
|
user = User.query.get(current_user_id)
|
|
if not user:
|
|
return jsonify({'error': 'User not found'}), 404
|
|
|
|
if user.role != UserRole.ADMIN.value and user.id != current_user_id:
|
|
return jsonify({'error': 'Permission denied'}), 403
|
|
|
|
data = request.get_json()
|
|
nickname = data.get('nickname')
|
|
phone = data.get('phone')
|
|
email = data.get('email')
|
|
qq = data.get('qq')
|
|
|
|
if nickname is not None:
|
|
if len(nickname) > 80:
|
|
return jsonify({'error': 'Nickname is too long'}), 400
|
|
user.nickname = nickname
|
|
|
|
if phone is not None:
|
|
user.phone = phone
|
|
if email is not None:
|
|
user.email = email
|
|
if qq is not None:
|
|
user.qq = qq
|
|
|
|
db.session.commit()
|
|
|
|
return jsonify({'message': 'Profile updated successfully'}), 200
|
|
|
|
|
|
@bp_api_v1.route('/change_password', methods=['POST'])
|
|
@jwt_required()
|
|
def change_password():
|
|
cleanup_sessions()
|
|
current_user_id = get_jwt_identity()
|
|
user = User.query.get(current_user_id)
|
|
if not user:
|
|
return jsonify({'error': 'User not found'}), 404
|
|
|
|
data = request.get_json()
|
|
|
|
session_id = data.get('session_id')
|
|
if not session_id or session_id not in sessions:
|
|
return jsonify({'error': 'Invalid session_id'}), 400
|
|
|
|
old_password_encrypted = data.get('old_password')
|
|
new_password_encrypted = data.get('new_password')
|
|
if not old_password_encrypted or not new_password_encrypted:
|
|
return jsonify({'error': 'Old and new passwords are required'}), 400
|
|
|
|
private_key = sessions[session_id]['private_key']
|
|
try:
|
|
old_password_decrypted = private_key.decrypt(bytes.fromhex(old_password_encrypted)).decode('utf-8')
|
|
new_password_decrypted = private_key.decrypt(bytes.fromhex(new_password_encrypted)).decode('utf-8')
|
|
except Exception:
|
|
return jsonify({'error': 'Failed to decrypt passwords'}), 400
|
|
|
|
if not check_password_hash(user.password, old_password_decrypted):
|
|
return jsonify({'error': 'Incorrect old password'}), 401
|
|
|
|
sessions.pop(session_id, None)
|
|
user.password = generate_password_hash(new_password_decrypted)
|
|
db.session.commit()
|
|
|
|
return jsonify({'message': 'Password changed successfully'}), 200
|
|
|
|
|
|
@bp_api_v1.route('/admin/reset_password/<int:user_id>', methods=['POST'])
|
|
@jwt_required()
|
|
def admin_reset_password(user_id):
|
|
cleanup_sessions()
|
|
role = get_jwt()['role']
|
|
if role != UserRole.ADMIN.value:
|
|
return jsonify({'error': 'Admin access required'}), 403
|
|
|
|
user = User.query.get(user_id)
|
|
if not user:
|
|
return jsonify({'error': 'User not found'}), 404
|
|
|
|
data = request.get_json()
|
|
|
|
session_id = data.get('session_id')
|
|
if not session_id or session_id not in sessions:
|
|
return jsonify({'error': 'Invalid session_id'}), 400
|
|
|
|
new_password_encrypted = data.get('new_password')
|
|
if not new_password_encrypted:
|
|
return jsonify({'error': 'New password is required'}), 400
|
|
|
|
private_key = sessions[session_id]['private_key']
|
|
try:
|
|
new_password_decrypted = private_key.decrypt(bytes.fromhex(new_password_encrypted)).decode('utf-8')
|
|
except Exception:
|
|
return jsonify({'error': 'Failed to decrypt password'}), 400
|
|
|
|
sessions.pop(session_id, None)
|
|
user.password = generate_password_hash(new_password_decrypted)
|
|
db.session.commit()
|
|
|
|
return jsonify({'message': 'Password reset successfully'}), 200 |