Rate Limited Async Loop

A recent project included some modest load testing. For this we created a small console application to hit our API over HTTPS. A key metric in load testing is the number of requests an endpoint can handle per second, so it’s useful to be able to control and configure the rate at which requests are made.

This in itself is not difficult: a basic sleep wait of duration 1/requests-per-sec will achieve this. However we had an additional constraint that called for a slightly more complex solution.

The application uses Auth0, an authentication-as-a-service provider, and it rate limits use of its API. Exceeding the rate results in failed HTTP requests, and if frequent enough, can result in users being blocked. Furthermore, it is a remote and relatively slow API, with round-trip times in the order of 3 seconds (i.e. fetching 100 users serially would take 5 minutes), so it’s important that we access it concurrently, up to our limit. Additionally, the token received from calling it is cachable until its expiry, and if we can get the token from our cache then we want to skip any sleep-wait in order to minimize running time.

This leads to the goal: to maximize the number of concurrent requests made to an API up to a fixed number of requests per second; and to use cached data (and therefore not use a request) where possible. To solve this I want a rate-limited concurrent loop.

Implementation

A little searching on the internet resulted in either extensive libraries that implemented a different paradigm, like Reactive, or things that didn’t quite meet my requirements. I therefore – having taking the appropriate remedies to treat potential Not-Invented-Here Syndrome – went ahead and put something together myself.

public class RateLimitedTaskProperties
{
    public bool IgnoreRateLimit { get; set; }
}

public static async Task RateLimitedLoop(int perSec, IEnumerable enumerable, Func<T, Task> action)
{
    int periodMs = 1000 / perSec;
    var tasks = new List();
    foreach(T item in enumerable)
    {
        T capture = item;
        Task task = action(capture);
        tasks.Add(task);

        if (task.IsCompleted && task.Result.IgnoreRateLimit)
            continue;

        System.Threading.Thread.Sleep(periodMs);
    }

    await Task.WhenAll(tasks);
}

The loop starts a new task every periodMs. Concurrency is achieve by using tasks, which are non-blocking, and waiting for their completion outside the loop with await Task.WhenAll(tasks). The case where something has been retrieved from a cache is handled by the task returning synchronously and setting the IgnoreRateLimit flag. This combination causes the loop to skip the sleep and move straight onto triggering the next task.

The following is an example of its use, where MyOperation() is a method that returns a flag indicating whether or not it performed a fetch from the rate-limited API.

const int tokenReqsPerSec = 5;
await RateLimitedLoop(tokenReqsPerSec, items, async(item) =>
{
    bool requiredFetch = await item.MyOperation();
    // don't rate limit if I got it from the cache (fetch wasn't required)
    return new RateLimitedTaskProperties { IgnoreRateLimit = !requiredFetch };
});