Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve throughput for immediate tasks (1/3) #5841

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGES/5767.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Added a formal "immediate" type of Task and changed the workers' behavior to prioritize those.
This labeling is exclusive to plugin code and should only be applied where it's known that
the task will finish shortly, like in updates of repositories, remotes, and distributions.
23 changes: 23 additions & 0 deletions pulpcore/app/migrations/0124_task_deferred_task_immediate.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# Generated by Django 4.2.16 on 2024-10-02 17:42

from django.db import migrations, models


class Migration(migrations.Migration):
    """Add the nullable boolean fields `deferred` and `immediate` to the `task` model."""

    # Must be applied after migration 0123 of the "core" app.
    dependencies = [
        ("core", "0123_upstreampulp_q_select"),
    ]

    operations = [
        # `deferred` defaults to True; NULL is permitted.
        migrations.AddField(
            model_name="task",
            name="deferred",
            field=models.BooleanField(default=True, null=True),
        ),
        # `immediate` defaults to False; NULL is permitted.
        migrations.AddField(
            model_name="task",
            name="immediate",
            field=models.BooleanField(default=False, null=True),
        ),
    ]
11 changes: 11 additions & 0 deletions pulpcore/app/models/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,9 @@ class Task(BaseModel, AutoAddObjPermsMixin):
state (models.TextField): The state of the task
name (models.TextField): The name of the task
logging_cid (models.TextField): The logging CID associated with the task
unblocked_at (models.DateTimeField): The time the task was marked as unblocked.
This is supervised/updated by all awake workers and is part of the definition
of a ready-to-be-taken task.
started_at (models.DateTimeField): The time the task started executing
finished_at (models.DateTimeField): The time the task finished executing
error (models.JSONField): Fatal errors generated by the task
Expand All @@ -89,6 +92,11 @@ class Task(BaseModel, AutoAddObjPermsMixin):
the task
reserved_resources_record (django.contrib.postgres.fields.ArrayField): The reserved
resources required for the task.
immediate (models.BooleanField): Whether this is guaranteed to execute fast
without blocking. Defaults to `False`.
deferred (models.BooleanField): Whether to allow deferring the task to a
pulpcore_worker. `immediate` and `deferred` cannot both be `False`.
Defaults to `True`.

Relations:

Expand Down Expand Up @@ -126,6 +134,9 @@ class Task(BaseModel, AutoAddObjPermsMixin):

profile_artifacts = models.ManyToManyField("Artifact", through=ProfileArtifact)

immediate = models.BooleanField(default=False, null=True)
deferred = models.BooleanField(default=True, null=True)

def __str__(self):
return "Task: {name} [{state}]".format(name=self.name, state=self.state)

Expand Down
1 change: 0 additions & 1 deletion pulpcore/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
TASK_METRICS_HEARTBEAT_LOCK = 74
STORAGE_METRICS_LOCK = 72


