Files
zlqy/dontshushme/app(4).py
2026-02-27 10:37:11 +08:00

2067 lines
76 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# app.py
import os
import time
import json
import random
import string
import smtplib
from datetime import datetime, timedelta
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from functools import wraps
from flask import (
Flask, render_template, request, jsonify, session,
redirect, url_for, flash, abort
)
from dotenv import load_dotenv
from captcha.image import ImageCaptcha
from aliyunsdkcore.client import AcsClient
from aliyunsdkcore.request import CommonRequest
# 引入所有数据库模型
from models import db, User, Exam, Submission, Draft, Contest, ContestMembership, Post, Reply, Poll, Report, Bookmark, Reaction, Notification, EditHistory, ContestRegistration, TeacherApplication, Friend, ExamBookmark
# Load environment variables via python-dotenv before the os.getenv() lookups
# below.
load_dotenv()

app = Flask(__name__)
# Use a stable key from the environment when one is configured, so that signed
# session cookies stay valid across restarts and between worker processes.
# Falls back to a random per-process key, which was the previous behaviour.
app.secret_key = os.getenv('SECRET_KEY') or os.urandom(24)

# In-memory stores for short-lived verification data.
captcha_store = {}  # captcha id -> {'text': ..., 'expires': ...}
email_codes = {}    # e-mail address -> {'code': ..., 'expires': ...}

# Database configuration.
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///database.db'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
db.init_app(app)

# Alibaba Cloud phone-number verification service client.
acs_client = AcsClient(
    os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID', ''),
    os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET', ''),
    'cn-hangzhou'
)
# ========== 辅助函数 ==========
def generate_captcha_id():
    """Return a random 16-character id made of lowercase letters and digits."""
    alphabet = string.ascii_lowercase + string.digits
    picked = random.choices(alphabet, k=16)
    return ''.join(picked)
def generate_code(length=6):
    """Return a random numeric verification code.

    The digits are drawn from the ``secrets`` module rather than ``random``
    because the result is used as an authentication secret (e-mail codes and
    auto-generated passwords), which must not be predictable.

    Args:
        length: Number of digits to generate (default 6).

    Returns:
        A string of ``length`` decimal digits; empty when ``length`` <= 0.
    """
    # Local import keeps this block self-contained.
    import secrets

    return ''.join(secrets.choice(string.digits) for _ in range(length))
def login_required(f):
    """Decorator that sends anonymous visitors to the login page.

    The wrapped view is called only when the session holds a ``'user'`` entry;
    otherwise a redirect to the ``login`` endpoint is returned.
    """
    @wraps(f)
    def decorated(*args, **kwargs):
        if 'user' in session:
            return f(*args, **kwargs)
        return redirect(url_for('login'))

    return decorated
def teacher_required(f):
    """Decorator that restricts a view to users whose role is ``'teacher'``.

    Anonymous visitors are redirected to ``login``; logged-in users with any
    other role are redirected to ``exam_list``.
    """
    @wraps(f)
    def decorated(*args, **kwargs):
        current = session.get('user')
        if not current:
            return redirect(url_for('login'))
        if current.get('role') == 'teacher':
            return f(*args, **kwargs)
        return redirect(url_for('exam_list'))

    return decorated
def send_email(to_email, subject, html_content):
    """Send an HTML e-mail through the SMTP-over-SSL server set in the environment.

    Connection settings are read from ``SMTP_HOST``, ``SMTP_PORT``,
    ``SMTP_USER`` and ``SMTP_PASS`` (defaults: ``smtp.163.com``, ``465``,
    empty, empty).

    Args:
        to_email: Recipient address.
        subject: Subject line.
        html_content: HTML body, sent as UTF-8.

    Raises:
        Nothing is caught here; errors from ``smtplib`` reach the caller.
    """
    # Local import keeps this block self-contained.
    from email.utils import formataddr

    smtp_host = os.getenv('SMTP_HOST', 'smtp.163.com')
    smtp_port = int(os.getenv('SMTP_PORT', 465))
    smtp_user = os.getenv('SMTP_USER', '')
    smtp_pass = os.getenv('SMTP_PASS', '')

    msg = MIMEMultipart('alternative')
    # formataddr() RFC 2047-encodes only the non-ASCII display name and leaves
    # the address itself in plain ASCII. Assigning the hand-built string would
    # encode the whole header value, address included.
    msg['From'] = formataddr(('验证码', smtp_user))
    msg['To'] = to_email
    msg['Subject'] = subject
    msg.attach(MIMEText(html_content, 'html', 'utf-8'))

    with smtplib.SMTP_SSL(smtp_host, smtp_port) as server:
        server.login(smtp_user, smtp_pass)
        server.sendmail(smtp_user, to_email, msg.as_string())
def get_current_user():
    """Return the ``'user'`` entry stored in the session, or ``None``."""
    current = session.get('user')
    return current
def require_login():
    """Return the current session user, aborting with HTTP 401 if there is none."""
    current = get_current_user()
    if current:
        return current
    abort(401)
def require_admin_or_teacher():
    """Return the current user if their role is admin or teacher.

    Aborts with HTTP 401 when nobody is logged in (via ``require_login``) and
    with HTTP 403 when the role is anything else.
    """
    current = require_login()
    privileged_roles = ('admin', 'teacher')
    if current['role'] not in privileged_roles:
        abort(403)
    return current
# 积分等级相关
# Display title for each level number, 1 through 10.
LEVEL_TITLES = {
    1: '新手上路',
    2: '初出茅庐',
    3: '小有名气',
    4: '渐入佳境',
    5: '驾轻就熟',
    6: '炉火纯青',
    7: '学富五车',
    8: '出类拔萃',
    9: '登峰造极',
    10: '一代宗师',
}
def calc_level(points):
    """Map a point total to a level between 1 and 10.

    Args:
        points: Accumulated points.

    Returns:
        The highest level whose threshold is <= ``points``; 1 when the total
        is below the first threshold of 50.
    """
    # (minimum points, level), highest first so the first match wins.
    thresholds = (
        (5000, 10),
        (3000, 9),
        (2000, 8),
        (1200, 7),
        (800, 6),
        (500, 5),
        (300, 4),
        (150, 3),
        (50, 2),
    )
    for minimum, level in thresholds:
        if points >= minimum:
            return level
    return 1
def add_notification(user_id, ntype, content, from_user='', post_id=0):
    """Create a ``Notification`` row and commit it immediately.

    Args:
        user_id: Id of the user the notification is for.
        ntype: Stored in the notification's ``type`` column.
        content: Notification text.
        from_user: Stored in ``from_user`` (default empty string).
        post_id: Stored in ``post_id`` (default 0).
    """
    fields = {
        'user_id': user_id,
        'type': ntype,
        'content': content,
        'from_user': from_user,
        'post_id': post_id,
    }
    db.session.add(Notification(**fields))
    db.session.commit()
# ========== 杯赛讨论区发帖权限检查 ==========
def can_post_in_contest(user, contest):
    """Decide whether ``user`` may post in the discussion board of ``contest``.

    Posting is allowed when any of the following holds:
      * the user's role is admin or teacher;
      * the user has a ContestMembership for the contest with role owner or
        teacher;
      * the user has a ContestRegistration for the contest and at least one
        Submission to one of the contest's exams.

    Args:
        user: Session user dict; ``'role'`` and ``'id'`` are read.
        contest: Contest model instance; ``id`` and ``exams`` are read.

    Returns:
        True if posting is allowed, otherwise False.
    """
    if user['role'] in ('admin', 'teacher'):
        return True

    uid = user['id']

    # Contest owner or contest-level teacher.
    member = ContestMembership.query.filter_by(
        user_id=uid, contest_id=contest.id
    ).first()
    if member and member.role in ('owner', 'teacher'):
        return True

    # Otherwise the user must be registered for the contest ...
    signup = ContestRegistration.query.filter_by(
        user_id=uid, contest_id=contest.id
    ).first()
    if not signup:
        return False

    # ... and have submitted at least one of its exams.
    contest_exam_ids = [e.id for e in contest.exams]
    if not contest_exam_ids:
        return False

    attempts = Submission.query.filter(
        Submission.user_id == uid,
        Submission.exam_id.in_(contest_exam_ids)
    ).count()
    return attempts >= 1
# ========== 上下文处理器 ==========
@app.context_processor
def inject_user():
    """Make the current session user available to templates as ``user``."""
    current = get_current_user()
    return {'user': current}
# ========== 页面路由 ==========
@app.route('/')
def home():
    """Render the landing page."""
    page = render_template('home.html')
    return page
@app.route('/login', methods=['GET'])
def login():
    """Render the login page."""
    page = render_template('login.html')
    return page
@app.route('/register', methods=['GET'])
def register():
    """Render the registration page."""
    page = render_template('register.html')
    return page
@app.route('/logout')
def logout():
    """Drop the user from the session and redirect to the login page."""
    # pop() with a default is a no-op when nobody is logged in.
    session.pop('user', None)
    login_url = url_for('login')
    return redirect(login_url)
@app.route('/contests')
def contest_list():
    """Render the contest list page."""
    page = render_template('contest_list.html')
    return page
@app.route('/contests/<int:contest_id>')
def contest_detail(contest_id):
    """Render one contest's detail page.

    Redirects to the contest list when the contest does not exist. For a
    logged-in visitor the template also receives whether they are registered
    and whether they may post in the contest's discussion board.
    """
    contest = Contest.query.get(contest_id)
    if not contest:
        return redirect(url_for('contest_list'))

    registered = False
    can_post = False
    viewer = get_current_user()
    if viewer:
        # Is the viewer registered for this contest?
        signup = ContestRegistration.query.filter_by(
            user_id=viewer['id'], contest_id=contest_id
        ).first()
        registered = signup is not None
        # May the viewer post in the discussion board?
        can_post = can_post_in_contest(viewer, contest)

    return render_template(
        'contest_detail.html',
        contest=contest,
        registered=registered,
        can_post=can_post,
    )
@app.route('/exams')
@login_required
def exam_list():
    """Render the exam list with optional subject filter and keyword search.

    Query parameters:
        q: Keyword matched against exam title or subject.
        subject: Exact subject to filter by.

    The template also receives every distinct non-empty subject and a mapping
    of exam id to the current user's submission summary.
    """
    current = session.get('user')
    search_query = request.args.get('q', '').strip()
    subject_filter = request.args.get('subject', '').strip()

    exams_q = Exam.query
    if subject_filter:
        exams_q = exams_q.filter_by(subject=subject_filter)
    if search_query:
        title_hit = Exam.title.contains(search_query)
        subject_hit = Exam.subject.contains(search_query)
        exams_q = exams_q.filter(title_hit | subject_hit)
    all_exams = exams_q.all()

    subject_rows = db.session.query(Exam.subject).distinct().all()
    all_subjects = [row[0] for row in subject_rows if row[0]]

    own_subs = Submission.query.filter_by(user_id=current.get('id')).all()
    user_submissions = {
        s.exam_id: {'id': s.id, 'graded': s.graded, 'score': s.score}
        for s in own_subs
    }

    return render_template(
        'exam_list.html',
        exams=all_exams,
        user_submissions=user_submissions,
        search_query=search_query,
        subject_filter=subject_filter,
        all_subjects=all_subjects
    )
@app.route('/exams/create')
@teacher_required
def exam_create():
    """Render the exam creation page (teachers only)."""
    page = render_template('exam_create.html')
    return page
@app.route('/exams/<int:exam_id>')
@login_required
def exam_detail(exam_id):
    """Render an exam with the current user's submission and draft, if any.

    Redirects to the exam list when the exam does not exist.
    """
    exam = Exam.query.get(exam_id)
    if not exam:
        return redirect(url_for('exam_list'))

    uid = session.get('user').get('id')
    mine = {'exam_id': exam_id, 'user_id': uid}
    existing = Submission.query.filter_by(**mine).first()
    draft = Draft.query.filter_by(**mine).first()
    return render_template(
        'exam_detail.html',
        exam=exam,
        existing_submission=existing,
        draft=draft,
    )
@app.route('/exams/<int:exam_id>/result')
@login_required
def exam_result(exam_id):
    """Render the current user's result for an exam.

    Redirects to the exam list when the exam does not exist, and to the exam
    page when the user has not submitted it.
    """
    exam = Exam.query.get(exam_id)
    if not exam:
        return redirect(url_for('exam_list'))

    uid = session.get('user').get('id')
    submission = Submission.query.filter_by(exam_id=exam_id, user_id=uid).first()
    if submission:
        return render_template('exam_result.html', exam=exam, submission=submission)
    return redirect(url_for('exam_detail', exam_id=exam_id))
@app.route('/exams/<int:exam_id>/submissions')
@teacher_required
def exam_submissions(exam_id):
    """Render all submissions for an exam with summary statistics.

    ``stats`` is empty when there are no submissions. Otherwise it holds
    total/graded/ungraded counts and the average (one decimal), maximum and
    minimum score over graded submissions, each 0 when nothing is graded.
    ``next_ungraded`` is the id of the first ungraded submission, or None.
    """
    exam = Exam.query.get(exam_id)
    if not exam:
        return redirect(url_for('exam_list'))

    subs = Submission.query.filter_by(exam_id=exam_id).all()

    stats = {}
    if subs:
        graded_subs = [entry for entry in subs if entry.graded]
        scores = [entry.score for entry in graded_subs]
        if scores:
            avg_score = round(sum(scores) / len(scores), 1)
            max_score = max(scores)
            min_score = min(scores)
        else:
            avg_score = max_score = min_score = 0
        stats = {
            'total': len(subs),
            'graded': len(graded_subs),
            'ungraded': len(subs) - len(graded_subs),
            'avg_score': avg_score,
            'max_score': max_score,
            'min_score': min_score,
        }

    next_ungraded = next((entry.id for entry in subs if not entry.graded), None)

    return render_template(
        'exam_submissions.html',
        exam=exam,
        submissions=subs,
        stats=stats,
        next_ungraded=next_ungraded,
    )
