
fix: better deadlock detection when all procs. aren't busy
tazlin committed Oct 4, 2024
1 parent 17f5099 commit cfc62eb
Showing 1 changed file with 34 additions and 5 deletions.
39 changes: 34 additions & 5 deletions horde_worker_regen/process_management/process_manager.py
@@ -702,6 +702,10 @@ def get_process_info_strings(self) -> list[str]:

return info_strings

def all_waiting_for_job(self) -> bool:
"""Return true if all processes are waiting for a job."""
return all(p.last_process_state == HordeProcessState.WAITING_FOR_JOB for p in self.values())


class TorchDeviceInfo(BaseModel):
"""Contains information about a torch device."""
@@ -1577,6 +1581,7 @@ def receive_and_handle_process_messages(self) -> None:
break

self._in_deadlock = False
self._in_queue_deadlock = False

if isinstance(message, HordeProcessHeartbeatMessage):
self._process_map.on_heartbeat(
@@ -1967,7 +1972,7 @@ def preload_models(self) -> bool:
f"Already preloading {num_preloading_processes} models, waiting for one to finish before "
f"preloading {job.model}",
)
self._preload_delay_notified = True
self._preload_delay_notified = True
return False

self._preload_delay_notified = False
@@ -2061,6 +2066,7 @@ def handle_process_missing(job: ImageGenerateJobPopResponse) -> None:
if job.model is not None:
logger.debug(f"Expiring entry for model {job.model}")
self._horde_model_map.expire_entry(job.model)

try:
self.jobs_in_progress.remove(job)
except ValueError:
@@ -2112,6 +2118,8 @@ def handle_process_missing(job: ImageGenerateJobPopResponse) -> None:
return None

if process_with_model is None:
if self._preload_delay_notified:
return None
handle_process_missing(next_job)
return None

@@ -3930,7 +3938,7 @@ def _print_deadlock_info() -> None:
not self._in_queue_deadlock
and (self._process_map.num_busy_processes() == 0 and len(self.job_deque) > 0)
and len(self.jobs_in_progress) == 0
):
) or (self._process_map.all_waiting_for_job() and len(self.job_deque) > 0):

currently_loaded_models = set()
model_process_map: dict[str, int] = {}
@@ -3945,13 +3953,34 @@ def _print_deadlock_info() -> None:
self._last_queue_deadlock_detected_time = time.time()
self._queue_deadlock_model = job.model
self._queue_deadlock_process_id = model_process_map[job.model]
break
else:
logger.debug("Queue deadlock detected without a model causing it.")
_print_deadlock_info()
self._in_queue_deadlock = True
self._last_queue_deadlock_detected_time = time.time()
# we're going to fall back to the next model in the deque
self._queue_deadlock_model = self.job_deque[0].model

elif self._in_queue_deadlock and (self._last_queue_deadlock_detected_time + 10) < time.time():
logger.debug("Queue deadlock detected")
_print_deadlock_info()
logger.debug(f"Model causing deadlock: {self._queue_deadlock_model}")
if self._queue_deadlock_process_id is not None:
self._replace_inference_process(self._process_map[self._queue_deadlock_process_id])
if self._queue_deadlock_model is not None:
logger.debug(f"Model causing deadlock: {self._queue_deadlock_model}")
if self._queue_deadlock_process_id is not None:
self._horde_model_map.expire_entry(self._queue_deadlock_model)
self._replace_inference_process(self._process_map[self._queue_deadlock_process_id])
else:
logger.warning("Queue deadlock detected but no model causing it.")
num_processes_replaced = 0
# We're going to replace up to two processes which aren't busy
for process in self._process_map.values():
if not process.is_process_busy():
self._replace_inference_process(process)
num_processes_replaced += 1
if num_processes_replaced >= 2:
break

self._in_queue_deadlock = False
self._queue_deadlock_model = None
self._queue_deadlock_process_id = None
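
For reference, here is a minimal standalone sketch of the detection logic this commit introduces. Names and signatures are simplified for illustration; the real implementation lives in process_manager.py and operates on the worker's process map, job deque, and in-progress job list. The queue is now treated as potentially deadlocked either when no process is busy while jobs sit in the deque with none in progress, or when every process reports WAITING_FOR_JOB while the deque is non-empty.

from enum import Enum, auto


class HordeProcessState(Enum):
    # Only the state relevant to this sketch; the real enum has more members.
    WAITING_FOR_JOB = auto()
    INFERENCE_STARTING = auto()


def all_waiting_for_job(process_states: list[HordeProcessState]) -> bool:
    """Return True if every process is waiting for a job."""
    return all(state == HordeProcessState.WAITING_FOR_JOB for state in process_states)


def queue_deadlock_suspected(
    process_states: list[HordeProcessState],
    num_busy_processes: int,
    num_queued_jobs: int,
    num_jobs_in_progress: int,
) -> bool:
    # (The real check also requires that a queue deadlock isn't already flagged.)
    # Original trigger: nothing is busy and nothing is in progress, yet jobs are queued.
    idle_with_work = num_busy_processes == 0 and num_queued_jobs > 0 and num_jobs_in_progress == 0
    # New trigger from this commit: every process is waiting for a job while jobs remain queued.
    return idle_with_work or (all_waiting_for_job(process_states) and num_queued_jobs > 0)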