#: All valid task states.
TASK_STATES = SimpleNamespace(
WAITING="waiting",
Expand Down
1 change: 1 addition & 0 deletions pulpcore/tasking/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,7 @@ def dispatch(
parent_task=Task.current(),
reserved_resources_record=resources,
versions=versions,
immediate=immediate,
)
if newest_created and task.pulp_created <= newest_created:
# Let this workaround not grow forever into the future.
Expand Down
44 changes: 29 additions & 15 deletions pulpcore/tasking/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,8 @@ def beat(self):
self.worker_cleanup()
with contextlib.suppress(AdvisoryLockError), PGAdvisoryLock(TASK_SCHEDULING_LOCK):
dispatch_scheduled_tasks()
# This "reporting code" must not be moved inside a task, because it is supposed
# to be able to report on a congested tasking system to produce reliable results.
self.record_unblocked_waiting_tasks_metric()

def notify_workers(self):
Expand Down Expand Up @@ -234,8 +236,11 @@ def is_compatible(self, task):
return False
return True

def identify_unblocked_tasks(self):
"""Iterate over waiting tasks and mark them unblocked accordingly."""
def unblock_tasks(self):
"""Iterate over waiting tasks and mark them unblocked accordingly.

Returns `True` if at least one task was unblocked. `False` otherwise.
"""

changed = False
taken_exclusive_resources = set()
Expand Down Expand Up @@ -296,33 +301,33 @@ def identify_unblocked_tasks(self):

def iter_tasks(self):
"""Iterate over ready tasks and yield each task while holding the lock."""

while not self.shutdown_requested:
# When batching this query, be sure to use "pulp_created" as a cursor
for task in Task.objects.filter(
state__in=TASK_INCOMPLETE_STATES, unblocked_at__isnull=False
).order_by("pulp_created"):
state__in=TASK_INCOMPLETE_STATES,
unblocked_at__isnull=False,
).order_by("-immediate", "pulp_created"):
# This code will only be called if we acquired the lock successfully
# The lock will automatically be released at the end of the block
with contextlib.suppress(AdvisoryLockError), task:
# This code will only be called if we acquired the lock successfully
# The lock will automatically be released at the end of the block
# Check if someone else changed the task before we got the lock
task.refresh_from_db()

if task.state == TASK_STATES.CANCELING and task.worker is None:
# No worker picked this task up before being canceled
if self.cancel_abandoned_task(task, TASK_STATES.CANCELED):
# Continue looking for the next task
# without considering this tasks resources
# as we just released them
# Continue looking for the next task without considering this
# task's resources, as we just released them
continue
if task.state in [TASK_STATES.RUNNING, TASK_STATES.CANCELING]:
# A running task without a lock must be abandoned
if self.cancel_abandoned_task(
task, TASK_STATES.FAILED, "Worker has gone missing."
):
# Continue looking for the next task
# without considering this tasks resources
# as we just released them
# Continue looking for the next task without considering this
# task's resources, as we just released them
continue

# This statement is using lazy evaluation
if (
task.state == TASK_STATES.WAITING
Expand All @@ -333,6 +338,7 @@ def iter_tasks(self):
# Start from the top of the Task list
break
else:
# No task found in the for-loop
break

def sleep(self):
Expand Down Expand Up @@ -399,7 +405,7 @@ def supervise_task(self, task):
with contextlib.suppress(AdvisoryLockError), PGAdvisoryLock(
TASK_UNBLOCKING_LOCK
):
self.identify_unblocked_tasks()
self.unblock_tasks()
self.wakeup = False
if task_process.sentinel in r:
if not task_process.is_alive():
Expand All @@ -422,6 +428,7 @@ def supervise_task(self, task):
)
cancel_state = TASK_STATES.FAILED
cancel_reason = "Aborted during worker shutdown."

task_process.join()
if not cancel_state and task_process.exitcode != 0:
_logger.warning(
Expand All @@ -445,11 +452,16 @@ def supervise_task(self, task):
self.task = None

def handle_available_tasks(self):
"""Handle available tasks in a monitor/supervise cycle.

The cycle must spin until there are no more available tasks. Any flaw in detecting this
can lead to stale tasks in the database.
"""
keep_looping = True
while keep_looping and not self.shutdown_requested:
try:
with PGAdvisoryLock(TASK_UNBLOCKING_LOCK):
keep_looping = self.identify_unblocked_tasks()
keep_looping = self.unblock_tasks()
except AdvisoryLockError:
keep_looping = True
for task in self.iter_tasks():
Expand Down Expand Up @@ -509,11 +521,13 @@ def run(self, burst=False):
else:
self.cursor.execute("LISTEN pulp_worker_wakeup")
while not self.shutdown_requested:
# do work
if self.shutdown_requested:
break
self.handle_available_tasks()
if self.shutdown_requested:
break
# rest until notified to wakeup
self.sleep()
self.cursor.execute("UNLISTEN pulp_worker_wakeup")
self.cursor.execute("UNLISTEN pulp_worker_metrics_heartbeat")
Expand Down