upgrade to python3;add some validator examples

This commit is contained in:
2025-10-11 12:33:24 +08:00
parent 65632f0e60
commit 487c041148
25 changed files with 1492 additions and 317 deletions

View File

@@ -0,0 +1,295 @@
# BitOJ 自定义验证程序完整指南
## 概述
BitOJ系统支持4种判题方式
1. **字符串精确比较** - 逐字符对比输出
2. **字符串容错比较** - 允许格式错误(PE),主要比较数字内容
3. **文件MD5校验** - 比较文件大小和MD5值
4. **自定义验证程序** - 编写专门的验证逻辑
本文档重点介绍第4种方式自定义验证程序的编写和使用。
## 自定义验证程序原理
### 工作流程
1. 系统执行用户提交的代码
2. 将输入、期望输出、实际输出传递给验证程序
3. 验证程序分析这些文件并返回判题结果
4. 系统根据返回结果确定最终评分
### 参数说明
验证程序接收以下参数:
- `argv[1]`: 标准输入文件路径
- `argv[2]`: 期望输出文件路径
- `argv[3]`: 用户程序输出文件路径
### 返回值
验证程序必须输出以下结果之一:
- `AC` - Accepted答案正确
- `PE` - Presentation Error格式错误
- `WA` - Wrong Answer答案错误
## 实际示例
### 1. 浮点数误差比较 (C++)
**适用场景**: 计算几何、数值计算等需要浮点数比较的题目
```cpp
#include <iostream>
#include <fstream>
#include <cmath>
using namespace std;
const double EPS = 1e-9;
int main(int argc, char* argv[]) {
ifstream expected(argv[2]); // 标准答案
ifstream result(argv[3]); // 用户输出
double exp_val, res_val;
if (!(expected >> exp_val) || !(result >> res_val)) {
cout << "WA" << endl;
return 0;
}
if (fabs(exp_val - res_val) <= EPS) {
cout << "AC" << endl;
} else {
cout << "WA" << endl;
}
return 0;
}
```
### 2. 多解答案验证 (Python)
**适用场景**: 排序题、图论路径题等有多种正确答案的题目
```python
#!/usr/bin/env python
import sys
def main():
try:
with open(sys.argv[2], 'r') as f:
expected = f.read().strip()
with open(sys.argv[3], 'r') as f:
result = f.read().strip()
# 解析数字并排序比较
expected_nums = sorted(map(int, expected.split()))
result_nums = sorted(map(int, result.split()))
if expected_nums == result_nums:
print("AC")
else:
print("WA")
except:
print("WA")
if __name__ == "__main__":
main()
```
### 3. 输出格式验证 (Java)
**适用场景**: 需要严格输出格式的竞赛题目
```java
import java.io.*;
import java.util.*;
import java.util.regex.Pattern;
public class StringFormatValidator {
public static void main(String[] args) {
try {
Scanner result = new Scanner(new File(args[2]));
int lineNum = 1;
while (result.hasNextLine()) {
String line = result.nextLine().trim();
// 检查格式Case #X: Y
Pattern pattern = Pattern.compile("^Case #" + lineNum + ": \\d+$");
if (!pattern.matcher(line).matches()) {
System.out.println("PE");
return;
}
lineNum++;
}
System.out.println("AC");
} catch (Exception e) {
System.out.println("WA");
}
}
}
```
## 在BitOJ中配置自定义验证程序
### 代码配置示例
```python
from judgescript import ExternalJudge
# 创建自定义验证程序
validator = ExternalJudge(
problemid='P1001', # 题目ID
vlang='gcc-3.3', # 验证程序语言
vcode=validator_code # 验证程序源代码
)
# 在测试过程中调用
result = validator.judge(
sid='S123', # 提交ID
tid='T001', # 测试用例ID
tin='/path/to/input.txt', # 输入文件
tout='/path/to/output.txt', # 期望输出
result='/path/to/result.txt', # 实际输出
errfile='/path/to/error.txt' # 错误输出
)
```
### 支持的编程语言
| 语言 | 标识符 | 说明 |
|------|--------|------|
| C | gcc-3.3 | 适合高性能验证 |
| C++ | g++-3.3 | 支持STL容器 |
| Java | java-1.6 | 强类型安全 |
| Python | python-2.5 | 快速开发 |
| Pascal | fpc-2.2 | 简单逻辑 |
## 最佳实践
### 1. 错误处理
```cpp
// 文件读取失败处理
if (!input_file.is_open()) {
cout << "WA" << endl;
return 0;
}
// 数据解析失败处理
try {
value = stod(line);
} catch (const exception& e) {
cout << "WA" << endl;
return 0;
}
```
### 2. 性能优化
- 使用适当的数据结构
- 避免不必要的字符串操作
- 合理设置缓冲区大小
- 注意内存使用限制
### 3. 安全考虑
- 验证输入参数数量
- 处理异常文件路径
- 限制递归深度
- 避免无限循环
### 4. 可维护性
- 添加清晰的注释
- 使用有意义的变量名
- 模块化验证逻辑
- 提供调试输出选项
## 常见应用场景
### 1. 数值计算题
- 浮点数精度比较
- 矩阵运算结果验证
- 统计数值的近似比较
### 2. 图论算法题
- 最短路径验证(允许多条等长路径)
- 最小生成树验证(权重相等即可)
- 图遍历结果验证
### 3. 字符串处理题
- 忽略大小写比较
- 忽略空白字符差异
- 正则表达式匹配
### 4. 组合数学题
- 排列组合结果验证
- 集合操作结果比较
- 数学证明步骤检查
### 5. 交互式题目
- 模拟交互过程
- 验证查询次数限制
- 检查答案正确性
## 调试技巧
### 1. 本地测试
```bash
# 编译验证程序
g++ -o validator validator.cpp
# 准备测试文件
echo "3.14159" > expected.txt
echo "3.14160" > result.txt
# 运行测试
./validator input.txt expected.txt result.txt
```
### 2. 日志输出
```cpp
#ifdef DEBUG
cerr << "Expected: " << exp_val << ", Got: " << res_val << endl;
cerr << "Difference: " << fabs(exp_val - res_val) << endl;
#endif
```
### 3. 边界情况测试
- 空输出文件
- 超大数值
- 特殊字符
- 格式错误的输入
## 故障排除
### 常见问题
1. **编译失败**
- 检查语言版本兼容性
- 确认头文件引用正确
- 验证语法错误
2. **运行超时**
- 优化算法复杂度
- 减少不必要的计算
- 检查是否有死循环
3. **内存不足**
- 减少内存分配
- 及时释放资源
- 使用流式处理
4. **判题结果异常**
- 验证输出格式
- 检查返回值规范
- 确认异常处理逻辑
## 总结
自定义验证程序是BitOJ系统最灵活的判题方式能够处理各种复杂的判题需求。通过合理设计验证逻辑可以支持
- 多解答案题目
- 浮点数精度要求
- 特殊输出格式
- 交互式评测
- 复杂的正确性检查
掌握自定义验证程序的编写技巧,能够大大扩展在线评测系统的应用范围,为各种类型的编程竞赛和教学需求提供支持。

View File

@@ -0,0 +1,69 @@
/*
* 字符串格式验证程序
* 验证输出是否符合特定格式要求
* 编译: javac StringFormatValidator.java
* 使用: java StringFormatValidator input.txt expected.txt result.txt
*/
import java.io.*;
import java.util.*;
import java.util.regex.Pattern;
public class StringFormatValidator {
public static void main(String[] args) {
if (args.length != 3) {
System.out.println("WA");
return;
}
try {
// 读取期望的行数和格式信息
Scanner expected = new Scanner(new File(args[1]));
Scanner result = new Scanner(new File(args[2]));
// 假设期望文件第一行是期望的行数
int expectedLines = expected.nextInt();
expected.nextLine(); // 消费换行符
// 读取用户输出
List<String> userLines = new ArrayList<>();
while (result.hasNextLine()) {
String line = result.nextLine().trim();
if (!line.isEmpty()) {
userLines.add(line);
}
}
// 检查行数
if (userLines.size() != expectedLines) {
System.out.println("WA");
return;
}
// 检查每行格式
boolean formatCorrect = true;
for (int i = 0; i < userLines.size(); i++) {
String line = userLines.get(i);
// 检查是否符合 "Case #X: Y" 格式
Pattern pattern = Pattern.compile("^Case #" + (i + 1) + ": \\d+$");
if (!pattern.matcher(line).matches()) {
formatCorrect = false;
break;
}
}
if (formatCorrect) {
System.out.println("AC");
} else {
System.out.println("PE"); // 格式错误
}
expected.close();
result.close();
} catch (Exception e) {
System.out.println("WA");
}
}
}

