Skip to content

Commit 50d6b7f

Browse files
authored
Merge pull request #33 from babylist/middleware-use-job-payload-instead-of-worker-instance
Use job payload instead of worker instance in middleware
2 parents 7358ade + 6d71eb5 commit 50d6b7f

4 files changed

Lines changed: 146 additions & 94 deletions

File tree

lib/sidekiq/cronitor.rb

Lines changed: 27 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -13,72 +13,63 @@
1313

1414
module Sidekiq::Cronitor
1515
class ServerMiddleware
16-
def call(worker, message, queue)
17-
ping(worker: worker, state: 'run')
16+
# @param [Object] worker the instance of the job that was queued
17+
# @param [Hash] job_payload the full job payload
18+
# * @see https://github.com/sidekiq/sidekiq/wiki/Job-Format
19+
# @param [String] queue the name of the queue the job was pulled from
20+
# @yield the next middleware in the chain or worker `perform` method
21+
def call(worker, job_payload, queue)
22+
ping(job_payload: job_payload, state: 'run')
1823

1924
result = yield
2025
rescue => e
21-
ping(worker: worker, state: 'fail', message: e.to_s)
26+
ping(job_payload: job_payload, state: 'fail', message: e.to_s)
2227

2328
raise e
2429
else
25-
ping(worker: worker, state: 'complete')
30+
ping(job_payload: job_payload, state: 'complete')
2631
result # to be consistent with client middleware, return results of yield
2732
end
2833

2934
private
3035

31-
def cronitor(worker)
32-
Cronitor::Monitor.new(job_key(worker))
36+
def cronitor(job_payload)
37+
Cronitor::Monitor.new(job_key(job_payload))
3338
end
3439

35-
def cronitor_disabled?(worker)
36-
if worker.class.sidekiq_options.has_key?("cronitor_enabled")
37-
!worker.class.sidekiq_options.fetch("cronitor_enabled", Cronitor.auto_discover_sidekiq)
40+
def cronitor_disabled?(job_payload)
41+
if job_payload.has_key?("cronitor_enabled")
42+
!job_payload.fetch("cronitor_enabled", Cronitor.auto_discover_sidekiq)
3843
else
39-
worker.class.sidekiq_options.fetch("cronitor_disabled", options(worker).fetch(:disabled, !Cronitor.auto_discover_sidekiq))
44+
job_payload.fetch("cronitor_disabled", options(job_payload).fetch("disabled", !Cronitor.auto_discover_sidekiq))
4045
end
4146
end
4247

43-
def job_key(worker)
44-
periodic_job_key(worker) || worker.class.sidekiq_options.fetch('cronitor_key', nil) ||
45-
options(worker).fetch(:key, worker.class.name)
48+
def job_key(job_payload)
49+
job_payload['cronitor_key'] || options(job_payload)['key'] || job_payload['class']
4650
end
4751

48-
def periodic_job_key(worker)
49-
return unless defined?(Sidekiq::Periodic)
50-
51-
periodic_job = Sidekiq::Periodic::LoopSet.new.find do |lop|
52-
lop.history.find { |j| j[0] == worker.jid }
53-
end
54-
55-
periodic_job.present? && periodic_job.options.fetch('cronitor_key', nil)
56-
end
57-
58-
def options(worker)
52+
def options(job_payload)
5953
# eventually we will delete this method of passing options
6054
# ultimately we want all cronitor options to be top level keys
61-
opts = worker.class.sidekiq_options.fetch("cronitor", {})
62-
# symbolize_keys is a rails helper, so only use it if it's defined
63-
opts = opts.symbolize_keys if opts.respond_to?(:symbolize_keys)
64-
opts
55+
job_payload.fetch("cronitor", {})
6556
end
6657

67-
def ping(worker:, state:, message: nil)
68-
return unless should_ping?(worker)
58+
def ping(job_payload:, state:, message: nil)
59+
return unless should_ping?(job_payload)
6960

70-
Sidekiq.logger.debug("[cronitor] ping: worker=#{job_key(worker)} state=#{state} message=#{message}")
61+
Sidekiq.logger.debug("[cronitor] ping: worker=#{job_key(job_payload)} state=#{state} message=#{message}")
7162

72-
cronitor(worker).ping(state: state, message: message)
63+
cronitor(job_payload).ping(state: state, message: message)
7364
rescue Cronitor::Error => e
74-
Sidekiq.logger.error("[cronitor] error during ping: worker=#{job_key(worker)} error=#{e.message}")
65+
Sidekiq.logger.error("[cronitor] error during ping: worker=#{job_key(job_payload)} error=#{e.message}")
7566
rescue => e
76-
Sidekiq.logger.error("[cronitor] unexpected error: worker=#{job_key(worker)} error=#{e.message}")
67+
Sidekiq.logger.error("[cronitor] unexpected error: worker=#{job_key(job_payload)} error=#{e.message}")
7768
Sidekiq.logger.error(e.backtrace.first)
7869
end
7970

