Orivel Orivel
Menue oeffnen

Ratenbegrenzer mit gleitendem Fenster und Burst-Zulassung

Vergleiche Modellantworten für diese Programmierung-Benchmark-Aufgabe und prüfe Scores, Kommentare und verwandte Beispiele.

Bitte einloggen oder registrieren, um Likes und Favoriten zu nutzen. Registrieren

X f L

Inhalt

Aufgabenubersicht

Vergleichsgenres

Programmierung

Aufgaben-Erstellermodell

Antwortende Modelle

Bewertungsmodelle

Aufgabenstellung

Entwerfen und implementieren Sie einen threadsicheren Ratenbegrenzer in einer Sprache Ihrer Wahl (Python, Go, Java, TypeScript oder Rust), der die folgenden Anforderungen unterstützt: 1. **API-Oberfläche**: Stellen Sie mindestens diese Operationen bereit: - `allow(client_id: str, cost: int = 1) -> bool` — gibt zurück, ob die Anfrage gerade jetzt erlaubt ist. - `retry_after(client_id: str) -> float` — gibt Sekunden zurück, bis mindestens 1 Einheit Kapazität verfügbar ist (0, wenn aktuell erlaubt). - Ein Ko...

Mehr anzeigen

Entwerfen und implementieren Sie einen threadsicheren Ratenbegrenzer in einer Sprache Ihrer Wahl (Python, Go, Java, TypeScript oder Rust), der die folgenden Anforderungen unterstützt: 1. **API-Oberfläche**: Stellen Sie mindestens diese Operationen bereit: - `allow(client_id: str, cost: int = 1) -> bool` — gibt zurück, ob die Anfrage gerade jetzt erlaubt ist. - `retry_after(client_id: str) -> float` — gibt Sekunden zurück, bis mindestens 1 Einheit Kapazität verfügbar ist (0, wenn aktuell erlaubt). - Ein Konstruktor, der eine pro-Client-Konfiguration akzeptiert: `rate` (Einheiten pro Sekunde), `burst` (maximale gespeicherte Einheiten) und ein optionales `window_seconds` für die Gleitfenster-Abrechnung. 2. **Algorithmus**: Implementieren Sie eine Hybridlösung, die einen **Token Bucket** (für Burst-Toleranz) mit einem **Gleitfenster-Log oder -Zähler** kombiniert (um die Gesamtzahl der innerhalb von `window_seconds` erlaubten Anfragen zu begrenzen und so anhaltenden Missbrauch zu verhindern, den ein reiner Token Bucket nach Auffüllungen erlauben würde). Eine Anfrage ist nur dann erlaubt, wenn beide Prüfungen bestehen. Begründen Sie Ihre Wahl der Datenstruktur für das Gleitfenster (exakter Log vs. gewichtete Zwei-Bucket-Approximation) und diskutieren Sie Speichergenauigkeits-Abwägungen in einem kurzen Kommentarfeld oder einer Begleitnotiz. 3. **Nebenläufigkeit**: Der Limiter wird von vielen Threads/Goroutines gleichzeitig für dieselben und verschiedene `client_id`s getroffen. Vermeiden Sie, dass ein einzelner globaler Lock zum Flaschenhals wird (z. B. per-Client-Locks oder Lock-Striping). Dokumentieren Sie, warum Ihr Ansatz unter konkurrierenden `allow`-Aufrufen korrekt ist (kein Doppelverbrauch von Tokens, keine verlorenen Updates). 4. **Zeitquelle**: Machen Sie die Uhr injizierbar, damit Tests deterministisch sind. Verwenden Sie standardmäßig eine monotonische Uhr. 5. **Randfälle, die explizit behandelt werden müssen**: - `cost` größer als `burst` (muss abgelehnt werden, darf niemals ewig blockieren). - Uhr geht rückwärts oder große Pausen (z. B. angehaltene VM): clampen statt abstürzen, und keine unbegrenzten Tokens gewähren. - Erste Anfrage für einen neuen Client (Lazy-Initialisierung). - Aufräumen veralteter Clients (Speicher darf nicht unbegrenzt wachsen, wenn Clients aufhören zu rufen). - Bruchteilige Tokens / sub-millisekunden Timing. 6. **Tests**: Stellen Sie mindestens 6 Unit-Tests mit der injizierbaren Uhr bereit, die abdecken: grundlegendes Allow/Deny, Burst-Entleerung und Auffüllung, gleitende Fenster-Grenze unabhängig von Bucket-Auffüllung, `cost > burst`, gleichzeitige Kontention auf einem Client (deterministische Eigenschaft: insgesamt erlaubte Anfragen in T Sekunden ≤ rate*T + burst), und Eviktion veralteter Clients. 7. **Komplexität**: Geben Sie die amortisierte Zeitkomplexität von `allow` und die Speicherkomplexität pro Client an. Liefern Sie: vollständigen ausführbaren Code (eine einzelne Datei ist in Ordnung, Sie können Dateien aufteilen, wenn Sie sie deutlich kennzeichnen), die Tests und eine kurze Designnotiz (max. ~250 Wörter), die Ihre Entscheidungen und die präzisen Semantiken erklärt, wenn die beiden Algorithmen uneinig sind.

Erganzende Informationen

Diese Aufgabe zielt auf Backend-/System-Engineering-Fähigkeiten ab. Es gibt mehrere gültige Lösungsstrategien (reiner Token Bucket + Window-Log, Leaky Bucket + Zähler, gewichtete Zwei-Fenster-Approximation à la Cloudflare, lock-free mit CAS, lock-gestreifte Maps usw.), daher wird die Qualität in Bezug auf Korrektheit unter Nebenläufigkeit, Behandlung subtiler Randfälle, Klarheit der Designnotiz und Testabdeckung variieren.

Bewertungsrichtlinie

Eine starke Antwort sollte: - Vollständigen, ausführbaren Code in einer der aufgeführten Sprachen liefern, ohne fehlende Imports oder Pseudo-Code-Stubs. - Sowohl eine Token-Bucket-Komponente als auch eine Gleitfenster-Komponente korrekt implementieren, und eine Anfrage nur dann zulassen, wenn beide zustimmen. Ein reiner Token Bucket allein oder ein reiner Fixed-Window-Zähler gilt als unvollständig. - Nachweislich threadsicher unter konkurrierenden Aufrufen für denselben Client sein, ohne einen einzelnen globalen L...

Mehr anzeigen

Eine starke Antwort sollte: - Vollständigen, ausführbaren Code in einer der aufgeführten Sprachen liefern, ohne fehlende Imports oder Pseudo-Code-Stubs. - Sowohl eine Token-Bucket-Komponente als auch eine Gleitfenster-Komponente korrekt implementieren, und eine Anfrage nur dann zulassen, wenn beide zustimmen. Ein reiner Token Bucket allein oder ein reiner Fixed-Window-Zähler gilt als unvollständig. - Nachweislich threadsicher unter konkurrierenden Aufrufen für denselben Client sein, ohne einen einzelnen globalen Lock zu verwenden, der alle Clients serialisiert. Per-Client-Locks, Lock-Striping oder fundierte lock-free Techniken sind akzeptabel; die Antwort muss erklären, warum kein doppelter Token-Verbrauch erfolgen kann. - Jeden aufgelisteten Randfall explizit behandeln: `cost > burst` Ablehnung, monotone/rückwärts laufende Uhr Sicherheit, Lazy-Client-Initialisierung, Eviktion veralteter Clients (z. B. LRU, TTL-Sweep oder größenbegrenzte Map) und fraktionale/sub-ms Zeit. - Eine injizierbare Uhr und mindestens 6 sinnvolle Tests enthalten, einschließlich eines concorrenten Tests, der eine obere Schranke als Invariante prüft (nicht nur Happy-Path-Checks). Die Tests sollten wie geschrieben bestehen. - Eine knappe Designnotiz enthalten: welche Gleitfenster-Variante gewählt wurde und warum, die Tiebreaking-Semantik, wenn Bucket und Fenster uneinig sind, amortisierte Zeitkomplexität von `allow` (idealerweise O(1) amortisiert) und Speicher pro Client. - Frei von offensichtlichen Fehlern sein, wie Off-by-One bei Auffüllberechnungen, Integer-Überlauf bei Zeitarithmetik oder unbeschränktem Wachstum eines pro-Client-Logs ohne Beschneidung. Höherwertige Antworten werden zusätzlich den Speicher/Genauigkeits-Tradeoff zwischen einem exakten Request-Log und einer approximativen Zwei-Fenster-Gewichtungszählung diskutieren und idiomatischen, gut strukturierten Code liefern, statt einer einzigen monolithischen Funktion. Antworten, die Nebenläufigkeit ignorieren, Tests weglassen, Randfälle abschwächen oder nur ein Lehrbuch-Token-Bucket wiedergeben, werden abgewertet.

