first commit

This commit is contained in:
2025-10-11 11:23:58 +08:00
commit e8774085f8
48 changed files with 3087 additions and 0 deletions

829
python/tester.py Executable file
View File

@@ -0,0 +1,829 @@
import math, os, resource, signal, string, sys, threading, logging, time, pickle
import shutil, Queue
import unittest
import pdb
from engineconfig import getConfig
from ojunit import *
class TesterBase:
def __init__(self):
self.logger = logging.getLogger('main')
def get_datadir(self, submit):
config = getConfig()
# get datadir
datadir = os.path.abspath(config.datadir)
if not os.path.exists(datadir): os.mkdir(datadir)
# get filenames
datadir = os.path.join(datadir, submit.id)
if not os.path.exists(datadir): os.mkdir(datadir)
return datadir
def get_rundir(self, submit):
config = getConfig()
return os.path.join(config.rundir_root, submit.user, 'run')
class SimpleTester(TesterBase):
has_timelimit = True
has_memlimit = True
check_result = True
is_lasttest = True
def __init__(self,
source, target,
compile = None, compileenv = {},
run = None, runenv = {}, basemem = 0,
baseproc = 0, extraproc = 0,
compileguard = None, runguard = None, comparecmd = None):
TesterBase.__init__(self)
self.source = source
self.target = target
self.compilecmd = compile
self.compileenv = compileenv
self.runcmd = run
self.runenv = runenv
self.basemem = basemem
self.baseproc = baseproc
self.extraproc = extraproc
self.compileguard = compileguard
self.runguard = runguard
self.comparecmd = comparecmd
def test(self, submit):
#pdb.set_trace()
if not self.prepare(submit):
self.logger.info('self.prepare')
submit.set_status('compile_failed')
submit.set_compilemessage('Failed to delete source/target file')
return False
submit.set_status('compiling')
if not self.compile(submit):
self.logger.info('self.compile')
submit.set_status('compile_failed')
self.cleanup(submit)
return False
submit.set_status('running')
testcases = submit.get_testcases()
ret = []
for testcase in testcases:
r = self.run(submit, testcase)
ret.append(r)
self.logger.debug('run result: ' + r.__str__())
passed = True
if self.check_result:
for r in ret:
if r[1] == 'AC' or not self.has_timelimit and r[1] == 'TLE':
pass
else:
passed = False
if not passed or self.is_lasttest:
submit.update_test_results(ret)
submit.set_status('finish')
self.cleanup(submit)
return passed
def prepare(self, submit):
datadir = self.get_datadir(submit)
# write presetcodes to dir
presetcodes = submit.get_presetcodes()
for presetcode in presetcodes:
pcname = os.path.join(datadir, presetcode.name)
if os.path.exists(pcname):
try:
os.unlink(pcname)
except OSError, e:
self.logger.exception(
"Failed to delete presetcode file %s" % pcname)
return False
f = open(pcname, 'w')
f.write(string.replace(presetcode.code, '\r\n', '\n'))
f.write('\n');
f.close()
# delete existing source and target file
datadirsource = os.path.join(datadir, self.source)
datadirtarget = os.path.join(datadir, self.target)
if os.path.exists(datadirsource):
try:
os.unlink(datadirsource)
except OSError, e:
self.logger.exception("Failed to delete source")
return False
if os.path.exists(datadirtarget):
try:
os.unlink(datadirtarget)
except OSError, e:
self.logger.exception("Failed to delete target")
return False
# preprocess source code
code = string.replace(submit.code, '\r\n', '\n')
code = string.replace(code, chr(0x1a), '') # char generated by tc
code = string.replace(code, 'getch()', '')
code = string.replace(code, 'getch ()', '')
code = string.replace(code, 'getch ( )', '')
code = string.replace(code, '\r\n', '\n')
# write source to disk
f = open(datadirsource, 'w')
f.write(code)
if len(submit.code) > 0 and submit.code[-1] != '\n': f.write('\n')
f.close()
# setup rundir
config = getConfig()
try:
submit.user = config.runas.get_nowait()
except Queue.Empty:
self.logger.exception("No runas user left, please create more!")
return False
rundir = self.get_rundir(submit)
if not os.path.exists(os.path.dirname(rundir)):
os.mkdir(os.path.dirname(rundir))
if os.path.exists(rundir):
try:
self._remove(rundir)
except OSError, e:
self.logger.exception("Failed to delete rundir")
config.runas.put(submit.user)
return False
os.mkdir(rundir)
os.chmod(rundir, 0775)
return True
def cleanup(self, submit, force = False):
datadir = self.get_datadir(submit)
rundir = self.get_rundir(submit)
config = getConfig()
if not config.no_cleanup or force:
if os.path.exists(datadir):
self._remove(datadir)
if os.path.exists(rundir):
try:
self._remove(rundir)
except OSError:
os.system('/usr/bin/sudo -u %s /bin/rm -rf %s' % (submit.user, rundir))
config.runas.put(submit.user)
def compile(self, submit):
config = getConfig()
datadir = self.get_datadir(submit)
cmd = []
if self.compileguard:
for s in self.compileguard:
s = string.replace(s, '<judgehome>', config.judgehome)
s = string.replace(s, '<datadir>', datadir)
cmd.append(s)
if self.compilecmd:
for s in self.compilecmd:
s = string.replace(s, '<judgehome>', config.judgehome)
s = string.replace(s, '<datadir>', datadir)
cmd.append(s)
cmd.append(self.source)
for code in submit.get_presetcodes():
if not code.isheader:
cmd.append(code.name)
self.logger.debug(string.join(cmd, '_'))
errfile = os.path.join(datadir, 'compile.err')
(exitcode, sig, timeused, memused) = \
self._execute(submit, cmd, timelimit = config.compile_timelimit,
infile = None, outfile = None, errfile = errfile,
env = self.compileenv)
compilemsg = None
if os.path.exists(errfile):
f = file(errfile, 'r')
compilemsg = string.join(f.readlines(), '')
f.close()
if compilemsg:
submit.set_compilemessage(compilemsg)
if exitcode == 0:
self.logger.info('submit %s compile success' % submit.id)
else:
self.logger.info('submit %s compile failed' % submit.id)
self.logger.debug(compilemsg)
return exitcode == 0
def run(self, submit, testcase):
config = getConfig()
datadir = self.get_datadir(submit) #dir save input and result
rundir = self.get_rundir(submit) #dir program runs in
infile = testcase.get_input_file()
outfile = os.path.join(datadir, testcase.id + '.out')
errfile = os.path.join(datadir, testcase.id + '.err')
statfile = os.path.join(rundir, testcase.id + '.stat')
# If submit input filename or output filename is provided,
# no input or output redirect will happen.
submit_input_filename = submit.get_input_filename()
submit_output_filename = submit.get_output_filename()
if submit_input_filename:
shutil.copyfile(infile, os.path.join(rundir, submit_input_filename))
_infile = infile
infile = None
if submit_output_filename:
_outfile = outfile
outfile = None
# Create data file symolic links
datafiles = submit.get_problem().get_datafiles()
for datafile in datafiles:
targetname = os.path.join(rundir, datafile.filename)
try:
shutil.copyfile(datafile.absolute_path, targetname)
except:
pass
# Use extra process setting on testcase if it exist
extraproc = self.baseproc + (testcase.nproc if testcase.nproc else self.extraproc)
cmd = []
if self.runguard:
for s in self.runguard:
s = s.replace('<judgehome>', config.judgehome)
s = s.replace('<extraproc>', '%d' % extraproc)
s = s.replace('<timelimit>', '%d' % (testcase.timelimit))
s = s.replace('<maxmem>', '%d' % config.maxmem)
s = s.replace('<rundir>', rundir)
s = s.replace('<user>', submit.user)
s = s.replace('<statfile>', statfile)
cmd.append(s)
if self.runcmd:
for s in self.runcmd:
s = s.replace('<judgehome>', config.judgehome)
s = s.replace('<datadir>', datadir)
s = s.replace('<user>', submit.user)
cmd.append(s)
if self.runenv:
for k in self.runenv.keys():
s = self.runenv[k]
s = s.replace('<judgehome>', config.judgehome)
s = s.replace('<datadir>', datadir)
s = s.replace('<user>', submit.user)
self.runenv[k] = s
self.logger.debug(string.join(cmd, ' ') + ' ' + str(self.runenv))
(exitcode, sig, timeused, memused) = \
self._execute(submit, cmd, timelimit = testcase.timelimit * 10,
infile = infile, outfile = outfile,
errfile = errfile, env = self.runenv,
rlimit_fsize = config.output_size_limit + 2,
statfile = statfile)
if submit_input_filename:
infile = _infile
if submit_output_filename:
outfile = _outfile
try:
shutil.copyfile(os.path.join(rundir, submit_output_filename),
outfile)
except IOError:
f = file(outfile, 'w')
f.close()
ret = [testcase.id, exitcode, sig, outfile, errfile, timeused, memused]
if timeused > testcase.timelimit:
ret.insert(1, 'TLE')
return ret
if memused > testcase.memlimit:
ret.insert(1, 'MLE')
return ret
if exitcode == 125:
ret.insert(1, 'JSE')
return ret
elif exitcode == 126:
ret.insert(1, 'RFC')
return ret
elif exitcode == 127:
ret.insert(1, 'JGE')
return ret
if sig == signal.SIGABRT or sig == signal.SIGSEGV:
ret.insert(1, 'RE')
return ret
elif sig == signal.SIGKILL or sig == signal.SIGXCPU:
ret.insert(1, 'TLE')
return ret
elif sig == signal.SIGXFSZ:
ret.insert(1, 'OLE')
return ret
elif sig == signal.SIGFPE:
ret.insert(1, 'FPE')
return ret
elif sig > 0:
ret.insert(1, 'KS')
return ret
s = os.stat(outfile)
if s.st_size > config.output_size_limit:
ret.insert(1, 'OLE')
return ret
js = submit.get_judge_script()
ret.insert(1, js.judge(submit.id, testcase.id,
os.path.abspath(testcase.get_input_file()),
os.path.abspath(testcase.get_output_file()),
os.path.abspath(outfile),
os.path.abspath(errfile), rundir))
return ret
def _execute(self, submit, command, timelimit = 1,
infile = None, outfile = None, errfile = None, env = {},
rlimit_fsize = -1, statfile = None):
pid = os.fork()
if pid == 0:
if rlimit_fsize > 0:
resource.setrlimit(resource.RLIMIT_FSIZE,
(rlimit_fsize, rlimit_fsize))
# child process
if infile:
try:
os.close(0)
os.open(infile, os.O_RDONLY)
except Exception, e:
print e
sys.exit(125)
if outfile:
try:
os.close(1)
os.open(outfile, os.O_WRONLY|os.O_CREAT|os.O_TRUNC, 0666)
except Exception, e:
print e
sys.exit(125)
if errfile:
try:
os.close(2)
os.open(errfile, os.O_WRONLY|os.O_CREAT|os.O_TRUNC, 0666)
except Exception, e:
print e
sys.exit(125)
#os.chdir(self.get_datadir(submit))
os.execve(command[0], command, env)
sys.exit(125)
# parent process
pid, status = os.waitpid(pid, 0)
timeused = 0; memused = 0; sig = 0; exitcode = 0
# get this child sig and exitcode
if os.WIFEXITED(status): exitcode = os.WEXITSTATUS(status)
if os.WIFSIGNALED(status): sig = os.WTERMSIG(status)
# read information form statfile
if statfile:
try:
stat = pickle.load(file(statfile, 'r'))
exitcode = stat['exitcode']
sig = stat['sig']
timeused = stat['timeused']
memused = 0
if self.basemem.has_key('RSS'):
memused += stat['memrss'] - self.basemem['RSS']
if self.basemem.has_key('Data'):
memused += stat['memdata'] - self.basemem['Data']
if self.basemem.has_key('Stack'):
memused += stat['memstack'] - self.basemem['Stack']
memused = max(0, memused)
except Exception, e:
self.logger.exception(e)
self.logger.error("Failed to read statfile: %s" % statfile)
exitcode = 127 # judge script error
return (exitcode, sig, timeused, memused)
def _remove(self, top):
for root, dirs, files in os.walk(top, topdown=False):
for name in files:
os.remove(os.path.join(root, name))
for name in dirs:
os.rmdir(os.path.join(root, name))
os.rmdir(top)
class ComboTester:
testers = []
def add_tester(self, tester,
has_timelimit = True, has_memlimit = True,
check_result = True, is_lasttest = False):
tester.has_timelimit = has_timelimit
tester.has_memlimit = has_memlimit
tester.check_result = check_result
tester.is_lasttest = is_lasttest
self.testers.append(tester)
def test(self, submit):
r = True
for t in self.testers:
r = t.test(submit)
if not r:
break
return r
class SimpleTesterTestCase(OJTestCase):
@classmethod
def suite(clazz):
tests = ( 'testSteps', 'testNoSource',
'testAC', 'testWA', 'testZero',
'testBinary', 'testCE', 'testOLE', 'testTLE',
'testTargetDeleted',
)
return unittest.TestSuite(map(SimpleTesterTestCase, tests))
def setUp(self):
OJTestCase.setUp(self)
self.st = self.config.languages['gcc-3.3-nobc']
def testSteps(self):
self.logger.info("TESTING: Steps")
submit = self.ds.get_submit(3)
self.st.prepare(submit)
datadir = os.path.join(self.config.datadir, submit.id)
self.assertTrue(os.path.isdir(datadir))
self.assertTrue(os.path.isfile(os.path.join(datadir, self.st.source)))
submit.set_status('compiling')
r = self.st.compile(submit)
self.assertTrue(r)
self.assertTrue(os.path.isfile(os.path.join(datadir, self.st.target)))
self.assertTrue(os.path.isfile(os.path.join(datadir, 'compile.err')))
submit.set_status('running')
testcases = submit.get_testcases()
r = self.st.run(submit, testcases[0])
self.assertEqual(r[0], testcases[0].id)
self.assertEqual(r[1], 'AC')
self.assertEqual(r[2], 0)
self.assertEqual(r[3], 0)
f = file(r[4], 'r')
o = string.join(f.readlines(), '\n')
f.close()
self.assertEqual(o, '3\n')
f = file(r[5], 'r')
o = string.join(f.readlines(), '\n')
f.close()
self.assertEqual(o, '')
r = self.st.run(submit, testcases[1])
f = file(r[4], 'r')
o = string.join(f.readlines(), '\n')
f.close()
self.assertEqual(o, '4\n')
self.st.cleanup(submit)
self.assertFalse(not self.config.no_cleanup and os.path.exists(datadir))
def testAC(self):
self.logger.info("TESTING: AC")
submit = self.ds.get_submit(3)
datadir = os.path.join(self.config.datadir, submit.id)
r = self.st.test(submit)
self.assertTrue(r) # passed
self.assertFalse(not self.config.no_cleanup and os.path.exists(datadir)) # cleanup
from pysqlite2 import dbapi2 as sqlite
conn = sqlite.connect(self.dbname)
cur = conn.cursor()
cur.execute('SELECT * FROM submit_test_result WHERE sid=?', (submit.id, ))
rows = cur.fetchall()
cur.execute('DELETE FROM submit_test_result')
conn.commit()
self.assertEqual(len(rows), 2)
# test if the submit status change
cur.execute('SELECT status FROM submit WHERE id=?', (submit.id, ))
rows = cur.fetchall()
self.assertEqual(rows[0][0], 10)
cur.close()
conn.close()
def testNoSource(self):
self.logger.info("TESTING: No Source")
submit = self.ds.get_submit(1)
# the code of this submit is empty
datadir = os.path.join(self.config.datadir, submit.id)
r = self.st.test(submit)
self.assertFalse(r)
self.assertFalse(not self.config.no_cleanup and os.path.exists(datadir)) # cleanup
self.assertTrue(submit.get_status(), 'compile_failed')
def testNoTarget(self):
self.logger.info("TESTING: No Target")
submit = self.ds.get_submit(1)
# the code of this submit generate nothing
datadir = os.path.join(self.config.datadir, submit.id)
r = self.st.test(submit)
self.assertFalse(r)
self.assertFalse(not self.config.no_cleanup and os.path.exists(datadir)) # cleanup
self.assertTrue(submit.get_status(), 'compile_failed')
def testCE(self):
self.logger.info("TESTING: CE")
submit = self.ds.get_submit(7)
datadir = os.path.join(self.config.datadir, submit.id)
r = self.st.test(submit)
self.assertFalse(r) # not passed
self.assertFalse(not self.config.no_cleanup and os.path.exists(datadir)) # cleanup
self.assertTrue(len(submit.get_compilemessage()) > 0)
self.assertTrue(submit.get_status(), 'compile_failed')
def testWA(self):
self.logger.info("TESTING: WA")
submit = self.ds.get_submit(4)
datadir = os.path.join(self.config.datadir, submit.id)
r = self.st.test(submit)
self.assertFalse(r) # not passed
self.assertFalse(not self.config.no_cleanup and os.path.exists(datadir)) # cleanup
from pysqlite2 import dbapi2 as sqlite
conn = sqlite.connect(self.dbname)
cur = conn.cursor()
cur.execute('SELECT * FROM submit_test_result')
rows = cur.fetchall()
cur.execute('DELETE FROM submit_test_result')
conn.commit()
self.assertEqual(len(rows), 2)
cur.close()
conn.close()
def testTargetDeleted(self):
self.logger.info("TESTING: Target Deleted")
submit = self.ds.get_submit(3)
self.st.prepare(submit)
datadir = os.path.join(self.config.datadir, submit.id)
self.assertTrue(os.path.isdir(datadir))
self.assertTrue(os.path.isfile(os.path.join(datadir, self.st.source)))
submit.set_status('compiling')
r = self.st.compile(submit)
self.assertTrue(r)
self.assertTrue(os.path.isfile(os.path.join(datadir, self.st.target)))
self.assertTrue(os.path.isfile(os.path.join(datadir, 'compile.err')))
self.st.cleanup(submit, True)
submit.set_status('running')
testcases = submit.get_testcases()
r = self.st.run(submit, testcases[0])
self.assertNotEqual(r[1], 'AC')
def testZero(self):
self.logger.info("TESTING: ZERO")
submit = self.ds.get_submit(5)
# this program output '\0'
datadir = os.path.join(self.config.datadir, submit.id)
self.st.prepare(submit)
r = self.st.compile(submit)
self.assertTrue(r)
testcases = submit.get_testcases()
r = self.st.run(submit, testcases[0])
f = file(os.path.join(datadir, '0000000001.out'))
o = string.join(f.readlines(), '\n')
f.close()
self.assertEqual(o, '3\0\n')
self.st.cleanup(submit)
self.assertFalse(not self.config.no_cleanup and os.path.exists(datadir))
def testBinary(self):
self.logger.info("TESTING: BINARY")
submit = self.ds.get_submit(6)
# this program output '0xbb'
datadir = os.path.join(self.config.datadir, submit.id)
self.st.prepare(submit)
r = self.st.compile(submit)
self.assertTrue(r)
testcases = submit.get_testcases()
r = self.st.run(submit, testcases[0])
f = file(os.path.join(datadir, '0000000001.out'))
o = string.join(f.readlines(), '\n')
f.close()
self.assertEqual(o, '\xbb')
self.st.cleanup(submit)
self.assertFalse(not self.config.no_cleanup and os.path.exists(datadir))
def testOLE(self):
self.logger.info("TESTING: OLE")
submit = self.ds.get_submit(10)
self.st.prepare(submit)
r = self.st.compile(submit)
self.assertTrue(r, "Submit compile failed")
ts = submit.get_testcases()
r = self.st.run(submit, ts[0])
self.assertEqual(r[1], 'OLE')
def testTLE(self):
self.logger.info("TESTING: TLE")
submit = self.ds.get_submit(11)
self.st.prepare(submit)
self.st.compile(submit)
ts = submit.get_testcases()
r = self.st.run(submit, ts[0])
self.assertEqual(r[1], 'TLE')
class ComboTesterTestCase(OJTestCase):
@classmethod
def suite(clazz):
tests = ( 'testGCC33', )
return unittest.TestSuite(map(ComboTesterTestCase, tests))
def testGCC33(self):
self.ct = self.config.languages['gcc-3.3']
submits = self.ds.get_submits(8)
r = self.ct.test(submits[2])
self.assertTrue(r)
from pysqlite2 import dbapi2 as sqlite
conn = sqlite.connect(self.dbname)
cur = conn.cursor()
cur.execute('DELETE FROM submit_test_result')
conn.commit()
cur.close()
conn.close()
class LanguageTestCase(OJTestCase):
@classmethod
def suite(clazz):
tests = ( 'testGCC33NOBC', 'testGCC33BC', 'testGXX33',
'testJava5', 'testJava6',
'testFreePascal20',
'testPython25',
)
return unittest.TestSuite(map(LanguageTestCase, tests))
def setUp(self):
OJTestCase.setUp(self)
def testGCC33NOBC(self):
st = self.config.languages['gcc-3.3-nobc']
submit = self.ds.get_submit(1001)
r = st.test(submit)
self.assertTrue(r, 'GCC NOBC test failed')
def testGCC33BC(self):
st = self.config.languages['gcc-3.3-bc']
submit = self.ds.get_submit(1001)
r = st.test(submit)
self.assertTrue(r, 'GCC WITH BC test failed')
def testGXX33(self):
st = self.config.languages['g++-3.3']
submit = self.ds.get_submit(1002)
r = st.test(submit)
self.assertTrue(r, 'GCC WITH BC test failed')
def testJava5(self):
st = self.config.languages['java-1.6']
submit = self.ds.get_submit(1004)
r = st.test(submit)
self.assertTrue(r, 'Java 5 test failed')
def testJava6(self):
st = self.config.languages['java-1.6']
submit = self.ds.get_submit(1004)
r = st.test(submit)
self.assertTrue(r, 'Java 6 test failed')
def testFreePascal20(self):
st = self.config.languages['fpc-2.0']
submit = self.ds.get_submit(1005)
r = st.test(submit)
self.assertTrue(r, 'FreePascal 2.0 test failed')
def testPython25(self):
st = self.config.languages['python-2.5']
submit = self.ds.get_submit(1006)
r = st.test(submit)
self.assertTrue(r, 'Python 2.5 test failed')
class SecurityTestCase(OJTestCase):
"""
Security test including:
* File Permission: only rundir are allowed to visit
* Forked Process: can oj kill all these process
* Daemon Process: can oj kill the daemon process
* Sleep: can oj kill sleep process
* Network: network usage should be forbidden
"""
@classmethod
def suite(clazz):
tests = (#'testGCC33FilePermission', 'testJavaFilePermission',
#'testPython25FilePermission',
# 'testSleep',
'testFork', 'testDaemon',
)
return unittest.TestSuite(map(SecurityTestCase, tests))
def testGCC33FilePermission(self):
st = self.config.languages['gcc-3.3-nobc']
submit = self.ds.get_submit(2311)
r = st.test(submit)
self.assertTrue(r, 'gcc 3.3 no bc security test failed')
def testJavaFilePermission(self):
st = self.config.languages['java-1.6']
submit = self.ds.get_submit(2331)
r = st.test(submit)
self.assertTrue(r, 'Java 1.6 security test failed')
def testPython25FilePermission(self):
st = self.config.languages['python-2.5']
submit = self.ds.get_submit(2361)
r = st.test(submit)
self.assertTrue(r, 'Python 2.5 security test failed')
def testFork(self):
st = self.config.languages['gcc-3.3-nobc']
submit = self.ds.get_submit(2412)
st.prepare(submit)
r = st.compile(submit)
self.assertTrue(r, "Failed")
ts = submit.get_testcases()
r = st.run(submit, ts[0])
self.assertEqual(r[1], 'TLE', "Sleep test failed")
def testDaemon(self):
st = self.config.languages['gcc-3.3-nobc']
submit = self.ds.get_submit(2413)
st.prepare(submit)
r = st.compile(submit)
self.assertTrue(r, "Failed")
ts = submit.get_testcases()
r = st.run(submit, ts[0])
self.assertEqual(r[1], 'TLE', "Sleep test failed")
def testSleep(self):
st = self.config.languages['gcc-3.3-nobc']
submit = self.ds.get_submit(2411)
st.prepare(submit)
r = st.compile(submit)
self.assertTrue(r, "Failed")
ts = submit.get_testcases()
r = st.run(submit, ts[0])
self.assertEqual(r[1], 'TLE', "Sleep test failed")
def testNetwork(self):
pass
if __name__ == '__main__':
#unittest.main()
suite = unittest.TestSuite()
#suite.addTests(LanguageTestCase.suite())
#suite.addTests(SimpleTesterTestCase.suite())
#suite.addTest(ComboTesterTestCase.suite())
suite.addTest(SecurityTestCase.suite())
unittest.TextTestRunner().run(suite)
# vim: set expandtab tabstop=4 shiftwidth=4: