When dealing with rate-limited external APIs from Sidekiq background jobs, it is essential to handle the rate limit not only inside the job itself but also when scheduling the jobs.
lang-ruby
# frozen_string_literal: true
class Scheduler
DELAY = 10.seconds
class Stats
attr_reader :batch_count, :total_count, :entries
def initialize(batch_count:, total_count:, entries:)
@batch_count = batch_count.to_i
@total_count = total_count.to_i
@entries = entries
end
def average_batch_size
total_count / batch_count
end
end
def self.compute_delay(key, batch_size)
new(key).compute_delay(batch_size)
end
def self.stats(key)
new(key).stats
end
def initialize(key)
@cache_key = key
@count_key = "#{key}:count"
@total_key = "#{key}:total"
end
def compute_delay(batch_size)
now = Time.current.to_i
keys = [cache_key, count_key, total_key]
argv = [DELAY.to_i, now, batch_size]
Scripts.call(:compute_delay, keys, argv)
end
def stats
batch_count, total_count, entries = Sidekiq.redis { |conn|
conn.multi do |pipeline|
pipeline.get(count_key)
pipeline.get(total_key)
pipeline.zrange(cache_key, 0, -1, rev: true, withscores: true)
end
}
Stats.new(
batch_count: batch_count.to_i,
total_count: total_count.to_i,
entries: entries.to_h.values.sort,
)
end
private
attr_reader :cache_key, :count_key, :total_key, :batch_size
endThis ruby file has a corresponding LUA script and is used from my little Scripts class.
lang-ruby
# frozen_string_literal: true
module Scripts
LUA_SCRIPTS = {
rate_limit: Rails.root.join("app/lua/rate_limit.lua").read,
compute_delay: Rails.root.join("app/lua/compute_delay.lua").read,
lock: Rails.root.join("app/lua/lock.lua").read,
unlock: Rails.root.join("app/lua/unlock.lua").read,
}.freeze
SHA_CACHE = Concurrent::Map.new
def self.bootstrap
Sidekiq.redis do |conn|
LUA_SCRIPTS.each do |name, lua|
get_sha(conn, name, lua)
end
end
end
def self.call(name, keys, args)
Sidekiq.redis do |conn|
sha = get_sha(conn, name, LUA_SCRIPTS[name])
conn.evalsha(sha, keys, normalize_argv(args))
end
rescue RedisClient::CommandError => e
raise unless e.message.include?("NOSCRIPT")
SHA_CACHE.delete(name)
retry
end
def self.get_sha(conn, name, lua)
SHA_CACHE.compute_if_absent(name) do
conn.script(:load, lua)
end
end
def self.normalize_argv(argv)
argv.map do |item|
case item
when FalseClass, NilClass
0
when TrueClass
1
when ActiveSupport::Duration
item.to_i * 1000 # Convert Duration to milliseconds
when Numeric
item.to_i # Ensure numeric values are integers
else
item
end
end
end
endThe LUA script itself is pretty straightforward.
lang-lua
-- compute_delay.lua
-- Reserves the next available time slot for a batch and returns how many
-- seconds the caller should wait before running it.
-- KEYS: [1] zset of reserved slots (member == score == unix timestamp),
--       [2] batch counter, [3] total item counter
-- ARGV: [1] delay between slots (seconds), [2] current unix time (seconds),
--       [3] number of items in this batch
local cache_key = KEYS[1]
local count_key = KEYS[2]
local total_key = KEYS[3]
local delay = tonumber(ARGV[1])
local time = tonumber(ARGV[2])
local batch_size = tonumber(ARGV[3])
-- Clear entries older than the last delay window
-- Keep only entries from (current_time - delay) and newer
local clearBefore = time - delay
redis.call('ZREMRANGEBYSCORE', cache_key, 0, clearBefore)
-- Update counters
redis.call("INCR", count_key)
redis.call("INCRBY", total_key, batch_size)
-- Get the most recent timestamp
local scores = redis.call("ZREVRANGE", cache_key, 0, 0, "WITHSCORES")
local prev_time = nil
if #scores > 0 then
prev_time = tonumber(scores[2]) -- scores[2] contains the actual score
end
if prev_time then
-- For subsequent calls, add delay to the previous time
local next_time = prev_time + delay
redis.call("ZADD", cache_key, next_time, next_time)
-- Calculate how long until next_time
local wait_time = next_time - time
if wait_time < 0 then
-- If we're past the scheduled time, calculate the next available slot
-- NOTE(review): the ZADD above already inserted the stale next_time; it
-- stays in the zset until ZREMRANGEBYSCORE ages it out, but it never wins
-- ZREVRANGE because the corrected slot below is strictly larger.
local intervals = math.floor(-wait_time / delay)
next_time = next_time + (delay * (intervals + 1))
wait_time = next_time - time
redis.call("ZADD", cache_key, next_time, next_time)
end
redis.call("EXPIRE", cache_key, delay * 2) -- Only keep entries for 2 intervals
return wait_time
else
-- First call, start from current time
redis.call("ZADD", cache_key, time, time)
redis.call("EXPIRE", cache_key, delay * 2)
return 0
end
Then somewhere down the line we call it like follows:
lang-ruby
# Lazily builds and memoizes the Scheduler bound to this provider's key.
def scheduler
  @scheduler = Scheduler.new(provider.scheduler_key) if @scheduler.nil?
  @scheduler
end
def enqueue_articles_for_refresh
provider.log_info(message: "Enqueue", type: Service::DownloadArticle::Job.name, total_count: skus.size)
if skus.blank?
provider.log_info(message: "No articles to process", type: Fortnox::DownloadArticle::Job.name)
return
end
skus.in_groups_of(BATCH_SIZE) do |skus_group|
next if skus_group.compact.empty?
delay = scheduler.compute_delay(BATCH_SIZE)
scheduled_time = delay.seconds.from_now
Service::DownloadArticle::Job.perform_bulk(
to_sidekiq_args(skus_group.compact),
scheduled_at: scheduled_time,
)
end
endYou'd likely have to adjust a few things; I create database records for our sidekiq jobs for various reasons like locking, tracking, and not just losing shit when a catastrophe hits.