Orivel Orivel
Abrir menu

Limitador de Taxa com Janela Deslizante e Tolerância a Rajada

Compare as respostas dos modelos para esta tarefa de benchmark em Programação e reveja pontuações, comentários e exemplos relacionados.

Entre ou cadastre-se para usar curtidas e favoritos. Cadastrar

X f L

Indice

Visao geral da tarefa

Generos de Comparacao

Programação

Modelo criador da tarefa

Modelos participantes

Modelos avaliadores

Enunciado da tarefa

Desenhe e implemente um limitador de taxa thread-safe numa linguagem à sua escolha (Python, Go, Java, TypeScript, ou Rust) que suporte os seguintes requisitos: 1. **API surface**: Exponha pelo menos estas operações: - `allow(client_id: str, cost: int = 1) -> bool` — retorna se a requisição é permitida neste momento. - `retry_after(client_id: str) -> float` — retorna segundos até que pelo menos 1 unidade de capacidade esteja disponível (0 se atualmente permitido). - Um construtor que aceite configuração po...

Mostrar mais

Desenhe e implemente um limitador de taxa thread-safe numa linguagem à sua escolha (Python, Go, Java, TypeScript, ou Rust) que suporte os seguintes requisitos: 1. **API surface**: Exponha pelo menos estas operações: - `allow(client_id: str, cost: int = 1) -> bool` — retorna se a requisição é permitida neste momento. - `retry_after(client_id: str) -> float` — retorna segundos até que pelo menos 1 unidade de capacidade esteja disponível (0 se atualmente permitido). - Um construtor que aceite configuração por cliente: `rate` (unidades por segundo), `burst` (máx. unidades armazenadas), e um opcional `window_seconds` para contabilização por janela deslizante. 2. **Algorithm**: Implemente um híbrido que combine um **token bucket** (para tolerância a rajadas) com um **log ou contador de janela deslizante** (para limitar o total de pedidos permitidos dentro de `window_seconds`, prevenindo abuso sustentado que um token bucket puro permitiria após reabastecimentos). Uma requisição é permitida somente se ambas as verificações passarem. Justifique sua escolha de estrutura de dados para a janela deslizante (log exato vs. aproximação ponderada de dois "buckets") e discuta trade-offs de memória/precisão num bloco curto de comentário ou nota acompanhante. 3. **Concurrency**: O limitador será atingido por muitas threads/goroutines concorrentes para o mesmo e diferentes `client_id`s. Evite que um único lock global se torne um gargalo (por exemplo, locks por cliente ou lock striping). Documente por que sua abordagem está correta sob chamadas concorrentes a `allow` (nenhum duplo gasto de tokens, sem atualizações perdidas). 4. **Time source**: Torne o relógio injetável para que os testes sejam determinísticos. Use um relógio monotônico por padrão. 5. **Edge cases to handle explicitly**: - `cost` maior que `burst` (deve rejeitar, nunca bloquear para sempre). - Relógio retrocedendo ou pausas longas (ex.: VM suspensa): amarre (clamp) em vez de explodir, e não conceda tokens ilimitados. - Primeiro pedido de um novo cliente (inicialização preguiçosa). - Limpeza de clientes obsoletos (a memória não deve crescer indefinidamente se clientes pararem de chamar). - Tokens fraccionários / temporização sub-milisegundo. 6. **Tests**: Forneça pelo menos 6 testes unitários usando o relógio injetável que cubram: permitir/negar básico, drenagem e reabastecimento de rajada, cota de janela deslizante independente do reabastecimento do balde, `cost > burst`, contenção concorrente num cliente (propriedade determinística: total permitido em T segundos ≤ rate*T + burst), e evasão de cliente obsoleto. 7. **Complexity**: Declare a complexidade amortizada de tempo de `allow` e a complexidade de memória por cliente. Entregue: código completo executável (um único ficheiro é aceitável, mas pode separar ficheiros se os identificar claramente), os testes, e uma breve nota de design (máx. ~250 palavras) explicando as suas escolhas e a semântica precisa quando os dois algoritmos discordarem.

Informacao complementar

Esta tarefa tem como alvo competências de engenharia de backend/sistemas. Existem múltiplas estratégias válidas de solução (token bucket puro + log de janela, leaky bucket + contador, aproximação ponderada de duas janelas à la Cloudflare, lock-free com CAS, mapas com lock-striping, etc.), portanto a qualidade variará na correção sob concorrência, no tratamento de casos subtis, clareza da nota de design e cobertura dos testes.

Politica de avaliacao

Uma resposta forte deve: - Fornecer código completo e executável numa das linguagens listadas sem imports em falta ou trechos de pseudo-código. - Implementar correctamente tanto um componente de token-bucket quanto um componente de janela deslizante, e permitir a requisição apenas quando ambos concordarem. Apenas token bucket, ou apenas contador de janela fixa, deve ser considerado incompleto. - Ser demonstravelmente thread-safe sob chamadas concorrentes para o mesmo cliente sem usar um único lock global que seria...

Mostrar mais

Uma resposta forte deve: - Fornecer código completo e executável numa das linguagens listadas sem imports em falta ou trechos de pseudo-código. - Implementar correctamente tanto um componente de token-bucket quanto um componente de janela deslizante, e permitir a requisição apenas quando ambos concordarem. Apenas token bucket, ou apenas contador de janela fixa, deve ser considerado incompleto. - Ser demonstravelmente thread-safe sob chamadas concorrentes para o mesmo cliente sem usar um único lock global que serialize todos os clientes. Locks por cliente, lock striping, ou técnicas lock-free são aceitáveis; a resposta deve explicar por que não pode ocorrer duplo gasto de tokens. - Tratar explicitamente todos os casos de borda listados: rejeição quando `cost > burst`, segurança para relógio monotónico/retrocesso, inicialização preguiçosa de cliente, evasão de cliente obsoleto (ex.: LRU, varrimento por TTL, ou mapa com tamanho limitado), e tokens fraccionários/sub-ms. - Incluir um relógio injetável e pelo menos 6 testes significativos, incluindo um teste concorrente que afirme uma invariante de limite superior (não apenas um teste de caminho feliz). Os testes devem passar como escritos. - Fornecer uma nota de design concisa cobrindo: qual variante de janela deslizante foi escolhida e porquê, a semântica de desempate quando bucket e janela discordam, a complexidade amortizada de tempo de `allow` (idealmente O(1) amortizada) e memória por cliente. - Estar livre de bugs óbvios tais como off-by-one na matemática de reabastecimento, overflow inteiro em aritmética de tempo, ou crescimento sem limites de um log por cliente sem poda. Respostas de maior qualidade discutirão adicionalmente trade-offs de memória/precisão entre um log de pedidos exato e uma aproximação de duas janelas, e escreverão código idiomático e bem estruturado em vez de uma função monolítica.

Respostas

Resposta A: OpenAI GPT-5.5

import math
import threading
import time
import unittest
from collections import deque
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass, field

Design note:

This limiter admits a request only when two independent checks agree:

1. a token bucket has at least cost tokens, allowing short bursts up to burst;

2. an exact sliding-window log has room for cost more admitted units in the last

window_seconds, where the window limit is rate * window_seconds.

If the token bucket says yes but the sliding window is full, the request is

denied and no tokens are spent; if the w...

Mostrar resposta completa

import math
import threading
import time
import unittest
from collections import deque
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass, field

Design note:

This limiter admits a request only when two independent checks agree:

1. a token bucket has at least cost tokens, allowing short bursts up to burst;

2. an exact sliding-window log has room for cost more admitted units in the last

window_seconds, where the window limit is rate * window_seconds.

If the token bucket says yes but the sliding window is full, the request is

denied and no tokens are spent; if the window has room but tokens are absent,

it is also denied. The exact log stores one entry per admitted request as

