When dealing with rate-limited external APIs from Sidekiq background jobs, it is essential to handle the rate limit not only inside the job itself but also when scheduling the jobs.
# frozen_string_literal: true

# Computes staggered delays for scheduling batches of jobs against a
# rate-limited external API. Each call reserves the next free time slot
# in a Redis sorted set (via the compute_delay Lua script) so that
# bulk-enqueued batches end up spaced DELAY apart.
class Scheduler
  # Minimum spacing between two consecutive batches.
  DELAY = 10.seconds

  # Read-only snapshot of the scheduling counters kept in Redis.
  class Stats
    attr_reader :batch_count, :total_count, :entries

    # @param batch_count [Integer] number of batches scheduled so far
    # @param total_count [Integer] total number of items across all batches
    # @param entries [Array] reserved slot timestamps currently in the window
    def initialize(batch_count:, total_count:, entries:)
      @batch_count = batch_count.to_i
      @total_count = total_count.to_i
      @entries = entries
    end

    # Average number of items per batch (integer division).
    # Returns 0 when nothing has been scheduled yet — previously this
    # raised ZeroDivisionError on an empty scheduler.
    def average_batch_size
      return 0 if batch_count.zero?

      total_count / batch_count
    end
  end

  def self.compute_delay(key, batch_size)
    new(key).compute_delay(batch_size)
  end

  def self.stats(key)
    new(key).stats
  end

  # @param key [String] Redis key prefix identifying this scheduler
  def initialize(key)
    @cache_key = key
    @count_key = "#{key}:count"
    @total_key = "#{key}:total"
  end

  # Reserves the next available slot and returns the number of seconds
  # to wait before the batch may run. The atomic bookkeeping happens in
  # the compute_delay Lua script so concurrent schedulers cannot race.
  #
  # @param batch_size [Integer] number of items in the batch being scheduled
  # @return [Integer] seconds to wait before running the batch
  def compute_delay(batch_size)
    now = Time.current.to_i
    keys = [cache_key, count_key, total_key]
    argv = [DELAY.to_i, now, batch_size]

    Scripts.call(:compute_delay, keys, argv)
  end

  # Reads all counters plus the reserved slots in one pipelined round
  # trip and wraps them in a Stats value object.
  def stats
    batch_count, total_count, entries = Sidekiq.redis { |conn|
      conn.multi do |pipeline|
        pipeline.get(count_key)
        pipeline.get(total_key)
        pipeline.zrange(cache_key, 0, -1, rev: true, withscores: true)
      end
    }

    Stats.new(
      batch_count: batch_count.to_i,
      total_count: total_count.to_i,
      entries: entries.to_h.values.sort,
    )
  end

  private

  # NOTE(review): :batch_size was listed here but @batch_size is never
  # assigned anywhere in the class — the phantom reader has been removed.
  attr_reader :cache_key, :count_key, :total_key
end
This Ruby class has a corresponding Lua script, which it executes through my little Scripts module.
# frozen_string_literal: true

# Loads the application's Lua scripts and executes them against Redis
# via EVALSHA, caching each script's SHA so the body is only uploaded
# once per process.
module Scripts
  LUA_SCRIPTS = {
    rate_limit: Rails.root.join("app/lua/rate_limit.lua").read,
    compute_delay: Rails.root.join("app/lua/compute_delay.lua").read,
    lock: Rails.root.join("app/lua/lock.lua").read,
    unlock: Rails.root.join("app/lua/unlock.lua").read,
  }.freeze

  # Thread-safe name => SHA cache shared across the process.
  SHA_CACHE = Concurrent::Map.new

  # Preloads every script into Redis (call from an initializer) so the
  # first real call does not pay the SCRIPT LOAD round trip.
  def self.bootstrap
    Sidekiq.redis do |conn|
      LUA_SCRIPTS.each do |name, lua|
        get_sha(conn, name, lua)
      end
    end
  end

  # Executes the named script with the given KEYS and ARGV.
  #
  # If Redis lost the script (SCRIPT FLUSH, failover, restart), EVALSHA
  # raises NOSCRIPT; we drop the cached SHA, re-upload via get_sha, and
  # retry exactly once. The retry is bounded so a persistently broken
  # Redis cannot spin this method forever (it was unbounded before).
  #
  # @param name [Symbol] key into LUA_SCRIPTS
  # @param keys [Array<String>] Redis keys passed as KEYS
  # @param args [Array] values passed as ARGV after normalization
  def self.call(name, keys, args)
    retried = false

    begin
      Sidekiq.redis do |conn|
        sha = get_sha(conn, name, LUA_SCRIPTS[name])
        conn.evalsha(sha, keys, normalize_argv(args))
      end
    rescue RedisClient::CommandError => e
      raise unless e.message.include?("NOSCRIPT") && !retried

      SHA_CACHE.delete(name)
      retried = true
      retry
    end
  end

  # Returns the cached SHA for +name+, loading the script into Redis on
  # first use. compute_if_absent keeps concurrent callers from racing
  # to upload the same script.
  def self.get_sha(conn, name, lua)
    SHA_CACHE.compute_if_absent(name) do
      conn.script(:load, lua)
    end
  end

  # Coerces Ruby values into forms Lua handles predictably:
  # false/nil => 0, true => 1, Durations => integer milliseconds, and
  # other numerics are truncated to integers. Everything else passes
  # through unchanged.
  def self.normalize_argv(argv)
    argv.map do |item|
      case item
      when FalseClass, NilClass
        0
      when TrueClass
        1
      when ActiveSupport::Duration
        item.to_i * 1000 # Convert Duration to milliseconds
      when Numeric
        item.to_i # Ensure numeric values are integers
      else
        item
      end
    end
  end