View File

@@ -0,0 +1,328 @@
# BitOJ 自定义验证程序示例
## 1. 基本验证程序结构
自定义验证程序接收以下参数:
- 标准输入文件
- 标准输出文件(期望答案)
- 用户输出文件(用户程序的输出)
验证程序需要输出以下结果之一:
- `AC` - 答案正确
- `PE` - 格式错误(如果支持)
- `WA` - 答案错误
## 2. 示例1数值误差比较验证程序C++
适用于需要浮点数比较的题目:
```cpp
#include <iostream>
#include <fstream>
#include <cmath>
#include <string>
using namespace std;
const double EPS = 1e-9;
int main(int argc, char* argv[]) {
// argv[1]: 标准输入文件
// argv[2]: 标准输出文件
// argv[3]: 用户输出文件
ifstream expected(argv[2]); // 标准答案
ifstream result(argv[3]); // 用户输出
double exp_val, res_val;
// 读取期望的浮点数
if (!(expected >> exp_val)) {
cout << "WA" << endl;
return 0;
}
// 读取用户输出的浮点数
if (!(result >> res_val)) {
cout << "WA" << endl;
return 0;
}
// 比较数值,允许误差
if (fabs(exp_val - res_val) <= EPS) {
cout << "AC" << endl;
} else {
cout << "WA" << endl;
}
return 0;
}
```
## 3. 示例2多解答案验证程序Python
适用于有多个正确答案的题目:
```python
#!/usr/bin/env python
import sys
def main():
# sys.argv[1]: 标准输入文件
# sys.argv[2]: 标准输出文件
# sys.argv[3]: 用户输出文件
try:
with open(sys.argv[2], 'r') as f:
expected = f.read().strip()
with open(sys.argv[3], 'r') as f:
result = f.read().strip()
# 假设这是一个排序题目,任何正确的排序都是对的
expected_nums = list(map(int, expected.split()))
result_nums = list(map(int, result.split()))
# 检查数量是否相同
if len(expected_nums) != len(result_nums):
print("WA")
return
# 检查排序后是否相同(原始数据相同)
if sorted(expected_nums) != sorted(result_nums):
print("WA")
return
# 检查用户输出是否有序
if result_nums == sorted(result_nums):
print("AC")
else:
print("WA")
except Exception as e:
print("WA")
if __name__ == "__main__":
main()
```
## 4. 示例3图论路径验证程序C++
验证最短路径问题,检查路径是否合法且最优:
```cpp
#include <iostream>
#include <fstream>
#include <vector>
#include <string>
#include <sstream>
using namespace std;
struct Edge {
int to, weight;
};
int main(int argc, char* argv[]) {
ifstream input(argv[1]); // 输入数据
ifstream expected(argv[2]); // 期望答案
ifstream result(argv[3]); // 用户答案
// 读取图的信息
int n, m;
input >> n >> m;
vector<vector<Edge>> graph(n + 1);
for (int i = 0; i < m; i++) {
int u, v, w;
input >> u >> v >> w;
graph[u].push_back({v, w});
graph[v].push_back({u, w}); // 无向图
}
int start, end;
input >> start >> end;
// 读取期望的最短距离
int expected_dist;
expected >> expected_dist;
// 读取用户输出的路径
string line;
getline(result, line);
istringstream iss(line);
vector<int> path;
int node;
while (iss >> node) {
path.push_back(node);
}
// 验证路径
if (path.empty() || path[0] != start || path.back() != end) {
cout << "WA" << endl;
return 0;
}
// 计算路径长度
int total_dist = 0;
for (int i = 0; i < path.size() - 1; i++) {
int u = path[i], v = path[i + 1];
bool found = false;
for (const Edge& e : graph[u]) {
if (e.to == v) {
total_dist += e.weight;
found = true;
break;
}
}
if (!found) {
cout << "WA" << endl; // 路径不存在
return 0;
}
}
// 检查是否是最短路径
if (total_dist == expected_dist) {
cout << "AC" << endl;
} else {
cout << "WA" << endl;
}
return 0;
}
```
## 5. 示例4字符串格式验证程序Java
验证输出格式是否正确:
```java
import java.io.*;
import java.util.*;
public class Main {
public static void main(String[] args) throws IOException {
// args[0]: 标准输入文件
// args[1]: 标准输出文件
// args[2]: 用户输出文件
Scanner expected = new Scanner(new File(args[1]));
Scanner result = new Scanner(new File(args[2]));
try {
// 读取期望的行数
int expectedLines = expected.nextInt();
// 检查用户输出的行数
int actualLines = 0;
List<String> userLines = new ArrayList<>();
while (result.hasNextLine()) {
String line = result.nextLine().trim();
if (!line.isEmpty()) {
userLines.add(line);
actualLines++;
}
}
if (actualLines != expectedLines) {
System.out.println("WA");
return;
}
// 检查每行格式:必须是 "Case #X: Y" 的格式
for (int i = 0; i < userLines.size(); i++) {
String line = userLines.get(i);
String pattern = "Case #" + (i + 1) + ": \\d+";
if (!line.matches(pattern)) {
System.out.println("PE"); // 格式错误
return;
}
}
System.out.println("AC");
} catch (Exception e) {
System.out.println("WA");
} finally {
expected.close();
result.close();
}
}
}
```
## 6. 示例5交互式题目验证程序C++
模拟交互过程:
```cpp
#include <iostream>
#include <fstream>
#include <vector>
#include <string>
using namespace std;
int main(int argc, char* argv[]) {
ifstream input(argv[1]); // 输入数据
ifstream expected(argv[2]); // 期望的查询次数等
ifstream result(argv[3]); // 用户的查询序列
// 读取隐藏的数字
int secret;
input >> secret;
// 读取最大查询次数
int maxQueries;
expected >> maxQueries;
// 模拟交互过程
int queryCount = 0;
int guess;
bool found = false;
while (result >> guess && queryCount < maxQueries) {
queryCount++;
if (guess == secret) {
found = true;
break;
}
// 在实际交互中,这里会给用户反馈 "too high" 或 "too low"
}
// 检查结果
if (found && queryCount <= maxQueries) {
cout << "AC" << endl;
} else {
cout << "WA" << endl;
}
return 0;
}
```
## 使用方法
1. **编写验证程序**:选择合适的编程语言编写验证逻辑
2. **设置题目**:在题目配置中指定使用自定义验证程序
3. **上传验证代码**:将验证程序代码保存到系统中
4. **测试验证**:确保验证程序能正确处理各种情况
## 注意事项
1. **异常处理**:验证程序必须处理所有可能的异常情况
2. **性能考虑**:验证程序也有时间限制,需要高效实现
3. **输出格式**:验证程序必须严格按照 AC/PE/WA 格式输出
4. **安全性**:验证程序在沙箱环境中运行,有资源限制
5. **可移植性**:确保验证程序在评测环境中能正常编译运行
## 配置示例
在 BitOJ 中配置自定义验证程序:
```python
# 在题目配置中设置自定义验证程序
problem.validator_language = 'gcc-3.3' # 验证程序语言
problem.validator_code = '''
// 这里放置验证程序的源代码
'''
```

View File

@@ -0,0 +1,54 @@
// 浮点数误差比较验证程序
// 编译: g++ -o float_validator float_validator.cpp
// 使用: ./float_validator input.txt expected.txt result.txt
#include <iostream>
#include <fstream>
#include <cmath>
#include <iomanip>
using namespace std;
const double EPS = 1e-9;
int main(int argc, char* argv[]) {
if (argc != 4) {
cout << "WA" << endl;
return 0;
}
ifstream expected(argv[2]); // 标准答案
ifstream result(argv[3]); // 用户输出
if (!expected.is_open() || !result.is_open()) {
cout << "WA" << endl;
return 0;
}
double exp_val, res_val;
// 读取期望的浮点数
if (!(expected >> exp_val)) {
cout << "WA" << endl;
return 0;
}
// 读取用户输出的浮点数
if (!(result >> res_val)) {
cout << "WA" << endl;
return 0;
}
// 比较数值,允许相对误差和绝对误差
double diff = fabs(exp_val - res_val);
double rel_error = (fabs(exp_val) > EPS) ? diff / fabs(exp_val) : diff;
if (diff <= 1e-6 || rel_error <= 1e-6) { // 放宽误差范围
cout << "AC" << endl;
} else {
cout << "WA" << endl;
}
expected.close();
result.close();
return 0;
}