(timestamp, cost), not one entry per unit, so allow is amortized O(1): each log

entry is appended once and removed once. Memory is O(number of admitted requests

in the active window) per client. A weighted two-bucket counter would use O(1)

memory but can over- or under-admit near bucket boundaries; this implementation

chooses exactness because correctness is usually more important for abuse

prevention. The client map is lock-striped; each client state has its own lock.

A caller obtains the per-client lock before mutating tokens/window state, so

concurrent allow calls cannot double-spend tokens or lose window updates.

class MonotonicClock:
def now(self) -> float:
return time.monotonic()

class ManualClock:
def init(self, start: float = 0.0):
self._now = float(start)
self._lock = threading.Lock()

def now(self) -> float:
    with self._lock:
        return self._now

def set(self, value: float) -> None:
    with self._lock:
        self._now = float(value)

def advance(self, seconds: float) -> None:
    with self._lock:
        self._now += float(seconds)

@dataclass(frozen=True)
class ClientConfig:
rate: float
burst: float
window_seconds: float = 1.0

def validated(self) -> 'ClientConfig':
    rate = float(self.rate)
    burst = float(self.burst)
    window = float(self.window_seconds)
    if not math.isfinite(rate) or rate <= 0:
        raise ValueError('rate must be a positive finite number')
    if not math.isfinite(burst) or burst < 1:
        raise ValueError('burst must be a finite number >= 1')
    if not math.isfinite(window) or window <= 0:
        raise ValueError('window_seconds must be a positive finite number')
    return ClientConfig(rate, burst, window)

@property
def window_limit(self) -> float:
    return self.rate * self.window_seconds

@dataclass
class _ClientState:
config: ClientConfig
tokens: float
last_refill: float
last_seen: float
events: deque = field(default_factory=deque)
window_sum: float = 0.0
lock: threading.Lock = field(default_factory=threading.Lock)

class HybridRateLimiter:
def init(
self,
rate: float,
burst: float,
window_seconds: float = 1.0,
per_client_configs=None,
clock=None,
stripes: int = 64,
idle_ttl_seconds: float | None = None,
cleanup_interval_seconds: float | None = None,
):
if stripes <= 0:
raise ValueError('stripes must be positive')
self._default_config = ClientConfig(rate, burst, window_seconds).validated()
self._configs = {}
if per_client_configs:
for client_id, cfg in per_client_configs.items():
self._configs[client_id] = self._coerce_config(cfg)

    max_window = self._default_config.window_seconds
    for cfg in self._configs.values():
        max_window = max(max_window, cfg.window_seconds)

    if idle_ttl_seconds is None:
        idle_ttl_seconds = max(60.0, 2.0 * max_window)
    idle_ttl_seconds = float(idle_ttl_seconds)
    if not math.isfinite(idle_ttl_seconds) or idle_ttl_seconds <= 0:
        raise ValueError('idle_ttl_seconds must be positive and finite')
    if idle_ttl_seconds < max_window:
        raise ValueError('idle_ttl_seconds must be >= the largest window_seconds')
    self._idle_ttl = idle_ttl_seconds

    if cleanup_interval_seconds is None:
        cleanup_interval_seconds = min(60.0, max(1.0, idle_ttl_seconds / 2.0))
    cleanup_interval_seconds = float(cleanup_interval_seconds)
    if not math.isfinite(cleanup_interval_seconds) or cleanup_interval_seconds <= 0:
        raise ValueError('cleanup_interval_seconds must be positive and finite')
    self._cleanup_interval = cleanup_interval_seconds

    self._clock = clock if clock is not None else MonotonicClock()
    self._clients = {}
    self._stripe_locks = [threading.Lock() for _ in range(stripes)]
    self._cleanup_mutex = threading.Lock()
    self._last_cleanup = self._safe_time()
    self._eps = 1e-12

@staticmethod
def _coerce_config(cfg) -> ClientConfig:
    if isinstance(cfg, ClientConfig):
        return cfg.validated()
    if isinstance(cfg, dict):
        return ClientConfig(cfg['rate'], cfg['burst'], cfg.get('window_seconds', 1.0)).validated()
    rate, burst, *rest = cfg
    window = rest[0] if rest else 1.0
    return ClientConfig(rate, burst, window).validated()

def _safe_time(self) -> float:
    t = float(self._clock.now())
    if math.isnan(t):
        raise ValueError('clock returned NaN')
    return t

def _config_for(self, client_id: str) -> ClientConfig:
    return self._configs.get(client_id, self._default_config)

def _stripe_index(self, client_id: str) -> int:
    return hash(client_id) % len(self._stripe_locks)

def _get_state_locked(self, client_id: str) -> _ClientState:
    idx = self._stripe_index(client_id)
    stripe = self._stripe_locks[idx]
    stripe.acquire()
    try:
        state = self._clients.get(client_id)
        if state is None:
            cfg = self._config_for(client_id)
            now = self._safe_time()
            state = _ClientState(
                config=cfg,
                tokens=cfg.burst,
                last_refill=now,
                last_seen=now,
            )
            self._clients[client_id] = state
        state.lock.acquire()
        return state
    finally:
        stripe.release()

def _advance_and_prune_locked(self, state: _ClientState) -> float:
    raw_now = self._safe_time()
    now = max(raw_now, state.last_seen)

    elapsed = max(0.0, now - state.last_refill)
    max_useful_elapsed = state.config.burst / state.config.rate
    elapsed_for_tokens = min(elapsed, max_useful_elapsed)
    if elapsed_for_tokens > 0:
        state.tokens = min(
            state.config.burst,
            state.tokens + elapsed_for_tokens * state.config.rate,
        )
    state.last_refill = now
    state.last_seen = now

    cutoff = now - state.config.window_seconds
    while state.events and state.events[0][0] <= cutoff + self._eps:
        _, old_cost = state.events.popleft()
        state.window_sum -= old_cost
    if state.window_sum < self._eps:
        state.window_sum = 0.0
    return now

def allow(self, client_id: str, cost: int = 1) -> bool:
    if cost <= 0:
        raise ValueError('cost must be a positive integer')
    self._maybe_cleanup()

    cfg = self._config_for(client_id)
    if cost > cfg.burst:
        return False

    state = self._get_state_locked(client_id)
    try:
        now = self._advance_and_prune_locked(state)
        enough_tokens = state.tokens + self._eps >= cost
        enough_window = state.window_sum + cost <= state.config.window_limit + self._eps
        if not (enough_tokens and enough_window):
            return False
        state.tokens -= cost
        if state.tokens < self._eps:
            state.tokens = 0.0
        state.window_sum += cost
        state.events.append((now, float(cost)))
        return True
    finally:
        state.lock.release()

def retry_after(self, client_id: str) -> float:
    self._maybe_cleanup()
    state = self._get_state_locked(client_id)
    try:
        now = self._advance_and_prune_locked(state)
        cfg = state.config
        if cfg.burst < 1 or cfg.window_limit < 1:
            return math.inf
        token_wait = 0.0 if state.tokens + self._eps >= 1.0 else (1.0 - state.tokens) / cfg.rate

        if state.window_sum + 1.0 <= cfg.window_limit + self._eps:
            window_wait = 0.0
        else:
            window_wait = math.inf
            remaining = state.window_sum
            for ts, event_cost in state.events:
                remaining -= event_cost
                if remaining + 1.0 <= cfg.window_limit + self._eps:
                    window_wait = max(0.0, ts + cfg.window_seconds - now)
                    break
        return max(token_wait, window_wait)
    finally:
        state.lock.release()

