import string, os, time, tempfile, logging import unittest from engineconfig import getConfig class InternalJudge: def __init__(self, allowpe = False): self.logger = logging.getLogger('main') self.allowpe = allowpe def judge(self, sid, tid, tin, tout, result, errfile, rundir = None): """Judge if the result correct. @param sid String submit id @param tid String testcase id @param tin String filename of the standard input @param tout String filename of the standard output @param result String filename of the result """ return self.compare_file(tin, tout, result, self.allowpe) def compare_file(self, input, output, result, allowpe): with open(output, 'rb') as fo, open(result, 'rb') as fr: if not allowpe: r = 'AC' while r == 'AC': so = fo.read(8192) sr = fr.read(8192) if so == b'' and sr == b'': break if so != sr: r = 'WA' else: so = fo.read() sr = fr.read() r = self.compare_string(so.decode('utf-8', errors='ignore'), sr.decode('utf-8', errors='ignore')) return r def compare_string(self, output, result): if output == result: return 'AC' outnum = '' retnum = '' for c in output: if c in string.digits: outnum += c for c in result: if c in string.digits: retnum += c self.logger.debug('numbers in output: %s' % outnum) self.logger.debug('numbers in result: %s' % retnum) if len(outnum) > 0 and len(retnum) > 0 and outnum == retnum: return 'PE' return 'WA' class ExternalJudge: def __init__(self, problemid, vlang, vcode): """Constructor. @param pid String problem id @param vlang String validator language @param vcode String validator code """ self.config = getConfig() self.logger = logging.getLogger('main') self.problemid = problemid self.lang = vlang self.code = vcode # save validator code tester = self.config.get_tester(self.lang) datadir = os.path.join(self.config.datadir, 'validator', problemid) if not os.path.exists(datadir): os.makedirs(datadir) self.comparecmd = tester.comparecmd self.codefile = os.path.abspath(os.path.join(datadir, tester.source)) with open(self.codefile, 'w') as f: f.write(vcode.replace('\r\n', '\n')) if len(vcode) > 0 and vcode[-1] != '\n': f.write('\n') self.logger.debug("Save validator code as %s" % self.codefile) def judge(self, sid, tid, tin, tout, result, errfile, rundir = None): """Judge if the result correct. @param sid String submit id @param tid String testcase id @param tin String filename of the standard input @param tout String filename of the standard output @param result String filename of the result @param rundir String in which dir the judge script should be started """ rfiledir = os.path.join(self.config.datadir, sid) rfile = os.path.join(rfiledir, tid + '.rst') if not os.path.exists(rfiledir): os.mkdir(rfiledir) cmd = [] for s in self.comparecmd: s = s.replace('', self.config.judgehome) s = s.replace('', self.lang) s = s.replace('', self.codefile) s = s.replace('', tin) s = s.replace('', tout) s = s.replace('', result) cmd.append(s) self.logger.debug("Run validator as %s" % cmd.__str__()) pid = os.fork() if pid == 0: os.close(1) os.open(rfile, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o666) os.close(2) os.open(errfile, os.O_WRONLY | os.O_CREAT | os.O_APPEND, 0o666) if rundir: os.chdir(rundir) os.execv(cmd[0], cmd) print('WA') remaintime = self.config.judgescript_wait pid1 = 0 while remaintime > 0: pid1, status = os.waitpid(pid, os.WNOHANG) if pid1 > 0: break time.sleep(0.1) remaintime -= 0.1 while pid1 == 0: try: os.kill(pid, 9) except OSError: pass pid1, status = os.waitpid(pid, os.WNOHANG) with open(rfile, 'r') as f: ret = f.readline().strip() if ret != 'AC' and ret != 'PE': ret = 'WA' return ret class InternalJudgeTest(unittest.TestCase): """Internal Judge Test Case.""" def testWithoutPE(self): ij = InternalJudge() output = 'hello world\n' result = 'hello world\n' self.assertEqual(ij.compare_string(output, result), 'AC') output = 'hello world' self.assertEqual(ij.compare_string(output, result), 'WA') result = 'hello world' self.assertEqual(ij.compare_string(output, result), 'AC') result = 'hello world ' self.assertEqual(ij.compare_string(output, result), 'WA') def testWithPE(self): ij = InternalJudge(True) input = '' output = 'hello world\n' result = 'hello world\n' self.assertEqual(ij.compare_string(output, result), 'AC') result = 'hello worl\n' self.assertEqual(ij.compare_string(output, result), 'WA') output = '1 2 3 4 5\n' result = '1\n2\n3\n4\n5\n' self.assertEqual(ij.compare_string(output, result), 'PE') result = '12345\n' self.assertEqual(ij.compare_string(output, result), 'PE') result = '1 2 3 4 5' self.assertEqual(ij.compare_string(output, result), 'PE') class ExternalJudgeTest(unittest.TestCase): """External Judge Test Case.""" def setUp(self): config = getConfig() config.judgehome = '../' config.datadir = os.path.join('..', 'testdata') self.tin = tempfile.NamedTemporaryFile('w') self.tin.write('3') self.tin.flush() self.tout = tempfile.NamedTemporaryFile('w') self.tout.write('3') self.tout.flush() self.rst = tempfile.NamedTemporaryFile('w') self.rst.write('6') self.rst.flush() self.err = tempfile.NamedTemporaryFile('w') self.err.close() self.tin.close() self.tout.close() self.rst.close() def testGCC33(self): code = """ #include int main() { printf("AC\\n"); return 0; } """ self.j = ExternalJudge('p1', 'gcc-3.3', code) x = self.j.judge('s1', 't1', self.tin.name, self.tout.name, self.rst.name, self.err.name) self.assertEquals('AC', x) def testGPP33(self): code = """ #include using namespace std; int main() { cout << "AC" << endl; return 0; } """ self.j = ExternalJudge('p1', 'g++-3.3', code) x = self.j.judge('s1', 't1', self.tin.name, self.tout.name, self.rst.name, self.err.name) self.assertEquals('AC', x) def testFPC(self): code = """ program main; begin write('AC'); end. """ self.j = ExternalJudge('p1', 'fpc-2.2', code) x = self.j.judge('s1', 't1', self.tin.name, self.tout.name, self.rst.name, self.err.name) self.assertEquals('AC', x) def testJava(self): code = """ public class Main { public static void main(String[] args) { System.out.println("AC"); } } """ self.j = ExternalJudge('p1', 'java-1.5', code) x = self.j.judge('s1', 't1', self.tin.name, self.tout.name, self.rst.name, self.err.name) self.assertEquals('AC', x) self.j = ExternalJudge('p1', 'java-1.6', code) x = self.j.judge('s1', 't1', self.tin.name, self.tout.name, self.rst.name, self.err.name) self.assertEquals('AC', x) def testPython(self): code = """public class Main { public static void Main() { System.Console.WriteLine("AC"); } } """ self.j = ExternalJudge('p1', 'gmcs-2.0', code) x = self.j.judge('s1', 't1', self.tin.name, self.tout.name, self.rst.name, self.err.name) self.assertEquals('AC', x) def testPython(self): code = """#!/usr/bin/env python print('AC') """ self.j = ExternalJudge('p1', 'python-2.5', code) x = self.j.judge('s1', 't1', self.tin.name, self.tout.name, self.rst.name, self.err.name) self.assertEquals('AC', x) def testBash(self): code = """echo AC""" self.j = ExternalJudge('p1', 'bash-3', code) x = self.j.judge('s1', 't1', self.tin.name, self.tout.name, self.rst.name, self.err.name) self.assertEquals('AC', x) if __name__ == '__main__': unittest.main() # vim: set expandtab tabstop=4 shiftwidth=4: