diff --git a/horde_worker_regen/process_management/process_manager.py b/horde_worker_regen/process_management/process_manager.py index bfe34f75..d1adb28a 100644 --- a/horde_worker_regen/process_management/process_manager.py +++ b/horde_worker_regen/process_management/process_manager.py @@ -702,6 +702,10 @@ def get_process_info_strings(self) -> list[str]: return info_strings + def all_waiting_for_job(self) -> bool: + """Return true if all processes are waiting for a job.""" + return all(p.last_process_state == HordeProcessState.WAITING_FOR_JOB for p in self.values()) + class TorchDeviceInfo(BaseModel): """Contains information about a torch device.""" @@ -1577,6 +1581,7 @@ def receive_and_handle_process_messages(self) -> None: break self._in_deadlock = False + self._in_queue_deadlock = False if isinstance(message, HordeProcessHeartbeatMessage): self._process_map.on_heartbeat( @@ -1967,7 +1972,7 @@ def preload_models(self) -> bool: f"Already preloading {num_preloading_processes} models, waiting for one to finish before " f"preloading {job.model}", ) - self._preload_delay_notified = True + self._preload_delay_notified = True return False self._preload_delay_notified = False @@ -2061,6 +2066,7 @@ def handle_process_missing(job: ImageGenerateJobPopResponse) -> None: if job.model is not None: logger.debug(f"Expiring entry for model {job.model}") self._horde_model_map.expire_entry(job.model) + try: self.jobs_in_progress.remove(job) except ValueError: @@ -2112,6 +2118,8 @@ def handle_process_missing(job: ImageGenerateJobPopResponse) -> None: return None if process_with_model is None: + if self._preload_delay_notified: + return None handle_process_missing(next_job) return None @@ -3930,7 +3938,7 @@ def _print_deadlock_info() -> None: not self._in_queue_deadlock and (self._process_map.num_busy_processes() == 0 and len(self.job_deque) > 0) and len(self.jobs_in_progress) == 0 - ): + ) or (self._process_map.all_waiting_for_job() and len(self.job_deque) > 0): currently_loaded_models = set() model_process_map: dict[str, int] = {} @@ -3945,13 +3953,34 @@ def _print_deadlock_info() -> None: self._last_queue_deadlock_detected_time = time.time() self._queue_deadlock_model = job.model self._queue_deadlock_process_id = model_process_map[job.model] + break + else: + logger.debug("Queue deadlock detected without a model causing it.") + _print_deadlock_info() + self._in_queue_deadlock = True + self._last_queue_deadlock_detected_time = time.time() + # we're going to fall back to the next model in the deque + self._queue_deadlock_model = self.job_deque[0].model elif self._in_queue_deadlock and (self._last_queue_deadlock_detected_time + 10) < time.time(): logger.debug("Queue deadlock detected") _print_deadlock_info() - logger.debug(f"Model causing deadlock: {self._queue_deadlock_model}") - if self._queue_deadlock_process_id is not None: - self._replace_inference_process(self._process_map[self._queue_deadlock_process_id]) + if self._queue_deadlock_model is not None: + logger.debug(f"Model causing deadlock: {self._queue_deadlock_model}") + if self._queue_deadlock_process_id is not None: + self._horde_model_map.expire_entry(self._queue_deadlock_model) + self._replace_inference_process(self._process_map[self._queue_deadlock_process_id]) + else: + logger.warning("Queue deadlock detected but no model causing it.") + num_processes_replaced = 0 + # We're going to replace up to two process which aren't busy + for process in self._process_map.values(): + if not process.is_process_busy(): + self._replace_inference_process(process) + num_processes_replaced += 1 + if num_processes_replaced >= 2: + break + self._in_queue_deadlock = False self._queue_deadlock_model = None self._queue_deadlock_process_id = None