A redis scheduler for sidekiq jobs

When dealing with rate-limited external APIs from Sidekiq background jobs, it is essential to enforce the rate limit not only inside the job itself but also when scheduling the jobs.

lang-ruby
# frozen_string_literal: true

# Computes enqueue delays for rate-limited Sidekiq jobs. The actual slot
# bookkeeping happens atomically in Redis via the :compute_delay Lua script
# (dispatched through Scripts); this class owns the key layout and exposes
# aggregate stats.
class Scheduler
  # Minimum spacing between two scheduled batches.
  DELAY = 10.seconds

  # Immutable snapshot of the scheduler's Redis counters.
  class Stats
    attr_reader :batch_count, :total_count, :entries

    # @param batch_count [Integer, String, nil] number of batches scheduled
    # @param total_count [Integer, String, nil] total items across all batches
    # @param entries [Array] scheduled slot timestamps from the sorted set
    def initialize(batch_count:, total_count:, entries:)
      @batch_count = batch_count.to_i
      @total_count = total_count.to_i
      @entries     = entries
    end

    # Average number of items per batch (integer division).
    # Returns 0 when nothing has been scheduled yet — a fresh key yields
    # batch_count == 0, which previously raised ZeroDivisionError.
    def average_batch_size
      return 0 if batch_count.zero?

      total_count / batch_count
    end
  end

  # Convenience wrapper: seconds to wait before the next batch may run.
  def self.compute_delay(key, batch_size)
    new(key).compute_delay(batch_size)
  end

  def self.stats(key)
    new(key).stats
  end

  def initialize(key)
    @cache_key  = key
    @count_key  = "#{key}:count"
    @total_key  = "#{key}:total"
  end

  # Runs the compute_delay Lua script atomically against the three keys and
  # returns the number of seconds the caller should wait before enqueueing.
  def compute_delay(batch_size)
    now    = Time.current.to_i
    keys   = [cache_key, count_key, total_key]
    argv   = [DELAY.to_i, now, batch_size]

    Scripts.call(:compute_delay, keys, argv)
  end

  # Reads all counters in a single MULTI round-trip and wraps them in Stats.
  def stats
    batch_count, total_count, entries = Sidekiq.redis { |conn|
      conn.multi do |pipeline|
        pipeline.get(count_key)
        pipeline.get(total_key)
        pipeline.zrange(cache_key, 0, -1, rev: true, withscores: true)
      end
    }

    Stats.new(
      batch_count: batch_count.to_i,
      total_count: total_count.to_i,
      entries: entries.to_h.values.sort,
    )
  end

  private

  # NOTE: the previous :batch_size reader was removed — no @batch_size
  # instance variable is ever assigned in this class.
  attr_reader :cache_key, :count_key, :total_key
end
This Ruby file has a corresponding Lua script and is invoked through my little Scripts class.

lang-ruby
# frozen_string_literal: true

# Loads, caches, and executes the application's Lua scripts via EVALSHA,
# transparently re-loading a script when Redis replies NOSCRIPT (e.g. after
# a SCRIPT FLUSH or a failover to a node without the cached script).
module Scripts
  LUA_SCRIPTS = {
    rate_limit: Rails.root.join("app/lua/rate_limit.lua").read,
    compute_delay: Rails.root.join("app/lua/compute_delay.lua").read,
    lock: Rails.root.join("app/lua/lock.lua").read,
    unlock: Rails.root.join("app/lua/unlock.lua").read,
  }.freeze

  # name => SHA1 digest returned by SCRIPT LOAD; thread-safe map.
  SHA_CACHE = Concurrent::Map.new

  # Maximum NOSCRIPT-triggered reload attempts before giving up. Bounds
  # what was previously an unconditional `retry` loop.
  MAX_RETRIES = 3

  # Pre-load every script so the first real call never pays the load cost.
  def self.bootstrap
    Sidekiq.redis do |conn|
      LUA_SCRIPTS.each do |name, lua|
        get_sha(conn, name, lua)
      end
    end
  end

  # Executes the named script with the given KEYS and ARGV.
  #
  # @param name [Symbol] key into LUA_SCRIPTS
  # @raise [KeyError] when the script name is unknown (fetch, instead of
  #   [], which would have passed nil to SCRIPT LOAD)
  # @raise [RedisClient::CommandError] re-raised for non-NOSCRIPT errors or
  #   after MAX_RETRIES failed reloads
  def self.call(name, keys, args)
    attempts = 0

    # Explicit begin/end so `retry` does not restart the whole method and
    # reset the attempts counter.
    begin
      Sidekiq.redis do |conn|
        sha = get_sha(conn, name, LUA_SCRIPTS.fetch(name))
        conn.evalsha(sha, keys, normalize_argv(args))
      end
    rescue RedisClient::CommandError => e
      raise unless e.message.include?("NOSCRIPT")

      # The cached SHA is stale (script was evicted) — drop it and reload.
      SHA_CACHE.delete(name)
      attempts += 1
      retry if attempts < MAX_RETRIES
      raise
    end
  end

  # Returns the cached SHA for +name+, loading the script into Redis on
  # first use. compute_if_absent guarantees the block runs at most once per
  # key even under concurrent callers.
  def self.get_sha(conn, name, lua)
    SHA_CACHE.compute_if_absent(name) do
      conn.script(:load, lua)
    end
  end

  # Coerces Ruby values into Lua-friendly ARGV entries: false/nil become 0,
  # true becomes 1, Durations become integer milliseconds, other numerics
  # become integers; everything else passes through unchanged.
  def self.normalize_argv(argv)
    argv.map do |item|
      case item
      when FalseClass, NilClass
        0
      when TrueClass
        1
      when ActiveSupport::Duration
        item.to_i * 1000 # Convert Duration to milliseconds
      when Numeric
        item.to_i # Ensure numeric values are integers
      else
        item
      end
    end
  end
