From 8c9902d008146487ea888630bc9653adaf39408c Mon Sep 17 00:00:00 2001 From: Vladyslav Davydenko Date: Mon, 20 Apr 2026 12:48:43 +0300 Subject: [PATCH] Detect updates to dynamic recurring tasks on poll --- Gemfile.lock | 1 + .../scheduler/recurring_schedule.rb | 30 ++++++-- test/unit/scheduler_test.rb | 77 +++++++++++++++++++ 3 files changed, 102 insertions(+), 6 deletions(-) diff --git a/Gemfile.lock b/Gemfile.lock index 7c4662de..9317c7cd 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -193,6 +193,7 @@ PLATFORMS arm64-darwin-22 arm64-darwin-23 arm64-darwin-24 + arm64-darwin-25 x86_64-darwin-21 x86_64-darwin-23 x86_64-linux diff --git a/lib/solid_queue/scheduler/recurring_schedule.rb b/lib/solid_queue/scheduler/recurring_schedule.rb index a1e2409e..864d069c 100644 --- a/lib/solid_queue/scheduler/recurring_schedule.rb +++ b/lib/solid_queue/scheduler/recurring_schedule.rb @@ -4,6 +4,11 @@ module SolidQueue class Scheduler::RecurringSchedule include AppExecutor + ScheduledRecurringTask = Struct.new(:task, :handle) do + delegate :key, :updated_at, to: :task + delegate :cancel, to: :handle + end + attr_reader :scheduled_tasks def initialize(static_tasks, dynamic_tasks_enabled: false) @@ -34,7 +39,11 @@ def schedule_tasks end def schedule_task(task) - scheduled_tasks[task.key] = schedule(task) + scheduled_tasks[task.key] = ScheduledRecurringTask.new(task, schedule(task)) + end + + def unschedule_task(key) + scheduled_tasks.delete(key)&.cancel end def unschedule_tasks @@ -49,6 +58,7 @@ def task_keys def reschedule_dynamic_tasks wrap_in_app_executor do reload_dynamic_tasks + reschedule_changed_dynamic_tasks schedule_created_dynamic_tasks unschedule_deleted_dynamic_tasks end @@ -69,16 +79,24 @@ def dynamic_tasks_enabled? @dynamic_tasks_enabled end - def schedule_created_dynamic_tasks - RecurringTask.dynamic.where.not(key: scheduled_tasks.keys).each do |task| + def reschedule_changed_dynamic_tasks + dynamic_tasks.each do |task| + next if !scheduled_tasks.key?(task.key) || scheduled_tasks[task.key].updated_at == task.updated_at + + unschedule_task(task.key) schedule_task(task) end end + def schedule_created_dynamic_tasks + dynamic_tasks.each do |task| + schedule_task(task) unless scheduled_tasks.key?(task.key) + end + end + def unschedule_deleted_dynamic_tasks - (scheduled_tasks.keys - RecurringTask.pluck(:key)).each do |key| - scheduled_tasks[key].cancel - scheduled_tasks.delete(key) + (scheduled_tasks.keys - dynamic_tasks.map(&:key) - static_task_keys).each do |key| + unschedule_task(key) end end diff --git a/test/unit/scheduler_test.rb b/test/unit/scheduler_test.rb index e914a23c..e838970a 100644 --- a/test/unit/scheduler_test.rb +++ b/test/unit/scheduler_test.rb @@ -156,4 +156,81 @@ class SchedulerTest < ActiveSupport::TestCase ensure scheduler&.stop end + + test "picks up schedule updates on existing dynamic tasks post-start" do + task = SolidQueue::RecurringTask.create!( + key: "updatable_task", + static: false, + class_name: "AddToBufferJob", + schedule: "every hour", + arguments: [ 42 ] + ) + + scheduler = SolidQueue::Scheduler.new(recurring_tasks: {}, dynamic_tasks_enabled: true, polling_interval: 0.1).tap(&:start) + + wait_for_registered_processes(1, timeout: 1.second) + + skip_active_record_query_cache do + task.update!(schedule: "every second") + + wait_while_with_timeout(3.seconds) { SolidQueue::Job.count < 1 } + assert SolidQueue::Job.count >= 1, "Expected jobs to be enqueued after schedule was updated" + end + ensure + scheduler&.stop + end + + test "picks up argument updates on existing dynamic tasks post-start" do + task = SolidQueue::RecurringTask.create!( + key: "updatable_args_task", + static: false, + class_name: "AddToBufferJob", + schedule: "every hour", + arguments: [ 42 ] + ) + + scheduler = SolidQueue::Scheduler.new(recurring_tasks: {}, dynamic_tasks_enabled: true, polling_interval: 0.1).tap(&:start) + + wait_for_registered_processes(1, timeout: 1.second) + + skip_active_record_query_cache do + previous_scheduled_task = scheduler.recurring_schedule.scheduled_tasks["updatable_args_task"] + assert_not_nil previous_scheduled_task + + task.update!(arguments: [ 99 ]) + + wait_while_with_timeout(3.seconds) do + scheduler.recurring_schedule.scheduled_tasks["updatable_args_task"].equal?(previous_scheduled_task) + end + + assert_not_same previous_scheduled_task, scheduler.recurring_schedule.scheduled_tasks["updatable_args_task"] + end + ensure + scheduler&.stop + end + + test "does not replace scheduled task when polling finds no updates" do + SolidQueue::RecurringTask.create!( + key: "stable_task", + static: false, + class_name: "AddToBufferJob", + schedule: "every hour", + arguments: [ 42 ] + ) + + scheduler = SolidQueue::Scheduler.new(recurring_tasks: {}, dynamic_tasks_enabled: true, polling_interval: 0.1).tap(&:start) + + wait_for_registered_processes(1, timeout: 1.second) + + skip_active_record_query_cache do + scheduled_task = scheduler.recurring_schedule.scheduled_tasks["stable_task"] + assert_not_nil scheduled_task + + sleep 0.5 + + assert_same scheduled_task, scheduler.recurring_schedule.scheduled_tasks["stable_task"] + end + ensure + scheduler&.stop + end end