@app.route('/exams/<int:exam_id>/grade/<int:sub_id>')
@teacher_required
def exam_grade(exam_id, sub_id):
    """Render the grading page for one submission (teachers only).

    Redirects to the exam list when the exam or submission is missing, or when
    the submission does not belong to the exam named in the URL. The template
    also receives ``next_ungraded``: the id of another ungraded submission for
    the same exam, or None.
    """
    exam = Exam.query.get(exam_id)
    sub = Submission.query.get(sub_id)
    if not exam or not sub:
        return redirect(url_for('exam_list'))
    # Reject mismatched URLs so a submission is never shown against another
    # exam's questions; the grading API applies the same check.
    if sub.exam_id != exam_id:
        return redirect(url_for('exam_list'))

    next_sub = (
        Submission.query
        .filter_by(exam_id=exam_id, graded=False)
        .filter(Submission.id != sub_id)
        .first()
    )
    next_ungraded = next_sub.id if next_sub else None
    return render_template(
        'exam_grade.html',
        exam=exam,
        submission=sub,
        next_ungraded=next_ungraded,
    )
@app.route('/exams/<int:exam_id>/print')
@login_required
def exam_print(exam_id):
    """Render the printable view of an exam, or redirect to the list if missing."""
    exam = Exam.query.get(exam_id)
    if exam:
        return render_template('exam_print.html', exam=exam)
    return redirect(url_for('exam_list'))
@app.route('/forum')
def forum():
    """Render the forum page."""
    page = render_template('forum.html')
    return page
# ========== 个人中心页面 ==========
@app.route('/profile')
@login_required
def profile():
    """Render the profile page of the logged-in user."""
    page = render_template('profile.html')
    return page
# ========== 个人中心 API ==========
@app.route('/api/user/friends')
@login_required
def api_user_friends():
    """Return the current user's accepted friends as JSON.

    A friendship row matches when the user is on either side of it and its
    status is ``'accepted'``. Rows whose other user no longer exists are
    skipped.
    """
    user_id = session['user']['id']
    involves_me = (Friend.user_id == user_id) | (Friend.friend_id == user_id)
    is_accepted = Friend.status == 'accepted'
    friendships = Friend.query.filter(involves_me & is_accepted).all()

    friends = []
    for link in friendships:
        # The friend is whichever side of the row is not the current user.
        other_id = link.friend_id if link.user_id == user_id else link.user_id
        friend_user = User.query.get(other_id)
        if not friend_user:
            continue
        friends.append({
            'id': friend_user.id,
            'name': friend_user.name,
            'avatar': friend_user.avatar if hasattr(friend_user, 'avatar') else '',
            'created_at': link.created_at.strftime('%Y-%m-%d')
        })
    return jsonify({'success': True, 'friends': friends})
@app.route('/api/user/posts')
@login_required
def api_user_posts():
    """Return the current user's posts, newest first, as JSON.

    Content longer than 100 characters is cut to 100 and suffixed with
    ``'...'``.
    """
    user_id = session['user']['id']
    posts = (
        Post.query
        .filter_by(author_id=user_id)
        .order_by(Post.created_at.desc())
        .all()
    )

    data = []
    for post in posts:
        if len(post.content) > 100:
            preview = post.content[:100] + '...'
        else:
            preview = post.content
        data.append({
            'id': post.id,
            'title': post.title,
            'content': preview,
            'created_at': post.created_at.strftime('%Y-%m-%d %H:%M'),
            'replies': post.replies_count,
            'likes': post.likes
        })
    return jsonify({'success': True, 'posts': data})
@app.route('/api/user/exam-bookmarks')
@login_required
def api_user_exam_bookmarks():
    """Return the exams bookmarked by the current user, newest bookmark first.

    Bookmarks whose ``exam`` relationship is empty are skipped.
    """
    user_id = session['user']['id']
    bookmarks = (
        ExamBookmark.query
        .filter_by(user_id=user_id)
        .order_by(ExamBookmark.created_at.desc())
        .all()
    )
    data = [
        {
            'id': bm.exam.id,
            'title': bm.exam.title,
            'subject': bm.exam.subject,
            'total_score': bm.exam.total_score,
            'duration': bm.exam.duration,
            'status': bm.exam.status,
            'bookmarked_at': bm.created_at.strftime('%Y-%m-%d %H:%M')
        }
        for bm in bookmarks
        if bm.exam
    ]
    return jsonify({'success': True, 'bookmarks': data})
@app.route('/api/exams/<int:exam_id>/bookmark', methods=['POST', 'DELETE'])
@login_required
def api_toggle_exam_bookmark(exam_id):
    """Add (POST) or remove (DELETE) the current user's bookmark on an exam.

    Returns:
        POST: 404 if the exam does not exist, 400 if already bookmarked,
        otherwise success with ``bookmarked: True``.
        DELETE: 400 if there is no bookmark, otherwise success with
        ``bookmarked: False``.
    """
    user_id = session['user']['id']
    bookmark = ExamBookmark.query.filter_by(user_id=user_id, exam_id=exam_id).first()

    if request.method == 'POST':
        # Do not create bookmarks that point at a non-existent exam.
        if not Exam.query.get(exam_id):
            return jsonify({'success': False, 'message': '试卷不存在'}), 404
        if bookmark:
            return jsonify({'success': False, 'message': '已经收藏过了'}), 400
        db.session.add(ExamBookmark(user_id=user_id, exam_id=exam_id))
        db.session.commit()
        return jsonify({'success': True, 'message': '收藏成功', 'bookmarked': True})

    # The route only accepts POST and DELETE, so this is the DELETE path.
    if not bookmark:
        return jsonify({'success': False, 'message': '未收藏'}), 400
    db.session.delete(bookmark)
    db.session.commit()
    return jsonify({'success': True, 'message': '已取消收藏', 'bookmarked': False})
@app.route('/api/friend/add', methods=['POST'])
@login_required
def api_add_friend():
    """Send a friend request from the current user to ``friend_id``.

    Expects a JSON object with ``friend_id``.

    Returns:
        400 for a missing/invalid id or a request to oneself, 404 when the
        target user does not exist, 400 when a friendship or pending request
        already exists in either direction, otherwise success.
    """
    user_id = session['user']['id']
    # silent=True gives None instead of an error response for a missing or
    # malformed body; anything that is not a JSON object counts as empty.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}

    friend_id = data.get('friend_id')
    # Accept numeric strings too, so "5" and 5 are treated alike.
    if isinstance(friend_id, str) and friend_id.isdigit():
        friend_id = int(friend_id)
    # bool is a subclass of int, so it is excluded explicitly.
    is_valid_id = isinstance(friend_id, int) and not isinstance(friend_id, bool)
    if not is_valid_id or not friend_id or friend_id == user_id:
        return jsonify({'success': False, 'message': '无效的好友ID'}), 400

    # Do not create requests that point at a non-existent user.
    if not User.query.get(friend_id):
        return jsonify({'success': False, 'message': '用户不存在'}), 404

    existing = Friend.query.filter(
        ((Friend.user_id == user_id) & (Friend.friend_id == friend_id)) |
        ((Friend.user_id == friend_id) & (Friend.friend_id == user_id))
    ).first()
    if existing:
        return jsonify({'success': False, 'message': '已经是好友或已发送请求'}), 400

    friend_req = Friend(user_id=user_id, friend_id=friend_id, status='pending')
    db.session.add(friend_req)
    db.session.commit()
    return jsonify({'success': True, 'message': '好友请求已发送'})
@app.route('/api/friend/accept/<int:request_id>', methods=['POST'])
@login_required
def api_accept_friend(request_id):
    """Accept a pending friend request addressed to the current user.

    Returns 404 when the request does not exist, is addressed to someone
    else, or is no longer pending.
    """
    user_id = session['user']['id']
    req = Friend.query.get(request_id)

    acceptable = bool(req) and req.friend_id == user_id and req.status == 'pending'
    if not acceptable:
        return jsonify({'success': False, 'message': '请求不存在或无权操作'}), 404

    req.status = 'accepted'
    db.session.commit()
    return jsonify({'success': True, 'message': '好友已添加'})
# ========== 教师申请路由 ==========
@app.route('/apply-teacher', methods=['GET', 'POST'])
@login_required
def apply_teacher():
    """Show the teacher application form (GET) or store an application (POST).

    A POST needs non-empty ``name``, ``email`` and ``reason`` form fields and
    is refused while the user already has a pending application.
    """
    if request.method != 'POST':
        return render_template('apply_teacher.html')

    form = request.form
    name = form.get('name', '').strip()
    email = form.get('email', '').strip()
    reason = form.get('reason', '').strip()
    if not (name and email and reason):
        flash('请填写完整信息', 'error')
        return redirect(url_for('apply_teacher'))

    applicant = session['user']
    # Only one pending application per user.
    pending = TeacherApplication.query.filter_by(
        user_id=applicant['id'], status='pending'
    ).first()
    if pending:
        flash('您已提交过申请,请耐心等待审核', 'error')
        return redirect(url_for('apply_teacher'))

    # Store the application record.
    application = TeacherApplication(
        user_id=applicant['id'],
        name=name,
        email=email,
        reason=reason
    )
    db.session.add(application)
    db.session.commit()
    flash('申请已提交,管理员会尽快审核', 'success')
    return redirect(url_for('home'))
# ========== 杯赛申请路由 ==========
@app.route('/apply-contest', methods=['GET', 'POST'])
@login_required
def apply_contest():
    """Show the contest application form (GET) or acknowledge one (POST).

    The POST branch stores nothing yet; it only flashes a confirmation.
    """
    if request.method != 'POST':
        return render_template('apply_contest.html')

    # Could be extended to persist the application (this would need a
    # ContestApplication model).
    flash('申请已提交,管理员会尽快审核', 'success')
    return redirect(url_for('contest_list'))
# ========== API 路由 ==========
@app.route('/api/captcha')
def api_captcha():
    """Generate a 4-digit image captcha and return it base64-encoded.

    The expected text is kept in ``captcha_store`` under a fresh id for 300
    seconds. Expired entries are purged on every call.

    Returns:
        JSON with ``captchaId`` and ``img`` (base64 image data).
    """
    import base64

    captcha_text = ''.join(random.choices(string.digits, k=4))
    captcha_id = generate_captcha_id()

    generator = ImageCaptcha(width=150, height=50, font_sizes=[40])
    image_stream = generator.generate(captcha_text)
    img_base64 = base64.b64encode(image_stream.read()).decode('utf-8')

    captcha_store[captcha_id] = {
        'text': captcha_text.lower(),
        'expires': time.time() + 300
    }

    # Collect stale ids first so the dict is not modified while iterating it.
    stale_ids = [cid for cid, entry in captcha_store.items() if time.time() > entry['expires']]
    for cid in stale_ids:
        del captcha_store[cid]

    return jsonify({'captchaId': captcha_id, 'img': img_base64})
@app.route('/api/send-email-code', methods=['POST'])
def api_send_email_code():
    """Check the image captcha, then e-mail a 6-digit verification code.

    Expects a JSON object with ``email``, ``captchaId`` and ``captchaText``.
    The captcha is single-use: it is removed from ``captcha_store`` whatever
    the outcome. The generated code is kept in ``email_codes`` for 300 seconds
    and dropped again if sending fails.

    Returns:
        400 for missing fields or a wrong/expired captcha, 500 when the mail
        cannot be sent, otherwise success.
    """
    # silent=True gives None instead of an error response for a missing or
    # malformed body; anything that is not a JSON object counts as empty.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}
    email = data.get('email', '')
    captcha_id = data.get('captchaId', '')
    captcha_text = data.get('captchaText', '')

    if not email or not isinstance(email, str):
        return jsonify({'success': False, 'message': '邮箱不能为空'}), 400
    if not captcha_id or not captcha_text:
        return jsonify({'success': False, 'message': '请输入图形验证码'}), 400
    # Normalise to strings so dict lookup and .lower() cannot raise on
    # non-string JSON values.
    captcha_id = str(captcha_id)
    captcha_text = str(captcha_text)

    # Pop up front: the captcha is consumed in every branch below.
    record = captcha_store.pop(captcha_id, None)
    if not record or time.time() > record['expires']:
        return jsonify({'success': False, 'message': '图形验证码已过期', 'refreshCaptcha': True}), 400
    if record['text'] != captcha_text.lower():
        return jsonify({'success': False, 'message': '图形验证码错误', 'refreshCaptcha': True}), 400

    code = generate_code(6)
    email_codes[email] = {'code': code, 'expires': time.time() + 300}
    try:
        send_email(email, '您的注册验证码',
                   f'<p>您的验证码是:<b>{code}</b>5分钟内有效。</p>')
    except Exception as e:
        print(f'发送邮件失败: {e}')
        email_codes.pop(email, None)
        return jsonify({'success': False, 'message': f'发送邮件失败: {str(e)}'}), 500

    # The code itself is deliberately not logged: it is a login secret.
    print(f'邮箱验证码已发送到 {email}')
    return jsonify({'success': True, 'message': '验证码已发送到邮箱'})
