Resposta A: OpenAI GPT-5.5
import math
import threading
import time
import unittest
from collections import deque
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass, field
Design note:
This limiter admits a request only when two independent checks agree:
1. a token bucket has at least cost tokens, allowing short bursts up to burst;
2. an exact sliding-window log has room for cost more admitted units in the last
window_seconds, where the window limit is rate * window_seconds.
If the token bucket says yes but the sliding window is full, the request is
denied and no tokens are spent; if the w...
Mostrar resposta completa ▼
import math
import threading
import time
import unittest
from collections import deque
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass, field
Design note:
This limiter admits a request only when two independent checks agree:
1. a token bucket has at least cost tokens, allowing short bursts up to burst;
2. an exact sliding-window log has room for cost more admitted units in the last
window_seconds, where the window limit is rate * window_seconds.
If the token bucket says yes but the sliding window is full, the request is
denied and no tokens are spent; if the window has room but tokens are absent,
it is also denied. The exact log stores one entry per admitted request as
(timestamp, cost), not one entry per unit, so allow is amortized O(1): each log
entry is appended once and removed once. Memory is O(number of admitted requests
in the active window) per client. A weighted two-bucket counter would use O(1)
memory but can over- or under-admit near bucket boundaries; this implementation
chooses exactness because correctness is usually more important for abuse
prevention. The client map is lock-striped; each client state has its own lock.
A caller obtains the per-client lock before mutating tokens/window state, so
concurrent allow calls cannot double-spend tokens or lose window updates.
class MonotonicClock:
def now(self) -> float:
return time.monotonic()
class ManualClock:
def init(self, start: float = 0.0):
self._now = float(start)
self._lock = threading.Lock()
def now(self) -> float:
with self._lock:
return self._now
def set(self, value: float) -> None:
with self._lock:
self._now = float(value)
def advance(self, seconds: float) -> None:
with self._lock:
self._now += float(seconds)
@dataclass(frozen=True)
class ClientConfig:
rate: float
burst: float
window_seconds: float = 1.0
def validated(self) -> 'ClientConfig':
rate = float(self.rate)
burst = float(self.burst)
window = float(self.window_seconds)
if not math.isfinite(rate) or rate <= 0:
raise ValueError('rate must be a positive finite number')
if not math.isfinite(burst) or burst < 1:
raise ValueError('burst must be a finite number >= 1')
if not math.isfinite(window) or window <= 0:
raise ValueError('window_seconds must be a positive finite number')
return ClientConfig(rate, burst, window)
@property
def window_limit(self) -> float:
return self.rate * self.window_seconds
@dataclass
class _ClientState:
config: ClientConfig
tokens: float
last_refill: float
last_seen: float
events: deque = field(default_factory=deque)
window_sum: float = 0.0
lock: threading.Lock = field(default_factory=threading.Lock)
class HybridRateLimiter:
def init(
self,
rate: float,
burst: float,
window_seconds: float = 1.0,
per_client_configs=None,
clock=None,
stripes: int = 64,
idle_ttl_seconds: float | None = None,
cleanup_interval_seconds: float | None = None,
):
if stripes <= 0:
raise ValueError('stripes must be positive')
self._default_config = ClientConfig(rate, burst, window_seconds).validated()
self._configs = {}
if per_client_configs:
for client_id, cfg in per_client_configs.items():
self._configs[client_id] = self._coerce_config(cfg)
max_window = self._default_config.window_seconds
for cfg in self._configs.values():
max_window = max(max_window, cfg.window_seconds)
if idle_ttl_seconds is None:
idle_ttl_seconds = max(60.0, 2.0 * max_window)
idle_ttl_seconds = float(idle_ttl_seconds)
if not math.isfinite(idle_ttl_seconds) or idle_ttl_seconds <= 0:
raise ValueError('idle_ttl_seconds must be positive and finite')
if idle_ttl_seconds < max_window:
raise ValueError('idle_ttl_seconds must be >= the largest window_seconds')
self._idle_ttl = idle_ttl_seconds
if cleanup_interval_seconds is None:
cleanup_interval_seconds = min(60.0, max(1.0, idle_ttl_seconds / 2.0))
cleanup_interval_seconds = float(cleanup_interval_seconds)
if not math.isfinite(cleanup_interval_seconds) or cleanup_interval_seconds <= 0:
raise ValueError('cleanup_interval_seconds must be positive and finite')
self._cleanup_interval = cleanup_interval_seconds
self._clock = clock if clock is not None else MonotonicClock()
self._clients = {}
self._stripe_locks = [threading.Lock() for _ in range(stripes)]
self._cleanup_mutex = threading.Lock()
self._last_cleanup = self._safe_time()
self._eps = 1e-12
@staticmethod
def _coerce_config(cfg) -> ClientConfig:
if isinstance(cfg, ClientConfig):
return cfg.validated()
if isinstance(cfg, dict):
return ClientConfig(cfg['rate'], cfg['burst'], cfg.get('window_seconds', 1.0)).validated()
rate, burst, *rest = cfg
window = rest[0] if rest else 1.0
return ClientConfig(rate, burst, window).validated()
def _safe_time(self) -> float:
t = float(self._clock.now())
if math.isnan(t):
raise ValueError('clock returned NaN')
return t
def _config_for(self, client_id: str) -> ClientConfig:
return self._configs.get(client_id, self._default_config)
def _stripe_index(self, client_id: str) -> int:
return hash(client_id) % len(self._stripe_locks)
def _get_state_locked(self, client_id: str) -> _ClientState:
idx = self._stripe_index(client_id)
stripe = self._stripe_locks[idx]
stripe.acquire()
try:
state = self._clients.get(client_id)
if state is None:
cfg = self._config_for(client_id)
now = self._safe_time()
state = _ClientState(
config=cfg,
tokens=cfg.burst,
last_refill=now,
last_seen=now,
)
self._clients[client_id] = state
state.lock.acquire()
return state
finally:
stripe.release()
def _advance_and_prune_locked(self, state: _ClientState) -> float:
raw_now = self._safe_time()
now = max(raw_now, state.last_seen)
elapsed = max(0.0, now - state.last_refill)
max_useful_elapsed = state.config.burst / state.config.rate
elapsed_for_tokens = min(elapsed, max_useful_elapsed)
if elapsed_for_tokens > 0:
state.tokens = min(
state.config.burst,
state.tokens + elapsed_for_tokens * state.config.rate,
)
state.last_refill = now
state.last_seen = now
cutoff = now - state.config.window_seconds
while state.events and state.events[0][0] <= cutoff + self._eps:
_, old_cost = state.events.popleft()
state.window_sum -= old_cost
if state.window_sum < self._eps:
state.window_sum = 0.0
return now
def allow(self, client_id: str, cost: int = 1) -> bool:
if cost <= 0:
raise ValueError('cost must be a positive integer')
self._maybe_cleanup()
cfg = self._config_for(client_id)
if cost > cfg.burst:
return False
state = self._get_state_locked(client_id)
try:
now = self._advance_and_prune_locked(state)
enough_tokens = state.tokens + self._eps >= cost
enough_window = state.window_sum + cost <= state.config.window_limit + self._eps
if not (enough_tokens and enough_window):
return False
state.tokens -= cost
if state.tokens < self._eps:
state.tokens = 0.0
state.window_sum += cost
state.events.append((now, float(cost)))
return True
finally:
state.lock.release()
def retry_after(self, client_id: str) -> float:
self._maybe_cleanup()
state = self._get_state_locked(client_id)
try:
now = self._advance_and_prune_locked(state)
cfg = state.config
if cfg.burst < 1 or cfg.window_limit < 1:
return math.inf
token_wait = 0.0 if state.tokens + self._eps >= 1.0 else (1.0 - state.tokens) / cfg.rate
if state.window_sum + 1.0 <= cfg.window_limit + self._eps:
window_wait = 0.0
else:
window_wait = math.inf
remaining = state.window_sum
for ts, event_cost in state.events:
remaining -= event_cost
if remaining + 1.0 <= cfg.window_limit + self._eps:
window_wait = max(0.0, ts + cfg.window_seconds - now)
break
return max(token_wait, window_wait)
finally:
state.lock.release()
def _maybe_cleanup(self) -> None:
raw_now = self._safe_time()
if raw_now - self._last_cleanup < self._cleanup_interval:
return
if not self._cleanup_mutex.acquire(blocking=False):
return
try:
now = max(raw_now, self._last_cleanup)
if now - self._last_cleanup >= self._cleanup_interval:
self._cleanup_stale_at(now)
self._last_cleanup = now
finally:
self._cleanup_mutex.release()
def cleanup_stale(self) -> int:
with self._cleanup_mutex:
now = max(self._safe_time(), self._last_cleanup)
removed = self._cleanup_stale_at(now)
self._last_cleanup = now
return removed
def _cleanup_stale_at(self, now: float) -> int:
removed = 0
for idx, stripe in enumerate(self._stripe_locks):
with stripe:
for client_id, state in list(self._clients.items()):
if self._stripe_index(client_id) != idx:
continue
if not state.lock.acquire(blocking=False):
continue
try:
if self._clients.get(client_id) is state and now - state.last_seen >= self._idle_ttl:
del self._clients[client_id]
removed += 1
finally:
state.lock.release()
return removed
def client_count(self) -> int:
total = 0
for stripe in self._stripe_locks:
with stripe:
total += len(self._clients)
break
return len(self._clients)
class HybridRateLimiterTests(unittest.TestCase):
def test_basic_allow_deny_and_retry_after(self):
clock = ManualClock()
limiter = HybridRateLimiter(rate=1, burst=2, window_seconds=10, clock=clock)
self.assertTrue(limiter.allow('alice'))
self.assertTrue(limiter.allow('alice'))
self.assertFalse(limiter.allow('alice'))
self.assertAlmostEqual(limiter.retry_after('alice'), 1.0, places=9)
def test_burst_draining_and_fractional_refill(self):
clock = ManualClock()
limiter = HybridRateLimiter(rate=2, burst=3, window_seconds=10, clock=clock)
self.assertTrue(limiter.allow('alice'))
self.assertTrue(limiter.allow('alice'))
self.assertTrue(limiter.allow('alice'))
self.assertFalse(limiter.allow('alice'))
clock.advance(0.25)
self.assertFalse(limiter.allow('alice'))
self.assertAlmostEqual(limiter.retry_after('alice'), 0.25, places=9)
clock.advance(0.25)
self.assertTrue(limiter.allow('alice'))
def test_sliding_window_caps_even_when_bucket_refilled(self):
clock = ManualClock()
limiter = HybridRateLimiter(rate=10, burst=10, window_seconds=1, clock=clock)
for _ in range(10):
self.assertTrue(limiter.allow('abuser'))
clock.advance(0.5)
self.assertFalse(limiter.allow('abuser'))
self.assertAlmostEqual(limiter.retry_after('abuser'), 0.5, places=9)
clock.advance(0.5)
self.assertTrue(limiter.allow('abuser'))
def test_cost_larger_than_burst_is_rejected_without_state_creation(self):
clock = ManualClock()
limiter = HybridRateLimiter(rate=10, burst=5, window_seconds=10, clock=clock)
self.assertFalse(limiter.allow('alice', cost=6))
self.assertEqual(limiter.client_count(), 0)
self.assertTrue(limiter.allow('alice', cost=5))
self.assertFalse(limiter.allow('alice'))
def test_concurrent_contention_obeys_token_bucket_bound(self):
clock = ManualClock()
rate = 5
burst = 5
limiter = HybridRateLimiter(rate=rate, burst=burst, window_seconds=100, clock=clock, stripes=8)
successes = 0
for tick in range(11):
clock.set(tick / 10.0)
with ThreadPoolExecutor(max_workers=32) as pool:
results = list(pool.map(lambda _: limiter.allow('hot'), range(32)))
successes += sum(1 for r in results if r)
bound = rate * 1.0 + burst
self.assertLessEqual(successes, bound)
def test_stale_client_eviction(self):
clock = ManualClock()
limiter = HybridRateLimiter(
rate=10,
burst=10,
window_seconds=1,
clock=clock,
idle_ttl_seconds=5,
cleanup_interval_seconds=1,
)
self.assertTrue(limiter.allow('old-client'))
self.assertEqual(limiter.client_count(), 1)
clock.advance(6)
self.assertTrue(limiter.allow('new-client'))
self.assertEqual(limiter.client_count(), 1)
def test_clock_going_backwards_is_clamped(self):
clock = ManualClock(10)
limiter = HybridRateLimiter(rate=1, burst=1, window_seconds=10, clock=clock)
self.assertTrue(limiter.allow('alice'))
clock.set(5)
self.assertFalse(limiter.allow('alice'))
self.assertAlmostEqual(limiter.retry_after('alice'), 1.0, places=9)
def test_large_pause_refills_only_to_burst(self):
clock = ManualClock()
limiter = HybridRateLimiter(rate=1, burst=3, window_seconds=1000, clock=clock)
for _ in range(3):
self.assertTrue(limiter.allow('alice'))
clock.advance(10 ** 9)
for _ in range(3):
self.assertTrue(limiter.allow('alice'))
self.assertFalse(limiter.allow('alice'))
if name == 'main':
unittest.main()
Resultado
Votos de vitoria
3 / 3
Pontuacao media
Pontuacao total
Comentario geral
A Resposta A fornece uma implementação sofisticada, robusta e de qualidade de produção do limitador de taxa híbrido. Seus principais pontos fortes são o modelo de concorrência altamente escalável usando 'lock striping' combinado com bloqueios por cliente, um mecanismo de limpeza elegante que evita uma thread em segundo plano e um conjunto de testes abrangente e bem projetado que valida corretamente comportamentos complexos como contenção concorrente. O código é limpo, bem estruturado e adere totalmente a todos os requisitos do prompt, incluindo recursos avançados como configurações por cliente.
Ver detalhes da avaliacao ▼
Correcao
Peso 35%A implementação é altamente correta e robusta. O modelo de concorrência usando 'lock striping' e bloqueios por cliente é uma maneira sofisticada e correta de evitar gargalos. Todos os casos de borda, incluindo segurança de relógio e recargas fracionárias, são tratados corretamente. A lógica tanto para o 'token bucket' quanto para a janela deslizante é sólida.
Completude
Peso 20%Esta resposta é excepcionalmente completa. Ela implementa todos os métodos de API necessários, lida com todos os casos de borda especificados, fornece um conjunto de testes abrangente com mais do que o número exigido de testes e inclui uma nota de design clara. Ela também implementa a configuração sugerida por cliente, demonstrando uma compreensão completa do prompt.
Qualidade do codigo
Peso 20%A qualidade do código é excelente. Ele usa recursos modernos do Python, como 'dataclasses', de forma eficaz, é bem estruturado em métodos lógicos e é altamente legível. As escolhas de design, como o mecanismo de limpeza integrado e o 'lock striping', são elegantes e demonstram um alto nível de habilidade de engenharia.
Valor pratico
Peso 15%A implementação é de qualidade de produção e altamente prática. O modelo de concorrência escalável o torna adequado para sistemas de alto rendimento. A configuração flexível e o tratamento robusto de casos de borda significam que ele pode ser implantado com confiança em uma aplicação do mundo real.
Seguimento de instrucoes
Peso 10%A resposta segue todas as instruções perfeitamente. Ela fornece código em uma linguagem solicitada, implementa a API especificada, o algoritmo e a estratégia de concorrência, lida com todos os casos de borda, inclui os testes necessários e a análise de complexidade, e fornece uma nota de design dentro das restrições especificadas.
Pontuacao total
Comentario geral
A Resposta A é uma implementação robusta e, em grande parte, completa que corresponde diretamente à API e à semântica solicitadas. Ela combina um balde de tokens com um log de janela deslizante exato, utiliza um relógio monotônico injetável, evita um gargalo global único através de bloqueios de mapa listrados mais bloqueios por cliente e inclui testes determinísticos sólidos que cobrem os casos de borda necessários. A nota de design é concisa e aborda os trade-offs, a correção da concorrência e a complexidade. As fraquezas menores incluem uma implementação de client_count um tanto estranha e retry_after que raciocina apenas sobre a disponibilidade de 1 unidade em vez de um custo arbitrário, embora isso corresponda à API solicitada.
Ver detalhes da avaliacao ▼
Correcao
Peso 35%Implementa o híbrido necessário corretamente: a solicitação só passa se o balde de tokens e a janela deslizante exata permitirem, com recarga adequada de tokens, inicialização preguiçosa, limpeza de obsoletos e mutação serializada por cliente evitando gasto duplo. retry_after é coerente para disponibilidade de 1 unidade, e os testes cobrem invariantes importantes. Preocupações menores são pequenas peculiaridades de implementação, como a leitura de client_count do mapa sem sincronização completa e algumas escolhas de limite na evicção da janela.
Completude
Peso 20%Cobre quase todos os itens solicitados: superfície de API necessária, algoritmo híbrido, relógio monotônico injetável, suporte à configuração por cliente, casos de borda explícitos, limpeza de cliente obsoleto, tempo fracionário, discussão de concorrência, nota de complexidade e 8 testes significativos, incluindo contenção e evicção concorrentes determinísticas. Isso está muito perto de satisfazer completamente o prompt.
Qualidade do codigo
Peso 20%Bem estruturado e idiomático, com dataclasses, helpers separados, validação explícita e nomenclatura clara. A estratégia de bloqueio é organizada de forma atenciosa e a nota de design é concisa. Algumas arestas ásperas permanecem, como a implementação incomum de client_count e algum gerenciamento de bloqueio de baixo nível que poderia ser simplificado.
Valor pratico
Peso 15%Prático para uso em backend: injeção de relógio determinístico, limpeza de estado obsoleto limitada, bloqueio de lista mais bloqueios por cliente para contenção, contabilidade exata da janela e testes sólidos o tornam implantável ou adaptável. A implementação é realista e aborda casos de borda comuns de produção, como longas pausas e suspensão de VM.
Seguimento de instrucoes
Peso 10%Segue o prompt de perto: a escolha da linguagem é válida, os nomes da API correspondem, a nota de design está dentro do escopo, os testes usam o relógio injetável e os casos de borda solicitados são abordados explicitamente. A resposta se alinha bem com os requisitos técnicos e de documentação.
Pontuacao total
Comentario geral
A é um limitador de taxa híbrido polido e de qualidade de produção. Ele usa dataclasses, travas de stripe mais travas por cliente, integra a limpeza de itens obsoletos nas chamadas de permissão (evitando problemas de thread em segundo plano), valida a configuração, limita adequadamente o tempo decorrido tanto para relógios invertidos quanto para pausas longas e rejeita custo>burst sem criar estado de cliente. Os 8 testes usam o ManualClock injetável de forma determinística, incluindo um teste de contenção concorrente que realmente afirma o invariante rate*T+burst. A nota de design é concisa e aborda todos os pontos necessários.
Ver detalhes da avaliacao ▼
Correcao
Peso 35%Implementa corretamente o balde de tokens híbrido + log de janela deslizante exato. Lida com relógio invertido limitando via max(raw_now, last_seen), rejeita custo>burst sem criar estado, limita o tempo decorrido para tokens para evitar estouro, usa travas por cliente com travas de stripe para o mapa. retry_after calcula corretamente o tempo de espera com base na expiração do log da janela. O tratamento de eps e a contagem de eventos parecem sólidos.
Completude
Peso 20%Cobre todos os casos de borda necessários: custo>burst (sem criação de estado), relógio invertido (teste incluído), pausas longas (teste incluído), inicialização preguiçosa, remoção de itens obsoletos (determinística via _maybe_cleanup ligada ao relógio injetado), tokens fracionários. 8 testes incluindo teste de limite concorrente que afirma o invariante rate*T+burst deterministicamente usando ManualClock.
Qualidade do codigo
Peso 20%Bem estruturado com dataclasses, separação de MonotonicClock/ManualClock, validação de ClientConfig, travas por cliente e de stripe claramente explicadas. A limpeza é integrada ao caminho de permissão (sem complicações de thread em segundo plano). Nota de design concisa no topo.
Valor pratico
Peso 15%Qualidade de produção: travas de stripe escalam melhor, limpeza preguiçosa integrada evita ciclo de vida de thread em segundo plano, suporte a testes determinísticos, validação de configuração. Adequado para incorporação em um serviço real.
Seguimento de instrucoes
Peso 10%Segue todas as instruções: a superfície da API corresponde às assinaturas, algoritmo híbrido, travas por cliente com travas de stripe, relógio monotônico injetável, todos os casos de borda, 8 testes incluindo teste invariante concorrente determinístico, nota de design com semântica e complexidade declaradas.