Skip to content

Commit

Permalink
Re-introduce parallel mode
Browse files Browse the repository at this point in the history
  • Loading branch information
niknetniko committed Sep 19, 2023
1 parent e3792c8 commit a0618e2
Showing 1 changed file with 76 additions and 42 deletions.
118 changes: 76 additions & 42 deletions tested/judge/core.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import logging
import shutil
import time
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path

from tested.configs import Bundle
Expand Down Expand Up @@ -150,57 +151,90 @@ def judge(bundle: Bundle):

_logger.info("Starting execution")

currently_open_tab = -1
# Create a list of runs we want to execute.
for i, planned_unit in enumerate(plan.units):
# Prepare the unit.
execution_dir, dependencies = set_up_unit(bundle, plan, i)

# If compilation is necessary, do it.
if compilation_results is None:
local_compilation_results, dependencies = compile_unit(
bundle, plan, i, execution_dir, dependencies
)
if _is_fatal_compilation_error(local_compilation_results):
_handle_time_or_memory_compilation(
bundle, collector, local_compilation_results
)
return
else:
local_compilation_results = compilation_results

# Execute the unit.
if local_compilation_results.status == Status.CORRECT:
remaining_time = plan.remaining_time()
execution_result, status = execute_unit(
bundle, planned_unit, execution_dir, dependencies, remaining_time
)
else:
execution_result = None

result_status, currently_open_tab = _process_results(
bundle=bundle,
unit=planned_unit,
execution_result=execution_result,
execution_dir=execution_dir,
compilation_results=local_compilation_results,
collector=collector,
currently_open_tab=currently_open_tab,
def _process_one_unit(
index: int,
) -> tuple[CompilationResult, ExecutionResult | None, Path]:
return _execute_one_unit(bundle, plan, compilation_results, index)

if bundle.config.options.parallel:
max_workers = None
else:
max_workers = 1

_logger.debug(f"Executing with {max_workers} workers")

with ThreadPoolExecutor(max_workers=max_workers) as executor:
remaining_time = plan.remaining_time()
results = executor.map(
_process_one_unit, range(len(plan.units)), timeout=remaining_time
)
try:
currently_open_tab = -1
for i, (
local_compilation_results,
execution_result,
execution_dir,
) in enumerate(results):
planned_unit = plan.units[i]
_logger.debug(f"Processing results for execution unit {i}")
result_status, currently_open_tab = _process_results(
bundle=bundle,
unit=planned_unit,
execution_result=execution_result,
execution_dir=execution_dir,
compilation_results=local_compilation_results,
collector=collector,
currently_open_tab=currently_open_tab,
)

if result_status in (
Status.TIME_LIMIT_EXCEEDED,
Status.MEMORY_LIMIT_EXCEEDED,
Status.OUTPUT_LIMIT_EXCEEDED,
):
terminate(bundle, collector, result_status)
if result_status in (
Status.TIME_LIMIT_EXCEEDED,
Status.MEMORY_LIMIT_EXCEEDED,
Status.OUTPUT_LIMIT_EXCEEDED,
):
del results
terminate(bundle, collector, result_status)
return
except TimeoutError:
terminate(bundle, collector, Status.TIME_LIMIT_EXCEEDED)
return

# Close the last tab.
collector.add(CloseTab(), currently_open_tab)
collector.add(CloseJudgement())


def _execute_one_unit(
bundle: Bundle,
plan: ExecutionPlan,
compilation_results: CompilationResult | None,
index: int,
) -> tuple[CompilationResult, ExecutionResult | None, Path]:
planned_unit = plan.units[index]
# Prepare the unit.
execution_dir, dependencies = set_up_unit(bundle, plan, index)

# If compilation is necessary, do it.
if compilation_results is None:
local_compilation_results, dependencies = compile_unit(
bundle, plan, index, execution_dir, dependencies
)
else:
local_compilation_results = compilation_results

# Execute the unit.
if local_compilation_results.status == Status.CORRECT:
remaining_time = plan.remaining_time()
execution_result, status = execute_unit(
bundle, planned_unit, execution_dir, dependencies, remaining_time
)
local_compilation_results.status = status
else:
execution_result = None

return local_compilation_results, execution_result, execution_dir


def _generate_files(
bundle: Bundle, execution_plan: list[PlannedExecutionUnit]
) -> tuple[Path, list[str], str | None]:
Expand Down

0 comments on commit a0618e2

Please sign in to comment.