# ========== 注册相关 API ==========
@app.route('/api/register', methods=['POST'])
def api_register():
    """Register an account by e-mail (a phone number must be bound as well).

    Expects a JSON object with ``name``, ``email``, ``phone``, ``password``,
    ``emailCode`` and optionally ``inviteCode``.

    The teacher role is granted only when ``inviteCode`` equals the
    ``TEACHER_INVITE_CODE`` environment variable. Previously any non-empty
    invite code produced a teacher account. A supplied but invalid code is
    rejected rather than silently downgraded to student.

    Returns:
        400 on validation failure, otherwise success with the user data that
        was also stored in the session.
    """
    # Local import keeps this block self-contained.
    import secrets

    # silent=True gives None instead of an error response for a missing or
    # malformed body; anything that is not a JSON object counts as empty.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}
    name = data.get('name', '')
    email = data.get('email', '')
    phone = data.get('phone', '')
    password = data.get('password', '')
    invite_code = data.get('inviteCode', '')
    email_code = data.get('emailCode', '')

    if not name or not email or not password or not phone:
        return jsonify({'success': False, 'message': '请填写完整信息'}), 400
    if not email_code:
        return jsonify({'success': False, 'message': '请输入邮箱验证码'}), 400
    if not isinstance(phone, str) or not phone.isdigit() or len(phone) != 11:
        return jsonify({'success': False, 'message': '手机号格式不正确'}), 400

    # Check the invite code before the e-mail code so that a bad invite code
    # does not consume the user's verification code.
    role = 'student'
    if invite_code:
        expected = os.getenv('TEACHER_INVITE_CODE', '')
        supplied = str(invite_code).encode('utf-8')
        if not expected or not secrets.compare_digest(supplied, expected.encode('utf-8')):
            return jsonify({'success': False, 'message': '邀请码无效'}), 400
        role = 'teacher'

    # The e-mail code is single-use: it is removed whatever the outcome.
    record = email_codes.pop(email, None) if isinstance(email, str) else None
    if not record or time.time() > record['expires']:
        return jsonify({'success': False, 'message': '邮箱验证码已过期', 'refreshCaptcha': True}), 400
    if record['code'] != str(email_code):
        return jsonify({'success': False, 'message': '邮箱验证码错误', 'refreshCaptcha': True}), 400

    if User.query.filter_by(email=email).first():
        return jsonify({'success': False, 'message': '该邮箱已注册'}), 400
    if User.query.filter_by(phone=phone).first():
        return jsonify({'success': False, 'message': '该手机号已绑定其他账户'}), 400

    # NOTE(review): the password is passed to the model as received, and the
    # login handler compares it verbatim, so it looks like it is stored in
    # plaintext. Confirm against the User model and migrate to hashing.
    user = User(name=name, email=email, phone=phone, password=password, role=role)
    db.session.add(user)
    db.session.commit()

    user_data = {'name': user.name, 'email': user.email, 'phone': user.phone, 'role': user.role, 'id': user.id}
    session['user'] = user_data
    return jsonify({'success': True, 'message': '注册成功', 'user': user_data})
@app.route('/api/register-mobile', methods=['POST'])
def api_register_mobile():
    """Register an account by phone number, verified with an SMS code.

    Expects a JSON object with ``name``, ``phone``, ``password``, ``smsCode``
    and optionally ``inviteCode``. The SMS code is checked through the
    ``CheckSmsVerifyCode`` action of the phone verification service.

    The teacher role is granted only when ``inviteCode`` equals the
    ``TEACHER_INVITE_CODE`` environment variable. Previously any non-empty
    invite code produced a teacher account.

    Returns:
        400 on validation failure, 500 when the verification call itself
        fails, otherwise success with the user data stored in the session.
    """
    # Local import keeps this block self-contained.
    import secrets

    # silent=True gives None instead of an error response for a missing or
    # malformed body; anything that is not a JSON object counts as empty.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}
    name = data.get('name', '')
    phone = data.get('phone', '')
    password = data.get('password', '')
    invite_code = data.get('inviteCode', '')
    sms_code = data.get('smsCode', '')

    if not name or not phone or not password or not sms_code:
        return jsonify({'success': False, 'message': '请填写完整信息'}), 400
    if not isinstance(phone, str) or not phone.isdigit() or len(phone) != 11:
        return jsonify({'success': False, 'message': '手机号格式不正确'}), 400

    # Check the invite code before the SMS code so that a bad invite code
    # does not consume the user's verification code.
    role = 'student'
    if invite_code:
        expected = os.getenv('TEACHER_INVITE_CODE', '')
        supplied = str(invite_code).encode('utf-8')
        if not expected or not secrets.compare_digest(supplied, expected.encode('utf-8')):
            return jsonify({'success': False, 'message': '邀请码无效'}), 400
        role = 'teacher'

    try:
        req = CommonRequest()
        req.set_accept_format('json')
        req.set_domain('dypnsapi.aliyuncs.com')
        req.set_method('POST')
        req.set_protocol_type('https')
        req.set_version('2017-05-25')
        req.set_action_name('CheckSmsVerifyCode')
        req.add_query_param('PhoneNumber', phone)
        req.add_query_param('VerifyCode', sms_code)
        response = acs_client.do_action_with_exception(req)
        result = json.loads(response)
        print(f'验证结果: {json.dumps(result, ensure_ascii=False, indent=2)}')
        # "or {}" also covers a response where Model is present but null.
        model = result.get('Model') or {}
        if not (result.get('Code') == 'OK' and model.get('VerifyResult') == 'PASS'):
            return jsonify({'success': False, 'message': '短信验证码错误或已过期'}), 400
    except Exception as e:
        print(f'验证短信验证码出错: {e}')
        return jsonify({'success': False, 'message': '短信验证码验证失败'}), 500

    if User.query.filter_by(phone=phone).first():
        return jsonify({'success': False, 'message': '该手机号已注册'}), 400

    # NOTE(review): the password is passed to the model as received; it looks
    # like it is stored in plaintext. Confirm against the User model.
    user = User(name=name, phone=phone, password=password, role=role)
    db.session.add(user)
    db.session.commit()

    user_data = {'name': user.name, 'phone': user.phone, 'role': user.role, 'id': user.id}
    session['user'] = user_data
    return jsonify({'success': True, 'message': '注册成功', 'user': user_data})
# ========== 登录相关 API ==========
@app.route('/api/login', methods=['POST'])
def api_login():
    """Log a user in with e-mail and password.

    Expects a JSON object with ``email`` and ``password``.

    Returns:
        400 for missing fields, an unknown account or a wrong password,
        otherwise success with the user data stored in the session.
    """
    # silent=True gives None instead of an error response for a missing or
    # malformed body; anything that is not a JSON object counts as empty and
    # falls into the "missing fields" answer below instead of raising.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}
    email = data.get('email', '')
    password = data.get('password', '')
    if not email or not password:
        return jsonify({'success': False, 'message': '请输入邮箱和密码'}), 400

    user = User.query.filter_by(email=email).first()
    if not user:
        return jsonify({'success': False, 'message': '账号不存在,请先注册'}), 400
    # NOTE(review): this compares the stored value with the submitted password
    # verbatim, which implies plaintext storage. Confirm against the User
    # model and migrate to hashed passwords.
    if user.password != password:
        return jsonify({'success': False, 'message': '密码错误'}), 400

    user_data = {'name': user.name, 'email': user.email, 'role': user.role, 'id': user.id}
    session['user'] = user_data
    return jsonify({'success': True, 'message': '登录成功', 'user': user_data})
@app.route('/api/send-sms', methods=['POST'])
def api_send_sms():
    """Check the image captcha, then request an SMS verification code.

    Expects a JSON object with ``phone``, ``captchaId`` and ``captchaText``.
    The captcha is single-use: it is removed from ``captcha_store`` whatever
    the outcome. The SMS is sent through the ``SendSmsVerifyCode`` action.

    The verification code is requested back from the service, and returned to
    the client as ``mockCode``, only when the app runs in debug mode. Sending
    it in every response would let any caller read the code for any phone
    number and defeat the verification.

    Returns:
        400 for missing/invalid fields or a wrong/expired captcha, 500 when
        the service reports a failure or the call raises, otherwise success.
    """
    # silent=True gives None instead of an error response for a missing or
    # malformed body; anything that is not a JSON object counts as empty.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}
    phone = data.get('phone', '')
    captcha_id = data.get('captchaId', '')
    captcha_text = data.get('captchaText', '')

    if not phone:
        return jsonify({'success': False, 'message': '手机号不能为空'}), 400
    # Same format rule the registration handlers apply.
    if not isinstance(phone, str) or not phone.isdigit() or len(phone) != 11:
        return jsonify({'success': False, 'message': '手机号格式不正确'}), 400
    if not captcha_id or not captcha_text:
        return jsonify({'success': False, 'message': '请输入图形验证码'}), 400
    # Normalise to strings so dict lookup and .lower() cannot raise on
    # non-string JSON values.
    captcha_id = str(captcha_id)
    captcha_text = str(captcha_text)

    # Pop up front: the captcha is consumed in every branch below.
    record = captcha_store.pop(captcha_id, None)
    if not record or time.time() > record['expires']:
        return jsonify({'success': False, 'message': '图形验证码已过期,请刷新', 'refreshCaptcha': True}), 400
    if record['text'] != captcha_text.lower():
        return jsonify({'success': False, 'message': '图形验证码错误', 'refreshCaptcha': True}), 400

    expose_code = bool(app.debug)
    try:
        req = CommonRequest()
        req.set_accept_format('json')
        req.set_domain('dypnsapi.aliyuncs.com')
        req.set_method('POST')
        req.set_protocol_type('https')
        req.set_version('2017-05-25')
        req.set_action_name('SendSmsVerifyCode')
        req.add_query_param('PhoneNumber', phone)
        req.add_query_param('SignName', os.getenv('SIGN_NAME', ''))
        req.add_query_param('TemplateCode', os.getenv('TEMPLATE_CODE', ''))
        req.add_query_param('TemplateParam', json.dumps({'code': '##code##', 'min': '5'}))
        req.add_query_param('CountryCode', '86')
        req.add_query_param('ValidTime', 300)
        req.add_query_param('Interval', 60)
        if expose_code:
            # Only ask the service to echo the code when debugging.
            req.add_query_param('ReturnVerifyCode', True)
        req.add_query_param('CodeType', 1)
        req.add_query_param('CodeLength', 4)
        response = acs_client.do_action_with_exception(req)
        result = json.loads(response)
        print(f'号码认证服务返回: {json.dumps(result, ensure_ascii=False, indent=2)}')

        if result.get('Code') != 'OK':
            print(f'发送失败: {result.get("Message")}')
            return jsonify({'success': False, 'message': result.get('Message', '发送失败')}), 500

        resp_data = {'success': True, 'message': '验证码发送成功'}
        # "or {}" also covers a response where Model is present but null.
        model = result.get('Model') or {}
        if expose_code and model.get('VerifyCode'):
            resp_data['mockCode'] = model['VerifyCode']
            print(f'验证码: {model["VerifyCode"]}')
        return jsonify(resp_data)
    except Exception as e:
        print(f'发送短信出错: {e}')
        return jsonify({'success': False, 'message': f'发送短信出错: {str(e)}'}), 500
@app.route('/api/verify-code', methods=['POST'])
def api_verify_code():
    """Log in (or auto-register) a user by phone number and SMS code.

    Expects a JSON object with ``phone`` and ``code``. The code is checked
    through the ``CheckSmsVerifyCode`` action. On success an unknown phone
    number gets a new student account with a generated password.

    Returns:
        400 for missing fields, a failed verification call, or a
        wrong/expired code; otherwise success with the user data stored in
        the session.
    """
    # silent=True gives None instead of an error response for a missing or
    # malformed body; anything that is not a JSON object counts as empty.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}
    phone = data.get('phone', '')
    code = data.get('code', '')
    if not phone or not code:
        return jsonify({'success': False, 'message': '手机号和验证码不能为空'}), 400
    # Normalise so that phone[-4:] below cannot raise on a numeric JSON value.
    phone = str(phone)

    # Only the remote call is guarded, so a database error further down is not
    # misreported to the user as a wrong verification code.
    try:
        req = CommonRequest()
        req.set_accept_format('json')
        req.set_domain('dypnsapi.aliyuncs.com')
        req.set_method('POST')
        req.set_protocol_type('https')
        req.set_version('2017-05-25')
        req.set_action_name('CheckSmsVerifyCode')
        req.add_query_param('PhoneNumber', phone)
        req.add_query_param('VerifyCode', code)
        response = acs_client.do_action_with_exception(req)
        result = json.loads(response)
    except Exception as e:
        print(f'验证出错: {e}')
        return jsonify({'success': False, 'message': '验证码错误或已过期'}), 400

    print(f'验证结果: {json.dumps(result, ensure_ascii=False, indent=2)}')
    # "or {}" also covers a response where Model is present but null.
    model = result.get('Model') or {}
    if not (result.get('Code') == 'OK' and model.get('VerifyResult') == 'PASS'):
        msg = '验证码已过期' if model.get('VerifyResult') == 'UNKNOWN' else '验证码错误'
        return jsonify({'success': False, 'message': msg}), 400

    user = User.query.filter_by(phone=phone).first()
    if not user:
        # First login with this phone number: create a student account.
        user = User(
            name=f'用户{phone[-4:]}',
            phone=phone,
            password=generate_code(8),
            role='student'
        )
        db.session.add(user)
        db.session.commit()

    user_data = {'name': user.name, 'phone': user.phone, 'role': user.role, 'id': user.id}
    session['user'] = user_data
    return jsonify({'success': True, 'message': '验证成功', 'user': user_data})
