# app.py import os import time import json import random import string import smtplib from datetime import datetime, timedelta from email.mime.text import MIMEText from email.mime.multipart import MIMEMultipart from functools import wraps from flask import ( Flask, render_template, request, jsonify, session, redirect, url_for, flash, abort ) from dotenv import load_dotenv from captcha.image import ImageCaptcha from aliyunsdkcore.client import AcsClient from aliyunsdkcore.request import CommonRequest # 引入所有数据库模型 from models import db, User, Exam, Submission, Draft, Contest, ContestMembership, Post, Reply, Poll, Report, Bookmark, Reaction, Notification, EditHistory, ContestRegistration, TeacherApplication, Friend, ExamBookmark load_dotenv() app = Flask(__name__) app.secret_key = os.urandom(24) # 内存存储(用于临时验证码) captcha_store = {} email_codes = {} # 数据库配置 app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///database.db' app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False db.init_app(app) # 阿里云号码认证服务客户端 acs_client = AcsClient( os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID', ''), os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET', ''), 'cn-hangzhou' ) # ========== 辅助函数 ========== def generate_captcha_id(): return ''.join(random.choices(string.ascii_lowercase + string.digits, k=16)) def generate_code(length=6): return ''.join(random.choices(string.digits, k=length)) def login_required(f): @wraps(f) def decorated(*args, **kwargs): if 'user' not in session: return redirect(url_for('login')) return f(*args, **kwargs) return decorated def teacher_required(f): @wraps(f) def decorated(*args, **kwargs): user = session.get('user') if not user: return redirect(url_for('login')) if user.get('role') != 'teacher': return redirect(url_for('exam_list')) return f(*args, **kwargs) return decorated def send_email(to_email, subject, html_content): smtp_host = os.getenv('SMTP_HOST', 'smtp.163.com') smtp_port = int(os.getenv('SMTP_PORT', 465)) smtp_user = os.getenv('SMTP_USER', '') smtp_pass = os.getenv('SMTP_PASS', '') msg = MIMEMultipart('alternative') msg['From'] = f'"验证码" <{smtp_user}>' msg['To'] = to_email msg['Subject'] = subject msg.attach(MIMEText(html_content, 'html', 'utf-8')) with smtplib.SMTP_SSL(smtp_host, smtp_port) as server: server.login(smtp_user, smtp_pass) server.sendmail(smtp_user, to_email, msg.as_string()) def get_current_user(): return session.get('user') def require_login(): user = get_current_user() if not user: abort(401) return user def require_admin_or_teacher(): user = require_login() if user['role'] not in ['admin', 'teacher']: abort(403) return user # 积分等级相关 LEVEL_TITLES = {1:'新手上路',2:'初出茅庐',3:'小有名气',4:'渐入佳境',5:'驾轻就熟',6:'炉火纯青',7:'学富五车',8:'出类拔萃',9:'登峰造极',10:'一代宗师'} def calc_level(points): if points >= 5000: return 10 if points >= 3000: return 9 if points >= 2000: return 8 if points >= 1200: return 7 if points >= 800: return 6 if points >= 500: return 5 if points >= 300: return 4 if points >= 150: return 3 if points >= 50: return 2 return 1 def add_notification(user_id, ntype, content, from_user='', post_id=0): notif = Notification( user_id=user_id, type=ntype, content=content, from_user=from_user, post_id=post_id ) db.session.add(notif) db.session.commit() # ========== 杯赛讨论区发帖权限检查 ========== def can_post_in_contest(user, contest): """检查用户是否可以在指定杯赛的讨论区发帖""" if user['role'] in ['admin', 'teacher']: return True # 杯赛负责人或老师 membership = ContestMembership.query.filter_by(user_id=user['id'], contest_id=contest.id).first() if membership and membership.role in ['owner', 'teacher']: return True # 报名且至少参与一次考试 registration = ContestRegistration.query.filter_by(user_id=user['id'], contest_id=contest.id).first() if not registration: return False # 获取该杯赛下的所有考试 ID exam_ids = [exam.id for exam in contest.exams] if not exam_ids: return False # 查询用户在这些考试中的提交记录 submission_count = Submission.query.filter( Submission.user_id == user['id'], Submission.exam_id.in_(exam_ids) ).count() return submission_count >= 1 # ========== 上下文处理器 ========== @app.context_processor def inject_user(): return {'user': get_current_user()} # ========== 页面路由 ========== @app.route('/') def home(): return render_template('home.html') @app.route('/login', methods=['GET']) def login(): return render_template('login.html') @app.route('/register', methods=['GET']) def register(): return render_template('register.html') @app.route('/logout') def logout(): session.pop('user', None) return redirect(url_for('login')) @app.route('/contests') def contest_list(): return render_template('contest_list.html') @app.route('/contests/') def contest_detail(contest_id): contest = Contest.query.get(contest_id) if not contest: return redirect(url_for('contest_list')) user = get_current_user() registered = False can_post = False if user: # 检查是否报名 registered = ContestRegistration.query.filter_by(user_id=user['id'], contest_id=contest_id).first() is not None # 检查是否有发帖权限 can_post = can_post_in_contest(user, contest) return render_template('contest_detail.html', contest=contest, registered=registered, can_post=can_post) @app.route('/exams') @login_required def exam_list(): user = session.get('user') search_query = request.args.get('q', '').strip() subject_filter = request.args.get('subject', '').strip() query = Exam.query if subject_filter: query = query.filter_by(subject=subject_filter) if search_query: query = query.filter( (Exam.title.contains(search_query)) | (Exam.subject.contains(search_query)) ) all_exams = query.all() all_subjects = db.session.query(Exam.subject).distinct().all() all_subjects = [s[0] for s in all_subjects if s[0]] user_submissions = {} for sub in Submission.query.filter_by(user_id=user.get('id')).all(): user_submissions[sub.exam_id] = { 'id': sub.id, 'graded': sub.graded, 'score': sub.score } return render_template( 'exam_list.html', exams=all_exams, user_submissions=user_submissions, search_query=search_query, subject_filter=subject_filter, all_subjects=all_subjects ) @app.route('/exams/create') @teacher_required def exam_create(): return render_template('exam_create.html') @app.route('/exams/') @login_required def exam_detail(exam_id): exam = Exam.query.get(exam_id) if not exam: return redirect(url_for('exam_list')) user = session.get('user') existing = Submission.query.filter_by(exam_id=exam_id, user_id=user.get('id')).first() draft = Draft.query.filter_by(exam_id=exam_id, user_id=user.get('id')).first() return render_template('exam_detail.html', exam=exam, existing_submission=existing, draft=draft) @app.route('/exams//result') @login_required def exam_result(exam_id): exam = Exam.query.get(exam_id) if not exam: return redirect(url_for('exam_list')) user = session.get('user') submission = Submission.query.filter_by(exam_id=exam_id, user_id=user.get('id')).first() if not submission: return redirect(url_for('exam_detail', exam_id=exam_id)) return render_template('exam_result.html', exam=exam, submission=submission) @app.route('/exams//submissions') @teacher_required def exam_submissions(exam_id): exam = Exam.query.get(exam_id) if not exam: return redirect(url_for('exam_list')) subs = Submission.query.filter_by(exam_id=exam_id).all() stats = {} if subs: graded_subs = [s for s in subs if s.graded] scores = [s.score for s in graded_subs] if graded_subs else [] stats = { 'total': len(subs), 'graded': len(graded_subs), 'ungraded': len(subs) - len(graded_subs), 'avg_score': round(sum(scores) / len(scores), 1) if scores else 0, 'max_score': max(scores) if scores else 0, 'min_score': min(scores) if scores else 0, } next_ungraded = None for s in subs: if not s.graded: next_ungraded = s.id break return render_template('exam_submissions.html', exam=exam, submissions=subs, stats=stats, next_ungraded=next_ungraded) @app.route('/exams//grade/') @teacher_required def exam_grade(exam_id, sub_id): exam = Exam.query.get(exam_id) sub = Submission.query.get(sub_id) if not exam or not sub: return redirect(url_for('exam_list')) next_ungraded = None next_sub = Submission.query.filter_by(exam_id=exam_id, graded=False).filter(Submission.id != sub_id).first() if next_sub: next_ungraded = next_sub.id return render_template('exam_grade.html', exam=exam, submission=sub, next_ungraded=next_ungraded) @app.route('/exams//print') @login_required def exam_print(exam_id): exam = Exam.query.get(exam_id) if not exam: return redirect(url_for('exam_list')) return render_template('exam_print.html', exam=exam) @app.route('/forum') def forum(): return render_template('forum.html') # ========== 个人中心页面 ========== @app.route('/profile') @login_required def profile(): return render_template('profile.html') # ========== 个人中心 API ========== @app.route('/api/user/friends') @login_required def api_user_friends(): user_id = session['user']['id'] friendships = Friend.query.filter( ((Friend.user_id == user_id) | (Friend.friend_id == user_id)) & (Friend.status == 'accepted') ).all() friends = [] for f in friendships: if f.user_id == user_id: friend_user = User.query.get(f.friend_id) else: friend_user = User.query.get(f.user_id) if friend_user: friends.append({ 'id': friend_user.id, 'name': friend_user.name, 'avatar': friend_user.avatar if hasattr(friend_user, 'avatar') else '', 'created_at': f.created_at.strftime('%Y-%m-%d') }) return jsonify({'success': True, 'friends': friends}) @app.route('/api/user/posts') @login_required def api_user_posts(): user_id = session['user']['id'] posts = Post.query.filter_by(author_id=user_id).order_by(Post.created_at.desc()).all() data = [{ 'id': p.id, 'title': p.title, 'content': p.content[:100] + '...' if len(p.content) > 100 else p.content, 'created_at': p.created_at.strftime('%Y-%m-%d %H:%M'), 'replies': p.replies_count, 'likes': p.likes } for p in posts] return jsonify({'success': True, 'posts': data}) @app.route('/api/user/exam-bookmarks') @login_required def api_user_exam_bookmarks(): user_id = session['user']['id'] bookmarks = ExamBookmark.query.filter_by(user_id=user_id).order_by(ExamBookmark.created_at.desc()).all() data = [] for bm in bookmarks: exam = bm.exam if exam: data.append({ 'id': exam.id, 'title': exam.title, 'subject': exam.subject, 'total_score': exam.total_score, 'duration': exam.duration, 'status': exam.status, 'bookmarked_at': bm.created_at.strftime('%Y-%m-%d %H:%M') }) return jsonify({'success': True, 'bookmarks': data}) @app.route('/api/exams//bookmark', methods=['POST', 'DELETE']) @login_required def api_toggle_exam_bookmark(exam_id): user_id = session['user']['id'] if request.method == 'POST': existing = ExamBookmark.query.filter_by(user_id=user_id, exam_id=exam_id).first() if existing: return jsonify({'success': False, 'message': '已经收藏过了'}), 400 bookmark = ExamBookmark(user_id=user_id, exam_id=exam_id) db.session.add(bookmark) db.session.commit() return jsonify({'success': True, 'message': '收藏成功', 'bookmarked': True}) elif request.method == 'DELETE': bookmark = ExamBookmark.query.filter_by(user_id=user_id, exam_id=exam_id).first() if not bookmark: return jsonify({'success': False, 'message': '未收藏'}), 400 db.session.delete(bookmark) db.session.commit() return jsonify({'success': True, 'message': '已取消收藏', 'bookmarked': False}) @app.route('/api/friend/add', methods=['POST']) @login_required def api_add_friend(): user_id = session['user']['id'] data = request.get_json() friend_id = data.get('friend_id') if not friend_id or friend_id == user_id: return jsonify({'success': False, 'message': '无效的好友ID'}), 400 existing = Friend.query.filter( ((Friend.user_id == user_id) & (Friend.friend_id == friend_id)) | ((Friend.user_id == friend_id) & (Friend.friend_id == user_id)) ).first() if existing: return jsonify({'success': False, 'message': '已经是好友或已发送请求'}), 400 friend_req = Friend(user_id=user_id, friend_id=friend_id, status='pending') db.session.add(friend_req) db.session.commit() return jsonify({'success': True, 'message': '好友请求已发送'}) @app.route('/api/friend/accept/', methods=['POST']) @login_required def api_accept_friend(request_id): user_id = session['user']['id'] req = Friend.query.get(request_id) if not req or req.friend_id != user_id or req.status != 'pending': return jsonify({'success': False, 'message': '请求不存在或无权操作'}), 404 req.status = 'accepted' db.session.commit() return jsonify({'success': True, 'message': '好友已添加'}) # ========== 教师申请路由 ========== @app.route('/apply-teacher', methods=['GET', 'POST']) @login_required def apply_teacher(): if request.method == 'POST': name = request.form.get('name', '').strip() email = request.form.get('email', '').strip() reason = request.form.get('reason', '').strip() if not name or not email or not reason: flash('请填写完整信息', 'error') return redirect(url_for('apply_teacher')) user = session['user'] # 检查是否已经申请过且为pending existing = TeacherApplication.query.filter_by(user_id=user['id'], status='pending').first() if existing: flash('您已提交过申请,请耐心等待审核', 'error') return redirect(url_for('apply_teacher')) # 创建申请记录 appli = TeacherApplication( user_id=user['id'], name=name, email=email, reason=reason ) db.session.add(appli) db.session.commit() flash('申请已提交,管理员会尽快审核', 'success') return redirect(url_for('home')) return render_template('apply_teacher.html') # ========== 杯赛申请路由 ========== @app.route('/apply-contest', methods=['GET', 'POST']) @login_required def apply_contest(): if request.method == 'POST': # 这里可以扩展为保存申请到数据库(需创建 ContestApplication 模型) flash('申请已提交,管理员会尽快审核', 'success') return redirect(url_for('contest_list')) return render_template('apply_contest.html') # ========== API 路由 ========== @app.route('/api/captcha') def api_captcha(): global captcha_store captcha_text = ''.join(random.choices(string.digits, k=4)) captcha_id = generate_captcha_id() image = ImageCaptcha(width=150, height=50, font_sizes=[40]) data = image.generate(captcha_text) import base64 img_base64 = base64.b64encode(data.read()).decode('utf-8') captcha_store[captcha_id] = { 'text': captcha_text.lower(), 'expires': time.time() + 300 } expired = [k for k, v in captcha_store.items() if time.time() > v['expires']] for k in expired: del captcha_store[k] return jsonify({'captchaId': captcha_id, 'img': img_base64}) @app.route('/api/send-email-code', methods=['POST']) def api_send_email_code(): global captcha_store, email_codes data = request.get_json() email = data.get('email', '') captcha_id = data.get('captchaId', '') captcha_text = data.get('captchaText', '') if not email: return jsonify({'success': False, 'message': '邮箱不能为空'}), 400 if not captcha_id or not captcha_text: return jsonify({'success': False, 'message': '请输入图形验证码'}), 400 record = captcha_store.get(captcha_id) if not record or time.time() > record['expires']: captcha_store.pop(captcha_id, None) return jsonify({'success': False, 'message': '图形验证码已过期', 'refreshCaptcha': True}), 400 if record['text'] != captcha_text.lower(): captcha_store.pop(captcha_id, None) return jsonify({'success': False, 'message': '图形验证码错误', 'refreshCaptcha': True}), 400 captcha_store.pop(captcha_id, None) code = generate_code(6) email_codes[email] = {'code': code, 'expires': time.time() + 300} try: send_email(email, '您的注册验证码', f'

