I was recently working on some code that integrated with an external API. The original author of the code thought it best to run the entire thing sequentially, so when an early step crashed, the later parts of the integration never ran. On top of that, a deployment or restart of our background workers would eventually kill that long-running job.
Long-running jobs are bad, like terrible, and should be avoided at almost all costs. Splitting the work into multiple background jobs using batches is a great idea. Now that the work ran in parallel, it finished fast! Unfortunately, it also bombarded the external API with too many concurrent requests, and the API responded with a lot of rate limit errors (HTTP status 429).
Fortunately for me, we are using Sidekiq and therefore Redis. From maintaining sidekiq-unique-jobs, I've become comfortable with Redis and Lua. This code needs to be transactional, and since Redis blocks other commands for the duration of a Lua script's execution, it perfectly fits the requirements.
Sliding window
The algorithm I was faced with is called a sliding window (at least, I think that's what it's called). To make things a little more complex, I have two windows, one for 5 seconds and one for 5 minutes. My script needs to verify that both windows are ok.
Phew, now let's get some code written! The first thing I like to do is to start with some Lua.
```lua
local key     = KEYS[1]
local key2    = KEYS[2]
local now     = tonumber(ARGV[1])
local limit   = tonumber(ARGV[2])
local window  = tonumber(ARGV[3])
local limit2  = tonumber(ARGV[4])
local window2 = tonumber(ARGV[5])
local result  = {}

local clearWindow = now - window
redis.call('ZREMRANGEBYSCORE', key, 0, clearWindow)

local amount = redis.call('ZCARD', key)
if amount < limit then
  redis.call('ZADD', key, now, now)
end
redis.call('EXPIRE', key, window)
result[1] = limit - amount

if limit2 and window2 then
  local clearWindow2 = now - window2
  redis.call('ZREMRANGEBYSCORE', key2, 0, clearWindow2)

  local amount2 = redis.call('ZCARD', key2)
  if amount2 < limit2 then
    redis.call('ZADD', key2, now, now)
  end
  redis.call('EXPIRE', key2, window2)
  result[2] = limit2 - amount2
end

return result
```
Lua explained
- The Lua code takes two keys, one for the first window and one for the second. We need to store some data in Redis in between checks.
- The Lua code takes 5 arguments:
  - The current time represented as a float (this is used for weight or score in the sorted set)
  - The limit of the first (narrow) window
  - The first window itself in seconds
  - The limit of the second (wider) window
  - The second window itself in seconds
The first thing we want to do is to remove everything older than now minus the allowed window. This ensures that we get a correct result from ZCARD: whatever ZCARD returns is simply how many slots are already used within the window. If that amount is less than the limit, we are good to add another entry to the sorted set with now as the score. Either way, the script reports limit minus amount for the window, so a result of 0 means the attempt was rejected.
Repeat the exact same logic a second time for the second window.
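To make the steps above easier to poke at without Redis, here is a minimal in-memory sketch of the same logic. The `InMemoryWindow` class and the limits used are made up for illustration; a plain Array stands in for the sorted set, so nothing is shared between processes and this is not a replacement for the Lua script.

```ruby
# Illustrative model of one sliding window (hypothetical class, not part
# of the original code). An Array of timestamps replaces the sorted set.
class InMemoryWindow
  def initialize(limit:, window:)
    @limit = limit
    @window = window
    @timestamps = []
  end

  # Mirrors the Lua steps; returns the remaining count (0 means rejected).
  def attempt(now)
    @timestamps.reject! { |t| t <= now - @window } # ZREMRANGEBYSCORE key 0 (now - window)
    used = @timestamps.size                        # ZCARD
    @timestamps << now if used < @limit            # ZADD only while below the limit
    @limit - used
  end
end

narrow = InMemoryWindow.new(limit: 2, window: 5)   # 2 calls per 5 seconds
wide   = InMemoryWindow.new(limit: 3, window: 300) # 3 calls per 5 minutes

# Each entry is [remaining in narrow window, remaining in wide window].
results = [0.0, 1.0, 2.0, 6.5, 7.0].map do |now|
  [narrow.attempt(now), wide.attempt(now)]
end

p results
```

Note that the two windows are checked independently, just like in the Lua script: at `now = 2.0` the narrow window rejects the attempt while the wide window still records it.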
```ruby
# frozen_string_literal: true

class RateLimit
  class Limited < StandardError
  end

  #
  # Initialize a rate limit instance
  #
  # @param [String] key example the id of a provider
  # @param [Integer] limit: the number to limit to
  # @param [Integer] window: the period or window to limit for
  # @param [Integer] limit_2: nil a second limit to consider
  # @param [Integer] window_2: nil a second window to consider
  #
  def initialize(key, limit:, window:, limit_2: nil, window_2: nil)
    @key      = key
    @key_2    = "#{key}:fallback"
    @limit    = limit
    @window   = window
    @limit_2  = limit_2
    @window_2 = window_2
  end

  attr_reader :key, :key_2, :limit, :window, :limit_2, :window_2

  #
  # Ensures rate limit can't be exceeded
  #
  # @return [Object, Array<Integer>] the block's return value,
  #   or the remaining counts when no block is given
  #
  # @yield if given a block and neither window is exhausted
  def ensure(&block)
    available = record_rate_limit_attempt
    remaining_1, remaining_2 = *available

    if remaining_1.zero?
      raise Limited, "#{key} is constrained to #{limit} calls per #{window} seconds"
    elsif remaining_2&.zero?
      raise Limited, "#{key} is constrained to #{limit_2} calls per #{window_2} seconds"
    elsif block
      return yield
    end

    available
  end

  #
  # Record a rate limit attempt without raising or yielding
  #
  # @return [Array<Integer>] the remaining count for each window
  #
  def record_rate_limit_attempt
    now  = Time.current.to_f
    keys = [key, key_2]
    args = [
      now,
      limit,
      window,
      limit_2,
      window_2,
    ]

    Scripts.call(:rate_limit, keys, args)
  end
end
```
Ruby explained
Well, there isn't much to it. Instantiate a rate limit instance with the configuration you need and wrap the code that needs protection inside `rate_limit.ensure { }`.
- Record a rate limit attempt that calls the lua script for us.
- Raise an error if either the first or the second window is out of attempts
- Yield to the block if we still have attempts
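The `Scripts.call` helper isn't shown in this post. As a rough idea of what such a helper could look like, here is a hypothetical sketch: the `register` method, the `redis:` keyword, and the SHA caching are my assumptions, not the real implementation. It only expects the connection to respond to `script(:load, source)` and `evalsha(sha, keys:, argv:)`, which is how a redis-rb client exposes SCRIPT LOAD and EVALSHA.

```ruby
# Hypothetical stand-in for a `Scripts` helper (an assumption for
# illustration; the real helper is not part of this post).
module Scripts
  @sources = {} # script name => Lua source
  @shas    = {} # script name => SHA1 returned by SCRIPT LOAD

  class << self
    # Remember the Lua source under a name, e.g. :rate_limit.
    def register(name, source)
      @sources[name] = source
    end

    # Load the script once, then run it by SHA with the given keys and args.
    def call(name, keys, args, redis:)
      sha = (@shas[name] ||= redis.script(:load, @sources.fetch(name)))
      # Arguments are sent as strings; nil becomes "", which Lua's
      # tonumber turns back into nil, so the second window is skipped.
      redis.evalsha(sha, keys: keys, argv: args.map(&:to_s))
    end
  end
end
```

With something like this, `Scripts.register(:rate_limit, File.read(path_to_lua))` would run once at boot. A real helper would probably also want to reload the script if Redis was restarted and forgot the cached SHA; this sketch leaves that out.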
Usage
A valid use case might look like below:
```rb
# frozen_string_literal: true

class CompanyAPI::Client
  API_URL = "https://api.company.com/v42/"

  # @param [String] source_id for a Shop::Customer
  def get_customer(source_id)
    raise ArgumentError.new("source_id is required") if source_id.blank?

    request(:get, "customers/#{source_id}")
  rescue JSON::ParserError
    # `res` is local to #request, so the response object is not available here.
    raise CompanyAPI::InvalidResponse.new(
      "Failed parsing the customer #{source_id} response body.",
      response: nil,
      subdomain: site.subdomain
    )
  end

  # @param [Symbol] verb the HTTP verb
  # @param [String] uri will build to absolute URL
  # @param [Hash] params_or_body optional
  #
  # @return [Hash, Array] the parsed JSON response body
  def request(verb, uri, params_or_body = {})
    url = Addressable::URI.encode(uri)
    rate_limit_message = "Couldn't #{verb.to_s.upcase} #{url} for site: #{site.subdomain}"

    rate_limit.ensure do
      connection = Faraday.new(API_URL) do |f|
        f.headers["Content-Type"]  = "application/json"
        f.headers["Authorization"] = "Bearer #{provider.authentication["access_token"]}"
        f.adapter Faraday.default_adapter
      end

      res =
        case verb
        when :get
          connection.get(url, params_or_body)
        when :post, :put
          connection.public_send(verb, url, params_or_body.to_json)
        end

      handle_errors(res, rate_limit_message)
      JSON.parse(res.body)
    end
  end

  def handle_errors(res, rate_limit_message)
    return unless res.status >= 400

    case res.status
    when 400
      raise CompanyAPI::BadRequest.new("The request cannot be fulfilled due to bad syntax", response: res)
    when 401
      raise CompanyAPI::Unauthorized.new("The client needs a new refresh token", response: res)
    when 403
      raise CompanyAPI::Forbidden.new("Not authorized to perform that action", response: res)
    when 404
      raise CompanyAPI::ResourceNotFound.new("The requested resource could not be found", response: res)
    when 429
      raise CompanyAPI::RateLimit.new(rate_limit_message, response: res)
    when 500
      raise CompanyAPI::InternalServerError.new("Internal Server Error", response: res)
    end
  end

  def rate_limit
    @rate_limit ||= RateLimit.new(provider.rate_limit_key, limit: 25, window: 5, limit_2: 300, window_2: 60)
  end
end
```
Usage explained
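It is crucial to have two unique error classes, one for the rate limiter (`RateLimit::Limited`) and one for the API (`CompanyAPI::RateLimit`). This way, if the API still answers with a 429, we will know that something changed on their side or that our code isn't working.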
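As a sketch of why the distinction is useful, the two errors can be handled differently by the caller. Everything below is illustrative: the error classes are bare stand-ins for the real ones (the real `CompanyAPI::RateLimit` also takes a `response:`), and `with_rate_limit_retry`, its retry count, and its wait time are hypothetical.

```ruby
# Stand-ins so the sketch runs on its own; in the app these already exist.
class RateLimit
  class Limited < StandardError; end
end

module CompanyAPI
  class RateLimit < StandardError; end
end

# Hypothetical helper: retry when our own limiter says no, but let a 429
# from the API itself bubble up loudly.
def with_rate_limit_retry(max_attempts: 3, wait: 0.5)
  attempts = 0
  begin
    attempts += 1
    yield
  rescue RateLimit::Limited
    # Our limiter refused before any request was sent, so waiting a bit
    # and trying again is safe.
    raise if attempts >= max_attempts

    sleep(wait)
    retry
  rescue CompanyAPI::RateLimit => e
    # The API returned 429 even though our limiter allowed the call:
    # the configured limits are probably wrong or have changed.
    warn "API rate limit hit despite local limiter: #{e.message}"
    raise
  end
end
```

Usage would be something like `with_rate_limit_retry { client.get_customer(source_id) }`. Inside a background job, rescheduling the job is likely a better fit than sleeping, but the split between the two rescue branches stays the same.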