# ========== 考试系统 API ==========
@app.route('/api/exams', methods=['POST'])
def api_create_exam():
    """Create an exam (teachers only).

    Expects a JSON object with ``title``, ``questions`` (a non-empty list of
    question objects) and optionally ``subject`` and ``duration`` (default
    120). The total score is the sum of each question's ``score``.

    Returns:
        403 without the teacher role, 400 for a missing title or an
        empty/invalid question list, otherwise success with ``exam_id``.
    """
    user = session.get('user')
    if not user or user.get('role') != 'teacher':
        return jsonify({'success': False, 'message': '无权限'}), 403

    # silent=True gives None instead of an error response for a missing or
    # malformed body; anything that is not a JSON object counts as empty.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}
    title = data.get('title', '')
    subject = data.get('subject', '')
    duration = data.get('duration', 120)
    questions = data.get('questions', [])

    # Every question must be an object, otherwise q.get() below would raise.
    questions_ok = (
        isinstance(questions, list)
        and bool(questions)
        and all(isinstance(q, dict) for q in questions)
    )
    if not title or not questions_ok:
        return jsonify({'success': False, 'message': '请填写试卷标题和题目'}), 400

    total_score = sum(q.get('score', 0) for q in questions)
    exam = Exam(
        title=title,
        subject=subject,
        duration=duration,
        total_score=total_score,
        creator_id=user.get('id')
    )
    exam.set_questions(questions)
    db.session.add(exam)
    db.session.commit()
    return jsonify({'success': True, 'message': '试卷创建成功', 'exam_id': exam.id})
@app.route('/api/exams/<int:exam_id>/save-draft', methods=['POST'])
def api_save_draft(exam_id):
    """Save (create or overwrite) the current user's draft answers for an exam.

    Expects a JSON object with ``answers`` (defaults to an empty object).

    Returns:
        401 when not logged in, 404 when the exam does not exist, otherwise
        success.
    """
    user = session.get('user')
    if not user:
        return jsonify({'success': False, 'message': '请先登录'}), 401
    exam = Exam.query.get(exam_id)
    if not exam:
        return jsonify({'success': False, 'message': '试卷不存在'}), 404

    # silent=True gives None instead of an error response for a missing or
    # malformed body; anything that is not a JSON object counts as empty.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}
    answers = data.get('answers', {})

    uid = user.get('id')
    draft = Draft.query.filter_by(exam_id=exam_id, user_id=uid).first()
    if not draft:
        draft = Draft(exam_id=exam_id, user_id=uid)
    draft.set_answers(answers)
    db.session.add(draft)
    db.session.commit()
    return jsonify({'success': True, 'message': '草稿已保存'})
def _auto_grade_answers(questions, answers):
    """Score the automatically gradable questions of a submission.

    ``choice`` questions are correct when the answer equals the key. ``fill``
    questions are correct when the stripped answer equals any of the
    ``|``-separated alternatives in the key. Any other type scores 0 and marks
    the submission as needing manual grading.

    Returns:
        ``(score, question_scores, auto_graded)``, where ``question_scores``
        maps the question id (as a string) to the points awarded.
    """
    score = 0
    auto_graded = True
    question_scores = {}
    for q in questions:
        qid = str(q['id'])
        points = q.get('score', 0)
        qtype = q['type']
        if qtype == 'choice':
            correct = answers.get(qid) == q.get('answer')
        elif qtype == 'fill':
            # Coerce to str so numeric or null JSON values cannot make
            # .strip() / .split() raise.
            given = answers.get(qid)
            given = '' if given is None else str(given).strip()
            key = q.get('answer')
            key = '' if key is None else str(key)
            correct = given in [alt.strip() for alt in key.split('|')]
        else:
            auto_graded = False
            correct = False
        earned = points if correct else 0
        score += earned
        question_scores[qid] = earned
    return score, question_scores, auto_graded


@app.route('/api/exams/<int:exam_id>/submit', methods=['POST'])
def api_submit_exam(exam_id):
    """Submit the current user's answers for an exam and auto-grade them.

    Expects a JSON object with ``answers``: an object mapping question id to
    answer. The submission is marked graded only when every question could be
    graded automatically. The user's draft for the exam is deleted.

    Returns:
        401 when not logged in, 404 when the exam does not exist, 400 when the
        exam is closed, already submitted, or ``answers`` is not an object;
        otherwise success with ``submission_id``.
    """
    user = session.get('user')
    if not user:
        return jsonify({'success': False, 'message': '请先登录'}), 401
    exam = Exam.query.get(exam_id)
    if not exam:
        return jsonify({'success': False, 'message': '试卷不存在'}), 404
    if exam.status == 'closed':
        return jsonify({'success': False, 'message': '该考试已关闭'}), 400

    uid = user.get('id')
    if Submission.query.filter_by(exam_id=exam_id, user_id=uid).first():
        return jsonify({'success': False, 'message': '您已提交过该试卷'}), 400

    # silent=True gives None instead of an error response for a missing or
    # malformed body; anything that is not a JSON object counts as empty.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}
    answers = data.get('answers', {})
    if not isinstance(answers, dict):
        return jsonify({'success': False, 'message': '答案格式不正确'}), 400

    score, question_scores, auto_graded = _auto_grade_answers(exam.get_questions(), answers)

    sub = Submission(
        exam_id=exam_id,
        user_id=uid,
        score=score,
        graded=auto_graded,
        graded_by='系统自动' if auto_graded else ''
    )
    sub.set_answers(answers)
    sub.set_question_scores(question_scores)
    db.session.add(sub)
    # The draft is obsolete once the exam has been submitted.
    Draft.query.filter_by(exam_id=exam_id, user_id=uid).delete()
    db.session.commit()
    return jsonify({'success': True, 'message': '提交成功', 'submission_id': sub.id})
@app.route('/api/exams/<int:exam_id>/grade/<int:sub_id>', methods=['POST'])
def api_grade_submission(exam_id, sub_id):
    """Store a teacher's per-question scores for a submission.

    Expects a JSON object with ``scores``: an object mapping question id to a
    numeric score. The submission's total becomes the sum of those values.

    Returns:
        403 without the teacher role, 404 when the submission does not exist
        or belongs to another exam, 400 when ``scores`` is not an object of
        numbers, otherwise success with ``total_score``.
    """
    user = session.get('user')
    if not user or user.get('role') != 'teacher':
        return jsonify({'success': False, 'message': '无权限'}), 403
    sub = Submission.query.get(sub_id)
    if not sub or sub.exam_id != exam_id:
        return jsonify({'success': False, 'message': '提交记录不存在'}), 404

    # silent=True gives None instead of an error response for a missing or
    # malformed body; anything that is not a JSON object counts as empty.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}
    scores = data.get('scores', {})

    # sum() would raise on strings or nulls, so validate first. bool is a
    # subclass of int and is rejected explicitly.
    scores_ok = isinstance(scores, dict) and all(
        isinstance(v, (int, float)) and not isinstance(v, bool)
        for v in scores.values()
    )
    if not scores_ok:
        return jsonify({'success': False, 'message': '分数格式不正确'}), 400

    total = sum(scores.values())
    sub.score = total
    sub.set_question_scores(scores)
    sub.graded = True
    sub.graded_by = user.get('name', '')
    db.session.commit()
    return jsonify({'success': True, 'message': '批改完成', 'total_score': total})
@app.route('/api/exams/<int:exam_id>/status', methods=['POST'])
def api_update_exam_status(exam_id):
    """Publish or close an exam; only the teacher who created it may do so.

    Expects a JSON object with ``status`` set to ``'available'`` or
    ``'closed'``.

    Returns:
        403 without the teacher role or for someone else's exam, 404 when the
        exam does not exist, 400 for an invalid status, otherwise success.
    """
    user = session.get('user')
    if not user or user.get('role') != 'teacher':
        return jsonify({'success': False, 'message': '无权限'}), 403
    exam = Exam.query.get(exam_id)
    if not exam:
        return jsonify({'success': False, 'message': '试卷不存在'}), 404
    if exam.creator_id != user.get('id'):
        return jsonify({'success': False, 'message': '只能修改自己创建的试卷'}), 403

    # silent=True gives None instead of an error response for a missing or
    # malformed body; anything that is not a JSON object counts as empty and
    # ends in the "invalid status" answer below.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}
    new_status = data.get('status', '')
    # The isinstance check keeps unhashable JSON values out of the "in" test.
    if not isinstance(new_status, str) or new_status not in ('available', 'closed'):
        return jsonify({'success': False, 'message': '无效状态'}), 400

    exam.status = new_status
    db.session.commit()
    return jsonify({'success': True, 'message': f'试卷已{"发布" if new_status == "available" else "关闭"}'})
@app.route('/api/exams/<int:exam_id>', methods=['DELETE'])
def api_delete_exam(exam_id):
    """Delete an exam with its submissions and drafts.

    Only the teacher who created the exam may delete it.

    Returns:
        403 without the teacher role or for someone else's exam, 404 when the
        exam does not exist, otherwise success.
    """
    current = session.get('user')
    is_teacher = bool(current) and current.get('role') == 'teacher'
    if not is_teacher:
        return jsonify({'success': False, 'message': '无权限'}), 403

    exam = Exam.query.get(exam_id)
    if not exam:
        return jsonify({'success': False, 'message': '试卷不存在'}), 404
    if exam.creator_id != current.get('id'):
        return jsonify({'success': False, 'message': '只能删除自己创建的试卷'}), 403

    # Remove dependent rows first, then the exam itself.
    for dependent in (Submission, Draft):
        dependent.query.filter_by(exam_id=exam_id).delete()
    db.session.delete(exam)
    db.session.commit()
    return jsonify({'success': True, 'message': '试卷已删除'})
# ========== 杯赛相关 API ==========
@app.route('/api/contests/search')
def api_search_contests():
    """Return contests as JSON, optionally filtered by the ``q`` keyword.

    The lower-cased keyword is matched against name, organizer and
    description. Without a keyword every contest is returned.
    """
    keyword = request.args.get('q', '').strip().lower()

    contests_q = Contest.query
    if keyword:
        name_hit = Contest.name.contains(keyword)
        organizer_hit = Contest.organizer.contains(keyword)
        description_hit = Contest.description.contains(keyword)
        contests_q = contests_q.filter(name_hit | organizer_hit | description_hit)

    data = []
    for contest in contests_q.all():
        data.append({
            'id': contest.id,
            'name': contest.name,
            'organizer': contest.organizer,
            'description': contest.description,
            'start_date': contest.start_date,
            'end_date': contest.end_date,
            'status': contest.status,
            'participants': contest.participants,
            'created_by': contest.created_by
        })
    return jsonify({'success': True, 'data': data})
@app.route('/api/contests', methods=['POST'])
def api_create_contest():
    """Create a contest owned by the current user.

    Expects a JSON object with ``name`` and optionally ``organizer``,
    ``description``, ``start_date``, ``end_date``, ``status`` (default
    ``'upcoming'``) and ``participants`` (default 0).

    Returns:
        401 when not logged in, 400 when the name is missing or blank,
        otherwise success with ``contest_id``.
    """
    user = session.get('user')
    if not user:
        return jsonify({'success': False, 'message': '请先登录'}), 401

    # silent=True gives None instead of an error response for a missing or
    # malformed body; anything that is not a JSON object counts as empty.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}

    # Without this check an empty body would create a nameless contest.
    name = data.get('name')
    if not isinstance(name, str) or not name.strip():
        return jsonify({'success': False, 'message': '杯赛名称不能为空'}), 400

    # NOTE(review): any logged-in user can create a contest here although an
    # application page exists at /apply-contest. Confirm this is intended.
    contest = Contest(
        name=name,
        organizer=data.get('organizer'),
        description=data.get('description'),
        start_date=data.get('start_date'),
        end_date=data.get('end_date'),
        status=data.get('status', 'upcoming'),
        participants=data.get('participants', 0),
        created_by=user.get('id')
    )
    db.session.add(contest)
    db.session.commit()
    return jsonify({'success': True, 'message': '杯赛创建成功', 'contest_id': contest.id})
# ========== 杯赛报名 API ==========
@app.route('/api/contests/<int:contest_id>/register', methods=['POST'])
@login_required
def api_register_contest(contest_id):
    """Register the current user for a contest and bump its participant count.

    Returns:
        404 when the contest does not exist, 400 when already registered,
        otherwise success with the new ``participants`` count.
    """
    uid = session['user']['id']
    contest = Contest.query.get(contest_id)
    if not contest:
        return jsonify({'success': False, 'message': '杯赛不存在'}), 404

    already = ContestRegistration.query.filter_by(user_id=uid, contest_id=contest_id).first()
    if already:
        return jsonify({'success': False, 'message': '您已报名该杯赛'}), 400

    db.session.add(ContestRegistration(user_id=uid, contest_id=contest_id))
    contest.participants = contest.participants + 1
    db.session.commit()
    return jsonify({'success': True, 'message': '报名成功', 'participants': contest.participants})
@app.route('/api/contests/<int:contest_id>/unregister', methods=['POST'])
@login_required
def api_unregister_contest(contest_id):
    """Cancel the current user's registration for a contest.

    The participant count is decremented but never goes below zero.

    Returns:
        404 when the contest does not exist, 400 when the user is not
        registered, otherwise success with the new ``participants`` count.
    """
    uid = session['user']['id']
    contest = Contest.query.get(contest_id)
    if not contest:
        return jsonify({'success': False, 'message': '杯赛不存在'}), 404

    signup = ContestRegistration.query.filter_by(user_id=uid, contest_id=contest_id).first()
    if not signup:
        return jsonify({'success': False, 'message': '您尚未报名该杯赛'}), 400

    db.session.delete(signup)
    if contest.participants > 0:
        contest.participants = contest.participants - 1
    db.session.commit()
    return jsonify({'success': True, 'message': '已取消报名', 'participants': contest.participants})
