I was recently working on some code that integrated with an external API. The original author had chosen to run the entire integration sequentially, so whenever it crashed, the later parts of the integration never finished. On top of that, any deployment or restart of our background workers would eventually kill that long-running job.
Long-running jobs are bad, like terrible, and should be avoided at almost all costs. Splitting the work into multiple background jobs using batches is a great idea, and once the work ran in parallel, it finished fast! Unfortunately, it also bombarded the external API with too many concurrent requests, and the API responded with lots of rate limit errors (HTTP status 429 Too Many Requests).
Fortunately for me, we use Sidekiq and therefore Redis. From maintaining sidekiq-unique-jobs, I've become comfortable with Redis and Lua. This code needs to be atomic, and since Redis runs a Lua script without executing any other command until the script finishes, a script fits the requirements perfectly.
Sliding window
The algorithm I needed is called a sliding window (at least, I think that's what it's called; keeping one timestamp per call like this is also known as a sliding window log). To make things a little more complex, I have two windows, one for 5 seconds and one for 5 minutes. My script needs to verify that both windows have room before a call is allowed through.
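To make the two-window idea concrete, here is a minimal in-memory sketch in plain Ruby. It assumes timestamps are float seconds, and `SlidingWindowLog` and `allow?` are hypothetical names used only for illustration. An array works within a single process, but jobs running in parallel all need to see the same log, which is why the real state lives in Redis.

```ruby
# Minimal in-memory sliding window log: one timestamp per accepted call.
# Illustration only (hypothetical class); the real limiter keeps this state in Redis.
class SlidingWindowLog
  # windows: [limit, window_in_seconds] pairs, e.g. [[25, 5], [300, 60]]
  def initialize(windows)
    @windows = windows
    @timestamps = []
  end

  # Records the call and returns true only if every window has room.
  def allow?(now)
    widest = @windows.map { |_limit, window| window }.max
    # Forget calls that have slid out of even the widest window
    @timestamps.reject! { |t| t <= now - widest }

    allowed = @windows.all? do |limit, window|
      # Only calls newer than (now - window) count against this window
      @timestamps.count { |t| t > now - window } < limit
    end
    @timestamps << now if allowed
    allowed
  end
end

log = SlidingWindowLog.new([[2, 5], [3, 60]])
puts log.allow?(0.0) # true
puts log.allow?(1.0) # true
puts log.allow?(2.0) # false: 2 calls already in the last 5 seconds
puts log.allow?(6.0) # true: 0.0 and 1.0 have left the 5-second window
puts log.allow?(7.0) # false: 3 calls already in the last 60 seconds
```

A call is recorded only when every window has room, so a rejected call does not eat into the other window's budget.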
Phew, now let's get some code written! I like to start with the Lua script.
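```lua
-- KEYS[1]: sorted set for the first (narrow) window
-- KEYS[2]: sorted set for the second (wider) window
-- ARGV: now (float seconds), limit, window, limit2, window2
local key = KEYS[1]
local key2 = KEYS[2]
local now = tonumber(ARGV[1])
local limit = tonumber(ARGV[2])
local window = tonumber(ARGV[3])
local limit2 = tonumber(ARGV[4])
local window2 = tonumber(ARGV[5])
local result = {}

-- Drop entries at or before (now - window), then count what is left
redis.call('ZREMRANGEBYSCORE', key, 0, now - window)
local amount = redis.call('ZCARD', key)
result[1] = limit - amount
local allowed = amount < limit

-- The second window is optional: tonumber() yields nil when those
-- arguments are missing or empty
if limit2 and window2 then
  redis.call('ZREMRANGEBYSCORE', key2, 0, now - window2)
  local amount2 = redis.call('ZCARD', key2)
  result[2] = limit2 - amount2
  allowed = allowed and amount2 < limit2
end

-- Record the attempt only when every window has room, so a rejected
-- attempt never consumes a slot in the other window
if allowed then
  redis.call('ZADD', key, now, now)
  redis.call('EXPIRE', key, window)
  if limit2 and window2 then
    redis.call('ZADD', key2, now, now)
    redis.call('EXPIRE', key2, window2)
  end
end

return result
```

Lua explained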
- The script takes two keys, one sorted set per window, because the timestamps of earlier calls have to be stored in Redis between checks.
- The script takes five arguments:
  - The current time as a float, used as the score of each entry in the sorted set.
  - The limit of the first (narrow) window.
  - The first window itself, in seconds.
  - The limit of the second (wider) window.
  - The second window itself, in seconds.
The first thing the script does is remove every entry whose score is at or before `now - window`. After that, ZCARD returns exactly how many calls were made during the window, so `limit - ZCARD` is the number of slots still available. If the count from ZCARD is below the limit, there is room to add another entry to the sorted set with `now` as its score.

The same logic then runs a second time for the second window.
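The remaining-slot arithmetic is easy to get wrong, so here is a plain-Ruby stand-in that walks through the same steps (remove old entries, count, compare, add) with a Hash of arrays in place of Redis sorted sets. `rate_limit_script` is a hypothetical helper for illustration only. Like the script, it returns `limit - count` measured before the new entry is added, so a positive number means the call is allowed. It records the call in both windows only when both have room. That is a deliberate choice in this sketch so that a rejected call never consumes a slot in the other window.

```ruby
# Plain-Ruby stand-in for the Lua script (hypothetical helper). `store` maps
# each key to an array of scores, playing the role of a Redis sorted set.
def rate_limit_script(store, keys, now, limit, window, limit2 = nil, window2 = nil)
  checks = [[keys[0], limit, window]]
  checks << [keys[1], limit2, window2] if limit2 && window2

  result = checks.map do |key, lim, win|
    scores = (store[key] ||= [])
    scores.reject! { |score| score <= now - win } # ZREMRANGEBYSCORE key 0 (now - window)
    lim - scores.size                             # limit - ZCARD key
  end

  # ZADD `now` to every key, but only when every window still had room
  checks.each { |key, _lim, _win| store[key] << now } if result.all?(&:positive?)
  result
end

store = {}
keys = %w[provider-1 provider-1:fallback]
p rate_limit_script(store, keys, 0.0, 2, 5, 3, 60) # [2, 3]
p rate_limit_script(store, keys, 1.0, 2, 5, 3, 60) # [1, 2]
p rate_limit_script(store, keys, 2.0, 2, 5, 3, 60) # [0, 1] rejected, nothing recorded
p rate_limit_script(store, keys, 6.0, 2, 5, 3, 60) # [2, 1]
p rate_limit_script(store, keys, 7.0, 2, 5, 3, 60) # [1, 0] rejected by the wider window
```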
```ruby
# frozen_string_literal: true

class RateLimit
  # Raised when one of the windows has no slots left
  class Limited < StandardError
  end

  #
  # Initialize a rate limit instance
  #
  # @param [String] key for example, the id of a provider
  # @param [Integer] limit the number of calls allowed per window
  # @param [Integer] window the length of the window in seconds
  # @param [Integer, nil] limit_2 an optional second limit to consider
  # @param [Integer, nil] window_2 an optional second window (in seconds) to consider
  #
  def initialize(key, limit:, window:, limit_2: nil, window_2: nil)
    @key = key
    @key_2 = "#{key}:fallback" # the second window gets its own sorted set
    @limit = limit
    @window = window
    @limit_2 = limit_2
    @window_2 = window_2
  end

  attr_reader :key, :key_2, :limit, :window, :limit_2, :window_2

  #
  # Ensures the rate limit can't be exceeded
  #
  # @raise [Limited] when either window has no slots left
  # @yield the code to protect, called only when every window has room
  # @return [Object, Array<Integer>] the block's value, or the remaining
  #   slots per window when no block is given
  #
  def ensure
    available = record_rate_limit_attempt
    remaining_1, remaining_2 = *available

    # `<= 0` rather than `zero?`, in case a lowered limit makes a count negative
    if remaining_1 <= 0
      raise Limited, "#{key} is constrained to #{limit} calls per #{window} seconds"
    elsif remaining_2 && remaining_2 <= 0
      raise Limited, "#{key} is constrained to #{limit_2} calls per #{window_2} seconds"
    elsif block_given?
      return yield
    end

    available
  end

  #
  # Record a rate limit attempt without raising or yielding
  #
  # @return [Array<Integer>] the remaining slots for each window
  #
  def record_rate_limit_attempt
    now = Time.current.to_f # float seconds, so scores keep sub-second precision
    keys = [key, key_2]
    args = [now, limit, window, limit_2, window_2]

    # Scripts is the app's helper for running the Lua script (not shown)
    Scripts.call(:rate_limit, keys, args)
  end
end
```

Ruby explained
Well, there isn't much to it. Instantiate a rate limit instance with the configuration you need and wrap the code that needs protection inside `rate_limit.ensure { }`.
- `record_rate_limit_attempt` calls the Lua script, which records the attempt in Redis.
- `ensure` raises `RateLimit::Limited` if either the first or the second window is out of attempts.
- Otherwise, it yields to the block (if one is given) and returns the block's value.
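To see the control flow of `ensure` without a Redis server, here is a sketch that swaps the `Scripts.call` lookup for an injected lambda. `EnsureSketch` and its `recorder:` keyword are hypothetical and exist only for this illustration. The lambda returns the remaining slots per window, just like the Lua script.

```ruby
# Control flow of RateLimit#ensure with the Redis call replaced by an
# injected lambda (hypothetical class, illustration only).
class EnsureSketch
  class Limited < StandardError; end

  # recorder: callable returning [remaining_1] or [remaining_1, remaining_2]
  def initialize(recorder:)
    @recorder = recorder
  end

  def ensure
    remaining_1, remaining_2 = @recorder.call
    raise Limited, "first window is full" if remaining_1 <= 0
    raise Limited, "second window is full" if remaining_2 && remaining_2 <= 0

    # Run the protected code, or report the remaining slots when there is no block
    block_given? ? yield : [remaining_1, remaining_2].compact
  end
end

roomy = EnsureSketch.new(recorder: -> { [3, 10] })
result = roomy.ensure { :api_called }
p result # :api_called

full = EnsureSketch.new(recorder: -> { [3, 0] })
begin
  full.ensure { :api_called }
rescue EnsureSketch::Limited => e
  puts "limited: #{e.message}" # limited: second window is full
end
```

Injecting the recorder also makes it easy to unit test the raising and yielding paths without touching Redis.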
Usage
A typical use case might look like this:
```ruby
# frozen_string_literal: true

require "json"
require "faraday"
require "addressable/uri"

# `site` and `provider` are defined elsewhere in the client (not shown)
class CompanyAPI::Client
  API_URL = "https://api.company.com/v42/"

  # @param [String] source_id for a Shop::Customer
  def get_customer(source_id)
    raise ArgumentError, "source_id is required" if source_id.blank?

    request(:get, "customers/#{source_id}")
  rescue JSON::ParserError
    # `res` only exists inside #request, so it can't be passed along from here
    raise CompanyAPI::InvalidResponse.new("Failed parsing the customer #{source_id} response body.", subdomain: site.subdomain)
  end

  # @param [Symbol] verb the HTTP verb (:get, :post or :put)
  # @param [String] uri relative to API_URL
  # @param [Hash] params_or_body query params for GET, JSON body otherwise
  #
  # @return [Hash, Array] the parsed JSON response body
  def request(verb, uri, params_or_body = {})
    url = Addressable::URI.encode(uri)
    rate_limit_message = "Couldn't #{verb.to_s.upcase} #{url} for site: #{site.subdomain}"

    rate_limit.ensure do
      connection = Faraday.new(API_URL) do |f|
        f.headers["Content-Type"] = "application/json"
        f.headers["Authorization"] = "Bearer #{provider.authentication["access_token"]}"
        f.adapter Faraday.default_adapter
      end

      res = case verb
            when :get
              connection.get(url, params_or_body)
            when :post, :put
              connection.public_send(verb, url, params_or_body.to_json)
            else
              # Without this, `res` would be nil and handle_errors would crash
              raise ArgumentError, "Unsupported HTTP verb: #{verb.inspect}"
            end

      handle_errors(res, rate_limit_message)
      JSON.parse(res.body)
    end
  end

  # Map error statuses from the API to our own exception classes
  def handle_errors(res, rate_limit_message)
    return unless res.status >= 400

    case res.status
    when 400
      raise CompanyAPI::BadRequest.new("The request cannot be fulfilled due to bad syntax", response: res)
    when 401
      raise CompanyAPI::Unauthorized.new("The client needs a new refresh token", response: res)
    when 403
      raise CompanyAPI::Forbidden.new("Not authorized to perform that action", response: res)
    when 404
      raise CompanyAPI::ResourceNotFound.new("The requested resource could not be found", response: res)
    when 429
      # The API's own rate limit, as opposed to RateLimit::Limited from our limiter
      raise CompanyAPI::RateLimit.new(rate_limit_message, response: res)
    when 500
      raise CompanyAPI::InternalServerError.new("Internal Server Error", response: res)
    end
  end

  # 25 calls per 5 seconds and 300 calls per 60 seconds for this provider
  def rate_limit
    @rate_limit ||= RateLimit.new(provider.rate_limit_key, limit: 25, window: 5, limit_2: 300, window_2: 60)
  end
end
```
Usage explained
It is crucial to have two distinct error classes: `RateLimit::Limited` for our own rate limiter and `CompanyAPI::RateLimit` for the 429 responses from the API. This way, we will know if the API's limits change or if our own limiter isn't working.
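For example, a background job could react differently to each error. The stand-in classes and the `classify_failure` helper below are hypothetical. They only show why two distinct classes pay off: `RateLimit::Limited` means our limiter did its job and the job can simply retry later, while `CompanyAPI::RateLimit` means a call got past our limiter and was still rejected with a 429.

```ruby
# Hypothetical stand-ins for the two error classes from the article
class RateLimit
  class Limited < StandardError; end
end

module CompanyAPI
  class RateLimit < StandardError; end
end

# Decide what a background job should do with each kind of failure
# (hypothetical helper; the symbols are illustrative labels)
def classify_failure(error)
  case error
  when RateLimit::Limited
    :retry_later # our own limiter refused the call before it reached the API
  when CompanyAPI::RateLimit
    :alert # the API answered 429 anyway, so our limits may no longer match theirs
  else
    :unexpected
  end
end

p classify_failure(RateLimit::Limited.new)    # :retry_later
p classify_failure(CompanyAPI::RateLimit.new) # :alert
p classify_failure(ArgumentError.new)         # :unexpected
```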