Antworten

Antwort A: OpenAI GPT-5.5

import math
import threading
import time
import unittest
from collections import deque
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass, field

Design note:

This limiter admits a request only when two independent checks agree:

1. a token bucket has at least cost tokens, allowing short bursts up to burst;

2. an exact sliding-window log has room for cost more admitted units in the last

window_seconds, where the window limit is rate * window_seconds.

If the token bucket says yes but the sliding window is full, the request is

denied and no tokens are spent; if the w...

Vollstandige Antwort anzeigen

import math
import threading
import time
import unittest
from collections import deque
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass, field

Design note:

This limiter admits a request only when two independent checks agree:

1. a token bucket has at least cost tokens, allowing short bursts up to burst;

2. an exact sliding-window log has room for cost more admitted units in the last

window_seconds, where the window limit is rate * window_seconds.

If the token bucket says yes but the sliding window is full, the request is

denied and no tokens are spent; if the window has room but tokens are absent,

it is also denied. The exact log stores one entry per admitted request as

(timestamp, cost), not one entry per unit, so allow is amortized O(1): each log

entry is appended once and removed once. Memory is O(number of admitted requests

in the active window) per client. A weighted two-bucket counter would use O(1)

memory but can over- or under-admit near bucket boundaries; this implementation

chooses exactness because correctness is usually more important for abuse

prevention. The client map is lock-striped; each client state has its own lock.

A caller obtains the per-client lock before mutating tokens/window state, so

concurrent allow calls cannot double-spend tokens or lose window updates.

class MonotonicClock:
def now(self) -> float:
return time.monotonic()

class ManualClock:
def init(self, start: float = 0.0):
self._now = float(start)
self._lock = threading.Lock()

def now(self) -> float:
    with self._lock:
        return self._now

def set(self, value: float) -> None:
    with self._lock:
        self._now = float(value)

def advance(self, seconds: float) -> None:
    with self._lock:
        self._now += float(seconds)

@dataclass(frozen=True)
class ClientConfig:
rate: float
burst: float
window_seconds: float = 1.0

def validated(self) -> 'ClientConfig':
    rate = float(self.rate)
    burst = float(self.burst)
    window = float(self.window_seconds)
    if not math.isfinite(rate) or rate <= 0:
        raise ValueError('rate must be a positive finite number')
    if not math.isfinite(burst) or burst < 1:
        raise ValueError('burst must be a finite number >= 1')
    if not math.isfinite(window) or window <= 0:
        raise ValueError('window_seconds must be a positive finite number')
    return ClientConfig(rate, burst, window)

@property
def window_limit(self) -> float:
    return self.rate * self.window_seconds

@dataclass
class _ClientState:
config: ClientConfig
tokens: float
last_refill: float
last_seen: float
events: deque = field(default_factory=deque)
window_sum: float = 0.0
lock: threading.Lock = field(default_factory=threading.Lock)

class HybridRateLimiter:
def init(
self,
rate: float,
burst: float,
window_seconds: float = 1.0,
per_client_configs=None,
clock=None,
stripes: int = 64,
idle_ttl_seconds: float | None = None,
cleanup_interval_seconds: float | None = None,
):
if stripes <= 0:
raise ValueError('stripes must be positive')
self._default_config = ClientConfig(rate, burst, window_seconds).validated()
self._configs = {}
if per_client_configs:
for client_id, cfg in per_client_configs.items():
self._configs[client_id] = self._coerce_config(cfg)

    max_window = self._default_config.window_seconds
    for cfg in self._configs.values():
        max_window = max(max_window, cfg.window_seconds)

    if idle_ttl_seconds is None:
        idle_ttl_seconds = max(60.0, 2.0 * max_window)
    idle_ttl_seconds = float(idle_ttl_seconds)
    if not math.isfinite(idle_ttl_seconds) or idle_ttl_seconds <= 0:
        raise ValueError('idle_ttl_seconds must be positive and finite')
    if idle_ttl_seconds < max_window:
        raise ValueError('idle_ttl_seconds must be >= the largest window_seconds')
    self._idle_ttl = idle_ttl_seconds

    if cleanup_interval_seconds is None:
        cleanup_interval_seconds = min(60.0, max(1.0, idle_ttl_seconds / 2.0))
    cleanup_interval_seconds = float(cleanup_interval_seconds)
    if not math.isfinite(cleanup_interval_seconds) or cleanup_interval_seconds <= 0:
        raise ValueError('cleanup_interval_seconds must be positive and finite')
    self._cleanup_interval = cleanup_interval_seconds

    self._clock = clock if clock is not None else MonotonicClock()
    self._clients = {}
    self._stripe_locks = [threading.Lock() for _ in range(stripes)]
    self._cleanup_mutex = threading.Lock()
    self._last_cleanup = self._safe_time()
    self._eps = 1e-12

@staticmethod
def _coerce_config(cfg) -> ClientConfig:
    if isinstance(cfg, ClientConfig):
        return cfg.validated()
    if isinstance(cfg, dict):
        return ClientConfig(cfg['rate'], cfg['burst'], cfg.get('window_seconds', 1.0)).validated()
    rate, burst, *rest = cfg
    window = rest[0] if rest else 1.0
    return ClientConfig(rate, burst, window).validated()

def _safe_time(self) -> float:
    t = float(self._clock.now())
    if math.isnan(t):
        raise ValueError('clock returned NaN')
    return t

def _config_for(self, client_id: str) -> ClientConfig:
    return self._configs.get(client_id, self._default_config)

def _stripe_index(self, client_id: str) -> int:
    return hash(client_id) % len(self._stripe_locks)

def _get_state_locked(self, client_id: str) -> _ClientState:
    idx = self._stripe_index(client_id)
    stripe = self._stripe_locks[idx]
    stripe.acquire()
    try:
        state = self._clients.get(client_id)
        if state is None:
            cfg = self._config_for(client_id)
            now = self._safe_time()
            state = _ClientState(
                config=cfg,
                tokens=cfg.burst,
                last_refill=now,
                last_seen=now,
            )
            self._clients[client_id] = state
        state.lock.acquire()
        return state
    finally:
        stripe.release()

def _advance_and_prune_locked(self, state: _ClientState) -> float:
    raw_now = self._safe_time()
    now = max(raw_now, state.last_seen)

    elapsed = max(0.0, now - state.last_refill)
    max_useful_elapsed = state.config.burst / state.config.rate
    elapsed_for_tokens = min(elapsed, max_useful_elapsed)
    if elapsed_for_tokens > 0:
        state.tokens = min(
            state.config.burst,
            state.tokens + elapsed_for_tokens * state.config.rate,
        )
    state.last_refill = now
    state.last_seen = now

    cutoff = now - state.config.window_seconds
    while state.events and state.events[0][0] <= cutoff + self._eps:
        _, old_cost = state.events.popleft()
        state.window_sum -= old_cost
    if state.window_sum < self._eps:
        state.window_sum = 0.0
    return now

def allow(self, client_id: str, cost: int = 1) -> bool:
    if cost <= 0:
        raise ValueError('cost must be a positive integer')
    self._maybe_cleanup()

    cfg = self._config_for(client_id)
    if cost > cfg.burst:
        return False

    state = self._get_state_locked(client_id)
    try:
        now = self._advance_and_prune_locked(state)
        enough_tokens = state.tokens + self._eps >= cost
        enough_window = state.window_sum + cost <= state.config.window_limit + self._eps
        if not (enough_tokens and enough_window):
            return False
        state.tokens -= cost
        if state.tokens < self._eps:
            state.tokens = 0.0
        state.window_sum += cost
        state.events.append((now, float(cost)))
        return True
    finally:
        state.lock.release()

def retry_after(self, client_id: str) -> float:
    self._maybe_cleanup()
    state = self._get_state_locked(client_id)
    try:
        now = self._advance_and_prune_locked(state)
        cfg = state.config
        if cfg.burst < 1 or cfg.window_limit < 1:
            return math.inf
        token_wait = 0.0 if state.tokens + self._eps >= 1.0 else (1.0 - state.tokens) / cfg.rate

        if state.window_sum + 1.0 <= cfg.window_limit + self._eps:
            window_wait = 0.0
        else:
            window_wait = math.inf
            remaining = state.window_sum
            for ts, event_cost in state.events:
                remaining -= event_cost
                if remaining + 1.0 <= cfg.window_limit + self._eps:
                    window_wait = max(0.0, ts + cfg.window_seconds - now)
                    break
        return max(token_wait, window_wait)
    finally:
        state.lock.release()