# ========== 杯赛讨论区 API ==========
@app.route('/api/contests/<int:contest_id>/posts', methods=['GET'])
def api_contest_posts(contest_id):
    """Return the posts of a contest's discussion board, newest first.

    Responds 404 when the contest does not exist. Content is cut to 200
    characters with ``'...'`` appended when longer. For a logged-in user each
    post also carries ``liked``: whether that user has a Reaction on it.
    """
    # Only called for its 404 side effect; the instance itself is not used.
    Contest.query.get_or_404(contest_id)

    posts = (
        Post.query
        .filter_by(contest_id=contest_id)
        .order_by(Post.created_at.desc())
        .all()
    )
    viewer = get_current_user()
    uid = viewer['id'] if viewer else None

    data = []
    for post in posts:
        ellipsis = '...' if len(post.content) > 200 else ''
        entry = {
            'id': post.id,
            'title': post.title,
            'content': post.content[:200] + ellipsis,
            'author': post.author.name,
            'author_id': post.author_id,
            'tag': post.tag,
            'is_official': post.is_official,
            'pinned': post.pinned,
            'likes': post.likes,
            'replies': post.replies_count,
            'views': post.views,
            'created_at': post.created_at.strftime('%Y-%m-%d %H:%M'),
        }
        if uid:
            reaction = Reaction.query.filter_by(user_id=uid, post_id=post.id).first()
            entry['liked'] = reaction is not None
        data.append(entry)
    return jsonify({'success': True, 'data': data})
@app.route('/api/contests/<int:contest_id>/posts', methods=['POST'])
@login_required
def api_create_contest_post(contest_id):
    """Publish a post in a contest's discussion board.

    Expects a JSON object with ``title`` (at most 100 characters), ``content``
    and optionally ``is_official``, which is honoured only for admins and
    teachers.

    Returns:
        404 when the contest does not exist, 403 when the user may not post
        there, 400 for a missing or over-long title or missing content,
        otherwise success with ``post_id``.
    """
    user = session['user']
    contest = Contest.query.get_or_404(contest_id)

    # Permission check.
    if not can_post_in_contest(user, contest):
        return jsonify({'success': False, 'message': '您没有权限在此杯赛讨论区发帖'}), 403

    # silent=True gives None instead of an error response for a missing or
    # malformed body; anything that is not a JSON object counts as empty.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}

    # Non-string values are treated as missing so .strip() cannot raise.
    title = data.get('title', '')
    content = data.get('content', '')
    title = title.strip() if isinstance(title, str) else ''
    content = content.strip() if isinstance(content, str) else ''
    if not title or not content:
        return jsonify({'success': False, 'message': '标题和内容不能为空'}), 400
    if len(title) > 100:
        return jsonify({'success': False, 'message': '标题不能超过100字'}), 400

    # bool() keeps an arbitrary truthy JSON value out of the boolean column.
    is_official = user['role'] in ['admin', 'teacher'] and bool(data.get('is_official', False))
    post = Post(
        title=title,
        content=content,
        author_id=user['id'],
        tag='杯赛讨论',  # fixed tag for contest posts; could be made configurable
        contest_id=contest_id,
        is_official=is_official
    )
    db.session.add(post)
    db.session.commit()
    return jsonify({'success': True, 'message': '帖子发布成功', 'post_id': post.id})
# ========== 论坛相关 API ==========
@app.route('/api/posts/search')
def api_search_posts():
    """Search and list forum posts that do not belong to a contest.

    Query parameters:
        q:    keyword matched against title and content (substring match).
        tag:  exact tag filter.
        sort: ``hottest`` (by likes), ``most_replies``, or anything else for
              newest first.

    Post content is truncated to 200 characters. For a logged-in user each
    entry also carries ``liked`` and ``bookmarked`` flags.
    """
    keyword = request.args.get('q', '').strip().lower()
    tag = request.args.get('tag', '').strip()
    sort = request.args.get('sort', 'newest')

    # Only posts outside contests (contest_id IS NULL).
    query = Post.query.filter_by(contest_id=None)
    if keyword:
        # autoescape=True makes '%' and '_' typed by the user match
        # literally instead of acting as LIKE wildcards.
        query = query.filter(
            Post.title.contains(keyword, autoescape=True)
            | Post.content.contains(keyword, autoescape=True)
        )
    if tag:
        query = query.filter_by(tag=tag)

    if sort == 'hottest':
        query = query.order_by(Post.likes.desc())
    elif sort == 'most_replies':
        query = query.order_by(Post.replies_count.desc())
    else:
        query = query.order_by(Post.created_at.desc())
    posts = query.all()

    user = session.get('user')
    uid = user.get('id') if user else None

    # Load the viewer's reactions and bookmarks for the whole page in two
    # queries instead of two queries per post.
    liked_ids = set()
    bookmarked_ids = set()
    if uid and posts:
        post_ids = [p.id for p in posts]
        liked_ids = {
            r.post_id
            for r in Reaction.query.filter(
                Reaction.user_id == uid, Reaction.post_id.in_(post_ids)
            ).all()
        }
        bookmarked_ids = {
            b.post_id
            for b in Bookmark.query.filter(
                Bookmark.user_id == uid, Bookmark.post_id.in_(post_ids)
            ).all()
        }

    data = []
    for p in posts:
        p_dict = {
            'id': p.id,
            'title': p.title,
            'content': p.content[:200] + ('...' if len(p.content) > 200 else ''),
            'author': p.author.name,
            'author_id': p.author_id,
            'tag': p.tag,
            'is_official': p.is_official,
            'pinned': p.pinned,
            'likes': p.likes,
            'replies': p.replies_count,
            'views': p.views,
            'has_poll': p.has_poll,
            'created_at': p.created_at.strftime('%Y-%m-%d %H:%M'),
        }
        if uid:
            p_dict['liked'] = p.id in liked_ids
            p_dict['bookmarked'] = p.id in bookmarked_ids
        data.append(p_dict)
    return jsonify({'success': True, 'data': data})
@app.route('/api/posts', methods=['POST'])
def api_create_post():
    """Create a forum post from a JSON body.

    Responds 401 when nobody is logged in and 400 when the title or content
    is empty or the title exceeds 100 characters. The body may carry
    ``title``, ``content``, ``tag`` (default ``全部``) and ``is_official``;
    the official flag is honoured only for users whose role is ``teacher``.
    """
    user = session.get('user')
    if not user:
        return jsonify({'success': False, 'message': '请先登录'}), 401

    # A missing, malformed or non-object JSON body is treated as empty so
    # validation answers 400 instead of raising AttributeError.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}
    title = str(data.get('title') or '').strip()
    content = str(data.get('content') or '').strip()
    tag = data.get('tag', '全部')
    if not isinstance(tag, str):
        tag = '全部'

    if not title or not content:
        return jsonify({'success': False, 'message': '标题和内容不能为空'}), 400
    if len(title) > 100:
        return jsonify({'success': False, 'message': '标题不能超过100字'}), 400

    # NOTE(review): the contest-board endpoint also lets 'admin' mark posts
    # official; this one checks 'teacher' only — confirm which is intended.
    is_teacher = user.get('role') == 'teacher'
    post = Post(
        title=title,
        content=content,
        author_id=user.get('id'),
        tag=tag,
        is_official=bool(is_teacher and data.get('is_official', False)),
    )
    db.session.add(post)
    db.session.commit()
    return jsonify({'success': True, 'message': '帖子发布成功', 'post_id': post.id})
@app.route('/api/posts/<int:post_id>')
def api_get_post(post_id):
    """Return one post with all of its replies, oldest reply first.

    Responds 404 when the post does not exist. Every call increments the
    post's view counter. For a logged-in user the post carries ``liked`` and
    ``bookmarked`` flags and each reply carries a ``liked`` flag.
    """
    post = Post.query.get(post_id)
    if not post:
        return jsonify({'success': False, 'message': '帖子不存在'}), 404

    post.views += 1
    db.session.commit()

    replies = Reply.query.filter_by(post_id=post_id).order_by(Reply.created_at).all()
    user = session.get('user')
    uid = user.get('id') if user else None

    def was_edited(item):
        # A missing updated_at is treated as "never edited" rather than
        # raising TypeError on the comparison.
        return bool(item.updated_at and item.updated_at > item.created_at)

    post_dict = {
        'id': post.id,
        'title': post.title,
        'content': post.content,
        'author': post.author.name,
        'author_id': post.author_id,
        'tag': post.tag,
        'is_official': post.is_official,
        'pinned': post.pinned,
        'likes': post.likes,
        'replies': post.replies_count,
        'views': post.views,
        'has_poll': post.has_poll,
        'created_at': post.created_at.strftime('%Y-%m-%d %H:%M'),
        'edited': was_edited(post),
    }

    liked_reply_ids = set()
    if uid:
        post_dict['liked'] = Reaction.query.filter_by(user_id=uid, post_id=post.id).first() is not None
        post_dict['bookmarked'] = Bookmark.query.filter_by(user_id=uid, post_id=post.id).first() is not None
        if replies:
            # One query for all reply likes instead of one per reply.
            liked_reply_ids = {
                r.reply_id
                for r in Reaction.query.filter(
                    Reaction.user_id == uid,
                    Reaction.reply_id.in_([reply.id for reply in replies]),
                ).all()
            }

    replies_data = []
    for r in replies:
        r_dict = {
            'id': r.id,
            'content': r.content,
            'author': r.author.name,
            'author_id': r.author_id,
            'likes': r.likes,
            'reply_to': r.reply_to,
            'created_at': r.created_at.strftime('%Y-%m-%d %H:%M'),
            'edited': was_edited(r),
        }
        if uid:
            r_dict['liked'] = r.id in liked_reply_ids
        replies_data.append(r_dict)
    return jsonify({'success': True, 'data': post_dict, 'replies': replies_data})
@app.route('/api/posts/<int:post_id>', methods=['DELETE'])
def api_delete_post(post_id):
    """Delete a post on behalf of its author or a teacher.

    Responds 401 when nobody is logged in, 404 when the post does not exist
    and 403 when the caller is neither the author nor a ``teacher``.
    """
    viewer = session.get('user')
    if not viewer:
        return jsonify({'success': False, 'message': '请先登录'}), 401

    target = Post.query.get(post_id)
    if target is None:
        return jsonify({'success': False, 'message': '帖子不存在'}), 404

    owns_post = target.author_id == viewer.get('id')
    is_teacher = viewer.get('role') == 'teacher'
    if not (owns_post or is_teacher):
        return jsonify({'success': False, 'message': '无权删除'}), 403

    db.session.delete(target)
    db.session.commit()
    return jsonify({'success': True, 'message': '帖子已删除'})
@app.route('/api/posts/<int:post_id>/like', methods=['POST'])
def api_like_post(post_id):
    """Toggle the logged-in user's like on a post.

    Responds 401 when nobody is logged in and 404 when the post does not
    exist. If the user already has a reaction row for the post it is removed
    and the counter decremented; otherwise a ``like`` reaction is added and
    the counter incremented. Returns the new ``liked`` state and counter.
    """
    user = session.get('user')
    if not user:
        return jsonify({'success': False, 'message': '请先登录'}), 401
    post = Post.query.get(post_id)
    if not post:
        return jsonify({'success': False, 'message': '帖子不存在'}), 404

    uid = user.get('id')
    reaction = Reaction.query.filter_by(user_id=uid, post_id=post_id).first()
    current_likes = post.likes or 0
    if reaction:
        db.session.delete(reaction)
        # Reaction rows can also be created by the react endpoint, which does
        # not touch post.likes, so the counter may already be zero here.
        # Never let it go negative.
        post.likes = max(current_likes - 1, 0)
        liked = False
    else:
        db.session.add(Reaction(user_id=uid, post_id=post_id, reaction='like'))
        post.likes = current_likes + 1
        liked = True
    db.session.commit()
    return jsonify({'success': True, 'liked': liked, 'likes': post.likes})
@app.route('/api/posts/<int:post_id>/bookmark', methods=['POST'])
def api_bookmark_post(post_id):
    """Toggle the logged-in user's bookmark on a post.

    Responds 401 when nobody is logged in and 404 when the post does not
    exist. Returns the resulting ``bookmarked`` state.
    """
    viewer = session.get('user')
    if not viewer:
        return jsonify({'success': False, 'message': '请先登录'}), 401

    if not Post.query.get(post_id):
        return jsonify({'success': False, 'message': '帖子不存在'}), 404

    viewer_id = viewer.get('id')
    existing = Bookmark.query.filter_by(user_id=viewer_id, post_id=post_id).first()
    if existing is None:
        db.session.add(Bookmark(user_id=viewer_id, post_id=post_id))
        now_bookmarked = True
    else:
        db.session.delete(existing)
        now_bookmarked = False
    db.session.commit()
    return jsonify({'success': True, 'bookmarked': now_bookmarked})
@app.route('/api/posts/<int:post_id>/pin', methods=['POST'])
def api_pin_post(post_id):
    """Flip a post's pinned flag; only users with role ``teacher`` may do so.

    Responds 403 for anonymous callers and non-teachers, and 404 when the
    post does not exist. Returns the new ``pinned`` value.
    """
    viewer = session.get('user')
    is_teacher = bool(viewer) and viewer.get('role') == 'teacher'
    if not is_teacher:
        return jsonify({'success': False, 'message': '无权限'}), 403

    target = Post.query.get(post_id)
    if target is None:
        return jsonify({'success': False, 'message': '帖子不存在'}), 404

    target.pinned = not target.pinned
    db.session.commit()
    return jsonify({'success': True, 'pinned': target.pinned})
@app.route('/api/posts/<int:post_id>/replies', methods=['POST'])
def api_create_reply(post_id):
    """Add a reply to a post and bump the post's reply counter.

    Responds 401 when nobody is logged in, 404 when the post does not exist
    and 400 when the reply content is empty. The JSON body carries
    ``content`` and optionally ``reply_to``. Returns the created reply.
    """
    user = session.get('user')
    if not user:
        return jsonify({'success': False, 'message': '请先登录'}), 401
    post = Post.query.get(post_id)
    if not post:
        return jsonify({'success': False, 'message': '帖子不存在'}), 404

    # A missing, malformed or non-object JSON body is treated as empty so
    # validation answers 400 instead of raising AttributeError.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}
    content = str(data.get('content') or '').strip()
    if not content:
        return jsonify({'success': False, 'message': '回复内容不能为空'}), 400

    reply_to = data.get('reply_to', '')
    if reply_to is not None and not isinstance(reply_to, str):
        reply_to = str(reply_to)

    reply = Reply(
        post_id=post_id,
        author_id=user.get('id'),
        content=content,
        reply_to=reply_to,
    )
    db.session.add(reply)
    post.replies_count += 1
    db.session.commit()
    return jsonify({'success': True, 'message': '回复成功', 'reply': {
        'id': reply.id,
        'content': reply.content,
        'author': user.get('name'),
        'author_id': user.get('id'),
        'likes': 0,
        'reply_to': reply.reply_to,
        'created_at': reply.created_at.strftime('%Y-%m-%d %H:%M')
    }})
