From ef6ddb0618c7bbf5e425b9c123922b92dc27f125 Mon Sep 17 00:00:00 2001 From: Pedro Brochado Date: Wed, 25 Sep 2024 17:58:29 -0300 Subject: [PATCH] Add Task.immediate field and prioritize them on workers * immediate prioritization This defines tasks of the type "immediate" by adding a Task.immediate(bool) field to the Task model. These type of task are ordered first in the worker's query to pick up a new available task. * misc This also adds a "deferred" field to the Task model, so we can later harden the constraint that 'deferred == False' and immediate == True' shouldn't get past the API level. This is possible today in some edge case failure scenarios on dispatch. Closes #5767 --- CHANGES/5767.feature | 3 ++ .../0124_task_deferred_task_immediate.py | 23 ++++++++++ pulpcore/app/models/task.py | 11 +++++ pulpcore/constants.py | 1 - pulpcore/tasking/tasks.py | 1 + pulpcore/tasking/worker.py | 44 ++++++++++++------- 6 files changed, 67 insertions(+), 16 deletions(-) create mode 100644 CHANGES/5767.feature create mode 100644 pulpcore/app/migrations/0124_task_deferred_task_immediate.py diff --git a/CHANGES/5767.feature b/CHANGES/5767.feature new file mode 100644 index 0000000000..52c14e0936 --- /dev/null +++ b/CHANGES/5767.feature @@ -0,0 +1,3 @@ +Added a formal "immediate" type of Task and changed workers behavior to prioritize those. +This labeling is exlusive to plugin code and should only be applied where it's known that +the task will finish shortly, like in updates of repositories, remotes, and distributions. diff --git a/pulpcore/app/migrations/0124_task_deferred_task_immediate.py b/pulpcore/app/migrations/0124_task_deferred_task_immediate.py new file mode 100644 index 0000000000..16cbaf20c7 --- /dev/null +++ b/pulpcore/app/migrations/0124_task_deferred_task_immediate.py @@ -0,0 +1,23 @@ +# Generated by Django 4.2.16 on 2024-10-02 17:42 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ("core", "0123_upstreampulp_q_select"), + ] + + operations = [ + migrations.AddField( + model_name="task", + name="deferred", + field=models.BooleanField(default=True, null=True), + ), + migrations.AddField( + model_name="task", + name="immediate", + field=models.BooleanField(default=False, null=True), + ), + ] diff --git a/pulpcore/app/models/task.py b/pulpcore/app/models/task.py index 12fbf0f995..7a8cfea625 100644 --- a/pulpcore/app/models/task.py +++ b/pulpcore/app/models/task.py @@ -81,6 +81,9 @@ class Task(BaseModel, AutoAddObjPermsMixin): state (models.TextField): The state of the task name (models.TextField): The name of the task logging_cid (models.TextField): The logging CID associated with the task + unblocked_at (models.DateTimeField): The time the task was marked as unblocked. + This is supervised/updated by all awake workers and is part of the definition + of a ready-to-be-taken task. started_at (models.DateTimeField): The time the task started executing finished_at (models.DateTimeField): The time the task finished executing error (models.JSONField): Fatal errors generated by the task @@ -89,6 +92,11 @@ class Task(BaseModel, AutoAddObjPermsMixin): the task reserved_resources_record (django.contrib.postgres.fields.ArrayField): The reserved resources required for the task. + immediate (models.BooleanField): Whether this is guaranteed to execute fast + without blocking. Defaults to `False`. + deferred (models.BooleanField): Whether to allow defer running the task to a + pulpcore_worker. Both `immediate` and `deferred` cannot both be `False`. + Defaults to `True`. Relations: @@ -126,6 +134,9 @@ class Task(BaseModel, AutoAddObjPermsMixin): profile_artifacts = models.ManyToManyField("Artifact", through=ProfileArtifact) + immediate = models.BooleanField(default=False, null=True) + deferred = models.BooleanField(default=True, null=True) + def __str__(self): return "Task: {name} [{state}]".format(name=self.name, state=self.state) diff --git a/pulpcore/constants.py b/pulpcore/constants.py index 2c11bd3c1c..91b6b37d39 100644 --- a/pulpcore/constants.py +++ b/pulpcore/constants.py @@ -12,7 +12,6 @@ TASK_METRICS_HEARTBEAT_LOCK = 74 STORAGE_METRICS_LOCK = 72 - #: All valid task states. TASK_STATES = SimpleNamespace( WAITING="waiting", diff --git a/pulpcore/tasking/tasks.py b/pulpcore/tasking/tasks.py index cea508040f..aa2893ff1b 100644 --- a/pulpcore/tasking/tasks.py +++ b/pulpcore/tasking/tasks.py @@ -183,6 +183,7 @@ def dispatch( parent_task=Task.current(), reserved_resources_record=resources, versions=versions, + immediate=immediate, ) if newest_created and task.pulp_created <= newest_created: # Let this workaround not row forever into the future. diff --git a/pulpcore/tasking/worker.py b/pulpcore/tasking/worker.py index df7322f44d..d593f856fb 100644 --- a/pulpcore/tasking/worker.py +++ b/pulpcore/tasking/worker.py @@ -175,6 +175,8 @@ def beat(self): self.worker_cleanup() with contextlib.suppress(AdvisoryLockError), PGAdvisoryLock(TASK_SCHEDULING_LOCK): dispatch_scheduled_tasks() + # This "reporting code" must not me moved inside a task, because it is supposed + # to be able to report on a congested tasking system to produce reliable results. self.record_unblocked_waiting_tasks_metric() def notify_workers(self): @@ -234,8 +236,11 @@ def is_compatible(self, task): return False return True - def identify_unblocked_tasks(self): - """Iterate over waiting tasks and mark them unblocked accordingly.""" + def unblock_tasks(self): + """Iterate over waiting tasks and mark them unblocked accordingly. + + Returns `True` if at least one task was unblocked. `False` otherwise. + """ changed = False taken_exclusive_resources = set() @@ -296,33 +301,33 @@ def identify_unblocked_tasks(self): def iter_tasks(self): """Iterate over ready tasks and yield each task while holding the lock.""" - while not self.shutdown_requested: # When batching this query, be sure to use "pulp_created" as a cursor for task in Task.objects.filter( - state__in=TASK_INCOMPLETE_STATES, unblocked_at__isnull=False - ).order_by("pulp_created"): + state__in=TASK_INCOMPLETE_STATES, + unblocked_at__isnull=False, + ).order_by("-immediate", "pulp_created"): + # This code will only be called if we acquired the lock successfully + # The lock will be automatically be released at the end of the block with contextlib.suppress(AdvisoryLockError), task: - # This code will only be called if we acquired the lock successfully - # The lock will be automatically be released at the end of the block # Check if someone else changed the task before we got the lock task.refresh_from_db() + if task.state == TASK_STATES.CANCELING and task.worker is None: # No worker picked this task up before being canceled if self.cancel_abandoned_task(task, TASK_STATES.CANCELED): - # Continue looking for the next task - # without considering this tasks resources - # as we just released them + # Continue looking for the next task without considering this + # tasks resources, as we just released them continue if task.state in [TASK_STATES.RUNNING, TASK_STATES.CANCELING]: # A running task without a lock must be abandoned if self.cancel_abandoned_task( task, TASK_STATES.FAILED, "Worker has gone missing." ): - # Continue looking for the next task - # without considering this tasks resources - # as we just released them + # Continue looking for the next task without considering this + # tasks resources, as we just released them continue + # This statement is using lazy evaluation if ( task.state == TASK_STATES.WAITING @@ -333,6 +338,7 @@ def iter_tasks(self): # Start from the top of the Task list break else: + # No task found in the for-loop break def sleep(self): @@ -399,7 +405,7 @@ def supervise_task(self, task): with contextlib.suppress(AdvisoryLockError), PGAdvisoryLock( TASK_UNBLOCKING_LOCK ): - self.identify_unblocked_tasks() + self.unblock_tasks() self.wakeup = False if task_process.sentinel in r: if not task_process.is_alive(): @@ -422,6 +428,7 @@ def supervise_task(self, task): ) cancel_state = TASK_STATES.FAILED cancel_reason = "Aborted during worker shutdown." + task_process.join() if not cancel_state and task_process.exitcode != 0: _logger.warning( @@ -445,11 +452,16 @@ def supervise_task(self, task): self.task = None def handle_available_tasks(self): + """Handle available tasks in a monitor/supervise cycle. + + The cycle must spin until there are no more available tasks. Any flaw in detecting this + can lead to stale tasks in the database. + """ keep_looping = True while keep_looping and not self.shutdown_requested: try: with PGAdvisoryLock(TASK_UNBLOCKING_LOCK): - keep_looping = self.identify_unblocked_tasks() + keep_looping = self.unblock_tasks() except AdvisoryLockError: keep_looping = True for task in self.iter_tasks(): @@ -509,11 +521,13 @@ def run(self, burst=False): else: self.cursor.execute("LISTEN pulp_worker_wakeup") while not self.shutdown_requested: + # do work if self.shutdown_requested: break self.handle_available_tasks() if self.shutdown_requested: break + # rest until notified to wakeup self.sleep() self.cursor.execute("UNLISTEN pulp_worker_wakeup") self.cursor.execute("UNLISTEN pulp_worker_metrics_heartbeat")