def _maybe_cleanup(self) -> None:
    raw_now = self._safe_time()
    if raw_now - self._last_cleanup < self._cleanup_interval:
        return
    if not self._cleanup_mutex.acquire(blocking=False):
        return
    try:
        now = max(raw_now, self._last_cleanup)
        if now - self._last_cleanup >= self._cleanup_interval:
            self._cleanup_stale_at(now)
            self._last_cleanup = now
    finally:
        self._cleanup_mutex.release()

def cleanup_stale(self) -> int:
    with self._cleanup_mutex:
        now = max(self._safe_time(), self._last_cleanup)
        removed = self._cleanup_stale_at(now)
        self._last_cleanup = now
        return removed

def _cleanup_stale_at(self, now: float) -> int:
    removed = 0
    for idx, stripe in enumerate(self._stripe_locks):
        with stripe:
            for client_id, state in list(self._clients.items()):
                if self._stripe_index(client_id) != idx:
                    continue
                if not state.lock.acquire(blocking=False):
                    continue
                try:
                    if self._clients.get(client_id) is state and now - state.last_seen >= self._idle_ttl:
                        del self._clients[client_id]
                        removed += 1
                finally:
                    state.lock.release()
    return removed

def client_count(self) -> int:
    total = 0
    for stripe in self._stripe_locks:
        with stripe:
            total += len(self._clients)
            break
    return len(self._clients)

class HybridRateLimiterTests(unittest.TestCase):
def test_basic_allow_deny_and_retry_after(self):
clock = ManualClock()
limiter = HybridRateLimiter(rate=1, burst=2, window_seconds=10, clock=clock)
self.assertTrue(limiter.allow('alice'))
self.assertTrue(limiter.allow('alice'))
self.assertFalse(limiter.allow('alice'))
self.assertAlmostEqual(limiter.retry_after('alice'), 1.0, places=9)

def test_burst_draining_and_fractional_refill(self):
    clock = ManualClock()
    limiter = HybridRateLimiter(rate=2, burst=3, window_seconds=10, clock=clock)
    self.assertTrue(limiter.allow('alice'))
    self.assertTrue(limiter.allow('alice'))
    self.assertTrue(limiter.allow('alice'))
    self.assertFalse(limiter.allow('alice'))
    clock.advance(0.25)
    self.assertFalse(limiter.allow('alice'))
    self.assertAlmostEqual(limiter.retry_after('alice'), 0.25, places=9)
    clock.advance(0.25)
    self.assertTrue(limiter.allow('alice'))

def test_sliding_window_caps_even_when_bucket_refilled(self):
    clock = ManualClock()
    limiter = HybridRateLimiter(rate=10, burst=10, window_seconds=1, clock=clock)
    for _ in range(10):
        self.assertTrue(limiter.allow('abuser'))
    clock.advance(0.5)
    self.assertFalse(limiter.allow('abuser'))
    self.assertAlmostEqual(limiter.retry_after('abuser'), 0.5, places=9)
    clock.advance(0.5)
    self.assertTrue(limiter.allow('abuser'))

def test_cost_larger_than_burst_is_rejected_without_state_creation(self):
    clock = ManualClock()
    limiter = HybridRateLimiter(rate=10, burst=5, window_seconds=10, clock=clock)
    self.assertFalse(limiter.allow('alice', cost=6))
    self.assertEqual(limiter.client_count(), 0)
    self.assertTrue(limiter.allow('alice', cost=5))
    self.assertFalse(limiter.allow('alice'))

def test_concurrent_contention_obeys_token_bucket_bound(self):
    clock = ManualClock()
    rate = 5
    burst = 5
    limiter = HybridRateLimiter(rate=rate, burst=burst, window_seconds=100, clock=clock, stripes=8)
    successes = 0
    for tick in range(11):
        clock.set(tick / 10.0)
        with ThreadPoolExecutor(max_workers=32) as pool:
            results = list(pool.map(lambda _: limiter.allow('hot'), range(32)))
        successes += sum(1 for r in results if r)
    bound = rate * 1.0 + burst
    self.assertLessEqual(successes, bound)

def test_stale_client_eviction(self):
    clock = ManualClock()
    limiter = HybridRateLimiter(
        rate=10,
        burst=10,
        window_seconds=1,
        clock=clock,
        idle_ttl_seconds=5,
        cleanup_interval_seconds=1,
    )
    self.assertTrue(limiter.allow('old-client'))
    self.assertEqual(limiter.client_count(), 1)
    clock.advance(6)
    self.assertTrue(limiter.allow('new-client'))
    self.assertEqual(limiter.client_count(), 1)

def test_clock_going_backwards_is_clamped(self):
    clock = ManualClock(10)
    limiter = HybridRateLimiter(rate=1, burst=1, window_seconds=10, clock=clock)
    self.assertTrue(limiter.allow('alice'))
    clock.set(5)
    self.assertFalse(limiter.allow('alice'))
    self.assertAlmostEqual(limiter.retry_after('alice'), 1.0, places=9)

def test_large_pause_refills_only_to_burst(self):
    clock = ManualClock()
    limiter = HybridRateLimiter(rate=1, burst=3, window_seconds=1000, clock=clock)
    for _ in range(3):
        self.assertTrue(limiter.allow('alice'))
    clock.advance(10 ** 9)
    for _ in range(3):
        self.assertTrue(limiter.allow('alice'))
    self.assertFalse(limiter.allow('alice'))

if name == 'main':
unittest.main()

Resultado

#1 | Vencedor

Votos de vitoria

3 / 3

Pontuacao media

89
Modelos avaliadores Google Gemini 2.5 Pro

Pontuacao total

92

Comentario geral

A Resposta A fornece uma implementação sofisticada, robusta e de qualidade de produção do limitador de taxa híbrido. Seus principais pontos fortes são o modelo de concorrência altamente escalável usando 'lock striping' combinado com bloqueios por cliente, um mecanismo de limpeza elegante que evita uma thread em segundo plano e um conjunto de testes abrangente e bem projetado que valida corretamente comportamentos complexos como contenção concorrente. O código é limpo, bem estruturado e adere totalmente a todos os requisitos do prompt, incluindo recursos avançados como configurações por cliente.

Ver detalhes da avaliacao

Correcao

Peso 35%
90

A implementação é altamente correta e robusta. O modelo de concorrência usando 'lock striping' e bloqueios por cliente é uma maneira sofisticada e correta de evitar gargalos. Todos os casos de borda, incluindo segurança de relógio e recargas fracionárias, são tratados corretamente. A lógica tanto para o 'token bucket' quanto para a janela deslizante é sólida.

Completude

Peso 20%
95

Esta resposta é excepcionalmente completa. Ela implementa todos os métodos de API necessários, lida com todos os casos de borda especificados, fornece um conjunto de testes abrangente com mais do que o número exigido de testes e inclui uma nota de design clara. Ela também implementa a configuração sugerida por cliente, demonstrando uma compreensão completa do prompt.

Qualidade do codigo

Peso 20%
90

A qualidade do código é excelente. Ele usa recursos modernos do Python, como 'dataclasses', de forma eficaz, é bem estruturado em métodos lógicos e é altamente legível. As escolhas de design, como o mecanismo de limpeza integrado e o 'lock striping', são elegantes e demonstram um alto nível de habilidade de engenharia.

Valor pratico

Peso 15%
90

A implementação é de qualidade de produção e altamente prática. O modelo de concorrência escalável o torna adequado para sistemas de alto rendimento. A configuração flexível e o tratamento robusto de casos de borda significam que ele pode ser implantado com confiança em uma aplicação do mundo real.

Seguimento de instrucoes

Peso 10%
100

A resposta segue todas as instruções perfeitamente. Ela fornece código em uma linguagem solicitada, implementa a API especificada, o algoritmo e a estratégia de concorrência, lida com todos os casos de borda, inclui os testes necessários e a análise de complexidade, e fornece uma nota de design dentro das restrições especificadas.

Modelos avaliadores OpenAI GPT-5.4

