Orivel Orivel
メニューを開く

スライディングウィンドウと優先度付きキューを備えた同時実行レートリミッタを実装する

このプログラミングベンチマークに対する各AIの回答と比較結果を確認できます。

いいね・お気に入り機能を使うにはログインまたは新規登録が必要です。 新規登録

X f L

目次

お題概要

比較ジャンル

プログラミング

お題作成モデル

回答モデル

採点モデル

お題本文

Pythonで、次の機能をサポートするスレッドセーフなレートリミッタを設計・実装してください。 1. **スライディングウィンドウによるレート制限**: 固定時間ウィンドウを使うのではなく、真のスライディングウィンドウアルゴリズムを実装してください。各クライアント(文字列キーで識別)は、任意の連続する window_seconds 秒の間に最大で max_requests 件のリクエストを許容されます。 2. **優先度レベル**: 各リクエストには優先度レベル(整数 1-5、1 が最も高い優先度)が付与されます。クライアントのレート上限に達した場合、低優先度(数値が大きい...

さらに表示

Pythonで、次の機能をサポートするスレッドセーフなレートリミッタを設計・実装してください。 1. **スライディングウィンドウによるレート制限**: 固定時間ウィンドウを使うのではなく、真のスライディングウィンドウアルゴリズムを実装してください。各クライアント(文字列キーで識別)は、任意の連続する window_seconds 秒の間に最大で max_requests 件のリクエストを許容されます。 2. **優先度レベル**: 各リクエストには優先度レベル(整数 1-5、1 が最も高い優先度)が付与されます。クライアントのレート上限に達した場合、低優先度(数値が大きい)なリクエストが優先的に拒否されるべきです。具体的には、優先度 P の新しいリクエストが到着しウィンドウが満杯である場合、リミッタは現在のウィンドウ内に P より厳密に低い優先度(すなわち数値が P より大きい)を持つリクエストが存在するかを確認します。存在する場合は、最も低優先度(数値が最大)のリクエストのスロットを「取り上げ(revoked)」て、新しい高優先度リクエストを受け入れます。取り上げられたリクエストは報告できるよう記録されるべきです。取り上げ可能な低優先度のリクエストが存在しない場合は、新しいリクエストは拒否されます。 3. **バースト許容**: 各クライアントはオプションで burst(デフォルトは 0)というバースト許容量を持てます。これはウィンドウ内で max_requests に加えて最大 burst 件まで追加のリクエストを許容します。ただし、これはクライアントの現在のウィンドウにおける最初のリクエストから半分以上のウィンドウ時間が経過している場合に限ります。 4. **スレッドセーフ**: レートリミッタは複数のスレッドから同時に使用しても安全でなければなりません。これをテストシナリオで実証してください。 5. **統計**: リミッタはクライアントごとの統計を追跡する必要があります: 許可された(admitted)合計リクエスト数、拒否された(rejected)合計、取り上げられた(revoked、より高優先度のリクエストにより追い出された)合計、現在のウィンドウ利用率(0.0〜1.0 の浮動小数点)を追跡してください。 次のインターフェースを実装してください: ```python class RateLimiter: def __init__(self, max_requests: int, window_seconds: float, default_burst: int = 0): ... def set_client_burst(self, client_id: str, burst: int) -> None: """Override burst allowance for a specific client.""" ... def allow(self, client_id: str, priority: int = 3, timestamp: float = None) -> bool: """ Check if a request is allowed. If timestamp is None, use current time. Returns True if the request is admitted, False if rejected. """ ... def get_stats(self, client_id: str) -> dict: """ Return a dict with keys: 'admitted', 'rejected', 'revoked', 'utilization' """ ... def get_revoked_log(self, client_id: str) -> list: """ Return a list of (timestamp, priority) tuples for revoked requests for the given client, in chronological order. """ ... ``` 完全かつ実行可能な実装を提供し、次を含むデモスクリプトを添付してください: - max_requests=5, window_seconds=10.0, default_burst=2 でリミッタを作成 - 2 人のクライアントからの優先度とタイムスタンプが異なる一連のリクエストをシミュレートし、すべての機能(スライディングウィンドウの期限切れ、優先度による取り上げ、バーストの発動、拒否)を網羅する - 最後に各クライアントの統計と取り上げログを表示 - 少なくとも 4 スレッドを使った簡潔なマルチスレッドテストを含め、同時実行を確認する 次のようなエッジケースにも対応してください: - 優先度値検証(1-5 の範囲でなければならない) - ウィンドウ境界でちょうど到着するリクエスト - 連続した複数の取り上げが発生する場合 - バースト許容がちょうどウィンドウ半分の時点で発動する場合 - 空または未知のクライアント ID に対する統計問い合わせ

採点方針

強い解答は、5 つの機能すべてを正しく扱う完全な実行可能な Python 実装を提供するべきです。重要な品質指標は次のとおりです: (1) スライディングウィンドウアルゴリズムの正確性 — リクエストは個々のタイムスタンプに基づいて期限切れになり、固定バケットではないこと。 (2) 優先度取り上げロジックの正確性 — ウィンドウが満杯で高優先度リクエストが到着したとき、既存の最も低優先度のリクエストが取り上げられて置き換えられること。 (3) バースト許容は、現在のウィンドウの最初のリクエストから半分以上経過した時点でのみ発動すること。 (4) スレッドセーフ性 — 適切な同期プリミティブ(ロック等)を使用し、マルチスレッドデモが意味のあるものであること。 (5) 統計は正確かつ一貫していること。 (6) エッジケースは適切に処理されること — 無効な優先度はエラーを投げるか適切に扱い、未知のクライアントは妥当なデフォルトを返し、境界タイムスタンプは正しく動作すること。 (7) デモスクリプトはすべての機能を明確に実行し、理解しやすい出力を生成すること。 (8) コード品質も重要: 構造が明確で命名が適切、コメントが適切に付与され、関心の分離ができていること。効率的な優先度取り上げのためにヒープやソート済み構造を利用する解法が望ましいが、正確性は最優先されます。

回答

import time
import threading
import random
from collections import deque
from dataclasses import dataclass, field
from typing import List, Tuple, Dict, Optional

@dataclass
class ClientState:
"""Holds all state for a single client."""
lock: threading.Lock = field(default_factory=threading.Lock, repr=False)
requests: deque[Tuple[float, int]] = field(default_factory=deque)
burst: int = 0
stats: Dict[str, int] = field(default_factory=lambda: {"admitted": 0, "rejected": 0, "revoked": 0})
revoked_log: List[Tuple[float, int]] = field(default_factory=list)

class RateLimiter:
"""
A thread...

全文を表示

import time
import threading
import random
from collections import deque
from dataclasses import dataclass, field
from typing import List, Tuple, Dict, Optional

@dataclass
class ClientState:
"""Holds all state for a single client."""
lock: threading.Lock = field(default_factory=threading.Lock, repr=False)
requests: deque[Tuple[float, int]] = field(default_factory=deque)
burst: int = 0
stats: Dict[str, int] = field(default_factory=lambda: {"admitted": 0, "rejected": 0, "revoked": 0})
revoked_log: List[Tuple[float, int]] = field(default_factory=list)

class RateLimiter:
"""
A thread-safe rate limiter implementing a sliding window algorithm with
priority queues and burst allowances.
"""
def init(self, max_requests: int, window_seconds: float, default_burst: int = 0):
if max_requests <= 0 or window_seconds <= 0:
raise ValueError("max_requests and window_seconds must be positive.")
self.max_requests = max_requests
self.window_seconds = window_seconds
self.default_burst = default_burst
self._clients: Dict[str, ClientState] = {}
self._clients_lock = threading.Lock()

def _get_or_create_client(self, client_id: str) -> ClientState:
    """Atomically get or create a client's state object."""
    if client_id not in self._clients:
        with self._clients_lock:
            # Double-check locking pattern to avoid race conditions on creation
            if client_id not in self._clients:
                self._clients[client_id] = ClientState(burst=self.default_burst)
    return self._clients[client_id]

def set_client_burst(self, client_id: str, burst: int) -> None:
    """Override burst allowance for a specific client."""
    client = self._get_or_create_client(client_id)
    with client.lock:
        client.burst = burst

def allow(self, client_id: str, priority: int = 3, timestamp: Optional[float] = None) -> bool:
    """
    Check if a request is allowed. If timestamp is None, use current time.
    Returns True if the request is admitted, False if rejected.
    """
    if not 1 <= priority <= 5:
        raise ValueError("Priority must be an integer between 1 and 5.")

    now = timestamp if timestamp is not None else time.time()
    client = self._get_or_create_client(client_id)

    with client.lock:
        # 1. Slide the window: remove expired requests
        window_start = now - self.window_seconds
        while client.requests and client.requests[0][0] < window_start:
            client.requests.popleft()

        # 2. Determine the effective request limit (base + burst)
        effective_max = self.max_requests
        if client.burst > 0 and client.requests:
            first_req_time = client.requests[0][0]
            if (now - first_req_time) >= (self.window_seconds / 2.0):
                effective_max += client.burst
        
        current_requests_count = len(client.requests)

        # 3. Decision: Admit, Revoke, or Reject
        if current_requests_count < effective_max:
            # Case A: Space is available (either base or burst)
            client.requests.append((now, priority))
            client.stats['admitted'] += 1
            return True
        else:
            # Case B: Window is full, check for priority revocation
            # Find the request with the highest priority number (lowest actual priority)
            worst_request_idx = -1
            worst_priority = -1
            for i, req in enumerate(client.requests):
                if req[1] > worst_priority:
                    worst_priority = req[1]
                    worst_request_idx = i

            # If a suitable candidate for revocation exists...
            if worst_request_idx != -1 and worst_priority > priority:
                # Revoke the lowest-priority request
                revoked_req = client.requests[worst_request_idx]
                del client.requests[worst_request_idx]
                
                client.revoked_log.append(revoked_req)
                client.stats['revoked'] += 1
                
                # Admit the new, higher-priority request
                client.requests.append((now, priority))
                client.stats['admitted'] += 1
                return True
            else:
                # Case C: No space and no possible revocation -> Reject
                client.stats['rejected'] += 1
                return False

def get_stats(self, client_id: str) -> dict:
    """
    Return a dict with keys: 'admitted', 'rejected', 'revoked', 'utilization'
    """
    if client_id not in self._clients:
        return {'admitted': 0, 'rejected': 0, 'revoked': 0, 'utilization': 0.0}

    client = self._clients[client_id]
    with client.lock:
        # Slide window for accurate utilization calculation at the current time
        now = time.time()
        window_start = now - self.window_seconds
        while client.requests and client.requests[0][0] < window_start:
            client.requests.popleft()
        
        current_count = len(client.requests)
        utilization = min(1.0, current_count / self.max_requests) if self.max_requests > 0 else 0.0

        stats_copy = client.stats.copy()
        stats_copy['utilization'] = utilization
        return stats_copy

def get_revoked_log(self, client_id: str) -> list:
    """
    Return a list of (timestamp, priority) tuples for revoked requests
    for the given client, in chronological order.
    """
    if client_id not in self._clients:
        return []
        
    client = self._clients[client_id]
    with client.lock:
        # The log is already in chronological order of revocation
        return list(client.revoked_log)

--- Demonstration Script ---

def run_demonstration():
print("--- Rate Limiter Demonstration ---")
limiter = RateLimiter(max_requests=5, window_seconds=10.0, default_burst=2)

# --- Client 1: Standard, Rejection, and Priority Revocation ---
client1 = "client-A"
print(f"\n--- Testing Client: {client1} ---")

# 1. Fill the window with medium-priority requests
print("1. Filling the window (5 requests)...")
for i in range(5):
    limiter.allow(client1, priority=4, timestamp=float(i))

# 2. Reject a request
print("2. Sending a 6th request (should be rejected)...")
is_allowed = limiter.allow(client1, priority=4, timestamp=5.0)
print(f"  Request at T=5.0 allowed: {is_allowed} (Expected: False)")

# 3. Revoke a low-priority request with a high-priority one
print("3. Sending a high-priority request (should revoke one)...")
is_allowed = limiter.allow(client1, priority=1, timestamp=6.0)
print(f"  High-priority request at T=6.0 allowed: {is_allowed} (Expected: True)")

# 4. Let the window slide
print("4. Waiting for window to slide...")
# At T=11.0, request at T=0.0 has expired. Window contains T=1,2,3,4,6 (5 requests)
is_allowed = limiter.allow(client1, priority=5, timestamp=11.0)
print(f"  Request at T=11.0 allowed: {is_allowed} (Expected: False)")

# At T=12.0, requests at T=0,1 have expired. Window contains T=2,3,4,6 (4 requests)
is_allowed = limiter.allow(client1, priority=5, timestamp=12.0)
print(f"  Request at T=12.0 allowed: {is_allowed} (Expected: True)")

# --- Client 2: Burst Allowance ---
client2 = "client-B"
print(f"\n--- Testing Client: {client2} (Burst) ---")
limiter.set_client_burst(client2, 3) # Override burst to 3 for this client

# 1. Send initial request
print("1. Sending initial request at T=20.0...")
limiter.allow(client2, priority=3, timestamp=20.0)

# 2. Try to burst before half-window (should fail)
print("2. Attempting to burst before half-window (T=24.0)...")
# Window is 10s, half is 5s. T=24.0 is 4s after T=20.0.
# Should allow up to max_requests (5)
for i in range(4): # Requests 2, 3, 4, 5
    limiter.allow(client2, priority=3, timestamp=24.0)
is_allowed = limiter.allow(client2, priority=3, timestamp=24.1)
print(f"  6th request at T=24.1 (before burst active) allowed: {is_allowed} (Expected: False)")

# 3. Activate burst after half-window
print("3. Activating burst after half-window (T=25.0)...")
# T=25.0 is 5s after T=20.0. Burst is now active.
# Limit is now 5 (base) + 3 (burst) = 8.
# Currently 5 requests in window (from T=20, T=24). Should allow 3 more.
allowed_count = 0
for _ in range(4): # Try to send 4 more
    if limiter.allow(client2, priority=3, timestamp=25.0):
        allowed_count += 1
print(f"  Allowed {allowed_count} burst requests at T=25.0. (Expected: 3)")

# --- Final Stats ---
print("\n--- Final Stats ---")
print(f"Client A Stats: {limiter.get_stats(client1)}")
print(f"Client A Revoked Log: {limiter.get_revoked_log(client1)}")
print(f"Client B Stats: {limiter.get_stats(client2)}")
print(f"Client B Revoked Log: {limiter.get_revoked_log(client2)}")
print(f"Unknown Client Stats: {limiter.get_stats('client-C')}")

--- Multithreaded Test ---

def run_multithreaded_test():
print("\n--- Multithreaded Test ---")
limiter = RateLimiter(max_requests=20, window_seconds=5.0)
client_id = "mt-client"
num_threads = 5
requests_per_thread = 10
total_requests = num_threads * requests_per_thread

barrier = threading.Barrier(num_threads)

def worker():
    barrier.wait()
    for i in range(requests_per_thread):
        priority = random.randint(1, 5)
        limiter.allow(client_id, priority=priority)
        time.sleep(0.01)

threads = [threading.Thread(target=worker) for _ in range(num_threads)]

for t in threads:
    t.start()

for t in threads:
    t.join()

print(f"\nFinished multithreaded test with {num_threads} threads and {requests_per_thread} requests each.")

stats = limiter.get_stats(client_id)
print(f"Final Stats for {client_id}: {stats}")

total_processed = stats['admitted'] + stats['rejected']
print(f"Total requests attempted: {total_requests}")
print(f"Total requests processed (admitted + rejected): {total_processed}")
if total_processed == total_requests:
    print("Test PASSED: All requests were accounted for, indicating thread safety.")
else:
    print(f"Test FAILED: Mismatch in request counts. Expected {total_requests}, got {total_processed}.")

if name == 'main':
run_demonstration()
run_multithreaded_test()

判定

2位

勝利票

1 / 3

平均スコア

74

総合点

90

総評

回答Aはレートリミッターの非常に強力で正確な実装を提供しています。クライアントごとのロックの使用は、複数のクライアントを同時に処理するレートリミッターにとって重要な優れた並行性を保証します。コードはクリーンで、よく構造化されており、堅牢なデモスクリプトを含む、指定されたすべての機能とエッジケースを効果的に処理します。優先度取り消しのためにヒープではなく線形スキャンを使用していますが、全体的な正確さ、明瞭さ、および実用的な並行性により、優れたソリューションとなっています。

採点詳細を表示

正確さ

重み 35%
90

スライディングウィンドウ、優先度取り消し(線形スキャン)、バースト許容量、および統計はすべて正しく実装されています。クライアントごとのロックは、堅牢なスレッドセーフティを保証します。利用率は、一般的な解釈である基本のmax_requestsに対して計算されます。

完全性

重み 20%
100

必要なすべてのメソッドと機能が完全に実装されており、デモスクリプトはレートリミッターのすべての側面を包括的にテストしています。

コード品質

重み 20%
85

コードはよく構造化されており、データクラスを効果的に使用し、メソッド名とコメントが明確です。詳細なクライアントごとのロックは、並行性にとって強力な点です。取り消しのための線形スキャンは、単純さのためのマイナーなパフォーマンスのトレードオフです。

実用性

重み 15%
85

このソリューションは、クライアントごとのロックを備えた堅牢なスレッドセーフティモデルにより、高い実用的な価値を提供し、異なるクライアント間での高い並行性を可能にします。これにより、複数のクライアントが同時にリミッターにヒットする可能性のある実世界のアプリケーションに適しています。

指示遵守

重み 10%
90

インターフェース、機能、デモスクリプト、およびエッジケースを含むすべての指示が正しく従われています。唯一のマイナーな点は、プロンプトがヒープまたはソート済み構造を好んでいたのに対し、優先度取り消しに線形スキャンを使用していることです。

採点モデル OpenAI GPT-5.4

総合点

66

総評

回答Aは、クライアントごとのロック、スライディングウィンドウの削除、優先度ベースの置換、バーストサポート、統計情報、マルチスレッドデモを備えた実行可能な実装を提供します。構造は読みやすく、ほとんど正しいですが、クライアントIDやバースト値を検証しておらず、取り消しのために優先度構造ではなく線形スキャンを使用しており、指定されたエッジケースに対してウィンドウ境界の処理が不正確です。これは、カットオフちょうどのリクエストを保持するためです。また、利用率はmax_requestsに対してのみキャップされており、アクティブなバースト容量を無視しています。

採点詳細を表示

正確さ

重み 35%
62

コアのスライディングウィンドウアドミッション、バーストゲーティング、優先度置換は一般的に機能しており、スレッドセーフティはクライアントごとのロックによって合理的に確保されています。しかし、ウィンドウ境界ちょうどのリクエストは、削除が `<=` ではなく `<` を使用するため、アクティブと見なされます。これは、指定されたエッジケースの要件と矛盾します。クライアントIDとバーストの検証も欠落しており、利用率はバースト調整された容量を無視しています。

完全性

重み 20%
70

完全なクラスインターフェイス、統計情報、取り消されたログ、シーケンシャルデモンストレーション、マルチスレッドテストが含まれています。しかし、特に無効なクライアントID、正確な境界の動作、およびシーケンスでの明示的な複数回の取り消しなど、要求されたエッジケースの一部は完全には処理または実証されていません。

コード品質

重み 20%
68

読みやすく整理されており、便利なデータクラスと明確なコメントが付いています。ただし、クラスのドキュメンテーションでは優先度キューが言及されていますが、取り消しはデックに対する線形スキャンであり、検証が一部欠落しており、デモの期待値が体系的にテストされるのではなく、非公式に埋め込まれています。

実用性

重み 15%
65

実行可能な例として有用で、理解しやすいですが、検証の不完全さ、境界の不一致、および単純な取り消しスキャンにより、本番環境での使用にはやや不向きです。マルチスレッドテストはアカウンティングをチェックしますが、かなり軽量です。

指示遵守

重み 10%
71

プロンプトのほとんどに従っており、完全な実行可能なコード、統計情報、取り消されたログ、および並行処理を提供しています。それでも、優先データ構造の選択、境界処理、およびクエリでの空のクライアントIDの正常な処理については不十分です。

総合点

66

総評

回答Aは、データクラスベースのクライアント状態とクライアントごとのロックを備えた、クリーンで読みやすい実装を提供しています。スライディングウィンドウは、優先度取り消しのために線形スキャンを行うdequeを使用しており、これは正しいですが最適ではありません。デモスクリプトはほとんどの機能をうまく実行しています。しかし、いくつかの正確性の問題があります。(1) スライディングウィンドウの削除は、厳密な小なり(< window_start)を使用しており、小なりイコールではないため、境界線上のリクエストは期限切れになりません。これは軽微な境界処理の問題です。(2) 取り消されたリクエストのスロットカウントは微妙に間違っています。リクエストが取り消されると、dequeから削除されますが、デクリメントされずに(取り消されたリクエストは許可されたため正しい)アドミテッド統計でカウントされ続けます。(3) get_statsでの利用率計算は、最大リクエスト数(base + burst)ではなく、最大リクエスト数で除算しており、これは議論の余地がありますが、バーストを考慮した容量とは一致しません。(4) バーストアクティベーションチェックは、半ウィンドウに対して>=を使用しており、これは正しいです。(5) マルチスレッドテストは、要求された最小の4ではなく5スレッドを使用していますが、これは問題ありません。しかし、共有クライアントでの競合をテストするよりも面白みのない単一クライアントを使用しています。クライアントごとのロック戦略は並行処理には適していますが、_get_or_create_client内のロックの外側にある最初のチェックには潜在的な問題があります(ロックなしで_clients辞書を読み取る)。ただし、二重チェックパターンがこれを緩和します。

採点詳細を表示

正確さ

重み 35%
65

回答Aは、コアのスライディングウィンドウと優先度取り消しロジックを正しく実装しています。しかし、境界処理では有効期限の厳密な小なり(境界上のリクエストは期限切れにならない)を使用しており、利用率は実効容量ではなく最大リクエスト数に対して計算されています。取り消しのための線形スキャンは正しいですが、最適ではありません。バーストアクティベーションロジックは、>=半ウィンドウチェックで正しいです。

完全性

重み 20%
65

回答Aは、スライディングウィンドウ、優先度取り消し、バースト許可、スレッドセーフティ、統計の5つの必須機能をすべてカバーしています。デモはほとんどの機能を実行します。しかし、マルチスレッドテストは、共有リソースでの競合を実証するのではなく、単一のクライアントを使用しています。未知のクライアントなどのエッジケースは処理されます。優先度検証は、予想通りValueErrorを発生させます。

コード品質

重み 20%
70

回答Aは、クライアント状態のためのクリーンなデータクラス、明確なメソッド構造、および良好なコメントを備えた優れたコード構成を持っています。クライアントごとのロック戦略はよく設計されています。変数名は明確で一貫性があります。デモスクリプトは、明確な期待される出力とともに、よく整理されています。

実用性

重み 15%
60

回答Aは、中程度のユースケースに適した動作するレートリミッターを提供します。クライアントごとのロックは並行処理に適しています。しかし、取り消しのための線形スキャンは、多くのリクエストがある場合にボトルネックになる可能性があります。デモ出力は明確で教育的です。この実装は、いくつかの改良を加えれば本番環境でも機能するでしょう。

指示遵守

重み 10%
70

回答Aは、インターフェース仕様に正しく従っています。すべての必須メソッドを正しいシグネチャで実装しています。デモは指定されたパラメータでリミッターを作成し、ほとんどの機能を実行します。要求された最小の4スレッドではなく5スレッドを使用します。必要に応じて統計と取り消されたログを出力します。

import time
import threading
import random
import heapq
from collections import deque

class RateLimiter:
"""Thread-safe sliding-window rate limiter with priority-based revocation and optional burst."""

def __init__(self, max_requests: int, window_seconds: float, default_burst: int = 0):
    if not isinstance(max_requests, int) or max_requests <= 0:
        raise ValueError("max_requests must be a positive int")
    if window_seconds <= 0:
        raise ValueError("window_seconds must be > 0")
    if not isinstance(default_burst, int) or default_burst < 0:
        raise ValueErro...
全文を表示

import time
import threading
import random
import heapq
from collections import deque

class RateLimiter:
"""Thread-safe sliding-window rate limiter with priority-based revocation and optional burst."""

def __init__(self, max_requests: int, window_seconds: float, default_burst: int = 0):
    if not isinstance(max_requests, int) or max_requests <= 0:
        raise ValueError("max_requests must be a positive int")
    if window_seconds <= 0:
        raise ValueError("window_seconds must be > 0")
    if not isinstance(default_burst, int) or default_burst < 0:
        raise ValueError("default_burst must be a non-negative int")

    self.max_requests = max_requests
    self.window_seconds = float(window_seconds)
    self.default_burst = default_burst

    self._lock = threading.RLock()
    self._clients = {}  # client_id -> state

def set_client_burst(self, client_id: str, burst: int) -> None:
    if not isinstance(client_id, str) or not client_id:
        raise ValueError("client_id must be a non-empty string")
    if not isinstance(burst, int) or burst < 0:
        raise ValueError("burst must be a non-negative int")
    with self._lock:
        st = self._get_or_create_state(client_id)
        st["burst"] = burst

def allow(self, client_id: str, priority: int = 3, timestamp: float = None) -> bool:
    if not isinstance(client_id, str) or not client_id:
        raise ValueError("client_id must be a non-empty string")
    if not isinstance(priority, int) or not (1 <= priority <= 5):
        raise ValueError("priority must be an int in [1, 5]")
    now = time.time() if timestamp is None else float(timestamp)

    with self._lock:
        st = self._get_or_create_state(client_id)
        self._evict_expired_locked(st, now)

        # Determine effective capacity (base + optional burst)
        n = st["active_count"]
        base = self.max_requests
        burst = st["burst"]
        cap = base

        # Burst can be used only if at least half the window has passed since first request in current window.
        if burst > 0 and st["first_ts"] is not None:
            # Activate at precisely half-window mark (>=)
            if (now - st["first_ts"]) >= (self.window_seconds / 2.0):
                cap = base + burst

        # Fast path: within capacity
        if n < cap:
            self._admit_locked(st, now, priority)
            return True

        # If full, attempt priority revocation (only within current effective capacity cap)
        # If cap > base, then "full" includes burst usage too; revocation still applies.
        # We can only revoke a strictly lower priority (numerically higher).
        # Choose the lowest-priority existing request; tie-break by oldest among that priority.
        # If no lower-priority exists, reject.

        # Make sure heap top points to a live active event.
        cand = self._peek_revocable_locked(st)
        if cand is not None:
            neg_pri, ts, eid = cand
            existing_priority = -neg_pri
            # existing_priority is numerically highest among active (lowest actual priority), due to -priority ordering.
            if existing_priority > priority:
                # Revoke candidate
                self._pop_specific_candidate_locked(st, eid)
                # Admit new
                self._admit_locked(st, now, priority)
                st["revoked"] += 1
                st["revoked_log"].append((ts, existing_priority))
                return True

        # No revocation possible
        st["rejected"] += 1
        return False

def get_stats(self, client_id: str) -> dict:
    if not isinstance(client_id, str) or not client_id:
        # As requested: handle empty/unknown client IDs gracefully
        return {"admitted": 0, "rejected": 0, "revoked": 0, "utilization": 0.0}
    with self._lock:
        st = self._clients.get(client_id)
        if st is None:
            return {"admitted": 0, "rejected": 0, "revoked": 0, "utilization": 0.0}
        now = time.time()
        self._evict_expired_locked(st, now)

        base = self.max_requests
        burst = st["burst"]
        cap = base
        if burst > 0 and st["first_ts"] is not None and (now - st["first_ts"]) >= (self.window_seconds / 2.0):
            cap = base + burst
        util = (st["active_count"] / cap) if cap > 0 else 0.0
        # clamp
        if util < 0.0:
            util = 0.0
        if util > 1.0:
            util = 1.0
        return {
            "admitted": st["admitted"],
            "rejected": st["rejected"],
            "revoked": st["revoked"],
            "utilization": float(util),
        }

def get_revoked_log(self, client_id: str) -> list:
    if not isinstance(client_id, str) or not client_id:
        return []
    with self._lock:
        st = self._clients.get(client_id)
        if st is None:
            return []
        return list(st["revoked_log"])

# ------------------------ internal helpers (all require self._lock held) ------------------------

def _get_or_create_state(self, client_id: str) -> dict:
    st = self._clients.get(client_id)
    if st is None:
        st = {
            "burst": self.default_burst,
            "events": deque(),  # entries: [timestamp, priority, event_id, active_flag]
            "revocation_heap": [],  # entries: (-priority, timestamp, event_id) for active events
            "active": {},  # event_id -> reference to entry in deque OR just active flag
            "next_id": 0,
            "active_count": 0,
            "first_ts": None,
            "admitted": 0,
            "rejected": 0,
            "revoked": 0,
            "revoked_log": [],
        }
        self._clients[client_id] = st
    return st

def _evict_expired_locked(self, st: dict, now: float) -> None:
    cutoff = now - self.window_seconds
    # Sliding window: keep entries with timestamp > cutoff.
    # Requests exactly at boundary (timestamp == cutoff) are expired.
    ev = st["events"]
    while ev and ev[0][0] <= cutoff:
        ts, pri, eid, active = ev[0]
        ev.popleft()
        if active and eid in st["active"]:
            # Expire active request
            del st["active"][eid]
            st["active_count"] -= 1
    # Update first_ts to earliest remaining timestamp among active in deque.
    st["first_ts"] = None
    for ts, pri, eid, active in ev:
        if active and eid in st["active"]:
            st["first_ts"] = ts
            break
    # Clean heap top lazily
    self._clean_heap_top_locked(st)

def _clean_heap_top_locked(self, st: dict) -> None:
    h = st["revocation_heap"]
    while h:
        neg_pri, ts, eid = h[0]
        if eid in st["active"]:
            return
        heapq.heappop(h)

def _peek_revocable_locked(self, st: dict):
    self._clean_heap_top_locked(st)
    if not st["revocation_heap"]:
        return None
    return st["revocation_heap"][0]

def _pop_specific_candidate_locked(self, st: dict, eid: int) -> None:
    # We expect candidate to be at heap top. Pop until we pop that eid.
    h = st["revocation_heap"]
    while h:
        neg_pri, ts, heid = heapq.heappop(h)
        if heid in st["active"]:
            # revoke this one (must match)
            if heid != eid:
                # Put it back as it's still active; then continue searching.
                heapq.heappush(h, (neg_pri, ts, heid))
                # fall back: revoke the one we just saw instead? No; keep deterministic behavior.
                # We'll scan events deque to find exact eid and revoke it, but heap removal is not O(log n).
                # Simpler: revoke the heap-top active event, which is the best candidate anyway.
                # So if mismatch happens (shouldn't), revoke heid.
                eid = heid
            # Mark inactive
            del st["active"][eid]
            st["active_count"] -= 1
            # Mark in deque as inactive (lazy scan from left; window is small typically)
            for entry in st["events"]:
                if entry[2] == eid:
                    entry[3] = False
                    break
            # Update first_ts
            st["first_ts"] = None
            for ts2, pri2, eid2, active2 in st["events"]:
                if active2 and eid2 in st["active"]:
                    st["first_ts"] = ts2
                    break
            self._clean_heap_top_locked(st)
            return
    # If heap is empty, nothing to do.

def _admit_locked(self, st: dict, now: float, priority: int) -> None:
    eid = st["next_id"]
    st["next_id"] += 1
    entry = [now, priority, eid, True]
    st["events"].append(entry)
    st["active"][eid] = True
    st["active_count"] += 1
    heapq.heappush(st["revocation_heap"], (-priority, now, eid))
    st["admitted"] += 1
    if st["first_ts"] is None:
        st["first_ts"] = now

def demo_sequence():
print("=== Demo: sliding window, priority revocation, burst, boundary handling ===")
rl = RateLimiter(max_requests=5, window_seconds=10.0, default_burst=2)

# Override one client's burst to show per-client config
rl.set_client_burst("clientA", 2)
rl.set_client_burst("clientB", 0)

t0 = 1_000_000.0

# Fill base window for clientA with low priority (5)
for i in range(5):
    ok = rl.allow("clientA", priority=5, timestamp=t0 + i * 0.1)
    print("clientA admit low", i, ok)

# Now window is full. Higher-priority request should revoke one of the priority-5 requests.
ok = rl.allow("clientA", priority=2, timestamp=t0 + 0.6)
print("clientA high priority arrives (should revoke one low):", ok)

# Another higher-priority request; can revoke again.
ok = rl.allow("clientA", priority=1, timestamp=t0 + 0.7)
print("clientA higher priority arrives (should revoke one low):", ok)

# Now attempt a low-priority request; should be rejected because no strictly lower priority exists to revoke.
ok = rl.allow("clientA", priority=5, timestamp=t0 + 0.8)
print("clientA low priority arrives when full (should reject):", ok)

# Burst should activate at half-window since first active request.
# First request was at t0. Half-window mark is t0+5.0. At exactly +5.0, burst becomes available.
ok = rl.allow("clientA", priority=3, timestamp=t0 + 5.0)
print("clientA burst activation at half-window boundary (should admit due to burst):", ok)

ok = rl.allow("clientA", priority=4, timestamp=t0 + 5.01)
print("clientA second burst slot (should admit if not exceeded):", ok)

# Next request should be rejected or may revoke depending on priorities.
ok = rl.allow("clientA", priority=4, timestamp=t0 + 5.02)
print("clientA beyond burst+base (likely reject unless can revoke lower):", ok)

# Sliding window expiry boundary: at now=t0+10.0, requests at t0 are expired (<= cutoff).
# Send at exactly t0+10.0: entries at t0 are expired.
ok = rl.allow("clientA", priority=3, timestamp=t0 + 10.0)
print("clientA at exact window boundary (t0+10.0), oldest expires => should admit:", ok)

# ClientB: demonstrate pure rejection without burst
for i in range(6):
    ok = rl.allow("clientB", priority=3, timestamp=t0 + i * 0.01)
    print("clientB request", i, "allowed" if ok else "rejected")

print("\nStats:")
for cid in ["clientA", "clientB", "unknown", ""]:
    print(cid if cid else "<empty>", rl.get_stats(cid))

print("\nRevoked logs:")
for cid in ["clientA", "clientB"]:
    print(cid, rl.get_revoked_log(cid))

return rl

def demo_multithreaded(rl: RateLimiter):
print("\n=== Demo: multithreaded concurrency test (4 threads) ===")
start = time.time()
stop_at = start + 1.0

results_lock = threading.Lock()
results = {"allowed": 0, "rejected": 0}

def worker(tid: int, client_id: str):
    local_allowed = 0
    local_rejected = 0
    while time.time() < stop_at:
        pri = random.randint(1, 5)
        ok = rl.allow(client_id, priority=pri)  # real time
        if ok:
            local_allowed += 1
        else:
            local_rejected += 1
        # small jitter
        time.sleep(random.random() * 0.005)
    with results_lock:
        results["allowed"] += local_allowed
        results["rejected"] += local_rejected

threads = []
# Two threads hit same client to stress contention, two hit another
for i in range(4):
    cid = "mtA" if i < 2 else "mtB"
    th = threading.Thread(target=worker, args=(i, cid), daemon=True)
    threads.append(th)
    th.start()

for th in threads:
    th.join()

print("Concurrent run totals:", results)
print("mtA stats:", rl.get_stats("mtA"))
print("mtB stats:", rl.get_stats("mtB"))

if name == "main":
rl = demo_sequence()
demo_multithreaded(rl)

判定

1位 | 勝者

勝利票

2 / 3

平均スコア

80

総合点

85

総評

回答Bは、プロンプトの好みに沿った、より効率的な優先度取り消しのためのヒープの使用など、正しい実装を提供しています。しかし、すべてのクライアント操作に対して単一のグローバルRLockを使用しているため、回答Aのクライアントごとのロックと比較して同時実行性が大幅に制限され、マルチクライアント環境での実用性が低下しています。一部の内部ヘルパーメソッドもかなり複雑で、コードの可読性に影響を与えています。実効容量に対する利用率の計算は、より一般的な解釈からのわずかな逸脱です。

採点詳細を表示

正確さ

重み 35%
85

スライディングウィンドウ、優先度取り消し(ヒープベース)、およびバースト許容量のコアロジックは正しいです。しかし、実効容量(バーストを含む)に対する利用率の計算は、あまり一般的ではない解釈であり、単一のグローバルRLockは、技術的にはスレッドセーフですが、クライアント間の真の同時実行性を制限します。

完全性

重み 20%
100

必要なすべてのメソッドと機能が完全に実装されており、デモスクリプトはすべてのレートリミッターの側面を包括的にテストしています。

コード品質

重み 20%
75

コードは効率的な優先度取り消しのためにヒープを使用しており、これは良い点です。しかし、全体的な構造は多数の内部ヘルパーメソッドにより複雑であり、単一のグローバルRLockは、クライアントごとのロックと比較して同時実行性において最適とは言えません。`_pop_specific_candidate_locked`メソッドもやや複雑です。

実用性

重み 15%
70

ヒープベースの取り消しは効率的ですが、単一のグローバルRLockの使用はレートリミッターの実用的な同時実行性を大幅に制限します。高負荷のマルチクライアントシナリオでは、この設計はボトルネックとなり、より詳細なロック戦略と比較して実用性が低下します。

指示遵守

重み 10%
95

インターフェース、機能、デモスクリプト、およびエッジケースを含むすべての指示が正確に従われています。優先度取り消しのためのヒープの使用は、効率的なデータ構造に対するプロンプトの好みに特に準拠しています。

採点モデル OpenAI GPT-5.4

総合点

85

総評

回答Bは、明示的な検証、ヒープベースの取り消しメカニズム、正確な境界ケースの慎重な処理、不明/空のクライアント統計クエリに対する適切なデフォルト値、および複数の取り消し、ウィンドウ半分のマークでのバーストアクティベーション、4スレッドからの同時使用を検証するより強力なデモンストレーションを備えており、より完全で堅牢な実装を提供します。コードはより洗練されており、効率的な優先度構造というプロンプトの好みに合致しています。

採点詳細を表示

正確さ

重み 35%
85

真のスライディングウィンドウ削除を正確な境界有効期限とともに実装し、優先度/クライアントID/バースト入力を検証し、繰り返し取り消しをサポートし、ウィンドウ半分のマークでバーストトラフィックを正確に受け入れます。ヒープベースの取り消しロジックは健全であり、要求された動作によりよく一致しています。わずかな複雑さと一部の遅延クリーンアップの記録保持により完璧には至りませんが、全体としては明らかに優れています。

完全性

重み 20%
88

検証、不明/空の統計クエリ、正確な境界処理、複数取り消し、正確なウィンドウ半分の時点でのバーストアクティベーション、4スレッドの同時実行デモなど、完全なインターフェイスと要求された主要機能すべてを網羅しています。デモンストレーションはより包括的であり、指定されたエッジケースを直接検証します。

コード品質

重み 20%
84

適切に構造化され、文書化され、専用のヘルパーメソッドによって内部的にモジュール化されています。ヒープベースの設計はタスクにより適しており、状態管理は明示的です。実装はより複雑ですが、その複雑さは、より優れたパフォーマンス特性とエッジケース制御によって正当化されます。

実用性

重み 15%
81

より広範な検証、より明確なエッジケース処理、およびよりスケーラブルな取り消し戦略により、実際の使用に対してより実用的です。マルチスレッドデモは、共有クライアントを使用してスレッド間で実際の競合状態を作成するため、より意味があります。

指示遵守

重み 10%
90

プロンプトに厳密に従っています。完全な実行可能コード、すべての必須メソッド、すべての要求された機能に対する意味のあるデモンストレーション、明示的なエッジケース処理、およびジャッジメントガイダンスで好まれたヒープ/ソート構造の使用が含まれています。

総合点

70

総評

回答Bは、優先度失効を効率化するためにヒープベースのデータ構造を使用した、より洗練された実装を提供しており、これは審査ポリシーで推奨されています。単一のRLockを使用しており、シンプルですが並行性は低い可能性があります。主な強みは以下の通りです。(1) ヒープを使用してO(log n)で優先度失効を遅延削除で行う点は、推奨されるアプローチです。(2) より徹底的な入力検証(空文字列、型チェック)。(3) 利用率の計算では、バーストを含む実効容量を考慮しており、より正確です。(4) デモンストレーションでは、境界ケースを含むすべての機能が実行されます。しかし、いくつかの問題点もあります。(1) スライディングウィンドウの削除では<= cutoffを使用しており、境界ちょうどのリクエストも失効します。これは設計上の選択ですが、議論の余地があります。(2) _pop_specific_candidate_lockedメソッドは複雑なロジックとフォールバックを備えており、ヒープの順序が期待と一致しない場合、理論的には誤ったイベントを失効させる可能性がありますが、実際にはヒープのトップが候補となるはずです。(3) dequeエントリはミュータブルなリストを使用しており、アクティブフラグをインプレースで変更できますが、非アクティブにするための線形スキャンはO(n)です。(4) マルチスレッドテストは実時間を使い1秒間実行され、クライアントあたり2スレッドで実際の競合をテストします。(5) イベントdequeはミュータビリティのためにタプルをリストとして格納しており、これは妥当なトレードオフです。(6) 失効ヒープは-priorityを使用して最大ヒープの動作を実現しており、失効させるべき最低優先度(最大数値)のリクエストを正しく特定します。

採点詳細を表示

正確さ

重み 35%
72

回答Bは、推奨されるアプローチである効率的な優先度失効のためにヒープを使用しています。境界処理では、有効期限切れに<=を使用しており、これは妥当な選択です。利用率は、バーストを含む実効容量を考慮しています。_pop_specific_candidate_lockedはいくつかの複雑さを抱えていますが、一般的なケースは正しく処理します。入力検証はより徹底的です。バーストアクティベーションのためのfirst_tsの追跡は、失効と取り消しを通じて適切に維持されています。

完全性

重み 20%
70

回答Bは、5つの機能をすべて包括的にカバーしています。デモでは、境界ちょうどでのスライディングウィンドウの有効期限切れ、連続した複数の失効、半ウィンドウマークでのバーストアクティベーション、および拒否シナリオを実行します。マルチスレッドテストでは、競合をストレスするためにクライアントあたり2スレッドで4スレッドを使用します。空文字列のクライアントIDは検証されます。不明なクライアントは妥当なデフォルト値を返します。

コード品質

重み 20%
65

回答Bは、データクラスよりもエレガントではありませんが機能的な辞書ベースの状態を使用しています。ヒープベースの失効は複雑さを増しています。_pop_specific_candidate_lockedメソッドは、フォールバック動作を伴うやや複雑なロジックを持っています。コメントは十分ですが、内部ヘルパーメソッドはより良く文書化される可能性があります。dequeでのミュータブルリストの使用は実用的な選択ですが、クリーンではありません。

実用性

重み 15%
68

回答Bは、パフォーマンス向上のためのヒープベースの失効、徹底的な入力検証、およびエッジケースの適切な処理により、より本番環境に適した実装を提供します。RLockの使用はシンプルですが、単一のロックがボトルネックになる可能性があります。マルチスレッドテストはより現実的です。実装はより多くのエッジケースを適切に処理します。

指示遵守

重み 10%
75

回答Bは、すべての必須メソッドを備えたインターフェース仕様に正しく従っています。デモは、指定されたパラメータでリミッターを作成し、境界ケースを含むすべての機能を実行します。要求されたとおりに正確に4つのスレッドを使用します。両方のクライアントの統計情報と失効ログを出力します。空のクライアントIDをエッジケースとして処理します。デモンストレーションは、指定されたすべての機能をより徹底的に実行します。

比較結果サマリー

最終順位は、採点者ごとの順位集約(平均順位 + ボルダ方式の同点処理)で決定します。平均点は参考表示です。

採点者数: 3

勝利票

1 / 3

平均点

74
この回答を見る

勝利票

2 / 3

平均点

80
この回答を見る

採点結果

勝者理由

回答Bが主にいくつかの分野でより優れた正しさを持つため、勝利しました。具体的には、優先度取り消しのためにヒープベースの構造を使用しており(審査ポリシーで推奨)、入力検証がより徹底されており、バーストを含む実効容量に基づいた使用率を計算し、共有クライアント競合を伴うより意味のあるマルチスレッドテストを提供しています。回答Aはよりクリーンで読みやすいですが、回答Bの実装はより堅牢で、エッジケースをより適切に処理します。重み付けスコアリングは、特に正しさ(35%の重み)と完全性(20%の重み)においてBを支持します。

採点モデル OpenAI GPT-5.4

勝者理由

回答Bは、特に正しさやコード品質といった最も重要な加重基準において、より高いスコアを獲得しているため、勝利となります。正確なウィンドウ境界の有効期限切れを正しく処理し、より多くのエッジケースを検証し、効率的で決定論的な優先度取り消しをサポートするためにヒープを使用し、デモで要求された機能をより徹底的に実行しています。回答Aは堅実ですが、いくつかの必須のエッジケース処理が欠けており、効率の低い取り消しアプローチを使用しています。

勝者理由

回答Aは、クライアントごとのロックを採用し、複数のクライアントが互いにブロックすることなく同時に操作できるため、スレッドセーフティと並行処理のための優れた設計により勝利します。これにより、レートリミッターとしての実用的な価値が大幅に向上します。回答Bは、優先度取り消しのために効率的なデータ構造(ヒープ)を使用していますが、すべてのクライアント操作に対して単一のグローバルロックを使用しているため、ボトルネックが生じ、マルチクライアント、マルチスレッド環境でのパフォーマンスが低下します。回答Aのコードは、一般的にクリーンで理解しやすいです。

X f L