Respuesta A: OpenAI GPT-5.5
import math
import threading
import time
import unittest
from collections import deque
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass, field
Design note:
This limiter admits a request only when two independent checks agree:
1. a token bucket has at least cost tokens, allowing short bursts up to burst;
2. an exact sliding-window log has room for cost more admitted units in the last
window_seconds, where the window limit is rate * window_seconds.
If the token bucket says yes but the sliding window is full, the request is
denied and no tokens are spent; if the w...
Mostrar respuesta completa ▼
import math
import threading
import time
import unittest
from collections import deque
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass, field
Design note:
This limiter admits a request only when two independent checks agree:
1. a token bucket has at least cost tokens, allowing short bursts up to burst;
2. an exact sliding-window log has room for cost more admitted units in the last
window_seconds, where the window limit is rate * window_seconds.
If the token bucket says yes but the sliding window is full, the request is
denied and no tokens are spent; if the window has room but tokens are absent,
it is also denied. The exact log stores one entry per admitted request as
(timestamp, cost), not one entry per unit, so allow is amortized O(1): each log
entry is appended once and removed once. Memory is O(number of admitted requests
in the active window) per client. A weighted two-bucket counter would use O(1)
memory but can over- or under-admit near bucket boundaries; this implementation
chooses exactness because correctness is usually more important for abuse
prevention. The client map is lock-striped; each client state has its own lock.
A caller obtains the per-client lock before mutating tokens/window state, so
concurrent allow calls cannot double-spend tokens or lose window updates.
class MonotonicClock:
def now(self) -> float:
return time.monotonic()
class ManualClock:
def init(self, start: float = 0.0):
self._now = float(start)
self._lock = threading.Lock()
def now(self) -> float:
with self._lock:
return self._now
def set(self, value: float) -> None:
with self._lock:
self._now = float(value)
def advance(self, seconds: float) -> None:
with self._lock:
self._now += float(seconds)
@dataclass(frozen=True)
class ClientConfig:
rate: float
burst: float
window_seconds: float = 1.0
def validated(self) -> 'ClientConfig':
rate = float(self.rate)
burst = float(self.burst)
window = float(self.window_seconds)
if not math.isfinite(rate) or rate <= 0:
raise ValueError('rate must be a positive finite number')
if not math.isfinite(burst) or burst < 1:
raise ValueError('burst must be a finite number >= 1')
if not math.isfinite(window) or window <= 0:
raise ValueError('window_seconds must be a positive finite number')
return ClientConfig(rate, burst, window)
@property
def window_limit(self) -> float:
return self.rate * self.window_seconds
@dataclass
class _ClientState:
config: ClientConfig
tokens: float
last_refill: float
last_seen: float
events: deque = field(default_factory=deque)
window_sum: float = 0.0
lock: threading.Lock = field(default_factory=threading.Lock)
class HybridRateLimiter:
def init(
self,
rate: float,
burst: float,
window_seconds: float = 1.0,
per_client_configs=None,
clock=None,
stripes: int = 64,
idle_ttl_seconds: float | None = None,
cleanup_interval_seconds: float | None = None,
):
if stripes <= 0:
raise ValueError('stripes must be positive')
self._default_config = ClientConfig(rate, burst, window_seconds).validated()
self._configs = {}
if per_client_configs:
for client_id, cfg in per_client_configs.items():
self._configs[client_id] = self._coerce_config(cfg)
max_window = self._default_config.window_seconds
for cfg in self._configs.values():
max_window = max(max_window, cfg.window_seconds)
if idle_ttl_seconds is None:
idle_ttl_seconds = max(60.0, 2.0 * max_window)
idle_ttl_seconds = float(idle_ttl_seconds)
if not math.isfinite(idle_ttl_seconds) or idle_ttl_seconds <= 0:
raise ValueError('idle_ttl_seconds must be positive and finite')
if idle_ttl_seconds < max_window:
raise ValueError('idle_ttl_seconds must be >= the largest window_seconds')
self._idle_ttl = idle_ttl_seconds
if cleanup_interval_seconds is None:
cleanup_interval_seconds = min(60.0, max(1.0, idle_ttl_seconds / 2.0))
cleanup_interval_seconds = float(cleanup_interval_seconds)
if not math.isfinite(cleanup_interval_seconds) or cleanup_interval_seconds <= 0:
raise ValueError('cleanup_interval_seconds must be positive and finite')
self._cleanup_interval = cleanup_interval_seconds
self._clock = clock if clock is not None else MonotonicClock()
self._clients = {}
self._stripe_locks = [threading.Lock() for _ in range(stripes)]
self._cleanup_mutex = threading.Lock()
self._last_cleanup = self._safe_time()
self._eps = 1e-12
@staticmethod
def _coerce_config(cfg) -> ClientConfig:
if isinstance(cfg, ClientConfig):
return cfg.validated()
if isinstance(cfg, dict):
return ClientConfig(cfg['rate'], cfg['burst'], cfg.get('window_seconds', 1.0)).validated()
rate, burst, *rest = cfg
window = rest[0] if rest else 1.0
return ClientConfig(rate, burst, window).validated()
def _safe_time(self) -> float:
t = float(self._clock.now())
if math.isnan(t):
raise ValueError('clock returned NaN')
return t
def _config_for(self, client_id: str) -> ClientConfig:
return self._configs.get(client_id, self._default_config)
def _stripe_index(self, client_id: str) -> int:
return hash(client_id) % len(self._stripe_locks)
def _get_state_locked(self, client_id: str) -> _ClientState:
idx = self._stripe_index(client_id)
stripe = self._stripe_locks[idx]
stripe.acquire()
try:
state = self._clients.get(client_id)
if state is None:
cfg = self._config_for(client_id)
now = self._safe_time()
state = _ClientState(
config=cfg,
tokens=cfg.burst,
last_refill=now,
last_seen=now,
)
self._clients[client_id] = state
state.lock.acquire()
return state
finally:
stripe.release()
def _advance_and_prune_locked(self, state: _ClientState) -> float:
raw_now = self._safe_time()
now = max(raw_now, state.last_seen)
elapsed = max(0.0, now - state.last_refill)
max_useful_elapsed = state.config.burst / state.config.rate
elapsed_for_tokens = min(elapsed, max_useful_elapsed)
if elapsed_for_tokens > 0:
state.tokens = min(
state.config.burst,
state.tokens + elapsed_for_tokens * state.config.rate,
)
state.last_refill = now
state.last_seen = now
cutoff = now - state.config.window_seconds
while state.events and state.events[0][0] <= cutoff + self._eps:
_, old_cost = state.events.popleft()
state.window_sum -= old_cost
if state.window_sum < self._eps:
state.window_sum = 0.0
return now
def allow(self, client_id: str, cost: int = 1) -> bool:
if cost <= 0:
raise ValueError('cost must be a positive integer')
self._maybe_cleanup()
cfg = self._config_for(client_id)
if cost > cfg.burst:
return False
state = self._get_state_locked(client_id)
try:
now = self._advance_and_prune_locked(state)
enough_tokens = state.tokens + self._eps >= cost
enough_window = state.window_sum + cost <= state.config.window_limit + self._eps
if not (enough_tokens and enough_window):
return False
state.tokens -= cost
if state.tokens < self._eps:
state.tokens = 0.0
state.window_sum += cost
state.events.append((now, float(cost)))
return True
finally:
state.lock.release()
def retry_after(self, client_id: str) -> float:
self._maybe_cleanup()
state = self._get_state_locked(client_id)
try:
now = self._advance_and_prune_locked(state)
cfg = state.config
if cfg.burst < 1 or cfg.window_limit < 1:
return math.inf
token_wait = 0.0 if state.tokens + self._eps >= 1.0 else (1.0 - state.tokens) / cfg.rate
if state.window_sum + 1.0 <= cfg.window_limit + self._eps:
window_wait = 0.0
else:
window_wait = math.inf
remaining = state.window_sum
for ts, event_cost in state.events:
remaining -= event_cost
if remaining + 1.0 <= cfg.window_limit + self._eps:
window_wait = max(0.0, ts + cfg.window_seconds - now)
break
return max(token_wait, window_wait)
finally:
state.lock.release()
def _maybe_cleanup(self) -> None:
raw_now = self._safe_time()
if raw_now - self._last_cleanup < self._cleanup_interval:
return
if not self._cleanup_mutex.acquire(blocking=False):
return
try:
now = max(raw_now, self._last_cleanup)
if now - self._last_cleanup >= self._cleanup_interval:
self._cleanup_stale_at(now)
self._last_cleanup = now
finally:
self._cleanup_mutex.release()
def cleanup_stale(self) -> int:
with self._cleanup_mutex:
now = max(self._safe_time(), self._last_cleanup)
removed = self._cleanup_stale_at(now)
self._last_cleanup = now
return removed
def _cleanup_stale_at(self, now: float) -> int:
removed = 0
for idx, stripe in enumerate(self._stripe_locks):
with stripe:
for client_id, state in list(self._clients.items()):
if self._stripe_index(client_id) != idx:
continue
if not state.lock.acquire(blocking=False):
continue
try:
if self._clients.get(client_id) is state and now - state.last_seen >= self._idle_ttl:
del self._clients[client_id]
removed += 1
finally:
state.lock.release()
return removed
def client_count(self) -> int:
total = 0
for stripe in self._stripe_locks:
with stripe:
total += len(self._clients)
break
return len(self._clients)
class HybridRateLimiterTests(unittest.TestCase):
def test_basic_allow_deny_and_retry_after(self):
clock = ManualClock()
limiter = HybridRateLimiter(rate=1, burst=2, window_seconds=10, clock=clock)
self.assertTrue(limiter.allow('alice'))
self.assertTrue(limiter.allow('alice'))
self.assertFalse(limiter.allow('alice'))
self.assertAlmostEqual(limiter.retry_after('alice'), 1.0, places=9)
def test_burst_draining_and_fractional_refill(self):
clock = ManualClock()
limiter = HybridRateLimiter(rate=2, burst=3, window_seconds=10, clock=clock)
self.assertTrue(limiter.allow('alice'))
self.assertTrue(limiter.allow('alice'))
self.assertTrue(limiter.allow('alice'))
self.assertFalse(limiter.allow('alice'))
clock.advance(0.25)
self.assertFalse(limiter.allow('alice'))
self.assertAlmostEqual(limiter.retry_after('alice'), 0.25, places=9)
clock.advance(0.25)
self.assertTrue(limiter.allow('alice'))
def test_sliding_window_caps_even_when_bucket_refilled(self):
clock = ManualClock()
limiter = HybridRateLimiter(rate=10, burst=10, window_seconds=1, clock=clock)
for _ in range(10):
self.assertTrue(limiter.allow('abuser'))
clock.advance(0.5)
self.assertFalse(limiter.allow('abuser'))
self.assertAlmostEqual(limiter.retry_after('abuser'), 0.5, places=9)
clock.advance(0.5)
self.assertTrue(limiter.allow('abuser'))
def test_cost_larger_than_burst_is_rejected_without_state_creation(self):
clock = ManualClock()
limiter = HybridRateLimiter(rate=10, burst=5, window_seconds=10, clock=clock)
self.assertFalse(limiter.allow('alice', cost=6))
self.assertEqual(limiter.client_count(), 0)
self.assertTrue(limiter.allow('alice', cost=5))
self.assertFalse(limiter.allow('alice'))
def test_concurrent_contention_obeys_token_bucket_bound(self):
clock = ManualClock()
rate = 5
burst = 5
limiter = HybridRateLimiter(rate=rate, burst=burst, window_seconds=100, clock=clock, stripes=8)
successes = 0
for tick in range(11):
clock.set(tick / 10.0)
with ThreadPoolExecutor(max_workers=32) as pool:
results = list(pool.map(lambda _: limiter.allow('hot'), range(32)))
successes += sum(1 for r in results if r)
bound = rate * 1.0 + burst
self.assertLessEqual(successes, bound)
def test_stale_client_eviction(self):
clock = ManualClock()
limiter = HybridRateLimiter(
rate=10,
burst=10,
window_seconds=1,
clock=clock,
idle_ttl_seconds=5,
cleanup_interval_seconds=1,
)
self.assertTrue(limiter.allow('old-client'))
self.assertEqual(limiter.client_count(), 1)
clock.advance(6)
self.assertTrue(limiter.allow('new-client'))
self.assertEqual(limiter.client_count(), 1)
def test_clock_going_backwards_is_clamped(self):
clock = ManualClock(10)
limiter = HybridRateLimiter(rate=1, burst=1, window_seconds=10, clock=clock)
self.assertTrue(limiter.allow('alice'))
clock.set(5)
self.assertFalse(limiter.allow('alice'))
self.assertAlmostEqual(limiter.retry_after('alice'), 1.0, places=9)
def test_large_pause_refills_only_to_burst(self):
clock = ManualClock()
limiter = HybridRateLimiter(rate=1, burst=3, window_seconds=1000, clock=clock)
for _ in range(3):
self.assertTrue(limiter.allow('alice'))
clock.advance(10 ** 9)
for _ in range(3):
self.assertTrue(limiter.allow('alice'))
self.assertFalse(limiter.allow('alice'))
if name == 'main':
unittest.main()
Resultado
Votos ganadores
3 / 3
Puntuacion media
Puntuacion total
Comentario general
La Respuesta A proporciona una implementación sofisticada, robusta y de calidad de producción del limitador de tasa híbrido. Sus puntos fuertes clave son el modelo de concurrencia altamente escalable que utiliza el bloqueo por franjas (lock striping) combinado con bloqueos por cliente, un elegante mecanismo de limpieza que evita un hilo en segundo plano y un conjunto de pruebas completo y bien diseñado que valida correctamente comportamientos complejos como la contención concurrente. El código está limpio, bien estructurado y cumple plenamente con todos los requisitos del prompt, incluidas funciones avanzadas como la configuración por cliente.
Ver detalle de evaluacion ▼
Correccion
Peso 35%La implementación es altamente correcta y robusta. El modelo de concurrencia que utiliza el bloqueo por franjas y los bloqueos por cliente es una forma sofisticada y correcta de evitar cuellos de botella. Todos los casos extremos, incluida la seguridad del reloj y los reabastecimientos fraccionarios, se manejan correctamente. La lógica tanto para el cubo de tokens como para la ventana deslizante es sólida.
Integridad
Peso 20%Esta respuesta es excepcionalmente completa. Implementa todos los métodos de API requeridos, maneja todos los casos extremos especificados, proporciona un conjunto de pruebas completo con más del número requerido de pruebas e incluye una nota de diseño clara. También implementa la configuración sugerida por cliente, lo que demuestra una comprensión completa del prompt.
Calidad del codigo
Peso 20%La calidad del código es excelente. Utiliza características modernas de Python como las clases de datos (dataclasses) de manera efectiva, está bien estructurado en métodos lógicos y es muy legible. Las decisiones de diseño, como el mecanismo de limpieza integrado y el bloqueo por franjas, son elegantes y demuestran un alto nivel de habilidad de ingeniería.
Valor practico
Peso 15%La implementación es de calidad de grado de producción y muy práctica. El modelo de concurrencia escalable la hace adecuada para sistemas de alto rendimiento. La configuración flexible y el manejo robusto de los casos extremos significan que podría implementarse con confianza en una aplicación del mundo real.
Seguimiento de instrucciones
Peso 10%La respuesta sigue todas las instrucciones a la perfección. Proporciona código en un idioma solicitado, implementa la API especificada, el algoritmo y la estrategia de concurrencia, maneja todos los casos extremos, incluye las pruebas requeridas y el análisis de complejidad, y proporciona una nota de diseño dentro de las restricciones especificadas.
Puntuacion total
Comentario general
La respuesta A es una implementación sólida y mayormente completa que coincide directamente con la API y la semántica solicitadas. Combina un cubo de tokens con un registro de ventana deslizante exacto, utiliza un reloj monótono inyectable, evita un cuello de botella global único a través de bloqueos de mapas rayados más bloqueos por cliente, e incluye pruebas deterministas sólidas que cubren los casos de borde requeridos. La nota de diseño es concisa y aborda las compensaciones, la corrección de la concurrencia y la complejidad. Las debilidades menores incluyen una implementación de client_count algo torpe y retry_after que solo considera la disponibilidad de 1 unidad en lugar de un costo arbitrario, aunque esto coincide con la API solicitada.
Ver detalle de evaluacion ▼
Correccion
Peso 35%Implementa correctamente el híbrido requerido: la solicitud se aprueba solo si tanto el cubo de tokens como la ventana deslizante exacta lo permiten, con recarga de tokens adecuada, inicialización perezosa, limpieza de obsoletos y mutación serializada por cliente que evita el doble gasto. retry_after es coherente para la disponibilidad de 1 unidad, y las pruebas cubren invariantes importantes. Las preocupaciones menores son pequeñas peculiaridades de implementación como la lectura de client_count del mapa sin sincronización completa y algunas elecciones de límites en el desalojo de la ventana.
Integridad
Peso 20%Cubre casi todos los elementos solicitados: superficie de API requerida, algoritmo híbrido, reloj monótono inyectable, soporte de configuración por cliente, casos de borde explícitos, limpieza de clientes obsoletos, temporización fraccionaria, discusión de concurrencia, nota de complejidad y 8 pruebas significativas que incluyen contención concurrente determinista y desalojo. Esto está muy cerca de satisfacer completamente la solicitud.
Calidad del codigo
Peso 20%Bien estructurado e idiomático, con dataclasses, ayudantes separados, validación explícita y nombres claros. La estrategia de bloqueo está organizada de manera reflexiva y la nota de diseño es concisa. Quedan algunos bordes ásperos, como la inusual implementación de client_count y alguna gestión de bloqueo de bajo nivel que podría simplificarse.
Valor practico
Peso 15%Práctico para uso en backend: inyección de reloj determinista, limpieza de estado obsoleto acotada, rayado de bloqueos más bloqueos por cliente para contención, contabilidad de ventana exacta y pruebas sólidas lo hacen desplegable o adaptable. La implementación es realista y aborda casos de borde comunes de producción como pausas largas y suspensión de VM.
Seguimiento de instrucciones
Peso 10%Sigue la solicitud de cerca: la elección del idioma es válida, los nombres de la API coinciden, la nota de diseño está dentro del alcance, las pruebas utilizan el reloj inyectable y se abordan explícitamente los casos de borde solicitados. La respuesta se alinea bien tanto con los requisitos técnicos como de documentación.
Puntuacion total
Comentario general
La respuesta A es un limitador de tasa híbrido pulido y de calidad de producción. Utiliza dataclasses, bloqueos de stripe y bloqueos por cliente, integra la limpieza de elementos obsoletos en las llamadas allow (evitando problemas de hilos en segundo plano), valida la configuración, ajusta adecuadamente el tiempo transcurrido tanto para relojes inversos como para pausas largas, y rechaza cost>burst sin crear estado de cliente. Las 8 pruebas utilizan el ManualClock inyectable de forma determinista, incluida una prueba de contención concurrente que afirma genuinamente el invariante rate*T+burst. La nota de diseño es concisa y aborda todos los puntos requeridos.
Ver detalle de evaluacion ▼
Correccion
Peso 35%Implementa correctamente el cubo de tokens híbrido + registro de ventana deslizante exacto. Maneja el reloj inverso ajustando mediante max(raw_now, last_seen), rechaza cost>burst sin crear estado, ajusta el tiempo transcurrido para los tokens para evitar desbordamientos, utiliza bloqueos por cliente con bloqueos de stripe para el mapa. retry_after calcula correctamente el tiempo de espera basándose en la expiración del registro de la ventana. El manejo de eps y la contabilidad de eventos parecen sólidos.
Integridad
Peso 20%Cubre todos los casos extremos requeridos: cost>burst (sin creación de estado), reloj inverso (prueba incluida), pausas largas (prueba incluida), inicialización diferida, eliminación de elementos obsoletos (determinista a través de _maybe_cleanup vinculado al reloj inyectado), tokens fraccionarios. 8 pruebas, incluida la prueba de límite concurrente que afirma el invariante rate*T+burst de forma determinista utilizando ManualClock.
Calidad del codigo
Peso 20%Bien estructurado con dataclasses, separación de MonotonicClock/ManualClock, validación de ClientConfig, bloqueos por cliente y de stripe claramente explicados. La limpieza se integra en la ruta de allow (sin complicaciones de hilos en segundo plano). Nota de diseño concisa en la parte superior.
Valor practico
Peso 15%Calidad de producción: los bloqueos de stripe escalan mejor, la limpieza diferida integrada evita el ciclo de vida del hilo en segundo plano, soporte de pruebas deterministas, validación de configuración. Adecuado para incrustar en un servicio real.
Seguimiento de instrucciones
Peso 10%Sigue todas las instrucciones: la superficie de la API coincide con las firmas, algoritmo híbrido, bloqueos por cliente con bloqueos de stripe, reloj monotónico inyectable, todos los casos extremos, 8 pruebas, incluida la prueba de invariante concurrente determinista, nota de diseño con semántica y complejidad declaradas.