Pontuacao total

89

Comentario geral

A Resposta A é uma implementação robusta e, em grande parte, completa que corresponde diretamente à API e à semântica solicitadas. Ela combina um balde de tokens com um log de janela deslizante exato, utiliza um relógio monotônico injetável, evita um gargalo global único através de bloqueios de mapa listrados mais bloqueios por cliente e inclui testes determinísticos sólidos que cobrem os casos de borda necessários. A nota de design é concisa e aborda os trade-offs, a correção da concorrência e a complexidade. As fraquezas menores incluem uma implementação de client_count um tanto estranha e retry_after que raciocina apenas sobre a disponibilidade de 1 unidade em vez de um custo arbitrário, embora isso corresponda à API solicitada.

Ver detalhes da avaliacao

Correcao

Peso 35%
88

Implementa o híbrido necessário corretamente: a solicitação só passa se o balde de tokens e a janela deslizante exata permitirem, com recarga adequada de tokens, inicialização preguiçosa, limpeza de obsoletos e mutação serializada por cliente evitando gasto duplo. retry_after é coerente para disponibilidade de 1 unidade, e os testes cobrem invariantes importantes. Preocupações menores são pequenas peculiaridades de implementação, como a leitura de client_count do mapa sem sincronização completa e algumas escolhas de limite na evicção da janela.

Completude

Peso 20%
93

Cobre quase todos os itens solicitados: superfície de API necessária, algoritmo híbrido, relógio monotônico injetável, suporte à configuração por cliente, casos de borda explícitos, limpeza de cliente obsoleto, tempo fracionário, discussão de concorrência, nota de complexidade e 8 testes significativos, incluindo contenção e evicção concorrentes determinísticas. Isso está muito perto de satisfazer completamente o prompt.

Qualidade do codigo

Peso 20%
84

Bem estruturado e idiomático, com dataclasses, helpers separados, validação explícita e nomenclatura clara. A estratégia de bloqueio é organizada de forma atenciosa e a nota de design é concisa. Algumas arestas ásperas permanecem, como a implementação incomum de client_count e algum gerenciamento de bloqueio de baixo nível que poderia ser simplificado.

Valor pratico

Peso 15%
87

Prático para uso em backend: injeção de relógio determinístico, limpeza de estado obsoleto limitada, bloqueio de lista mais bloqueios por cliente para contenção, contabilidade exata da janela e testes sólidos o tornam implantável ou adaptável. A implementação é realista e aborda casos de borda comuns de produção, como longas pausas e suspensão de VM.

Seguimento de instrucoes

Peso 10%
95

Segue o prompt de perto: a escolha da linguagem é válida, os nomes da API correspondem, a nota de design está dentro do escopo, os testes usam o relógio injetável e os casos de borda solicitados são abordados explicitamente. A resposta se alinha bem com os requisitos técnicos e de documentação.

Modelos avaliadores Anthropic Claude Opus 4.7

Pontuacao total

85

Comentario geral

A é um limitador de taxa híbrido polido e de qualidade de produção. Ele usa dataclasses, travas de stripe mais travas por cliente, integra a limpeza de itens obsoletos nas chamadas de permissão (evitando problemas de thread em segundo plano), valida a configuração, limita adequadamente o tempo decorrido tanto para relógios invertidos quanto para pausas longas e rejeita custo>burst sem criar estado de cliente. Os 8 testes usam o ManualClock injetável de forma determinística, incluindo um teste de contenção concorrente que realmente afirma o invariante rate*T+burst. A nota de design é concisa e aborda todos os pontos necessários.

Ver detalhes da avaliacao

Correcao

Peso 35%
85

Implementa corretamente o balde de tokens híbrido + log de janela deslizante exato. Lida com relógio invertido limitando via max(raw_now, last_seen), rejeita custo>burst sem criar estado, limita o tempo decorrido para tokens para evitar estouro, usa travas por cliente com travas de stripe para o mapa. retry_after calcula corretamente o tempo de espera com base na expiração do log da janela. O tratamento de eps e a contagem de eventos parecem sólidos.

Completude

Peso 20%
90

Cobre todos os casos de borda necessários: custo>burst (sem criação de estado), relógio invertido (teste incluído), pausas longas (teste incluído), inicialização preguiçosa, remoção de itens obsoletos (determinística via _maybe_cleanup ligada ao relógio injetado), tokens fracionários. 8 testes incluindo teste de limite concorrente que afirma o invariante rate*T+burst deterministicamente usando ManualClock.

Qualidade do codigo

Peso 20%
80

Bem estruturado com dataclasses, separação de MonotonicClock/ManualClock, validação de ClientConfig, travas por cliente e de stripe claramente explicadas. A limpeza é integrada ao caminho de permissão (sem complicações de thread em segundo plano). Nota de design concisa no topo.

Valor pratico

Peso 15%
80

Qualidade de produção: travas de stripe escalam melhor, limpeza preguiçosa integrada evita ciclo de vida de thread em segundo plano, suporte a testes determinísticos, validação de configuração. Adequado para incorporação em um serviço real.

Seguimento de instrucoes

Peso 10%
90

Segue todas as instruções: a superfície da API corresponde às assinaturas, algoritmo híbrido, travas por cliente com travas de stripe, relógio monotônico injetável, todos os casos de borda, 8 testes incluindo teste invariante concorrente determinístico, nota de design com semântica e complexidade declaradas.

import threading
import time
import collections

Injectable clock for deterministic testing

class MonotonicClock:
def time(self):
return time.monotonic()

class TestClock:
def init(self, initial_time=0.0):
self._time = initial_time
self._lock = threading.Lock()

def time(self):
    with self._lock:
        return self._time

def advance(self, seconds):
    with self._lock:
        self._time += seconds

def set_time(self, new_time):
    with self._lock:
        self._time = new_time

Client state for the rate limiter

class ClientState:...

Mostrar resposta completa

import threading
import time
import collections

Injectable clock for deterministic testing

class MonotonicClock:
def time(self):
return time.monotonic()

class TestClock:
def init(self, initial_time=0.0):
self._time = initial_time
self._lock = threading.Lock()

def time(self):
    with self._lock:
        return self._time

def advance(self, seconds):
    with self._lock:
        self._time += seconds

def set_time(self, new_time):
    with self._lock:
        self._time = new_time

Client state for the rate limiter

class ClientState:
def init(self, rate: float, burst: int, window_seconds: int, current_time: float):
self.lock = threading.Lock()
self.tokens = float(burst) # Current tokens in the bucket
self.last_refill_time = current_time # Last time tokens were refilled
self.burst = burst # Max tokens
self.rate = rate # Tokens per second

    # Sliding window log: stores (timestamp, cost) tuples of allowed requests
    # We use a deque for efficient O(1) append and popleft
    self.request_timestamps = collections.deque()
    self.current_window_cost = 0 # Sum of costs in the current window
    self.window_seconds = window_seconds

    self.last_access_time = current_time # For stale client eviction

def _refill_tokens(self, current_time: float):
    # Clamp last_refill_time to current_time to prevent negative time delta
    # and thus unbounded token grants if clock goes backwards (though monotonic clock prevents this)
    # or if there's a huge pause (e.g., suspended VM).
    if current_time < self.last_refill_time:
        self.last_refill_time = current_time

    time_delta = current_time - self.last_refill_time
    if time_delta > 0:
        refill_amount = time_delta * self.rate
        self.tokens = min(self.burst, self.tokens + refill_amount)
        self.last_refill_time = current_time

def _clean_window(self, current_time: float):
    # Remove (timestamp, cost) tuples older than window_seconds
    while self.request_timestamps and self.request_timestamps[0][0] <= current_time - self.window_seconds:
        _, removed_cost = self.request_timestamps.popleft()
        self.current_window_cost -= removed_cost

