Rate Limiter with Sliding Window and Burst Allowance

Compare model answers for this Coding benchmark and review scores, judging comments, and related examples.

Task Overview

Benchmark Genres

Coding

Task Prompt

Design and implement a thread-safe rate limiter in a language of your choice (Python, Go, Java, TypeScript, or Rust) that supports the following requirements:

1. **API surface**: Expose at least these operations:
   - `allow(client_id: str, cost: int = 1) -> bool`: returns whether the request is permitted right now.
   - `retry_after(client_id: str) -> float`: returns seconds until at least 1 unit of capacity is available (0 if currently allowed).
   - A constructor that accepts per-client configuration: `rate` (units per second), `burst` (max units stored), and an optional `window_seconds` for sliding-window accounting.
2. **Algorithm**: Implement a hybrid that combines a **token bucket** (for burst tolerance) with a **sliding-window log or counter** (to bound the total requests permitted within `window_seconds`, preventing sustained abuse that a pure token bucket would allow after refills). A request is permitted only if both checks pass. Justify your data-structure choice for the sliding window (exact log vs. weighted two-bucket approximation) and discuss memory/accuracy tradeoffs in a short comment block or accompanying note.
3. **Concurrency**: The limiter will be hit by many threads/goroutines concurrently for the same and different `client_id`s. Avoid a single global lock becoming a bottleneck (e.g., per-client locks or lock striping). Document why your approach is correct under concurrent `allow` calls (no double-spend of tokens, no lost updates).
4. **Time source**: Make the clock injectable so tests are deterministic. Use a monotonic clock by default.
5. **Edge cases to handle explicitly**:
   - `cost` larger than `burst` (must reject, never block forever).
   - Clock going backwards or large pauses (e.g., suspended VM): clamp rather than crash, and don't grant unbounded tokens.
   - First-ever request for a new client (lazy initialization).
   - Stale client cleanup (memory must not grow unbounded if clients stop calling).
   - Fractional tokens / sub-millisecond timing.
6. **Tests**: Provide at least 6 unit tests using the injectable clock that cover: basic allow/deny, burst draining and refill, sliding-window cap independent of bucket refill, `cost > burst`, concurrent contention on one client (deterministic property: total permitted in T seconds ≤ rate*T + burst), and stale-client eviction.
7. **Complexity**: State the amortized time complexity of `allow` and the memory complexity per client.

Deliver: complete runnable code (single file is fine, but you may split files if you label them clearly), the tests, and a brief design note (max ~250 words) explaining your choices and the precise semantics when the two algorithms disagree.

Task Context

This task targets backend/systems engineering skills. There are multiple valid solution strategies (pure token bucket + window log, leaky bucket + counter, weighted two-window approximation à la Cloudflare, lock-free with CAS, lock-striped maps, etc.), so quality will differ on correctness under concurrency, handling of subtle edge cases, clarity of the design note, and test coverage.
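To make the "weighted two-window approximation" mentioned above concrete, here is a minimal Python sketch. It is not taken from any submitted answer: the class name `TwoWindowCounter` and its methods are hypothetical, and it assumes a single caller and non-decreasing timestamps.

```python
import math


class TwoWindowCounter:
    """Approximate sliding-window counter built from two fixed buckets.

    Hypothetical helper for illustration only.
    """

    def __init__(self, limit: float, window_seconds: float):
        self.limit = limit
        self.window = window_seconds
        self.current_index = 0      # index of the fixed bucket that contains "now"
        self.current_count = 0.0    # units admitted in that bucket
        self.previous_count = 0.0   # units admitted in the bucket just before it

    def _roll(self, now: float) -> None:
        index = math.floor(now / self.window)
        if index <= self.current_index:
            return
        # Advancing exactly one bucket keeps the old count as "previous";
        # a larger jump means both buckets have left the window entirely.
        if index == self.current_index + 1:
            self.previous_count = self.current_count
        else:
            self.previous_count = 0.0
        self.current_count = 0.0
        self.current_index = index

    def estimate(self, now: float) -> float:
        """Weight the previous bucket by the share of it still inside the window."""
        self._roll(now)
        elapsed_fraction = (now % self.window) / self.window
        return self.previous_count * (1.0 - elapsed_fraction) + self.current_count

    def try_admit(self, now: float, cost: float = 1.0) -> bool:
        if self.estimate(now) + cost > self.limit:
            return False
        self.current_count += cost
        return True
```

The counter keeps three numbers per client regardless of traffic, which is the memory advantage over an exact log. The price is that the estimate assumes requests in the previous bucket were spread evenly, so it can over- or under-count near bucket boundaries.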

Judging Policy

A strong answer should:

- Provide complete, runnable code in one of the listed languages with no missing imports or pseudo-code stubs.
- Correctly implement both a token-bucket component and a sliding-window component, and only permit a request when both agree. Pure token bucket alone, or pure fixed-window counter, should be considered incomplete.
- Be demonstrably thread-safe under concurrent calls for the same client without using a single global lock that serializes all clients. Per-client locking, lock striping, or sound lock-free techniques are all acceptable; the answer must explain why no token double-spend can occur.
- Handle every listed edge case explicitly: `cost > burst` rejection, monotonic/backwards-clock safety, lazy client init, stale-client eviction (e.g., LRU, TTL sweep, or size-bounded map), and fractional/sub-ms time.
- Include an injectable clock and at least 6 meaningful tests, including one concurrent test asserting an upper-bound invariant (not just a happy-path check). Tests should pass as written.
- Provide a concise design note covering: which sliding-window variant was chosen and why, the tie-breaking semantics when bucket and window disagree, amortized time complexity of `allow` (ideally O(1) amortized) and memory per client.
- Be free of obvious bugs such as off-by-one in refill math, integer overflow on time arithmetic, or unbounded growth of a per-client log without trimming.

Higher-quality answers will additionally discuss memory/accuracy tradeoffs between an exact request log and an approximate two-window weighted counter, and will write idiomatic, well-structured code rather than a single monolithic function. Penalize answers that ignore concurrency, omit tests, hand-wave edge cases, or merely paste a textbook token bucket.

Answers

import math
import threading
import time
import unittest
from collections import deque
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass, field

# Design note:
#
# This limiter admits a request only when two independent checks agree:
#   1. a token bucket has at least cost tokens, allowing short bursts up to burst;
#   2. an exact sliding-window log has room for cost more admitted units in the last
#      window_seconds, where the window limit is rate * window_seconds.
# If the token bucket says yes but the sliding window is full, the request is
# denied and no tokens are spent; if the window has room but tokens are absent,
# it is also denied. The exact log stores one entry per admitted request as
# (timestamp, cost), not one entry per unit, so allow is amortized O(1): each log
# entry is appended once and removed once. Memory is O(number of admitted requests
# in the active window) per client. A weighted two-bucket counter would use O(1)
# memory but can over- or under-admit near bucket boundaries; this implementation
# chooses exactness because correctness is usually more important for abuse
# prevention. The client map is lock-striped; each client state has its own lock.
# A caller obtains the per-client lock before mutating tokens/window state, so
# concurrent allow calls cannot double-spend tokens or lose window updates.

class MonotonicClock:
    def now(self) -> float:
        return time.monotonic()


class ManualClock:
    def __init__(self, start: float = 0.0):
        self._now = float(start)
        self._lock = threading.Lock()

    def now(self) -> float:
        with self._lock:
            return self._now

    def set(self, value: float) -> None:
        with self._lock:
            self._now = float(value)

    def advance(self, seconds: float) -> None:
        with self._lock:
            self._now += float(seconds)

@dataclass(frozen=True)
class ClientConfig:
    rate: float
    burst: float
    window_seconds: float = 1.0

    def validated(self) -> 'ClientConfig':
        rate = float(self.rate)
        burst = float(self.burst)
        window = float(self.window_seconds)
        if not math.isfinite(rate) or rate <= 0:
            raise ValueError('rate must be a positive finite number')
        if not math.isfinite(burst) or burst < 1:
            raise ValueError('burst must be a finite number >= 1')
        if not math.isfinite(window) or window <= 0:
            raise ValueError('window_seconds must be a positive finite number')
        return ClientConfig(rate, burst, window)

    @property
    def window_limit(self) -> float:
        return self.rate * self.window_seconds


@dataclass
class _ClientState:
    config: ClientConfig
    tokens: float
    last_refill: float
    last_seen: float
    events: deque = field(default_factory=deque)
    window_sum: float = 0.0
    lock: threading.Lock = field(default_factory=threading.Lock)

class HybridRateLimiter:
    def __init__(
        self,
        rate: float,
        burst: float,
        window_seconds: float = 1.0,
        per_client_configs=None,
        clock=None,
        stripes: int = 64,
        idle_ttl_seconds: float | None = None,
        cleanup_interval_seconds: float | None = None,
    ):
        if stripes <= 0:
            raise ValueError('stripes must be positive')
        self._default_config = ClientConfig(rate, burst, window_seconds).validated()
        self._configs = {}
        if per_client_configs:
            for client_id, cfg in per_client_configs.items():
                self._configs[client_id] = self._coerce_config(cfg)

        max_window = self._default_config.window_seconds
        for cfg in self._configs.values():
            max_window = max(max_window, cfg.window_seconds)

        if idle_ttl_seconds is None:
            idle_ttl_seconds = max(60.0, 2.0 * max_window)
        idle_ttl_seconds = float(idle_ttl_seconds)
        if not math.isfinite(idle_ttl_seconds) or idle_ttl_seconds <= 0:
            raise ValueError('idle_ttl_seconds must be positive and finite')
        if idle_ttl_seconds < max_window:
            raise ValueError('idle_ttl_seconds must be >= the largest window_seconds')
        self._idle_ttl = idle_ttl_seconds

        if cleanup_interval_seconds is None:
            cleanup_interval_seconds = min(60.0, max(1.0, idle_ttl_seconds / 2.0))
        cleanup_interval_seconds = float(cleanup_interval_seconds)
        if not math.isfinite(cleanup_interval_seconds) or cleanup_interval_seconds <= 0:
            raise ValueError('cleanup_interval_seconds must be positive and finite')
        self._cleanup_interval = cleanup_interval_seconds

        self._clock = clock if clock is not None else MonotonicClock()
        self._clients = {}
        self._stripe_locks = [threading.Lock() for _ in range(stripes)]
        self._cleanup_mutex = threading.Lock()
        self._last_cleanup = self._safe_time()
        self._eps = 1e-12

    @staticmethod
    def _coerce_config(cfg) -> ClientConfig:
        if isinstance(cfg, ClientConfig):
            return cfg.validated()
        if isinstance(cfg, dict):
            return ClientConfig(cfg['rate'], cfg['burst'], cfg.get('window_seconds', 1.0)).validated()
        rate, burst, *rest = cfg
        window = rest[0] if rest else 1.0
        return ClientConfig(rate, burst, window).validated()

    def _safe_time(self) -> float:
        t = float(self._clock.now())
        if math.isnan(t):
            raise ValueError('clock returned NaN')
        return t

    def _config_for(self, client_id: str) -> ClientConfig:
        return self._configs.get(client_id, self._default_config)

    def _stripe_index(self, client_id: str) -> int:
        return hash(client_id) % len(self._stripe_locks)

    def _get_state_locked(self, client_id: str) -> _ClientState:
        idx = self._stripe_index(client_id)
        stripe = self._stripe_locks[idx]
        stripe.acquire()
        try:
            state = self._clients.get(client_id)
            if state is None:
                cfg = self._config_for(client_id)
                now = self._safe_time()
                state = _ClientState(
                    config=cfg,
                    tokens=cfg.burst,
                    last_refill=now,
                    last_seen=now,
                )
                self._clients[client_id] = state
            state.lock.acquire()
            return state
        finally:
            stripe.release()

    def _advance_and_prune_locked(self, state: _ClientState) -> float:
        raw_now = self._safe_time()
        now = max(raw_now, state.last_seen)

        elapsed = max(0.0, now - state.last_refill)
        max_useful_elapsed = state.config.burst / state.config.rate
        elapsed_for_tokens = min(elapsed, max_useful_elapsed)
        if elapsed_for_tokens > 0:
            state.tokens = min(
                state.config.burst,
                state.tokens + elapsed_for_tokens * state.config.rate,
            )
        state.last_refill = now
        state.last_seen = now

        cutoff = now - state.config.window_seconds
        while state.events and state.events[0][0] <= cutoff + self._eps:
            _, old_cost = state.events.popleft()
            state.window_sum -= old_cost
        if state.window_sum < self._eps:
            state.window_sum = 0.0
        return now

    def allow(self, client_id: str, cost: int = 1) -> bool:
        if cost <= 0:
            raise ValueError('cost must be a positive integer')
        self._maybe_cleanup()

        cfg = self._config_for(client_id)
        if cost > cfg.burst:
            return False

        state = self._get_state_locked(client_id)
        try:
            now = self._advance_and_prune_locked(state)
            enough_tokens = state.tokens + self._eps >= cost
            enough_window = state.window_sum + cost <= state.config.window_limit + self._eps
            if not (enough_tokens and enough_window):
                return False
            state.tokens -= cost
            if state.tokens < self._eps:
                state.tokens = 0.0
            state.window_sum += cost
            state.events.append((now, float(cost)))
            return True
        finally:
            state.lock.release()

    def retry_after(self, client_id: str) -> float:
        self._maybe_cleanup()
        state = self._get_state_locked(client_id)
        try:
            now = self._advance_and_prune_locked(state)
            cfg = state.config
            if cfg.burst < 1 or cfg.window_limit < 1:
                return math.inf
            token_wait = 0.0 if state.tokens + self._eps >= 1.0 else (1.0 - state.tokens) / cfg.rate

            if state.window_sum + 1.0 <= cfg.window_limit + self._eps:
                window_wait = 0.0
            else:
                window_wait = math.inf
                remaining = state.window_sum
                for ts, event_cost in state.events:
                    remaining -= event_cost
                    if remaining + 1.0 <= cfg.window_limit + self._eps:
                        window_wait = max(0.0, ts + cfg.window_seconds - now)
                        break
            return max(token_wait, window_wait)
        finally:
            state.lock.release()

    def _maybe_cleanup(self) -> None:
        raw_now = self._safe_time()
        if raw_now - self._last_cleanup < self._cleanup_interval:
            return
        if not self._cleanup_mutex.acquire(blocking=False):
            return
        try:
            now = max(raw_now, self._last_cleanup)
            if now - self._last_cleanup >= self._cleanup_interval:
                self._cleanup_stale_at(now)
                self._last_cleanup = now
        finally:
            self._cleanup_mutex.release()

    def cleanup_stale(self) -> int:
        with self._cleanup_mutex:
            now = max(self._safe_time(), self._last_cleanup)
            removed = self._cleanup_stale_at(now)
            self._last_cleanup = now
            return removed

    def _cleanup_stale_at(self, now: float) -> int:
        removed = 0
        for idx, stripe in enumerate(self._stripe_locks):
            with stripe:
                for client_id, state in list(self._clients.items()):
                    if self._stripe_index(client_id) != idx:
                        continue
                    if not state.lock.acquire(blocking=False):
                        continue
                    try:
                        if self._clients.get(client_id) is state and now - state.last_seen >= self._idle_ttl:
                            del self._clients[client_id]
                            removed += 1
                    finally:
                        state.lock.release()
        return removed

    def client_count(self) -> int:
        total = 0
        for stripe in self._stripe_locks:
            with stripe:
                total += len(self._clients)
                break
        return len(self._clients)

class HybridRateLimiterTests(unittest.TestCase):
    def test_basic_allow_deny_and_retry_after(self):
        clock = ManualClock()
        limiter = HybridRateLimiter(rate=1, burst=2, window_seconds=10, clock=clock)
        self.assertTrue(limiter.allow('alice'))
        self.assertTrue(limiter.allow('alice'))
        self.assertFalse(limiter.allow('alice'))
        self.assertAlmostEqual(limiter.retry_after('alice'), 1.0, places=9)

    def test_burst_draining_and_fractional_refill(self):
        clock = ManualClock()
        limiter = HybridRateLimiter(rate=2, burst=3, window_seconds=10, clock=clock)
        self.assertTrue(limiter.allow('alice'))
        self.assertTrue(limiter.allow('alice'))
        self.assertTrue(limiter.allow('alice'))
        self.assertFalse(limiter.allow('alice'))
        clock.advance(0.25)
        self.assertFalse(limiter.allow('alice'))
        self.assertAlmostEqual(limiter.retry_after('alice'), 0.25, places=9)
        clock.advance(0.25)
        self.assertTrue(limiter.allow('alice'))

    def test_sliding_window_caps_even_when_bucket_refilled(self):
        clock = ManualClock()
        limiter = HybridRateLimiter(rate=10, burst=10, window_seconds=1, clock=clock)
        for _ in range(10):
            self.assertTrue(limiter.allow('abuser'))
        clock.advance(0.5)
        self.assertFalse(limiter.allow('abuser'))
        self.assertAlmostEqual(limiter.retry_after('abuser'), 0.5, places=9)
        clock.advance(0.5)
        self.assertTrue(limiter.allow('abuser'))

    def test_cost_larger_than_burst_is_rejected_without_state_creation(self):
        clock = ManualClock()
        limiter = HybridRateLimiter(rate=10, burst=5, window_seconds=10, clock=clock)
        self.assertFalse(limiter.allow('alice', cost=6))
        self.assertEqual(limiter.client_count(), 0)
        self.assertTrue(limiter.allow('alice', cost=5))
        self.assertFalse(limiter.allow('alice'))

    def test_concurrent_contention_obeys_token_bucket_bound(self):
        clock = ManualClock()
        rate = 5
        burst = 5
        limiter = HybridRateLimiter(rate=rate, burst=burst, window_seconds=100, clock=clock, stripes=8)
        successes = 0
        for tick in range(11):
            clock.set(tick / 10.0)
            with ThreadPoolExecutor(max_workers=32) as pool:
                results = list(pool.map(lambda _: limiter.allow('hot'), range(32)))
            successes += sum(1 for r in results if r)
        bound = rate * 1.0 + burst
        self.assertLessEqual(successes, bound)

    def test_stale_client_eviction(self):
        clock = ManualClock()
        limiter = HybridRateLimiter(
            rate=10,
            burst=10,
            window_seconds=1,
            clock=clock,
            idle_ttl_seconds=5,
            cleanup_interval_seconds=1,
        )
        self.assertTrue(limiter.allow('old-client'))
        self.assertEqual(limiter.client_count(), 1)
        clock.advance(6)
        self.assertTrue(limiter.allow('new-client'))
        self.assertEqual(limiter.client_count(), 1)

    def test_clock_going_backwards_is_clamped(self):
        clock = ManualClock(10)
        limiter = HybridRateLimiter(rate=1, burst=1, window_seconds=10, clock=clock)
        self.assertTrue(limiter.allow('alice'))
        clock.set(5)
        self.assertFalse(limiter.allow('alice'))
        self.assertAlmostEqual(limiter.retry_after('alice'), 1.0, places=9)

    def test_large_pause_refills_only_to_burst(self):
        clock = ManualClock()
        limiter = HybridRateLimiter(rate=1, burst=3, window_seconds=1000, clock=clock)
        for _ in range(3):
            self.assertTrue(limiter.allow('alice'))
        clock.advance(10 ** 9)
        for _ in range(3):
            self.assertTrue(limiter.allow('alice'))
        self.assertFalse(limiter.allow('alice'))


if __name__ == '__main__':
    unittest.main()

Result

#1 | Winner

Winning Votes: 3 / 3

Average Score: 89

Total Score: 92

Overall Comments

Answer A provides a production-quality, sophisticated, and robust implementation of the hybrid rate limiter. Its key strengths are the highly scalable concurrency model using lock striping combined with per-client locks, an elegant cleanup mechanism that avoids a background thread, and a comprehensive, well-designed test suite that correctly validates complex behaviors like concurrent contention. The code is clean, well-structured, and fully adheres to all prompt requirements, including advanced features like per-client configurations.

Correctness (weight 35%): 90

The implementation is highly correct and robust. The concurrency model using lock striping and per-client locks is a sophisticated and correct way to avoid bottlenecks. All edge cases, including clock safety and fractional refills, are handled correctly. The logic for both the token bucket and sliding window is sound.
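The locking pattern this comment refers to can be sketched in isolation. The following is a simplified illustration under stated assumptions, not Answer A's code: `StripedBudgets` and `hammer` are hypothetical names, each stripe owns its own dict, and there is no eviction, so the stripe lock can be released before the per-entry lock is taken.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


class StripedBudgets:
    """Hypothetical illustration: striped map locks plus one lock per entry."""

    def __init__(self, stripes: int = 8, budget: int = 5):
        self._locks = [threading.Lock() for _ in range(stripes)]
        self._shards = [{} for _ in range(stripes)]  # one dict per stripe
        self._budget = budget

    def _entry(self, key: str) -> dict:
        idx = hash(key) % len(self._locks)
        # The stripe lock only guards lookup and lazy creation in its shard.
        with self._locks[idx]:
            entry = self._shards[idx].get(key)
            if entry is None:
                entry = {"lock": threading.Lock(), "left": self._budget}
                self._shards[idx][key] = entry
            return entry

    def spend(self, key: str) -> bool:
        entry = self._entry(key)
        # The per-entry lock makes the check and the decrement one atomic step,
        # so two threads cannot both spend the last unit.
        with entry["lock"]:
            if entry["left"] <= 0:
                return False
            entry["left"] -= 1
            return True


def hammer(budgets: StripedBudgets, key: str, calls: int) -> int:
    """Call spend() from many threads and count how many calls succeeded."""
    with ThreadPoolExecutor(max_workers=16) as pool:
        return sum(pool.map(lambda _: budgets.spend(key), range(calls)))
```

Calls for different keys usually contend on different locks, while calls for the same key are serialized only by that key's own lock. A limiter that also evicts entries needs extra care, for example taking the per-entry lock before releasing the stripe lock, so that a caller never mutates a state object that was just removed.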

Completeness (weight 20%): 95

This answer is exceptionally complete. It implements all required API methods, handles every specified edge case, provides a comprehensive test suite with more than the required number of tests, and includes a clear design note. It also implements the suggested per-client configuration, demonstrating a full understanding of the prompt.

Code Quality (weight 20%): 90

The code quality is excellent. It uses modern Python features like dataclasses effectively, is well-structured into logical methods, and is highly readable. The design choices, such as the integrated cleanup mechanism and lock striping, are elegant and demonstrate a high level of engineering skill.

Practical Value (weight 15%): 90

The implementation is of production-grade quality and highly practical. The scalable concurrency model makes it suitable for high-throughput systems. The flexible configuration and robust handling of edge cases mean it could be deployed with confidence in a real-world application.

Instruction Following (weight 10%): 100

The answer follows all instructions perfectly. It provides code in a requested language, implements the specified API, algorithm, and concurrency strategy, handles all edge cases, includes the required tests and complexity analysis, and provides a design note within the specified constraints.

Judge Models OpenAI GPT-5.4

Total Score: 89

Overall Comments

Answer A is a strong, mostly complete implementation that directly matches the requested API and semantics. It combines a token bucket with an exact sliding-window log, uses an injectable monotonic clock, avoids a single global bottleneck through striped map locks plus per-client locks, and includes solid deterministic tests covering the required edge cases. The design note is concise and addresses tradeoffs, concurrency correctness, and complexity. Minor weaknesses include a somewhat awkward client_count implementation and retry_after only reasoning about 1-unit availability rather than arbitrary cost, though that matches the requested API.
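The remark about `retry_after` only covering 1-unit availability suggests a natural generalization. The sketch below is one possible way to compute the wait for an arbitrary `cost` over an exact log of `(timestamp, cost)` entries. The function names and signatures are hypothetical and are not part of Answer A, and the result assumes no other request is admitted in the meantime.

```python
import math
from collections import deque


def window_wait(events, window_sum, limit, window_seconds, now, cost=1.0):
    """Seconds until `cost` units fit in the sliding window (hypothetical helper).

    `events` holds (timestamp, cost) pairs, oldest first.
    """
    if cost > limit:
        return math.inf          # can never fit, even in an empty window
    if window_sum + cost <= limit:
        return 0.0
    remaining = window_sum
    for ts, event_cost in events:
        remaining -= event_cost  # pretend this entry has expired
        if remaining + cost <= limit:
            return max(0.0, ts + window_seconds - now)
    return math.inf


def retry_after_cost(tokens, rate, burst, events, window_sum,
                     limit, window_seconds, now, cost=1.0):
    """Wait until both the bucket and the window can admit `cost` units."""
    if cost > burst:
        return math.inf          # the bucket can never hold this many tokens
    token_wait = 0.0 if tokens >= cost else (cost - tokens) / rate
    return max(token_wait,
               window_wait(events, window_sum, limit, window_seconds, now, cost))


log = deque([(0.0, 3.0), (0.5, 2.0)])
wait_for_one = retry_after_cost(0.5, 2.0, 5.0, log, 5.0, 5.0, 1.0, now=0.6, cost=1.0)
wait_for_four = retry_after_cost(0.5, 2.0, 5.0, log, 5.0, 5.0, 1.0, now=0.6, cost=4.0)
```

Taking the maximum of the two waits works because both conditions only improve with time when nothing else is admitted: tokens refill monotonically and log entries only expire.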

Correctness (weight 35%): 88

Implements the required hybrid correctly: request passes only if both token bucket and exact sliding window allow it, with proper token refill clamping, lazy init, stale cleanup, and per-client serialized mutation preventing double-spend. retry_after is coherent for 1-unit availability, and tests cover important invariants. Minor concerns are small implementation quirks such as client_count reading the map without full synchronization and some boundary choices in window eviction.

Completeness (weight 20%): 93

Covers nearly all requested items: required API surface, hybrid algorithm, injectable monotonic clock, per-client configuration support, explicit edge cases, stale-client cleanup, fractional timing, concurrency discussion, complexity note, and 8 meaningful tests including deterministic concurrent contention and eviction. This is very close to fully satisfying the prompt.

Code Quality (weight 20%): 84

Well-structured and idiomatic, with dataclasses, separated helpers, explicit validation, and clear naming. The locking strategy is thoughtfully organized and the design note is concise. A few rough edges remain, such as the unusual client_count implementation and some low-level lock management that could be simplified.

Practical Value (weight 15%): 87

Practical for backend use: deterministic clock injection, bounded stale-state cleanup, lock striping plus per-client locks for contention, exact window accounting, and solid tests make it deployable or adaptable. The implementation is realistic and addresses common production edge cases like large pauses and VM suspension.

Instruction Following (weight 10%): 95

Follows the prompt closely: language choice is valid, API names match, design note is within scope, tests use the injectable clock, and requested edge cases are explicitly addressed. The answer aligns well with both the technical and documentation requirements.

Total Score: 85

Overall Comments

Answer A is a polished, production-quality hybrid rate limiter. It uses dataclasses, stripe locks plus per-client locks, integrates stale cleanup into allow calls (avoiding background thread issues), validates configuration, properly clamps elapsed time for both backwards-clock and large pauses, and rejects cost>burst without creating client state. The 8 tests use the injectable ManualClock deterministically, including a concurrent contention test that genuinely asserts the rate*T+burst invariant. The design note is concise and addresses all required points.

Correctness (weight 35%): 85

Correctly implements hybrid token bucket + exact sliding window log. Handles clock-backwards by clamping via max(raw_now, last_seen), rejects cost>burst without creating state, clamps elapsed for tokens to avoid overflow, uses per-client locks with stripe locks for the map. retry_after correctly computes wait time based on window log expiration. The eps handling and event accounting look sound.
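The two clamps described here (backwards clock and long pause) can be shown as a small pure function. This is a minimal sketch of the idea rather than Answer A's exact code; the name `refill` and its tuple return value are assumptions made for illustration.

```python
def refill(tokens, last_refill, now, rate, burst):
    """Return (tokens, last_refill) after a clamped refill (hypothetical helper)."""
    # Clock went backwards: treat it as if no time has passed.
    now = max(now, last_refill)
    # Long pause: crediting more than burst / rate seconds cannot add anything
    # useful, and capping here keeps the multiplication small.
    elapsed = min(now - last_refill, burst / rate)
    return min(burst, tokens + elapsed * rate), now
```

For example, with `rate=1` and `burst=3`, a pause of `10 ** 9` seconds still yields at most 3 tokens, and a clock that jumps from 10 back to 5 yields none.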

Completeness (weight 20%): 90

Covers all required edge cases: cost>burst (without state creation), clock-backwards (test included), large pauses (test included), lazy init, stale eviction (deterministic via _maybe_cleanup tied to injected clock), fractional tokens. 8 tests including concurrent bound test that asserts the rate*T+burst invariant deterministically using ManualClock.

Code Quality (weight 20%): 80

Well-structured with dataclasses, separation of MonotonicClock/ManualClock, ClientConfig validation, per-client and stripe locks clearly explained. Cleanup is integrated into allow path (no background thread complications). Concise design note at top.

Practical Value (weight 15%): 80

Production-quality: stripe locks scale better, integrated lazy cleanup avoids background thread lifecycle, deterministic test support, config validation. Suitable for embedding in a real service.
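The "integrated lazy cleanup" idea, sweeping idle entries from whichever caller happens to arrive after the sweep interval has elapsed, can be sketched without any locking. The class below is a single-threaded illustration with hypothetical names (`LazyTtlMap`, `touch`); a concurrent version would need the kind of locking discussed in the reviews.

```python
class LazyTtlMap:
    """Hypothetical sketch: evict idle keys on the call path, no background thread."""

    def __init__(self, clock, idle_ttl: float, sweep_interval: float):
        self._clock = clock                  # any zero-argument callable returning seconds
        self._idle_ttl = idle_ttl
        self._sweep_interval = sweep_interval
        self._last_sweep = clock()
        self._last_seen = {}                 # key -> time of the most recent touch

    def touch(self, key: str) -> None:
        now = self._clock()
        # Sweep at most once per interval, piggybacking on a normal call.
        if now - self._last_sweep >= self._sweep_interval:
            stale = [k for k, seen in self._last_seen.items()
                     if now - seen >= self._idle_ttl]
            for k in stale:
                del self._last_seen[k]
            self._last_sweep = now
        self._last_seen[key] = now

    def __len__(self) -> int:
        return len(self._last_seen)
```

Because the sweep is driven by the injected clock, a test can advance time and observe eviction deterministically. The tradeoff is that a completely idle limiter never sweeps, so memory is reclaimed only when traffic resumes.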

Instruction Following (weight 10%): 90

Follows all instructions: API surface matches signatures, hybrid algorithm, per-client locks with stripe locks, injectable monotonic clock, all edge cases, 8 tests including deterministic concurrent invariant test, design note with semantics and complexity stated.

import threading
import time
import collections

# Injectable clock for deterministic testing

class MonotonicClock:
    def time(self):
        return time.monotonic()


class TestClock:
    def __init__(self, initial_time=0.0):
        self._time = initial_time
        self._lock = threading.Lock()

    def time(self):
        with self._lock:
            return self._time

    def advance(self, seconds):
        with self._lock:
            self._time += seconds

    def set_time(self, new_time):
        with self._lock:
            self._time = new_time

# Client state for the rate limiter

class ClientState:
    def __init__(self, rate: float, burst: int, window_seconds: int, current_time: float):
        self.lock = threading.Lock()
        self.tokens = float(burst)  # Current tokens in the bucket
        self.last_refill_time = current_time  # Last time tokens were refilled
        self.burst = burst  # Max tokens
        self.rate = rate  # Tokens per second

        # Sliding window log: stores (timestamp, cost) tuples of allowed requests
        # We use a deque for efficient O(1) append and popleft
        self.request_timestamps = collections.deque()
        self.current_window_cost = 0  # Sum of costs in the current window
        self.window_seconds = window_seconds

        self.last_access_time = current_time  # For stale client eviction

    def _refill_tokens(self, current_time: float):
        # Clamp last_refill_time to current_time to prevent negative time delta
        # and thus unbounded token grants if clock goes backwards (though monotonic clock prevents this)
        # or if there's a huge pause (e.g., suspended VM).
        if current_time < self.last_refill_time:
            self.last_refill_time = current_time

        time_delta = current_time - self.last_refill_time
        if time_delta > 0:
            refill_amount = time_delta * self.rate
            self.tokens = min(self.burst, self.tokens + refill_amount)
            self.last_refill_time = current_time

    def _clean_window(self, current_time: float):
        # Remove (timestamp, cost) tuples older than window_seconds
        while self.request_timestamps and self.request_timestamps[0][0] <= current_time - self.window_seconds:
            _, removed_cost = self.request_timestamps.popleft()
            self.current_window_cost -= removed_cost

class RateLimiter:
    def __init__(self, rate: float, burst: int, window_seconds: int = 60, clock=None, cleanup_interval: float = 300.0, client_ttl_multiplier: float = 2.0):
        if rate <= 0 or burst < 0 or window_seconds <= 0:
            raise ValueError("Rate must be positive, burst non-negative, window_seconds positive.")

        self.rate = rate
        self.burst = burst
        self.window_seconds = window_seconds
        self.clock = clock if clock is not None else MonotonicClock()

        self.clients = {}  # client_id -> ClientState
        self._clients_lock = threading.Lock()  # Protects access to the clients dictionary

        # Stale client cleanup
        self.cleanup_interval = cleanup_interval
        self.client_ttl_multiplier = client_ttl_multiplier  # e.g., 2.0 * window_seconds
        self._cleanup_thread = None
        self._stop_cleanup_event = threading.Event()
        self._start_cleanup_thread()

    def _get_or_create_client_state(self, client_id: str) -> ClientState:
        with self._clients_lock:
            if client_id not in self.clients:
                self.clients[client_id] = ClientState(self.rate, self.burst, self.window_seconds, self.clock.time())
            return self.clients[client_id]

    def allow(self, client_id: str, cost: int = 1) -> bool:
        if cost <= 0:
            return True  # Zero or negative cost requests are always allowed (or could raise error)

        client_state = self._get_or_create_client_state(client_id)
        current_time = self.clock.time()

        with client_state.lock:
            # Edge case: cost > burst. A single request cannot exceed the max capacity of the bucket.
            if cost > client_state.burst:
                return False

            # Update last access time for cleanup
            client_state.last_access_time = current_time

            # Token Bucket check
            client_state._refill_tokens(current_time)
            if client_state.tokens < cost:
                return False  # Not enough tokens

            # Sliding Window check
            client_state._clean_window(current_time)
            # Window capacity is `rate * window_seconds` units.
            if client_state.current_window_cost + cost > self.rate * self.window_seconds:
                return False  # Window capacity exceeded

            # Both checks passed. Consume tokens and log request.
            client_state.tokens -= cost
            client_state.request_timestamps.append((current_time, cost))
            client_state.current_window_cost += cost
            return True

def retry_after(self, client_id: str) -> float:
    client_state = self._get_or_create_client_state(client_id)
    current_time = self.clock.time()

    with client_state.lock:
        # Update last access time for cleanup
        client_state.last_access_time = current_time

        client_state._refill_tokens(current_time)
        client_state._clean_window(current_time)

        # Check token bucket first for 1 unit of cost
        if client_state.tokens >= 1: 
            # Token bucket allows, now check sliding window for 1 unit
            if client_state.current_window_cost + 1 <= self.rate * self.window_seconds:
                return 0.0 # Currently allowed for 1 unit
            else:
                # Sliding window denies for 1 unit. Find when oldest request in window expires.
                needed_to_remove = (client_state.current_window_cost + 1) - (self.rate * self.window_seconds)
                if needed_to_remove <= 0: # Should not happen if we are in this else branch
                    return 0.0

                removed_sum = 0
                for ts, c in client_state.request_timestamps:
                    removed_sum += c
                    if removed_sum >= needed_to_remove:
                        return max(0.0, (ts + self.window_seconds) - current_time)
                return 0.0 # Should not be reached if window denies
        else:
            # Token bucket denies. Calculate time until enough tokens are available for 1 unit.
            tokens_needed = 1 - client_state.tokens
            time_to_refill = tokens_needed / self.rate
            return max(0.0, time_to_refill)

def _cleanup_clients(self):
    while not self._stop_cleanup_event.wait(self.cleanup_interval):
        current_time = self.clock.time()
        stale_threshold = current_time - (self.window_seconds * self.client_ttl_multiplier)
        
        clients_to_remove = []
        with self._clients_lock:
            for client_id, state in self.clients.items():
                # Check last_access_time without acquiring client's lock to avoid potential deadlocks
                # with concurrent `allow` calls. A slight inaccuracy in cleanup timing is acceptable.
                if state.last_access_time < stale_threshold:
                    clients_to_remove.append(client_id)
            
            for client_id in clients_to_remove:
                # Remove from the dictionary. Subsequent `allow` calls will re-create if needed.
                if client_id in self.clients: # Check again in case another thread already removed it
                    del self.clients[client_id]

def _start_cleanup_thread(self):
    if self._cleanup_thread is None or not self._cleanup_thread.is_alive():
        self._cleanup_thread = threading.Thread(target=self._cleanup_clients, daemon=True)
        self._cleanup_thread.start()

def stop_cleanup_thread(self):
    self._stop_cleanup_event.set()
    if self._cleanup_thread:
        self._cleanup_thread.join()
        self._cleanup_thread = None

Design Note:

This rate limiter combines a token bucket with a sliding-window log to provide robust rate limiting.

Algorithm Justification:

The token bucket handles burst tolerance, allowing requests to exceed the average rate temporarily, up to the `burst` capacity. Tokens refill at `rate` units per second, which caps the size of short-term spikes.

The sliding-window log (implemented with a `collections.deque` storing `(timestamp, cost)` tuples) prevents sustained abuse that a pure token bucket might allow after refills. It ensures that the total cost of requests within any `window_seconds` period does not exceed `rate * window_seconds`.

This hybrid approach permits short-term bursts while enforcing long-term rate adherence.

Data Structure Choice for Sliding Window:

An exact sliding-window log is used. While it consumes more memory than a weighted two-bucket approximation (like Cloudflare's), it offers perfect accuracy within the defined window. For a per-client limiter, the memory footprint of O(rate * window_seconds) per active client (storing `(timestamp, cost)` tuples) is generally acceptable, especially with the stale client eviction mechanism. The deque provides O(1) time for adding and removing elements at its ends, making window maintenance efficient.
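For comparison, the weighted two-bucket approximation mentioned above can be sketched as follows. This is a minimal illustration and not part of Answer B: the class name `TwoBucketWindow` and its methods are hypothetical. It keeps two counters per client, so memory is O(1), at the price of assuming that the previous bucket's cost was spread evenly over that bucket.

```python
import math


class TwoBucketWindow:
    """Approximate sliding-window counter built from two fixed buckets.

    Hypothetical sketch: O(1) memory per client, approximate accuracy.
    """

    def __init__(self, limit: float, window_seconds: float):
        self.limit = limit
        self.window_seconds = window_seconds
        self.current_index = 0     # index of the fixed bucket containing "now"
        self.current_cost = 0.0    # cost recorded in the current bucket
        self.previous_cost = 0.0   # cost recorded in the bucket before it

    def _roll(self, now: float) -> None:
        index = math.floor(now / self.window_seconds)
        if index == self.current_index + 1:
            # Moved into the next bucket: current becomes previous.
            self.previous_cost = self.current_cost
            self.current_cost = 0.0
        elif index > self.current_index + 1:
            # Skipped at least one whole bucket: nothing recent remains.
            self.previous_cost = 0.0
            self.current_cost = 0.0
        # If index < current_index the clock moved backwards; keep the state.
        if index > self.current_index:
            self.current_index = index

    def estimate(self, now: float) -> float:
        """Estimated cost within the last window_seconds."""
        self._roll(now)
        elapsed_fraction = now / self.window_seconds - self.current_index
        elapsed_fraction = min(1.0, max(0.0, elapsed_fraction))
        # Weight the previous bucket by the share of it still inside the window.
        return self.previous_cost * (1.0 - elapsed_fraction) + self.current_cost

    def try_record(self, now: float, cost: float = 1.0) -> bool:
        if self.estimate(now) + cost > self.limit:
            return False
        self.current_cost += cost
        return True
```

With a limit of 10 and a 1-second window, ten units recorded at t=0.0 are estimated as 5 units at t=1.5, because half of the previous bucket is assumed to have slid out of the window. The exact log would report 0 at that point, which is the accuracy gap the note refers to.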

Concurrency:

Thread safety is achieved using per-client locks. Each `ClientState` object has its own `threading.Lock`. This prevents a single global lock from becoming a bottleneck, allowing concurrent `allow` calls for different `client_id`s to proceed in parallel. For a specific `client_id`, its lock ensures that operations on its `tokens`, `last_refill_time`, `request_timestamps`, and `current_window_cost` are atomic. This guarantees no double-spend of tokens or lost updates for that client. The `self._clients_lock` protects the `self.clients` dictionary itself, ensuring safe addition and removal of client states.

Semantics when algorithms disagree:

A request is permitted only if both the token bucket check and the sliding-window log check pass. If the token bucket has enough capacity but the sliding window indicates too many units have been consumed recently, the request is denied. Conversely, if the sliding window has space but the token bucket is empty, the request is also denied. This ensures the stricter of the two limits is always enforced.

Time Complexity:

- `allow`: Amortized O(1). Token bucket operations are O(1). Sliding window deque operations (`append`, `popleft`) are O(1). In the worst case, `_clean_window` might pop many elements in one call if the clock jumps significantly, but each entry is removed only once, so the cost is amortized over time.

- `retry_after`: O(N) in the worst case for the sliding window part, where N is the number of requests in the window, as it might iterate through the deque to find the expiration point. However, N is bounded by `rate * window_seconds`.

Memory Complexity:

- Per client: O(rate * window_seconds) for storing `(timestamp, cost)` tuples in the sliding window log, plus constant memory for token bucket state and lock.

- Total: O(number of active clients * rate * window_seconds). Stale client cleanup prevents unbounded growth.

Unit Tests

import unittest
import time
import threading
import collections

# Re-define TestClock here for self-contained test file

class TestClock:
def __init__(self, initial_time=0.0):
    self._time = initial_time
    self._lock = threading.Lock()

def time(self):
    with self._lock:
        return self._time

def advance(self, seconds):
    with self._lock:
        self._time += seconds

def set_time(self, new_time):
    with self._lock:
        self._time = new_time

class RateLimiterTest(unittest.TestCase):
def setUp(self):
    self.clock = TestClock()
    # Use a short cleanup interval for tests to make eviction testable
    self.limiter = RateLimiter(rate=10, burst=5, window_seconds=10, clock=self.clock, cleanup_interval=0.1, client_ttl_multiplier=1.5)

def tearDown(self):
    self.limiter.stop_cleanup_thread()

def test_basic_allow_deny(self):
    client_id = "client1"
    # Initial burst of 5 + 10 tokens/sec * 0 sec = 5 tokens
    # Allow 5 requests
    for _ in range(5):
        self.assertTrue(self.limiter.allow(client_id))
    
    # Deny 6th request (burst exhausted)
    self.assertFalse(self.limiter.allow(client_id))
    self.assertGreater(self.limiter.retry_after(client_id), 0)

    # Advance time to allow refill
    self.clock.advance(0.1) # 1 token refilled (10 tokens/sec * 0.1 sec)
    self.assertTrue(self.limiter.allow(client_id)) # Should allow 1
    self.assertFalse(self.limiter.allow(client_id)) # Deny 2nd

    self.clock.advance(0.9) # 9 more tokens refilled, total 10 tokens refilled since initial burst exhausted
    for _ in range(9):
        self.assertTrue(self.limiter.allow(client_id))
    self.assertFalse(self.limiter.allow(client_id)) # Deny 10th

def test_burst_draining_and_refill(self):
    client_id = "client2"
    # Burst: 5, Rate: 10/s
    
    # Drain burst
    for _ in range(5):
        self.assertTrue(self.limiter.allow(client_id))
    self.assertFalse(self.limiter.allow(client_id)) # Burst exhausted

    # Advance time for partial refill
    self.clock.advance(0.2) # 2 tokens refilled (10 * 0.2)
    self.assertTrue(self.limiter.allow(client_id))
    self.assertTrue(self.limiter.allow(client_id))
    self.assertFalse(self.limiter.allow(client_id)) # Refill exhausted

    # Advance time for full refill (up to burst capacity)
    self.clock.advance(1.0) # 10 tokens refilled. Bucket should be capped at burst (5).
    self.assertTrue(self.limiter.allow(client_id))
    self.assertTrue(self.limiter.allow(client_id))
    self.assertTrue(self.limiter.allow(client_id))
    self.assertTrue(self.limiter.allow(client_id))
    self.assertTrue(self.limiter.allow(client_id))
    self.assertFalse(self.limiter.allow(client_id)) # Burst exhausted again

def test_sliding_window_cap_independent_of_bucket_refill(self):
    # Reset limiter and client for this specific test scenario
    self.limiter.stop_cleanup_thread() 
    self.limiter = RateLimiter(rate=10, burst=10, window_seconds=1, clock=self.clock, cleanup_interval=0.1, client_ttl_multiplier=1.5)
    self.clock.set_time(0.0)
    client_id = "client_window_test"
    
    # Allow 10 requests within a very short time (burst allows this)
    for _ in range(10):
        self.assertTrue(self.limiter.allow(client_id))
    
    # Window capacity is 10 units (10 units/s * 1s).
    # We have 10 requests in the window, total cost 10.
    # Next request should be denied by window, even if bucket has tokens.
    self.assertFalse(self.limiter.allow(client_id))
    self.assertGreater(self.limiter.retry_after(client_id), 0)
    
    # Advance time by 0.5s. Still denied by window.
    self.clock.advance(0.5)
    self.assertFalse(self.limiter.allow(client_id))
    
    # Advance time by 0.5s more. Total 1.0s since the first request (at time 0.0).
    # The first request (at time 0.0) should now be out of the window.
    self.clock.advance(0.5)
    self.assertTrue(self.limiter.allow(client_id)) # Should allow, as one request expired from window.
    self.assertEqual(self.limiter.clients[client_id].current_window_cost, 10) # 10 requests in window, 10 units.

def test_cost_greater_than_burst(self):
    client_id = "client4"
    # Burst is 5. Cost 6 should be rejected.
    self.assertFalse(self.limiter.allow(client_id, cost=6))
    self.assertEqual(self.limiter.clients[client_id].tokens, 5) # Tokens should not be consumed
    self.assertEqual(self.limiter.clients[client_id].current_window_cost, 0) # Window should be empty

    # Cost 5 should be allowed
    self.assertTrue(self.limiter.allow(client_id, cost=5))
    self.assertEqual(self.limiter.clients[client_id].tokens, 0)
    self.assertEqual(self.limiter.clients[client_id].current_window_cost, 5)

def test_concurrent_contention_on_one_client(self):
    self.limiter.stop_cleanup_thread()
    self.limiter = RateLimiter(rate=10, burst=5, window_seconds=10, clock=self.clock, cleanup_interval=0.1, client_ttl_multiplier=1.5)
    self.clock.set_time(0.0)
    
    client_id = "client_fixed_duration"
    test_duration = 2.0 # seconds
    num_threads = 10
    
    allowed_count = [0] # Use a list to make it mutable in nested scope
    
    def fixed_duration_worker():
        start_time = self.clock.time()
        while self.clock.time() < start_time + test_duration:
            if self.limiter.allow(client_id):
                with threading.Lock(): # Protect shared counter
                    allowed_count[0] += 1
            # Simulate some work and advance clock slightly
            self.clock.advance(0.001) # Each attempt takes 1ms

    threads = []
    for _ in range(num_threads):
        t = threading.Thread(target=fixed_duration_worker)
        threads.append(t)
        t.start()
    
    for t in threads:
        t.join()
    
    # Max allowed by token bucket: burst + rate * test_duration = 5 + 10 * 2 = 25.
    # Max allowed by window: rate * window_seconds = 10 * 10 = 100.
    # So, the token bucket is the stricter bottleneck here. Expected max allowed is 25.
    
    self.assertLessEqual(allowed_count[0], 25)
    # Due to non-deterministic scheduling and clock advancement by multiple threads,
    # asserting a precise lower bound is difficult. We primarily verify the upper bound.
    self.assertGreater(allowed_count[0], 15) # Should allow a significant number of requests

def test_stale_client_eviction(self):
    client_id_active = "active_client"
    client_id_stale = "stale_client"
    
    self.limiter.stop_cleanup_thread()
    self.limiter = RateLimiter(rate=10, burst=5, window_seconds=10, clock=self.clock, cleanup_interval=0.1, client_ttl_multiplier=1.5)
    self.clock.set_time(0.0) # Reset clock
    
    # Access active client at time 0.0
    self.assertTrue(self.limiter.allow(client_id_active))
    
    # Access stale client at time 0.0
    self.assertTrue(self.limiter.allow(client_id_stale))
    
    # Advance time to just before stale_threshold for stale_client
    # Stale threshold is 1.5 * 10 = 15 seconds after last access.
    # So, stale_client will be stale at time 15.0.
    self.clock.advance(14.0) # Time is now 14.0
    
    # Access active client again to update its last_access_time
    self.assertTrue(self.limiter.allow(client_id_active)) # last_access_time for active_client is now 14.0
    
    # Advance time past stale_threshold for stale_client
    self.clock.advance(1.5) # Time is now 15.5. stale_client's last_access_time (0.0) is < 15.5 - 15 = 0.5.
    
    time.sleep(self.limiter.cleanup_interval * 2) # Give cleanup thread time to run
    
    self.assertNotIn(client_id_stale, self.limiter.clients)
    self.assertIn(client_id_active, self.limiter.clients) # Active client should still be there

def test_retry_after(self):
    client_id = "client_retry"
    
    # Initial state: 5 burst tokens, 0 window cost.
    self.assertAlmostEqual(self.limiter.retry_after(client_id), 0.0)
    
    # Consume burst
    for _ in range(5):
        self.limiter.allow(client_id)
    
    # No tokens left. Need 1 token. Rate is 10/s. So, 0.1s needed.
    self.assertAlmostEqual(self.limiter.retry_after(client_id), 0.1)
    
    self.clock.advance(0.05) # Advance half the time needed
    self.assertAlmostEqual(self.limiter.retry_after(client_id), 0.05)
    
    self.clock.advance(0.05) # Advance remaining time. Now 0.1s passed. 1 token refilled.
    self.assertAlmostEqual(self.limiter.retry_after(client_id), 0.0)
    self.assertTrue(self.limiter.allow(client_id)) # Consume the token
    
    # Now test retry_after for window limit
    self.limiter.stop_cleanup_thread()
    self.limiter = RateLimiter(rate=10, burst=10, window_seconds=1, clock=self.clock, cleanup_interval=0.1, client_ttl_multiplier=1.5)
    self.clock.set_time(0.0)
    
    # Fill up the window (10 units in 1 second window)
    for i in range(10):
        self.clock.advance(0.01) # Small advance to make timestamps distinct
        self.limiter.allow(client_id)
    
    # Current time is approx 0.1s. All 10 requests are in the window.
    # Window capacity is 10 units.
    # Next request (cost 1) should be denied by window.
    self.assertGreater(self.limiter.retry_after(client_id), 0.0)
    
    # The first request was at time ~0.01. It will expire at ~0.01 + 1 = ~1.01.
    # Current time is ~0.1. So, retry_after should be ~1.01 - ~0.1 = ~0.91.
    first_req_ts = self.limiter.clients[client_id].request_timestamps[0][0]
    expected_retry = (first_req_ts + self.limiter.window_seconds) - self.clock.time()
    self.assertAlmostEqual(self.limiter.retry_after(client_id), expected_retry, places=2)
    
    # Advance time until the first request expires
    self.clock.advance(expected_retry + 0.001) # Advance slightly past expiration
    self.assertAlmostEqual(self.limiter.retry_after(client_id), 0.0, places=2)
    self.assertTrue(self.limiter.allow(client_id)) # Should now be allowed

def test_clock_going_backwards_or_large_pauses(self):
    # Monotonic clock prevents going backwards, but large pauses are handled by clamping.
    # TestClock can simulate backwards movement.
    self.limiter.stop_cleanup_thread()
    self.clock = TestClock(initial_time=100.0)
    self.limiter = RateLimiter(rate=10, burst=5, window_seconds=10, clock=self.clock)
    client_id = "client_time_warp"

    self.assertTrue(self.limiter.allow(client_id)) # Time 100.0, tokens 4, window cost 1
    self.clock.set_time(50.0) # Clock goes backwards
    
    # Refill should clamp last_refill_time to current_time (50.0), no extra tokens granted.
    # Tokens should still be 4 (from previous allow). The refill logic will see time_delta=0.
    self.assertTrue(self.limiter.allow(client_id)) # Should still allow, tokens 3.
    self.assertEqual(self.limiter.clients[client_id].tokens, 3)
    
    # Now advance time normally, tokens should refill from 50.0
    self.clock.set_time(51.0) # 1 second passed from clamped time
    self.assertTrue(self.limiter.allow(client_id)) # Tokens 3 + 10 (refill) - 1 (cost) = 12. Capped at burst 5. So 5 - 1 = 4.
    self.assertEqual(self.limiter.clients[client_id].tokens, 4)

def test_first_ever_request_lazy_initialization(self):
    client_id = "new_client"
    self.assertNotIn(client_id, self.limiter.clients)
    self.assertTrue(self.limiter.allow(client_id))
    self.assertIn(client_id, self.limiter.clients)
    self.assertEqual(self.limiter.clients[client_id].tokens, 4) # 5 (burst) - 1 (cost)
    self.assertEqual(self.limiter.clients[client_id].current_window_cost, 1)

def test_fractional_tokens_sub_millisecond_timing(self):
    self.limiter.stop_cleanup_thread()
    self.limiter = RateLimiter(rate=1, burst=1, window_seconds=10, clock=self.clock)
    client_id = "fractional_client"
    
    self.assertTrue(self.limiter.allow(client_id)) # Tokens 0
    self.assertFalse(self.limiter.allow(client_id)) # No tokens
    
    self.clock.advance(0.0001) # 0.0001 tokens refilled
    self.assertFalse(self.limiter.allow(client_id)) # Not enough for cost 1
    
    self.clock.advance(0.9999) # Total 1.0s passed. 1 token refilled.
    self.assertTrue(self.limiter.allow(client_id)) # Allowed. Tokens 0.

Result

#2

Winning Votes

0 / 3

Average Score

69

Total Score

76

Overall Comments

Answer B delivers a functional and mostly correct implementation, accompanied by a very detailed design note. It correctly implements the hybrid algorithm and uses a standard per-client locking strategy for thread safety. However, it has several weaknesses compared to Answer A: its concurrency model is less scalable due to a global lock on the main client dictionary, it relies on a background thread for cleanup which adds complexity, and some of its unit tests are flawed (the concurrency test) or potentially flaky (the eviction test). It also omits the suggested per-client configuration feature.

Correctness

Weight 35%
75

The core rate-limiting logic is correct. However, the concurrency model, while functional, is less robust than A's; the global lock on the client dictionary can become a bottleneck under high client churn. The concurrent test has a flawed design where multiple threads advance a shared clock, making its results unreliable. The `retry_after` logic is O(N) in the worst case, which is a minor correctness/performance issue.

Completeness

Weight 20%
80

The answer is largely complete, providing the core API, tests, and a design note. However, it fails to implement the per-client configuration feature suggested in the prompt's constructor requirements, offering only a single global configuration. This is a notable omission.
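One way the missing per-client configuration could look is sketched below. The names `LimitConfig` and `ConfigResolver` are hypothetical and do not come from either answer; the sketch only shows a default configuration with optional overrides keyed by `client_id`, which a limiter constructor might accept.

```python
from dataclasses import dataclass
from typing import Dict, Optional


@dataclass(frozen=True)
class LimitConfig:
    """Hypothetical per-client limits: rate, burst, and window length."""
    rate: float                    # units per second
    burst: int                     # maximum units stored in the bucket
    window_seconds: float = 60.0   # sliding-window length

    def __post_init__(self):
        if self.rate <= 0 or self.burst < 0 or self.window_seconds <= 0:
            raise ValueError(
                "rate and window_seconds must be positive, burst non-negative"
            )


class ConfigResolver:
    """Returns the override for a client if one exists, else the default."""

    def __init__(self, default: LimitConfig,
                 per_client: Optional[Dict[str, LimitConfig]] = None):
        self._default = default
        # Copy so later changes to the caller's dict do not affect the limiter.
        self._per_client = dict(per_client or {})

    def for_client(self, client_id: str) -> LimitConfig:
        return self._per_client.get(client_id, self._default)
```

Under this assumption, lazy initialization would call `for_client(client_id)` when it first creates a client's state, instead of reading a single global `rate`, `burst`, and `window_seconds`.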

Code Quality

Weight 20%
70

The code quality is good but not exceptional. The use of a background thread for cleanup adds complexity and resource management overhead compared to A's approach. The combination of a global lock for the client map and per-client locks is less elegant than A's pure lock striping. The test code also contains a duplicated class definition.
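Answer A's code is not shown in this section, so the following is only a generic sketch of lock striping, the technique the judge contrasts with a global map lock. The class name `StripedLocks` and the stripe count are assumptions made for illustration.

```python
import threading
import zlib


class StripedLocks:
    """Fixed pool of locks; each client_id maps to one stripe by hash."""

    def __init__(self, num_stripes: int = 64):
        if num_stripes <= 0:
            raise ValueError("num_stripes must be positive")
        self._locks = [threading.Lock() for _ in range(num_stripes)]

    def lock_for(self, client_id: str):
        # crc32 gives a hash that is stable across runs, unlike the
        # built-in hash() for str, which is randomized per process.
        index = zlib.crc32(client_id.encode("utf-8")) % len(self._locks)
        return self._locks[index]
```

Two clients contend only when they hash to the same stripe, so the number of stripes trades memory for parallelism. The same `client_id` always gets the same lock, which is what preserves the no-double-spend guarantee for that client.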

Practical Value

Weight 15%
75

The implementation is practical for many use cases. However, the global lock on the client dictionary limits its scalability in scenarios with a high rate of new clients. The reliance on a background thread also makes it slightly more complex to integrate and manage in some application environments.

Instruction Following

Weight 10%
85

The answer follows most instructions well. The main deviation is not implementing a constructor that accepts per-client configuration, which was explicitly mentioned as a requirement for the API surface. The design note is also significantly longer than the suggested word count, though it is thorough.

Judge Models OpenAI GPT-5.4

Total Score

64

Overall Comments

Answer B shows good intent and includes a hybrid limiter, design discussion, and many tests, but it falls short on several key requirements. Its API does not support per-client configuration in the constructor, it uses a global clients lock for all lookups/creation, stale cleanup depends on a real background thread rather than deterministic clock-driven cleanup, and several behaviors are incorrect or underspecified, such as allowing non-positive cost instead of rejecting and clamping backwards time by moving last_refill_time backwards. Some tests are nondeterministic or rely on real sleep, reducing benchmark reliability.

Correctness

Weight 35%
60

Core hybrid behavior is present, but there are correctness issues: non-positive cost is allowed instead of explicitly rejected, backwards clock handling rewinds last_refill_time instead of clamping to prior state semantics, stale cleanup can remove clients based on unlocked reads, and cleanup behavior depends on a background thread with real time. Concurrency on the client map still funnels through a global lock for lookup/creation. Some tests assume behaviors that are not fully deterministic.
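One reading of "clamping" that avoids rewinding `last_refill_time` is sketched below. The function `refill` and its parameters are hypothetical and this is not the code of either answer. A backwards clock yields no refill and leaves the stored timestamp alone, and a very large pause is clamped to the time needed to fill an empty bucket.

```python
def refill(tokens: float, last_refill: float, now: float,
           rate: float, burst: float):
    """Return (tokens, last_refill) after a refill with clamped elapsed time."""
    elapsed = now - last_refill
    if elapsed <= 0:
        # Clock did not advance or went backwards: grant nothing and do not
        # rewind last_refill, so no interval is ever credited twice.
        return tokens, last_refill
    # Clamp large pauses (for example a suspended VM) to the time it takes
    # to fill an empty bucket; anything beyond that cannot add tokens anyway.
    elapsed = min(elapsed, burst / rate)
    return min(burst, tokens + elapsed * rate), now
```

The tradeoff of this choice is that if the time source jumps backwards and stays there, refills stall until it catches up with the stored timestamp. With a monotonic clock as the default, that case should not arise outside of tests.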

Completeness

Weight 20%
70

Includes runnable code, hybrid logic, design note, and more than 6 tests, but misses the constructor requirement for per-client configuration and does not provide deterministic cleanup/testing for stale eviction. It handles many edge cases, yet some are handled in a way that conflicts with the prompt, especially cost validation and time behavior. Completeness is decent but notably incomplete versus the spec.

Code Quality

Weight 20%
68

Readable and organized, with clear classes and comments, but quality is reduced by duplicated TestClock definitions, mixing production code and tests in a somewhat repetitive way, and reliance on a background cleanup thread that complicates reasoning and testing. Several API and validation choices are also inconsistent with the requested contract.

Practical Value

Weight 15%
62

Useful as a teaching or prototype implementation, but less practical in production because cleanup is tied to a background thread and real-time sleeping, constructor customization is limited, and the global clients lock can become a point of contention. Some edge-case behavior would need refinement before dependable use.
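A clock-driven alternative to the background thread could look like the sketch below: eviction runs opportunistically inside normal calls and reads only the injected clock, so a test can trigger it by advancing a fake clock. The class `LastSeenRegistry` and its parameters are hypothetical, and a single lock is used purely for brevity.

```python
import threading
from typing import Callable, Dict


class LastSeenRegistry:
    """Tracks last access per client and evicts idle ones during touch()."""

    def __init__(self, clock: Callable[[], float], ttl: float,
                 sweep_interval: float):
        self._clock = clock
        self._ttl = ttl
        self._sweep_interval = sweep_interval
        self._last_seen: Dict[str, float] = {}
        self._next_sweep = clock() + sweep_interval
        self._lock = threading.Lock()

    def touch(self, client_id: str) -> None:
        now = self._clock()
        with self._lock:
            self._last_seen[client_id] = now
            # Sweep at most once per sweep_interval of *injected* time.
            if now >= self._next_sweep:
                self._sweep(now)
                self._next_sweep = now + self._sweep_interval

    def _sweep(self, now: float) -> None:
        stale = [cid for cid, seen in self._last_seen.items()
                 if now - seen > self._ttl]
        for cid in stale:
            del self._last_seen[cid]

    def __contains__(self, client_id: str) -> bool:
        with self._lock:
            return client_id in self._last_seen
```

Because no real time is involved, an eviction test needs neither `time.sleep` nor a thread to stop in `tearDown`. The cost is that cleanup only happens while some client is still calling the limiter.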

Instruction Following

Weight 10%
65

Partially follows instructions, but misses the per-client configuration constructor requirement and deviates from the specified allow contract by permitting zero/negative cost. Tests are not fully deterministic due to real sleep/background cleanup, and the concurrency approach does not fully reflect the prompt's emphasis on avoiding a global bottleneck.

Total Score

65

Overall Comments

Answer B implements the required hybrid algorithm with per-client locks and covers the listed edge cases with many tests. However, it has notable defects: the concurrent test uses `with threading.Lock():` which creates a fresh lock per increment (so the upper-bound check isn't reliably protected), stale-client cleanup uses a real-time background thread requiring time.sleep in tests (non-deterministic and contradicts the injectable-clock goal), and lifecycle management (stop_cleanup_thread) is awkward. The design note is thorough but somewhat verbose.
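The locking defect is easy to miss: `with threading.Lock():` constructs a new, uncontended lock on every entry, so no two threads ever wait on the same lock. A minimal sketch of the usual fix, with the hypothetical helper `count_with_shared_lock`, creates the lock once and shares it among the workers.

```python
import threading


def count_with_shared_lock(num_threads: int, increments_per_thread: int) -> int:
    """Increment a shared counter from several threads under one lock."""
    counter = [0]
    counter_lock = threading.Lock()  # created once, shared by every worker

    def worker():
        for _ in range(increments_per_thread):
            with counter_lock:  # all workers contend on the same lock
                counter[0] += 1

    threads = [threading.Thread(target=worker) for _ in range(num_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return counter[0]
```

In Answer B's test the equivalent change would be to define the lock next to `allowed_count`, outside `fixed_duration_worker`, and use that one object in the `with` statement.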

Correctness

Weight 35%
60

Implements hybrid correctly in basic flow, but has issues: the concurrent contention test uses `with threading.Lock():` creating a NEW lock each time (not protecting the counter), so the assertion is not properly validated. Also creates client state in retry_after even when cost>burst check isn't done. The clock-backwards handling clamps last_refill_time but the test expects refill from 50.0->51.0 to give exactly 4 tokens — the math comment is inconsistent. Uses a daemon thread for cleanup which makes deterministic tests rely on real time.sleep. Generally functional but less robust.

Completeness

Weight 20%
75

Covers all listed edge cases with dedicated tests including lazy init, fractional, clock-backwards, stale eviction. However stale eviction relies on real time.sleep with a background thread which is non-deterministic. Concurrent test asserts only upper bound but the locking bug undermines it. 9 tests but quality is mixed.

Code Quality

Weight 20%
60

Reasonable structure but mixes background cleanup thread with injectable clock (cleanup uses self.clock.time() but waits on real time via Event.wait). Re-defines TestClock in test section. The concurrent test has a clear bug (`with threading.Lock():` creates new lock each call). Design note is more verbose. Some inconsistencies in comments vs code.

Practical Value

Weight 15%
65

Functional, but background daemon thread for cleanup complicates lifecycle (needs explicit stop_cleanup_thread). Tests rely on real sleep. Still usable but less robust for production.

Instruction Following

Weight 10%
75

Follows most instructions but the concurrent test doesn't deterministically validate the invariant due to the lock bug, and stale-client test uses real-time sleep rather than purely the injectable clock. Design note covers required points.

Comparison Summary

Final rank order is determined by judge-wise rank aggregation (average rank + Borda tie-break). Average score is shown for reference.

Judges: 3

Winning Votes

3 / 3

Average Score

89

Winning Votes

0 / 3

Average Score

69

Judging Results

Why This Side Won

Answer A is more correct and robust under concurrency, with deterministic tests that actually rely on the injectable clock (including the concurrent invariant test), better lifecycle (no background thread), and stricter edge-case handling (cost>burst without state creation, explicit clamping of elapsed time). Answer B has a buggy concurrent test (new lock per increment) and relies on real-time sleep for the stale-eviction test, weakening both correctness and instruction following — the two highest-weighted criteria.

Judge Models OpenAI GPT-5.4

Why This Side Won

Answer A wins because its weighted performance is stronger on the most important criteria, especially correctness and completeness. It more faithfully implements the specified API, handles the listed edge cases more rigorously, provides a more concurrency-conscious design without relying on a single global bottleneck for ordinary per-client mutations, and includes deterministic tests that better match the prompt. Answer B is reasonably structured, but its missing per-client configuration, weaker edge-case handling, and nondeterministic cleanup/testing make it less suitable overall.

Why This Side Won

Answer A is the clear winner due to its superior design and implementation quality. Its use of lock striping for concurrency is significantly more scalable than Answer B's global lock on the client map. Answer A's approach to stale client cleanup, by integrating it into API calls, is more elegant and avoids the overhead and complexity of a dedicated background thread as seen in B. Furthermore, A's unit tests are more robustly designed, especially the concurrent test which uses a more sound methodology. Finally, Answer A fully satisfies all prompt requirements, including the flexible per-client configuration in the constructor, which Answer B omits.