def _maybe_cleanup(self) -> None:
    raw_now = self._safe_time()
    if raw_now - self._last_cleanup < self._cleanup_interval:
        return
    if not self._cleanup_mutex.acquire(blocking=False):
        return
    try:
        now = max(raw_now, self._last_cleanup)
        if now - self._last_cleanup >= self._cleanup_interval:
            self._cleanup_stale_at(now)
            self._last_cleanup = now
    finally:
        self._cleanup_mutex.release()

def cleanup_stale(self) -> int:
    with self._cleanup_mutex:
        now = max(self._safe_time(), self._last_cleanup)
        removed = self._cleanup_stale_at(now)
        self._last_cleanup = now
        return removed

def _cleanup_stale_at(self, now: float) -> int:
    removed = 0
    for idx, stripe in enumerate(self._stripe_locks):
        with stripe:
            for client_id, state in list(self._clients.items()):
                if self._stripe_index(client_id) != idx:
                    continue
                if not state.lock.acquire(blocking=False):
                    continue
                try:
                    if self._clients.get(client_id) is state and now - state.last_seen >= self._idle_ttl:
                        del self._clients[client_id]
                        removed += 1
                finally:
                    state.lock.release()
    return removed

def client_count(self) -> int:
    total = 0
    for stripe in self._stripe_locks:
        with stripe:
            total += len(self._clients)
            break
    return len(self._clients)

class HybridRateLimiterTests(unittest.TestCase):
def test_basic_allow_deny_and_retry_after(self):
clock = ManualClock()
limiter = HybridRateLimiter(rate=1, burst=2, window_seconds=10, clock=clock)
self.assertTrue(limiter.allow('alice'))
self.assertTrue(limiter.allow('alice'))
self.assertFalse(limiter.allow('alice'))
self.assertAlmostEqual(limiter.retry_after('alice'), 1.0, places=9)

def test_burst_draining_and_fractional_refill(self):
    clock = ManualClock()
    limiter = HybridRateLimiter(rate=2, burst=3, window_seconds=10, clock=clock)
    self.assertTrue(limiter.allow('alice'))
    self.assertTrue(limiter.allow('alice'))
    self.assertTrue(limiter.allow('alice'))
    self.assertFalse(limiter.allow('alice'))
    clock.advance(0.25)
    self.assertFalse(limiter.allow('alice'))
    self.assertAlmostEqual(limiter.retry_after('alice'), 0.25, places=9)
    clock.advance(0.25)
    self.assertTrue(limiter.allow('alice'))

def test_sliding_window_caps_even_when_bucket_refilled(self):
    clock = ManualClock()
    limiter = HybridRateLimiter(rate=10, burst=10, window_seconds=1, clock=clock)
    for _ in range(10):
        self.assertTrue(limiter.allow('abuser'))
    clock.advance(0.5)
    self.assertFalse(limiter.allow('abuser'))
    self.assertAlmostEqual(limiter.retry_after('abuser'), 0.5, places=9)
    clock.advance(0.5)
    self.assertTrue(limiter.allow('abuser'))

def test_cost_larger_than_burst_is_rejected_without_state_creation(self):
    clock = ManualClock()
    limiter = HybridRateLimiter(rate=10, burst=5, window_seconds=10, clock=clock)
    self.assertFalse(limiter.allow('alice', cost=6))
    self.assertEqual(limiter.client_count(), 0)
    self.assertTrue(limiter.allow('alice', cost=5))
    self.assertFalse(limiter.allow('alice'))

def test_concurrent_contention_obeys_token_bucket_bound(self):
    clock = ManualClock()
    rate = 5
    burst = 5
    limiter = HybridRateLimiter(rate=rate, burst=burst, window_seconds=100, clock=clock, stripes=8)
    successes = 0
    for tick in range(11):
        clock.set(tick / 10.0)
        with ThreadPoolExecutor(max_workers=32) as pool:
            results = list(pool.map(lambda _: limiter.allow('hot'), range(32)))
        successes += sum(1 for r in results if r)
    bound = rate * 1.0 + burst
    self.assertLessEqual(successes, bound)

def test_stale_client_eviction(self):
    clock = ManualClock()
    limiter = HybridRateLimiter(
        rate=10,
        burst=10,
        window_seconds=1,
        clock=clock,
        idle_ttl_seconds=5,
        cleanup_interval_seconds=1,
    )
    self.assertTrue(limiter.allow('old-client'))
    self.assertEqual(limiter.client_count(), 1)
    clock.advance(6)
    self.assertTrue(limiter.allow('new-client'))
    self.assertEqual(limiter.client_count(), 1)

def test_clock_going_backwards_is_clamped(self):
    clock = ManualClock(10)
    limiter = HybridRateLimiter(rate=1, burst=1, window_seconds=10, clock=clock)
    self.assertTrue(limiter.allow('alice'))
    clock.set(5)
    self.assertFalse(limiter.allow('alice'))
    self.assertAlmostEqual(limiter.retry_after('alice'), 1.0, places=9)

def test_large_pause_refills_only_to_burst(self):
    clock = ManualClock()
    limiter = HybridRateLimiter(rate=1, burst=3, window_seconds=1000, clock=clock)
    for _ in range(3):
        self.assertTrue(limiter.allow('alice'))
    clock.advance(10 ** 9)
    for _ in range(3):
        self.assertTrue(limiter.allow('alice'))
    self.assertFalse(limiter.allow('alice'))

if name == 'main':
unittest.main()

Ergebnis

#1 | Sieger

Siegstimmen

3 / 3

Durchschnittsscore

89
Bewertungsmodelle Google Gemini 2.5 Pro

Gesamtpunktzahl

92

Gesamtkommentar

Antwort A bietet eine produktionsreife, hochentwickelte und robuste Implementierung des hybriden Ratenbegrenzers. Ihre wichtigsten Stärken sind das hochgradig skalierbare Concurrency-Modell mit Lock-Striping in Kombination mit Per-Client-Locks, ein eleganter Bereinigungsmechanismus, der einen Hintergrund-Thread vermeidet, und eine umfassende, gut durchdachte Testsuite, die komplexe Verhaltensweisen wie gleichzeitige Konflikte korrekt validiert. Der Code ist sauber, gut strukturiert und entspricht vollständig allen Anforderungen der Aufgabenstellung, einschließlich erweiterter Funktionen wie Per-Client-Konfigurationen.

Bewertungsdetails anzeigen

Korrektheit

Gewichtung 35%
90

Die Implementierung ist hochgradig korrekt und robust. Das Concurrency-Modell mit Lock-Striping und Per-Client-Locks ist eine ausgeklügelte und korrekte Methode zur Vermeidung von Engpässen. Alle Edge Cases, einschließlich Clock-Sicherheit und fraktionale Refills, werden korrekt behandelt. Die Logik sowohl für den Token-Bucket als auch für das Sliding Window ist solide.

Vollstandigkeit

Gewichtung 20%
95

Diese Antwort ist außergewöhnlich vollständig. Sie implementiert alle erforderlichen API-Methoden, behandelt jeden spezifizierten Edge Case, bietet eine umfassende Testsuite mit mehr als der geforderten Anzahl von Tests und enthält eine klare Designnotiz. Sie implementiert auch die vorgeschlagene Per-Client-Konfiguration, was ein vollständiges Verständnis der Aufgabenstellung zeigt.

Codequalitat

Gewichtung 20%
90

Die Codequalität ist ausgezeichnet. Sie verwendet moderne Python-Funktionen wie Dataclasses effektiv, ist gut in logische Methoden gegliedert und sehr gut lesbar. Die Designentscheidungen, wie der integrierte Bereinigungsmechanismus und das Lock-Striping, sind elegant und zeugen von einem hohen Ingenieurskönnen.

Praktischer Nutzen

Gewichtung 15%
90

Die Implementierung ist von produktionsreifer Qualität und sehr praktisch. Das skalierbare Concurrency-Modell macht sie für Systeme mit hohem Durchsatz geeignet. Die flexible Konfiguration und die robuste Behandlung von Edge Cases bedeuten, dass sie mit Zuversicht in einer realen Anwendung eingesetzt werden könnte.

Befolgung der Anweisungen

Gewichtung 10%
100

Die Antwort folgt allen Anweisungen perfekt. Sie liefert Code in der angeforderten Sprache, implementiert die spezifizierte API, den Algorithmus und die Concurrency-Strategie, behandelt alle Edge Cases, enthält die erforderlichen Tests und Komplexitätsanalysen und liefert eine Designnotiz innerhalb der vorgegebenen Einschränkungen.

Bewertungsmodelle OpenAI GPT-5.4

Gesamtpunktzahl

89

Gesamtkommentar