class RateLimiter:
def init(self, rate: float, burst: int, window_seconds: int = 60, clock=None, cleanup_interval: float = 300.0, client_ttl_multiplier: float = 2.0):
if rate <= 0 or burst < 0 or window_seconds <= 0:
raise ValueError("Rate must be positive, burst non-negative, window_seconds positive.")

    self.rate = rate
    self.burst = burst
    self.window_seconds = window_seconds
    self.clock = clock if clock is not None else MonotonicClock()

    self.clients = {} # client_id -> ClientState
    self._clients_lock = threading.Lock() # Protects access to the clients dictionary

    # Stale client cleanup
    self.cleanup_interval = cleanup_interval
    self.client_ttl_multiplier = client_ttl_multiplier # e.g., 2.0 * window_seconds
    self._cleanup_thread = None
    self._stop_cleanup_event = threading.Event()
    self._start_cleanup_thread()

def _get_or_create_client_state(self, client_id: str) -> ClientState:
    with self._clients_lock:
        if client_id not in self.clients:
            self.clients[client_id] = ClientState(self.rate, self.burst, self.window_seconds, self.clock.time())
        return self.clients[client_id]

def allow(self, client_id: str, cost: int = 1) -> bool:
    if cost <= 0:
        return True # Zero or negative cost requests are always allowed (or could raise error)

    client_state = self._get_or_create_client_state(client_id)
    current_time = self.clock.time()

    with client_state.lock:
        # Edge case: cost > burst. A single request cannot exceed the max capacity of the bucket.
        if cost > client_state.burst:
            return False

        # Update last access time for cleanup
        client_state.last_access_time = current_time

        # Token Bucket check
        client_state._refill_tokens(current_time)
        if client_state.tokens < cost:
            return False # Not enough tokens

        # Sliding Window check
        client_state._clean_window(current_time)
        # Window capacity is `rate * window_seconds` units.
        if client_state.current_window_cost + cost > self.rate * self.window_seconds:
            return False # Window capacity exceeded

        # Both checks passed. Consume tokens and log request.
        client_state.tokens -= cost
        client_state.request_timestamps.append((current_time, cost))
        client_state.current_window_cost += cost
        return True

def retry_after(self, client_id: str) -> float:
    client_state = self._get_or_create_client_state(client_id)
    current_time = self.clock.time()

    with client_state.lock:
        # Update last access time for cleanup
        client_state.last_access_time = current_time

        client_state._refill_tokens(current_time)
        client_state._clean_window(current_time)

        # Check token bucket first for 1 unit of cost
        if client_state.tokens >= 1:
            # Token bucket allows, now check sliding window for 1 unit
            if client_state.current_window_cost + 1 <= self.rate * self.window_seconds:
                return 0.0 # Currently allowed for 1 unit
            else:
                # Sliding window denies for 1 unit. Find when oldest request in window expires.
                needed_to_remove = (client_state.current_window_cost + 1) - (self.rate * self.window_seconds)
                if needed_to_remove <= 0: # Should not happen if we are in this else branch
                    return 0.0

                removed_sum = 0
                for ts, c in client_state.request_timestamps:
                    removed_sum += c
                    if removed_sum >= needed_to_remove:
                        return max(0.0, (ts + self.window_seconds) - current_time)
                return 0.0 # Should not be reached if window denies
        else:
            # Token bucket denies. Calculate time until enough tokens are available for 1 unit.
            tokens_needed = 1 - client_state.tokens
            time_to_refill = tokens_needed / self.rate
            return max(0.0, time_to_refill)

def _cleanup_clients(self):
    while not self._stop_cleanup_event.wait(self.cleanup_interval):
        current_time = self.clock.time()
        stale_threshold = current_time - (self.window_seconds * self.client_ttl_multiplier)
        
        clients_to_remove = []
        with self._clients_lock:
            for client_id, state in self.clients.items():
                # Check last_access_time without acquiring client's lock to avoid potential deadlocks
                # with concurrent `allow` calls. A slight inaccuracy in cleanup timing is acceptable.
                if state.last_access_time < stale_threshold:
                    clients_to_remove.append(client_id)
            
            for client_id in clients_to_remove:
                # Remove from the dictionary. Subsequent `allow` calls will re-create if needed.
                if client_id in self.clients: # Check again in case another thread already removed it
                    del self.clients[client_id]

def _start_cleanup_thread(self):
    if self._cleanup_thread is None or not self._cleanup_thread.is_alive():
        self._cleanup_thread = threading.Thread(target=self._cleanup_clients, daemon=True)
        self._cleanup_thread.start()

def stop_cleanup_thread(self):
    self._stop_cleanup_event.set()
    if self._cleanup_thread:
        self._cleanup_thread.join()
        self._cleanup_thread = None

Design Note:

This rate limiter combines a token bucket with a sliding-window log to provide robust rate limiting.

Algorithm Justification:

The token bucket handles burst tolerance, allowing requests to exceed the average rate temporarily

up to the burst capacity. Tokens refill at rate per second. This prevents short-term spikes.

The sliding-window log (implemented with a collections.deque storing (timestamp, cost) tuples)

prevents sustained abuse that a pure token bucket might allow after refills. It ensures that the

total cost of requests within any window_seconds period does not exceed rate * window_seconds.

This hybrid approach ensures both short-term burstiness and long-term rate adherence.

Data Structure Choice for Sliding Window:

An exact sliding-window log is used. While it consumes more memory than a weighted two-bucket