end
The Lua script itself is pretty straightforward.
lang-lua
-- compute_delay: maintains a rolling schedule of batch slots in a sorted
-- set (member == score == unix timestamp) and returns how many seconds the
-- caller should wait before running its batch. Also bumps two counters
-- that back the Ruby-side stats.
local cache_key   = KEYS[1]
local count_key   = KEYS[2]
local total_key   = KEYS[3]

local delay       = tonumber(ARGV[1])  -- minimum spacing between slots (seconds)
local time        = tonumber(ARGV[2])  -- caller's current unix time
local batch_size  = tonumber(ARGV[3])  -- items in this batch (stats only)

-- Clear entries older than the last delay window
-- Keep only entries from (current_time - delay) and newer
local clearBefore = time - delay
redis.call('ZREMRANGEBYSCORE', cache_key, 0, clearBefore)

-- Update counters: one more batch, batch_size more items
redis.call("INCR", count_key)
redis.call("INCRBY", total_key, batch_size)

-- Get the most recent timestamp (highest score = latest scheduled slot)
local scores = redis.call("ZREVRANGE", cache_key, 0, 0, "WITHSCORES")
local prev_time = nil

if #scores > 0 then
  prev_time = tonumber(scores[2])  -- scores[2] contains the actual score
end

if prev_time then
  -- For subsequent calls, add delay to the previous time
  local next_time = prev_time + delay
  redis.call("ZADD", cache_key, next_time, next_time)

  -- Calculate how long until next_time
  local wait_time = next_time - time
  if wait_time < 0 then
      -- NOTE(review): the ZREMRANGEBYSCORE above removed every score <=
      -- (time - delay), so any surviving prev_time satisfies
      -- prev_time + delay > time, making this branch look unreachable.
      -- Treating it as defensive code; confirm before relying on it.
      -- If we're past the scheduled time, calculate the next available slot
      local intervals = math.floor(-wait_time / delay)
      next_time = next_time + (delay * (intervals + 1))
      wait_time = next_time - time
      redis.call("ZADD", cache_key, next_time, next_time)
  end

  redis.call("EXPIRE", cache_key, delay * 2)  -- Only keep entries for 2 intervals
  return wait_time
else
  -- First call, start from current time
  redis.call("ZADD", cache_key, time, time)
  redis.call("EXPIRE", cache_key, delay * 2)
  return 0
end
Then somewhere down the line we call it as follows:

lang-ruby
# Memoized Scheduler bound to this provider's rate-limit key.
def scheduler
  @scheduler = Scheduler.new(provider.scheduler_key) if @scheduler.nil?
  @scheduler
end

# Enqueues download jobs for all pending SKUs in rate-limited batches; each
# batch is scheduled with the delay computed by the Redis-backed Scheduler.
def enqueue_articles_for_refresh
  provider.log_info(message: "Enqueue", type: Service::DownloadArticle::Job.name, total_count: skus.size)

  if skus.blank?
    # Was Fortnox::DownloadArticle::Job — aligned with the job class that
    # is actually enqueued below.
    provider.log_info(message: "No articles to process", type: Service::DownloadArticle::Job.name)
    return
  end

  # fill_with: false stops in_groups_of padding the last group with nils,
  # which removes the need to compact each group twice.
  skus.in_groups_of(BATCH_SIZE, false) do |skus_group|
    next if skus_group.empty?

    # Use the actual group size (the last group may be smaller than
    # BATCH_SIZE) so the scheduler's item totals stay accurate.
    delay = scheduler.compute_delay(skus_group.size)
    scheduled_time = delay.seconds.from_now

    Service::DownloadArticle::Job.perform_bulk(
      to_sidekiq_args(skus_group),
      scheduled_at: scheduled_time,
    )
  end
end
You'd likely have to adjust a few things; I create database records for our Sidekiq jobs for various reasons — locking, tracking, and not losing everything when a catastrophe hits.

Comments

No comments yet. Be the first to comment!
Your email address will be verified before your first comment is posted. It will not be displayed publicly.
Legal Information
By commenting, you agree to our Privacy Policy and Terms of Service.