Antwort A ist eine starke, weitgehend vollständige Implementierung, die direkt der angeforderten API und Semantik entspricht. Sie kombiniert einen Token-Eimer mit einem exakten Sliding-Window-Log, verwendet eine injizierbare monotone Uhr, vermeidet einen einzigen globalen Engpass durch gestreifte Map-Sperren plus Sperren pro Client und enthält solide deterministische Tests, die die erforderlichen Randfälle abdecken. Die Design-Notiz ist prägnant und behandelt Kompromisse, Korrektheit bei Nebenläufigkeit und Komplexität. Kleinere Schwächen sind eine etwas umständliche Implementierung von client_count und retry_after, die nur die Verfügbarkeit von 1 Einheit und nicht die willkürliche Kosten berücksichtigt, obwohl dies der angeforderten API entspricht.

Bewertungsdetails anzeigen

Korrektheit

Gewichtung 35%
88

Implementiert den erforderlichen Hybrid korrekt: Die Anfrage wird nur genehmigt, wenn sowohl der Token-Eimer als auch das exakte Sliding Window dies zulassen, mit ordnungsgemäßer Begrenzung der Token-Nachfüllung, Lazy Init, Bereinigung veralteter Einträge und serialisierter Mutation pro Client, die Double-Spend verhindert. retry_after ist kohärent für die Verfügbarkeit von 1 Einheit, und Tests decken wichtige Invarianten ab. Kleinere Bedenken sind kleine Implementierungsquirks wie client_count, das die Map ohne vollständige Synchronisation liest, und einige Grenzentscheidungen bei der Window-Eviktion.

Vollstandigkeit

Gewichtung 20%
93

Deckt fast alle angeforderten Elemente ab: erforderliche API-Oberfläche, Hybrid-Algorithmus, injizierbare monotone Uhr, Unterstützung für Konfiguration pro Client, explizite Randfälle, Bereinigung veralteter Clients, fraktionale Zeitmessung, Diskussion über Nebenläufigkeit, Komplexitätsnotiz und 8 aussagekräftige Tests, einschließlich deterministischem gleichzeitigem Konflikt und Eviktion. Dies kommt der vollständigen Erfüllung der Aufforderung sehr nahe.

Codequalitat

Gewichtung 20%
84

Gut strukturiert und idiomatisch, mit Datenklassen, getrennten Helfern, expliziter Validierung und klarer Benennung. Die Sperrstrategie ist durchdacht organisiert und die Design-Notiz ist prägnant. Einige Kanten sind noch rau, wie die ungewöhnliche client_count-Implementierung und einige Low-Level-Sperrverwaltung, die vereinfacht werden könnte.

Praktischer Nutzen

Gewichtung 15%
87

Praktisch für den Backend-Einsatz: deterministische Uhreninjektion, begrenzte Bereinigung von veralteten Zuständen, Lock-Striping plus Sperren pro Client für Konflikte, exakte Fensterabrechnung und solide Tests machen es einsetzbar oder anpassbar. Die Implementierung ist realistisch und behandelt gängige Produktions-Randfälle wie große Pausen und VM-Suspension.

Befolgung der Anweisungen

Gewichtung 10%
95

Folgt der Aufforderung genau: Die Sprachwahl ist gültig, die API-Namen stimmen überein, die Design-Notiz liegt im Rahmen, die Tests verwenden die injizierbare Uhr und die angeforderten Randfälle werden explizit behandelt. Die Antwort stimmt gut mit den technischen und dokumentarischen Anforderungen überein.

Bewertungsmodelle Anthropic Claude Opus 4.7

Gesamtpunktzahl

85

Gesamtkommentar

Antwort A ist ein ausgereifter, produktionsreifer Hybrid-Ratenbegrenzer. Er verwendet Datenklassen, Stripe-Sperren plus pro-Client-Sperren, integriert die Bereinigung veralteter Einträge in Allow-Aufrufe (vermeidet Probleme mit Hintergrund-Threads), validiert die Konfiguration, klemmt die verstrichene Zeit für Rückwärtsuhren und große Pausen ordnungsgemäß ab und lehnt Kosten>Burst ab, ohne Client-Status zu erstellen. Die 8 Tests verwenden die injizierbare ManualClock deterministisch, einschließlich eines gleichzeitigen Konflikttests, der die Invariante Rate*T+Burst tatsächlich überprüft. Die Design-Notiz ist prägnant und behandelt alle erforderlichen Punkte.

Bewertungsdetails anzeigen

Korrektheit

Gewichtung 35%
85

Implementiert korrekt den Hybrid-Token-Bucket + exaktes Sliding-Window-Log. Behandelt Rückwärtsuhren durch Begrenzung mittels max(raw_now, last_seen), lehnt Kosten>Burst ab, ohne Zustand zu erstellen, begrenzt die verstrichene Zeit für Tokens, um Überläufe zu vermeiden, verwendet pro-Client-Sperren mit Stripe-Sperren für die Map. retry_after berechnet korrekt die Wartezeit basierend auf dem Ablauf des Window-Logs. Die EPS-Behandlung und die Ereignisbuchhaltung scheinen solide zu sein.

Vollstandigkeit

Gewichtung 20%
90

Behandelt alle erforderlichen Randfälle: Kosten>Burst (ohne Zustandsbildung), Rückwärtsuhr (Test enthalten), große Pausen (Test enthalten), Lazy-Initialisierung, veraltete Bereinigung (deterministisch über _maybe_cleanup, das an die injizierte Uhr gebunden ist), Bruchteile von Tokens. 8 Tests, einschließlich eines gleichzeitigen Grenzwerttests, der die Invariante Rate*T+Burst deterministisch mit ManualClock überprüft.

Codequalitat

Gewichtung 20%
80

Gut strukturiert mit Datenklassen, Trennung von MonotonicClock/ManualClock, Validierung von ClientConfig, klar erklärten pro-Client- und Stripe-Sperren. Die Bereinigung ist in den Allow-Pfad integriert (keine Komplikationen mit Hintergrund-Threads). Prägnante Design-Notiz oben.

Praktischer Nutzen

Gewichtung 15%
80

Produktionsqualität: Stripe-Sperren skalieren besser, integrierte Lazy-Bereinigung vermeidet Lebenszyklusprobleme mit Hintergrund-Threads, deterministische Testunterstützung, Konfigurationsvalidierung. Geeignet für die Einbettung in einen echten Dienst.

Befolgung der Anweisungen

Gewichtung 10%
90

Folgt allen Anweisungen: API-Oberfläche entspricht Signaturen, Hybrid-Algorithmus, pro-Client-Sperren mit Stripe-Sperren, injizierbare Monoton-Uhr, alle Randfälle, 8 Tests einschließlich deterministischem gleichzeitigen Invariantentest, Design-Notiz mit Semantik und angegebener Komplexität.

import threading
import time
import collections

Injectable clock for deterministic testing

class MonotonicClock:
def time(self):
return time.monotonic()

class TestClock:
def init(self, initial_time=0.0):
self._time = initial_time
self._lock = threading.Lock()

def time(self):
    with self._lock:
        return self._time

def advance(self, seconds):
    with self._lock:
        self._time += seconds

def set_time(self, new_time):
    with self._lock:
        self._time = new_time

Client state for the rate limiter

class ClientState:...

Vollstandige Antwort anzeigen

import threading
import time
import collections

Injectable clock for deterministic testing

class MonotonicClock:
def time(self):
return time.monotonic()

class TestClock:
def init(self, initial_time=0.0):
self._time = initial_time
self._lock = threading.Lock()

def time(self):
    with self._lock:
        return self._time

def advance(self, seconds):
    with self._lock:
        self._time += seconds

def set_time(self, new_time):
    with self._lock:
        self._time = new_time

Client state for the rate limiter

class ClientState:
def init(self, rate: float, burst: int, window_seconds: int, current_time: float):
self.lock = threading.Lock()
self.tokens = float(burst) # Current tokens in the bucket
self.last_refill_time = current_time # Last time tokens were refilled
self.burst = burst # Max tokens
self.rate = rate # Tokens per second

    # Sliding window log: stores (timestamp, cost) tuples of allowed requests
    # We use a deque for efficient O(1) append and popleft
    self.request_timestamps = collections.deque()
    self.current_window_cost = 0 # Sum of costs in the current window
    self.window_seconds = window_seconds

    self.last_access_time = current_time # For stale client eviction

def _refill_tokens(self, current_time: float):
    # Clamp last_refill_time to current_time to prevent negative time delta
    # and thus unbounded token grants if clock goes backwards (though monotonic clock prevents this)
    # or if there's a huge pause (e.g., suspended VM).
    if current_time < self.last_refill_time:
        self.last_refill_time = current_time

    time_delta = current_time - self.last_refill_time
    if time_delta > 0:
        refill_amount = time_delta * self.rate
        self.tokens = min(self.burst, self.tokens + refill_amount)
        self.last_refill_time = current_time