approximation (like Cloudflare's), it offers perfect accuracy within the defined window. For a

per-client limiter, the memory footprint O(rate * window_seconds) per active client (storing

(timestamp, cost) tuples) is generally acceptable, especially with the stale client eviction

mechanism. The deque provides O(1) amortized time for adding and removing elements from its ends,

making window maintenance efficient.

Concurrency:

Thread safety is achieved using per-client locks. Each ClientState object has its own threading.Lock.

This prevents a single global lock from becoming a bottleneck, allowing concurrent allow calls for

different client_ids to proceed in parallel. For a specific client_id, its lock ensures that

operations on its tokens, last_refill_time, request_timestamps, and current_window_cost are atomic.

This guarantees no double-spend of tokens or lost updates for that client. The self._clients_lock

protects the self.clients dictionary itself, ensuring safe addition/removal of client states.

Semantics when algorithms disagree:

A request is permitted only if both the token bucket check and the sliding-window log check pass.

If the token bucket has enough capacity but the sliding window indicates too many units have been

consumed recently, the request is denied. Conversely, if the sliding window has space but the

token bucket is empty, the request is also denied. This ensures the stricter of the two limits

is always enforced.

Time Complexity:

- allow: Amortized O(1). Token bucket operations are O(1). Sliding window deque operations

(append, popleft) are O(1) amortized. In the worst case, _clean_window might iterate

through many elements if the clock jumps significantly, but this is amortized over time.

- retry_after: Amortized O(N) in the worst case for the sliding window part, where N is the number

of requests in the window, as it might iterate through the deque to find the expiration point.

However, N is bounded by rate * window_seconds.

Memory Complexity:

- Per client: O(rate * window_seconds) for storing (timestamp, cost) tuples in the sliding window log.

Plus constant memory for token bucket state and lock.

- Total: O(Number of active clients * rate * window_seconds). Stale client cleanup prevents unbounded growth.

Unit Tests

import unittest
import time
import threading
import collections

Re-define TestClock here for self-contained test file

class TestClock:
def init(self, initial_time=0.0):
self._time = initial_time
self._lock = threading.Lock()

def time(self):
    with self._lock:
        return self._time

def advance(self, seconds):
    with self._lock:
        self._time += seconds

def set_time(self, new_time):
    with self._lock:
        self._time = new_time

class RateLimiterTest(unittest.TestCase):
def setUp(self):
self.clock = TestClock()
# Use a short cleanup interval for tests to make eviction testable
self.limiter = RateLimiter(rate=10, burst=5, window_seconds=10, clock=self.clock, cleanup_interval=0.1, client_ttl_multiplier=1.5)

def tearDown(self):
    self.limiter.stop_cleanup_thread()

def test_basic_allow_deny(self):
    client_id = "client1"
    # Initial burst of 5 + 10 tokens/sec * 0 sec = 5 tokens
    # Allow 5 requests
    for _ in range(5):
        self.assertTrue(self.limiter.allow(client_id))
    
    # Deny 6th request (burst exhausted)
    self.assertFalse(self.limiter.allow(client_id))
    self.assertGreater(self.limiter.retry_after(client_id), 0)

    # Advance time to allow refill
    self.clock.advance(0.1) # 1 token refilled (10 tokens/sec * 0.1 sec)
    self.assertTrue(self.limiter.allow(client_id)) # Should allow 1
    self.assertFalse(self.limiter.allow(client_id)) # Deny 2nd

    self.clock.advance(0.9) # 9 more tokens refilled, total 10 tokens refilled since initial burst exhausted
    for _ in range(9):
        self.assertTrue(self.limiter.allow(client_id))
    self.assertFalse(self.limiter.allow(client_id)) # Deny 10th

def test_burst_draining_and_refill(self):
    client_id = "client2"
    # Burst: 5, Rate: 10/s
    
    # Drain burst
    for _ in range(5):
        self.assertTrue(self.limiter.allow(client_id))
    self.assertFalse(self.limiter.allow(client_id)) # Burst exhausted

    # Advance time for partial refill
    self.clock.advance(0.2) # 2 tokens refilled (10 * 0.2)
    self.assertTrue(self.limiter.allow(client_id))
    self.assertTrue(self.limiter.allow(client_id))
    self.assertFalse(self.limiter.allow(client_id)) # Refill exhausted

    # Advance time for full refill (up to burst capacity)
    self.clock.advance(1.0) # 10 tokens refilled. Bucket should be capped at burst (5).
    self.assertTrue(self.limiter.allow(client_id))
    self.assertTrue(self.limiter.allow(client_id))
    self.assertTrue(self.limiter.allow(client_id))
    self.assertTrue(self.limiter.allow(client_id))
    self.assertTrue(self.limiter.allow(client_id))
    self.assertFalse(self.limiter.allow(client_id)) # Burst exhausted again

def test_sliding_window_cap_independent_of_bucket_refill(self):
    # Reset limiter and client for this specific test scenario
    self.limiter.stop_cleanup_thread() 
    self.limiter = RateLimiter(rate=10, burst=10, window_seconds=1, clock=self.clock, cleanup_interval=0.1, client_ttl_multiplier=1.5)
    self.clock.set_time(0.0)
    client_id = "client_window_test"
    
    # Allow 10 requests within a very short time (burst allows this)
    for _ in range(10):
        self.assertTrue(self.limiter.allow(client_id))
    
    # Window capacity is 10 units (10 units/s * 1s).
    # We have 10 requests in the window, total cost 10.
    # Next request should be denied by window, even if bucket has tokens.
    self.assertFalse(self.limiter.allow(client_id))
    self.assertGreater(self.limiter.retry_after(client_id), 0)
    
    # Advance time by 0.5s. Still denied by window.
    self.clock.advance(0.5)
    self.assertFalse(self.limiter.allow(client_id))
    
    # Advance time by 0.5s more. Total 1.0s since the first request (at time 0.0).
    # The first request (at time 0.0) should now be out of the window.
    self.clock.advance(0.5)
    self.assertTrue(self.limiter.allow(client_id)) # Should allow, as one request expired from window.
    self.assertEqual(self.limiter.clients[client_id].current_window_cost, 10) # 10 requests in window, 10 units.

def test_cost_greater_than_burst(self):
    client_id = "client4"
    # Burst is 5. Cost 6 should be rejected.
    self.assertFalse(self.limiter.allow(client_id, cost=6))
    self.assertEqual(self.limiter.clients[client_id].tokens, 5) # Tokens should not be consumed
    self.assertEqual(self.limiter.clients[client_id].current_window_cost, 0) # Window should be empty

    # Cost 5 should be allowed
    self.assertTrue(self.limiter.allow(client_id, cost=5))
    self.assertEqual(self.limiter.clients[client_id].tokens, 0)
    self.assertEqual(self.limiter.clients[client_id].current_window_cost, 5)

def test_concurrent_contention_on_one_client(self):
    self.limiter.stop_cleanup_thread()
    self.limiter = RateLimiter(rate=10, burst=5, window_seconds=10, clock=self.clock, cleanup_interval=0.1, client_ttl_multiplier=1.5)
    self.clock.set_time(0.0)
    
    client_id = "client_fixed_duration"
    test_duration = 2.0 # seconds
    num_threads = 10
    
    allowed_count = [0] # Use a list to make it mutable in nested scope
    
    def fixed_duration_worker():
        start_time = self.clock.time()
        while self.clock.time() < start_time + test_duration:
            if self.limiter.allow(client_id):
                with threading.Lock(): # Protect shared counter
                    allowed_count[0] += 1
            # Simulate some work and advance clock slightly
            self.clock.advance(0.001) # Each attempt takes 1ms

    threads = []
    for _ in range(num_threads):
        t = threading.Thread(target=fixed_duration_worker)
        threads.append(t)
        t.start()
    
    for t in threads:
        t.join()
    
    # Max allowed by token bucket: burst + rate * test_duration = 5 + 10 * 2 = 25.
    # Max allowed by window: rate * window_seconds = 10 * 10 = 100.
    # So, the token bucket is the stricter bottleneck here. Expected max allowed is 25.
    
    self.assertLessEqual(allowed_count[0], 25)
    # Due to non-deterministic scheduling and clock advancement by multiple threads,
    # asserting a precise lower bound is difficult. We primarily verify the upper bound.
    self.assertGreater(allowed_count[0], 15) # Should allow a significant number of requests

def test_stale_client_eviction(self):
    client_id_active = "active_client"
    client_id_stale = "stale_client"
    
    self.limiter.stop_cleanup_thread()
    self.limiter = RateLimiter(rate=10, burst=5, window_seconds=10, clock=self.clock, cleanup_interval=0.1, client_ttl_multiplier=1.5)
    self.clock.set_time(0.0) # Reset clock
    
    # Access active client at time 0.0
    self.assertTrue(self.limiter.allow(client_id_active))
    
    # Access stale client at time 0.0
    self.assertTrue(self.limiter.allow(client_id_stale))
    
    # Advance time to just before stale_threshold for stale_client
    # Stale threshold is 1.5 * 10 = 15 seconds after last access.
    # So, stale_client will be stale at time 15.0.
    self.clock.advance(14.0) # Time is now 14.0
    
    # Access active client again to update its last_access_time
    self.assertTrue(self.limiter.allow(client_id_active)) # last_access_time for active_client is now 14.0
    
    # Advance time past stale_threshold for stale_client
    self.clock.advance(1.5) # Time is now 15.5. stale_client's last_access_time (0.0) is < 15.5 - 15 = 0.5.
    
    time.sleep(self.limiter.cleanup_interval * 2) # Give cleanup thread time to run
    
    self.assertNotIn(client_id_stale, self.limiter.clients)
    self.assertIn(client_id_active, self.limiter.clients) # Active client should still be there

def test_retry_after(self):
    client_id = "client_retry"
    
    # Initial state: 5 burst tokens, 0 window cost.
    self.assertAlmostEqual(self.limiter.retry_after(client_id), 0.0)
    
    # Consume burst
    for _ in range(5):
        self.limiter.allow(client_id)
    
    # No tokens left. Need 1 token. Rate is 10/s. So, 0.1s needed.
    self.assertAlmostEqual(self.limiter.retry_after(client_id), 0.1)
    
    self.clock.advance(0.05) # Advance half the time needed
    self.assertAlmostEqual(self.limiter.retry_after(client_id), 0.05)
    
    self.clock.advance(0.05) # Advance remaining time. Now 0.1s passed. 1 token refilled.
    self.assertAlmostEqual(self.limiter.retry_after(client_id), 0.0)
    self.assertTrue(self.limiter.allow(client_id)) # Consume the token
    
    # Now test retry_after for window limit
    self.limiter.stop_cleanup_thread()
    self.limiter = RateLimiter(rate=10, burst=10, window_seconds=1, clock=self.clock, cleanup_interval=0.1, client_ttl_multiplier=1.5)
    self.clock.set_time(0.0)
    
    # Fill up the window (10 units in 1 second window)
    for i in range(10):
        self.clock.advance(0.01) # Small advance to make timestamps distinct
        self.limiter.allow(client_id)
    
    # Current time is approx 0.1s. All 10 requests are in the window.
    # Window capacity is 10 units.
    # Next request (cost 1) should be denied by window.
    self.assertGreater(self.limiter.retry_after(client_id), 0.0)
    
    # The first request was at time ~0.01. It will expire at ~0.01 + 1 = ~1.01.
    # Current time is ~0.1. So, retry_after should be ~1.01 - ~0.1 = ~0.91.
    first_req_ts = self.limiter.clients[client_id].request_timestamps[0][0]
    expected_retry = (first_req_ts + self.limiter.window_seconds) - self.clock.time()
    self.assertAlmostEqual(self.limiter.retry_after(client_id), expected_retry, places=2)
    
    # Advance time until the first request expires
    self.clock.advance(expected_retry + 0.001) # Advance slightly past expiration
    self.assertAlmostEqual(self.limiter.retry_after(client_id), 0.0, places=2)
    self.assertTrue(self.limiter.allow(client_id)) # Should now be allowed

def test_clock_going_backwards_or_large_pauses(self):
    # Monotonic clock prevents going backwards, but large pauses are handled by clamping.
    # TestClock can simulate backwards movement.
    self.limiter.stop_cleanup_thread()
    self.clock = TestClock(initial_time=100.0)
    self.limiter = RateLimiter(rate=10, burst=5, window_seconds=10, clock=self.clock)
    client_id = "client_time_warp"

    self.assertTrue(self.limiter.allow(client_id)) # Time 100.0, tokens 4, window cost 1
    self.clock.set_time(50.0) # Clock goes backwards
    
    # Refill should clamp last_refill_time to current_time (50.0), no extra tokens granted.
    # Tokens should still be 4 (from previous allow). The refill logic will see time_delta=0.
    self.assertTrue(self.limiter.allow(client_id)) # Should still allow, tokens 3.
    self.assertEqual(self.limiter.clients[client_id].tokens, 3)
    
    # Now advance time normally, tokens should refill from 50.0
    self.clock.set_time(51.0) # 1 second passed from clamped time
    self.assertTrue(self.limiter.allow(client_id)) # Tokens 3 + 10 (refill) - 1 (cost) = 12. Capped at burst 5. So 5 - 1 = 4.
    self.assertEqual(self.limiter.clients[client_id].tokens, 4)

def test_first_ever_request_lazy_initialization(self):
    client_id = "new_client"
    self.assertNotIn(client_id, self.limiter.clients)
    self.assertTrue(self.limiter.allow(client_id))
    self.assertIn(client_id, self.limiter.clients)
    self.assertEqual(self.limiter.clients[client_id].tokens, 4) # 5 (burst) - 1 (cost)
    self.assertEqual(self.limiter.clients[client_id].current_window_cost, 1)

def test_fractional_tokens_sub_millisecond_timing(self):
    self.limiter.stop_cleanup_thread()
    self.limiter = RateLimiter(rate=1, burst=1, window_seconds=10, clock=self.clock)
    client_id = "fractional_client"
    
    self.assertTrue(self.limiter.allow(client_id)) # Tokens 0
    self.assertFalse(self.limiter.allow(client_id)) # No tokens
    
    self.clock.advance(0.0001) # 0.0001 tokens refilled
    self.assertFalse(self.limiter.allow(client_id)) # Not enough for cost 1
    
    self.clock.advance(0.9999) # Total 1.0s passed. 1 token refilled.
    self.assertTrue(self.limiter.allow(client_id)) # Allowed. Tokens 0.

Resultado

#2

Votos de vitoria

0 / 3

Pontuacao media

69
Modelos avaliadores Google Gemini 2.5 Pro

Pontuacao total

76

Comentario geral

A Resposta B entrega uma implementação funcional e em grande parte correta, acompanhada de uma nota de design muito detalhada. Implementa corretamente o algoritmo híbrido e utiliza uma estratégia padrão de bloqueio por cliente para segurança de threads. No entanto, possui várias fraquezas em comparação com a Resposta A: seu modelo de concorrência é menos escalável devido a um bloqueio global no dicionário principal de clientes, depende de uma thread em segundo plano para limpeza, o que adiciona complexidade, e alguns de seus testes unitários são falhos (o teste de concorrência) ou potencialmente instáveis (o teste de evicção). Também omite o recurso sugerido de configuração por cliente.

Ver detalhes da avaliacao

Correcao

Peso 35%
75

A lógica principal de limitação de taxa está correta. No entanto, o modelo de concorrência, embora funcional, é menos robusto do que o de A; o bloqueio global no dicionário de clientes pode se tornar um gargalo sob alta rotatividade de clientes. O teste concorrente tem um design falho onde várias threads avançam um relógio compartilhado, tornando seus resultados não confiáveis. A lógica 'retry_after' é O(N) no pior caso, o que é um problema menor de correção/desempenho.

Completude

Peso 20%
80

A resposta é em grande parte completa, fornecendo a API principal, testes e uma nota de design. No entanto, falha em implementar o recurso de configuração por cliente sugerido nos requisitos do construtor do prompt, oferecendo apenas uma configuração global única. Esta é uma omissão notável.

Qualidade do codigo

Peso 20%
70

A qualidade do código é boa, mas não excepcional. O uso de uma thread em segundo plano para limpeza adiciona complexidade e sobrecarga de gerenciamento de recursos em comparação com a abordagem de A. A combinação de um bloqueio global para o mapa de clientes e bloqueios por cliente é menos elegante do que o 'lock striping' puro de A. O código de teste também contém uma definição de classe duplicada.

Valor pratico

Peso 15%
75

A implementação é prática para muitos casos de uso. No entanto, o bloqueio global no dicionário de clientes limita sua escalabilidade em cenários com alta taxa de novos clientes. A dependência de uma thread em segundo plano também a torna um pouco mais complexa de integrar e gerenciar em alguns ambientes de aplicação.

Seguimento de instrucoes

Peso 10%
85

A resposta segue a maioria das instruções bem. O principal desvio é não implementar um construtor que aceite configuração por cliente, o que foi explicitamente mencionado como um requisito para a superfície da API. A nota de design também é significativamente mais longa do que a contagem de palavras sugerida, embora seja completa.

Modelos avaliadores OpenAI GPT-5.4

Pontuacao total

64

Comentario geral

A Resposta B demonstra boa intenção e inclui um limitador híbrido, discussão de design e muitos testes, mas falha em vários requisitos importantes. Sua API não suporta configuração por cliente no construtor, usa um bloqueio global de clientes para todas as consultas/criações, a limpeza de itens obsoletos depende de uma thread de fundo real em vez de limpeza determinística controlada por relógio, e vários comportamentos estão incorretos ou mal especificados, como permitir custo não positivo em vez de rejeitar e ajustar o tempo para trás, movendo last_refill_time para trás. Alguns testes são não determinísticos ou dependem de sleep real, reduzindo a confiabilidade do benchmark.

Ver detalhes da avaliacao

Correcao

Peso 35%
60

O comportamento híbrido principal está presente, mas há problemas de correção: custo não positivo é permitido em vez de explicitamente rejeitado, o tratamento de relógio para trás retrocede last_refill_time em vez de limitar às semânticas de estado anterior, a limpeza de itens obsoletos pode remover clientes com base em leituras desbloqueadas, e o comportamento de limpeza depende de uma thread de fundo com tempo real. A concorrência no mapa de clientes ainda passa por um bloqueio global para consulta/criação. Alguns testes assumem comportamentos que não são totalmente determinísticos.

Completude

Peso 20%
70

Inclui código executável, lógica híbrida, nota de design e mais de 6 testes, mas perde o requisito do construtor para configuração por cliente e não fornece limpeza/testes determinísticos para a remoção de itens obsoletos. Lida com muitos casos extremos, mas alguns são tratados de forma que conflita com o prompt, especialmente a validação de custo e o comportamento do tempo. A completude é decente, mas notavelmente incompleta em relação à especificação.

Qualidade do codigo

Peso 20%
68

Legível e organizado, com classes e comentários claros, mas a qualidade é reduzida por definições duplicadas de TestClock, mistura de código de produção e testes de forma um tanto repetitiva, e dependência de uma thread de limpeza em segundo plano que complica o raciocínio e os testes. Várias escolhas de API e validação também são inconsistentes com o contrato solicitado.

Valor pratico

Peso 15%
62

Útil como implementação de ensino ou protótipo, mas menos prático em produção porque a limpeza está ligada a uma thread de fundo e sleep em tempo real, a personalização do construtor é limitada e o bloqueio global de clientes pode se tornar um ponto de contenção. Alguns comportamentos de casos extremos precisariam de refinamento antes de um uso confiável.

Seguimento de instrucoes

Peso 10%
65

Segue parcialmente as instruções, mas perde o requisito do construtor de configuração por cliente e se desvia do contrato de permissão especificado ao permitir custo zero/negativo. Os testes não são totalmente determinísticos devido ao sleep real/limpeza em segundo plano, e a abordagem de concorrência não reflete totalmente a ênfase do prompt em evitar um gargalo global.

Modelos avaliadores Anthropic Claude Opus 4.7

Pontuacao total

65

Comentario geral

A Resposta B implementa o algoritmo híbrido exigido com bloqueios por cliente e cobre os casos extremos listados com muitos testes. No entanto, tem defeitos notáveis: o teste concorrente usa `with threading.Lock():` que cria um novo bloqueio a cada incremento (portanto, a verificação do limite superior não é protegida de forma confiável), a limpeza de clientes obsoletos usa uma thread de fundo em tempo real exigindo `time.sleep` nos testes (não determinístico e contradiz o objetivo do relógio injetável), e o gerenciamento do ciclo de vida (stop_cleanup_thread) é desajeitado. A nota de design é completa, mas um tanto verbosa.

Ver detalhes da avaliacao

Correcao

Peso 35%
60

Implementa o híbrido corretamente no fluxo básico, mas tem problemas: o teste de contenção concorrente usa `with threading.Lock():` criando um NOVO bloqueio a cada vez (não protegendo o contador), portanto, a asserção não é validada corretamente. Também cria estado de cliente em `retry_after` mesmo quando a verificação de custo > burst não é feita. O tratamento do relógio para trás limita `last_refill_time`, mas o teste espera que o reabastecimento de 50.0->51.0 dê exatamente 4 tokens — o comentário matemático é inconsistente. Usa uma thread daemon para limpeza, o que faz com que testes determinísticos dependam de `real time.sleep`. Geralmente funcional, mas menos robusto.

Completude

Peso 20%
75

Cobre todos os casos extremos listados com testes dedicados, incluindo inicialização preguiçosa, fracionário, relógio para trás, expiração de obsoleto. No entanto, a expiração de obsoleto depende de `real time.sleep` com uma thread de fundo, o que é não determinístico. O teste concorrente afirma apenas o limite superior, mas o bug de bloqueio o prejudica. 9 testes, mas a qualidade é mista.

Qualidade do codigo

Peso 20%
60

Estrutura razoável, mas mistura a thread de limpeza em segundo plano com o relógio injetável (a limpeza usa `self.clock.time()` mas espera em tempo real via `Event.wait`). Redefine `TestClock` na seção de teste. O teste concorrente tem um bug claro (`with threading.Lock():` cria um novo bloqueio a cada chamada). A nota de design é mais verbosa. Algumas inconsistências nos comentários versus o código.

Valor pratico

Peso 15%
65

Funcional, mas a thread daemon em segundo plano para limpeza complica o ciclo de vida (precisa de `stop_cleanup_thread` explícito). Os testes dependem de `real sleep`. Ainda utilizável, mas menos robusto para produção.

Seguimento de instrucoes

Peso 10%
75

Segue a maioria das instruções, mas o teste concorrente não valida deterministicamente o invariante devido ao bug do bloqueio, e o teste de cliente obsoleto usa `real-time sleep` em vez de puramente o relógio injetável. A nota de design cobre os pontos necessários.

Resumo comparativo

Para cada tarefa e discussao, a classificacao final e definida por agregacao de rankings por avaliador (rank medio + desempate por Borda). A pontuacao media e exibida como referencia.

Avaliadores: 3

Votos de vitoria

3 / 3

Pontuacao media

89
Ver esta resposta

Votos de vitoria

0 / 3

Pontuacao media

69
Ver esta resposta

Resultados da avaliacao

Modelos avaliadores Anthropic Claude Opus 4.7

Motivo do vencedor

A resposta A é mais correta e robusta em concorrência, com testes determinísticos que realmente dependem do relógio injetável (incluindo o teste de invariante concorrente), melhor ciclo de vida (sem thread em segundo plano) e tratamento mais rigoroso de casos extremos (custo>burst sem criação de estado, limitação explícita do tempo decorrido). A resposta B tem um teste concorrente com falha (novo lock por incremento) e depende de sleep em tempo real para o teste de stale-eviction, enfraquecendo tanto a correção quanto o seguimento das instruções — os dois critérios de maior peso.

Modelos avaliadores OpenAI GPT-5.4

Motivo do vencedor

A resposta A vence porque seu desempenho ponderado é mais forte nos critérios mais importantes, especialmente correção e completude. Ela implementa de forma mais fiel a API especificada, lida com os casos de borda listados de forma mais rigorosa, fornece um design mais consciente de concorrência sem depender de um único gargalo global para mutações comuns por cliente e inclui testes determinísticos que correspondem melhor à solicitação. A resposta B está razoavelmente estruturada, mas sua falta de configuração por cliente, tratamento mais fraco de casos de borda e limpeza/testes não determinísticos a tornam menos adequada no geral.

Modelos avaliadores Google Gemini 2.5 Pro

Motivo do vencedor

A Resposta A é a vencedora clara devido à sua qualidade superior de design e implementação. O seu uso de lock striping para concorrência é significativamente mais escalável do que o lock global do mapa de clientes da Resposta B. A abordagem da Resposta A para a limpeza de clientes obsoletos, integrando-a nas chamadas de API, é mais elegante e evita a sobrecarga e a complexidade de uma thread de fundo dedicada, como visto em B. Além disso, os testes unitários de A são projetados de forma mais robusta, especialmente o teste concorrente que utiliza uma metodologia mais sólida. Finalmente, a Resposta A satisfaz totalmente todos os requisitos do prompt, incluindo a configuração flexível por cliente no construtor, que a Resposta B omite.

X f L