Antwort A: OpenAI GPT-5.5
import math
import threading
import time
import unittest
from collections import deque
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass, field
Design note:
This limiter admits a request only when two independent checks agree:
1. a token bucket has at least cost tokens, allowing short bursts up to burst;
2. an exact sliding-window log has room for cost more admitted units in the last
window_seconds, where the window limit is rate * window_seconds.
If the token bucket says yes but the sliding window is full, the request is
denied and no tokens are spent; if the w...
Vollstandige Antwort anzeigen ▼
import math
import threading
import time
import unittest
from collections import deque
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass, field
Design note:
This limiter admits a request only when two independent checks agree:
1. a token bucket has at least cost tokens, allowing short bursts up to burst;
2. an exact sliding-window log has room for cost more admitted units in the last
window_seconds, where the window limit is rate * window_seconds.
If the token bucket says yes but the sliding window is full, the request is
denied and no tokens are spent; if the window has room but tokens are absent,
it is also denied. The exact log stores one entry per admitted request as
(timestamp, cost), not one entry per unit, so allow is amortized O(1): each log
entry is appended once and removed once. Memory is O(number of admitted requests
in the active window) per client. A weighted two-bucket counter would use O(1)
memory but can over- or under-admit near bucket boundaries; this implementation
chooses exactness because correctness is usually more important for abuse
prevention. The client map is lock-striped; each client state has its own lock.
A caller obtains the per-client lock before mutating tokens/window state, so
concurrent allow calls cannot double-spend tokens or lose window updates.
class MonotonicClock:
def now(self) -> float:
return time.monotonic()
class ManualClock:
def init(self, start: float = 0.0):
self._now = float(start)
self._lock = threading.Lock()
def now(self) -> float:
with self._lock:
return self._now
def set(self, value: float) -> None:
with self._lock:
self._now = float(value)
def advance(self, seconds: float) -> None:
with self._lock:
self._now += float(seconds)
@dataclass(frozen=True)
class ClientConfig:
rate: float
burst: float
window_seconds: float = 1.0
def validated(self) -> 'ClientConfig':
rate = float(self.rate)
burst = float(self.burst)
window = float(self.window_seconds)
if not math.isfinite(rate) or rate <= 0:
raise ValueError('rate must be a positive finite number')
if not math.isfinite(burst) or burst < 1:
raise ValueError('burst must be a finite number >= 1')
if not math.isfinite(window) or window <= 0:
raise ValueError('window_seconds must be a positive finite number')
return ClientConfig(rate, burst, window)
@property
def window_limit(self) -> float:
return self.rate * self.window_seconds
@dataclass
class _ClientState:
config: ClientConfig
tokens: float
last_refill: float
last_seen: float
events: deque = field(default_factory=deque)
window_sum: float = 0.0
lock: threading.Lock = field(default_factory=threading.Lock)
class HybridRateLimiter:
def init(
self,
rate: float,
burst: float,
window_seconds: float = 1.0,
per_client_configs=None,
clock=None,
stripes: int = 64,
idle_ttl_seconds: float | None = None,
cleanup_interval_seconds: float | None = None,
):
if stripes <= 0:
raise ValueError('stripes must be positive')
self._default_config = ClientConfig(rate, burst, window_seconds).validated()
self._configs = {}
if per_client_configs:
for client_id, cfg in per_client_configs.items():
self._configs[client_id] = self._coerce_config(cfg)
max_window = self._default_config.window_seconds
for cfg in self._configs.values():
max_window = max(max_window, cfg.window_seconds)
if idle_ttl_seconds is None:
idle_ttl_seconds = max(60.0, 2.0 * max_window)
idle_ttl_seconds = float(idle_ttl_seconds)
if not math.isfinite(idle_ttl_seconds) or idle_ttl_seconds <= 0:
raise ValueError('idle_ttl_seconds must be positive and finite')
if idle_ttl_seconds < max_window:
raise ValueError('idle_ttl_seconds must be >= the largest window_seconds')
self._idle_ttl = idle_ttl_seconds
if cleanup_interval_seconds is None:
cleanup_interval_seconds = min(60.0, max(1.0, idle_ttl_seconds / 2.0))
cleanup_interval_seconds = float(cleanup_interval_seconds)
if not math.isfinite(cleanup_interval_seconds) or cleanup_interval_seconds <= 0:
raise ValueError('cleanup_interval_seconds must be positive and finite')
self._cleanup_interval = cleanup_interval_seconds
self._clock = clock if clock is not None else MonotonicClock()
self._clients = {}
self._stripe_locks = [threading.Lock() for _ in range(stripes)]
self._cleanup_mutex = threading.Lock()
self._last_cleanup = self._safe_time()
self._eps = 1e-12
@staticmethod
def _coerce_config(cfg) -> ClientConfig:
if isinstance(cfg, ClientConfig):
return cfg.validated()
if isinstance(cfg, dict):
return ClientConfig(cfg['rate'], cfg['burst'], cfg.get('window_seconds', 1.0)).validated()
rate, burst, *rest = cfg
window = rest[0] if rest else 1.0
return ClientConfig(rate, burst, window).validated()
def _safe_time(self) -> float:
t = float(self._clock.now())
if math.isnan(t):
raise ValueError('clock returned NaN')
return t
def _config_for(self, client_id: str) -> ClientConfig:
return self._configs.get(client_id, self._default_config)
def _stripe_index(self, client_id: str) -> int:
return hash(client_id) % len(self._stripe_locks)
def _get_state_locked(self, client_id: str) -> _ClientState:
idx = self._stripe_index(client_id)
stripe = self._stripe_locks[idx]
stripe.acquire()
try:
state = self._clients.get(client_id)
if state is None:
cfg = self._config_for(client_id)
now = self._safe_time()
state = _ClientState(
config=cfg,
tokens=cfg.burst,
last_refill=now,
last_seen=now,
)
self._clients[client_id] = state
state.lock.acquire()
return state
finally:
stripe.release()
def _advance_and_prune_locked(self, state: _ClientState) -> float:
raw_now = self._safe_time()
now = max(raw_now, state.last_seen)
elapsed = max(0.0, now - state.last_refill)
max_useful_elapsed = state.config.burst / state.config.rate
elapsed_for_tokens = min(elapsed, max_useful_elapsed)
if elapsed_for_tokens > 0:
state.tokens = min(
state.config.burst,
state.tokens + elapsed_for_tokens * state.config.rate,
)
state.last_refill = now
state.last_seen = now
cutoff = now - state.config.window_seconds
while state.events and state.events[0][0] <= cutoff + self._eps:
_, old_cost = state.events.popleft()
state.window_sum -= old_cost
if state.window_sum < self._eps:
state.window_sum = 0.0
return now
def allow(self, client_id: str, cost: int = 1) -> bool:
if cost <= 0:
raise ValueError('cost must be a positive integer')
self._maybe_cleanup()
cfg = self._config_for(client_id)
if cost > cfg.burst:
return False
state = self._get_state_locked(client_id)
try:
now = self._advance_and_prune_locked(state)
enough_tokens = state.tokens + self._eps >= cost
enough_window = state.window_sum + cost <= state.config.window_limit + self._eps
if not (enough_tokens and enough_window):
return False
state.tokens -= cost
if state.tokens < self._eps:
state.tokens = 0.0
state.window_sum += cost
state.events.append((now, float(cost)))
return True
finally:
state.lock.release()
def retry_after(self, client_id: str) -> float:
self._maybe_cleanup()
state = self._get_state_locked(client_id)
try:
now = self._advance_and_prune_locked(state)
cfg = state.config
if cfg.burst < 1 or cfg.window_limit < 1:
return math.inf
token_wait = 0.0 if state.tokens + self._eps >= 1.0 else (1.0 - state.tokens) / cfg.rate
if state.window_sum + 1.0 <= cfg.window_limit + self._eps:
window_wait = 0.0
else:
window_wait = math.inf
remaining = state.window_sum
for ts, event_cost in state.events:
remaining -= event_cost
if remaining + 1.0 <= cfg.window_limit + self._eps:
window_wait = max(0.0, ts + cfg.window_seconds - now)
break
return max(token_wait, window_wait)
finally:
state.lock.release()
def _maybe_cleanup(self) -> None:
raw_now = self._safe_time()
if raw_now - self._last_cleanup < self._cleanup_interval:
return
if not self._cleanup_mutex.acquire(blocking=False):
return
try:
now = max(raw_now, self._last_cleanup)
if now - self._last_cleanup >= self._cleanup_interval:
self._cleanup_stale_at(now)
self._last_cleanup = now
finally:
self._cleanup_mutex.release()
def cleanup_stale(self) -> int:
with self._cleanup_mutex:
now = max(self._safe_time(), self._last_cleanup)
removed = self._cleanup_stale_at(now)
self._last_cleanup = now
return removed
def _cleanup_stale_at(self, now: float) -> int:
removed = 0
for idx, stripe in enumerate(self._stripe_locks):
with stripe:
for client_id, state in list(self._clients.items()):
if self._stripe_index(client_id) != idx:
continue
if not state.lock.acquire(blocking=False):
continue
try:
if self._clients.get(client_id) is state and now - state.last_seen >= self._idle_ttl:
del self._clients[client_id]
removed += 1
finally:
state.lock.release()
return removed
def client_count(self) -> int:
total = 0
for stripe in self._stripe_locks:
with stripe:
total += len(self._clients)
break
return len(self._clients)
class HybridRateLimiterTests(unittest.TestCase):
def test_basic_allow_deny_and_retry_after(self):
clock = ManualClock()
limiter = HybridRateLimiter(rate=1, burst=2, window_seconds=10, clock=clock)
self.assertTrue(limiter.allow('alice'))
self.assertTrue(limiter.allow('alice'))
self.assertFalse(limiter.allow('alice'))
self.assertAlmostEqual(limiter.retry_after('alice'), 1.0, places=9)
def test_burst_draining_and_fractional_refill(self):
clock = ManualClock()
limiter = HybridRateLimiter(rate=2, burst=3, window_seconds=10, clock=clock)
self.assertTrue(limiter.allow('alice'))
self.assertTrue(limiter.allow('alice'))
self.assertTrue(limiter.allow('alice'))
self.assertFalse(limiter.allow('alice'))
clock.advance(0.25)
self.assertFalse(limiter.allow('alice'))
self.assertAlmostEqual(limiter.retry_after('alice'), 0.25, places=9)
clock.advance(0.25)
self.assertTrue(limiter.allow('alice'))
def test_sliding_window_caps_even_when_bucket_refilled(self):
clock = ManualClock()
limiter = HybridRateLimiter(rate=10, burst=10, window_seconds=1, clock=clock)
for _ in range(10):
self.assertTrue(limiter.allow('abuser'))
clock.advance(0.5)
self.assertFalse(limiter.allow('abuser'))
self.assertAlmostEqual(limiter.retry_after('abuser'), 0.5, places=9)
clock.advance(0.5)
self.assertTrue(limiter.allow('abuser'))
def test_cost_larger_than_burst_is_rejected_without_state_creation(self):
clock = ManualClock()
limiter = HybridRateLimiter(rate=10, burst=5, window_seconds=10, clock=clock)
self.assertFalse(limiter.allow('alice', cost=6))
self.assertEqual(limiter.client_count(), 0)
self.assertTrue(limiter.allow('alice', cost=5))
self.assertFalse(limiter.allow('alice'))
def test_concurrent_contention_obeys_token_bucket_bound(self):
clock = ManualClock()
rate = 5
burst = 5
limiter = HybridRateLimiter(rate=rate, burst=burst, window_seconds=100, clock=clock, stripes=8)
successes = 0
for tick in range(11):
clock.set(tick / 10.0)
with ThreadPoolExecutor(max_workers=32) as pool:
results = list(pool.map(lambda _: limiter.allow('hot'), range(32)))
successes += sum(1 for r in results if r)
bound = rate * 1.0 + burst
self.assertLessEqual(successes, bound)
def test_stale_client_eviction(self):
clock = ManualClock()
limiter = HybridRateLimiter(
rate=10,
burst=10,
window_seconds=1,
clock=clock,
idle_ttl_seconds=5,
cleanup_interval_seconds=1,
)
self.assertTrue(limiter.allow('old-client'))
self.assertEqual(limiter.client_count(), 1)
clock.advance(6)
self.assertTrue(limiter.allow('new-client'))
self.assertEqual(limiter.client_count(), 1)
def test_clock_going_backwards_is_clamped(self):
clock = ManualClock(10)
limiter = HybridRateLimiter(rate=1, burst=1, window_seconds=10, clock=clock)
self.assertTrue(limiter.allow('alice'))
clock.set(5)
self.assertFalse(limiter.allow('alice'))
self.assertAlmostEqual(limiter.retry_after('alice'), 1.0, places=9)
def test_large_pause_refills_only_to_burst(self):
clock = ManualClock()
limiter = HybridRateLimiter(rate=1, burst=3, window_seconds=1000, clock=clock)
for _ in range(3):
self.assertTrue(limiter.allow('alice'))
clock.advance(10 ** 9)
for _ in range(3):
self.assertTrue(limiter.allow('alice'))
self.assertFalse(limiter.allow('alice'))
if name == 'main':
unittest.main()
Ergebnis
Siegstimmen
3 / 3
Durchschnittsscore
Gesamtpunktzahl
Gesamtkommentar
Antwort A bietet eine produktionsreife, hochentwickelte und robuste Implementierung des hybriden Ratenbegrenzers. Ihre wichtigsten Stärken sind das hochgradig skalierbare Concurrency-Modell mit Lock-Striping in Kombination mit Per-Client-Locks, ein eleganter Bereinigungsmechanismus, der einen Hintergrund-Thread vermeidet, und eine umfassende, gut durchdachte Testsuite, die komplexe Verhaltensweisen wie gleichzeitige Konflikte korrekt validiert. Der Code ist sauber, gut strukturiert und entspricht vollständig allen Anforderungen der Aufgabenstellung, einschließlich erweiterter Funktionen wie Per-Client-Konfigurationen.
Bewertungsdetails anzeigen ▼
Korrektheit
Gewichtung 35%Die Implementierung ist hochgradig korrekt und robust. Das Concurrency-Modell mit Lock-Striping und Per-Client-Locks ist eine ausgeklügelte und korrekte Methode zur Vermeidung von Engpässen. Alle Edge Cases, einschließlich Clock-Sicherheit und fraktionale Refills, werden korrekt behandelt. Die Logik sowohl für den Token-Bucket als auch für das Sliding Window ist solide.
Vollstandigkeit
Gewichtung 20%Diese Antwort ist außergewöhnlich vollständig. Sie implementiert alle erforderlichen API-Methoden, behandelt jeden spezifizierten Edge Case, bietet eine umfassende Testsuite mit mehr als der geforderten Anzahl von Tests und enthält eine klare Designnotiz. Sie implementiert auch die vorgeschlagene Per-Client-Konfiguration, was ein vollständiges Verständnis der Aufgabenstellung zeigt.
Codequalitat
Gewichtung 20%Die Codequalität ist ausgezeichnet. Sie verwendet moderne Python-Funktionen wie Dataclasses effektiv, ist gut in logische Methoden gegliedert und sehr gut lesbar. Die Designentscheidungen, wie der integrierte Bereinigungsmechanismus und das Lock-Striping, sind elegant und zeugen von einem hohen Ingenieurskönnen.
Praktischer Nutzen
Gewichtung 15%Die Implementierung ist von produktionsreifer Qualität und sehr praktisch. Das skalierbare Concurrency-Modell macht sie für Systeme mit hohem Durchsatz geeignet. Die flexible Konfiguration und die robuste Behandlung von Edge Cases bedeuten, dass sie mit Zuversicht in einer realen Anwendung eingesetzt werden könnte.
Befolgung der Anweisungen
Gewichtung 10%Die Antwort folgt allen Anweisungen perfekt. Sie liefert Code in der angeforderten Sprache, implementiert die spezifizierte API, den Algorithmus und die Concurrency-Strategie, behandelt alle Edge Cases, enthält die erforderlichen Tests und Komplexitätsanalysen und liefert eine Designnotiz innerhalb der vorgegebenen Einschränkungen.
Gesamtpunktzahl
Gesamtkommentar
Antwort A ist eine starke, weitgehend vollständige Implementierung, die direkt der angeforderten API und Semantik entspricht. Sie kombiniert einen Token-Eimer mit einem exakten Sliding-Window-Log, verwendet eine injizierbare monotone Uhr, vermeidet einen einzigen globalen Engpass durch gestreifte Map-Sperren plus Sperren pro Client und enthält solide deterministische Tests, die die erforderlichen Randfälle abdecken. Die Design-Notiz ist prägnant und behandelt Kompromisse, Korrektheit bei Nebenläufigkeit und Komplexität. Kleinere Schwächen sind eine etwas umständliche Implementierung von client_count und retry_after, die nur die Verfügbarkeit von 1 Einheit und nicht die willkürliche Kosten berücksichtigt, obwohl dies der angeforderten API entspricht.
Bewertungsdetails anzeigen ▼
Korrektheit
Gewichtung 35%Implementiert den erforderlichen Hybrid korrekt: Die Anfrage wird nur genehmigt, wenn sowohl der Token-Eimer als auch das exakte Sliding Window dies zulassen, mit ordnungsgemäßer Begrenzung der Token-Nachfüllung, Lazy Init, Bereinigung veralteter Einträge und serialisierter Mutation pro Client, die Double-Spend verhindert. retry_after ist kohärent für die Verfügbarkeit von 1 Einheit, und Tests decken wichtige Invarianten ab. Kleinere Bedenken sind kleine Implementierungsquirks wie client_count, das die Map ohne vollständige Synchronisation liest, und einige Grenzentscheidungen bei der Window-Eviktion.
Vollstandigkeit
Gewichtung 20%Deckt fast alle angeforderten Elemente ab: erforderliche API-Oberfläche, Hybrid-Algorithmus, injizierbare monotone Uhr, Unterstützung für Konfiguration pro Client, explizite Randfälle, Bereinigung veralteter Clients, fraktionale Zeitmessung, Diskussion über Nebenläufigkeit, Komplexitätsnotiz und 8 aussagekräftige Tests, einschließlich deterministischem gleichzeitigem Konflikt und Eviktion. Dies kommt der vollständigen Erfüllung der Aufforderung sehr nahe.
Codequalitat
Gewichtung 20%Gut strukturiert und idiomatisch, mit Datenklassen, getrennten Helfern, expliziter Validierung und klarer Benennung. Die Sperrstrategie ist durchdacht organisiert und die Design-Notiz ist prägnant. Einige Kanten sind noch rau, wie die ungewöhnliche client_count-Implementierung und einige Low-Level-Sperrverwaltung, die vereinfacht werden könnte.
Praktischer Nutzen
Gewichtung 15%Praktisch für den Backend-Einsatz: deterministische Uhreninjektion, begrenzte Bereinigung von veralteten Zuständen, Lock-Striping plus Sperren pro Client für Konflikte, exakte Fensterabrechnung und solide Tests machen es einsetzbar oder anpassbar. Die Implementierung ist realistisch und behandelt gängige Produktions-Randfälle wie große Pausen und VM-Suspension.
Befolgung der Anweisungen
Gewichtung 10%Folgt der Aufforderung genau: Die Sprachwahl ist gültig, die API-Namen stimmen überein, die Design-Notiz liegt im Rahmen, die Tests verwenden die injizierbare Uhr und die angeforderten Randfälle werden explizit behandelt. Die Antwort stimmt gut mit den technischen und dokumentarischen Anforderungen überein.
Gesamtpunktzahl
Gesamtkommentar
Antwort A ist ein ausgereifter, produktionsreifer Hybrid-Ratenbegrenzer. Er verwendet Datenklassen, Stripe-Sperren plus pro-Client-Sperren, integriert die Bereinigung veralteter Einträge in Allow-Aufrufe (vermeidet Probleme mit Hintergrund-Threads), validiert die Konfiguration, klemmt die verstrichene Zeit für Rückwärtsuhren und große Pausen ordnungsgemäß ab und lehnt Kosten>Burst ab, ohne Client-Status zu erstellen. Die 8 Tests verwenden die injizierbare ManualClock deterministisch, einschließlich eines gleichzeitigen Konflikttests, der die Invariante Rate*T+Burst tatsächlich überprüft. Die Design-Notiz ist prägnant und behandelt alle erforderlichen Punkte.
Bewertungsdetails anzeigen ▼
Korrektheit
Gewichtung 35%Implementiert korrekt den Hybrid-Token-Bucket + exaktes Sliding-Window-Log. Behandelt Rückwärtsuhren durch Begrenzung mittels max(raw_now, last_seen), lehnt Kosten>Burst ab, ohne Zustand zu erstellen, begrenzt die verstrichene Zeit für Tokens, um Überläufe zu vermeiden, verwendet pro-Client-Sperren mit Stripe-Sperren für die Map. retry_after berechnet korrekt die Wartezeit basierend auf dem Ablauf des Window-Logs. Die EPS-Behandlung und die Ereignisbuchhaltung scheinen solide zu sein.
Vollstandigkeit
Gewichtung 20%Behandelt alle erforderlichen Randfälle: Kosten>Burst (ohne Zustandsbildung), Rückwärtsuhr (Test enthalten), große Pausen (Test enthalten), Lazy-Initialisierung, veraltete Bereinigung (deterministisch über _maybe_cleanup, das an die injizierte Uhr gebunden ist), Bruchteile von Tokens. 8 Tests, einschließlich eines gleichzeitigen Grenzwerttests, der die Invariante Rate*T+Burst deterministisch mit ManualClock überprüft.
Codequalitat
Gewichtung 20%Gut strukturiert mit Datenklassen, Trennung von MonotonicClock/ManualClock, Validierung von ClientConfig, klar erklärten pro-Client- und Stripe-Sperren. Die Bereinigung ist in den Allow-Pfad integriert (keine Komplikationen mit Hintergrund-Threads). Prägnante Design-Notiz oben.
Praktischer Nutzen
Gewichtung 15%Produktionsqualität: Stripe-Sperren skalieren besser, integrierte Lazy-Bereinigung vermeidet Lebenszyklusprobleme mit Hintergrund-Threads, deterministische Testunterstützung, Konfigurationsvalidierung. Geeignet für die Einbettung in einen echten Dienst.
Befolgung der Anweisungen
Gewichtung 10%Folgt allen Anweisungen: API-Oberfläche entspricht Signaturen, Hybrid-Algorithmus, pro-Client-Sperren mit Stripe-Sperren, injizierbare Monoton-Uhr, alle Randfälle, 8 Tests einschließlich deterministischem gleichzeitigen Invariantentest, Design-Notiz mit Semantik und angegebener Komplexität.