View File

@@ -0,0 +1,183 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
BitOJ 自定义验证程序集成示例
演示如何在实际的BitOJ系统中使用自定义验证程序
"""
import sys
sys.path.append('../python')
try:
from judgescript import InternalJudge
except ImportError:
# 如果无法导入,创建一个模拟的类
class InternalJudge:
def __init__(self, allowpe=False):
self.allowpe = allowpe
def compare_string(self, output, result):
if output == result:
return 'AC'
if self.allowpe:
# 简化的PE检查
if ''.join(output.split()) == ''.join(result.split()):
return 'PE'
return 'WA'
def demo_internal_judge():
"""演示内置判题器的使用"""
print("=== 内置判题器演示 ===")
# 标准文本比较
ij_strict = InternalJudge(allowpe=False)
print("严格文本比较:")
result = ij_strict.compare_string("hello world\n", "hello world\n")
print(f" 'hello world\\n' vs 'hello world\\n' => {result}")
result = ij_strict.compare_string("hello world", "hello world\n")
print(f" 'hello world' vs 'hello world\\n' => {result}")
# 允许格式错误的比较
ij_pe = InternalJudge(allowpe=True)
print("\n允许格式错误的比较:")
result = ij_pe.compare_string("1 2 3 4 5\n", "1\n2\n3\n4\n5\n")
print(f" '1 2 3 4 5\\n' vs '1\\n2\\n3\\n4\\n5\\n' => {result}")
result = ij_pe.compare_string("12345\n", "1 2 3 4 5\n")
print(f" '12345\\n' vs '1 2 3 4 5\\n' => {result}")
def demo_custom_judge_config():
"""演示如何配置自定义验证程序"""
print("\n=== 自定义验证程序配置示例 ===")
# 浮点数比较验证程序代码
float_validator_code = """
#include <iostream>
#include <fstream>
#include <cmath>
using namespace std;
const double EPS = 1e-9;
int main(int argc, char* argv[]) {
if (argc != 4) {
cout << "WA" << endl;
return 0;
}
ifstream expected(argv[2]);
ifstream result(argv[3]);
double exp_val, res_val;
if (!(expected >> exp_val) || !(result >> res_val)) {
cout << "WA" << endl;
return 0;
}
if (fabs(exp_val - res_val) <= EPS) {
cout << "AC" << endl;
} else {
cout << "WA" << endl;
}
return 0;
}
"""
print("浮点数比较验证程序配置:")
print(" 语言: gcc-3.3")
print(" 代码长度:", len(float_validator_code), "字符")
print(" 功能: 允许浮点数误差比较")
# 多解验证程序代码
multi_solution_code = """
import sys
def main():
try:
with open(sys.argv[2], 'r') as f:
expected = f.read().strip()
with open(sys.argv[3], 'r') as f:
result = f.read().strip()
exp_nums = sorted(map(int, expected.split()))
res_nums = sorted(map(int, result.split()))
if exp_nums == res_nums:
print("AC")
else:
print("WA")
except:
print("WA")
if __name__ == "__main__":
main()
"""
print("\n多解验证程序配置:")
print(" 语言: python-2.5")
print(" 代码长度:", len(multi_solution_code), "字符")
print(" 功能: 允许多种正确答案")
def demo_judge_usage_scenarios():
"""演示不同判题方式的使用场景"""
print("\n=== 判题方式使用场景 ===")
scenarios = [
{
"名称": "字符串精确比较",
"适用": "输出结果固定的算法题",
"示例": "计算1+1的结果",
"验证方式": "InternalJudge(allowpe=False)"
},
{
"名称": "字符串容错比较",
"适用": "允许格式差异的题目",
"示例": "输出多个数字,允许不同的分隔符",
"验证方式": "InternalJudge(allowpe=True)"
},
{
"名称": "文件大小+MD5校验",
"适用": "二进制文件输出",
"示例": "图像处理、音频处理等",
"验证方式": "文件级别的校验"
},
{
"名称": "自定义验证程序",
"适用": "复杂的判题逻辑",
"示例": "浮点数比较、多解题目、交互题等",
"验证方式": "ExternalJudge"
}
]
for i, scenario in enumerate(scenarios, 1):
print(f"{i}. {scenario['名称']}")
print(f" 适用场景: {scenario['适用']}")
print(f" 典型示例: {scenario['示例']}")
print(f" 实现方式: {scenario['验证方式']}")
print()
def main():
"""主演示函数"""
print("BitOJ 判题系统验证程序演示")
print("=" * 50)
demo_internal_judge()
demo_custom_judge_config()
demo_judge_usage_scenarios()
print("演示完成!")
print("\n更多示例文件:")
print(" - float_validator.cpp: 浮点数比较验证程序")
print(" - multi_solution_validator.py: 多解验证程序")
print(" - StringFormatValidator.java: 格式验证程序")
print(" - range_validator.cpp: 数值范围验证程序")
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,58 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
多解答案验证程序
适用于有多个正确答案的题目,如排序、排列等
使用: python multi_solution_validator.py input.txt expected.txt result.txt
"""
import sys
import re
def main():
if len(sys.argv) != 4:
print("WA")
return
try:
# 读取标准答案
with open(sys.argv[2], 'r') as f:
expected = f.read().strip()
# 读取用户输出
with open(sys.argv[3], 'r') as f:
result = f.read().strip()
# 去除多余的空白字符
expected = re.sub(r'\s+', ' ', expected)
result = re.sub(r'\s+', ' ', result)
# 解析数字
try:
expected_nums = list(map(int, expected.split()))
result_nums = list(map(int, result.split()))
except ValueError:
print("WA")
return
# 检查数量是否相同
if len(expected_nums) != len(result_nums):
print("WA")
return
# 检查是否包含相同的元素(允许不同的排列)
if sorted(expected_nums) != sorted(result_nums):
print("WA")
return
# 如果需要检查特定的排序顺序,可以添加额外的验证
# 这里我们接受任何包含相同元素的排列
print("AC")
except Exception:
print("WA")
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,68 @@
// 数值范围验证程序
// 验证输出的数值是否在合理范围内
// 编译: g++ -o range_validator range_validator.cpp
#include <iostream>
#include <fstream>
#include <vector>
#include <algorithm>
using namespace std;
int main(int argc, char* argv[]) {
if (argc != 4) {
cout << "WA" << endl;
return 0;
}
ifstream input(argv[1]); // 输入数据,包含范围信息
ifstream expected(argv[2]); // 期望答案
ifstream result(argv[3]); // 用户输出
if (!input.is_open() || !expected.is_open() || !result.is_open()) {
cout << "WA" << endl;
return 0;
}
// 读取输入数据中的范围信息
int n;
input >> n;
vector<int> arr(n);
for (int i = 0; i < n; i++) {
input >> arr[i];
}
int min_val = *min_element(arr.begin(), arr.end());
int max_val = *max_element(arr.begin(), arr.end());
// 读取期望答案
int exp_result;
if (!(expected >> exp_result)) {
cout << "WA" << endl;
return 0;
}
// 读取用户答案
int user_result;
if (!(result >> user_result)) {
cout << "WA" << endl;
return 0;
}
// 验证答案是否在合理范围内
if (user_result < min_val || user_result > max_val) {
cout << "WA" << endl;
return 0;
}
// 验证答案是否正确
if (user_result == exp_result) {
cout << "AC" << endl;
} else {
cout << "WA" << endl;
}
input.close();
expected.close();
result.close();
return 0;
}

103
examples/test_validator.py Normal file
View File

