Orivel Orivel
Abrir menu

Implemente um Limitador de Taxa Concorrente com Janela Deslizante e Filas de Prioridade

Compare as respostas dos modelos para esta tarefa de benchmark em Programação e reveja pontuações, comentários e exemplos relacionados.

Entre ou cadastre-se para usar curtidas e favoritos. Cadastrar

X f L

Indice

Visao geral da tarefa

Generos de Comparacao

Programação

Modelo criador da tarefa

Modelos participantes

Modelos avaliadores

Enunciado da tarefa

Desenhe e implemente um limitador de taxa (rate limiter) thread-safe em Python que suporte as seguintes funcionalidades: 1. **Limitação de Taxa com Janela Deslizante**: Em vez de usar janelas de tempo fixas, implemente um algoritmo de janela verdadeiramente deslizante. Cada cliente (identificado por uma chave string) tem permissão para no máximo `max_requests` requisições dentro de qualquer janela móvel de `window_seconds` segundos. 2. **Níveis de Prioridade**: Cada requisição tem um nível de prioridade (inteiro...

Mostrar mais

Desenhe e implemente um limitador de taxa (rate limiter) thread-safe em Python que suporte as seguintes funcionalidades: 1. **Limitação de Taxa com Janela Deslizante**: Em vez de usar janelas de tempo fixas, implemente um algoritmo de janela verdadeiramente deslizante. Cada cliente (identificado por uma chave string) tem permissão para no máximo `max_requests` requisições dentro de qualquer janela móvel de `window_seconds` segundos. 2. **Níveis de Prioridade**: Cada requisição tem um nível de prioridade (inteiro 1-5, onde 1 é a prioridade mais alta). Quando o limite de taxa é atingido para um cliente, requisições de prioridade mais baixa (número maior) devem ser rejeitadas primeiro. Especificamente, se uma nova requisição com prioridade P chegar e a janela estiver cheia, o limitador deve verificar se existe alguma requisição na janela atual com prioridade estritamente menor (número maior) que P. Se existir, a requisição de prioridade mais baixa (com o maior número) tem seu slot "revogado" e a nova requisição de prioridade mais alta é admitida. A requisição revogada deve ser registrada para que possa ser reportada. Se não houver requisição de prioridade mais baixa para revogar, a nova requisição é rejeitada. 3. **Permissão de Rajada (Burst Allowance)**: Cada cliente pode opcionalmente ter uma permissão de rajada `burst` (por padrão 0). Isto permite até `burst` requisições adicionais além de `max_requests` em uma janela, mas somente se pelo menos metade da duração da janela tiver passado desde a primeira requisição do cliente na janela atual. 4. **Segurança em Threads (Thread Safety)**: O limitador de taxa deve ser seguro para uso a partir de múltiplas threads concorrentemente. Demonstre isto com um cenário de teste. 5. **Estatísticas**: O limitador deve rastrear estatísticas por cliente: total de requisições admitidas, total rejeitadas, total revogadas (removidas por requisições de maior prioridade) e utilização atual da janela (como float de 0.0 a 1.0). Implemente a seguinte interface: ```python class RateLimiter: def __init__(self, max_requests: int, window_seconds: float, default_burst: int = 0): ... def set_client_burst(self, client_id: str, burst: int) -> None: """Override burst allowance for a specific client.""" ... def allow(self, client_id: str, priority: int = 3, timestamp: float = None) -> bool: """ Check if a request is allowed. If timestamp is None, use current time. Returns True if the request is admitted, False if rejected. """ ... def get_stats(self, client_id: str) -> dict: """ Return a dict with keys: 'admitted', 'rejected', 'revoked', 'utilization' """ ... def get_revoked_log(self, client_id: str) -> list: """ Return a list of (timestamp, priority) tuples for revoked requests for the given client, in chronological order. """ ... ``` Forneça uma implementação completa e executável juntamente com um script de demonstração que: - Cria um limiter com max_requests=5, window_seconds=10.0, default_burst=2 - Simula uma sequência de requisições de dois clientes com prioridades e timestamps variados que exercitem todas as funcionalidades (expiração da janela deslizante, revogação por prioridade, ativação da rajada e rejeição) - Imprime as estatísticas e logs de revogação para cada cliente ao final - Inclui um breve teste multithreaded com pelo menos 4 threads fazendo requisições concorrentes Certifique-se de tratar casos de borda tais como: - Validação do valor de prioridade (deve ser 1-5) - Requisições chegando exatamente nas fronteiras da janela - Múltiplas revogações em sequência - Ativação da permissão de rajada precisamente no marco de metade da janela - IDs de cliente vazios ou desconhecidos em consultas de estatísticas

Politica de avaliacao

Uma boa resposta deve fornecer uma implementação Python completa e executável que trate corretamente as cinco funcionalidades. Indicadores-chave de qualidade incluem: (1) Correção do algoritmo de janela deslizante — requisições devem expirar com base em seus timestamps individuais, não em buckets fixos. (2) Lógica correta de revogação por prioridade — quando a janela está cheia e chega uma requisição de prioridade mais alta, a requisição existente de menor prioridade (maior número) deve ser revogada e substituída....

Mostrar mais

Uma boa resposta deve fornecer uma implementação Python completa e executável que trate corretamente as cinco funcionalidades. Indicadores-chave de qualidade incluem: (1) Correção do algoritmo de janela deslizante — requisições devem expirar com base em seus timestamps individuais, não em buckets fixos. (2) Lógica correta de revogação por prioridade — quando a janela está cheia e chega uma requisição de prioridade mais alta, a requisição existente de menor prioridade (maior número) deve ser revogada e substituída. (3) A permissão de rajada deve ativar somente depois de metade da janela ter decorrido desde a primeira requisição na janela atual. (4) Segurança em threads deve usar primitivos de sincronização apropriados (locks) e a demonstração multithread deve ser significativa, não trivial. (5) Estatísticas devem ser precisas e consistentes. (6) Casos de borda devem ser tratados de forma elegante — prioridades inválidas devem causar erro ou serem tratadas, clientes desconhecidos devem retornar padrões sensatos, e timestamps nas fronteiras devem funcionar corretamente. (7) O script de demonstração deve exercitar claramente todas as funcionalidades e produzir saída compreensível. (8) Qualidade do código importa: estrutura clara, nomes apropriados, comentários úteis e separação limpa de responsabilidades. Soluções que usam uma heap ou estrutura ordenada para revogação eficiente por prioridade são preferíveis a scans lineares ingênuos, embora a correção seja mais importante que a otimização.

Respostas

import time
import threading
import random
from collections import deque
from dataclasses import dataclass, field
from typing import List, Tuple, Dict, Optional

@dataclass
class ClientState:
"""Holds all state for a single client."""
lock: threading.Lock = field(default_factory=threading.Lock, repr=False)
requests: deque[Tuple[float, int]] = field(default_factory=deque)
burst: int = 0
stats: Dict[str, int] = field(default_factory=lambda: {"admitted": 0, "rejected": 0, "revoked": 0})
revoked_log: List[Tuple[float, int]] = field(default_factory=list)

class RateLimiter:
"""
A thread...

Mostrar resposta completa

import time
import threading
import random
from collections import deque
from dataclasses import dataclass, field
from typing import List, Tuple, Dict, Optional

@dataclass
class ClientState:
"""Holds all state for a single client."""
lock: threading.Lock = field(default_factory=threading.Lock, repr=False)
requests: deque[Tuple[float, int]] = field(default_factory=deque)
burst: int = 0
stats: Dict[str, int] = field(default_factory=lambda: {"admitted": 0, "rejected": 0, "revoked": 0})
revoked_log: List[Tuple[float, int]] = field(default_factory=list)

class RateLimiter:
"""
A thread-safe rate limiter implementing a sliding window algorithm with
priority queues and burst allowances.
"""
def init(self, max_requests: int, window_seconds: float, default_burst: int = 0):
if max_requests <= 0 or window_seconds <= 0:
raise ValueError("max_requests and window_seconds must be positive.")
self.max_requests = max_requests
self.window_seconds = window_seconds
self.default_burst = default_burst
self._clients: Dict[str, ClientState] = {}
self._clients_lock = threading.Lock()

def _get_or_create_client(self, client_id: str) -> ClientState:
    """Atomically get or create a client's state object."""
    if client_id not in self._clients:
        with self._clients_lock:
            # Double-check locking pattern to avoid race conditions on creation
            if client_id not in self._clients:
                self._clients[client_id] = ClientState(burst=self.default_burst)
    return self._clients[client_id]

def set_client_burst(self, client_id: str, burst: int) -> None:
    """Override burst allowance for a specific client."""
    client = self._get_or_create_client(client_id)
    with client.lock:
        client.burst = burst

def allow(self, client_id: str, priority: int = 3, timestamp: Optional[float] = None) -> bool:
    """
    Check if a request is allowed. If timestamp is None, use current time.
    Returns True if the request is admitted, False if rejected.
    """
    if not 1 <= priority <= 5:
        raise ValueError("Priority must be an integer between 1 and 5.")

    now = timestamp if timestamp is not None else time.time()
    client = self._get_or_create_client(client_id)

    with client.lock:
        # 1. Slide the window: remove expired requests
        window_start = now - self.window_seconds
        while client.requests and client.requests[0][0] < window_start:
            client.requests.popleft()

        # 2. Determine the effective request limit (base + burst)
        effective_max = self.max_requests
        if client.burst > 0 and client.requests:
            first_req_time = client.requests[0][0]
            if (now - first_req_time) >= (self.window_seconds / 2.0):
                effective_max += client.burst
        
        current_requests_count = len(client.requests)

        # 3. Decision: Admit, Revoke, or Reject
        if current_requests_count < effective_max:
            # Case A: Space is available (either base or burst)
            client.requests.append((now, priority))
            client.stats['admitted'] += 1
            return True
        else:
            # Case B: Window is full, check for priority revocation
            # Find the request with the highest priority number (lowest actual priority)
            worst_request_idx = -1
            worst_priority = -1
            for i, req in enumerate(client.requests):
                if req[1] > worst_priority:
                    worst_priority = req[1]
                    worst_request_idx = i

            # If a suitable candidate for revocation exists...
            if worst_request_idx != -1 and worst_priority > priority:
                # Revoke the lowest-priority request
                revoked_req = client.requests[worst_request_idx]
                del client.requests[worst_request_idx]
                
                client.revoked_log.append(revoked_req)
                client.stats['revoked'] += 1
                
                # Admit the new, higher-priority request
                client.requests.append((now, priority))
                client.stats['admitted'] += 1
                return True
            else:
                # Case C: No space and no possible revocation -> Reject
                client.stats['rejected'] += 1
                return False

def get_stats(self, client_id: str) -> dict:
    """
    Return a dict with keys: 'admitted', 'rejected', 'revoked', 'utilization'
    """
    if client_id not in self._clients:
        return {'admitted': 0, 'rejected': 0, 'revoked': 0, 'utilization': 0.0}

    client = self._clients[client_id]
    with client.lock:
        # Slide window for accurate utilization calculation at the current time
        now = time.time()
        window_start = now - self.window_seconds
        while client.requests and client.requests[0][0] < window_start:
            client.requests.popleft()
        
        current_count = len(client.requests)
        utilization = min(1.0, current_count / self.max_requests) if self.max_requests > 0 else 0.0

        stats_copy = client.stats.copy()
        stats_copy['utilization'] = utilization
        return stats_copy

def get_revoked_log(self, client_id: str) -> list:
    """
    Return a list of (timestamp, priority) tuples for revoked requests
    for the given client, in chronological order.
    """
    if client_id not in self._clients:
        return []
        
    client = self._clients[client_id]
    with client.lock:
        # The log is already in chronological order of revocation
        return list(client.revoked_log)

--- Demonstration Script ---

def run_demonstration():
print("--- Rate Limiter Demonstration ---")
limiter = RateLimiter(max_requests=5, window_seconds=10.0, default_burst=2)

# --- Client 1: Standard, Rejection, and Priority Revocation ---
client1 = "client-A"
print(f"\n--- Testing Client: {client1} ---")

# 1. Fill the window with medium-priority requests
print("1. Filling the window (5 requests)...")
for i in range(5):
    limiter.allow(client1, priority=4, timestamp=float(i))

# 2. Reject a request
print("2. Sending a 6th request (should be rejected)...")
is_allowed = limiter.allow(client1, priority=4, timestamp=5.0)
print(f"  Request at T=5.0 allowed: {is_allowed} (Expected: False)")

# 3. Revoke a low-priority request with a high-priority one
print("3. Sending a high-priority request (should revoke one)...")
is_allowed = limiter.allow(client1, priority=1, timestamp=6.0)
print(f"  High-priority request at T=6.0 allowed: {is_allowed} (Expected: True)")

# 4. Let the window slide
print("4. Waiting for window to slide...")
# At T=11.0, request at T=0.0 has expired. Window contains T=1,2,3,4,6 (5 requests)
is_allowed = limiter.allow(client1, priority=5, timestamp=11.0)
print(f"  Request at T=11.0 allowed: {is_allowed} (Expected: False)")

# At T=12.0, requests at T=0,1 have expired. Window contains T=2,3,4,6 (4 requests)
is_allowed = limiter.allow(client1, priority=5, timestamp=12.0)
print(f"  Request at T=12.0 allowed: {is_allowed} (Expected: True)")

# --- Client 2: Burst Allowance ---
client2 = "client-B"
print(f"\n--- Testing Client: {client2} (Burst) ---")
limiter.set_client_burst(client2, 3) # Override burst to 3 for this client

# 1. Send initial request
print("1. Sending initial request at T=20.0...")
limiter.allow(client2, priority=3, timestamp=20.0)

# 2. Try to burst before half-window (should fail)
print("2. Attempting to burst before half-window (T=24.0)...")
# Window is 10s, half is 5s. T=24.0 is 4s after T=20.0.
# Should allow up to max_requests (5)
for i in range(4): # Requests 2, 3, 4, 5
    limiter.allow(client2, priority=3, timestamp=24.0)
is_allowed = limiter.allow(client2, priority=3, timestamp=24.1)
print(f"  6th request at T=24.1 (before burst active) allowed: {is_allowed} (Expected: False)")

# 3. Activate burst after half-window
print("3. Activating burst after half-window (T=25.0)...")
# T=25.0 is 5s after T=20.0. Burst is now active.
# Limit is now 5 (base) + 3 (burst) = 8.
# Currently 5 requests in window (from T=20, T=24). Should allow 3 more.
allowed_count = 0
for _ in range(4): # Try to send 4 more
    if limiter.allow(client2, priority=3, timestamp=25.0):
        allowed_count += 1
print(f"  Allowed {allowed_count} burst requests at T=25.0. (Expected: 3)")

# --- Final Stats ---
print("\n--- Final Stats ---")
print(f"Client A Stats: {limiter.get_stats(client1)}")
print(f"Client A Revoked Log: {limiter.get_revoked_log(client1)}")
print(f"Client B Stats: {limiter.get_stats(client2)}")
print(f"Client B Revoked Log: {limiter.get_revoked_log(client2)}")
print(f"Unknown Client Stats: {limiter.get_stats('client-C')}")

--- Multithreaded Test ---

def run_multithreaded_test():
print("\n--- Multithreaded Test ---")
limiter = RateLimiter(max_requests=20, window_seconds=5.0)
client_id = "mt-client"
num_threads = 5
requests_per_thread = 10
total_requests = num_threads * requests_per_thread

barrier = threading.Barrier(num_threads)

def worker():
    barrier.wait()
    for i in range(requests_per_thread):
        priority = random.randint(1, 5)
        limiter.allow(client_id, priority=priority)
        time.sleep(0.01)

threads = [threading.Thread(target=worker) for _ in range(num_threads)]

for t in threads:
    t.start()

for t in threads:
    t.join()

print(f"\nFinished multithreaded test with {num_threads} threads and {requests_per_thread} requests each.")

stats = limiter.get_stats(client_id)
print(f"Final Stats for {client_id}: {stats}")

total_processed = stats['admitted'] + stats['rejected']
print(f"Total requests attempted: {total_requests}")
print(f"Total requests processed (admitted + rejected): {total_processed}")
if total_processed == total_requests:
    print("Test PASSED: All requests were accounted for, indicating thread safety.")
else:
    print(f"Test FAILED: Mismatch in request counts. Expected {total_requests}, got {total_processed}.")

if name == 'main':
run_demonstration()
run_multithreaded_test()

Resultado

#2

Votos de vitoria

1 / 3

Pontuacao media

74
Modelos avaliadores Google Gemini 2.5 Flash

Pontuacao total

90

Comentario geral

A Resposta A fornece uma implementação muito forte e correta do limitador de taxa. Seu uso de bloqueios por cliente garante excelente concorrência, o que é crucial para um limitador de taxa que lida com vários clientes simultaneamente. O código é limpo, bem estruturado e lida efetivamente com todos os recursos e casos extremos especificados, incluindo um script de demonstração robusto. Embora use uma varredura linear para revogação de prioridade em vez de um heap, sua correção geral, clareza e concorrência prática o tornam uma solução superior.

Ver detalhes da avaliacao

Correcao

Peso 35%
90

A janela deslizante, a revogação de prioridade (varredura linear), a concessão de rajada e as estatísticas são todas implementadas corretamente. O bloqueio por cliente garante robustez contra falhas de thread. A utilização é calculada em relação ao max_requests base, que é uma interpretação comum.

Completude

Peso 20%
100

Todos os métodos e recursos necessários são totalmente implementados, e o script de demonstração exercita completamente todos os aspectos do limitador de taxa.

Qualidade do codigo

Peso 20%
85

O código é bem estruturado, usa dataclasses de forma eficaz e tem nomes de métodos e comentários claros. O bloqueio granular por cliente é um ponto forte para concorrência. A varredura linear para revogação é uma pequena troca de desempenho pela simplicidade.

Valor pratico

Peso 15%
85

A solução oferece alto valor prático devido ao seu modelo robusto de segurança de thread com bloqueios por cliente, permitindo alta concorrência entre diferentes clientes. Isso a torna adequada para aplicações do mundo real onde vários clientes podem atingir o limitador simultaneamente.

Seguimento de instrucoes

Peso 10%
90

Todas as instruções, incluindo a interface, os recursos, o script de demonstração e os casos extremos, são seguidas corretamente. O único ponto menor é o uso de uma varredura linear para revogação de prioridade, enquanto o prompt preferia um heap ou estrutura ordenada.

Modelos avaliadores OpenAI GPT-5.4

Pontuacao total

66

Comentario geral

A Resposta A fornece uma implementação executável com bloqueio por cliente, expiração de janela deslizante, substituição baseada em prioridade, suporte a rajadas, estatísticas e uma demonstração multithread. Sua estrutura é legível e em sua maioria correta, mas não valida IDs de cliente ou valores de rajada, usa uma varredura linear em vez de uma estrutura de prioridade para revogação, e o tratamento de limites de janela está incorreto para o caso de borda declarado, pois mantém as solicitações exatamente no limite. A utilização também é limitada apenas por max_requests, o que ignora a capacidade ativa de rajada.

Ver detalhes da avaliacao

Correcao

Peso 35%
62

A admissão principal da janela deslizante, o controle de rajadas e a substituição de prioridade geralmente funcionam, e a segurança de thread é razoável por meio de bloqueios por cliente. No entanto, as solicitações exatamente no limite da janela são tratadas como ativas porque a expiração usa < em vez de <=, o que conflita com o requisito declarado de caso de borda. A validação de ID de cliente e rajada também está ausente, e a utilização ignora a capacidade ajustada pela rajada.

Completude

Peso 20%
70

Inclui a interface completa da classe, estatísticas, log de revogados, uma demonstração sequencial e um teste multithread. Mas alguns casos de borda solicitados não são totalmente tratados ou demonstrados, especialmente IDs de cliente inválidos, comportamento exato de limite e múltiplas revogações explícitas em sequência.

Qualidade do codigo

Peso 20%
68

Legível e organizado com uma dataclass útil e comentários claros. No entanto, a docstring da classe menciona filas de prioridade, embora a revogação seja uma varredura linear em um deque, falta alguma validação e as expectativas da demonstração estão incorporadas informalmente em vez de serem testadas sistematicamente.

Valor pratico

Peso 15%
65

Útil como um exemplo executável e fácil de seguir, mas menos pronto para produção devido à validação incompleta, incompatibilidade de limites e varredura de revogação ingênua. O teste multithread verifica a contabilidade, mas é bastante leve.

Seguimento de instrucoes

Peso 10%
71

Segue a maior parte do prompt e fornece código executável completo, estatísticas, logs de revogados e concorrência. Ainda assim, fica aquém na escolha preferida da estrutura de dados, tratamento de limites e tratamento gracioso de IDs de cliente vazios em consultas.

Modelos avaliadores Anthropic Claude Opus 4.6

Pontuacao total

66

Comentario geral

A Resposta A fornece uma implementação limpa e legível com um estado de cliente baseado em dataclass e bloqueio por cliente. A janela deslizante usa uma deque com varredura linear para revogação de prioridade, o que está correto, mas não é otimizado. O script de demonstração exercita bem a maioria dos recursos. No entanto, existem várias questões de correção: (1) A expiração da janela deslizante usa estritamente menor que (< window_start) em vez de menor ou igual, o que significa que as requisições exatamente no limite não expiram — este é um problema menor de tratamento de limites. (2) A contagem de slots da requisição revogada está sutilmente errada — quando uma requisição é revogada, ela é removida da deque, mas ainda é contada nas estatísticas admitidas sem ser decrementada, o que está correto de acordo com a especificação (requisições revogadas foram admitidas). (3) O cálculo de utilização em get_stats divide por max_requests em vez da capacidade efetiva (base + burst), o que pode ser discutível, mas não corresponde à capacidade ciente de burst. (4) A verificação de ativação de burst usa >= para meia janela, o que está correto. (5) O teste multithread usa 5 threads em vez do mínimo de 4 solicitado, o que é aceitável, mas usa um único cliente, o que é menos interessante do que testar contenção em clientes compartilhados. A estratégia de bloqueio por cliente é boa para concorrência, mas a verificação inicial fora do bloqueio em _get_or_create_client tem um problema potencial (leitura do dicionário _clients sem bloqueio), embora o padrão de verificação dupla mitigue isso.

Ver detalhes da avaliacao

Correcao

Peso 35%
65

A Resposta A implementa corretamente a lógica principal da janela deslizante e da revogação de prioridade. No entanto, o tratamento de limites usa estritamente menor que para expiração (requisições no limite exato não expiram), e a utilização é calculada contra max_requests em vez da capacidade efetiva. A varredura linear para revogação está correta, mas não é otimizada. A lógica de ativação de burst está correta com a verificação >= meia janela.

Completude

Peso 20%
65

A Resposta A cobre todos os cinco recursos exigidos: janela deslizante, revogação de prioridade, permissão de burst, segurança de thread e estatísticas. A demonstração exercita a maioria dos recursos. No entanto, o teste multithread usa um único cliente em vez de demonstrar contenção em recursos compartilhados. Casos de borda como clientes desconhecidos são tratados. A validação de prioridade levanta ValueError como esperado.

Qualidade do codigo

Peso 20%
70

A Resposta A tem excelente organização de código com uma dataclass limpa para o estado do cliente, estrutura de métodos clara e bons comentários. A estratégia de bloqueio por cliente é bem projetada. A nomenclatura de variáveis é clara e consistente. O script de demonstração é bem organizado com saídas esperadas claras.

Valor pratico

Peso 15%
60

A Resposta A fornece um limitador de taxa funcional adequado para casos de uso moderados. O bloqueio por cliente é bom para concorrência. No entanto, a varredura linear para revogação pode ser um gargalo com muitas requisições. A saída de demonstração é clara e educativa. A implementação funcionaria em produção com alguns refinamentos.

Seguimento de instrucoes

Peso 10%
70

A Resposta A segue a especificação da interface corretamente. Implementa todos os métodos exigidos com assinaturas corretas. A demonstração cria um limitador com os parâmetros especificados e exercita a maioria dos recursos. Usa 5 threads em vez das 4 mínimas solicitadas. Imprime estatísticas e logs revogados conforme exigido.

Resposta B: OpenAI GPT-5.2

import time
import threading
import random
import heapq
from collections import deque

class RateLimiter:
"""Thread-safe sliding-window rate limiter with priority-based revocation and optional burst."""

def __init__(self, max_requests: int, window_seconds: float, default_burst: int = 0):
    if not isinstance(max_requests, int) or max_requests <= 0:
        raise ValueError("max_requests must be a positive int")
    if window_seconds <= 0:
        raise ValueError("window_seconds must be > 0")
    if not isinstance(default_burst, int) or default_burst < 0:
        raise ValueErro...
Mostrar resposta completa

import time
import threading
import random
import heapq
from collections import deque

class RateLimiter:
"""Thread-safe sliding-window rate limiter with priority-based revocation and optional burst."""

def __init__(self, max_requests: int, window_seconds: float, default_burst: int = 0):
    if not isinstance(max_requests, int) or max_requests <= 0:
        raise ValueError("max_requests must be a positive int")
    if window_seconds <= 0:
        raise ValueError("window_seconds must be > 0")
    if not isinstance(default_burst, int) or default_burst < 0:
        raise ValueError("default_burst must be a non-negative int")

    self.max_requests = max_requests
    self.window_seconds = float(window_seconds)
    self.default_burst = default_burst

    self._lock = threading.RLock()
    self._clients = {}  # client_id -> state

def set_client_burst(self, client_id: str, burst: int) -> None:
    if not isinstance(client_id, str) or not client_id:
        raise ValueError("client_id must be a non-empty string")
    if not isinstance(burst, int) or burst < 0:
        raise ValueError("burst must be a non-negative int")
    with self._lock:
        st = self._get_or_create_state(client_id)
        st["burst"] = burst

def allow(self, client_id: str, priority: int = 3, timestamp: float = None) -> bool:
    if not isinstance(client_id, str) or not client_id:
        raise ValueError("client_id must be a non-empty string")
    if not isinstance(priority, int) or not (1 <= priority <= 5):
        raise ValueError("priority must be an int in [1, 5]")
    now = time.time() if timestamp is None else float(timestamp)

    with self._lock:
        st = self._get_or_create_state(client_id)
        self._evict_expired_locked(st, now)

        # Determine effective capacity (base + optional burst)
        n = st["active_count"]
        base = self.max_requests
        burst = st["burst"]
        cap = base

        # Burst can be used only if at least half the window has passed since first request in current window.
        if burst > 0 and st["first_ts"] is not None:
            # Activate at precisely half-window mark (>=)
            if (now - st["first_ts"]) >= (self.window_seconds / 2.0):
                cap = base + burst

        # Fast path: within capacity
        if n < cap:
            self._admit_locked(st, now, priority)
            return True

        # If full, attempt priority revocation (only within current effective capacity cap)
        # If cap > base, then "full" includes burst usage too; revocation still applies.
        # We can only revoke a strictly lower priority (numerically higher).
        # Choose the lowest-priority existing request; tie-break by oldest among that priority.
        # If no lower-priority exists, reject.

        # Make sure heap top points to a live active event.
        cand = self._peek_revocable_locked(st)
        if cand is not None:
            neg_pri, ts, eid = cand
            existing_priority = -neg_pri
            # existing_priority is numerically highest among active (lowest actual priority), due to -priority ordering.
            if existing_priority > priority:
                # Revoke candidate
                self._pop_specific_candidate_locked(st, eid)
                # Admit new
                self._admit_locked(st, now, priority)
                st["revoked"] += 1
                st["revoked_log"].append((ts, existing_priority))
                return True

        # No revocation possible
        st["rejected"] += 1
        return False

def get_stats(self, client_id: str) -> dict:
    if not isinstance(client_id, str) or not client_id:
        # As requested: handle empty/unknown client IDs gracefully
        return {"admitted": 0, "rejected": 0, "revoked": 0, "utilization": 0.0}
    with self._lock:
        st = self._clients.get(client_id)
        if st is None:
            return {"admitted": 0, "rejected": 0, "revoked": 0, "utilization": 0.0}
        now = time.time()
        self._evict_expired_locked(st, now)

        base = self.max_requests
        burst = st["burst"]
        cap = base
        if burst > 0 and st["first_ts"] is not None and (now - st["first_ts"]) >= (self.window_seconds / 2.0):
            cap = base + burst
        util = (st["active_count"] / cap) if cap > 0 else 0.0
        # clamp
        if util < 0.0:
            util = 0.0
        if util > 1.0:
            util = 1.0
        return {
            "admitted": st["admitted"],
            "rejected": st["rejected"],
            "revoked": st["revoked"],
            "utilization": float(util),
        }

def get_revoked_log(self, client_id: str) -> list:
    if not isinstance(client_id, str) or not client_id:
        return []
    with self._lock:
        st = self._clients.get(client_id)
        if st is None:
            return []
        return list(st["revoked_log"])

# ------------------------ internal helpers (all require self._lock held) ------------------------

def _get_or_create_state(self, client_id: str) -> dict:
    st = self._clients.get(client_id)
    if st is None:
        st = {
            "burst": self.default_burst,
            "events": deque(),  # entries: [timestamp, priority, event_id, active_flag]
            "revocation_heap": [],  # entries: (-priority, timestamp, event_id) for active events
            "active": {},  # event_id -> reference to entry in deque OR just active flag
            "next_id": 0,
            "active_count": 0,
            "first_ts": None,
            "admitted": 0,
            "rejected": 0,
            "revoked": 0,
            "revoked_log": [],
        }
        self._clients[client_id] = st
    return st

def _evict_expired_locked(self, st: dict, now: float) -> None:
    cutoff = now - self.window_seconds
    # Sliding window: keep entries with timestamp > cutoff.
    # Requests exactly at boundary (timestamp == cutoff) are expired.
    ev = st["events"]
    while ev and ev[0][0] <= cutoff:
        ts, pri, eid, active = ev[0]
        ev.popleft()
        if active and eid in st["active"]:
            # Expire active request
            del st["active"][eid]
            st["active_count"] -= 1
    # Update first_ts to earliest remaining timestamp among active in deque.
    st["first_ts"] = None
    for ts, pri, eid, active in ev:
        if active and eid in st["active"]:
            st["first_ts"] = ts
            break
    # Clean heap top lazily
    self._clean_heap_top_locked(st)

def _clean_heap_top_locked(self, st: dict) -> None:
    h = st["revocation_heap"]
    while h:
        neg_pri, ts, eid = h[0]
        if eid in st["active"]:
            return
        heapq.heappop(h)

def _peek_revocable_locked(self, st: dict):
    self._clean_heap_top_locked(st)
    if not st["revocation_heap"]:
        return None
    return st["revocation_heap"][0]

def _pop_specific_candidate_locked(self, st: dict, eid: int) -> None:
    # We expect candidate to be at heap top. Pop until we pop that eid.
    h = st["revocation_heap"]
    while h:
        neg_pri, ts, heid = heapq.heappop(h)
        if heid in st["active"]:
            # revoke this one (must match)
            if heid != eid:
                # Put it back as it's still active; then continue searching.
                heapq.heappush(h, (neg_pri, ts, heid))
                # fall back: revoke the one we just saw instead? No; keep deterministic behavior.
                # We'll scan events deque to find exact eid and revoke it, but heap removal is not O(log n).
                # Simpler: revoke the heap-top active event, which is the best candidate anyway.
                # So if mismatch happens (shouldn't), revoke heid.
                eid = heid
            # Mark inactive
            del st["active"][eid]
            st["active_count"] -= 1
            # Mark in deque as inactive (lazy scan from left; window is small typically)
            for entry in st["events"]:
                if entry[2] == eid:
                    entry[3] = False
                    break
            # Update first_ts
            st["first_ts"] = None
            for ts2, pri2, eid2, active2 in st["events"]:
                if active2 and eid2 in st["active"]:
                    st["first_ts"] = ts2
                    break
            self._clean_heap_top_locked(st)
            return
    # If heap is empty, nothing to do.

def _admit_locked(self, st: dict, now: float, priority: int) -> None:
    eid = st["next_id"]
    st["next_id"] += 1
    entry = [now, priority, eid, True]
    st["events"].append(entry)
    st["active"][eid] = True
    st["active_count"] += 1
    heapq.heappush(st["revocation_heap"], (-priority, now, eid))
    st["admitted"] += 1
    if st["first_ts"] is None:
        st["first_ts"] = now

def demo_sequence():
print("=== Demo: sliding window, priority revocation, burst, boundary handling ===")
rl = RateLimiter(max_requests=5, window_seconds=10.0, default_burst=2)

# Override one client's burst to show per-client config
rl.set_client_burst("clientA", 2)
rl.set_client_burst("clientB", 0)

t0 = 1_000_000.0

# Fill base window for clientA with low priority (5)
for i in range(5):
    ok = rl.allow("clientA", priority=5, timestamp=t0 + i * 0.1)
    print("clientA admit low", i, ok)

# Now window is full. Higher-priority request should revoke one of the priority-5 requests.
ok = rl.allow("clientA", priority=2, timestamp=t0 + 0.6)
print("clientA high priority arrives (should revoke one low):", ok)

# Another higher-priority request; can revoke again.
ok = rl.allow("clientA", priority=1, timestamp=t0 + 0.7)
print("clientA higher priority arrives (should revoke one low):", ok)

# Now attempt a low-priority request; should be rejected because no strictly lower priority exists to revoke.
ok = rl.allow("clientA", priority=5, timestamp=t0 + 0.8)
print("clientA low priority arrives when full (should reject):", ok)

# Burst should activate at half-window since first active request.
# First request was at t0. Half-window mark is t0+5.0. At exactly +5.0, burst becomes available.
ok = rl.allow("clientA", priority=3, timestamp=t0 + 5.0)
print("clientA burst activation at half-window boundary (should admit due to burst):", ok)

ok = rl.allow("clientA", priority=4, timestamp=t0 + 5.01)
print("clientA second burst slot (should admit if not exceeded):", ok)

# Next request should be rejected or may revoke depending on priorities.
ok = rl.allow("clientA", priority=4, timestamp=t0 + 5.02)
print("clientA beyond burst+base (likely reject unless can revoke lower):", ok)

# Sliding window expiry boundary: at now=t0+10.0, requests at t0 are expired (<= cutoff).
# Send at exactly t0+10.0: entries at t0 are expired.
ok = rl.allow("clientA", priority=3, timestamp=t0 + 10.0)
print("clientA at exact window boundary (t0+10.0), oldest expires => should admit:", ok)

# ClientB: demonstrate pure rejection without burst
for i in range(6):
    ok = rl.allow("clientB", priority=3, timestamp=t0 + i * 0.01)
    print("clientB request", i, "allowed" if ok else "rejected")

print("\nStats:")
for cid in ["clientA", "clientB", "unknown", ""]:
    print(cid if cid else "<empty>", rl.get_stats(cid))

print("\nRevoked logs:")
for cid in ["clientA", "clientB"]:
    print(cid, rl.get_revoked_log(cid))

return rl

def demo_multithreaded(rl: RateLimiter):
print("\n=== Demo: multithreaded concurrency test (4 threads) ===")
start = time.time()
stop_at = start + 1.0

results_lock = threading.Lock()
results = {"allowed": 0, "rejected": 0}

def worker(tid: int, client_id: str):
    local_allowed = 0
    local_rejected = 0
    while time.time() < stop_at:
        pri = random.randint(1, 5)
        ok = rl.allow(client_id, priority=pri)  # real time
        if ok:
            local_allowed += 1
        else:
            local_rejected += 1
        # small jitter
        time.sleep(random.random() * 0.005)
    with results_lock:
        results["allowed"] += local_allowed
        results["rejected"] += local_rejected

threads = []
# Two threads hit same client to stress contention, two hit another
for i in range(4):
    cid = "mtA" if i < 2 else "mtB"
    th = threading.Thread(target=worker, args=(i, cid), daemon=True)
    threads.append(th)
    th.start()

for th in threads:
    th.join()

print("Concurrent run totals:", results)
print("mtA stats:", rl.get_stats("mtA"))
print("mtB stats:", rl.get_stats("mtB"))

if name == "main":
rl = demo_sequence()
demo_multithreaded(rl)

Resultado

#1 | Vencedor

Votos de vitoria

2 / 3

Pontuacao media

80
Modelos avaliadores Google Gemini 2.5 Flash

Pontuacao total

85

Comentario geral

A Resposta B oferece uma implementação correta, utilizando notavelmente um heap para uma revogação de prioridade mais eficiente, o que se alinha com uma preferência na solicitação. No entanto, o uso de um único RLock global para todas as operações de cliente limita significativamente a concorrência em comparação com o bloqueio por cliente da Resposta A, reduzindo seu valor prático em um ambiente multi-cliente. Alguns métodos auxiliares internos também são bastante complexos, impactando a legibilidade do código. O cálculo de utilização em relação à capacidade efetiva é um desvio menor de uma interpretação mais comum.

Ver detalhes da avaliacao

Correcao

Peso 35%
85

A lógica principal para janela deslizante, revogação de prioridade (baseada em heap) e limite de rajada está correta. No entanto, o cálculo de utilização em relação à capacidade efetiva (incluindo rajada) é uma interpretação menos comum, e o único RLock global, embora tecnicamente thread-safe, limita a concorrência real entre clientes.

Completude

Peso 20%
100

Todos os métodos e funcionalidades necessários estão totalmente implementados, e o script de demonstração exercita exaustivamente todos os aspectos do limitador de taxa.

Qualidade do codigo

Peso 20%
75

O código usa um heap para revogação de prioridade eficiente, o que é bom. No entanto, a estrutura geral é mais complexa com numerosos métodos auxiliares internos, e o único RLock global é uma escolha menos otimizada para concorrência em comparação com bloqueios por cliente. O método `_pop_specific_candidate_locked` também é um tanto confuso.

Valor pratico

Peso 15%
70

Embora a revogação baseada em heap seja eficiente, o uso de um único RLock global limita significativamente a concorrência prática do limitador de taxa. Em um cenário de alta carga e multi-cliente, esse design se tornaria um gargalo, reduzindo sua utilidade prática em comparação com uma estratégia de bloqueio mais granular.

Seguimento de instrucoes

Peso 10%
95

Todas as instruções são seguidas precisamente, incluindo a interface, funcionalidades, script de demonstração e casos extremos. O uso de um heap para revogação de prioridade adere especificamente à preferência da solicitação por uma estrutura de dados eficiente.

Modelos avaliadores OpenAI GPT-5.4

Pontuacao total

85

Comentario geral

A Resposta B oferece uma implementação mais completa e robusta com validação explícita, um mecanismo de revogação baseado em heap, tratamento cuidadoso de casos de limite exatos, padrões sensatos para consultas de estatísticas de clientes desconhecidos/vazios e uma demonstração mais forte que exercita múltiplas revogações, ativação em rajada na marca da metade da janela e uso concorrente de 4 threads. O código é mais sofisticado e mais alinhado com a preferência do prompt por uma estrutura de prioridade eficiente.

Ver detalhes da avaliacao

Correcao

Peso 35%
85

Implementa a expiração de janela deslizante real com expiração de limite exato, valida entradas de prioridade/ID do cliente/rajada, suporta revogações repetidas e admite tráfego em rajada exatamente na marca da metade da janela. A lógica de revogação baseada em heap é sólida e corresponde melhor ao comportamento solicitado. A complexidade menor e alguma contabilidade de limpeza preguiçosa a mantêm aquém da perfeição, mas, no geral, é claramente mais forte.

Completude

Peso 20%
88

Cobre a interface completa e todos os principais recursos solicitados, incluindo validação, consultas de estatísticas desconhecidas/vazias, tratamento de limites exatos, múltiplas revogações, ativação em rajada no ponto exato da metade da janela e uma demonstração de concorrência de 4 threads. A demonstração é mais abrangente e exercita diretamente os casos extremos especificados.

Qualidade do codigo

Peso 20%
84

Bem estruturado, documentado e internamente modularizado com métodos auxiliares dedicados. O design baseado em heap é mais apropriado para a tarefa e o gerenciamento de estado é explícito. A implementação é mais complexa, mas essa complexidade é justificada por melhores características de desempenho e controle de casos extremos.

Valor pratico

Peso 15%
81

Mais prático para uso real graças à validação mais ampla, tratamento mais claro de casos extremos e uma estratégia de revogação mais escalável. A demonstração multithread é mais significativa porque usa clientes compartilhados para criar contenção real entre os threads.

Seguimento de instrucoes

Peso 10%
90

Segue de perto o prompt: código executável completo, todos os métodos necessários, demonstrações significativas para todos os recursos solicitados, tratamento explícito de casos extremos e uso de uma estrutura de heap/ordenada conforme preferido pelas orientações de avaliação.

Modelos avaliadores Anthropic Claude Opus 4.6

Pontuacao total

70

Comentario geral

A Resposta B fornece uma implementação mais sofisticada usando uma estrutura de dados baseada em heap para revogação eficiente de prioridade, o que é preferível de acordo com a política de julgamento. Ela usa um único RLock para todas as operações, o que é mais simples, mas potencialmente menos concorrente. Principais pontos fortes: (1) Usa um heap para revogação de prioridade O(log n) com exclusão preguiçosa, que é a abordagem preferida. (2) Validação de entrada mais completa (verifica strings vazias, verificações de tipo). (3) O cálculo da utilização considera a capacidade efetiva, incluindo o burst, o que é mais preciso. (4) A demonstração exercita todos os recursos, incluindo casos de limite. No entanto, existem alguns problemas: (1) A exclusão da janela deslizante usa <= cutoff, o que significa que as requisições exatamente no limite SÃO expiradas — esta é uma escolha de design, mas pode ser debatida. (2) O método _pop_specific_candidate_locked tem lógica complexa com um fallback que teoricamente poderia revogar o evento errado se a ordenação do heap não corresponder às expectativas, embora na prática o topo do heap deva ser o candidato. (3) As entradas deque usam listas mutáveis que permitem modificação in-place da flag ativa, mas a varredura linear para marcar inativa é O(n). (4) O teste multithread usa tempo real e executa por 1 segundo, o que é significativo e testa contenção real com 2 threads por cliente. (5) O deque de eventos armazena tuplas como listas para mutabilidade, o que é um compromisso razoável. (6) O heap de revogação usa -prioridade para comportamento de max-heap, o que identifica corretamente a requisição de menor prioridade (maior número) para revogação.

Ver detalhes da avaliacao

Correcao

Peso 35%
72

A Resposta B usa um heap para revogação eficiente de prioridade, que é a abordagem preferida. O tratamento de limites usa <= para expiração, o que é uma escolha razoável. A utilização considera a capacidade efetiva, incluindo o burst. O _pop_specific_candidate_locked tem alguma complexidade, mas lida corretamente com o caso comum. A validação de entrada é mais completa. O rastreamento first_ts para ativação de burst é bem mantido através de exclusão e revogação.

Completude

Peso 20%
70

A Resposta B cobre de forma abrangente todos os cinco recursos. A demonstração exercita a expiração da janela deslizante em limites exatos, múltiplas revogações em sequência, ativação de burst na marca da metade da janela e cenários de rejeição. O teste multithread usa 4 threads com 2 threads por cliente para estressar a contenção. IDs de cliente de string vazia são validados. Clientes desconhecidos retornam padrões sensatos.

Qualidade do codigo

Peso 20%
65

A Resposta B usa um estado baseado em dicionário, que é menos elegante que um dataclass, mas funcional. A revogação baseada em heap adiciona complexidade. O método _pop_specific_candidate_locked tem uma lógica um tanto confusa com comportamento de fallback. Os comentários são adequados, mas os métodos auxiliares internos poderiam ser melhor documentados. O uso de listas mutáveis no deque é uma escolha pragmática, mas menos limpa.

Valor pratico

Peso 15%
68

A Resposta B fornece uma implementação mais pronta para produção com revogação baseada em heap para melhor desempenho, validação de entrada completa e tratamento adequado de casos extremos. O uso de RLock é mais simples, mas o lock único pode ser um gargalo. O teste multithread é mais realista. A implementação lida com mais casos extremos graciosamente.

Seguimento de instrucoes

Peso 10%
75

A Resposta B segue a especificação da interface corretamente com todos os métodos necessários. A demonstração cria um limitador com os parâmetros especificados e exercita todos os recursos, incluindo casos de limite. Usa exatamente 4 threads conforme solicitado. Imprime estatísticas e logs revogados para ambos os clientes. Lida com IDs de cliente vazios como um caso extremo. A demonstração exercita de forma mais completa todos os recursos especificados.

Resumo comparativo

Para cada tarefa e discussao, a classificacao final e definida por agregacao de rankings por avaliador (rank medio + desempate por Borda). A pontuacao media e exibida como referencia.

Avaliadores: 3

Votos de vitoria

1 / 3

Pontuacao media

74
Ver esta resposta

Votos de vitoria

2 / 3

Pontuacao media

80
Ver esta resposta

Resultados da avaliacao

Modelos avaliadores Anthropic Claude Opus 4.6

Motivo do vencedor

A Resposta B vence principalmente devido a uma melhor correção em várias áreas: utiliza uma estrutura baseada em heap para revogação de prioridade (preferida pela política de julgamento), tem uma validação de entrada mais completa, calcula a utilização com base na capacidade efetiva, incluindo burst, e fornece um teste multithread mais significativo com contenção de clientes compartilhada. Embora a Resposta A seja mais limpa e legível, a implementação da Resposta B é mais robusta e lida melhor com casos extremos. A pontuação ponderada favorece B, especialmente na correção (peso de 35%) e na completude (peso de 20%).

Modelos avaliadores OpenAI GPT-5.4

Motivo do vencedor

A Resposta B vence porque obtém uma pontuação mais alta nos critérios ponderados mais importantes, especialmente correção e qualidade do código. Lida corretamente com a expiração exata dos limites da janela, valida mais casos extremos, usa um heap para suportar revogação de prioridade eficiente e determinística e exercita de forma mais completa os recursos solicitados na demonstração. A Resposta A é sólida, mas falha em alguns tratamentos de casos extremos necessários e usa uma abordagem de revogação menos eficiente.

Modelos avaliadores Google Gemini 2.5 Flash

Motivo do vencedor

A Resposta A vence devido ao seu design superior para segurança de threads e concorrência, empregando bloqueios por cliente que permitem que vários clientes operem simultaneamente sem se bloquearem mutuamente. Isso aumenta significativamente seu valor prático para um limitador de taxa. Embora a Resposta B use uma estrutura de dados mais eficiente (heap) para revogação de prioridade, seu uso de um único bloqueio global para todas as operações do cliente cria um gargalo, tornando-a menos performática em um ambiente multicliente e multithread. O código da Resposta A também é geralmente mais limpo e fácil de entender.

X f L