Files
bitoj_python/python/tester.py
2025-10-11 11:23:58 +08:00

830 lines
28 KiB
Python
Executable File

import math, os, resource, signal, string, sys, threading, logging, time, pickle
import shutil, Queue
import unittest
import pdb
from engineconfig import getConfig
from ojunit import *
class TesterBase:
def __init__(self):
self.logger = logging.getLogger('main')
def get_datadir(self, submit):
config = getConfig()
# get datadir
datadir = os.path.abspath(config.datadir)
if not os.path.exists(datadir): os.mkdir(datadir)
# get filenames
datadir = os.path.join(datadir, submit.id)
if not os.path.exists(datadir): os.mkdir(datadir)
return datadir
def get_rundir(self, submit):
config = getConfig()
return os.path.join(config.rundir_root, submit.user, 'run')
class SimpleTester(TesterBase):
has_timelimit = True
has_memlimit = True
check_result = True
is_lasttest = True
def __init__(self,
source, target,
compile = None, compileenv = {},
run = None, runenv = {}, basemem = 0,
baseproc = 0, extraproc = 0,
compileguard = None, runguard = None, comparecmd = None):
TesterBase.__init__(self)
self.source = source
self.target = target
self.compilecmd = compile
self.compileenv = compileenv
self.runcmd = run
self.runenv = runenv
self.basemem = basemem
self.baseproc = baseproc
self.extraproc = extraproc
self.compileguard = compileguard
self.runguard = runguard
self.comparecmd = comparecmd
def test(self, submit):
#pdb.set_trace()
if not self.prepare(submit):
self.logger.info('self.prepare')
submit.set_status('compile_failed')
submit.set_compilemessage('Failed to delete source/target file')
return False
submit.set_status('compiling')
if not self.compile(submit):
self.logger.info('self.compile')
submit.set_status('compile_failed')
self.cleanup(submit)
return False
submit.set_status('running')
testcases = submit.get_testcases()
ret = []
for testcase in testcases:
r = self.run(submit, testcase)
ret.append(r)
self.logger.debug('run result: ' + r.__str__())
passed = True
if self.check_result:
for r in ret:
if r[1] == 'AC' or not self.has_timelimit and r[1] == 'TLE':
pass
else:
passed = False
if not passed or self.is_lasttest:
submit.update_test_results(ret)
submit.set_status('finish')
self.cleanup(submit)
return passed
def prepare(self, submit):
datadir = self.get_datadir(submit)
# write presetcodes to dir
presetcodes = submit.get_presetcodes()
for presetcode in presetcodes:
pcname = os.path.join(datadir, presetcode.name)
if os.path.exists(pcname):
try:
os.unlink(pcname)
except OSError, e:
self.logger.exception(
"Failed to delete presetcode file %s" % pcname)
return False
f = open(pcname, 'w')
f.write(string.replace(presetcode.code, '\r\n', '\n'))
f.write('\n');
f.close()
# delete existing source and target file
datadirsource = os.path.join(datadir, self.source)
datadirtarget = os.path.join(datadir, self.target)
if os.path.exists(datadirsource):
try:
os.unlink(datadirsource)
except OSError, e:
self.logger.exception("Failed to delete source")
return False
if os.path.exists(datadirtarget):
try:
os.unlink(datadirtarget)
except OSError, e:
self.logger.exception("Failed to delete target")
return False
# preprocess source code
code = string.replace(submit.code, '\r\n', '\n')
code = string.replace(code, chr(0x1a), '') # char generated by tc
code = string.replace(code, 'getch()', '')
code = string.replace(code, 'getch ()', '')
code = string.replace(code, 'getch ( )', '')
code = string.replace(code, '\r\n', '\n')
# write source to disk
f = open(datadirsource, 'w')
f.write(code)
if len(submit.code) > 0 and submit.code[-1] != '\n': f.write('\n')
f.close()
# setup rundir
config = getConfig()
try:
submit.user = config.runas.get_nowait()
except Queue.Empty:
self.logger.exception("No runas user left, please create more!")
return False
rundir = self.get_rundir(submit)
if not os.path.exists(os.path.dirname(rundir)):
os.mkdir(os.path.dirname(rundir))
if os.path.exists(rundir):
try:
self._remove(rundir)
except OSError, e:
self.logger.exception("Failed to delete rundir")
config.runas.put(submit.user)
return False
os.mkdir(rundir)
os.chmod(rundir, 0775)
return True
def cleanup(self, submit, force = False):
datadir = self.get_datadir(submit)
rundir = self.get_rundir(submit)
config = getConfig()
if not config.no_cleanup or force:
if os.path.exists(datadir):
self._remove(datadir)
if os.path.exists(rundir):
try:
self._remove(rundir)
except OSError:
os.system('/usr/bin/sudo -u %s /bin/rm -rf %s' % (submit.user, rundir))
config.runas.put(submit.user)
def compile(self, submit):
config = getConfig()
datadir = self.get_datadir(submit)
cmd = []
if self.compileguard:
for s in self.compileguard:
s = string.replace(s, '<judgehome>', config.judgehome)
s = string.replace(s, '<datadir>', datadir)
cmd.append(s)
if self.compilecmd:
for s in self.compilecmd:
s = string.replace(s, '<judgehome>', config.judgehome)
s = string.replace(s, '<datadir>', datadir)
cmd.append(s)
cmd.append(self.source)
for code in submit.get_presetcodes():
if not code.isheader:
cmd.append(code.name)
self.logger.debug(string.join(cmd, '_'))
errfile = os.path.join(datadir, 'compile.err')
(exitcode, sig, timeused, memused) = \
self._execute(submit, cmd, timelimit = config.compile_timelimit,
infile = None, outfile = None, errfile = errfile,
env = self.compileenv)
compilemsg = None
if os.path.exists(errfile):
f = file(errfile, 'r')
compilemsg = string.join(f.readlines(), '')
f.close()
if compilemsg:
submit.set_compilemessage(compilemsg)
if exitcode == 0:
self.logger.info('submit %s compile success' % submit.id)
else:
self.logger.info('submit %s compile failed' % submit.id)
self.logger.debug(compilemsg)
return exitcode == 0
def run(self, submit, testcase):
config = getConfig()
datadir = self.get_datadir(submit) #dir save input and result
rundir = self.get_rundir(submit) #dir program runs in
infile = testcase.get_input_file()
outfile = os.path.join(datadir, testcase.id + '.out')
errfile = os.path.join(datadir, testcase.id + '.err')
statfile = os.path.join(rundir, testcase.id + '.stat')
# If submit input filename or output filename is provided,
# no input or output redirect will happen.
submit_input_filename = submit.get_input_filename()
submit_output_filename = submit.get_output_filename()
if submit_input_filename:
shutil.copyfile(infile, os.path.join(rundir, submit_input_filename))
_infile = infile
infile = None
if submit_output_filename:
_outfile = outfile
outfile = None
# Create data file symolic links
datafiles = submit.get_problem().get_datafiles()
for datafile in datafiles:
targetname = os.path.join(rundir, datafile.filename)
try:
shutil.copyfile(datafile.absolute_path, targetname)
except:
pass
# Use extra process setting on testcase if it exist
extraproc = self.baseproc + (testcase.nproc if testcase.nproc else self.extraproc)
cmd = []
if self.runguard:
for s in self.runguard:
s = s.replace('<judgehome>', config.judgehome)
s = s.replace('<extraproc>', '%d' % extraproc)
s = s.replace('<timelimit>', '%d' % (testcase.timelimit))
s = s.replace('<maxmem>', '%d' % config.maxmem)
s = s.replace('<rundir>', rundir)
s = s.replace('<user>', submit.user)
s = s.replace('<statfile>', statfile)
cmd.append(s)
if self.runcmd:
for s in self.runcmd:
s = s.replace('<judgehome>', config.judgehome)
s = s.replace('<datadir>', datadir)
s = s.replace('<user>', submit.user)
cmd.append(s)
if self.runenv:
for k in self.runenv.keys():
s = self.runenv[k]
s = s.replace('<judgehome>', config.judgehome)
s = s.replace('<datadir>', datadir)
s = s.replace('<user>', submit.user)
self.runenv[k] = s
self.logger.debug(string.join(cmd, ' ') + ' ' + str(self.runenv))
(exitcode, sig, timeused, memused) = \
self._execute(submit, cmd, timelimit = testcase.timelimit * 10,
infile = infile, outfile = outfile,
errfile = errfile, env = self.runenv,
rlimit_fsize = config.output_size_limit + 2,
statfile = statfile)
if submit_input_filename:
infile = _infile
if submit_output_filename:
outfile = _outfile
try:
shutil.copyfile(os.path.join(rundir, submit_output_filename),
outfile)
except IOError:
f = file(outfile, 'w')
f.close()
ret = [testcase.id, exitcode, sig, outfile, errfile, timeused, memused]
if timeused > testcase.timelimit:
ret.insert(1, 'TLE')
return ret
if memused > testcase.memlimit:
ret.insert(1, 'MLE')
return ret
if exitcode == 125:
ret.insert(1, 'JSE')
return ret
elif exitcode == 126:
ret.insert(1, 'RFC')
return ret
elif exitcode == 127:
ret.insert(1, 'JGE')
return ret
if sig == signal.SIGABRT or sig == signal.SIGSEGV:
ret.insert(1, 'RE')
return ret
elif sig == signal.SIGKILL or sig == signal.SIGXCPU:
ret.insert(1, 'TLE')
return ret
elif sig == signal.SIGXFSZ:
ret.insert(1, 'OLE')
return ret
elif sig == signal.SIGFPE:
ret.insert(1, 'FPE')
return ret
elif sig > 0:
ret.insert(1, 'KS')
return ret
s = os.stat(outfile)
if s.st_size > config.output_size_limit:
ret.insert(1, 'OLE')
return ret
js = submit.get_judge_script()
ret.insert(1, js.judge(submit.id, testcase.id,
os.path.abspath(testcase.get_input_file()),
os.path.abspath(testcase.get_output_file()),
os.path.abspath(outfile),
os.path.abspath(errfile), rundir))
return ret
def _execute(self, submit, command, timelimit = 1,
infile = None, outfile = None, errfile = None, env = {},
rlimit_fsize = -1, statfile = None):
pid = os.fork()
if pid == 0:
if rlimit_fsize > 0:
resource.setrlimit(resource.RLIMIT_FSIZE,
(rlimit_fsize, rlimit_fsize))
# child process
if infile:
try:
os.close(0)
os.open(infile, os.O_RDONLY)
except Exception, e:
print e
sys.exit(125)
if outfile:
try:
os.close(1)
os.open(outfile, os.O_WRONLY|os.O_CREAT|os.O_TRUNC, 0666)
except Exception, e:
print e
sys.exit(125)
if errfile:
try:
os.close(2)
os.open(errfile, os.O_WRONLY|os.O_CREAT|os.O_TRUNC, 0666)
except Exception, e:
print e
sys.exit(125)
#os.chdir(self.get_datadir(submit))
os.execve(command[0], command, env)
sys.exit(125)
# parent process
pid, status = os.waitpid(pid, 0)
timeused = 0; memused = 0; sig = 0; exitcode = 0
# get this child sig and exitcode
if os.WIFEXITED(status): exitcode = os.WEXITSTATUS(status)
if os.WIFSIGNALED(status): sig = os.WTERMSIG(status)
# read information form statfile
if statfile:
try:
stat = pickle.load(file(statfile, 'r'))
exitcode = stat['exitcode']
sig = stat['sig']
timeused = stat['timeused']
memused = 0
if self.basemem.has_key('RSS'):
memused += stat['memrss'] - self.basemem['RSS']
if self.basemem.has_key('Data'):
memused += stat['memdata'] - self.basemem['Data']
if self.basemem.has_key('Stack'):
memused += stat['memstack'] - self.basemem['Stack']
memused = max(0, memused)
except Exception, e:
self.logger.exception(e)
self.logger.error("Failed to read statfile: %s" % statfile)
exitcode = 127 # judge script error
return (exitcode, sig, timeused, memused)
def _remove(self, top):
for root, dirs, files in os.walk(top, topdown=False):
for name in files:
os.remove(os.path.join(root, name))
for name in dirs:
os.rmdir(os.path.join(root, name))
os.rmdir(top)
class ComboTester:
testers = []
def add_tester(self, tester,
has_timelimit = True, has_memlimit = True,
check_result = True, is_lasttest = False):
tester.has_timelimit = has_timelimit
tester.has_memlimit = has_memlimit
tester.check_result = check_result
tester.is_lasttest = is_lasttest
self.testers.append(tester)
def test(self, submit):
r = True
for t in self.testers:
r = t.test(submit)
if not r:
break
return r
class SimpleTesterTestCase(OJTestCase):
@classmethod
def suite(clazz):
tests = ( 'testSteps', 'testNoSource',
'testAC', 'testWA', 'testZero',
'testBinary', 'testCE', 'testOLE', 'testTLE',
'testTargetDeleted',
)
return unittest.TestSuite(map(SimpleTesterTestCase, tests))
def setUp(self):
OJTestCase.setUp(self)
self.st = self.config.languages['gcc-3.3-nobc']
def testSteps(self):
self.logger.info("TESTING: Steps")
submit = self.ds.get_submit(3)
self.st.prepare(submit)
datadir = os.path.join(self.config.datadir, submit.id)
self.assertTrue(os.path.isdir(datadir))
self.assertTrue(os.path.isfile(os.path.join(datadir, self.st.source)))
submit.set_status('compiling')
r = self.st.compile(submit)
self.assertTrue(r)
self.assertTrue(os.path.isfile(os.path.join(datadir, self.st.target)))
self.assertTrue(os.path.isfile(os.path.join(datadir, 'compile.err')))
submit.set_status('running')
testcases = submit.get_testcases()
r = self.st.run(submit, testcases[0])
self.assertEqual(r[0], testcases[0].id)
self.assertEqual(r[1], 'AC')
self.assertEqual(r[2], 0)
self.assertEqual(r[3], 0)
f = file(r[4], 'r')
o = string.join(f.readlines(), '\n')
f.close()
self.assertEqual(o, '3\n')
f = file(r[5], 'r')
o = string.join(f.readlines(), '\n')
f.close()
self.assertEqual(o, '')
r = self.st.run(submit, testcases[1])
f = file(r[4], 'r')
o = string.join(f.readlines(), '\n')
f.close()
self.assertEqual(o, '4\n')
self.st.cleanup(submit)
self.assertFalse(not self.config.no_cleanup and os.path.exists(datadir))
def testAC(self):
self.logger.info("TESTING: AC")
submit = self.ds.get_submit(3)
datadir = os.path.join(self.config.datadir, submit.id)
r = self.st.test(submit)
self.assertTrue(r) # passed
self.assertFalse(not self.config.no_cleanup and os.path.exists(datadir)) # cleanup
from pysqlite2 import dbapi2 as sqlite
conn = sqlite.connect(self.dbname)
cur = conn.cursor()
cur.execute('SELECT * FROM submit_test_result WHERE sid=?', (submit.id, ))
rows = cur.fetchall()
cur.execute('DELETE FROM submit_test_result')
conn.commit()
self.assertEqual(len(rows), 2)
# test if the submit status change
cur.execute('SELECT status FROM submit WHERE id=?', (submit.id, ))
rows = cur.fetchall()
self.assertEqual(rows[0][0], 10)
cur.close()
conn.close()
def testNoSource(self):
self.logger.info("TESTING: No Source")
submit = self.ds.get_submit(1)
# the code of this submit is empty
datadir = os.path.join(self.config.datadir, submit.id)
r = self.st.test(submit)
self.assertFalse(r)
self.assertFalse(not self.config.no_cleanup and os.path.exists(datadir)) # cleanup
self.assertTrue(submit.get_status(), 'compile_failed')
def testNoTarget(self):
self.logger.info("TESTING: No Target")
submit = self.ds.get_submit(1)
# the code of this submit generate nothing
datadir = os.path.join(self.config.datadir, submit.id)
r = self.st.test(submit)
self.assertFalse(r)
self.assertFalse(not self.config.no_cleanup and os.path.exists(datadir)) # cleanup
self.assertTrue(submit.get_status(), 'compile_failed')
def testCE(self):
self.logger.info("TESTING: CE")
submit = self.ds.get_submit(7)
datadir = os.path.join(self.config.datadir, submit.id)
r = self.st.test(submit)
self.assertFalse(r) # not passed
self.assertFalse(not self.config.no_cleanup and os.path.exists(datadir)) # cleanup
self.assertTrue(len(submit.get_compilemessage()) > 0)
self.assertTrue(submit.get_status(), 'compile_failed')
def testWA(self):
self.logger.info("TESTING: WA")
submit = self.ds.get_submit(4)
datadir = os.path.join(self.config.datadir, submit.id)
r = self.st.test(submit)
self.assertFalse(r) # not passed
self.assertFalse(not self.config.no_cleanup and os.path.exists(datadir)) # cleanup
from pysqlite2 import dbapi2 as sqlite
conn = sqlite.connect(self.dbname)
cur = conn.cursor()
cur.execute('SELECT * FROM submit_test_result')
rows = cur.fetchall()
cur.execute('DELETE FROM submit_test_result')
conn.commit()
self.assertEqual(len(rows), 2)
cur.close()
conn.close()
def testTargetDeleted(self):
self.logger.info("TESTING: Target Deleted")
submit = self.ds.get_submit(3)
self.st.prepare(submit)
datadir = os.path.join(self.config.datadir, submit.id)
self.assertTrue(os.path.isdir(datadir))
self.assertTrue(os.path.isfile(os.path.join(datadir, self.st.source)))
submit.set_status('compiling')
r = self.st.compile(submit)
self.assertTrue(r)
self.assertTrue(os.path.isfile(os.path.join(datadir, self.st.target)))
self.assertTrue(os.path.isfile(os.path.join(datadir, 'compile.err')))
self.st.cleanup(submit, True)
submit.set_status('running')
testcases = submit.get_testcases()
r = self.st.run(submit, testcases[0])
self.assertNotEqual(r[1], 'AC')
def testZero(self):
self.logger.info("TESTING: ZERO")
submit = self.ds.get_submit(5)
# this program output '\0'
datadir = os.path.join(self.config.datadir, submit.id)
self.st.prepare(submit)
r = self.st.compile(submit)
self.assertTrue(r)
testcases = submit.get_testcases()
r = self.st.run(submit, testcases[0])
f = file(os.path.join(datadir, '0000000001.out'))
o = string.join(f.readlines(), '\n')
f.close()
self.assertEqual(o, '3\0\n')
self.st.cleanup(submit)
self.assertFalse(not self.config.no_cleanup and os.path.exists(datadir))
def testBinary(self):
self.logger.info("TESTING: BINARY")
submit = self.ds.get_submit(6)
# this program output '0xbb'
datadir = os.path.join(self.config.datadir, submit.id)
self.st.prepare(submit)
r = self.st.compile(submit)
self.assertTrue(r)
testcases = submit.get_testcases()
r = self.st.run(submit, testcases[0])
f = file(os.path.join(datadir, '0000000001.out'))
o = string.join(f.readlines(), '\n')
f.close()
self.assertEqual(o, '\xbb')
self.st.cleanup(submit)
self.assertFalse(not self.config.no_cleanup and os.path.exists(datadir))
def testOLE(self):
self.logger.info("TESTING: OLE")
submit = self.ds.get_submit(10)
self.st.prepare(submit)
r = self.st.compile(submit)
self.assertTrue(r, "Submit compile failed")
ts = submit.get_testcases()
r = self.st.run(submit, ts[0])
self.assertEqual(r[1], 'OLE')
def testTLE(self):
self.logger.info("TESTING: TLE")
submit = self.ds.get_submit(11)
self.st.prepare(submit)
self.st.compile(submit)
ts = submit.get_testcases()
r = self.st.run(submit, ts[0])
self.assertEqual(r[1], 'TLE')
class ComboTesterTestCase(OJTestCase):
@classmethod
def suite(clazz):
tests = ( 'testGCC33', )
return unittest.TestSuite(map(ComboTesterTestCase, tests))
def testGCC33(self):
self.ct = self.config.languages['gcc-3.3']
submits = self.ds.get_submits(8)
r = self.ct.test(submits[2])
self.assertTrue(r)
from pysqlite2 import dbapi2 as sqlite
conn = sqlite.connect(self.dbname)
cur = conn.cursor()
cur.execute('DELETE FROM submit_test_result')
conn.commit()
cur.close()
conn.close()
class LanguageTestCase(OJTestCase):
@classmethod
def suite(clazz):
tests = ( 'testGCC33NOBC', 'testGCC33BC', 'testGXX33',
'testJava5', 'testJava6',
'testFreePascal20',
'testPython25',
)
return unittest.TestSuite(map(LanguageTestCase, tests))
def setUp(self):
OJTestCase.setUp(self)
def testGCC33NOBC(self):
st = self.config.languages['gcc-3.3-nobc']
submit = self.ds.get_submit(1001)
r = st.test(submit)
self.assertTrue(r, 'GCC NOBC test failed')
def testGCC33BC(self):
st = self.config.languages['gcc-3.3-bc']
submit = self.ds.get_submit(1001)
r = st.test(submit)
self.assertTrue(r, 'GCC WITH BC test failed')
def testGXX33(self):
st = self.config.languages['g++-3.3']
submit = self.ds.get_submit(1002)
r = st.test(submit)
self.assertTrue(r, 'GCC WITH BC test failed')
def testJava5(self):
st = self.config.languages['java-1.6']
submit = self.ds.get_submit(1004)
r = st.test(submit)
self.assertTrue(r, 'Java 5 test failed')
def testJava6(self):
st = self.config.languages['java-1.6']
submit = self.ds.get_submit(1004)
r = st.test(submit)
self.assertTrue(r, 'Java 6 test failed')
def testFreePascal20(self):
st = self.config.languages['fpc-2.0']
submit = self.ds.get_submit(1005)
r = st.test(submit)
self.assertTrue(r, 'FreePascal 2.0 test failed')
def testPython25(self):
st = self.config.languages['python-2.5']
submit = self.ds.get_submit(1006)
r = st.test(submit)
self.assertTrue(r, 'Python 2.5 test failed')
class SecurityTestCase(OJTestCase):
"""
Security test including:
* File Permission: only rundir are allowed to visit
* Forked Process: can oj kill all these process
* Daemon Process: can oj kill the daemon process
* Sleep: can oj kill sleep process
* Network: network usage should be forbidden
"""
@classmethod
def suite(clazz):
tests = (#'testGCC33FilePermission', 'testJavaFilePermission',
#'testPython25FilePermission',
# 'testSleep',
'testFork', 'testDaemon',
)
return unittest.TestSuite(map(SecurityTestCase, tests))
def testGCC33FilePermission(self):
st = self.config.languages['gcc-3.3-nobc']
submit = self.ds.get_submit(2311)
r = st.test(submit)
self.assertTrue(r, 'gcc 3.3 no bc security test failed')
def testJavaFilePermission(self):
st = self.config.languages['java-1.6']
submit = self.ds.get_submit(2331)
r = st.test(submit)
self.assertTrue(r, 'Java 1.6 security test failed')
def testPython25FilePermission(self):
st = self.config.languages['python-2.5']
submit = self.ds.get_submit(2361)
r = st.test(submit)
self.assertTrue(r, 'Python 2.5 security test failed')
def testFork(self):
st = self.config.languages['gcc-3.3-nobc']
submit = self.ds.get_submit(2412)
st.prepare(submit)
r = st.compile(submit)
self.assertTrue(r, "Failed")
ts = submit.get_testcases()
r = st.run(submit, ts[0])
self.assertEqual(r[1], 'TLE', "Sleep test failed")
def testDaemon(self):
st = self.config.languages['gcc-3.3-nobc']
submit = self.ds.get_submit(2413)
st.prepare(submit)
r = st.compile(submit)
self.assertTrue(r, "Failed")
ts = submit.get_testcases()
r = st.run(submit, ts[0])
self.assertEqual(r[1], 'TLE', "Sleep test failed")
def testSleep(self):
st = self.config.languages['gcc-3.3-nobc']
submit = self.ds.get_submit(2411)
st.prepare(submit)
r = st.compile(submit)
self.assertTrue(r, "Failed")
ts = submit.get_testcases()
r = st.run(submit, ts[0])
self.assertEqual(r[1], 'TLE', "Sleep test failed")
def testNetwork(self):
pass
if __name__ == '__main__':
#unittest.main()
suite = unittest.TestSuite()
#suite.addTests(LanguageTestCase.suite())
#suite.addTests(SimpleTesterTestCase.suite())
#suite.addTest(ComboTesterTestCase.suite())
suite.addTest(SecurityTestCase.suite())
unittest.TextTestRunner().run(suite)
# vim: set expandtab tabstop=4 shiftwidth=4: