A Redis scheduler for Sidekiq jobs

When dealing with rate-limited external APIs from Sidekiq background jobs, it is essential to respect the rate limit not only inside the job itself but also when scheduling the jobs.

lang-ruby
# frozen_string_literal: true

# Schedules rate-limited Sidekiq batches by spacing them out via Redis.
#
# Each instance wraps a Redis sorted set (the cache key) holding the
# timestamps already handed out, plus two plain counters: the number of
# batches scheduled and the total number of items across those batches.
class Scheduler
  # Minimum spacing between two scheduled batches.
  DELAY = 10.seconds

  # Immutable snapshot of the scheduler's counters and scheduled entries.
  class Stats
    attr_reader :batch_count, :total_count, :entries

    # @param batch_count [#to_i] number of batches scheduled so far
    # @param total_count [#to_i] total number of items across all batches
    # @param entries [Array] scheduled timestamps currently in Redis
    def initialize(batch_count:, total_count:, entries:)
      @batch_count = batch_count.to_i
      @total_count = total_count.to_i
      @entries     = entries
    end

    # Average number of items per batch (integer division).
    # Returns 0 when nothing has been scheduled yet — previously this
    # raised ZeroDivisionError on a fresh key.
    def average_batch_size
      return 0 if batch_count.zero?

      total_count / batch_count
    end
  end

  # Convenience wrapper: compute the delay for one batch under +key+.
  def self.compute_delay(key, batch_size)
    new(key).compute_delay(batch_size)
  end

  # Convenience wrapper: fetch the stats for +key+.
  def self.stats(key)
    new(key).stats
  end

  def initialize(key)
    @cache_key  = key
    @count_key  = "#{key}:count"
    @total_key  = "#{key}:total"
  end

  # Returns the number of seconds the caller should wait before running
  # the next batch, as computed atomically by the compute_delay Lua script.
  def compute_delay(batch_size)
    now    = Time.current.to_i
    keys   = [cache_key, count_key, total_key]
    argv   = [DELAY.to_i, now, batch_size]

    Scripts.call(:compute_delay, keys, argv)
  end

  # Fetches both counters and every scheduled entry in a single MULTI
  # round trip and wraps the results in a Stats value object.
  def stats
    batch_count, total_count, entries = Sidekiq.redis { |conn|
      conn.multi do |pipeline|
        pipeline.get(count_key)
        pipeline.get(total_key)
        pipeline.zrange(cache_key, 0, -1, rev: true, withscores: true)
      end
    }

    Stats.new(
      batch_count: batch_count.to_i,
      total_count: total_count.to_i,
      entries: entries.to_h.values.sort,
    )
  end

  private

  # NOTE: removed the unused :batch_size reader — @batch_size was never
  # assigned anywhere in this class.
  attr_reader :cache_key, :count_key, :total_key
end
This Ruby file has a corresponding Lua script and is used through my little Scripts class.

lang-ruby
# frozen_string_literal: true

# Loads, caches, and invokes the application's Redis Lua scripts via
# EVALSHA, transparently re-loading a script if Redis replies NOSCRIPT
# (e.g. after a SCRIPT FLUSH or failover to a node without the script).
module Scripts
  # Lua sources are read once at boot, keyed by script name.
  LUA_SCRIPTS = {
    rate_limit: Rails.root.join("app/lua/rate_limit.lua").read,
    compute_delay: Rails.root.join("app/lua/compute_delay.lua").read,
    lock: Rails.root.join("app/lua/lock.lua").read,
    unlock: Rails.root.join("app/lua/unlock.lua").read,
  }.freeze

  # Thread-safe cache of script name => SHA1 digest returned by SCRIPT LOAD.
  SHA_CACHE = Concurrent::Map.new

  # Eagerly loads every script into Redis and warms the SHA cache.
  # Call this at boot so the first real EVALSHA never hits NOSCRIPT.
  def self.bootstrap
    Sidekiq.redis do |conn|
      LUA_SCRIPTS.each do |name, lua|
        get_sha(conn, name, lua)
      end
    end
  end

  # Executes the named script with the given KEYS and ARGV.
  #
  # @param name [Symbol] key into LUA_SCRIPTS
  # @param keys [Array] Redis KEYS for the script
  # @param args [Array] ARGV values (normalized via .normalize_argv)
  # @raise [KeyError] for an unknown script name (previously this passed
  #   nil to SCRIPT LOAD and failed with an opaque Redis error)
  # @raise [RedisClient::CommandError] for non-NOSCRIPT Redis errors
  def self.call(name, keys, args)
    lua = LUA_SCRIPTS.fetch(name)
    reloaded = false

    begin
      Sidekiq.redis do |conn|
        sha = get_sha(conn, name, lua)
        conn.evalsha(sha, keys, normalize_argv(args))
      end
    rescue RedisClient::CommandError => e
      raise unless e.message.include?("NOSCRIPT")
      # Bound the retry: one cache-bust + reload attempt. If Redis still
      # reports NOSCRIPT after re-loading, something is genuinely wrong.
      raise if reloaded

      reloaded = true
      SHA_CACHE.delete(name)
      retry
    end
  end

  # Returns the cached SHA for +name+, loading the script on first use.
  # compute_if_absent makes the load-and-cache step race-free.
  def self.get_sha(conn, name, lua)
    SHA_CACHE.compute_if_absent(name) do
      conn.script(:load, lua)
    end
  end

  # Coerces Ruby values into types Redis Lua scripts handle predictably:
  # booleans/nil become 0/1, Durations become integer milliseconds,
  # other numerics become integers; everything else passes through.
  def self.normalize_argv(argv)
    argv.map do |item|
      case item
      when FalseClass, NilClass
        0
      when TrueClass
        1
      when ActiveSupport::Duration
        item.to_i * 1000 # Convert Duration to milliseconds
      when Numeric
        item.to_i # Ensure numeric values are integers
      else
        item
      end
    end
  end