end
The Lua script itself is pretty straightforward.
-- Reserves the next available scheduling slot for a batch and returns
-- how many seconds the caller must wait before running it.
--
-- KEYS[1] sorted set of reserved slot timestamps (score == member)
-- KEYS[2] counter: number of batches scheduled
-- KEYS[3] counter: total number of items scheduled
-- ARGV[1] delay between slots, in seconds
-- ARGV[2] current unix time, in seconds
-- ARGV[3] number of items in this batch
local cache_key = KEYS[1]
local count_key = KEYS[2]
local total_key = KEYS[3]

local delay = tonumber(ARGV[1])
local time = tonumber(ARGV[2])
local batch_size = tonumber(ARGV[3])

-- Clear entries older than the last delay window; they can no longer
-- influence the next slot.
local clearBefore = time - delay
redis.call('ZREMRANGEBYSCORE', cache_key, 0, clearBefore)

-- Update counters
redis.call("INCR", count_key)
redis.call("INCRBY", total_key, batch_size)

-- Find the most recent reserved slot, if any.
local scores = redis.call("ZREVRANGE", cache_key, 0, 0, "WITHSCORES")
local prev_time = nil
if #scores > 0 then
  prev_time = tonumber(scores[2]) -- scores[1] is the member, scores[2] the score
end

if prev_time then
  -- The next slot is one delay after the last reservation...
  local next_time = prev_time + delay
  local wait_time = next_time - time

  if wait_time < 0 then
    -- ...unless that slot is already in the past: jump forward to the
    -- first slot after "now" that stays on the delay grid.
    local intervals = math.floor(-wait_time / delay)
    next_time = next_time + (delay * (intervals + 1))
    wait_time = next_time - time
  end

  -- Reserve only the final slot. (The previous version ZADDed the stale
  -- past slot first and then the corrected one, leaving a dead entry in
  -- the set until the next cleanup pass.)
  redis.call("ZADD", cache_key, next_time, next_time)
  redis.call("EXPIRE", cache_key, delay * 2) -- Only keep entries for 2 intervals
  return wait_time
else
  -- First call in the window: run immediately and reserve "now".
  redis.call("ZADD", cache_key, time, time)
  redis.call("EXPIRE", cache_key, delay * 2)
  return 0
end
Then, somewhere down the line, we call it as follows:
# Memoized per-provider delay scheduler.
def scheduler
  @scheduler ||= Scheduler.new(provider.scheduler_key)
end

# Enqueues download jobs for all SKUs in rate-limit-friendly batches,
# asking the scheduler for a staggered start time per batch.
def enqueue_articles_for_refresh
  provider.log_info(message: "Enqueue", type: Service::DownloadArticle::Job.name, total_count: skus.size)

  if skus.blank?
    # FIX: this log line referenced Fortnox::DownloadArticle::Job while
    # the jobs actually enqueued here are Service::DownloadArticle::Job.
    provider.log_info(message: "No articles to process", type: Service::DownloadArticle::Job.name)
    return
  end

  skus.in_groups_of(BATCH_SIZE) do |skus_group|
    group = skus_group.compact # in_groups_of pads the final group with nils
    next if group.empty?

    # Reserve the next free slot so bulk-enqueued batches stay spaced
    # out under the provider's rate limit. Pass the actual group size
    # (not BATCH_SIZE) so the Redis total counter is not inflated by
    # the padded final batch.
    delay = scheduler.compute_delay(group.size)
    scheduled_time = delay.seconds.from_now

    Service::DownloadArticle::Job.perform_bulk(
      to_sidekiq_args(group),
      scheduled_at: scheduled_time,
    )
  end
end
You'd likely have to adjust a few things; I create database records for our Sidekiq jobs for various reasons — locking, tracking, and not losing everything when a catastrophe hits.