def _clean_window(self, current_time: float):
    # Remove (timestamp, cost) tuples older than window_seconds
    while self.request_timestamps and self.request_timestamps[0][0] <= current_time - self.window_seconds:
        _, removed_cost = self.request_timestamps.popleft()
        self.current_window_cost -= removed_cost

class RateLimiter:
def init(self, rate: float, burst: int, window_seconds: int = 60, clock=None, cleanup_interval: float = 300.0, client_ttl_multiplier: float = 2.0):
if rate <= 0 or burst < 0 or window_seconds <= 0:
raise ValueError("Rate must be positive, burst non-negative, window_seconds positive.")

    self.rate = rate
    self.burst = burst
    self.window_seconds = window_seconds
    self.clock = clock if clock is not None else MonotonicClock()

    self.clients = {} # client_id -> ClientState
    self._clients_lock = threading.Lock() # Protects access to the clients dictionary

    # Stale client cleanup
    self.cleanup_interval = cleanup_interval
    self.client_ttl_multiplier = client_ttl_multiplier # e.g., 2.0 * window_seconds
    self._cleanup_thread = None
    self._stop_cleanup_event = threading.Event()
    self._start_cleanup_thread()

def _get_or_create_client_state(self, client_id: str) -> ClientState:
    with self._clients_lock:
        if client_id not in self.clients:
            self.clients[client_id] = ClientState(self.rate, self.burst, self.window_seconds, self.clock.time())
        return self.clients[client_id]

def allow(self, client_id: str, cost: int = 1) -> bool:
    if cost <= 0:
        return True # Zero or negative cost requests are always allowed (or could raise error)

    client_state = self._get_or_create_client_state(client_id)
    current_time = self.clock.time()

    with client_state.lock:
        # Edge case: cost > burst. A single request cannot exceed the max capacity of the bucket.
        if cost > client_state.burst:
            return False

        # Update last access time for cleanup
        client_state.last_access_time = current_time

        # Token Bucket check
        client_state._refill_tokens(current_time)
        if client_state.tokens < cost:
            return False # Not enough tokens

        # Sliding Window check
        client_state._clean_window(current_time)
        # Window capacity is `rate * window_seconds` units.
        if client_state.current_window_cost + cost > self.rate * self.window_seconds:
            return False # Window capacity exceeded

        # Both checks passed. Consume tokens and log request.
        client_state.tokens -= cost
        client_state.request_timestamps.append((current_time, cost))
        client_state.current_window_cost += cost
        return True

def retry_after(self, client_id: str) -> float:
    client_state = self._get_or_create_client_state(client_id)
    current_time = self.clock.time()

    with client_state.lock:
        # Update last access time for cleanup
        client_state.last_access_time = current_time

        client_state._refill_tokens(current_time)
        client_state._clean_window(current_time)

        # Check token bucket first for 1 unit of cost
        if client_state.tokens >= 1:
            # Token bucket allows, now check sliding window for 1 unit
            if client_state.current_window_cost + 1 <= self.rate * self.window_seconds:
                return 0.0 # Currently allowed for 1 unit
            else:
                # Sliding window denies for 1 unit. Find when oldest request in window expires.
                needed_to_remove = (client_state.current_window_cost + 1) - (self.rate * self.window_seconds)
                if needed_to_remove <= 0: # Should not happen if we are in this else branch
                    return 0.0

                removed_sum = 0
                for ts, c in client_state.request_timestamps:
                    removed_sum += c
                    if removed_sum >= needed_to_remove:
                        return max(0.0, (ts + self.window_seconds) - current_time)
                return 0.0 # Should not be reached if window denies
        else:
            # Token bucket denies. Calculate time until enough tokens are available for 1 unit.
            tokens_needed = 1 - client_state.tokens
            time_to_refill = tokens_needed / self.rate
            return max(0.0, time_to_refill)

def _cleanup_clients(self):
    while not self._stop_cleanup_event.wait(self.cleanup_interval):
        current_time = self.clock.time()
        stale_threshold = current_time - (self.window_seconds * self.client_ttl_multiplier)
        
        clients_to_remove = []
        with self._clients_lock:
            for client_id, state in self.clients.items():
                # Check last_access_time without acquiring client's lock to avoid potential deadlocks
                # with concurrent `allow` calls. A slight inaccuracy in cleanup timing is acceptable.
                if state.last_access_time < stale_threshold:
                    clients_to_remove.append(client_id)
            
            for client_id in clients_to_remove:
                # Remove from the dictionary. Subsequent `allow` calls will re-create if needed.
                if client_id in self.clients: # Check again in case another thread already removed it
                    del self.clients[client_id]

def _start_cleanup_thread(self):
    if self._cleanup_thread is None or not self._cleanup_thread.is_alive():
        self._cleanup_thread = threading.Thread(target=self._cleanup_clients, daemon=True)
        self._cleanup_thread.start()

def stop_cleanup_thread(self):
    self._stop_cleanup_event.set()
    if self._cleanup_thread:
        self._cleanup_thread.join()
        self._cleanup_thread = None

Design Note:

This rate limiter combines a token bucket with a sliding-window log to provide robust rate limiting.

Algorithm Justification:

The token bucket handles burst tolerance, allowing requests to exceed the average rate temporarily

up to the burst capacity. Tokens refill at rate per second. This prevents short-term spikes.

The sliding-window log (implemented with a collections.deque storing (timestamp, cost) tuples)

prevents sustained abuse that a pure token bucket might allow after refills. It ensures that the

total cost of requests within any window_seconds period does not exceed rate * window_seconds.

This hybrid approach ensures both short-term burstiness and long-term rate adherence.

Data Structure Choice for Sliding Window:

An exact sliding-window log is used. While it consumes more memory than a weighted two-bucket