80-
def should_ping?(worker)
81-
!cronitor(worker).api_key.nil? && !cronitor_disabled?(worker)
71+
def should_ping?(job_payload)
72+
!cronitor(job_payload).api_key.nil? && !cronitor_disabled?(job_payload)
8273
end
8374
end
8475
end

lib/sidekiq/cronitor/periodic_jobs.rb

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ def self.sync_schedule!
66
monitors_payload = []
77
loops = Sidekiq::Periodic::LoopSet.new
88
loops.each do |lop|
9-
if lop.options.has_key?('cronitor_enabled') || lop.klass.constantize.sidekiq_options.has_key?('cronitor_enabled')
9+
if parsed_options(lop).has_key?('cronitor_enabled') || Object.const_get(lop.klass).sidekiq_options.has_key?('cronitor_enabled')
1010
next unless fetch_option(lop, 'cronitor_enabled', Cronitor.auto_discover_sidekiq)
1111
else
1212
next if fetch_option(lop, 'cronitor_disabled', !Cronitor.auto_discover_sidekiq)
@@ -29,9 +29,17 @@ def self.sync_schedule!
2929
Sidekiq.logger.error("[cronitor] error during #{name}.#{__method__}: #{e}")
3030
end
3131

32+
def self.parsed_options(lop)
33+
if lop.options.is_a?(String)
34+
JSON.parse(lop.options)
35+
else
36+
lop.options
37+
end
38+
end
39+
3240
def self.fetch_option(lop, key, default = nil)
33-
lop.options.fetch(key, default) ||
34-
lop.klass.constantize.sidekiq_options.fetch(key, default)
41+
parsed_options(lop).fetch(key, default) ||
42+
Object.const_get(lop.klass).sidekiq_options.fetch(key, default)
3543
end
3644
end
3745
end

spec/sidekiq/cronitor/periodic_jobs_spec.rb

Lines changed: 60 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,70 @@
22

33
require 'sidekiq/cronitor/periodic_jobs'
44

5+
class DummyWorker
6+
def self.sidekiq_options=(options)
7+
@sidekiq_options ||= {}
8+
@sidekiq_options.merge!(options)
9+
end
10+
def self.sidekiq_options
11+
@sidekiq_options ||= {}
12+
end
13+
def self.reset_options
14+
@sidekiq_options = {}
15+
end
16+
end
17+
518
RSpec.describe Sidekiq::Cronitor::PeriodicJobs do
19+
let(:cronitor_key) { "dummy-worker" }
20+
let(:loops) { [job] }
21+
622
before do
7-
allow(Sidekiq::Periodic::LoopSet).to receive(:new).and_return([])
23+
class_double("Sidekiq::Periodic::LoopSet").as_stubbed_const
24+
allow(Sidekiq::Periodic::LoopSet).to receive(:new).and_return(loops)
825
end
926

1027
describe '.sync_schedule!' do
11-
xit { expect { described_class.sync_schedule! }.not_to raise_error }
28+
context "when no loops" do
29+
let(:loops) { [] }
30+
it { expect { described_class.sync_schedule! }.not_to raise_error }
31+
end
32+
33+
context "when job options are stringified JSON" do
34+
let(:job) {
35+
instance_double(
36+
"Sidekiq::Periodic::Loop",
37+
klass: DummyWorker.name,
38+
schedule: "* * * * *",
39+
tz_name: "Etc/UTC",
40+
options: {
41+
"cronitor_key" => cronitor_key,
42+
"cronitor_group" => "dummy-team"
43+
}.to_json
44+
)
45+
}
46+
it "updates monitors" do
47+
expect(Cronitor::Monitor).to receive(:put).with(monitors: [hash_including(key: cronitor_key)])
48+
described_class.sync_schedule!
49+
end
50+
end
51+
52+
context "when job options are a hash" do
53+
let(:job) {
54+
instance_double(
55+
"Sidekiq::Periodic::Loop",
56+
klass: DummyWorker.name,
57+
schedule: "* * * * *",
58+
tz_name: "Etc/UTC",
59+
options: {
60+
"cronitor_key" => cronitor_key,
61+
"cronitor_group" => "dummy-team"
62+
}
63+
)
64+
}
65+
it "updates monitors" do
66+
expect(Cronitor::Monitor).to receive(:put).with(monitors: [hash_including(key: cronitor_key)])
67+
described_class.sync_schedule!
68+
end
69+
end
1270
end
1371
end

0 commit comments

Comments
 (0)