您的验证码是:{code},5分钟内有效。

') print(f'邮箱验证码已发送到 {email}: {code}') return jsonify({'success': True, 'message': '验证码已发送到邮箱'}) except Exception as e: print(f'发送邮件失败: {e}') email_codes.pop(email, None) return jsonify({'success': False, 'message': f'发送邮件失败: {str(e)}'}), 500 # ========== 注册相关 API ========== @app.route('/api/register', methods=['POST']) def api_register(): """邮箱注册(需绑定手机号)""" global email_codes data = request.get_json() name = data.get('name', '') email = data.get('email', '') phone = data.get('phone', '') password = data.get('password', '') invite_code = data.get('inviteCode', '') email_code = data.get('emailCode', '') if not name or not email or not password or not phone: return jsonify({'success': False, 'message': '请填写完整信息'}), 400 if not email_code: return jsonify({'success': False, 'message': '请输入邮箱验证码'}), 400 if not phone.isdigit() or len(phone) != 11: return jsonify({'success': False, 'message': '手机号格式不正确'}), 400 record = email_codes.get(email) if not record or time.time() > record['expires']: email_codes.pop(email, None) return jsonify({'success': False, 'message': '邮箱验证码已过期', 'refreshCaptcha': True}), 400 if record['code'] != email_code: email_codes.pop(email, None) return jsonify({'success': False, 'message': '邮箱验证码错误', 'refreshCaptcha': True}), 400 email_codes.pop(email, None) if User.query.filter_by(email=email).first(): return jsonify({'success': False, 'message': '该邮箱已注册'}), 400 if User.query.filter_by(phone=phone).first(): return jsonify({'success': False, 'message': '该手机号已绑定其他账户'}), 400 role = 'teacher' if invite_code else 'student' user = User(name=name, email=email, phone=phone, password=password, role=role) db.session.add(user) db.session.commit() user_data = {'name': user.name, 'email': user.email, 'phone': user.phone, 'role': user.role, 'id': user.id} session['user'] = user_data return jsonify({'success': True, 'message': '注册成功', 'user': user_data}) @app.route('/api/register-mobile', methods=['POST']) def api_register_mobile(): """手机号注册""" data = request.get_json() name = data.get('name', '') phone = data.get('phone', '') password = data.get('password', '') invite_code = data.get('inviteCode', '') sms_code = data.get('smsCode', '') if not name or not phone or not password or not sms_code: return jsonify({'success': False, 'message': '请填写完整信息'}), 400 if not phone.isdigit() or len(phone) != 11: return jsonify({'success': False, 'message': '手机号格式不正确'}), 400 try: req = CommonRequest() req.set_accept_format('json') req.set_domain('dypnsapi.aliyuncs.com') req.set_method('POST') req.set_protocol_type('https') req.set_version('2017-05-25') req.set_action_name('CheckSmsVerifyCode') req.add_query_param('PhoneNumber', phone) req.add_query_param('VerifyCode', sms_code) response = acs_client.do_action_with_exception(req) result = json.loads(response) print(f'验证结果: {json.dumps(result, ensure_ascii=False, indent=2)}') model = result.get('Model', {}) if not (result.get('Code') == 'OK' and model.get('VerifyResult') == 'PASS'): return jsonify({'success': False, 'message': '短信验证码错误或已过期'}), 400 except Exception as e: print(f'验证短信验证码出错: {e}') return jsonify({'success': False, 'message': '短信验证码验证失败'}), 500 if User.query.filter_by(phone=phone).first(): return jsonify({'success': False, 'message': '该手机号已注册'}), 400 role = 'teacher' if invite_code else 'student' user = User(name=name, phone=phone, password=password, role=role) db.session.add(user) db.session.commit() user_data = {'name': user.name, 'phone': user.phone, 'role': user.role, 'id': user.id} session['user'] = user_data return jsonify({'success': True, 'message': '注册成功', 'user': user_data}) # ========== 登录相关 API ========== @app.route('/api/login', methods=['POST']) def api_login(): data = request.get_json() email = data.get('email', '') password = data.get('password', '') if not email or not password: return jsonify({'success': False, 'message': '请输入邮箱和密码'}), 400 user = User.query.filter_by(email=email).first() if not user: return jsonify({'success': False, 'message': '账号不存在,请先注册'}), 400 if user.password != password: return jsonify({'success': False, 'message': '密码错误'}), 400 user_data = {'name': user.name, 'email': user.email, 'role': user.role, 'id': user.id} session['user'] = user_data return jsonify({'success': True, 'message': '登录成功', 'user': user_data}) @app.route('/api/send-sms', methods=['POST']) def api_send_sms(): global captcha_store data = request.get_json() phone = data.get('phone', '') captcha_id = data.get('captchaId', '') captcha_text = data.get('captchaText', '') if not phone: return jsonify({'success': False, 'message': '手机号不能为空'}), 400 if not captcha_id or not captcha_text: return jsonify({'success': False, 'message': '请输入图形验证码'}), 400 record = captcha_store.get(captcha_id) if not record or time.time() > record['expires']: captcha_store.pop(captcha_id, None) return jsonify({'success': False, 'message': '图形验证码已过期,请刷新', 'refreshCaptcha': True}), 400 if record['text'] != captcha_text.lower(): captcha_store.pop(captcha_id, None) return jsonify({'success': False, 'message': '图形验证码错误', 'refreshCaptcha': True}), 400 captcha_store.pop(captcha_id, None) try: req = CommonRequest() req.set_accept_format('json') req.set_domain('dypnsapi.aliyuncs.com') req.set_method('POST') req.set_protocol_type('https') req.set_version('2017-05-25') req.set_action_name('SendSmsVerifyCode') req.add_query_param('PhoneNumber', phone) req.add_query_param('SignName', os.getenv('SIGN_NAME', '')) req.add_query_param('TemplateCode', os.getenv('TEMPLATE_CODE', '')) req.add_query_param('TemplateParam', json.dumps({'code': '##code##', 'min': '5'})) req.add_query_param('CountryCode', '86') req.add_query_param('ValidTime', 300) req.add_query_param('Interval', 60) req.add_query_param('ReturnVerifyCode', True) req.add_query_param('CodeType', 1) req.add_query_param('CodeLength', 4) response = acs_client.do_action_with_exception(req) result = json.loads(response) print(f'号码认证服务返回: {json.dumps(result, ensure_ascii=False, indent=2)}') if result.get('Code') == 'OK': resp_data = {'success': True, 'message': '验证码发送成功'} model = result.get('Model', {}) if model.get('VerifyCode'): resp_data['mockCode'] = model['VerifyCode'] print(f'验证码: {model["VerifyCode"]}') return jsonify(resp_data) else: print(f'发送失败: {result.get("Message")}') return jsonify({'success': False, 'message': result.get('Message', '发送失败')}), 500 except Exception as e: print(f'发送短信出错: {e}') return jsonify({'success': False, 'message': f'发送短信出错: {str(e)}'}), 500 @app.route('/api/verify-code', methods=['POST']) def api_verify_code(): data = request.get_json() phone = data.get('phone', '') code = data.get('code', '') if not phone or not code: return jsonify({'success': False, 'message': '手机号和验证码不能为空'}), 400 try: req = CommonRequest() req.set_accept_format('json') req.set_domain('dypnsapi.aliyuncs.com') req.set_method('POST') req.set_protocol_type('https') req.set_version('2017-05-25') req.set_action_name('CheckSmsVerifyCode') req.add_query_param('PhoneNumber', phone) req.add_query_param('VerifyCode', code) response = acs_client.do_action_with_exception(req) result = json.loads(response) print(f'验证结果: {json.dumps(result, ensure_ascii=False, indent=2)}') model = result.get('Model', {}) if result.get('Code') == 'OK' and model.get('VerifyResult') == 'PASS': user = User.query.filter_by(phone=phone).first() if not user: user = User( name=f'用户{phone[-4:]}', phone=phone, password=generate_code(8), role='student' ) db.session.add(user) db.session.commit() user_data = {'name': user.name, 'phone': user.phone, 'role': user.role, 'id': user.id} session['user'] = user_data return jsonify({'success': True, 'message': '验证成功', 'user': user_data}) else: msg = '验证码已过期' if model.get('VerifyResult') == 'UNKNOWN' else '验证码错误' return jsonify({'success': False, 'message': msg}), 400 except Exception as e: print(f'验证出错: {e}') return jsonify({'success': False, 'message': '验证码错误或已过期'}), 400 # ========== 考试系统 API ========== @app.route('/api/exams', methods=['POST']) def api_create_exam(): user = session.get('user') if not user or user.get('role') != 'teacher': return jsonify({'success': False, 'message': '无权限'}), 403 data = request.get_json() title = data.get('title', '') subject = data.get('subject', '') duration = data.get('duration', 120) questions = data.get('questions', []) if not title or not questions: return jsonify({'success': False, 'message': '请填写试卷标题和题目'}), 400 total_score = sum(q.get('score', 0) for q in questions) exam = Exam( title=title, subject=subject, duration=duration, total_score=total_score, creator_id=user.get('id') ) exam.set_questions(questions) db.session.add(exam) db.session.commit() return jsonify({'success': True, 'message': '试卷创建成功', 'exam_id': exam.id}) @app.route('/api/exams//save-draft', methods=['POST']) def api_save_draft(exam_id): user = session.get('user') if not user: return jsonify({'success': False, 'message': '请先登录'}), 401 exam = Exam.query.get(exam_id) if not exam: return jsonify({'success': False, 'message': '试卷不存在'}), 404 data = request.get_json() answers = data.get('answers', {}) draft = Draft.query.filter_by(exam_id=exam_id, user_id=user.get('id')).first() if not draft: draft = Draft(exam_id=exam_id, user_id=user.get('id')) draft.set_answers(answers) db.session.add(draft) db.session.commit() return jsonify({'success': True, 'message': '草稿已保存'}) @app.route('/api/exams//submit', methods=['POST']) def api_submit_exam(exam_id): user = session.get('user') if not user: return jsonify({'success': False, 'message': '请先登录'}), 401 exam = Exam.query.get(exam_id) if not exam: return jsonify({'success': False, 'message': '试卷不存在'}), 404 if exam.status == 'closed': return jsonify({'success': False, 'message': '该考试已关闭'}), 400 if Submission.query.filter_by(exam_id=exam_id, user_id=user.get('id')).first(): return jsonify({'success': False, 'message': '您已提交过该试卷'}), 400 data = request.get_json() answers = data.get('answers', {}) score = 0 auto_graded = True question_scores = {} questions = exam.get_questions() for q in questions: qid = str(q['id']) if q['type'] == 'choice': if answers.get(qid) == q.get('answer'): score += q.get('score', 0) question_scores[qid] = q.get('score', 0) else: question_scores[qid] = 0 elif q['type'] == 'fill': student_answer = answers.get(qid, '').strip() correct_answers = [a.strip() for a in q.get('answer', '').split('|')] if student_answer in correct_answers: score += q.get('score', 0) question_scores[qid] = q.get('score', 0) else: question_scores[qid] = 0 else: auto_graded = False question_scores[qid] = 0 sub = Submission( exam_id=exam_id, user_id=user.get('id'), score=score, graded=auto_graded, graded_by='系统自动' if auto_graded else '' ) sub.set_answers(answers) sub.set_question_scores(question_scores) db.session.add(sub) Draft.query.filter_by(exam_id=exam_id, user_id=user.get('id')).delete() db.session.commit() return jsonify({'success': True, 'message': '提交成功', 'submission_id': sub.id}) @app.route('/api/exams//grade/', methods=['POST']) def api_grade_submission(exam_id, sub_id): user = session.get('user') if not user or user.get('role') != 'teacher': return jsonify({'success': False, 'message': '无权限'}), 403 sub = Submission.query.get(sub_id) if not sub or sub.exam_id != exam_id: return jsonify({'success': False, 'message': '提交记录不存在'}), 404 data = request.get_json() scores = data.get('scores', {}) total = sum(scores.values()) sub.score = total sub.set_question_scores(scores) sub.graded = True sub.graded_by = user.get('name', '') db.session.commit() return jsonify({'success': True, 'message': '批改完成', 'total_score': total}) @app.route('/api/exams//status', methods=['POST']) def api_update_exam_status(exam_id): user = session.get('user') if not user or user.get('role') != 'teacher': return jsonify({'success': False, 'message': '无权限'}), 403 exam = Exam.query.get(exam_id) if not exam: return jsonify({'success': False, 'message': '试卷不存在'}), 404 if exam.creator_id != user.get('id'): return jsonify({'success': False, 'message': '只能修改自己创建的试卷'}), 403 data = request.get_json() new_status = data.get('status', '') if new_status not in ('available', 'closed'): return jsonify({'success': False, 'message': '无效状态'}), 400 exam.status = new_status db.session.commit() return jsonify({'success': True, 'message': f'试卷已{"发布" if new_status == "available" else "关闭"}'}) @app.route('/api/exams/', methods=['DELETE']) def api_delete_exam(exam_id): user = session.get('user') if not user or user.get('role') != 'teacher': return jsonify({'success': False, 'message': '无权限'}), 403 exam = Exam.query.get(exam_id) if not exam: return jsonify({'success': False, 'message': '试卷不存在'}), 404 if exam.creator_id != user.get('id'): return jsonify({'success': False, 'message': '只能删除自己创建的试卷'}), 403 Submission.query.filter_by(exam_id=exam_id).delete() Draft.query.filter_by(exam_id=exam_id).delete() db.session.delete(exam) db.session.commit() return jsonify({'success': True, 'message': '试卷已删除'}) # ========== 杯赛相关 API ========== @app.route('/api/contests/search') def api_search_contests(): keyword = request.args.get('q', '').strip().lower() query = Contest.query if keyword: query = query.filter( (Contest.name.contains(keyword)) | (Contest.organizer.contains(keyword)) | (Contest.description.contains(keyword)) ) contests = query.all() data = [{ 'id': c.id, 'name': c.name, 'organizer': c.organizer, 'description': c.description, 'start_date': c.start_date, 'end_date': c.end_date, 'status': c.status, 'participants': c.participants, 'created_by': c.created_by } for c in contests] return jsonify({'success': True, 'data': data}) @app.route('/api/contests', methods=['POST']) def api_create_contest(): user = session.get('user') if not user: return jsonify({'success': False, 'message': '请先登录'}), 401 data = request.get_json() contest = Contest( name=data.get('name'), organizer=data.get('organizer'), description=data.get('description'), start_date=data.get('start_date'), end_date=data.get('end_date'), status=data.get('status', 'upcoming'), participants=data.get('participants', 0), created_by=user.get('id') ) db.session.add(contest) db.session.commit() return jsonify({'success': True, 'message': '杯赛创建成功', 'contest_id': contest.id}) # ========== 杯赛报名 API ========== @app.route('/api/contests//register', methods=['POST']) @login_required def api_register_contest(contest_id): """用户报名杯赛""" user = session['user'] contest = Contest.query.get(contest_id) if not contest: return jsonify({'success': False, 'message': '杯赛不存在'}), 404 existing = ContestRegistration.query.filter_by(user_id=user['id'], contest_id=contest_id).first() if existing: return jsonify({'success': False, 'message': '您已报名该杯赛'}), 400 registration = ContestRegistration(user_id=user['id'], contest_id=contest_id) db.session.add(registration) contest.participants += 1 db.session.commit() return jsonify({'success': True, 'message': '报名成功', 'participants': contest.participants}) @app.route('/api/contests//unregister', methods=['POST']) @login_required def api_unregister_contest(contest_id): """用户取消报名""" user = session['user'] contest = Contest.query.get(contest_id) if not contest: return jsonify({'success': False, 'message': '杯赛不存在'}), 404 registration = ContestRegistration.query.filter_by(user_id=user['id'], contest_id=contest_id).first() if not registration: return jsonify({'success': False, 'message': '您尚未报名该杯赛'}), 400 db.session.delete(registration) if contest.participants > 0: contest.participants -= 1 db.session.commit() return jsonify({'success': True, 'message': '已取消报名', 'participants': contest.participants}) # ========== 杯赛讨论区 API ========== @app.route('/api/contests//posts', methods=['GET']) def api_contest_posts(contest_id): """获取杯赛讨论区帖子列表""" contest = Contest.query.get_or_404(contest_id) posts = Post.query.filter_by(contest_id=contest_id).order_by(Post.created_at.desc()).all() user = get_current_user() uid = user['id'] if user else None data = [] for p in posts: p_dict = { 'id': p.id, 'title': p.title, 'content': p.content[:200] + ('...' if len(p.content) > 200 else ''), 'author': p.author.name, 'author_id': p.author_id, 'tag': p.tag, 'is_official': p.is_official, 'pinned': p.pinned, 'likes': p.likes, 'replies': p.replies_count, 'views': p.views, 'created_at': p.created_at.strftime('%Y-%m-%d %H:%M'), } if uid: p_dict['liked'] = Reaction.query.filter_by(user_id=uid, post_id=p.id).first() is not None data.append(p_dict) return jsonify({'success': True, 'data': data}) @app.route('/api/contests//posts', methods=['POST']) @login_required def api_create_contest_post(contest_id): """在杯赛讨论区发布帖子""" user = session['user'] contest = Contest.query.get_or_404(contest_id) # 权限检查 if not can_post_in_contest(user, contest): return jsonify({'success': False, 'message': '您没有权限在此杯赛讨论区发帖'}), 403 data = request.get_json() title = data.get('title', '').strip() content = data.get('content', '').strip() if not title or not content: return jsonify({'success': False, 'message': '标题和内容不能为空'}), 400 if len(title) > 100: return jsonify({'success': False, 'message': '标题不能超过100字'}), 400 post = Post( title=title, content=content, author_id=user['id'], tag='杯赛讨论', # 可自定义 contest_id=contest_id, is_official=(user['role'] in ['admin', 'teacher'] and data.get('is_official', False)) ) db.session.add(post) db.session.commit() return jsonify({'success': True, 'message': '帖子发布成功', 'post_id': post.id}) # ========== 论坛相关 API ========== @app.route('/api/posts/search') def api_search_posts(): keyword = request.args.get('q', '').strip().lower() tag = request.args.get('tag', '').strip() sort = request.args.get('sort', 'newest') # 只获取非杯赛的帖子(contest_id 为 NULL) query = Post.query.filter_by(contest_id=None) if keyword: query = query.filter( (Post.title.contains(keyword)) | (Post.content.contains(keyword)) ) if tag: query = query.filter_by(tag=tag) if sort == 'hottest': query = query.order_by(Post.likes.desc()) elif sort == 'most_replies': query = query.order_by(Post.replies_count.desc()) else: query = query.order_by(Post.created_at.desc()) posts = query.all() user = session.get('user') uid = user.get('id') if user else None data = [] for p in posts: p_dict = { 'id': p.id, 'title': p.title, 'content': p.content[:200] + ('...' if len(p.content) > 200 else ''), 'author': p.author.name, 'author_id': p.author_id, 'tag': p.tag, 'is_official': p.is_official, 'pinned': p.pinned, 'likes': p.likes, 'replies': p.replies_count, 'views': p.views, 'has_poll': p.has_poll, 'created_at': p.created_at.strftime('%Y-%m-%d %H:%M'), } if uid: p_dict['liked'] = Reaction.query.filter_by(user_id=uid, post_id=p.id).first() is not None p_dict['bookmarked'] = Bookmark.query.filter_by(user_id=uid, post_id=p.id).first() is not None data.append(p_dict) return jsonify({'success': True, 'data': data}) @app.route('/api/posts', methods=['POST']) def api_create_post(): user = session.get('user') if not user: return jsonify({'success': False, 'message': '请先登录'}), 401 data = request.get_json() title = data.get('title', '').strip() content = data.get('content', '').strip() tag = data.get('tag', '全部') if not title or not content: return jsonify({'success': False, 'message': '标题和内容不能为空'}), 400 if len(title) > 100: return jsonify({'success': False, 'message': '标题不能超过100字'}), 400 post = Post( title=title, content=content, author_id=user.get('id'), tag=tag, is_official=(user.get('role') == 'teacher' and data.get('is_official', False)) ) db.session.add(post) db.session.commit() return jsonify({'success': True, 'message': '帖子发布成功', 'post_id': post.id}) @app.route('/api/posts/') def api_get_post(post_id): post = Post.query.get(post_id) if not post: return jsonify({'success': False, 'message': '帖子不存在'}), 404 post.views += 1 db.session.commit() replies = Reply.query.filter_by(post_id=post_id).order_by(Reply.created_at).all() user = session.get('user') uid = user.get('id') if user else None post_dict = { 'id': post.id, 'title': post.title, 'content': post.content, 'author': post.author.name, 'author_id': post.author_id, 'tag': post.tag, 'is_official': post.is_official, 'pinned': post.pinned, 'likes': post.likes, 'replies': post.replies_count, 'views': post.views, 'has_poll': post.has_poll, 'created_at': post.created_at.strftime('%Y-%m-%d %H:%M'), 'edited': (post.updated_at > post.created_at), } if uid: post_dict['liked'] = Reaction.query.filter_by(user_id=uid, post_id=post.id).first() is not None post_dict['bookmarked'] = Bookmark.query.filter_by(user_id=uid, post_id=post.id).first() is not None replies_data = [] for r in replies: r_dict = { 'id': r.id, 'content': r.content, 'author': r.author.name, 'author_id': r.author_id, 'likes': r.likes, 'reply_to': r.reply_to, 'created_at': r.created_at.strftime('%Y-%m-%d %H:%M'), 'edited': (r.updated_at > r.created_at), } if uid: r_dict['liked'] = Reaction.query.filter_by(user_id=uid, reply_id=r.id).first() is not None replies_data.append(r_dict) return jsonify({'success': True, 'data': post_dict, 'replies': replies_data}) @app.route('/api/posts/', methods=['DELETE']) def api_delete_post(post_id): user = session.get('user') if not user: return jsonify({'success': False, 'message': '请先登录'}), 401 post = Post.query.get(post_id) if not post: return jsonify({'success': False, 'message': '帖子不存在'}), 404 if post.author_id != user.get('id') and user.get('role') != 'teacher': return jsonify({'success': False, 'message': '无权删除'}), 403 db.session.delete(post) db.session.commit() return jsonify({'success': True, 'message': '帖子已删除'}) @app.route('/api/posts//like', methods=['POST']) def api_like_post(post_id): user = session.get('user') if not user: return jsonify({'success': False, 'message': '请先登录'}), 401 post = Post.query.get(post_id) if not post: return jsonify({'success': False, 'message': '帖子不存在'}), 404 uid = user.get('id') reaction = Reaction.query.filter_by(user_id=uid, post_id=post_id).first() if reaction: db.session.delete(reaction) post.likes -= 1 db.session.commit() return jsonify({'success': True, 'liked': False, 'likes': post.likes}) else: reaction = Reaction(user_id=uid, post_id=post_id, reaction='like') db.session.add(reaction) post.likes += 1 db.session.commit() return jsonify({'success': True, 'liked': True, 'likes': post.likes}) @app.route('/api/posts//bookmark', methods=['POST']) def api_bookmark_post(post_id): user = session.get('user') if not user: return jsonify({'success': False, 'message': '请先登录'}), 401 post = Post.query.get(post_id) if not post: return jsonify({'success': False, 'message': '帖子不存在'}), 404 uid = user.get('id') bookmark = Bookmark.query.filter_by(user_id=uid, post_id=post_id).first() if bookmark: db.session.delete(bookmark) db.session.commit() return jsonify({'success': True, 'bookmarked': False}) else: bookmark = Bookmark(user_id=uid, post_id=post_id) db.session.add(bookmark) db.session.commit() return jsonify({'success': True, 'bookmarked': True}) @app.route('/api/posts//pin', methods=['POST']) def api_pin_post(post_id): user = session.get('user') if not user or user.get('role') != 'teacher': return jsonify({'success': False, 'message': '无权限'}), 403 post = Post.query.get(post_id) if not post: return jsonify({'success': False, 'message': '帖子不存在'}), 404 post.pinned = not post.pinned db.session.commit() return jsonify({'success': True, 'pinned': post.pinned}) @app.route('/api/posts//replies', methods=['POST']) def api_create_reply(post_id): user = session.get('user') if not user: return jsonify({'success': False, 'message': '请先登录'}), 401 post = Post.query.get(post_id) if not post: return jsonify({'success': False, 'message': '帖子不存在'}), 404 data = request.get_json() content = data.get('content', '').strip() if not content: return jsonify({'success': False, 'message': '回复内容不能为空'}), 400 reply = Reply( post_id=post_id, author_id=user.get('id'), content=content, reply_to=data.get('reply_to', '') ) db.session.add(reply) post.replies_count += 1 db.session.commit() return jsonify({'success': True, 'message': '回复成功', 'reply': { 'id': reply.id, 'content': reply.content, 'author': user.get('name'), 'author_id': user.get('id'), 'likes': 0, 'reply_to': reply.reply_to, 'created_at': reply.created_at.strftime('%Y-%m-%d %H:%M') }}) @app.route('/api/replies//like', methods=['POST']) def api_like_reply(reply_id): user = session.get('user') if not user: return jsonify({'success': False, 'message': '请先登录'}), 401 reply = Reply.query.get(reply_id) if not reply: return jsonify({'success': False, 'message': '回复不存在'}), 404 uid = user.get('id') reaction = Reaction.query.filter_by(user_id=uid, reply_id=reply_id).first() if reaction: db.session.delete(reaction) reply.likes -= 1 db.session.commit() return jsonify({'success': True, 'liked': False, 'likes': reply.likes}) else: reaction = Reaction(user_id=uid, reply_id=reply_id, reaction='like') db.session.add(reaction) reply.likes += 1 db.session.commit() return jsonify({'success': True, 'liked': True, 'likes': reply.likes}) @app.route('/api/replies/', methods=['DELETE']) def api_delete_reply(reply_id): user = session.get('user') if not user: return jsonify({'success': False, 'message': '请先登录'}), 401 reply = Reply.query.get(reply_id) if not reply: return jsonify({'success': False, 'message': '回复不存在'}), 404 if reply.author_id != user.get('id') and user.get('role') != 'teacher': return jsonify({'success': False, 'message': '无权删除'}), 403 post = Post.query.get(reply.post_id) if post: post.replies_count -= 1 db.session.delete(reply) db.session.commit() return jsonify({'success': True, 'message': '回复已删除'}) @app.route('/api/user/bookmarks') def api_user_bookmarks(): user = session.get('user') if not user: return jsonify({'success': False, 'message': '请先登录'}), 401 uid = user.get('id') bookmarks = Bookmark.query.filter_by(user_id=uid).all() post_ids = [b.post_id for b in bookmarks] posts = Post.query.filter(Post.id.in_(post_ids)).all() data = [{ 'id': p.id, 'title': p.title, 'author': p.author.name, 'created_at': p.created_at.strftime('%Y-%m-%d %H:%M') } for p in posts] return jsonify({'success': True, 'data': data}) # ========== 论坛增强 API ========== @app.route('/api/posts//edit', methods=['POST']) def api_edit_post(post_id): user = session.get('user') if not user: return jsonify({'success': False, 'message': '请先登录'}), 401 post = Post.query.get(post_id) if not post: return jsonify({'success': False, 'message': '帖子不存在'}), 404 if post.author_id != user.get('id') and user.get('role') != 'teacher': return jsonify({'success': False, 'message': '无权编辑'}), 403 data = request.get_json() new_title = data.get('title', '').strip() new_content = data.get('content', '').strip() new_tag = data.get('tag', post.tag) if not new_title or not new_content: return jsonify({'success': False, 'message': '标题和内容不能为空'}), 400 post.title = new_title post.content = new_content post.tag = new_tag db.session.commit() return jsonify({'success': True, 'message': '编辑成功'}) @app.route('/api/replies//edit', methods=['POST']) def api_edit_reply(reply_id): user = session.get('user') if not user: return jsonify({'success': False, 'message': '请先登录'}), 401 reply = Reply.query.get(reply_id) if not reply: return jsonify({'success': False, 'message': '回复不存在'}), 404 if reply.author_id != user.get('id'): return jsonify({'success': False, 'message': '无权编辑'}), 403 data = request.get_json() new_content = data.get('content', '').strip() if not new_content: return jsonify({'success': False, 'message': '内容不能为空'}), 400 reply.content = new_content db.session.commit() return jsonify({'success': True, 'message': '编辑成功'}) @app.route('/api/posts//poll', methods=['POST']) def api_create_poll(post_id): user = session.get('user') if not user: return jsonify({'success': False, 'message': '请先登录'}), 401 post = Post.query.get(post_id) if not post: return jsonify({'success': False, 'message': '帖子不存在'}), 404 if post.author_id != user.get('id'): return jsonify({'success': False, 'message': '只有作者可以创建投票'}), 403 if post.poll: return jsonify({'success': False, 'message': '该帖子已有投票'}), 400 data = request.get_json() question = data.get('question', '').strip() options = data.get('options', []) multi = data.get('multi', False) if not question or len(options) < 2: return jsonify({'success': False, 'message': '请填写问题和至少2个选项'}), 400 poll = Poll( post_id=post_id, question=question, multi=multi ) poll.set_options([{'text': o, 'votes': 0} for o in options]) poll.set_voters({}) db.session.add(poll) post.has_poll = True db.session.commit() return jsonify({'success': True, 'message': '投票创建成功'}) @app.route('/api/posts//vote', methods=['POST']) def api_vote_poll(post_id): user = session.get('user') if not user: return jsonify({'success': False, 'message': '请先登录'}), 401 poll = Poll.query.filter_by(post_id=post_id).first() if not poll: return jsonify({'success': False, 'message': '投票不存在'}), 404 uid = user.get('id') voters = poll.get_voters() if str(uid) in voters: return jsonify({'success': False, 'message': '您已投过票'}), 400 data = request.get_json() choices = data.get('choices', []) if not choices: return jsonify({'success': False, 'message': '请选择选项'}), 400 if not poll.multi and len(choices) > 1: return jsonify({'success': False, 'message': '该投票为单选'}), 400 options = poll.get_options() for idx in choices: if 0 <= idx < len(options): options[idx]['votes'] += 1 voters[str(uid)] = choices poll.set_options(options) poll.set_voters(voters) poll.total_votes += 1 db.session.commit() return jsonify({'success': True, 'message': '投票成功', 'poll': { 'options': options, 'total_votes': poll.total_votes }}) @app.route('/api/posts//poll') def api_get_poll(post_id): poll = Poll.query.filter_by(post_id=post_id).first() if not poll: return jsonify({'success': False, 'message': '投票不存在'}), 404 user = session.get('user') voted = False my_choices = [] if user: voters = poll.get_voters() if str(user.get('id')) in voters: voted = True my_choices = voters[str(user.get('id'))] return jsonify({ 'success': True, 'poll': { 'question': poll.question, 'options': poll.get_options(), 'total_votes': poll.total_votes, 'multi': poll.multi }, 'voted': voted, 'my_choices': my_choices }) @app.route('/api/report', methods=['POST']) def api_report(): user = session.get('user') if not user: return jsonify({'success': False, 'message': '请先登录'}), 401 data = request.get_json() rtype = data.get('type', '') target_id = data.get('target_id', 0) reason = data.get('reason', '') detail = data.get('detail', '') if not rtype or not target_id or not reason: return jsonify({'success': False, 'message': '请填写举报信息'}), 400 report = Report( type=rtype, target_id=target_id, reporter_id=user.get('id'), reason=reason, detail=detail ) db.session.add(report) db.session.commit() return jsonify({'success': True, 'message': '举报已提交,管理员将尽快处理'}) @app.route('/api/posts//react', methods=['POST']) def api_react_post(post_id): user = session.get('user') if not user: return jsonify({'success': False, 'message': '请先登录'}), 401 post = Post.query.get(post_id) if not post: return jsonify({'success': False, 'message': '帖子不存在'}), 404 data = request.get_json() reaction_type = data.get('reaction', '') if reaction_type not in ('like', 'love', 'haha', 'wow', 'sad', 'angry'): return jsonify({'success': False, 'message': '无效反应'}), 400 uid = user.get('id') reaction = Reaction.query.filter_by(user_id=uid, post_id=post_id).first() if reaction: if reaction.reaction == reaction_type: db.session.delete(reaction) else: reaction.reaction = reaction_type else: reaction = Reaction(user_id=uid, post_id=post_id, reaction=reaction_type) db.session.add(reaction) db.session.commit() counts = {} for r in Reaction.query.filter_by(post_id=post_id).all(): counts[r.reaction] = counts.get(r.reaction, 0) + 1 return jsonify({'success': True, 'reactions': counts}) @app.route('/api/user/profile/') def api_user_profile(user_id): user = User.query.get(int(user_id)) if user_id.isdigit() else None if not user: return jsonify({'success': False, 'message': '用户不存在'}), 404 post_count = Post.query.filter_by(author_id=user.id).count() reply_count = Reply.query.filter_by(author_id=user.id).count() likes_received = db.session.query(db.func.sum(Post.likes)).filter_by(author_id=user.id).scalar() or 0 points = post_count * 10 + reply_count * 3 + likes_received * 2 level = calc_level(points) recent_posts = Post.query.filter_by(author_id=user.id).order_by(Post.created_at.desc()).limit(5).all() recent_posts_data = [{'id': p.id, 'title': p.title} for p in recent_posts] badges = [] if post_count >= 1: badges.append({'icon': '✍️', 'name': '初次发帖', 'desc': '发布第一篇帖子'}) if post_count >= 10: badges.append({'icon': '📝', 'name': '笔耕不辍', 'desc': '发布10篇帖子'}) if post_count >= 50: badges.append({'icon': '📚', 'name': '著作等身', 'desc': '发布50篇帖子'}) if reply_count >= 10: badges.append({'icon': '💬', 'name': '热心回复', 'desc': '回复10次'}) if reply_count >= 100: badges.append({'icon': '🗣️', 'name': '话题达人', 'desc': '回复100次'}) if likes_received >= 10: badges.append({'icon': '👍', 'name': '小有人气', 'desc': '获得10个赞'}) if likes_received >= 100: badges.append({'icon': '🌟', 'name': '人气之星', 'desc': '获得100个赞'}) if likes_received >= 500: badges.append({'icon': '👑', 'name': '万人迷', 'desc': '获得500个赞'}) return jsonify({ 'success': True, 'profile': { 'user_id': user.id, 'name': user.name, 'points': points, 'level': level, 'level_title': LEVEL_TITLES.get(level, ''), 'posts_count': post_count, 'replies_count': reply_count, 'likes_received': likes_received, 'badges': badges, 'recent_posts': recent_posts_data } }) @app.route('/api/forum/hot') def api_hot_posts(): posts = Post.query.order_by((Post.likes*3 + Post.replies_count*2 + Post.views).desc()).limit(10).all() data = [{'id': p.id, 'title': p.title, 'likes': p.likes, 'replies': p.replies_count} for p in posts] return jsonify({'success': True, 'data': data}) @app.route('/api/forum/stats') def api_forum_stats(): total_posts = Post.query.count() total_replies = Reply.query.count() total_users = User.query.count() today = datetime.utcnow().date() today_posts = Post.query.filter(db.func.date(Post.created_at) == today).count() today_replies = Reply.query.filter(db.func.date(Reply.created_at) == today).count() online_count = random.randint(max(15, total_users), max(50, total_users * 3)) tag_counts = {} for tag in db.session.query(Post.tag, db.func.count(Post.tag)).group_by(Post.tag).all(): tag_counts[tag[0]] = tag[1] active_users = [] recent_posts_users = Post.query.order_by(Post.created_at.desc()).limit(10).all() seen = set() for p in recent_posts_users: if p.author_id not in seen and len(active_users) < 5: seen.add(p.author_id) points = (Post.query.filter_by(author_id=p.author_id).count()*10 + Reply.query.filter_by(author_id=p.author_id).count()*3) level = calc_level(points) active_users.append({ 'user_id': p.author_id, 'name': p.author.name, 'level': level, 'level_title': LEVEL_TITLES.get(level, '') }) return jsonify({ 'success': True, 'stats': { 'total_posts': total_posts, 'total_replies': total_replies, 'total_users': total_users, 'today_posts': today_posts, 'today_replies': today_replies, 'online_count': online_count, 'tag_counts': tag_counts, 'active_users': active_users } }) @app.route('/api/forum/leaderboard') def api_leaderboard(): users = User.query.all() board = [] for u in users: post_count = Post.query.filter_by(author_id=u.id).count() reply_count = Reply.query.filter_by(author_id=u.id).count() likes_received = db.session.query(db.func.sum(Post.likes)).filter_by(author_id=u.id).scalar() or 0 points = post_count * 10 + reply_count * 3 + likes_received * 2 level = calc_level(points) board.append({ 'user_id': u.id, 'name': u.name, 'points': points, 'level': level, 'level_title': LEVEL_TITLES.get(level, ''), 'posts_count': post_count, 'likes_received': likes_received }) board.sort(key=lambda x: x['points'], reverse=True) return jsonify({'success': True, 'data': board[:20]}) @app.route('/api/posts/with-poll', methods=['POST']) def api_create_post_with_poll(): user = session.get('user') if not user: return jsonify({'success': False, 'message': '请先登录'}), 401 data = request.get_json() title = data.get('title', '').strip() content = data.get('content', '').strip() tag = data.get('tag', '全部') if not title or not content: return jsonify({'success': False, 'message': '标题和内容不能为空'}), 400 post = Post( title=title, content=content, author_id=user.get('id'), tag=tag, is_official=(user.get('role') == 'teacher' and data.get('is_official', False)) ) db.session.add(post) db.session.flush() poll_data = data.get('poll') if poll_data and poll_data.get('question') and len(poll_data.get('options', [])) >= 2: poll = Poll( post_id=post.id, question=poll_data['question'], multi=poll_data.get('multi', False) ) poll.set_options([{'text': o, 'votes': 0} for o in poll_data['options']]) poll.set_voters({}) db.session.add(poll) post.has_poll = True db.session.commit() return jsonify({'success': True, 'message': '帖子发布成功', 'post_id': post.id}) @app.route('/api/posts//share', methods=['POST']) def api_share_post(post_id): return jsonify({'success': True}) # ========== 管理后台页面 ========== @app.route('/admin') def admin_dashboard(): user = require_admin_or_teacher() return render_template('admin_dashboard.html') @app.route('/admin/contests') def admin_contests(): user = require_admin_or_teacher() return render_template('admin_contests.html') @app.route('/admin/exams') def admin_exams(): user = require_admin_or_teacher() return render_template('admin_exams.html') @app.route('/admin/posts') def admin_posts(): user = require_admin_or_teacher() return render_template('admin_posts.html') @app.route('/admin/users') def admin_users(): user = require_admin_or_teacher() return render_template('admin_users.html') # ========== 管理后台 API ========== @app.route('/api/admin/contests', methods=['GET', 'POST']) def admin_contests_api(): user = require_admin_or_teacher() if request.method == 'GET': contests = Contest.query.all() data = [{ 'id': c.id, 'name': c.name, 'organizer': c.organizer, 'start_date': c.start_date, 'description': c.description, 'contact': c.contact if hasattr(c, 'contact') else '', 'status': c.status } for c in contests] return jsonify({'success': True, 'contests': data}) elif request.method == 'POST': data = request.get_json() contest = Contest( name=data['name'], organizer=data['organizer'], start_date=data['start_date'], description=data['description'], contact=data.get('contact', ''), status=data['status'] ) db.session.add(contest) db.session.commit() return jsonify({'success': True}) @app.route('/api/admin/contests/', methods=['GET', 'PUT', 'DELETE']) def admin_contest_detail(contest_id): user = require_admin_or_teacher() contest = Contest.query.get(contest_id) if not contest: return jsonify({'success': False, 'message': '杯赛不存在'}) if request.method == 'GET': return jsonify({'success': True, 'contest': { 'id': contest.id, 'name': contest.name, 'organizer': contest.organizer, 'start_date': contest.start_date, 'description': contest.description, 'contact': contest.contact if hasattr(contest, 'contact') else '', 'status': contest.status }}) elif request.method == 'PUT': data = request.get_json() contest.name = data.get('name', contest.name) contest.organizer = data.get('organizer', contest.organizer) contest.start_date = data.get('start_date', contest.start_date) contest.description = data.get('description', contest.description) contest.contact = data.get('contact', contest.contact if hasattr(contest, 'contact') else '') contest.status = data.get('status', contest.status) db.session.commit() return jsonify({'success': True}) elif request.method == 'DELETE': db.session.delete(contest) db.session.commit() return jsonify({'success': True}) @app.route('/api/admin/users', methods=['GET']) def admin_users_api(): user = require_admin_or_teacher() q = request.args.get('q', '') role = request.args.get('role', '') query = User.query if q: query = query.filter((User.name.contains(q)) | (User.email.contains(q))) if role: query = query.filter_by(role=role) users = query.all() data = [{ 'id': u.id, 'name': u.name, 'email': u.email, 'role': u.role, 'created_at': u.created_at.strftime('%Y-%m-%d %H:%M') } for u in users] return jsonify({'success': True, 'users': data}) @app.route('/api/admin/users/', methods=['DELETE']) def admin_delete_user(user_id): user = require_admin_or_teacher() if user_id == session.get('user', {}).get('id'): return jsonify({'success': False, 'message': '不能删除自己'}) target = User.query.get(user_id) if target: db.session.delete(target) db.session.commit() return jsonify({'success': True}) return jsonify({'success': False, 'message': '用户不存在'}) @app.route('/api/admin/posts', methods=['GET']) def admin_posts_api(): user = require_admin_or_teacher() q = request.args.get('q', '') tag = request.args.get('tag', '') query = Post.query if q: query = query.filter((Post.title.contains(q)) | (Post.content.contains(q))) if tag: query = query.filter_by(tag=tag) posts = query.all() data = [{ 'id': p.id, 'title': p.title, 'author': p.author.name, 'tag': p.tag, 'created_at': p.created_at.strftime('%Y-%m-%d %H:%M') } for p in posts] return jsonify({'success': True, 'posts': data}) @app.route('/api/admin/posts/', methods=['DELETE']) def admin_delete_post(post_id): user = require_admin_or_teacher() post = Post.query.get(post_id) if post: db.session.delete(post) db.session.commit() return jsonify({'success': True}) return jsonify({'success': False, 'message': '帖子不存在'}) @app.route('/api/admin/exams', methods=['GET']) def admin_exams_api(): user = require_admin_or_teacher() exams = Exam.query.all() data = [{ 'id': e.id, 'title': e.title, 'subject': e.subject, 'creator_name': e.creator.name if e.creator else '', 'status': e.status, 'created_at': e.created_at.strftime('%Y-%m-%d %H:%M') } for e in exams] return jsonify({'success': True, 'exams': data}) @app.route('/api/admin/exams//status', methods=['PUT']) def admin_update_exam_status(exam_id): user = require_admin_or_teacher() exam = Exam.query.get(exam_id) if not exam: return jsonify({'success': False, 'message': '考试不存在'}) data = request.get_json() exam.status = data.get('status', exam.status) db.session.commit() return jsonify({'success': True}) @app.route('/api/admin/exams/', methods=['DELETE']) def admin_delete_exam(exam_id): user = require_admin_or_teacher() exam = Exam.query.get(exam_id) if exam: db.session.delete(exam) db.session.commit() return jsonify({'success': True}) return jsonify({'success': False, 'message': '考试不存在'}) @app.route('/api/admin/stats', methods=['GET']) def admin_stats(): user = require_admin_or_teacher() stats = { 'users': User.query.count(), 'posts': Post.query.count(), 'exams': Exam.query.count(), 'contests': Contest.query.count() } return jsonify({'success': True, 'stats': stats}) @app.route('/api/admin/recent-activities', methods=['GET']) def admin_recent_activities(): user = require_admin_or_teacher() activities = [] for s in Submission.query.order_by(Submission.submitted_at.desc()).limit(5).all(): activities.append({ 'time': s.submitted_at.strftime('%Y-%m-%d %H:%M'), 'user': s.user_name, 'action': f'提交了试卷 ID {s.exam_id}' }) for p in Post.query.order_by(Post.created_at.desc()).limit(5).all(): activities.append({ 'time': p.created_at.strftime('%Y-%m-%d %H:%M'), 'user': p.author.name, 'action': f'发布了帖子《{p.title}》' }) activities.sort(key=lambda x: x['time'], reverse=True) return jsonify({'success': True, 'activities': activities[:10]}) # ========== 初始化测试数据 ========== with app.app_context(): db.create_all() # 仅当数据库为空时添加测试数据 if Contest.query.count() == 0: contests = [ Contest( name='2026年"星火杯"全国中学生数学联考', organizer='星火杯组委会', description='本届星火杯旨在选拔优秀数学人才,试题难度对标全国高中数学联赛。', start_date='2026-03-15', end_date='2026-03-15', status='registering', participants=12500, created_by='system', _past_papers=json.dumps([ {"year": "2025", "title": "2025年星火杯真题(含答案)", "file": "/static/papers/2025_spark.pdf"}, {"year": "2024", "title": "2024年星火杯真题", "file": "/static/papers/2024_spark.pdf"}, {"year": "2023", "title": "2023年星火杯真题解析", "file": "/static/papers/2023_spark_analysis.pdf"} ]) ), Contest( name='第十一届"圆梦杯"综合能力测试', organizer='圆梦教育基金会', description='圆梦杯已成功举办十届,是国内最具影响力的综合素质评价赛事之一。', start_date='2026-04-01', end_date='2026-04-01', status='upcoming', participants=8000, created_by='system', _past_papers=json.dumps([ {"year": "2025", "title": "第十届圆梦杯真题(语数英综合)", "file": "/static/papers/2025_dream.pdf"}, {"year": "2024", "title": "第九届圆梦杯真题及详解", "file": "/static/papers/2024_dream.pdf"} ]) ), Contest( name='2025年冬季联考(已结束)', organizer='联考联盟', description='回顾2025年冬季联考精彩瞬间,查看优秀试卷与解析。', start_date='2025-12-20', end_date='2025-12-20', status='ended', participants=15000, created_by='system', _past_papers=json.dumps([ {"year": "2025", "title": "2025冬季联考数学试卷", "file": "/static/papers/2025_winter_math.pdf"}, {"year": "2025", "title": "2025冬季联考英语试卷", "file": "/static/papers/2025_winter_english.pdf"}, {"year": "2024", "title": "2024冬季联考理综试卷", "file": "/static/papers/2024_winter_science.pdf"} ]) ) ] db.session.add_all(contests) if Post.query.count() == 0: # 确保有一个测试用户 test_user = User.query.filter_by(email='test@example.com').first() if not test_user: test_user = User(name='测试用户', email='test@example.com', password='123456', role='student') db.session.add(test_user) db.session.commit() posts = [ Post( title='关于2026年星火杯初赛第12题的讨论', content='大家觉得第12题的第二问是不是有点超纲了?我用洛必达法则算出来的结果是1/2...', author_id=test_user.id, tag='题目讨论', is_official=False, likes=45, replies_count=5, views=320, has_poll=True ), Post( title='【官方通知】关于圆梦杯报名延期的通知', content='因系统维护,原定于2月20日截止的报名时间延长至2月28日...', author_id=test_user.id, tag='官方公告', is_official=True, pinned=True, likes=128, replies_count=3, views=1560 ), Post( title='分享一些备考资料和心得(持续更新)', content='大家好!我是去年星火杯一等奖获得者,整理了一些备考资料和心得分享给大家...', author_id=test_user.id, tag='经验分享', is_official=False, likes=89, replies_count=8, views=890 ), Post( title='求助:数列通项公式怎么求?', content='已知数列 {aₙ} 满足 a₁=1, aₙ₊₁ = 2aₙ + 1,求通项公式 aₙ', author_id=test_user.id, tag='求助答疑', is_official=False, likes=12, replies_count=6, views=156 ), Post( title='今天的模拟考大家考得怎么样?', content='刚考完今天的模拟考,感觉难度比去年真题大不少啊!...', author_id=test_user.id, tag='闲聊灌水', is_official=False, likes=67, replies_count=15, views=445, has_poll=True ) ] db.session.add_all(posts) db.session.commit() replies = [ Reply(post_id=1, author_id=test_user.id, content='这道题确实用泰勒展开更简单。答案是1/120', likes=23), Reply(post_id=1, author_id=test_user.id, content='谢谢!原来是泰勒展开到第五阶', likes=5, reply_to='学霸本霸'), Reply(post_id=3, author_id=test_user.id, content='感谢分享!错题本真的很重要', likes=12), Reply(post_id=4, author_id=test_user.id, content='令 bₙ = aₙ + 1,则 bₙ₊₁ = 2bₙ,所以 aₙ = 2ⁿ - 1', likes=18) ] db.session.add_all(replies) db.session.commit() poll1 = Poll( post_id=1, question='你觉得第12题的答案是什么?', multi=False ) poll1.set_options([{'text': '1/2', 'votes': 15}, {'text': '1/3', 'votes': 8}, {'text': '1/120', 'votes': 42}, {'text': '其他', 'votes': 3}]) poll1.set_voters({'test3': [2], 'test4': [0], 'test5': [2]}) poll1.total_votes = 68 db.session.add(poll1) poll5 = Poll( post_id=5, question='今天模拟考你觉得难度如何?', multi=False ) poll5.set_options([{'text': '简单', 'votes': 5}, {'text': '适中', 'votes': 18}, {'text': '偏难', 'votes': 35}, {'text': '太难', 'votes': 22}]) poll5.set_voters({'test1': [2], 'test3': [1], 'test4': [3]}) poll5.total_votes = 80 db.session.add(poll5) db.session.commit() if __name__ == '__main__': app.run(debug=True, port=5000)