@app.route('/api/replies/<int:reply_id>/like', methods=['POST'])
def api_like_reply(reply_id):
    """Toggle the logged-in user's like on a reply.

    Responds 401 when nobody is logged in and 404 when the reply does not
    exist. Returns the new ``liked`` state and the reply's like counter.
    """
    viewer = session.get('user')
    if not viewer:
        return jsonify({'success': False, 'message': '请先登录'}), 401

    reply = Reply.query.get(reply_id)
    if reply is None:
        return jsonify({'success': False, 'message': '回复不存在'}), 404

    viewer_id = viewer.get('id')
    existing = Reaction.query.filter_by(user_id=viewer_id, reply_id=reply_id).first()
    if existing is None:
        db.session.add(Reaction(user_id=viewer_id, reply_id=reply_id, reaction='like'))
        reply.likes += 1
        now_liked = True
    else:
        db.session.delete(existing)
        reply.likes -= 1
        now_liked = False
    db.session.commit()
    return jsonify({'success': True, 'liked': now_liked, 'likes': reply.likes})
@app.route('/api/replies/<int:reply_id>', methods=['DELETE'])
def api_delete_reply(reply_id):
    """Delete a reply on behalf of its author or a teacher.

    Responds 401 when nobody is logged in, 404 when the reply does not exist
    and 403 when the caller is neither the author nor a ``teacher``. The
    parent post's reply counter is decremented when the post still exists.
    """
    viewer = session.get('user')
    if not viewer:
        return jsonify({'success': False, 'message': '请先登录'}), 401

    target = Reply.query.get(reply_id)
    if target is None:
        return jsonify({'success': False, 'message': '回复不存在'}), 404

    owns_reply = target.author_id == viewer.get('id')
    is_teacher = viewer.get('role') == 'teacher'
    if not (owns_reply or is_teacher):
        return jsonify({'success': False, 'message': '无权删除'}), 403

    parent = Post.query.get(target.post_id)
    if parent:
        parent.replies_count -= 1
    db.session.delete(target)
    db.session.commit()
    return jsonify({'success': True, 'message': '回复已删除'})
@app.route('/api/user/bookmarks')
def api_user_bookmarks():
    """List the posts bookmarked by the logged-in user.

    Responds 401 when nobody is logged in. Each entry carries the post id,
    title, author name and formatted creation time.
    """
    viewer = session.get('user')
    if not viewer:
        return jsonify({'success': False, 'message': '请先登录'}), 401

    viewer_id = viewer.get('id')
    saved_ids = []
    for mark in Bookmark.query.filter_by(user_id=viewer_id).all():
        saved_ids.append(mark.post_id)

    data = []
    for saved in Post.query.filter(Post.id.in_(saved_ids)).all():
        data.append({
            'id': saved.id,
            'title': saved.title,
            'author': saved.author.name,
            'created_at': saved.created_at.strftime('%Y-%m-%d %H:%M')
        })
    return jsonify({'success': True, 'data': data})
# ========== 论坛增强 API ==========
@app.route('/api/posts/<int:post_id>/edit', methods=['POST'])
def api_edit_post(post_id):
    """Edit a post's title, content and tag.

    Allowed for the post's author or a ``teacher``. Responds 401 when nobody
    is logged in, 404 when the post does not exist, 403 for other users and
    400 when the title or content is empty or the title exceeds 100
    characters (the same limit the create endpoints enforce).
    """
    user = session.get('user')
    if not user:
        return jsonify({'success': False, 'message': '请先登录'}), 401
    post = Post.query.get(post_id)
    if not post:
        return jsonify({'success': False, 'message': '帖子不存在'}), 404
    if post.author_id != user.get('id') and user.get('role') != 'teacher':
        return jsonify({'success': False, 'message': '无权编辑'}), 403

    # A missing, malformed or non-object JSON body is treated as empty so
    # validation answers 400 instead of raising AttributeError.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}
    new_title = str(data.get('title') or '').strip()
    new_content = str(data.get('content') or '').strip()
    new_tag = data.get('tag', post.tag)
    if not isinstance(new_tag, str):
        # Keep the current tag when the payload's tag is not a string.
        new_tag = post.tag

    if not new_title or not new_content:
        return jsonify({'success': False, 'message': '标题和内容不能为空'}), 400
    if len(new_title) > 100:
        return jsonify({'success': False, 'message': '标题不能超过100字'}), 400

    post.title = new_title
    post.content = new_content
    post.tag = new_tag
    db.session.commit()
    return jsonify({'success': True, 'message': '编辑成功'})
@app.route('/api/replies/<int:reply_id>/edit', methods=['POST'])
def api_edit_reply(reply_id):
    """Replace the content of a reply; only its author may edit it.

    Responds 401 when nobody is logged in, 404 when the reply does not
    exist, 403 for any user other than the author and 400 when the new
    content is empty.
    """
    user = session.get('user')
    if not user:
        return jsonify({'success': False, 'message': '请先登录'}), 401
    reply = Reply.query.get(reply_id)
    if not reply:
        return jsonify({'success': False, 'message': '回复不存在'}), 404
    if reply.author_id != user.get('id'):
        return jsonify({'success': False, 'message': '无权编辑'}), 403

    # A missing, malformed or non-object JSON body is treated as empty so
    # validation answers 400 instead of raising AttributeError.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}
    new_content = str(data.get('content') or '').strip()
    if not new_content:
        return jsonify({'success': False, 'message': '内容不能为空'}), 400

    reply.content = new_content
    db.session.commit()
    return jsonify({'success': True, 'message': '编辑成功'})
@app.route('/api/posts/<int:post_id>/poll', methods=['POST'])
def api_create_poll(post_id):
    """Attach a poll to an existing post; only the post's author may do so.

    Responds 401 when nobody is logged in, 404 when the post does not exist,
    403 for non-authors and 400 when the post already has a poll or the
    payload lacks a question or at least two non-empty options. The JSON
    body carries ``question``, ``options`` (list) and ``multi``.
    """
    user = session.get('user')
    if not user:
        return jsonify({'success': False, 'message': '请先登录'}), 401
    post = Post.query.get(post_id)
    if not post:
        return jsonify({'success': False, 'message': '帖子不存在'}), 404
    if post.author_id != user.get('id'):
        return jsonify({'success': False, 'message': '只有作者可以创建投票'}), 403
    if post.poll:
        return jsonify({'success': False, 'message': '该帖子已有投票'}), 400

    # A missing, malformed or non-object JSON body is treated as empty so
    # validation answers 400 instead of raising AttributeError.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}
    question = str(data.get('question') or '').strip()

    raw_options = data.get('options', [])
    if not isinstance(raw_options, list):
        raw_options = []
    # Normalise option labels to stripped strings and drop blank ones so a
    # poll can never be stored with empty or non-text choices.
    options = [str(o).strip() for o in raw_options if o is not None]
    options = [text for text in options if text]
    multi = bool(data.get('multi', False))

    if not question or len(options) < 2:
        return jsonify({'success': False, 'message': '请填写问题和至少2个选项'}), 400

    poll = Poll(
        post_id=post_id,
        question=question,
        multi=multi
    )
    poll.set_options([{'text': text, 'votes': 0} for text in options])
    poll.set_voters({})
    db.session.add(poll)
    post.has_poll = True
    db.session.commit()
    return jsonify({'success': True, 'message': '投票创建成功'})
@app.route('/api/posts/<int:post_id>/vote', methods=['POST'])
def api_vote_poll(post_id):
    """Record the logged-in user's vote on a post's poll.

    The JSON body carries ``choices``, a list of zero-based option indexes.
    Responds 401 when nobody is logged in, 404 when the post has no poll and
    400 when the user already voted, no choice is given, a choice is not a
    valid option index, or several choices are sent to a single-choice poll.
    Returns the updated options and total vote count.
    """
    user = session.get('user')
    if not user:
        return jsonify({'success': False, 'message': '请先登录'}), 401
    poll = Poll.query.filter_by(post_id=post_id).first()
    if not poll:
        return jsonify({'success': False, 'message': '投票不存在'}), 404

    uid = user.get('id')
    voters = poll.get_voters()
    if str(uid) in voters:
        return jsonify({'success': False, 'message': '您已投过票'}), 400

    # A missing, malformed or non-object JSON body is treated as empty.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}
    choices = data.get('choices', [])
    if not isinstance(choices, list) or not choices:
        return jsonify({'success': False, 'message': '请选择选项'}), 400

    options = poll.get_options()
    # Reject anything that is not a real in-range index. bool is excluded
    # explicitly because it is a subclass of int. Previously invalid indexes
    # were silently skipped while the vote was still counted.
    for idx in choices:
        if isinstance(idx, bool) or not isinstance(idx, int) or not 0 <= idx < len(options):
            return jsonify({'success': False, 'message': '无效的选项'}), 400

    # Collapse repeated indexes (order preserved) so one ballot cannot count
    # the same option more than once.
    choices = list(dict.fromkeys(choices))
    if not poll.multi and len(choices) > 1:
        return jsonify({'success': False, 'message': '该投票为单选'}), 400

    for idx in choices:
        options[idx]['votes'] += 1
    voters[str(uid)] = choices
    poll.set_options(options)
    poll.set_voters(voters)
    poll.total_votes += 1
    db.session.commit()
    return jsonify({'success': True, 'message': '投票成功', 'poll': {
        'options': options,
        'total_votes': poll.total_votes
    }})
@app.route('/api/posts/<int:post_id>/poll')
def api_get_poll(post_id):
    """Return a post's poll plus the current user's own ballot, if any.

    Responds 404 when the post has no poll. ``voted`` and ``my_choices``
    reflect the logged-in user's recorded vote; for anonymous callers they
    are ``False`` and an empty list.
    """
    poll = Poll.query.filter_by(post_id=post_id).first()
    if poll is None:
        return jsonify({'success': False, 'message': '投票不存在'}), 404

    viewer = session.get('user')
    voted, my_choices = False, []
    if viewer:
        ballots = poll.get_voters()
        ballot_key = str(viewer.get('id'))
        if ballot_key in ballots:
            voted, my_choices = True, ballots[ballot_key]

    poll_payload = {
        'question': poll.question,
        'options': poll.get_options(),
        'total_votes': poll.total_votes,
        'multi': poll.multi
    }
    return jsonify({
        'success': True,
        'poll': poll_payload,
        'voted': voted,
        'my_choices': my_choices
    })
@app.route('/api/report', methods=['POST'])
def api_report():
    """Store a user report about some piece of content.

    The JSON body carries ``type``, ``target_id``, ``reason`` and optionally
    ``detail``. Responds 401 when nobody is logged in and 400 when type,
    target id or reason is missing (a non-numeric target id counts as
    missing).
    """
    user = session.get('user')
    if not user:
        return jsonify({'success': False, 'message': '请先登录'}), 401

    # A missing, malformed or non-object JSON body is treated as empty so
    # validation answers 400 instead of raising AttributeError.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}
    rtype = str(data.get('type') or '')
    reason = str(data.get('reason') or '')
    detail = str(data.get('detail') or '')
    try:
        target_id = int(data.get('target_id') or 0)
    except (TypeError, ValueError):
        target_id = 0

    if not rtype or not target_id or not reason:
        return jsonify({'success': False, 'message': '请填写举报信息'}), 400

    report = Report(
        type=rtype,
        target_id=target_id,
        reporter_id=user.get('id'),
        reason=reason,
        detail=detail
    )
    db.session.add(report)
    db.session.commit()
    return jsonify({'success': True, 'message': '举报已提交,管理员将尽快处理'})
@app.route('/api/posts/<int:post_id>/react', methods=['POST'])
def api_react_post(post_id):
    """Set, change or clear the logged-in user's reaction on a post.

    The JSON body carries ``reaction``, one of like/love/haha/wow/sad/angry.
    Sending the reaction the user already has removes it; sending a
    different one replaces it. Responds 401 when nobody is logged in, 404
    when the post does not exist and 400 for an unknown reaction. Returns a
    mapping of reaction type to count for the post.
    """
    user = session.get('user')
    if not user:
        return jsonify({'success': False, 'message': '请先登录'}), 401
    post = Post.query.get(post_id)
    if not post:
        return jsonify({'success': False, 'message': '帖子不存在'}), 404

    # A missing, malformed or non-object JSON body is treated as empty so
    # validation answers 400 instead of raising AttributeError.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}
    reaction_type = data.get('reaction', '')
    if reaction_type not in ('like', 'love', 'haha', 'wow', 'sad', 'angry'):
        return jsonify({'success': False, 'message': '无效反应'}), 400

    uid = user.get('id')
    reaction = Reaction.query.filter_by(user_id=uid, post_id=post_id).first()
    if reaction is None:
        db.session.add(Reaction(user_id=uid, post_id=post_id, reaction=reaction_type))
    elif reaction.reaction == reaction_type:
        # Same reaction sent again: toggle it off.
        db.session.delete(reaction)
    else:
        reaction.reaction = reaction_type
    db.session.commit()

    # NOTE(review): this endpoint does not adjust post.likes, which the like
    # endpoint maintains from the same table — confirm that is intended.
    counts = {}
    for r in Reaction.query.filter_by(post_id=post_id).all():
        counts[r.reaction] = counts.get(r.reaction, 0) + 1
    return jsonify({'success': True, 'reactions': counts})