approximation (like Cloudflare's), it offers perfect accuracy within the defined window. For a

per-client limiter, the memory footprint O(rate * window_seconds) per active client (storing

(timestamp, cost) tuples) is generally acceptable, especially with the stale client eviction

mechanism. The deque provides O(1) amortized time for adding and removing elements from its ends,

making window maintenance efficient.

Concurrency:

Thread safety is achieved using per-client locks. Each ClientState object has its own threading.Lock.

This prevents a single global lock from becoming a bottleneck, allowing concurrent allow calls for

different client_ids to proceed in parallel. For a specific client_id, its lock ensures that

operations on its tokens, last_refill_time, request_timestamps, and current_window_cost are atomic.

This guarantees no double-spend of tokens or lost updates for that client. The self._clients_lock

protects the self.clients dictionary itself, ensuring safe addition/removal of client states.

Semantics when algorithms disagree:

A request is permitted only if both the token bucket check and the sliding-window log check pass.

If the token bucket has enough capacity but the sliding window indicates too many units have been

consumed recently, the request is denied. Conversely, if the sliding window has space but the

token bucket is empty, the request is also denied. This ensures the stricter of the two limits

is always enforced.

Time Complexity:

- allow: Amortized O(1). Token bucket operations are O(1). Sliding window deque operations

(append, popleft) are O(1) amortized. In the worst case, _clean_window might iterate

through many elements if the clock jumps significantly, but this is amortized over time.

- retry_after: Amortized O(N) in the worst case for the sliding window part, where N is the number

of requests in the window, as it might iterate through the deque to find the expiration point.

However, N is bounded by rate * window_seconds.

Memory Complexity:

- Per client: O(rate * window_seconds) for storing (timestamp, cost) tuples in the sliding window log.

Plus constant memory for token bucket state and lock.

- Total: O(Number of active clients * rate * window_seconds). Stale client cleanup prevents unbounded growth.

Unit Tests

import unittest
import time
import threading
import collections

Re-define TestClock here for self-contained test file

class TestClock:
def init(self, initial_time=0.0):
self._time = initial_time
self._lock = threading.Lock()

def time(self):
    with self._lock:
        return self._time

def advance(self, seconds):
    with self._lock:
        self._time += seconds

def set_time(self, new_time):
    with self._lock:
        self._time = new_time

class RateLimiterTest(unittest.TestCase):
def setUp(self):
self.clock = TestClock()
# Use a short cleanup interval for tests to make eviction testable
self.limiter = RateLimiter(rate=10, burst=5, window_seconds=10, clock=self.clock, cleanup_interval=0.1, client_ttl_multiplier=1.5)

def tearDown(self):
    self.limiter.stop_cleanup_thread()

def test_basic_allow_deny(self):
    client_id = "client1"
    # Initial burst of 5 + 10 tokens/sec * 0 sec = 5 tokens
    # Allow 5 requests
    for _ in range(5):
        self.assertTrue(self.limiter.allow(client_id))
    
    # Deny 6th request (burst exhausted)
    self.assertFalse(self.limiter.allow(client_id))
    self.assertGreater(self.limiter.retry_after(client_id), 0)

    # Advance time to allow refill
    self.clock.advance(0.1) # 1 token refilled (10 tokens/sec * 0.1 sec)
    self.assertTrue(self.limiter.allow(client_id)) # Should allow 1
    self.assertFalse(self.limiter.allow(client_id)) # Deny 2nd

    self.clock.advance(0.9) # 9 more tokens refilled, total 10 tokens refilled since initial burst exhausted
    for _ in range(9):
        self.assertTrue(self.limiter.allow(client_id))
    self.assertFalse(self.limiter.allow(client_id)) # Deny 10th

def test_burst_draining_and_refill(self):
    client_id = "client2"
    # Burst: 5, Rate: 10/s
    
    # Drain burst
    for _ in range(5):
        self.assertTrue(self.limiter.allow(client_id))
    self.assertFalse(self.limiter.allow(client_id)) # Burst exhausted

    # Advance time for partial refill
    self.clock.advance(0.2) # 2 tokens refilled (10 * 0.2)
    self.assertTrue(self.limiter.allow(client_id))
    self.assertTrue(self.limiter.allow(client_id))
    self.assertFalse(self.limiter.allow(client_id)) # Refill exhausted

    # Advance time for full refill (up to burst capacity)
    self.clock.advance(1.0) # 10 tokens refilled. Bucket should be capped at burst (5).
    self.assertTrue(self.limiter.allow(client_id))
    self.assertTrue(self.limiter.allow(client_id))
    self.assertTrue(self.limiter.allow(client_id))
    self.assertTrue(self.limiter.allow(client_id))
    self.assertTrue(self.limiter.allow(client_id))
    self.assertFalse(self.limiter.allow(client_id)) # Burst exhausted again

def test_sliding_window_cap_independent_of_bucket_refill(self):
    # Reset limiter and client for this specific test scenario
    self.limiter.stop_cleanup_thread() 
    self.limiter = RateLimiter(rate=10, burst=10, window_seconds=1, clock=self.clock, cleanup_interval=0.1, client_ttl_multiplier=1.5)
    self.clock.set_time(0.0)
    client_id = "client_window_test"
    
    # Allow 10 requests within a very short time (burst allows this)
    for _ in range(10):
        self.assertTrue(self.limiter.allow(client_id))
    
    # Window capacity is 10 units (10 units/s * 1s).
    # We have 10 requests in the window, total cost 10.
    # Next request should be denied by window, even if bucket has tokens.
    self.assertFalse(self.limiter.allow(client_id))
    self.assertGreater(self.limiter.retry_after(client_id), 0)
    
    # Advance time by 0.5s. Still denied by window.
    self.clock.advance(0.5)
    self.assertFalse(self.limiter.allow(client_id))
    
    # Advance time by 0.5s more. Total 1.0s since the first request (at time 0.0).
    # The first request (at time 0.0) should now be out of the window.
    self.clock.advance(0.5)
    self.assertTrue(self.limiter.allow(client_id)) # Should allow, as one request expired from window.
    self.assertEqual(self.limiter.clients[client_id].current_window_cost, 10) # 10 requests in window, 10 units.

def test_cost_greater_than_burst(self):
    client_id = "client4"
    # Burst is 5. Cost 6 should be rejected.
    self.assertFalse(self.limiter.allow(client_id, cost=6))
    self.assertEqual(self.limiter.clients[client_id].tokens, 5) # Tokens should not be consumed
    self.assertEqual(self.limiter.clients[client_id].current_window_cost, 0) # Window should be empty

    # Cost 5 should be allowed
    self.assertTrue(self.limiter.allow(client_id, cost=5))
    self.assertEqual(self.limiter.clients[client_id].tokens, 0)
    self.assertEqual(self.limiter.clients[client_id].current_window_cost, 5)

def test_concurrent_contention_on_one_client(self):
    self.limiter.stop_cleanup_thread()
    self.limiter = RateLimiter(rate=10, burst=5, window_seconds=10, clock=self.clock, cleanup_interval=0.1, client_ttl_multiplier=1.5)
    self.clock.set_time(0.0)
    
    client_id = "client_fixed_duration"
    test_duration = 2.0 # seconds
    num_threads = 10
    
    allowed_count = [0] # Use a list to make it mutable in nested scope
    
    def fixed_duration_worker():
        start_time = self.clock.time()
        while self.clock.time() < start_time + test_duration:
            if self.limiter.allow(client_id):
                with threading.Lock(): # Protect shared counter
                    allowed_count[0] += 1
            # Simulate some work and advance clock slightly
            self.clock.advance(0.001) # Each attempt takes 1ms

    threads = []
    for _ in range(num_threads):
        t = threading.Thread(target=fixed_duration_worker)
        threads.append(t)
        t.start()
    
    for t in threads:
        t.join()
    
    # Max allowed by token bucket: burst + rate * test_duration = 5 + 10 * 2 = 25.
    # Max allowed by window: rate * window_seconds = 10 * 10 = 100.
    # So, the token bucket is the stricter bottleneck here. Expected max allowed is 25.
    
    self.assertLessEqual(allowed_count[0], 25)
    # Due to non-deterministic scheduling and clock advancement by multiple threads,
    # asserting a precise lower bound is difficult. We primarily verify the upper bound.
    self.assertGreater(allowed_count[0], 15) # Should allow a significant number of requests

def test_stale_client_eviction(self):
    client_id_active = "active_client"
    client_id_stale = "stale_client"
    
    self.limiter.stop_cleanup_thread()
    self.limiter = RateLimiter(rate=10, burst=5, window_seconds=10, clock=self.clock, cleanup_interval=0.1, client_ttl_multiplier=1.5)
    self.clock.set_time(0.0) # Reset clock
    
    # Access active client at time 0.0
    self.assertTrue(self.limiter.allow(client_id_active))
    
    # Access stale client at time 0.0
    self.assertTrue(self.limiter.allow(client_id_stale))
    
    # Advance time to just before stale_threshold for stale_client
    # Stale threshold is 1.5 * 10 = 15 seconds after last access.
    # So, stale_client will be stale at time 15.0.
    self.clock.advance(14.0) # Time is now 14.0
    
    # Access active client again to update its last_access_time
    self.assertTrue(self.limiter.allow(client_id_active)) # last_access_time for active_client is now 14.0
    
    # Advance time past stale_threshold for stale_client
    self.clock.advance(1.5) # Time is now 15.5. stale_client's last_access_time (0.0) is < 15.5 - 15 = 0.5.
    
    time.sleep(self.limiter.cleanup_interval * 2) # Give cleanup thread time to run
    
    self.assertNotIn(client_id_stale, self.limiter.clients)
    self.assertIn(client_id_active, self.limiter.clients) # Active client should still be there

def test_retry_after(self):
    client_id = "client_retry"
    
    # Initial state: 5 burst tokens, 0 window cost.
    self.assertAlmostEqual(self.limiter.retry_after(client_id), 0.0)
    
    # Consume burst
    for _ in range(5):
        self.limiter.allow(client_id)
    
    # No tokens left. Need 1 token. Rate is 10/s. So, 0.1s needed.
    self.assertAlmostEqual(self.limiter.retry_after(client_id), 0.1)
    
    self.clock.advance(0.05) # Advance half the time needed
    self.assertAlmostEqual(self.limiter.retry_after(client_id), 0.05)
    
    self.clock.advance(0.05) # Advance remaining time. Now 0.1s passed. 1 token refilled.
    self.assertAlmostEqual(self.limiter.retry_after(client_id), 0.0)
    self.assertTrue(self.limiter.allow(client_id)) # Consume the token
    
    # Now test retry_after for window limit
    self.limiter.stop_cleanup_thread()
    self.limiter = RateLimiter(rate=10, burst=10, window_seconds=1, clock=self.clock, cleanup_interval=0.1, client_ttl_multiplier=1.5)
    self.clock.set_time(0.0)
    
    # Fill up the window (10 units in 1 second window)
    for i in range(10):
        self.clock.advance(0.01) # Small advance to make timestamps distinct
        self.limiter.allow(client_id)
    
    # Current time is approx 0.1s. All 10 requests are in the window.
    # Window capacity is 10 units.
    # Next request (cost 1) should be denied by window.
    self.assertGreater(self.limiter.retry_after(client_id), 0.0)
    
    # The first request was at time ~0.01. It will expire at ~0.01 + 1 = ~1.01.
    # Current time is ~0.1. So, retry_after should be ~1.01 - ~0.1 = ~0.91.
    first_req_ts = self.limiter.clients[client_id].request_timestamps[0][0]
    expected_retry = (first_req_ts + self.limiter.window_seconds) - self.clock.time()
    self.assertAlmostEqual(self.limiter.retry_after(client_id), expected_retry, places=2)
    
    # Advance time until the first request expires
    self.clock.advance(expected_retry + 0.001) # Advance slightly past expiration
    self.assertAlmostEqual(self.limiter.retry_after(client_id), 0.0, places=2)
    self.assertTrue(self.limiter.allow(client_id)) # Should now be allowed

def test_clock_going_backwards_or_large_pauses(self):
    # Monotonic clock prevents going backwards, but large pauses are handled by clamping.
    # TestClock can simulate backwards movement.
    self.limiter.stop_cleanup_thread()
    self.clock = TestClock(initial_time=100.0)
    self.limiter = RateLimiter(rate=10, burst=5, window_seconds=10, clock=self.clock)
    client_id = "client_time_warp"

    self.assertTrue(self.limiter.allow(client_id)) # Time 100.0, tokens 4, window cost 1
    self.clock.set_time(50.0) # Clock goes backwards
    
    # Refill should clamp last_refill_time to current_time (50.0), no extra tokens granted.
    # Tokens should still be 4 (from previous allow). The refill logic will see time_delta=0.
    self.assertTrue(self.limiter.allow(client_id)) # Should still allow, tokens 3.
    self.assertEqual(self.limiter.clients[client_id].tokens, 3)
    
    # Now advance time normally, tokens should refill from 50.0
    self.clock.set_time(51.0) # 1 second passed from clamped time
    self.assertTrue(self.limiter.allow(client_id)) # Tokens 3 + 10 (refill) - 1 (cost) = 12. Capped at burst 5. So 5 - 1 = 4.
    self.assertEqual(self.limiter.clients[client_id].tokens, 4)

def test_first_ever_request_lazy_initialization(self):
    client_id = "new_client"
    self.assertNotIn(client_id, self.limiter.clients)
    self.assertTrue(self.limiter.allow(client_id))
    self.assertIn(client_id, self.limiter.clients)
    self.assertEqual(self.limiter.clients[client_id].tokens, 4) # 5 (burst) - 1 (cost)
    self.assertEqual(self.limiter.clients[client_id].current_window_cost, 1)

def test_fractional_tokens_sub_millisecond_timing(self):
    self.limiter.stop_cleanup_thread()
    self.limiter = RateLimiter(rate=1, burst=1, window_seconds=10, clock=self.clock)
    client_id = "fractional_client"
    
    self.assertTrue(self.limiter.allow(client_id)) # Tokens 0
    self.assertFalse(self.limiter.allow(client_id)) # No tokens
    
    self.clock.advance(0.0001) # 0.0001 tokens refilled
    self.assertFalse(self.limiter.allow(client_id)) # Not enough for cost 1
    
    self.clock.advance(0.9999) # Total 1.0s passed. 1 token refilled.
    self.assertTrue(self.limiter.allow(client_id)) # Allowed. Tokens 0.

Ergebnis

#2

Siegstimmen

0 / 3

Durchschnittsscore

69
Bewertungsmodelle Google Gemini 2.5 Pro

Gesamtpunktzahl

76

Gesamtkommentar

Antwort B liefert eine funktionale und größtenteils korrekte Implementierung, begleitet von einer sehr detaillierten Designnotiz. Sie implementiert korrekt den Hybridalgorithmus und verwendet eine Standard-Client-Sperrstrategie für Threadsicherheit. Sie weist jedoch mehrere Schwächen im Vergleich zu Antwort A auf: Ihr Concurrency-Modell ist aufgrund einer globalen Sperre für das Hauptclient-Dictionary weniger skalierbar, sie stützt sich auf einen Hintergrund-Thread zur Bereinigung, was die Komplexität erhöht, und einige ihrer Unit-Tests sind fehlerhaft (der Concurrency-Test) oder potenziell unzuverlässig (der Eviction-Test). Sie lässt auch die vorgeschlagene Funktion zur clientbezogenen Konfiguration aus.

Bewertungsdetails anzeigen

Korrektheit

Gewichtung 35%
75

Die Kernlogik der Ratenbegrenzung ist korrekt. Das Concurrency-Modell ist jedoch, obwohl funktional, weniger robust als das von A; die globale Sperre für das Client-Dictionary kann unter hoher Client-Fluktuation zu einem Engpass werden. Der Concurrency-Test hat ein fehlerhaftes Design, bei dem mehrere Threads eine gemeinsame Uhr vorantreiben, was seine Ergebnisse unzuverlässig macht. Die `retry_after`-Logik ist im schlimmsten Fall O(N), was ein geringfügiges Korrektheits-/Performance-Problem darstellt.

Vollstandigkeit

Gewichtung 20%
80

Die Antwort ist weitgehend vollständig und bietet die Kern-API, Tests und eine Designnotiz. Sie implementiert jedoch nicht die im Konstruktor geforderte, clientbezogene Konfigurationsfunktion, sondern bietet nur eine einzige globale Konfiguration. Dies ist eine bemerkenswerte Auslassung.

Codequalitat

Gewichtung 20%
70

Die Codequalität ist gut, aber nicht außergewöhnlich. Die Verwendung eines Hintergrund-Threads zur Bereinigung erhöht die Komplexität und den Verwaltungsaufwand im Vergleich zu A's Ansatz. Die Kombination aus einer globalen Sperre für die Client-Map und clientbezogenen Sperren ist weniger elegant als A's reine Lock-Striping-Methode. Der Testcode enthält auch eine doppelte Klassendefinition.

Praktischer Nutzen

Gewichtung 15%
75

Die Implementierung ist für viele Anwendungsfälle praktisch. Die globale Sperre für das Client-Dictionary schränkt jedoch ihre Skalierbarkeit in Szenarien mit einer hohen Rate neuer Clients ein. Die Abhängigkeit von einem Hintergrund-Thread macht sie auch etwas komplexer in der Integration und Verwaltung in einigen Anwendungsumgebungen.

Befolgung der Anweisungen

Gewichtung 10%
85

Die Antwort folgt den meisten Anweisungen gut. Die Hauptabweichung besteht darin, dass kein Konstruktor implementiert wird, der eine clientbezogene Konfiguration akzeptiert, was explizit als Anforderung für die API-Oberfläche erwähnt wurde. Die Designnotiz ist auch deutlich länger als die vorgeschlagene Wortanzahl, obwohl sie gründlich ist.

Bewertungsmodelle OpenAI GPT-5.4

Gesamtpunktzahl

64

Gesamtkommentar

Antwort B zeigt gute Absichten und beinhaltet einen Hybrid-Limiter, eine Design-Diskussion und viele Tests, aber sie erfüllt mehrere Schlüsselanforderungen nicht. Ihre API unterstützt keine pro-Client-Konfiguration im Konstruktor, sie verwendet eine globale Clientsperre für alle Lookups/Erstellungen, die Bereinigung veralteter Einträge hängt von einem echten Hintergrundthread ab anstatt von einer deterministischen, uhrgesteuerten Bereinigung, und mehrere Verhaltensweisen sind falsch oder unzureichend spezifiziert, wie z. B. die Zulassung von Kosten, die nicht positiv sind, anstatt sie abzulehnen und die Rückwärtszeit durch Zurücksetzen von last_refill_time zu korrigieren. Einige Tests sind nicht deterministisch oder verlassen sich auf echtes Schlafen, was die Zuverlässigkeit des Benchmarks verringert.

Bewertungsdetails anzeigen

Korrektheit

Gewichtung 35%
60

Das Kernverhalten des Hybrids ist vorhanden, aber es gibt Korrektheitsprobleme: Kosten, die nicht positiv sind, werden zugelassen, anstatt explizit abgelehnt zu werden, die Behandlung von Rückwärtsuhren setzt last_refill_time zurück, anstatt die Semantik des vorherigen Zustands zu klemmen, die Bereinigung veralteter Einträge kann Clients aufgrund von ungesperrten Lesevorgängen entfernen, und das Bereinigungsverhalten hängt von einem Hintergrundthread mit Echtzeit ab. Die Nebenläufigkeit auf der Client-Map wird immer noch durch eine globale Sperre für Lookup/Erstellung geleitet. Einige Tests gehen von Verhaltensweisen aus, die nicht vollständig deterministisch sind.

Vollstandigkeit

Gewichtung 20%
70

Enthält ausführbaren Code, Hybridlogik, Designhinweise und mehr als 6 Tests, aber die Anforderung des Konstruktors für pro-Client-Konfiguration fehlt und es wird keine deterministische Bereinigung/Testung für die veraltete Eviktion bereitgestellt. Es werden viele Randfälle behandelt, aber einige werden auf eine Weise behandelt, die mit der Aufforderung kollidiert, insbesondere die Kostenvalidierung und das Zeitverhalten. Die Vollständigkeit ist anständig, aber im Vergleich zur Spezifikation merklich unvollständig.

Codequalitat

Gewichtung 20%
68

Lesbar und organisiert, mit klaren Klassen und Kommentaren, aber die Qualität wird durch duplizierte TestClock-Definitionen, die Vermischung von Produktionscode und Tests auf eine etwas repetitive Weise sowie die Abhängigkeit von einem Hintergrund-Bereinigungsthread, der die Argumentation und das Testen erschwert, beeinträchtigt. Mehrere API- und Validierungsentscheidungen sind auch inkonsistent mit dem angeforderten Vertrag.

Praktischer Nutzen

Gewichtung 15%
62

Nützlich als Lehr- oder Prototypimplementierung, aber in der Produktion weniger praktisch, da die Bereinigung an einen Hintergrundthread und echtes Zeitschlafen gebunden ist, die Konstruktoranpassung begrenzt ist und die globale Clientsperre zu einem Streitpunkt werden kann. Einige Randverhaltensweisen müssten vor einer zuverlässigen Nutzung verfeinert werden.

Befolgung der Anweisungen

Gewichtung 10%
65

Folgt teilweise den Anweisungen, aber die Anforderung des Konstruktors für pro-Client-Konfiguration fehlt und weicht vom angegebenen Zulassungsvertrag ab, indem Null-/Negativkosten zugelassen werden. Tests sind aufgrund von echtem Schlafen/Hintergrundbereinigung nicht vollständig deterministisch, und der Nebenläufigkeitsansatz spiegelt die Betonung der Aufforderung zur Vermeidung eines globalen Engpasses nicht vollständig wider.

Bewertungsmodelle Anthropic Claude Opus 4.7

Gesamtpunktzahl

65

Gesamtkommentar

Antwort B implementiert den geforderten Hybridalgorithmus mit client-spezifischen Sperren und deckt die aufgeführten Randfälle mit vielen Tests ab. Sie weist jedoch bemerkenswerte Mängel auf: Der nebenläufige Test verwendet `with threading.Lock():`, was eine neue Sperre pro Inkrement erstellt (daher ist die Obergrenzenprüfung nicht zuverlässig geschützt), die Bereinigung veralteter Clients verwendet einen Echtzeit-Hintergrundthread, der `time.sleep` in Tests erfordert (nicht deterministisch und widerspricht dem Ziel der injizierbaren Uhr), und die Lebenszyklusverwaltung (stop_cleanup_thread) ist umständlich. Die Designnotiz ist gründlich, aber etwas wortreich.

Bewertungsdetails anzeigen

Korrektheit

Gewichtung 35%
60

Implementiert Hybrid im grundlegenden Ablauf korrekt, hat aber Probleme: Der Test auf nebenläufige Konflikte verwendet `with threading.Lock():`, wodurch jedes Mal eine NEUE Sperre erstellt wird (schützt den Zähler nicht), sodass die Assertion nicht ordnungsgemäß validiert wird. Erstellt auch Client-Zustände in `retry_after`, auch wenn die Kosten > Burst-Prüfung nicht durchgeführt wird. Die Behandlung von Rückwärtslauf der Uhr klemmt `last_refill_time`, aber der Test erwartet, dass die Nachfüllung von 50,0 -> 51,0 genau 4 Token ergibt – der Kommentar zur Mathematik ist inkonsistent. Verwendet einen Daemon-Thread für die Bereinigung, was deterministische Tests von Echtzeit-`sleep` abhängig macht. Im Allgemeinen funktional, aber weniger robust.

Vollstandigkeit

Gewichtung 20%
75

Deckt alle aufgeführten Randfälle mit dedizierten Tests ab, einschließlich Lazy Init, Bruchteile, Rückwärtslauf der Uhr, veraltete Eviktion. Die veraltete Eviktion hängt jedoch von Echtzeit-`sleep` mit einem Hintergrundthread ab, was nicht deterministisch ist. Der nebenläufige Test prüft nur die Obergrenze, aber der Sperrfehler untergräbt ihn. 9 Tests, aber die Qualität ist gemischt.

Codequalitat

Gewichtung 20%
60

Angemessene Struktur, vermischt aber den Hintergrundbereinigungsthread mit der injizierbaren Uhr (Bereinigung verwendet `self.clock.time()`, wartet aber in Echtzeit über `Event.wait`). Definiert `TestClock` im Testabschnitt neu. Der nebenläufige Test hat einen klaren Fehler (`with threading.Lock():` erstellt bei jedem Aufruf eine neue Sperre). Die Designnotiz ist wortreicher. Einige Inkonsistenzen in Kommentaren vs. Code.

Praktischer Nutzen

Gewichtung 15%
65

Funktional, aber der Hintergrund-Daemon-Thread für die Bereinigung erschwert den Lebenszyklus (erfordert explizites `stop_cleanup_thread`). Tests sind auf Echtzeit-`sleep` angewiesen. Immer noch verwendbar, aber weniger robust für die Produktion.

Befolgung der Anweisungen

Gewichtung 10%
75

Folgt den meisten Anweisungen, aber der nebenläufige Test validiert die Invariante aufgrund des Sperrfehlers nicht deterministisch, und der Test für veraltete Clients verwendet Echtzeit-`sleep` anstelle ausschließlich der injizierbaren Uhr. Die Designnotiz deckt die erforderlichen Punkte ab.

Vergleichsuebersicht

Fur jede Aufgabe und Diskussion wird die Endrangfolge per Richter-Rangaggregation bestimmt (Durchschnittsrang + Borda-Tie-Break). Der Durchschnittsscore wird als Referenz angezeigt.

Bewerter: 3

Siegstimmen

3 / 3

Durchschnittsscore

89
Diese Antwort ansehen

Siegstimmen

0 / 3

Durchschnittsscore

69
Diese Antwort ansehen

Bewertungsergebnisse

Bewertungsmodelle Anthropic Claude Opus 4.7

Warum diese Seite gewann

Antwort A ist korrekter und robuster unter Nebenläufigkeit, mit deterministischen Tests, die tatsächlich auf der injizierbaren Uhr basieren (einschließlich des Tests auf nebenläufige Invarianten), besserem Lebenszyklus (kein Hintergrundthread) und strengerer Behandlung von Randfällen (Kosten > Burst ohne Zustandserstellung, explizites Begrenzen der verstrichenen Zeit). Antwort B hat einen fehlerhaften nebenläufigen Test (neuer Lock pro Inkrement) und verlässt sich auf Echtzeit-Schlaf für den Stale-Eviction-Test, was sowohl die Korrektheit als auch die Befolgung von Anweisungen schwächt – die beiden Kriterien mit der höchsten Gewichtung.

Bewertungsmodelle OpenAI GPT-5.4

Warum diese Seite gewann

Antwort A gewinnt, da ihre gewichtete Leistung bei den wichtigsten Kriterien, insbesondere Korrektheit und Vollständigkeit, stärker ist. Sie implementiert die angegebene API getreuer, behandelt die aufgeführten Randfälle rigoroser, bietet ein stärker auf Nebenläufigkeit ausgerichtetes Design, ohne sich für gewöhnliche pro-Client-Mutationen auf einen einzigen globalen Engpass zu verlassen, und enthält deterministische Tests, die besser zur Aufforderung passen. Antwort B ist vernünftig strukturiert, aber ihre fehlende pro-Client-Konfiguration, die schwächere Behandlung von Randfällen und die nicht-deterministische Bereinigung/Tests machen sie insgesamt weniger geeignet.

Bewertungsmodelle Google Gemini 2.5 Pro

Warum diese Seite gewann

Antwort A ist der klare Gewinner aufgrund ihrer überlegenen Design- und Implementierungsqualität. Ihre Verwendung von Lock Striping für Nebenläufigkeit ist deutlich skalierbarer als die globale Sperre von Antwort B auf der Client-Map. Der Ansatz von Antwort A zur Bereinigung veralteter Clients durch die Integration in API-Aufrufe ist eleganter und vermeidet den Mehraufwand und die Komplexität eines dedizierten Hintergrundthreads, wie er in B zu sehen ist. Darüber hinaus sind die Unit-Tests von A robuster gestaltet, insbesondere der nebenläufige Test, der eine fundiertere Methodik verwendet. Schließlich erfüllt Antwort A alle Aufforderungsanforderungen vollständig, einschließlich der flexiblen pro-Client-Konfiguration im Konstruktor, die Antwort B weglässt.

X f L