@@ -0,0 +1,103 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
验证程序测试脚本
用于测试自定义验证程序是否工作正常
"""
import os
import subprocess
import tempfile
def test_float_validator():
"""测试浮点数验证程序"""
print("测试浮点数验证程序...")
# 创建测试文件
input_file = tempfile.NamedTemporaryFile(mode='w', delete=False)
input_file.write("1.0")
input_file.close()
expected_file = tempfile.NamedTemporaryFile(mode='w', delete=False)
expected_file.write("3.14159")
expected_file.close()
result_file = tempfile.NamedTemporaryFile(mode='w', delete=False)
result_file.write("3.14159001") # 轻微误差
result_file.close()
try:
# 编译验证程序
cmd = ['g++', '-o', 'float_validator', 'float_validator.cpp']
compile_result = subprocess.run(cmd, capture_output=True, text=True)
if compile_result.returncode != 0:
print("编译失败:", compile_result.stderr)
return
# 运行验证程序
cmd = ['./float_validator', input_file.name,
expected_file.name, result_file.name]
run_result = subprocess.run(cmd, capture_output=True, text=True)
print("验证结果:", run_result.stdout.strip())
# 清理
if os.path.exists('float_validator'):
os.remove('float_validator')
finally:
# 清理临时文件
for f in [input_file, expected_file, result_file]:
if os.path.exists(f.name):
os.unlink(f.name)
def test_multi_solution_validator():
"""测试多解验证程序"""
print("测试多解验证程序...")
# 创建测试文件
input_file = tempfile.NamedTemporaryFile(mode='w', delete=False)
input_file.write("")
input_file.close()
expected_file = tempfile.NamedTemporaryFile(mode='w', delete=False)
expected_file.write("1 2 3 4 5")
expected_file.close()
result_file = tempfile.NamedTemporaryFile(mode='w', delete=False)
result_file.write("5 4 3 2 1") # 相同元素,不同顺序
result_file.close()
try:
# 运行验证程序
cmd = ['python3', 'multi_solution_validator.py',
input_file.name, expected_file.name, result_file.name]
run_result = subprocess.run(cmd, capture_output=True, text=True)
print("验证结果:", run_result.stdout.strip())
finally:
# 清理临时文件
for f in [input_file, expected_file, result_file]:
if os.path.exists(f.name):
os.unlink(f.name)
def main():
"""主测试函数"""
print("=== 自定义验证程序测试 ===")
# 切换到示例目录
os.chdir('/Users/liushuming/Desktop/bitoj/examples')
test_float_validator()
print()
test_multi_solution_validator()
print("\n测试完成!")
if __name__ == "__main__":
main()

View File

@@ -1,10 +1,22 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import logging, os, sys, socket, time, xmlrpclib, bz2, Cookie
import logging
import os
import sys
import socket
import time
import xmlrpc.client as xmlrpclib
import bz2
import http.cookies as Cookie
import unittest
from engineconfig import getConfig
from entity import Submit, Problem, TestCase, PresetCode, DataFile
class DataSourceError(Exception): pass
class DataSourceError(Exception):
pass
class DataSource:
@@ -17,12 +29,14 @@ class DataSource:
def _get_config_retry(self):
ret = self.config.retry
if ret < 1: ret = 1
if ret < 1:
ret = 1
return ret
def _get_config_retry_wait(self):
ret = self.config.retry_wait
if ret < 0.1: ret = 0.1
if ret < 0.1:
ret = 0.1
return ret
def _do_action(self, func, args):
@@ -32,7 +46,7 @@ class DataSource:
while retry > 0:
try:
ret = func(*args)
except DataSourceError, e:
except DataSourceError:
self.logger.exception("DataSourceError")
if retry > 0:
time.sleep(retry_wait)
@@ -74,7 +88,7 @@ class DataSource:
ret.append(TestCase(self, row))
return ret
def get_test(self, tid, raw = True):
def get_test(self, tid, raw=True):
func = self.driver.get_test
args = (tid, )
row = self._do_action(func, args)
@@ -87,13 +101,13 @@ class DataSource:
func = self.driver.get_presetcodes
args = (pid, lang)
rows = self._do_action(func, args)
return map(lambda row: PresetCode(self, row), rows)
return list(map(lambda row: PresetCode(self, row), rows))
def get_datafiles(self, pid):
func = self.driver.get_datafiles
args = (pid, )
rows = self._do_action(func, args)
return map(lambda row: DataFile(self, row), rows)
return list(map(lambda row: DataFile(self, row), rows))
def get_datafile_data(self, datafileid):
func = self.driver.get_datafile_data
@@ -139,7 +153,7 @@ class DataSource:
class JspAuthTransport(xmlrpclib.Transport):
def __init__(self):
xmlrpclib.Transport.__init__(self)
self.__cookies = Cookie.SmartCookie()
self.__cookies = Cookie.SimpleCookie()
def request(self, host, handler, request_body, verbose=0):
# issue XML-RPC request
@@ -168,17 +182,17 @@ class JspAuthTransport(xmlrpclib.Transport):
def get_jsession_id(self):
if self.__cookies.has_key('MoodleSession'):
if 'MoodleSession' in self.__cookies:
return self.__cookies['MoodleSession'].value
return None
def __sendJsessionCookie(self, connection):
if self.__cookies.has_key('MoodleSession'):
if 'MoodleSession' in self.__cookies:
connection.putheader(
'Cookie',
'MoodleSession=%s' % self.__cookies['MoodleSession'].value)
if self.__cookies.has_key('MoodleSessionTest'):
if 'MoodleSessionTest' in self.__cookies:
connection.putheader(
'Cookie',
'MoodleSessionTest=%s' % self.__cookies['MoodleSessionTest'].value)
@@ -209,9 +223,9 @@ class XmlRpcDataSource:
while True:
try:
return self.server.oj.get_judge_id()
except xmlrpclib.Error, e:
except xmlrpclib.Error as e:
raise DataSourceError(e)
except socket.error, e:
except socket.error as e:
self.logger.exception('Failed to get_judge_id')
time.sleep(self.config.retry_wait)
@@ -219,9 +233,9 @@ class XmlRpcDataSource:
while True:
try:
return self.server.oj.reset_submits(judgeid)
except xmlrpclib.Error, e:
except xmlrpclib.Error as e:
raise DataSourceError(e)
except socket.error, e:
except socket.error as e:
self.logger.exception('Failed to reset_submits')
time.sleep(self.config.retry_wait)
@@ -230,12 +244,12 @@ class XmlRpcDataSource:
try:
submits = self.server.oj.get_submits(judgeid, limit)
for submit in submits:
if not isinstance(submit['code'], (str, unicode)):
if not isinstance(submit['code'], str):
submit['code'] = submit['code'].__str__()
return submits
except xmlrpclib.Error, e:
except xmlrpclib.Error as e:
raise DataSourceError(e)
except socket.error, e:
except socket.error as e:
time.sleep(self.config.retry_wait)
self.logger.exception('Failed to get_submits')
@@ -243,9 +257,9 @@ class XmlRpcDataSource:
while True:
try:
return self.server.oj.get_problem(problemid)
except xmlrpclib.Error, e:
except xmlrpclib.Error as e:
raise DataSourceError(e)
except socket.error, e:
except socket.error as e:
self.logger.exception('Failed to get_problem')
time.sleep(self.config.retry_wait)
@@ -254,15 +268,15 @@ class XmlRpcDataSource:
try:
tests = self.server.oj.get_tests(problemid, full)
for test in tests:
if not isinstance(test['input'], (str, unicode)):
if not isinstance(test['input'], str):
test['input'] = test['input'].__str__()
if not isinstance(test['output'], (str, unicode)):
if not isinstance(test['output'], str):
test['output'] = test['output'].__str__()
self.logger.debug('Got %d test case(s)', len(tests))
return tests
except xmlrpclib.Error, e:
except xmlrpclib.Error as e:
raise DataSourceError(e)
except socket.error, e:
except socket.error as e:
self.logger.exception('Failed to get_tests')
time.sleep(self.config.retry_wait)
@@ -270,16 +284,16 @@ class XmlRpcDataSource:
while True:
try:
test = self.server.oj.get_gztest(testid)
if not isinstance(test['input'], (str, unicode)):
if not isinstance(test['input'], str):
test['input'] = test['input'].__str__()
if not isinstance(test['output'], (str, unicode)):
if not isinstance(test['output'], str):
test['output'] = test['output'].__str__()
test['input'] = bz2.decompress(test['input'])
test['output'] = bz2.decompress(test['output'])
return test
except xmlrpclib.Error, e:
except xmlrpclib.Error as e:
raise DataSourceError(e)
except socket.error, e:
except socket.error as e:
self.logger.exception('Failed to get_tests')
time.sleep(self.config.retry_wait)
@@ -288,13 +302,13 @@ class XmlRpcDataSource:
try:
codes = self.server.oj.get_presetcodes(problemid, lang)
for code in codes:
if not isinstance(code['code'], (str, unicode)):
if not isinstance(code['code'], str):
code['code'] = code['code'].__str__()
self.logger.debug('Got %d presetcodes', len(codes))
return codes
except xmlrpclib.Error, e:
except xmlrpclib.Error as e:
raise DataSourceError(e)
except socket.error, e:
except socket.error as e:
self.logger.exception('Failed to get_presetcodes')
time.sleep(self.config.retry_wait)
@@ -304,9 +318,9 @@ class XmlRpcDataSource:
files = self.server.oj.get_datafiles(problemid)
self.logger.debug('Got %d datafiles', len(files))
return files
except xmlrpclib.Error, e:
except xmlrpclib.Error as e:
raise DataSourceError(e)
except socket.error, e:
except socket.error as e:
self.logger.exception('Failed to get_datafiles')
time.sleep(self.config.retry_wait)
@@ -315,9 +329,9 @@ class XmlRpcDataSource:
try:
data = self.server.oj.get_datafile_data(datafileid)
return str(data)
except xmlrpclib.Error, e:
except xmlrpclib.Error as e:
raise DataSourceError(e)
except socket.error, e:
except socket.error as e:
self.logger.exception('Failed to get_datafiles')
time.sleep(self.config.retry_wait)
@@ -327,24 +341,24 @@ class XmlRpcDataSource:
try:
return self.server.oj.update_submit_compilemessage(
id, compilemsg)
except xmlrpclib.Error, e:
except xmlrpclib.Error as e:
raise DataSourceError(e)
except socket.error, e:
except socket.error as e:
self.logger.exception('Failed to update_submit_compilemessage')
time.sleep(self.config.retry_wait)
def update_submit_test_results(self, id, results):
for r in results:
if not isinstance(r['stdout'], (str, unicode)): r['stdout'] = ''
if not isinstance(r['stderr'], (str, unicode)): r['stderr'] = ''
if not isinstance(r['stdout'], str): r['stdout'] = ''
if not isinstance(r['stderr'], str): r['stderr'] = ''
r['stdout'] = xmlrpclib.Binary(r['stdout'])
r['stderr'] = xmlrpclib.Binary(r['stderr'])
while True:
try:
return self.server.oj.update_submit_test_results(id, results)
except xmlrpclib.Error, e:
except xmlrpclib.Error as e:
raise DataSourceError(e)
except socket.error, e:
except socket.error as e:
self.logger.exception('Failed to update_submit_compilemessage')
time.sleep(self.config.retry_wait)
@@ -355,9 +369,9 @@ class XmlRpcDataSource:
mc.oj.update_submit_status(id, newstatus)
try:
return mc()
except xmlrpclib.Error, e:
except xmlrpclib.Error as e:
raise DataSourceError(e)
except socket.error, e:
except socket.error as e:
self.logger.exception('Failed to update_submits_status')
time.sleep(self.config.retry_wait)
@@ -365,9 +379,9 @@ class XmlRpcDataSource:
while True:
try:
return self.server.oj.get_submit_status(sid)
except xmlrpclib.Error, e:
except xmlrpclib.Error as e:
return DataSourceError(e)
except socket.error, e:
except socket.error as e:
self.logger.exception('Failed to get_submit_status')
time.sleep(self.config.retry_wait)
@@ -375,13 +389,13 @@ class XmlRpcDataSource:
while True:
try:
msg = self.server.oj.get_submit_compilemessage(sid)
if isinstance(msg, (str, unicode)):
if isinstance(msg, str):
return msg
else:
return msg.__str__()
except xmlrpclib.Error, e:
except xmlrpclib.Error as e:
return DataSourceError(e)
except socket.error, e:
except socket.error as e:
self.logger.exception('Failed to get_submit_status')
time.sleep(self.config.retry_wait)
@@ -389,16 +403,16 @@ class XmlRpcDataSource:
while True:
try:
return self.server.oj.get_submit(sid)
except xmlrpclib.Error, e:
except xmlrpclib.Error as e:
return DataSourceError(e)
except socket.error, e:
except socket.error as e:
self.logger.exception('Failed to get_submit')
time.sleep(self.config.retry_wait)
class DataSourceTest(unittest.TestCase):
def setUp(self):
execfile(os.path.join('..', 'testdata', 'test_config.py'))
exec(open(os.path.join('..', 'testdata', 'test_config.py')).read())
self.config = getConfig()
self.datasource = self.config.datasources[0]
self.dbname = os.path.join('..', 'testdata', self.config.testdb)

Binary file not shown.

View File

@@ -1,11 +1,17 @@
import logging, os, thread, threading, time, Queue
import logging
import _thread
import threading
import time
import queue
from engineconfig import getConfig
class JudgeEngine:
def __init__(self):
self.quit_event = threading.Event()
self.test_queue = Queue.Queue()
self.test_queue = queue.Queue()
self.config = getConfig()
self.quit_event.clear()
@@ -14,11 +20,11 @@ class JudgeEngine:
def run(self):
# one thread mode is good for debugging
if self.config.test_threads == 1:
thread.start_new_thread(self.transport, ())
_thread.start_new_thread(self.transport, ())
self.test()
else:
for i in range(self.config.test_threads):
thread.start_new_thread(self.test, ())
_thread.start_new_thread(self.test, ())
self.transport()
for ds in self.config.datasources:
@@ -30,7 +36,7 @@ class JudgeEngine:
total = 0
while not self.quit_event.isSet():
self.logger.info(
"%d submit in test queue, %d processed" % \
"%d submit in test queue, %d processed" %
(self.test_queue.qsize(), total))
c = 16 - self.test_queue.qsize()
if c > 0:
@@ -45,10 +51,10 @@ class JudgeEngine:
def test(self):
while not self.quit_event.isSet():
#pdb.set_trace()
# pdb.set_trace()
try:
submit = self.test_queue.get(True, self.config.fetch_interval)
except Queue.Empty:
except queue.Empty:
continue
tester = self.config.get_tester(submit.language)
if tester:
@@ -64,6 +70,8 @@ class JudgeEngine:
self.quit_event.set()
return
class JudgeError(Exception): pass
class JudgeError(Exception):
pass
# vim: set expandtab tabstop=4 shiftwidth=4:

Binary file not shown.

View File

@@ -1,9 +1,8 @@
#!/usr/bin/env python2.6
#!/usr/bin/env python3
import string
from string import split
from os import path
from Queue import Queue
from queue import Queue
from subprocess import Popen, PIPE
class EngineConfig:
@@ -39,9 +38,10 @@ class EngineConfig:
stdin=pa.stdout, stdout=PIPE)
output = pb.communicate()[0]
if output:
for user in string.split(output, '\n'):
user = string.strip(user)
if user: self.runas.put(user)
for user in output.decode().split('\n'):
user = user.strip()
if user:
self.runas.put(user)
pa.wait()
pb.wait()
@@ -49,7 +49,7 @@ class EngineConfig:
self.languages[profile] = tester
def get_tester(self, profile):
if self.languages.has_key(profile):
if profile in self.languages:
return self.languages[profile]
def add_datasource(self, ds):
@@ -65,41 +65,37 @@ class EngineConfig:
default_compileguard = (
'<judgehome>/scripts/compile-guard', ' <datadir>'
)
default_runguard = split(
default_runguard = (
'/usr/bin/sudo -u <user> <judgehome>/scripts/binary-guard ' +
'-e <extraproc> ' +
'-t <timelimit> -T 5 -m <maxmem> -d <rundir> -o <statfile> -p -x',
' '
)
maxmem_runguard = split(
'-t <timelimit> -T 5 -m <maxmem> -d <rundir> -o <statfile> -p -x'
).split(' ')
maxmem_runguard = (
'/usr/bin/sudo -u <user> <judgehome>/scripts/binary-guard ' +
'-e <extraproc> ' +
'-t <timelimit> -T 5 -m <maxmem> -d <rundir> -o <statfile> -p -x',
' '
)
java_runguard = split(
'-t <timelimit> -T 5 -m <maxmem> -d <rundir> -o <statfile> -p -x'
).split(' ')
java_runguard = (
'/usr/bin/sudo -u <user> ' +
'<judgehome>/scripts/java-guard -t <timelimit> -T 5 -m 262144 ' +
'-e <extraproc> ' +
'-d <rundir> -o <statfile> -p -x', ' '
)
python_runguard = split(
'-d <rundir> -o <statfile> -p -x'
).split(' ')
python_runguard = (
'<judgehome>/scripts/python-guardd ' +
'-t <timelimit> -T 5 -m <maxmem> -d <rundir> -o <statfile> -p -x',
' '
)
mono_runguard = split(
'-t <timelimit> -T 5 -m <maxmem> -d <rundir> -o <statfile> -p -x'
).split(' ')
mono_runguard = (
'/usr/bin/sudo -u <user> ' +
'<judgehome>/scripts/mono-guard -t <timelimit> -T 5 -m <maxmem> ' +
'-e <extraproc> ' +
'-d <rundir> -o <statfile> -p -x', ' '
)
bash_runguard = split(
'-d <rundir> -o <statfile> -p -x'
).split(' ')
bash_runguard = (
'/usr/bin/sudo -u <user> <judgehome>/scripts/bash-guard ' +
'-e <extraproc> ' +
'-t <timelimit> -T 5 -m <maxmem> -d <rundir> -o <statfile> -p -x',
' '
)
'-t <timelimit> -T 5 -m <maxmem> -d <rundir> -o <statfile> -p -x'
).split(' ')
default_compare = (
'<judgehome>/scripts/compare-guard', '<language>', '<codefile>',
'<stdinfile>', '<stdoutfile>', '<resultfile>'
@@ -151,7 +147,7 @@ class EngineConfig:
j2se15 = SimpleTester(
source = 'Main.java', target = 'Main.class',
compile = ('<judgehome>/scripts/javac-1.5',),
run = split('/usr/bin/java -cp <datadir> -Xms8M -Xmx64M Main'),
run = ('/usr/bin/java -cp <datadir> -Xms8M -Xmx64M Main').split(),
runenv = {},
basemem = {'RSS' : 7560 },
baseproc = 8,
@@ -163,7 +159,7 @@ class EngineConfig:
j2se16 = SimpleTester(
source = 'Main.java', target = 'Main.class',
compile = ('<judgehome>/scripts/javac-1.6',),
run = split('/usr/bin/java -cp <datadir> -Xms8M -Xmx64M Main'),
run = ('/usr/bin/java -cp <datadir> -Xms8M -Xmx64M Main').split(),
runenv = {},
basemem = {'RSS' : 7560 },
baseproc = 8,

Binary file not shown.

View File

@@ -16,21 +16,21 @@ class Problem:
self.id = row['id']
self.timemodified = row['timemodified']
self.vcode = row['validator_code']
if not isinstance(self.vcode, (str, unicode)):
if not isinstance(self.vcode, str):
self.vcode = self.vcode.__str__()
self.vtype = row['validator_type']
self.vlang = row['validator_lang']
self.gcode = row['generator_code']
if not isinstance(self.gcode, (str, unicode)):
if not isinstance(self.gcode, str):
self.gcode = self.vcode.__str__()
self.gtype = row['generator_type']
self.standard_code = row['standard_code']
if row.has_key('input_filename') and row['input_filename']:
if 'input_filename' in row and row['input_filename']:
self.input_filename = row['input_filename']
else:
self.input_filename = None
if row.has_key('output_filename') and row['output_filename']:
if 'output_filename' in row and row['output_filename']:
self.output_filename = row['output_filename']
else:
self.output_filename = None
@@ -60,7 +60,7 @@ class Problem:
return self.output_filename
def get_presetcodes(self, lang):
if not self.presetcodes.has_key(lang):
if not self.presetcodes or lang not in self.presetcodes:
codes = self.datasource.get_presetcodes(self.id, lang)
for code in codes: code.problem = self
self.presetcodes[lang] = codes
@@ -119,13 +119,11 @@ class Submit:
newresults = []
for r in results:
# stdout
f = file(r[4], 'r')
r[4] = f.read(config.output_sendback_size_limit)
f.close()
with open(r[4], 'r') as f:
r[4] = f.read(config.output_sendback_size_limit)
# stderr
f = file(r[5], 'r')
r[5] = f.read(config.output_sendback_size_limit)
f.close()
with open(r[5], 'r') as f:
r[5] = f.read(config.output_sendback_size_limit)
# strip stdout and stderr send back to datasource
# preventing post data too big
@@ -163,8 +161,8 @@ class TestCase:
self.memlimit = row['memlimit']
if not self.memlimit:
self.memlimit = config.maxmem
if row.has_key('nproc') and row['nproc']:
self.nproc = string.atoi(row['nproc'])
if 'nproc' in row and row['nproc']:
self.nproc = int(row['nproc'])
else:
self.nproc = 0
@@ -192,15 +190,15 @@ class TestCase:
input = string.replace(row['input'], '\r\n', '\n')
output = string.replace(row['output'], '\r\n', '\n')
f = file(self.infile, 'w')
f.write(input)
if len(input) > 0 and input[-1] != '\n': f.write('\n')
f.close()
with open(self.infile, 'w') as f:
f.write(input)
if len(input) > 0 and input[-1] != '\n':
f.write('\n')
f = file(self.outfile, 'w')
f.write(output)
if len(output) > 0 and output[-1] != '\n': f.write('\n')
f.close()
with open(self.outfile, 'w') as f:
f.write(output)
if len(output) > 0 and output[-1] != '\n':
f.write('\n')
logger.debug('Finished')
else:
logger.debug('Skip input/output file creation')
@@ -255,13 +253,11 @@ class DataFile:
data = datasource.get_datafile_data(self.id)
data = bz2.decompress(data)
if self.type == 'text':
f = open(self.absolute_path, 'w')
f.write(string.replace(data, '\r\n', '\n'))
f.close()
with open(self.absolute_path, 'w') as f:
f.write(string.replace(data, '\r\n', '\n'))
else:
f = open(self.absolute_path, 'wb')
f.write(data)
f.close()
with open(self.absolute_path, 'wb') as f:
f.write(data)
DataFile.write_lock.release()

Binary file not shown.

View File

@@ -23,20 +23,23 @@ class InternalJudge:
return self.compare_file(tin, tout, result, self.allowpe)
def compare_file(self, input, output, result, allowpe):
fo = file(output, 'rb'); fr = file(result, 'rb')
with open(output, 'rb') as fo, open(result, 'rb') as fr:
if not allowpe:
r = 'AC'
while r == 'AC':
so = fo.read(8192); sr = fr.read(8192)
if so == '' and sr == '': break
if so != sr: r = 'WA'
else:
so = fo.read(); sr = fr.read()
r = self.compare_string(so, sr)
fo.close(); fr.close()
return r
if not allowpe:
r = 'AC'
while r == 'AC':
so = fo.read(8192)
sr = fr.read(8192)
if so == b'' and sr == b'':
break
if so != sr:
r = 'WA'
else:
so = fo.read()
sr = fr.read()
r = self.compare_string(so.decode('utf-8', errors='ignore'),
sr.decode('utf-8', errors='ignore'))
return r
def compare_string(self, output, result):
if output == result: return 'AC'
@@ -76,10 +79,10 @@ class ExternalJudge:
os.makedirs(datadir)
self.comparecmd = tester.comparecmd
self.codefile = os.path.abspath(os.path.join(datadir, tester.source))
f = file(self.codefile, 'w')
f.write(string.replace(vcode, '\r\n', '\n'))
if len(vcode) > 0 and vcode[-1] != '\n': f.write('\n')
f.close()
with open(self.codefile, 'w') as f:
f.write(string.replace(vcode, '\r\n', '\n'))
if len(vcode) > 0 and vcode[-1] != '\n':
f.write('\n')
self.logger.debug("Save validator code as %s" % self.codefile)
@@ -119,7 +122,7 @@ class ExternalJudge:
if rundir: os.chdir(rundir)
os.execv(cmd[0], cmd)
print 'WA'
print('WA')
remaintime = self.config.judgescript_wait
pid1 = 0
@@ -133,13 +136,12 @@ class ExternalJudge:
while pid1 == 0:
try:
os.kill(pid, 9)
except os.OSError, e:
except OSError:
pass
pid1, status = os.waitpid(pid, os.WNOHANG)
f = file(rfile, 'r')
ret = string.strip(f.readline())
f.close()
with open(rfile, 'r') as f:
ret = f.readline().strip()
if ret != 'AC' and ret != 'PE': ret = 'WA'
return ret
@@ -268,7 +270,7 @@ public class Main {
def testPython(self):
code = """#!/usr/bin/env python
print 'AC'
print('AC')
"""
self.j = ExternalJudge('p1', 'python-2.5', code)
x = self.j.judge('s1', 't1', self.tin.name, self.tout.name, self.rst.name, self.err.name)

Binary file not shown.

View File

@@ -13,7 +13,7 @@ class OJTestCase(unittest.TestCase):
logger.addHandler(hdlr)
logger.setLevel(logging.DEBUG)
execfile(os.path.join('..', 'testdata', 'test_config.py'))
exec(open(os.path.join('..', 'testdata', 'test_config.py')).read())
self.config = getConfig()
self.ds = self.config.datasources[0]
self.dbname = os.path.join('..', 'testdata', self.config.testdb)

Binary file not shown.

View File

@@ -1,6 +1,6 @@
import math, os, resource, signal, string, sys, threading, logging, time, pickle
import shutil, Queue
import shutil, queue
import unittest
import pdb
from engineconfig import getConfig
@@ -103,12 +103,12 @@ class SimpleTester(TesterBase):
if os.path.exists(pcname):
try:
os.unlink(pcname)
except OSError, e:
except OSError:
self.logger.exception(
"Failed to delete presetcode file %s" % pcname)
return False
f = open(pcname, 'w')
f.write(string.replace(presetcode.code, '\r\n', '\n'))
f.write(presetcode.code.replace('\r\n', '\n'))
f.write('\n');
f.close()
@@ -118,24 +118,24 @@ class SimpleTester(TesterBase):
if os.path.exists(datadirsource):
try:
os.unlink(datadirsource)
except OSError, e:
except OSError:
self.logger.exception("Failed to delete source")
return False
if os.path.exists(datadirtarget):
try:
os.unlink(datadirtarget)
except OSError, e:
except OSError:
self.logger.exception("Failed to delete target")
return False
# preprocess source code
code = string.replace(submit.code, '\r\n', '\n')
code = string.replace(code, chr(0x1a), '') # char generated by tc
code = string.replace(code, 'getch()', '')
code = string.replace(code, 'getch ()', '')
code = string.replace(code, 'getch ( )', '')
code = submit.code.replace('\r\n', '\n')
code = code.replace(chr(0x1a), '') # char generated by tc
code = code.replace('getch()', '')
code = code.replace('getch ()', '')
code = code.replace('getch ( )', '')
code = string.replace(code, '\r\n', '\n')
code = code.replace('\r\n', '\n')
# write source to disk
f = open(datadirsource, 'w')
f.write(code)
@@ -146,7 +146,7 @@ class SimpleTester(TesterBase):
config = getConfig()
try:
submit.user = config.runas.get_nowait()
except Queue.Empty:
except queue.Empty:
self.logger.exception("No runas user left, please create more!")
return False
rundir = self.get_rundir(submit)
@@ -157,12 +157,12 @@ class SimpleTester(TesterBase):
if os.path.exists(rundir):
try:
self._remove(rundir)
except OSError, e:
except OSError:
self.logger.exception("Failed to delete rundir")
config.runas.put(submit.user)
return False
os.mkdir(rundir)
os.chmod(rundir, 0775)
os.chmod(rundir, 0o775)
return True
@@ -199,7 +199,7 @@ class SimpleTester(TesterBase):
for code in submit.get_presetcodes():
if not code.isheader:
cmd.append(code.name)
self.logger.debug(string.join(cmd, '_'))
self.logger.debug('_'.join(cmd))
errfile = os.path.join(datadir, 'compile.err')
@@ -210,9 +210,8 @@ class SimpleTester(TesterBase):
compilemsg = None
if os.path.exists(errfile):
f = file(errfile, 'r')
compilemsg = string.join(f.readlines(), '')
f.close()
with open(errfile, 'r') as f:
compilemsg = ''.join(f.readlines())
if compilemsg:
submit.set_compilemessage(compilemsg)
@@ -283,7 +282,7 @@ class SimpleTester(TesterBase):
s = s.replace('<datadir>', datadir)
s = s.replace('<user>', submit.user)
self.runenv[k] = s
self.logger.debug(string.join(cmd, ' ') + ' ' + str(self.runenv))
self.logger.debug(' '.join(cmd) + ' ' + str(self.runenv))
(exitcode, sig, timeused, memused) = \
self._execute(submit, cmd, timelimit = testcase.timelimit * 10,
@@ -365,22 +364,22 @@ class SimpleTester(TesterBase):
try:
os.close(0)
os.open(infile, os.O_RDONLY)
except Exception, e:
print e
except Exception as e:
print(e)
sys.exit(125)
if outfile:
try:
os.close(1)
os.open(outfile, os.O_WRONLY|os.O_CREAT|os.O_TRUNC, 0666)
except Exception, e:
print e
os.open(outfile, os.O_WRONLY|os.O_CREAT|os.O_TRUNC, 0o666)
except Exception as e:
print(e)
sys.exit(125)
if errfile:
try:
os.close(2)
os.open(errfile, os.O_WRONLY|os.O_CREAT|os.O_TRUNC, 0666)
except Exception, e:
print e
os.open(errfile, os.O_WRONLY|os.O_CREAT|os.O_TRUNC, 0o666)
except Exception as e:
print(e)
sys.exit(125)
#os.chdir(self.get_datadir(submit))
@@ -398,22 +397,23 @@ class SimpleTester(TesterBase):
# read information form statfile
if statfile:
try:
stat = pickle.load(file(statfile, 'r'))
with open(statfile, 'rb') as f:
stat = pickle.load(f)
exitcode = stat['exitcode']
sig = stat['sig']
timeused = stat['timeused']
memused = 0
if self.basemem.has_key('RSS'):
if 'RSS' in self.basemem:
memused += stat['memrss'] - self.basemem['RSS']
if self.basemem.has_key('Data'):
if 'Data' in self.basemem:
memused += stat['memdata'] - self.basemem['Data']
if self.basemem.has_key('Stack'):
if 'Stack' in self.basemem:
memused += stat['memstack'] - self.basemem['Stack']
memused = max(0, memused)
except Exception, e:
except Exception as e:
self.logger.exception(e)
self.logger.error("Failed to read statfile: %s" % statfile)
exitcode = 127 # judge script error
exitcode = 127 # judge script error
return (exitcode, sig, timeused, memused)

Binary file not shown.

View File

@@ -1,4 +1,4 @@
#!/usr/bin/env python
#!/usr/bin/env python3
import sys, string, hashlib, os
from stat import *
@@ -9,9 +9,9 @@ def check_file(name, size, md5sum):
m = hashlib.md5()
if os.path.isfile(name):
s = os.stat(name)[ST_SIZE]
if s == string.atoi(size):
if s == int(size):
f = open(name, 'rb')
m.update(f.read(string.atoi(size)))
m.update(f.read(int(size)))
f.close()
d = m.hexdigest()
result.write("%s %d %s\n" % (name, s, d))
@@ -26,9 +26,10 @@ if __name__ == '__main__':
result = open(sys.argv[3], 'w+')
fstdout = open(sys.argv[2], 'r')
for line in fstdout:
name, size, md5sum = string.split(string.strip(line), ' ')
name, size, md5sum = line.strip().split(' ')
if not check_file(name, size, md5sum):
print 'WA'; break
print('WA')
break
fstdout.close()
result.close()
print 'AC'
print('AC')

View File

@@ -1,4 +1,4 @@
#!/usr/bin/env python
#!/usr/bin/env python3
import os, sys, string, signal, resource, time, getopt, pickle
@@ -43,22 +43,23 @@ class RunGuard:
optlist, self.args = getopt.gnu_getopt(sys.argv, 'e:t:m:d:o:T:p')
for o, v in optlist:
if o == '-e':
self.nproc += string.atoi(v)
self.nproc += int(v)
if o == '-t':
self.timelimit = string.atoi(v)
self.timelimit = int(v)
if o == '-m':
self.memlimit = string.atoi(v) * 1024
self.memlimit = int(v) * 1024
if o == '-d':
self.rundir = v
if o == '-o':
self.writeto = v
if o == '-T':
self.timetime = string.atoi(v)
self.timetime = int(v)
if o == '-p':
self.usepickle = True
v = os.getenv('GUARD_RLIMIT_OFILE')
if v: self.ofile = string.atoi(v)
if v:
self.ofile = int(v)
self.ldpreload = os.getenv('GUARD_LD_PRELOAD')
def execute(self):
@@ -90,7 +91,7 @@ class RunGuard:
while pid == 0:
try:
os.kill(self.childpid, signal.SIGKILL)
except OSError, e:
except OSError:
pass
pid, status, ru = os.wait4(self.childpid, os.WNOHANG)
time.sleep(0.1)
@@ -106,24 +107,27 @@ class RunGuard:
def _get_memused(self):
procdir = '/proc/%d' % self.childpid
if os.path.isdir(procdir):
cmdline = file(procdir + '/cmdline', 'r').readlines()
with open(procdir + '/cmdline', 'r') as f:
cmdline = f.readlines()
# do not get memory usage of this script after just fork
if len(cmdline) > 0 and \
string.strip(cmdline[0], '\0') != \
string.join(self.args[1:], '\0'):
cmdline[0].strip('\0') != \
'\0'.join(self.args[1:]):
return
procstatus = file(procdir + '/status', 'r')
rss = 0; data = 0; stack = 0
for line in procstatus:
n = line[0:6]
if n == 'VmRSS:':
rss = string.atoi(line[7:-3])
if n == 'VmData':
data = string.atoi(line[8:-3])
if n == 'VmStk:':
stack = string.atoi(line[7:-3])
with open(procdir + '/status', 'r') as procstatus:
rss = 0
data = 0
stack = 0
for line in procstatus:
n = line[0:6]
if n == 'VmRSS:':
rss = int(line[7:-3])
if n == 'VmData':
data = int(line[8:-3])
if n == 'VmStk:':
stack = int(line[7:-3])
self.memrss = max(self.memrss, rss)
if self.memdata + self.memstack < data + stack:
self.memdata = data
@@ -134,26 +138,23 @@ class RunGuard:
if self.writeto == None:
f = sys.stdout
else:
f = file(self.writeto, 'w')
if self.usepickle:
obj = { 'exitcode' : self.exitcode,
'sig' : self.sig,
'timeused' : self.timeused,
'memrss' : self.memrss,
'memdata' : self.memdata,
'memstack' : self.memstack }
pickle.dump(obj, f)
else:
print >>f, "exitcode: %d" % self.exitcode
print >>f, "sig: %d" % self.sig
print >>f, "time: %.3f" % self.timeused
print >>f, "rss: %d" % self.memrss
print >>f, "data: %d" % self.memdata
print >>f, "stack: %d" % self.memstack
if self.writeto != None: f.close()
with open(self.writeto, 'w') as f:
if self.usepickle:
obj = { 'exitcode' : self.exitcode,
'sig' : self.sig,
'timeused' : self.timeused,
'memrss' : self.memrss,
'memdata' : self.memdata,
'memstack' : self.memstack }
pickle.dump(obj, f)
else:
print("exitcode: %d" % self.exitcode, file=f)
print("sig: %d" % self.sig, file=f)
print("time: %.3f" % self.timeused, file=f)
print("rss: %d" % self.memrss, file=f)
print("data: %d" % self.memdata, file=f)
print("stack: %d" % self.memstack, file=f)
if __name__ == '__main__':
os.umask(0002)
os.umask(0o002)
RunGuard().run()

View File

@@ -1,4 +1,4 @@
#!/usr/bin/python
#!/usr/bin/python3
"""HTTP debugging proxy
@@ -34,108 +34,107 @@ http://localhost:5335/RPC2)
import asynchat
import asyncore
import socket
import string
class proxy_server (asyncore.dispatcher):
class proxy_server(asyncore.dispatcher):
def __init__ (self, host, port):
asyncore.dispatcher.__init__ (self)
self.create_socket (socket.AF_INET, socket.SOCK_STREAM)
self.set_reuse_addr()
self.there = (host, port)
here = ('', port + 8000)
self.bind (here)
self.listen (5)
def __init__(self, host, port):
asyncore.dispatcher.__init__(self)
self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
self.set_reuse_addr()
self.there = (host, port)
here = ('', port + 8000)
self.bind(here)
self.listen(5)
def handle_accept (self):
print 'New connection'
proxy_receiver (self, self.accept())
def handle_accept(self):
print('New connection')
proxy_receiver(self, self.accept())
class proxy_sender (asynchat.async_chat):
class proxy_sender(asynchat.async_chat):
"Sends data to the server"
"Sends data to the server"
def __init__ (self, receiver, address):
asynchat.async_chat.__init__ (self)
self.receiver = receiver
self.set_terminator (None)
self.create_socket (socket.AF_INET, socket.SOCK_STREAM)
self.buffer = ''
self.set_terminator ('\n')
self.connect (address)
def __init__(self, receiver, address):
asynchat.async_chat.__init__(self)
self.receiver = receiver
self.set_terminator(None)
self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
self.buffer = ''
self.set_terminator('\n')
self.connect(address)
def handle_connect (self):
print 'Sender connected'
def handle_connect(self):
print('Sender connected')
def collect_incoming_data (self, data):
self.buffer = self.buffer + data
def collect_incoming_data(self, data):
self.buffer = self.buffer + data.decode('utf-8', errors='ignore')
def found_terminator (self):
data = self.buffer
self.buffer = ''
print (u'==> (%d) %s' % (self.id, unicode(repr(data), 'utf-8'))).encode('utf-8')
self.receiver.push (data + '\n')
def found_terminator(self):
data = self.buffer
self.buffer = ''
print('==> (%d) %s' % (self.id, repr(data)))
self.receiver.push(data + '\n')
def handle_close (self):
print 'Sender closing (inbuf len %d (%s), ac_in %d, ac_out %d )' % (
len( self.buffer ),
self.buffer,
len( self.ac_in_buffer ),
len( self.ac_out_buffer )
)
def handle_close(self):
print('Sender closing (inbuf len %d (%s), ac_in %d, ac_out %d )' % (
len(self.buffer),
self.buffer,
len(self.ac_in_buffer),
len(self.ac_out_buffer)
))
if len( self.buffer ):
self.found_terminator()
if len(self.buffer):
self.found_terminator()
self.receiver.close_when_done()
self.close()
self.receiver.close_when_done()
self.close()
class proxy_receiver (asynchat.async_chat):
class proxy_receiver(asynchat.async_chat):
"Receives data from the caller"
"Receives data from the caller"
channel_counter = 0
channel_counter = 0
def __init__ (self, server, (conn, addr)):
asynchat.async_chat.__init__ (self, conn)
self.set_terminator ('\n')
self.server = server
self.id = self.channel_counter
self.channel_counter = self.channel_counter + 1
self.sender = proxy_sender (self, server.there)
self.sender.id = self.id
self.buffer = ''
def __init__(self, server, conn_addr):
conn, addr = conn_addr
asynchat.async_chat.__init__(self, conn)
self.set_terminator('\n')
self.server = server
self.id = self.channel_counter
self.channel_counter = self.channel_counter + 1
self.sender = proxy_sender(self, server.there)
self.sender.id = self.id
self.buffer = ''
def collect_incoming_data (self, data):
self.buffer = self.buffer + data
def collect_incoming_data(self, data):
self.buffer = self.buffer + data.decode('utf-8', errors='ignore')
def found_terminator (self):
import re
data = re.sub( r'\:8080', '', self.buffer )
data = re.sub( r'localhost', self.server.there[0], data )
self.buffer = ''
print (u'<== (%d) %s' % (self.id, unicode(repr(data), 'utf-8'))).encode('utf-8')
self.sender.push (data + '\n')
def found_terminator(self):
import re
data = re.sub(r'\:8080', '', self.buffer)
data = re.sub(r'localhost', self.server.there[0], data)
self.buffer = ''
print('<== (%d) %s' % (self.id, repr(data)))
self.sender.push(data + '\n')
def handle_close (self):
print 'Receiver closing (inbuf len %d (%s), ac_in %d, ac_out %d )' % (
len( self.buffer ),
self.buffer,
len( self.ac_in_buffer ),
len( self.ac_out_buffer )
)
def handle_close(self):
print('Receiver closing (inbuf len %d (%s), ac_in %d, ac_out %d )' % (
len(self.buffer),
self.buffer,
len(self.ac_in_buffer),
len(self.ac_out_buffer)
))
if len( self.buffer ):
self.found_terminator()
if len(self.buffer):
self.found_terminator()
self.sender.close_when_done()
self.close()
self.sender.close_when_done()
self.close()
if __name__ == '__main__':
import sys
import string
if len(sys.argv) < 3:
print 'Usage: %s <server-host> <server-port>' % sys.argv[0]
else:
ps = proxy_server (sys.argv[1], string.atoi (sys.argv[2]))
asyncore.loop()
import sys
if len(sys.argv) < 3:
print('Usage: %s <server-host> <server-port>' % sys.argv[0])
else:
ps = proxy_server(sys.argv[1], int(sys.argv[2]))
asyncore.loop()