@app.route('/api/user/profile/<user_id>')
def api_user_profile(user_id):
    """Return a user's public forum profile.

    The profile holds post/reply counts, likes received on posts, a points
    total (10 per post, 3 per reply, 2 per like), the level derived from it,
    earned badges and the five most recent posts. Responds 404 when
    ``user_id`` is not a decimal number or no such user exists.
    """
    # isdecimal() rather than isdigit(): isdigit() also accepts characters
    # such as '²' that int() cannot parse, which raised ValueError.
    user = User.query.get(int(user_id)) if user_id.isdecimal() else None
    if not user:
        return jsonify({'success': False, 'message': '用户不存在'}), 404

    post_count = Post.query.filter_by(author_id=user.id).count()
    reply_count = Reply.query.filter_by(author_id=user.id).count()
    likes_received = db.session.query(db.func.sum(Post.likes)).filter_by(author_id=user.id).scalar() or 0
    points = post_count * 10 + reply_count * 3 + likes_received * 2
    level = calc_level(points)

    recent_posts = Post.query.filter_by(author_id=user.id).order_by(Post.created_at.desc()).limit(5).all()
    recent_posts_data = [{'id': p.id, 'title': p.title} for p in recent_posts]

    # (achieved value, threshold, icon, name, description), in display order.
    badge_rules = [
        (post_count, 1, '✍️', '初次发帖', '发布第一篇帖子'),
        (post_count, 10, '📝', '笔耕不辍', '发布10篇帖子'),
        (post_count, 50, '📚', '著作等身', '发布50篇帖子'),
        (reply_count, 10, '💬', '热心回复', '回复10次'),
        (reply_count, 100, '🗣️', '话题达人', '回复100次'),
        (likes_received, 10, '👍', '小有人气', '获得10个赞'),
        (likes_received, 100, '🌟', '人气之星', '获得100个赞'),
        (likes_received, 500, '👑', '万人迷', '获得500个赞'),
    ]
    badges = [
        {'icon': icon, 'name': name, 'desc': desc}
        for value, threshold, icon, name, desc in badge_rules
        if value >= threshold
    ]

    return jsonify({
        'success': True,
        'profile': {
            'user_id': user.id,
            'name': user.name,
            'points': points,
            'level': level,
            'level_title': LEVEL_TITLES.get(level, ''),
            'posts_count': post_count,
            'replies_count': reply_count,
            'likes_received': likes_received,
            'badges': badges,
            'recent_posts': recent_posts_data
        }
    })
@app.route('/api/forum/hot')
def api_hot_posts():
    """Return the ten hottest posts.

    Hotness is ``likes * 3 + replies_count * 2 + views``, computed in the
    ORDER BY clause.
    """
    hotness = Post.likes*3 + Post.replies_count*2 + Post.views
    hottest = Post.query.order_by(hotness.desc()).limit(10).all()

    data = []
    for entry in hottest:
        data.append({
            'id': entry.id,
            'title': entry.title,
            'likes': entry.likes,
            'replies': entry.replies_count,
        })
    return jsonify({'success': True, 'data': data})
@app.route('/api/forum/stats')
def api_forum_stats():
    """Return aggregate forum statistics for the sidebar.

    Includes total and today's (UTC date) post and reply counts, the user
    total, per-tag post counts, and up to five distinct authors of the ten
    most recent posts with their levels. ``online_count`` is a random number
    derived from the user total, not a measured value.
    """
    post_total = Post.query.count()
    reply_total = Reply.query.count()
    user_total = User.query.count()

    utc_today = datetime.utcnow().date()
    posts_today = Post.query.filter(db.func.date(Post.created_at) == utc_today).count()
    replies_today = Reply.query.filter(db.func.date(Reply.created_at) == utc_today).count()

    # Simulated figure: a random value between the two bounds below.
    online_floor = max(15, user_total)
    online_ceiling = max(50, user_total * 3)
    online_count = random.randint(online_floor, online_ceiling)

    tag_rows = db.session.query(Post.tag, db.func.count(Post.tag)).group_by(Post.tag).all()
    tag_counts = {row[0]: row[1] for row in tag_rows}

    active_users = []
    seen_authors = set()
    latest_posts = Post.query.order_by(Post.created_at.desc()).limit(10).all()
    for recent in latest_posts:
        # Skip repeated authors and anything past the five-user cap.
        if recent.author_id in seen_authors or len(active_users) >= 5:
            continue
        seen_authors.add(recent.author_id)
        authored = Post.query.filter_by(author_id=recent.author_id).count()
        replied = Reply.query.filter_by(author_id=recent.author_id).count()
        level = calc_level(authored*10 + replied*3)
        active_users.append({
            'user_id': recent.author_id,
            'name': recent.author.name,
            'level': level,
            'level_title': LEVEL_TITLES.get(level, '')
        })

    stats = {
        'total_posts': post_total,
        'total_replies': reply_total,
        'total_users': user_total,
        'today_posts': posts_today,
        'today_replies': replies_today,
        'online_count': online_count,
        'tag_counts': tag_counts,
        'active_users': active_users
    }
    return jsonify({'success': True, 'stats': stats})
@app.route('/api/forum/leaderboard')
def api_leaderboard():
    """Return the top 20 users ranked by forum points.

    Points are 10 per post, 3 per reply and 2 per like received on the
    user's posts. Users with equal points keep the order in which
    ``User.query.all()`` returned them.
    """
    # Two grouped aggregates replace the three per-user queries the loop
    # used to issue for every user.
    post_stats = {
        author_id: (count, likes or 0)
        for author_id, count, likes in db.session.query(
            Post.author_id, db.func.count(Post.id), db.func.sum(Post.likes)
        ).group_by(Post.author_id).all()
    }
    reply_counts = {
        author_id: count
        for author_id, count in db.session.query(
            Reply.author_id, db.func.count(Reply.id)
        ).group_by(Reply.author_id).all()
    }

    board = []
    for u in User.query.all():
        post_count, likes_received = post_stats.get(u.id, (0, 0))
        reply_count = reply_counts.get(u.id, 0)
        points = post_count * 10 + reply_count * 3 + likes_received * 2
        level = calc_level(points)
        board.append({
            'user_id': u.id,
            'name': u.name,
            'points': points,
            'level': level,
            'level_title': LEVEL_TITLES.get(level, ''),
            'posts_count': post_count,
            'likes_received': likes_received
        })
    board.sort(key=lambda x: x['points'], reverse=True)
    return jsonify({'success': True, 'data': board[:20]})
@app.route('/api/posts/with-poll', methods=['POST'])
def api_create_post_with_poll():
    """Create a forum post and, optionally, its poll in one transaction.

    The JSON body carries ``title``, ``content``, ``tag`` (default ``全部``),
    ``is_official`` and an optional ``poll`` object with ``question``,
    ``options`` and ``multi``. A poll is attached only when it has a question
    and at least two non-empty options; otherwise the post is created
    without one. Responds 401 when nobody is logged in and 400 when the
    title or content is empty or the title exceeds 100 characters.
    """
    user = session.get('user')
    if not user:
        return jsonify({'success': False, 'message': '请先登录'}), 401

    # A missing, malformed or non-object JSON body is treated as empty so
    # validation answers 400 instead of raising AttributeError.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}
    title = str(data.get('title') or '').strip()
    content = str(data.get('content') or '').strip()
    tag = data.get('tag', '全部')
    if not isinstance(tag, str):
        tag = '全部'

    if not title or not content:
        return jsonify({'success': False, 'message': '标题和内容不能为空'}), 400
    # Same limit as the plain create-post endpoint.
    if len(title) > 100:
        return jsonify({'success': False, 'message': '标题不能超过100字'}), 400

    is_teacher = user.get('role') == 'teacher'
    post = Post(
        title=title,
        content=content,
        author_id=user.get('id'),
        tag=tag,
        is_official=bool(is_teacher and data.get('is_official', False)),
    )
    db.session.add(post)
    # Flush so post.id is assigned before the poll references it.
    db.session.flush()

    poll_data = data.get('poll')
    if isinstance(poll_data, dict):
        question = str(poll_data.get('question') or '').strip()
        raw_options = poll_data.get('options', [])
        if not isinstance(raw_options, list):
            raw_options = []
        options = [str(o).strip() for o in raw_options if o is not None]
        options = [text for text in options if text]
        if question and len(options) >= 2:
            poll = Poll(
                post_id=post.id,
                question=question,
                multi=bool(poll_data.get('multi', False)),
            )
            poll.set_options([{'text': text, 'votes': 0} for text in options])
            poll.set_voters({})
            db.session.add(poll)
            post.has_poll = True

    db.session.commit()
    return jsonify({'success': True, 'message': '帖子发布成功', 'post_id': post.id})
@app.route('/api/posts/<int:post_id>/share', methods=['POST'])
def api_share_post(post_id):
    """Acknowledge a share action; nothing is stored and ``post_id`` is unused."""
    acknowledgement = {'success': True}
    return jsonify(acknowledgement)
# ========== 管理后台页面 ==========
@app.route('/admin')
def admin_dashboard():
    """Render the admin dashboard page.

    ``require_admin_or_teacher`` is invoked first; its return value is not
    used here (presumably it aborts the request for other users — confirm).
    """
    require_admin_or_teacher()
    template_name = 'admin_dashboard.html'
    return render_template(template_name)
@app.route('/admin/contests')
def admin_contests():
    """Render the contest management page.

    ``require_admin_or_teacher`` is invoked first; its return value is not
    used here (presumably it aborts the request for other users — confirm).
    """
    require_admin_or_teacher()
    template_name = 'admin_contests.html'
    return render_template(template_name)
@app.route('/admin/exams')
def admin_exams():
    """Render the exam management page.

    ``require_admin_or_teacher`` is invoked first; its return value is not
    used here (presumably it aborts the request for other users — confirm).
    """
    require_admin_or_teacher()
    template_name = 'admin_exams.html'
    return render_template(template_name)
@app.route('/admin/posts')
def admin_posts():
    """Render the post management page.

    ``require_admin_or_teacher`` is invoked first; its return value is not
    used here (presumably it aborts the request for other users — confirm).
    """
    require_admin_or_teacher()
    template_name = 'admin_posts.html'
    return render_template(template_name)
@app.route('/admin/users')
def admin_users():
    """Render the user management page.

    ``require_admin_or_teacher`` is invoked first; its return value is not
    used here (presumably it aborts the request for other users — confirm).
    """
    require_admin_or_teacher()
    template_name = 'admin_users.html'
    return render_template(template_name)
# ========== 管理后台 API ==========
@app.route('/api/admin/contests', methods=['GET', 'POST'])
def admin_contests_api():
    """List all contests (GET) or create one (POST).

    POST expects a JSON object with ``name``, ``organizer``, ``start_date``,
    ``description`` and ``status`` plus an optional ``contact``; it responds
    400 naming any missing field. ``require_admin_or_teacher`` runs first.
    """
    require_admin_or_teacher()

    if request.method == 'POST':
        # A missing, malformed or non-object JSON body is treated as empty.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            data = {}
        required = ('name', 'organizer', 'start_date', 'description', 'status')
        missing = [field for field in required if field not in data]
        if missing:
            # Previously a missing key raised KeyError and produced a 500.
            return jsonify({
                'success': False,
                'message': '缺少必填字段: ' + ', '.join(missing)
            }), 400
        contest = Contest(
            name=data['name'],
            organizer=data['organizer'],
            start_date=data['start_date'],
            description=data['description'],
            contact=data.get('contact', ''),
            status=data['status']
        )
        db.session.add(contest)
        db.session.commit()
        return jsonify({'success': True})

    # GET, and the HEAD method Flask registers alongside GET. Previously a
    # HEAD request matched neither branch and the view returned None.
    data = [{
        'id': c.id,
        'name': c.name,
        'organizer': c.organizer,
        'start_date': c.start_date,
        'description': c.description,
        'contact': getattr(c, 'contact', ''),
        'status': c.status
    } for c in Contest.query.all()]
    return jsonify({'success': True, 'contests': data})
@app.route('/api/admin/contests/<int:contest_id>', methods=['GET', 'PUT', 'DELETE'])
def admin_contest_detail(contest_id):
    """Read (GET), update (PUT) or delete (DELETE) a single contest.

    PUT takes a JSON object; only the keys present overwrite the stored
    values. A missing contest yields ``success: False`` with the default
    200 status. ``require_admin_or_teacher`` runs first.
    """
    require_admin_or_teacher()
    contest = Contest.query.get(contest_id)
    if not contest:
        # NOTE(review): user-facing endpoints answer 404 here; the status is
        # left at 200 in case the admin front end depends on it — confirm.
        return jsonify({'success': False, 'message': '杯赛不存在'})

    if request.method == 'PUT':
        # A missing, malformed or non-object JSON body is treated as empty,
        # i.e. "change nothing", instead of raising AttributeError.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            data = {}
        contest.name = data.get('name', contest.name)
        contest.organizer = data.get('organizer', contest.organizer)
        contest.start_date = data.get('start_date', contest.start_date)
        contest.description = data.get('description', contest.description)
        contest.contact = data.get('contact', getattr(contest, 'contact', ''))
        contest.status = data.get('status', contest.status)
        db.session.commit()
        return jsonify({'success': True})

    if request.method == 'DELETE':
        db.session.delete(contest)
        db.session.commit()
        return jsonify({'success': True})

    # GET, and the HEAD method Flask registers alongside GET. Previously a
    # HEAD request matched no branch and the view returned None.
    return jsonify({'success': True, 'contest': {
        'id': contest.id,
        'name': contest.name,
        'organizer': contest.organizer,
        'start_date': contest.start_date,
        'description': contest.description,
        'contact': getattr(contest, 'contact', ''),
        'status': contest.status
    }})
