Client-side rate limiter

I was recently working on some code that integrated with an external API. The original author had run the entire integration sequentially in a single job, so when it crashed, the later parts of the integration never finished. On top of that, sooner or later a deployment or restart of our background workers killed the long-running job.

Long-running jobs are bad, like really terrible, and should be avoided at almost all costs. Splitting the work into multiple background jobs using batches is a much better idea. Now that the work ran in parallel, it finished fast! Unfortunately, it also bombarded the external API with too many concurrent requests, and the API answered with lots of rate limit errors (HTTP status 429 Too Many Requests).

Fortunately for me, we are using Sidekiq and therefore Redis. Maintaining sidekiq-unique-jobs has made me comfortable with Redis and Lua. The rate limit check has to be atomic, and since Redis runs a Lua script to completion without executing any other command in between, Lua fits the requirements perfectly.
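To see why counting and recording a call have to happen as one step, here is a small in-memory Ruby sketch, not Redis code. It assumes a limit of one call; an Array plays the role of the sorted set, and a Mutex stands in for Redis executing one Lua script at a time. All names are made up for illustration.

```ruby
# Illustration only: an Array stands in for the Redis sorted set and a
# Mutex stands in for Redis running one script at a time.
LIMIT = 1

# Non-atomic "count, then add": both clients read before either writes.
def interleaved_without_lock
  log = []
  count_a = log.size # client A sees 0 calls
  count_b = log.size # client B also sees 0 calls
  log << :a if count_a < LIMIT
  log << :b if count_b < LIMIT
  log
end

# Atomic version: counting and adding happen under one lock.
def add_with_lock(log, lock, client)
  lock.synchronize do
    if log.size < LIMIT
      log << client
      true
    else
      false
    end
  end
end

lock    = Mutex.new
log     = []
threads = %i[a b].map { |client| Thread.new { add_with_lock(log, lock, client) } }
results = threads.map(&:value)

puts interleaved_without_lock.size # prints 2: the limit of 1 was exceeded
puts log.size                      # prints 1: the second client was refused
```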

Sliding window

The algorithm I was faced with is called a sliding window (more precisely a sliding window log, since the timestamp of every call is stored). To make things a little more complex, I have two windows, one for 5 seconds and one for 5 minutes, and my script needs to verify that both windows have room.
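In a sliding window log, the timestamp of every accepted call is kept, and only those newer than `now - window` count toward the limit, so old calls drop out one at a time instead of all at once at a fixed boundary. A minimal in-memory Ruby sketch of a single window, assuming timestamps in seconds; the `SlidingWindow` class is hypothetical and is not the Redis implementation.

```ruby
# Hypothetical in-memory sliding window log for a single window.
class SlidingWindow
  def initialize(limit:, window:)
    @limit  = limit
    @window = window
    @calls  = [] # timestamps (in seconds) of accepted calls
  end

  # Returns true and records the call if the window has room.
  def allow?(now)
    # Forget calls at or before now - window (what ZREMRANGEBYSCORE does)
    @calls.reject! { |t| t <= now - @window }
    return false if @calls.size >= @limit

    @calls << now
    true
  end
end

limiter = SlidingWindow.new(limit: 2, window: 5)
limiter.allow?(0.0) # => true
limiter.allow?(1.0) # => true
limiter.allow?(2.0) # => false, two calls within the last 5 seconds
limiter.allow?(5.5) # => true, the call at 0.0 has slid out of the window
```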

Phew, now let's get some code written! I like to start with the Lua part.

lang-lua
-- KEYS[1]: sorted set for the first (narrow) window
-- KEYS[2]: sorted set for the second (wider) window
local key     = KEYS[1]
local key2    = KEYS[2]

-- ARGV[4] and ARGV[5] are optional; tonumber returns nil when missing
local now     = tonumber(ARGV[1])
local limit   = tonumber(ARGV[2])
local window  = tonumber(ARGV[3])
local limit2  = tonumber(ARGV[4])
local window2 = tonumber(ARGV[5])

local result = {}

-- Forget calls that fell out of the first window, then count the rest
redis.call('ZREMRANGEBYSCORE', key, 0, now - window)
local amount = redis.call('ZCARD', key)
result[1] = limit - amount
local allowed = amount < limit

-- Same steps for the optional second window
if limit2 and window2 then
  redis.call('ZREMRANGEBYSCORE', key2, 0, now - window2)
  local amount2 = redis.call('ZCARD', key2)
  result[2] = limit2 - amount2
  allowed = allowed and amount2 < limit2
end

-- Record the call only when every window has room, so a call
-- rejected by one window never takes a slot in the other.
-- `now` doubles as the member, so two calls with the exact same
-- timestamp would collapse into a single entry.
if allowed then
  redis.call('ZADD', key, now, now)
  redis.call('EXPIRE', key, window)

  if limit2 and window2 then
    redis.call('ZADD', key2, now, now)
    redis.call('EXPIRE', key2, window2)
  end
end

return result

Lua explained

  1. The script takes two keys, one sorted set per window. Redis keeps the timestamps of earlier calls in those sets between checks.
  2. The script takes 5 arguments:
    1. The current time represented as a float (used as both the score and the member in the sorted set)
    2. The limit of the first (narrow) window
    3. The first window itself in seconds
    4. The limit of the second (wider) window
    5. The second window itself in seconds

The first thing the script does is remove every entry scored at or before `now - window`. This ensures that ZCARD returns exactly how many calls were made during the window, so `limit - amount` is the number of slots still available.

The optional second window goes through the same steps. Only when both windows have room does the script record the call with ZADD, using `now` as the score, and refresh the expiry of each key so that idle keys disappear on their own. A call rejected by one window therefore never occupies a slot in the other.
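The same steps can be replayed in plain Ruby to see which numbers come back. This is a hypothetical in-memory stand-in (`TwoWindowLog` is not part of the post's code), assuming a single process and timestamps in seconds. It returns `limit - amount` per window, counted before the attempt, and records a call only when every window has room, so a call rejected by one window does not take a slot in the other.

```ruby
# Hypothetical in-memory stand-in for the rate_limit Lua script.
# Each window is a [limit, seconds] pair and keeps its own log.
class TwoWindowLog
  def initialize(*windows)
    @windows = windows
    @logs    = windows.map { [] }
  end

  # Returns limit - amount for each window, where amount is the number
  # of calls in that window before this attempt.
  def attempt(now)
    remaining = @windows.each_with_index.map do |(limit, seconds), i|
      @logs[i].reject! { |t| t <= now - seconds } # like ZREMRANGEBYSCORE
      limit - @logs[i].size                       # like limit - ZCARD
    end

    # Record the call (like ZADD) only when every window has room
    @logs.each { |log| log << now } if remaining.all?(&:positive?)
    remaining
  end
end

limiter = TwoWindowLog.new([2, 5], [3, 60])
limiter.attempt(0.0)  # => [2, 3]
limiter.attempt(1.0)  # => [1, 2]
limiter.attempt(2.0)  # => [0, 1], refused by the 5 second window
limiter.attempt(10.0) # => [2, 1]
limiter.attempt(11.0) # => [1, 0], refused by the 60 second window
```

The call at 2.0 is refused by the 5 second window and, because it was never recorded, the 60 second window still has one slot left at 10.0.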

lang-ruby
# frozen_string_literal: true

class RateLimit
  class Limited < StandardError
  end

  #
  # Initialize a rate limit instance
  #
  # @param [String] key a unique key, for example the id of a provider
  # @param [Integer] limit the maximum number of calls per window
  # @param [Integer] window the length of the window in seconds
  # @param [Integer, nil] limit_2 an optional second limit
  # @param [Integer, nil] window_2 an optional second window in seconds
  #
  def initialize(key, limit:, window:, limit_2: nil, window_2: nil)
    @key      = key
    @key_2    = "#{key}:fallback"
    @limit    = limit
    @window   = window
    @limit_2  = limit_2
    @window_2 = window_2
  end

  attr_reader :key, :key_2, :limit, :window, :limit_2, :window_2

  #
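  # Ensures the rate limit can't be exceeded
  #
  # @raise [Limited] when either window has no slots left
  # @yield the code to protect, only called when both windows have room
  #
  # @return [Object, Array<Integer>] the block's return value, or the
  #   available slots per window when no block is given
  #
  def ensure(&block)
    available = record_rate_limit_attempt

    remaining_1, remaining_2 = *available

    # `<= 0` instead of `zero?`: after lowering a limit, the sorted set can
    # hold more entries than the new limit, which makes the result negative
    if remaining_1 <= 0
      raise Limited, "#{key} is constrained to #{limit} calls per #{window} seconds"
    elsif remaining_2 && remaining_2 <= 0
      raise Limited, "#{key} is constrained to #{limit_2} calls per #{window_2} seconds"
    elsif block
      return yield
    end

    available
  end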

  #
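  # Records an attempt without raising or yielding
  #
  # @return [Array<Integer>] the available slots per window, counted
  #   before this attempt
  #
  def record_rate_limit_attempt
    now  = Time.current.to_f
    keys = [key, key_2]
    # Drop a nil second limit/window instead of sending nil to Redis;
    # the Lua script treats missing arguments as "no second window"
    args = [now, limit, window, limit_2, window_2].compact

    # Scripts is an application helper (not shown) that runs the rate_limit Lua script
    Scripts.call(:rate_limit, keys, args)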
  end
end

Ruby explained

Well, there isn't much to it. Instantiate a rate limit instance with the configuration you need and wrap the code that needs protection inside `rate_limit.ensure { }`. Calling `ensure`:

  1. Records a rate limit attempt by calling the Lua script.
  2. Raises `RateLimit::Limited` if either the first or the second window is out of slots.
  3. Yields to the block if both windows still have room.
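Those three steps can be exercised without Redis by replacing the Lua call with a stub. `StubbedRateLimit` is a hypothetical, trimmed copy of `RateLimit#ensure` whose remaining counts come from a lambda; it only shows when `ensure` raises and when it yields.

```ruby
# Hypothetical, trimmed copy of RateLimit#ensure for illustration only:
# the remaining counts come from a lambda instead of the Lua script.
class StubbedRateLimit
  class Limited < StandardError; end

  def initialize(key, recorder)
    @key      = key
    @recorder = recorder # returns [remaining_1] or [remaining_1, remaining_2]
  end

  def ensure
    available = @recorder.call
    remaining_1, remaining_2 = *available

    raise Limited, "#{@key}: first window is full" if remaining_1 <= 0
    raise Limited, "#{@key}: second window is full" if remaining_2 && remaining_2 <= 0
    return yield if block_given?

    available
  end
end

ok      = StubbedRateLimit.new("provider-1", -> { [3, 10] })
limited = StubbedRateLimit.new("provider-1", -> { [3, 0] })

ok.ensure { :called_api } # => :called_api
ok.ensure                 # => [3, 10]

begin
  limited.ensure { :called_api } # the block never runs
rescue StubbedRateLimit::Limited => e
  puts e.message # prints "provider-1: second window is full"
end
```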

Usage

A typical use case might look like this:

lang-ruby
# frozen_string_literal: true

class CompanyAPI::Client
  API_URL = "https://api.company.com/v42/"
    
  # @param [String] source_id for a Shop::Customer
  def get_customer(source_id)
    raise ArgumentError.new("source_id is required") if source_id.blank?

    request(:get, "customers/#{source_id}")
  end

  # @param [Symbol] verb the HTTP verb, one of :get, :post or :put
  # @param [String] uri the path, resolved against API_URL
  # @param [Hash] params_or_body query params for GET, the JSON body otherwise
  #
  # @return [Hash, Array] the parsed JSON response body
  def request(verb, uri, params_or_body = {})
    # Validate before ensure so a bad call doesn't use up a rate limit slot
    raise ArgumentError, "Unsupported HTTP verb: #{verb.inspect}" unless %i[get post put].include?(verb)

    url = Addressable::URI.encode(uri)
    # `site` and `provider` are defined elsewhere in the client (not shown)
    rate_limit_message = "Couldn't #{verb.to_s.upcase} #{url} for site: #{site.subdomain}"

    rate_limit.ensure do
      connection = Faraday.new(API_URL) do |f|
        f.headers["Content-Type"] = "application/json"
        f.headers["Authorization"] = "Bearer #{provider.authentication["access_token"]}"
        f.adapter Faraday.default_adapter
      end

      res = case verb
      when :get
        connection.get(url, params_or_body)
      when :post, :put
        connection.public_send(verb, url, params_or_body.to_json)
      end

      handle_errors(res, rate_limit_message)

      begin
        JSON.parse(res.body)
      rescue JSON::ParserError
        # Rescued here because `res` is not in scope in the callers
        raise CompanyAPI::InvalidResponse.new("Failed parsing the #{url} response body.", response: res, subdomain: site.subdomain)
      end
    end
  end

  def handle_errors(res, rate_limit_message)
    return unless res.status >= 400

    case res.status
    when 400
      raise CompanyAPI::BadRequest.new("The request cannot be fulfilled due to bad syntax", response: res)
    when 401
      raise CompanyAPI::Unauthorized.new("The client needs a new refresh token", response: res)
    when 403
      raise CompanyAPI::Forbidden.new("Not authorized to perform that action", response: res)
    when 404
      raise CompanyAPI::ResourceNotFound.new("The requested resource could not be found", response: res)
    when 429
      raise CompanyAPI::RateLimit.new(rate_limit_message, response: res)
    when 500
      raise CompanyAPI::InternalServerError.new("Internal Server Error", response: res)
    end
  end

  def rate_limit
    @rate_limit ||= RateLimit.new(provider.rate_limit_key, limit: 25, window: 5, limit_2: 300, window_2: 60)
  end
end


Usage explained

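It is crucial to have two distinct error classes: `RateLimit::Limited` for our own client-side limiter and `CompanyAPI::RateLimit` for the API's 429 responses. A `RateLimit::Limited` means we held back a call ourselves. A `CompanyAPI::RateLimit` means the API rejected a call that our limiter let through, so either the API's limits have changed or our code isn't working.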
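A caller, such as a background job, can then treat the two errors differently. A minimal sketch with simplified stand-ins for both classes (the real `CompanyAPI::RateLimit` also takes a `response:` keyword) and a hypothetical `classify_failure` helper:

```ruby
# Simplified stand-ins for the two error classes used in the post.
class RateLimit
  class Limited < StandardError; end
end

module CompanyAPI
  class RateLimit < StandardError; end
end

# Hypothetical helper: decide what to do with a failed API call.
def classify_failure(error)
  case error
  when RateLimit::Limited
    :retry_later # our own limiter said no; the API was never called
  when CompanyAPI::RateLimit
    :investigate # the API answered 429: its limits changed or ours are off
  else
    :raise
  end
end

classify_failure(RateLimit::Limited.new)    # => :retry_later
classify_failure(CompanyAPI::RateLimit.new) # => :investigate
classify_failure(ArgumentError.new)         # => :raise
```

Retrying after a `RateLimit::Limited` is cheap because the API never saw the call, while a `CompanyAPI::RateLimit` deserves a closer look.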