Source code for grader.program_container
import sys
from time import sleep
from threading import Thread
from .utils import read_code
from .stdio import SyncCondition, SpoofedStdin, SpoofedStdout
[docs]class ProgramContainer(Thread):
""" The thread in which the users program runs """
def __init__(self, module_path, results):
Thread.__init__(self)
self.module_path = module_path
self._results = results
self.condition = SyncCondition()
self.caughtException = None
self.__startProgram()
def __startProgram(self):
self.setDaemon(True)
self.start()
self._started = False
#while not self._started:
# sleep(0.001)
[docs] def run(self):
self.stdin = sys.stdin = SpoofedStdin(self.condition)
self.stdout = sys.stdout = SpoofedStdout()
self.condition.acquire()
self._started = True
self.finished = False
try:
self._exec_code()
except Exception as error:
self.caughtException = error
# notify tester of the end of program
self.condition.notify_release()
self.condition.finish()
self.finished = True
def _exec_code(self):
from types import ModuleType
mod = ModuleType("__main__")
self.module = mod
source = read_code(self.module_path)
code = compile(source, "<tested-program>", "exec", dont_inherit=True)
exec(code, mod.__dict__)
return mod
[docs] def log(self, what):
self._results["log"].append(what)
@classmethod
[docs] def restore_io(cls):
sys.stdin = sys.__stdin__
sys.stdout = sys.__stdout__