@app.route('/api/admin/users', methods=['GET'])
def admin_users_api():
    """List users for the admin panel.

    Optional query parameters: ``q`` (substring of name or email) and
    ``role`` (exact match). ``require_admin_or_teacher`` runs first.
    """
    require_admin_or_teacher()
    search_text = request.args.get('q', '')
    role_filter = request.args.get('role', '')

    matches = User.query
    if search_text:
        name_hit = User.name.contains(search_text)
        email_hit = User.email.contains(search_text)
        matches = matches.filter(name_hit | email_hit)
    if role_filter:
        matches = matches.filter_by(role=role_filter)

    data = []
    for account in matches.all():
        data.append({
            'id': account.id,
            'name': account.name,
            'email': account.email,
            'role': account.role,
            'created_at': account.created_at.strftime('%Y-%m-%d %H:%M')
        })
    return jsonify({'success': True, 'users': data})
@app.route('/api/admin/users/<int:user_id>', methods=['DELETE'])
def admin_delete_user(user_id):
    """Delete a user account from the admin panel.

    Refuses to delete the account of the session user. Failures are reported
    as ``success: False`` with the default 200 status.
    ``require_admin_or_teacher`` runs first.
    """
    require_admin_or_teacher()

    own_id = session.get('user', {}).get('id')
    if user_id == own_id:
        return jsonify({'success': False, 'message': '不能删除自己'})

    target = User.query.get(user_id)
    if not target:
        return jsonify({'success': False, 'message': '用户不存在'})

    db.session.delete(target)
    db.session.commit()
    return jsonify({'success': True})
@app.route('/api/admin/posts', methods=['GET'])
def admin_posts_api():
    """List posts for the admin panel.

    Optional query parameters: ``q`` (substring of title or content) and
    ``tag`` (exact match). ``require_admin_or_teacher`` runs first.
    """
    require_admin_or_teacher()
    search_text = request.args.get('q', '')
    tag_filter = request.args.get('tag', '')

    matches = Post.query
    if search_text:
        title_hit = Post.title.contains(search_text)
        content_hit = Post.content.contains(search_text)
        matches = matches.filter(title_hit | content_hit)
    if tag_filter:
        matches = matches.filter_by(tag=tag_filter)

    data = []
    for entry in matches.all():
        data.append({
            'id': entry.id,
            'title': entry.title,
            'author': entry.author.name,
            'tag': entry.tag,
            'created_at': entry.created_at.strftime('%Y-%m-%d %H:%M')
        })
    return jsonify({'success': True, 'posts': data})
@app.route('/api/admin/posts/<int:post_id>', methods=['DELETE'])
def admin_delete_post(post_id):
    """Delete a post from the admin panel.

    A missing post is reported as ``success: False`` with the default 200
    status. ``require_admin_or_teacher`` runs first.
    """
    require_admin_or_teacher()

    target = Post.query.get(post_id)
    if not target:
        return jsonify({'success': False, 'message': '帖子不存在'})

    db.session.delete(target)
    db.session.commit()
    return jsonify({'success': True})
@app.route('/api/admin/exams', methods=['GET'])
def admin_exams_api():
    """List all exams for the admin panel.

    ``creator_name`` is an empty string when an exam has no creator.
    ``require_admin_or_teacher`` runs first.
    """
    require_admin_or_teacher()

    data = []
    for exam in Exam.query.all():
        creator_name = exam.creator.name if exam.creator else ''
        data.append({
            'id': exam.id,
            'title': exam.title,
            'subject': exam.subject,
            'creator_name': creator_name,
            'status': exam.status,
            'created_at': exam.created_at.strftime('%Y-%m-%d %H:%M')
        })
    return jsonify({'success': True, 'exams': data})
@app.route('/api/admin/exams/<int:exam_id>/status', methods=['PUT'])
def admin_update_exam_status(exam_id):
    """Update an exam's status from the JSON body's ``status`` field.

    The status is left unchanged when the field is absent. A missing exam is
    reported as ``success: False`` with the default 200 status.
    ``require_admin_or_teacher`` runs first.
    """
    require_admin_or_teacher()
    exam = Exam.query.get(exam_id)
    if not exam:
        return jsonify({'success': False, 'message': '考试不存在'})

    # A missing, malformed or non-object JSON body is treated as empty,
    # i.e. "change nothing", instead of raising AttributeError.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}
    exam.status = data.get('status', exam.status)
    db.session.commit()
    return jsonify({'success': True})
@app.route('/api/admin/exams/<int:exam_id>', methods=['DELETE'])
def admin_delete_exam(exam_id):
    """Delete an exam from the admin panel.

    A missing exam is reported as ``success: False`` with the default 200
    status. ``require_admin_or_teacher`` runs first.
    """
    require_admin_or_teacher()

    target = Exam.query.get(exam_id)
    if not target:
        return jsonify({'success': False, 'message': '考试不存在'})

    db.session.delete(target)
    db.session.commit()
    return jsonify({'success': True})
@app.route('/api/admin/stats', methods=['GET'])
def admin_stats():
    """Return row counts of users, posts, exams and contests.

    ``require_admin_or_teacher`` runs first.
    """
    require_admin_or_teacher()

    counted_models = (
        ('users', User),
        ('posts', Post),
        ('exams', Exam),
        ('contests', Contest),
    )
    stats = {}
    for label, model in counted_models:
        stats[label] = model.query.count()
    return jsonify({'success': True, 'stats': stats})
@app.route('/api/admin/recent-activities', methods=['GET'])
def admin_recent_activities():
    """Return up to ten recent activities, newest first.

    The feed merges the five latest exam submissions with the five latest
    posts. ``require_admin_or_teacher`` runs first.
    """
    require_admin_or_teacher()

    # Keep the real timestamp next to each entry so the merge is ordered by
    # the actual time, not by the minute-truncated display string.
    timed_entries = []
    for s in Submission.query.order_by(Submission.submitted_at.desc()).limit(5).all():
        timed_entries.append((s.submitted_at, {
            'time': s.submitted_at.strftime('%Y-%m-%d %H:%M'),
            'user': s.user_name,
            'action': f'提交了试卷 ID {s.exam_id}'
        }))
    for p in Post.query.order_by(Post.created_at.desc()).limit(5).all():
        timed_entries.append((p.created_at, {
            'time': p.created_at.strftime('%Y-%m-%d %H:%M'),
            'user': p.author.name,
            # The closing 》 was missing, leaving the title bracket unbalanced.
            'action': f'发布了帖子《{p.title}》'
        }))

    timed_entries.sort(key=lambda pair: pair[0], reverse=True)
    activities = [entry for _, entry in timed_entries[:10]]
    return jsonify({'success': True, 'activities': activities})
# ========== 初始化测试数据 ==========
# Create the schema at import time and seed demo data into empty tables.
# Contests are seeded when the contests table is empty; the demo posts,
# replies and polls are seeded when the posts table is empty. Everything is
# written in one transaction committed at the end of the block.
with app.app_context():
    db.create_all()

    # Seed demo contests only when none exist yet.
    if Contest.query.count() == 0:
        contests = [
            Contest(
                name='2026年"星火杯"全国中学生数学联考',
                organizer='星火杯组委会',
                description='本届星火杯旨在选拔优秀数学人才,试题难度对标全国高中数学联赛。',
                start_date='2026-03-15',
                end_date='2026-03-15',
                status='registering',
                participants=12500,
                created_by='system',
                _past_papers=json.dumps([
                    {"year": "2025", "title": "2025年星火杯真题含答案", "file": "/static/papers/2025_spark.pdf"},
                    {"year": "2024", "title": "2024年星火杯真题", "file": "/static/papers/2024_spark.pdf"},
                    {"year": "2023", "title": "2023年星火杯真题解析", "file": "/static/papers/2023_spark_analysis.pdf"}
                ])
            ),
            Contest(
                name='第十一届"圆梦杯"综合能力测试',
                organizer='圆梦教育基金会',
                description='圆梦杯已成功举办十届,是国内最具影响力的综合素质评价赛事之一。',
                start_date='2026-04-01',
                end_date='2026-04-01',
                status='upcoming',
                participants=8000,
                created_by='system',
                _past_papers=json.dumps([
                    {"year": "2025", "title": "第十届圆梦杯真题(语数英综合)", "file": "/static/papers/2025_dream.pdf"},
                    {"year": "2024", "title": "第九届圆梦杯真题及详解", "file": "/static/papers/2024_dream.pdf"}
                ])
            ),
            Contest(
                name='2025年冬季联考已结束',
                organizer='联考联盟',
                description='回顾2025年冬季联考精彩瞬间查看优秀试卷与解析。',
                start_date='2025-12-20',
                end_date='2025-12-20',
                status='ended',
                participants=15000,
                created_by='system',
                _past_papers=json.dumps([
                    {"year": "2025", "title": "2025冬季联考数学试卷", "file": "/static/papers/2025_winter_math.pdf"},
                    {"year": "2025", "title": "2025冬季联考英语试卷", "file": "/static/papers/2025_winter_english.pdf"},
                    {"year": "2024", "title": "2024冬季联考理综试卷", "file": "/static/papers/2024_winter_science.pdf"}
                ])
            )
        ]
        db.session.add_all(contests)

    # Seed demo forum content only when there are no posts yet.
    if Post.query.count() == 0:
        # Make sure there is a test user to own the demo content.
        test_user = User.query.filter_by(email='test@example.com').first()
        if not test_user:
            # NOTE(review): the password is stored exactly as given here;
            # confirm how the login code verifies passwords.
            test_user = User(name='测试用户', email='test@example.com', password='123456', role='student')
            db.session.add(test_user)
            # Flush so test_user.id is available for the posts below.
            db.session.flush()

        posts = [
            Post(
                title='关于2026年星火杯初赛第12题的讨论',
                content='大家觉得第12题的第二问是不是有点超纲了我用洛必达法则算出来的结果是1/2...',
                author_id=test_user.id,
                tag='题目讨论',
                is_official=False,
                likes=45,
                replies_count=5,
                views=320,
                has_poll=True
            ),
            Post(
                title='【官方通知】关于圆梦杯报名延期的通知',
                content='因系统维护原定于2月20日截止的报名时间延长至2月28日...',
                author_id=test_user.id,
                tag='官方公告',
                is_official=True,
                pinned=True,
                likes=128,
                replies_count=3,
                views=1560
            ),
            Post(
                title='分享一些备考资料和心得(持续更新)',
                content='大家好!我是去年星火杯一等奖获得者,整理了一些备考资料和心得分享给大家...',
                author_id=test_user.id,
                tag='经验分享',
                is_official=False,
                likes=89,
                replies_count=8,
                views=890
            ),
            Post(
                title='求助:数列通项公式怎么求?',
                content='已知数列 {aₙ} 满足 a₁=1, aₙ₊₁ = 2aₙ + 1求通项公式 aₙ',
                author_id=test_user.id,
                tag='求助答疑',
                is_official=False,
                likes=12,
                replies_count=6,
                views=156
            ),
            Post(
                title='今天的模拟考大家考得怎么样?',
                content='刚考完今天的模拟考,感觉难度比去年真题大不少啊!...',
                author_id=test_user.id,
                tag='闲聊灌水',
                is_official=False,
                likes=67,
                replies_count=15,
                views=445,
                has_poll=True
            )
        ]
        db.session.add_all(posts)
        # Flush so every post has its primary key; replies and polls below
        # reference these ids instead of assuming they are 1..5.
        db.session.flush()

        replies = [
            Reply(post_id=posts[0].id, author_id=test_user.id, content='这道题确实用泰勒展开更简单。答案是1/120', likes=23),
            Reply(post_id=posts[0].id, author_id=test_user.id, content='谢谢!原来是泰勒展开到第五阶', likes=5, reply_to='学霸本霸'),
            Reply(post_id=posts[2].id, author_id=test_user.id, content='感谢分享!错题本真的很重要', likes=12),
            Reply(post_id=posts[3].id, author_id=test_user.id, content='令 bₙ = aₙ + 1则 bₙ₊₁ = 2bₙ所以 aₙ = 2ⁿ - 1', likes=18)
        ]
        db.session.add_all(replies)

        # Poll for the first demo post.
        poll1 = Poll(
            post_id=posts[0].id,
            question='你觉得第12题的答案是什么',
            multi=False
        )
        poll1.set_options([{'text': '1/2', 'votes': 15}, {'text': '1/3', 'votes': 8}, {'text': '1/120', 'votes': 42}, {'text': '其他', 'votes': 3}])
        poll1.set_voters({'test3': [2], 'test4': [0], 'test5': [2]})
        poll1.total_votes = 68
        db.session.add(poll1)

        # Poll for the fifth demo post.
        poll5 = Poll(
            post_id=posts[4].id,
            question='今天模拟考你觉得难度如何?',
            multi=False
        )
        poll5.set_options([{'text': '简单', 'votes': 5}, {'text': '适中', 'votes': 18}, {'text': '偏难', 'votes': 35}, {'text': '太难', 'votes': 22}])
        poll5.set_voters({'test1': [2], 'test3': [1], 'test4': [3]})
        poll5.total_votes = 80
        db.session.add(poll5)

    # Single commit: persists the contests even when the posts table was
    # already populated, and keeps the forum seed all-or-nothing.
    db.session.commit()
if __name__ == '__main__':
    # Development entry point. Debug mode stays on by default, as before,
    # but can be switched off with FLASK_DEBUG=0/false/no/off.
    # WARNING: debug mode enables the interactive debugger, which allows
    # arbitrary code execution — never expose it on a reachable host.
    debug_enabled = os.getenv('FLASK_DEBUG', '1').strip().lower() not in ('0', 'false', 'no', 'off')
    app.run(debug=debug_enabled, port=5000)