Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Gemfile.lock
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ PLATFORMS
arm64-darwin-22
arm64-darwin-23
arm64-darwin-24
arm64-darwin-25
x86_64-darwin-21
x86_64-darwin-23
x86_64-linux
Expand Down
30 changes: 24 additions & 6 deletions lib/solid_queue/scheduler/recurring_schedule.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,11 @@ module SolidQueue
class Scheduler::RecurringSchedule
include AppExecutor

ScheduledRecurringTask = Struct.new(:task, :handle) do
delegate :key, :updated_at, to: :task
delegate :cancel, to: :handle
end

attr_reader :scheduled_tasks

def initialize(static_tasks, dynamic_tasks_enabled: false)
Expand Down Expand Up @@ -34,7 +39,11 @@ def schedule_tasks
end

def schedule_task(task)
scheduled_tasks[task.key] = schedule(task)
scheduled_tasks[task.key] = ScheduledRecurringTask.new(task, schedule(task))
end

def unschedule_task(key)
scheduled_tasks.delete(key)&.cancel
end

def unschedule_tasks
Expand All @@ -49,6 +58,7 @@ def task_keys
def reschedule_dynamic_tasks
wrap_in_app_executor do
reload_dynamic_tasks
reschedule_changed_dynamic_tasks
schedule_created_dynamic_tasks
unschedule_deleted_dynamic_tasks
end
Expand All @@ -69,16 +79,24 @@ def dynamic_tasks_enabled?
@dynamic_tasks_enabled
end

def schedule_created_dynamic_tasks
RecurringTask.dynamic.where.not(key: scheduled_tasks.keys).each do |task|
def reschedule_changed_dynamic_tasks
dynamic_tasks.each do |task|
next if !scheduled_tasks.key?(task.key) || scheduled_tasks[task.key].updated_at == task.updated_at

unschedule_task(task.key)
schedule_task(task)
end
end

def schedule_created_dynamic_tasks
dynamic_tasks.each do |task|
schedule_task(task) unless scheduled_tasks.key?(task.key)
end
end

def unschedule_deleted_dynamic_tasks
(scheduled_tasks.keys - RecurringTask.pluck(:key)).each do |key|
scheduled_tasks[key].cancel
scheduled_tasks.delete(key)
(scheduled_tasks.keys - dynamic_tasks.map(&:key) - static_task_keys).each do |key|
unschedule_task(key)
end
end

Expand Down
77 changes: 77 additions & 0 deletions test/unit/scheduler_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -156,4 +156,81 @@ class SchedulerTest < ActiveSupport::TestCase
ensure
scheduler&.stop
end

test "picks up schedule updates on existing dynamic tasks post-start" do
task = SolidQueue::RecurringTask.create!(
key: "updatable_task",
static: false,
class_name: "AddToBufferJob",
schedule: "every hour",
arguments: [ 42 ]
)

scheduler = SolidQueue::Scheduler.new(recurring_tasks: {}, dynamic_tasks_enabled: true, polling_interval: 0.1).tap(&:start)

wait_for_registered_processes(1, timeout: 1.second)

skip_active_record_query_cache do
task.update!(schedule: "every second")

wait_while_with_timeout(3.seconds) { SolidQueue::Job.count < 1 }
assert SolidQueue::Job.count >= 1, "Expected jobs to be enqueued after schedule was updated"
end
ensure
scheduler&.stop
end

test "picks up argument updates on existing dynamic tasks post-start" do
task = SolidQueue::RecurringTask.create!(
key: "updatable_args_task",
static: false,
class_name: "AddToBufferJob",
schedule: "every hour",
arguments: [ 42 ]
)

scheduler = SolidQueue::Scheduler.new(recurring_tasks: {}, dynamic_tasks_enabled: true, polling_interval: 0.1).tap(&:start)

wait_for_registered_processes(1, timeout: 1.second)

skip_active_record_query_cache do
previous_scheduled_task = scheduler.recurring_schedule.scheduled_tasks["updatable_args_task"]
assert_not_nil previous_scheduled_task

task.update!(arguments: [ 99 ])

wait_while_with_timeout(3.seconds) do
scheduler.recurring_schedule.scheduled_tasks["updatable_args_task"].equal?(previous_scheduled_task)
end

assert_not_same previous_scheduled_task, scheduler.recurring_schedule.scheduled_tasks["updatable_args_task"]
end
ensure
scheduler&.stop
end

test "does not replace scheduled task when polling finds no updates" do
SolidQueue::RecurringTask.create!(
key: "stable_task",
static: false,
class_name: "AddToBufferJob",
schedule: "every hour",
arguments: [ 42 ]
)

scheduler = SolidQueue::Scheduler.new(recurring_tasks: {}, dynamic_tasks_enabled: true, polling_interval: 0.1).tap(&:start)

wait_for_registered_processes(1, timeout: 1.second)

skip_active_record_query_cache do
scheduled_task = scheduler.recurring_schedule.scheduled_tasks["stable_task"]
assert_not_nil scheduled_task

sleep 0.5

assert_same scheduled_task, scheduler.recurring_schedule.scheduled_tasks["stable_task"]
end
ensure
scheduler&.stop
end
end
Loading