end
The Lua script itself is pretty straightforward.
lang-lua
-- Compute how many seconds the caller must wait before running the next
-- batch, spacing consecutive batches `delay` seconds apart.
--
-- KEYS[1] (cache_key): sorted set of already-reserved timestamps
--                      (member == score).
-- KEYS[2] (count_key): counter of batches scheduled.
-- KEYS[3] (total_key): counter of total items scheduled.
--
-- ARGV[1]: delay between batches, in seconds.
-- ARGV[2]: current unix time, in seconds.
-- ARGV[3]: number of items in this batch.
--
-- Returns 0 for the first batch in a window, otherwise the wait in seconds.
local cache_key   = KEYS[1]
local count_key   = KEYS[2]
local total_key   = KEYS[3]

local delay       = tonumber(ARGV[1])
local time        = tonumber(ARGV[2])
local batch_size  = tonumber(ARGV[3])

-- Drop entries older than the last delay window; only entries from
-- (now - delay) onward matter for computing the next slot.
redis.call("ZREMRANGEBYSCORE", cache_key, 0, time - delay)

-- Update counters.
redis.call("INCR", count_key)
redis.call("INCRBY", total_key, batch_size)

-- Most recent reserved timestamp, if any.
local latest = redis.call("ZREVRANGE", cache_key, 0, 0, "WITHSCORES")

if #latest == 0 then
  -- First call in this window: run immediately and reserve "now".
  redis.call("ZADD", cache_key, time, time)
  redis.call("EXPIRE", cache_key, delay * 2)  -- Only keep entries for 2 intervals
  return 0
end

-- Next slot is one delay after the most recent reservation.
-- latest[2] holds the score (latest[1] is the member).
local next_time = tonumber(latest[2]) + delay

-- If that slot is already in the past, jump forward to the first future
-- slot on the same delay grid. (BUGFIX: the previous version ZADDed the
-- stale, already-past slot before this check, leaving a dead member in
-- the set and wasting a write; computing the slot first avoids that.)
if next_time < time then
  local missed = math.floor((time - next_time) / delay)
  next_time = next_time + delay * (missed + 1)
end

redis.call("ZADD", cache_key, next_time, next_time)
redis.call("EXPIRE", cache_key, delay * 2)  -- Only keep entries for 2 intervals

return next_time - time
Then somewhere down the line we call it like follows:

lang-ruby
# Lazily builds and memoizes the Scheduler for this provider's key.
def scheduler
  return @scheduler if @scheduler

  @scheduler = Scheduler.new(provider.scheduler_key)
end

# Enqueues a DownloadArticle job batch for every BATCH_SIZE SKUs,
# spacing the batches out via Scheduler#compute_delay so the external
# API's rate limit is respected at scheduling time as well as run time.
def enqueue_articles_for_refresh
  provider.log_info(message: "Enqueue", type: Service::DownloadArticle::Job.name, total_count: skus.size)

  if skus.blank?
    # BUGFIX: this previously logged Fortnox::DownloadArticle::Job, which
    # is inconsistent with the Service::DownloadArticle::Job actually
    # enqueued below (and logged above).
    provider.log_info(message: "No articles to process", type: Service::DownloadArticle::Job.name)
    return
  end

  skus.in_groups_of(BATCH_SIZE) do |skus_group|
    # in_groups_of pads the final group with nils; compact once and reuse.
    batch = skus_group.compact
    next if batch.empty?

    # Ask the scheduler how long this batch must wait to honor the limit.
    delay = scheduler.compute_delay(BATCH_SIZE)
    scheduled_time = delay.seconds.from_now

    Service::DownloadArticle::Job.perform_bulk(
      to_sidekiq_args(batch),
      scheduled_at: scheduled_time,
    )
  end
end
You'd likely have to adjust a few things; I create database records for our Sidekiq jobs for various reasons like locking, tracking, and not losing data when a catastrophe hits.