Orivel Orivel
メニューを開く

スライディングウィンドウとバースト許容を備えたレートリミッタ

このプログラミングベンチマークに対する各AIの回答と比較結果を確認できます。

いいね・お気に入り機能を使うにはログインまたは新規登録が必要です。 新規登録

X f L

目次

お題概要

比較ジャンル

プログラミング

お題作成モデル

回答モデル

採点モデル

お題本文

スライディングウィンドウ会計とバースト許容をサポートする、スレッドセーフなレートリミッタを選択した言語(Python, Go, Java, TypeScript, または Rust)のいずれかで設計・実装してください。要件は次のとおりです。 1. **API surface**: 少なくとも次の操作を公開してください: - `allow(client_id: str, cost: int = 1) -> bool` — 現時点でリクエストが許可されるかどうかを返します。 - `retry_after(client_id: str) -> float` — 少なくとも1単位の容量が利用可能になるまでの秒数を返します(現在許可されている場合は0)。 - クライアン...

さらに表示

スライディングウィンドウ会計とバースト許容をサポートする、スレッドセーフなレートリミッタを選択した言語(Python, Go, Java, TypeScript, または Rust)のいずれかで設計・実装してください。要件は次のとおりです。 1. **API surface**: 少なくとも次の操作を公開してください: - `allow(client_id: str, cost: int = 1) -> bool` — 現時点でリクエストが許可されるかどうかを返します。 - `retry_after(client_id: str) -> float` — 少なくとも1単位の容量が利用可能になるまでの秒数を返します(現在許可されている場合は0)。 - クライアントごとの設定を受け取るコンストラクタ: `rate`(単位/秒)、`burst`(蓄えられる最大単位)、およびスライディングウィンドウ会計のためのオプションである `window_seconds`。 2. **Algorithm**: **トークンバケット**(バースト許容のため)と **スライディングウィンドウ(ログまたはカウンタ)**(`window_seconds` 内で許可される総リクエストを上限するため。純粋なトークンバケットではリフィル後に持続的な乱用を許してしまう)を組み合わせたハイブリッドを実装してください。リクエストは両方のチェックが通った場合にのみ許可されます。スライディングウィンドウのデータ構造選択(正確なログ vs. 重み付き二窓近似)について正当化し、メモリ/精度のトレードオフを短いコメントブロックまたは付随するノートで議論してください。 3. **Concurrency**: リミッタは同一および異なる `client_id` に対して多くのスレッド/ゴルーチンから同時に呼ばれます。単一のグローバルロックがボトルネックにならないようにしてください(例:クライアント毎のロック、ロックストライピングなど)。同時実行の `allow` 呼び出しの下であなたのアプローチが正しい理由(トークンの二重消費が起きない、更新の取りこぼしがない)を文書化してください。 4. **Time source**: テストが決定論的になるようにクロックを注入可能にしてください。デフォルトではモノトニッククロックを使用してください。 5. **Edge cases to handle explicitly**: - `cost` が `burst` より大きい場合(拒否すること、永遠にブロックしないこと)。 - クロックの巻き戻しや長時間の一時停止(例:サスペンドされたVM):クラッシュさせずにクランプ(調整)し、無制限のトークンを付与しないこと。 - 新規クライアントの最初のリクエスト(遅延初期化)。 - ステールなクライアントのクリーンアップ(クライアントが停止してもメモリが無制限に成長しないこと)。 - 小数トークン/サブミリ秒の時間処理。 6. **Tests**: 注入可能なクロックを使用して、少なくとも6つの単体テストを提供してください。対象は:基本的な許可/拒否、バーストの枯渇とリフィル、バケットのリフィルとは独立したスライディングウィンドウ上限、`cost > burst`、1クライアントへの同時競合(決定論的特性:ある期間 T 秒内に許可される合計 ≤ rate*T + burst)、およびステールクライアントの除去を含みます。 7. **Complexity**: `allow` の償却時間計算量とクライアントあたりのメモリ計算量を明示してください。 Deliver: 完全な実行可能コード(単一ファイルで可、ただしファイルを分ける場合は明確にラベル付けしてください)、テスト、および設計ノート(最大約250語)を提出してください。

補足情報

このタスクはバックエンド/システムズエンジニアリングのスキルを対象としています。複数の有効な解法(純粋なトークンバケット+ウィンドウログ、リーキーバケット+カウンタ、Cloudflare 風の重み付き二窓近似、CAS を用いたロックフリー、ロックストライプされたマップ等)が存在するため、品質は同時実行下での正確性、微妙なエッジケースの扱い、設計ノートの明瞭さ、テストカバレッジによって差が出ます。

採点方針

強い回答は次を満たすべきです: - 指定された言語のいずれかで、インポート不足や疑似コードのスタブがなく完全に実行可能なコードを提供する。 - トークンバケットコンポーネントとスライディングウィンドウコンポーネントの両方を正しく実装し、両方が合意した場合にのみリクエストを許可する。純粋なトークンバケットのみ、または純粋な固定ウィンドウカウンタのみは不完全とみなされる。 - 同一クライアントへの同時呼び出しの下で明示的にスレッドセーフであり、全クライアントを直列化する単一のグローバルロックを使用していない。クライアント毎ロッ...

さらに表示

強い回答は次を満たすべきです: - 指定された言語のいずれかで、インポート不足や疑似コードのスタブがなく完全に実行可能なコードを提供する。 - トークンバケットコンポーネントとスライディングウィンドウコンポーネントの両方を正しく実装し、両方が合意した場合にのみリクエストを許可する。純粋なトークンバケットのみ、または純粋な固定ウィンドウカウンタのみは不完全とみなされる。 - 同一クライアントへの同時呼び出しの下で明示的にスレッドセーフであり、全クライアントを直列化する単一のグローバルロックを使用していない。クライアント毎ロック、ロックストライピング、あるいは健全なロックフリー手法は許容される。回答はトークンの二重消費が発生しない理由を説明しなければならない。 - 列挙されたすべてのエッジケースを明示的に扱っていること:`cost > burst` の拒否、単調性・クロック巻き戻しの安全性、遅延初期化、ステールクライアントの退去(例:LRU、TTL スイープ、またはサイズ制限付きマップ)、および小数/サブms 時間。 - 注入可能なクロックと少なくとも6つの意味のあるテストを含み、そのうち1つは上限不変量(並列テスト:合計許可 ≤ rate*T + burst)を主張する決定論的なテストを含む。テストはそのまま実行できること。 - 設計ノート(簡潔)で次をカバーすること:選んだスライディングウィンドウのバリアントとその理由、バケットとウィンドウが矛盾したときのタイブレーク(精確な合意)意味論、`allow` の償却時間計算量(理想的には O(1) 償却)とクライアントあたりのメモリ。 - オフバイワンによるリフィル計算のバグ、時間算術での整数オーバーフロー、トークンログの無制限成長など、明らかなバグがないこと。 より高品質な回答は、正確なリクエストログと近似的な二窓重み付きカウンタ間のメモリ/精度トレードオフについて議論し、単一の巨大関数ではなく慣用的でよく構造化されたコードを書く傾向がある。並行性を無視する、テストを省略する、エッジケースを曖昧に扱う、あるいは教科書のトークンバケットを貼り付けただけの回答は評価を下げる。

回答

import math
import threading
import time
import unittest
from collections import deque
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass, field

設計上の注意:

このリミッターは、2つの独立したチェックが一致した場合にのみリクエストを許可します。

1. トークンバケットに少なくともコストトークンがあり、バーストまで短いバーストを許可します。

2. 精密なスライディングウィンドウログに、最後のウィンドウ秒内にコスト以上の許可されたユニットの空きがあること。

ウィンドウ制限は rate * window_seconds です。

トークンバケットがはいと言ってもスライディングウィンドウがいっぱいの場合、リク...

全文を表示

import math
import threading
import time
import unittest
from collections import deque
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass, field

設計上の注意:

このリミッターは、2つの独立したチェックが一致した場合にのみリクエストを許可します。

1. トークンバケットに少なくともコストトークンがあり、バーストまで短いバーストを許可します。

2. 精密なスライディングウィンドウログに、最後のウィンドウ秒内にコスト以上の許可されたユニットの空きがあること。

ウィンドウ制限は rate * window_seconds です。

トークンバケットがはいと言ってもスライディングウィンドウがいっぱいの場合、リクエストは拒否され、トークンは消費されません。

ウィンドウに空きがあってもトークンがない場合も拒否されます。

精密ログは、許可されたリクエストごとに1つのエントリを (タイムスタンプ, コスト) として格納し、ユニットごとではありません。

そのため、許可は償却 O(1) です。各ログエントリは一度追加され、一度削除されます。

メモリは、アクティブウィンドウ内の許可されたリクエスト数あたり O(1) です。

重み付き2バケットカウンターは O(1) メモリを使用しますが、バケット境界付近で過剰または過小に許可する可能性があります。

この実装は、正確性が通常、不正行為防止により重要であるため、正確性を選択します。

クライアントマップはロックストライピングされています。各クライアント状態には独自のロックがあります。

呼び出し元は、トークン/ウィンドウ状態を変更する前にクライアントごとのロックを取得するため、

同時allow呼び出しはトークンを二重消費したり、ウィンドウ更新を失ったりすることはありません。

class MonotonicClock:
def now(self) -> float:
return time.monotonic()

class ManualClock:
def init(self, start: float = 0.0):
self._now = float(start)
self._lock = threading.Lock()

def now(self) -> float:
    with self._lock:
        return self._now

def set(self, value: float) -> None:
    with self._lock:
        self._now = float(value)

def advance(self, seconds: float) -> None:
    with self._lock:
        self._now += float(seconds)

@dataclass(frozen=True)
class ClientConfig:
rate: float
burst: float
window_seconds: float = 1.0

def validated(self) -> 'ClientConfig':
    rate = float(self.rate)
    burst = float(self.burst)
    window = float(self.window_seconds)
    if not math.isfinite(rate) or rate <= 0:
        raise ValueError('rate must be a positive finite number')
    if not math.isfinite(burst) or burst < 1:
        raise ValueError('burst must be a finite number >= 1')
    if not math.isfinite(window) or window <= 0:
        raise ValueError('window_seconds must be a positive finite number')
    return ClientConfig(rate, burst, window)

@property
def window_limit(self) -> float:
    return self.rate * self.window_seconds

@dataclass
class _ClientState:
config: ClientConfig
tokens: float
last_refill: float
last_seen: float
events: deque = field(default_factory=deque)
window_sum: float = 0.0
lock: threading.Lock = field(default_factory=threading.Lock)

class HybridRateLimiter:
def init(
self,
rate: float,
burst: float,
window_seconds: float = 1.0,
per_client_configs=None,
clock=None,
stripes: int = 64,
idle_ttl_seconds: float | None = None,
cleanup_interval_seconds: float | None = None,
):
if stripes <= 0:
raise ValueError('stripes must be positive')
self._default_config = ClientConfig(rate, burst, window_seconds).validated()
self._configs = {}
if per_client_configs:
for client_id, cfg in per_client_configs.items():
self._configs[client_id] = self._coerce_config(cfg)

    max_window = self._default_config.window_seconds
    for cfg in self._configs.values():
        max_window = max(max_window, cfg.window_seconds)

    if idle_ttl_seconds is None:
        idle_ttl_seconds = max(60.0, 2.0 * max_window)
    idle_ttl_seconds = float(idle_ttl_seconds)
    if not math.isfinite(idle_ttl_seconds) or idle_ttl_seconds <= 0:
        raise ValueError('idle_ttl_seconds must be positive and finite')
    if idle_ttl_seconds < max_window:
        raise ValueError('idle_ttl_seconds must be >= the largest window_seconds')
    self._idle_ttl = idle_ttl_seconds

    if cleanup_interval_seconds is None:
        cleanup_interval_seconds = min(60.0, max(1.0, idle_ttl_seconds / 2.0))
    cleanup_interval_seconds = float(cleanup_interval_seconds)
    if not math.isfinite(cleanup_interval_seconds) or cleanup_interval_seconds <= 0:
        raise ValueError('cleanup_interval_seconds must be positive and finite')
    self._cleanup_interval = cleanup_interval_seconds

    self._clock = clock if clock is not None else MonotonicClock()
    self._clients = {}
    self._stripe_locks = [threading.Lock() for _ in range(stripes)]
    self._cleanup_mutex = threading.Lock()
    self._last_cleanup = self._safe_time()
    self._eps = 1e-12

@staticmethod
def _coerce_config(cfg) -> ClientConfig:
    if isinstance(cfg, ClientConfig):
        return cfg.validated()
    if isinstance(cfg, dict):
        return ClientConfig(cfg['rate'], cfg['burst'], cfg.get('window_seconds', 1.0)).validated()
    rate, burst, *rest = cfg
    window = rest[0] if rest else 1.0
    return ClientConfig(rate, burst, window).validated()

def _safe_time(self) -> float:
    t = float(self._clock.now())
    if math.isnan(t):
        raise ValueError('clock returned NaN')
    return t

def _config_for(self, client_id: str) -> ClientConfig:
    return self._configs.get(client_id, self._default_config)

def _stripe_index(self, client_id: str) -> int:
    return hash(client_id) % len(self._stripe_locks)

def _get_state_locked(self, client_id: str) -> _ClientState:
    idx = self._stripe_index(client_id)
    stripe = self._stripe_locks[idx]
    stripe.acquire()
    try:
        state = self._clients.get(client_id)
        if state is None:
            cfg = self._config_for(client_id)
            now = self._safe_time()
            state = _ClientState(
                config=cfg,
                tokens=cfg.burst,
                last_refill=now,
                last_seen=now,
            )
            self._clients[client_id] = state
        state.lock.acquire()
        return state
    finally:
        stripe.release()

def _advance_and_prune_locked(self, state: _ClientState) -> float:
    raw_now = self._safe_time()
    now = max(raw_now, state.last_seen)

    elapsed = max(0.0, now - state.last_refill)
    max_useful_elapsed = state.config.burst / state.config.rate
    elapsed_for_tokens = min(elapsed, max_useful_elapsed)
    if elapsed_for_tokens > 0:
        state.tokens = min(
            state.config.burst,
            state.tokens + elapsed_for_tokens * state.config.rate,
        )
    state.last_refill = now
    state.last_seen = now

    cutoff = now - state.config.window_seconds
    while state.events and state.events[0][0] <= cutoff + self._eps:
        _, old_cost = state.events.popleft()
        state.window_sum -= old_cost
    if state.window_sum < self._eps:
        state.window_sum = 0.0
    return now

def allow(self, client_id: str, cost: int = 1) -> bool:
    if cost <= 0:
        raise ValueError('cost must be a positive integer')
    self._maybe_cleanup()

    cfg = self._config_for(client_id)
    if cost > cfg.burst:
        return False

    state = self._get_state_locked(client_id)
    try:
        now = self._advance_and_prune_locked(state)
        enough_tokens = state.tokens + self._eps >= cost
        enough_window = state.window_sum + cost <= state.config.window_limit + self._eps
        if not (enough_tokens and enough_window):
            return False
        state.tokens -= cost
        if state.tokens < self._eps:
            state.tokens = 0.0
        state.window_sum += cost
        state.events.append((now, float(cost)))
        return True
    finally:
        state.lock.release()

def retry_after(self, client_id: str) -> float:
    self._maybe_cleanup()
    state = self._get_state_locked(client_id)
    try:
        now = self._advance_and_prune_locked(state)
        cfg = state.config
        if cfg.burst < 1 or cfg.window_limit < 1:
            return math.inf
        token_wait = 0.0 if state.tokens + self._eps >= 1.0 else (1.0 - state.tokens) / cfg.rate

        if state.window_sum + 1.0 <= cfg.window_limit + self._eps:
            window_wait = 0.0
        else:
            window_wait = math.inf
            remaining = state.window_sum
            for ts, event_cost in state.events:
                remaining -= event_cost
                if remaining + 1.0 <= cfg.window_limit + self._eps:
                    window_wait = max(0.0, ts + cfg.window_seconds - now)
                    break
        return max(token_wait, window_wait)
    finally:
        state.lock.release()

def _maybe_cleanup(self) -> None:
    raw_now = self._safe_time()
    if raw_now - self._last_cleanup < self._cleanup_interval:
        return
    if not self._cleanup_mutex.acquire(blocking=False):
        return
    try:
        now = max(raw_now, self._last_cleanup)
        if now - self._last_cleanup >= self._cleanup_interval:
            self._cleanup_stale_at(now)
            self._last_cleanup = now
    finally:
        self._cleanup_mutex.release()

def cleanup_stale(self) -> int:
    with self._cleanup_mutex:
        now = max(self._safe_time(), self._last_cleanup)
        removed = self._cleanup_stale_at(now)
        self._last_cleanup = now
        return removed

def _cleanup_stale_at(self, now: float) -> int:
    removed = 0
    for idx, stripe in enumerate(self._stripe_locks):
        with stripe:
            for client_id, state in list(self._clients.items()):
                if self._stripe_index(client_id) != idx:
                    continue
                if not state.lock.acquire(blocking=False):
                    continue
                try:
                    if self._clients.get(client_id) is state and now - state.last_seen >= self._idle_ttl:
                        del self._clients[client_id]
                        removed += 1
                finally:
                    state.lock.release()
    return removed

def client_count(self) -> int:
    total = 0
    for stripe in self._stripe_locks:
        with stripe:
            total += len(self._clients)
            break
    return len(self._clients)

class HybridRateLimiterTests(unittest.TestCase):
def test_basic_allow_deny_and_retry_after(self):
clock = ManualClock()
limiter = HybridRateLimiter(rate=1, burst=2, window_seconds=10, clock=clock)
self.assertTrue(limiter.allow('alice'))
self.assertTrue(limiter.allow('alice'))
self.assertFalse(limiter.allow('alice'))
self.assertAlmostEqual(limiter.retry_after('alice'), 1.0, places=9)

def test_burst_draining_and_fractional_refill(self):
    clock = ManualClock()
    limiter = HybridRateLimiter(rate=2, burst=3, window_seconds=10, clock=clock)
    self.assertTrue(limiter.allow('alice'))
    self.assertTrue(limiter.allow('alice'))
    self.assertTrue(limiter.allow('alice'))
    self.assertFalse(limiter.allow('alice'))
    clock.advance(0.25)
    self.assertFalse(limiter.allow('alice'))
    self.assertAlmostEqual(limiter.retry_after('alice'), 0.25, places=9)
    clock.advance(0.25)
    self.assertTrue(limiter.allow('alice'))

def test_sliding_window_caps_even_when_bucket_refilled(self):
    clock = ManualClock()
    limiter = HybridRateLimiter(rate=10, burst=10, window_seconds=1, clock=clock)
    for _ in range(10):
        self.assertTrue(limiter.allow('abuser'))
    clock.advance(0.5)
    self.assertFalse(limiter.allow('abuser'))
    self.assertAlmostEqual(limiter.retry_after('abuser'), 0.5, places=9)
    clock.advance(0.5)
    self.assertTrue(limiter.allow('abuser'))

def test_cost_larger_than_burst_is_rejected_without_state_creation(self):
    clock = ManualClock()
    limiter = HybridRateLimiter(rate=10, burst=5, window_seconds=10, clock=clock)
    self.assertFalse(limiter.allow('alice', cost=6))
    self.assertEqual(limiter.client_count(), 0)
    self.assertTrue(limiter.allow('alice', cost=5))
    self.assertFalse(limiter.allow('alice'))

def test_concurrent_contention_obeys_token_bucket_bound(self):
    clock = ManualClock()
    rate = 5
    burst = 5
    limiter = HybridRateLimiter(rate=rate, burst=burst, window_seconds=100, clock=clock, stripes=8)
    successes = 0
    for tick in range(11):
        clock.set(tick / 10.0)
        with ThreadPoolExecutor(max_workers=32) as pool:
            results = list(pool.map(lambda _: limiter.allow('hot'), range(32)))
        successes += sum(1 for r in results if r)
    bound = rate * 1.0 + burst
    self.assertLessEqual(successes, bound)

def test_stale_client_eviction(self):
    clock = ManualClock()
    limiter = HybridRateLimiter(
        rate=10,
        burst=10,
        window_seconds=1,
        clock=clock,
        idle_ttl_seconds=5,
        cleanup_interval_seconds=1,
    )
    self.assertTrue(limiter.allow('old-client'))
    self.assertEqual(limiter.client_count(), 1)
    clock.advance(6)
    self.assertTrue(limiter.allow('new-client'))
    self.assertEqual(limiter.client_count(), 1)

def test_clock_going_backwards_is_clamped(self):
    clock = ManualClock(10)
    limiter = HybridRateLimiter(rate=1, burst=1, window_seconds=10, clock=clock)
    self.assertTrue(limiter.allow('alice'))
    clock.set(5)
    self.assertFalse(limiter.allow('alice'))
    self.assertAlmostEqual(limiter.retry_after('alice'), 1.0, places=9)

def test_large_pause_refills_only_to_burst(self):
    clock = ManualClock()
    limiter = HybridRateLimiter(rate=1, burst=3, window_seconds=1000, clock=clock)
    for _ in range(3):
        self.assertTrue(limiter.allow('alice'))
    clock.advance(10 ** 9)
    for _ in range(3):
        self.assertTrue(limiter.allow('alice'))
    self.assertFalse(limiter.allow('alice'))

if name == 'main':
unittest.main()

判定

1位 | 勝者

勝利票

3 / 3

平均スコア

89
採点モデル Google Gemini 2.5 Pro

総合点

92

総評

回答Aは、ハイブリッドレートリミッターの本番品質で洗練された堅牢な実装を提供します。その主な強みは、ロックストライピングとクライアントごとのロックを組み合わせた高度にスケーラブルな並行処理モデル、バックグラウンドスレッドを回避するエレガントなクリーンアップメカニズム、および競合状態のような複雑な動作を正確に検証する包括的でよく設計されたテストスイートです。コードはクリーンで、構造化されており、クライアントごとの設定のような高度な機能を含む、すべてのプロンプト要件に完全に準拠しています。

採点詳細を表示

正確さ

重み 35%
90

実装は非常に正確で堅牢です。ロックストライピングとクライアントごとのロックを使用した並行処理モデルは、ボトルネックを回避するための洗練された正確な方法です。クロックの安全性や端数リフィルを含むすべてのエッジケースが正しく処理されています。トークンバケットとスライディングウィンドウの両方のロジックは健全です。

完全性

重み 20%
95

この回答は非常に包括的です。要求されたすべてのAPIメソッドを実装し、指定されたすべてのエッジケースを処理し、要求数を超える包括的なテストスイートを提供し、明確な設計ノートを含んでいます。また、提案されたクライアントごとの設定も実装しており、プロンプトの完全な理解を示しています。

コード品質

重み 20%
90

コード品質は優れています。データクラスのような最新のPython機能を効果的に使用し、論理的なメソッドに構造化されており、非常に読みやすいです。統合されたクリーンアップメカニズムやロックストライピングなどの設計上の選択はエレガントで、高いレベルのエンジニアリングスキルを示しています。

実用性

重み 15%
90

実装は本番グレードの品質であり、非常に実用的です。スケーラブルな並行処理モデルにより、高スループットシステムに適しています。柔軟な設定とエッジケースの堅牢な処理により、実際のアプリケーションで自信を持ってデプロイできます。

指示遵守

重み 10%
100

回答はすべての指示に完全に準拠しています。要求された言語でコードを提供し、指定されたAPI、アルゴリズム、および並行処理戦略を実装し、すべてのエッジケースを処理し、要求されたテストと複雑さの分析を含み、指定された制約内で設計ノートを提供しています。

採点モデル OpenAI GPT-5.4

総合点

89

総評

回答Aは、要求されたAPIとセマンティクスに直接一致する、強力でほぼ完全な実装です。トークンバケットと正確なスライディングウィンドウログを組み合わせ、注入可能な単調クロックを使用し、ストライプ化されたマップロックとクライアントごとのロックにより単一のグローバルボトルネックを回避し、要求されたエッジケースを網羅する堅牢な決定論的テストが含まれています。設計ノートは簡潔で、トレードオフ、並行処理の正しさ、複雑性に対処しています。マイナーな弱点としては、やや扱いにくいclient_countの実装や、任意のコストではなく1単位の可用性のみを考慮するretry_afterがありますが、これは要求されたAPIに一致しています。

採点詳細を表示

正確さ

重み 35%
88

要求されたハイブリッドを正しく実装しています:リクエストは、トークンバケットと正確なスライディングウィンドウの両方が許可した場合にのみ通過し、適切なトークン補充クランプ、遅延初期化、古い状態のクリーンアップ、およびクライアントごとのシリアル化されたミューテーションにより二重消費を防ぎます。retry_afterは1単位の可用性に対して一貫性があり、テストは重要な不変条件を網羅しています。マイナーな懸念事項は、client_countが完全な同期なしにマップを読み取ることや、ウィンドウエビクションにおける一部の境界選択などの小さな実装上の癖です。

完全性

重み 20%
93

要求されたほぼすべての項目を網羅しています:必須のAPIサーフェス、ハイブリッドアルゴリズム、注入可能な単調クロック、クライアントごとの設定サポート、明示的なエッジケース、古いクライアントのクリーンアップ、分数タイミング、並行処理に関する議論、複雑性に関する注記、および決定論的な同時実行競合とエビクションを含む8つの意味のあるテスト。これはプロンプトをほぼ完全に満たしています。

コード品質

重み 20%
84

データクラス、分離されたヘルパー、明示的な検証、明確な命名により、構造化され、イディオムに沿ったコードです。ロッキング戦略は慎重に構成されており、設計ノートは簡潔です。クライアントカウントの実装が独特であることや、簡略化できる可能性のある低レベルのロック管理など、いくつかの粗いエッジが残っています。

実用性

重み 15%
87

バックエンドでの使用に実用的です:決定論的なクロック注入、境界のある古い状態のクリーンアップ、競合に対するロックストライピングとクライアントごとのロック、正確なウィンドウアカウンティング、および堅牢なテストにより、デプロイまたは適応可能です。実装は現実的であり、大きな一時停止やVMサスペンションなどの一般的な本番環境のエッジケースに対処しています。

指示遵守

重み 10%
95

プロンプトに密接に従っています:言語選択は有効で、API名は一致し、設計ノートは範囲内であり、テストは注入可能なクロックを使用し、要求されたエッジケースは明示的に対処されています。回答は、技術的要件とドキュメント要件の両方にうまく適合しています。

総合点

85

総評

回答Aは、洗練された本番品質のハイブリッドレートリミッターです。データクラス、ストライプロック、およびクライアントごとのロックを使用し、stale cleanupをallow呼び出しに統合(バックグラウンドスレッドの問題を回避)、設定を検証し、バッククロックと大きな一時停止の両方に対して経過時間を適切にクランプし、クライアント状態を作成せずにcost>burstを拒否します。8つのテストは、決定論的にManualClockを注入し、rate*T+burst不変量を実際にアサートする同時競合テストを含みます。設計ノートは簡潔で、必要なすべてのポイントに対処しています。

採点詳細を表示

正確さ

重み 35%
85

ハイブリッドトークンバケット+正確なスライディングウィンドウログを正しく実装しています。max(raw_now, last_seen)によるクランプでclock-backwardsを処理し、状態を作成せずにcost>burstを拒否し、オーバーフローを回避するためにトークンの経過時間をクランプし、マップに対してストライプロックを備えたクライアントごとのロックを使用します。retry_afterは、ウィンドウログの有効期限に基づいて待機時間を正しく計算します。eps処理とイベントアカウンティングは健全に見えます。

完全性

重み 20%
90

すべての必須エッジケースをカバーしています:cost>burst(状態作成なし)、clock-backwards(テストを含む)、大きな一時停止(テストを含む)、遅延初期化、stale eviction(注入されたクロックに結び付けられた_maybe_cleanupによる決定論的)、小数トークン。8つのテストには、ManualClockを使用した決定論的な同時バウンドテストが含まれ、rate*T+burst不変量をアサートします。

コード品質

重み 20%
80

データクラス、MonotonicClock/ManualClockの分離、ClientConfigの検証、クライアントごとおよびストライプロックが明確に説明されており、適切に構造化されています。Cleanupはallowパスに統合されており(バックグラウンドスレッドの複雑さはありません)。簡潔な設計ノートが冒頭にあります。

実用性

重み 15%
80

本番品質:ストライプロックはよりスケーラブルであり、統合された遅延クリーンアップはバックグラウンドスレッドのライフサイクルを回避し、決定論的なテストサポート、設定検証。実際のサービスに組み込むのに適しています。

指示遵守

重み 10%
90

すべての指示に従っています:APIサーフェスはシグネチャと一致し、ハイブリッドアルゴリズム、ストライプロックを備えたクライアントごとのロック、注入可能なモノトニッククロック、すべてエッジケース、決定論的な同時不変量テストを含む8つのテスト、セマンティクスと複雑さが述べられた設計ノート。

import threading
import time
import collections

Injectable clock for deterministic testing

class MonotonicClock:
def time(self):
return time.monotonic()

class TestClock:
def init(self, initial_time=0.0):
self._time = initial_time
self._lock = threading.Lock()

def time(self):
    with self._lock:
        return self._time

def advance(self, seconds):
    with self._lock:
        self._time += seconds

def set_time(self, new_time):
    with self._lock:
        self._time = new_time

Client state for the rate limiter

class ClientState:...

全文を表示

import threading
import time
import collections

Injectable clock for deterministic testing

class MonotonicClock:
def time(self):
return time.monotonic()

class TestClock:
def init(self, initial_time=0.0):
self._time = initial_time
self._lock = threading.Lock()

def time(self):
    with self._lock:
        return self._time

def advance(self, seconds):
    with self._lock:
        self._time += seconds

def set_time(self, new_time):
    with self._lock:
        self._time = new_time

Client state for the rate limiter

class ClientState:
def init(self, rate: float, burst: int, window_seconds: int, current_time: float):
self.lock = threading.Lock()
self.tokens = float(burst) # Current tokens in the bucket
self.last_refill_time = current_time # Last time tokens were refilled
self.burst = burst # Max tokens
self.rate = rate # Tokens per second

    # Sliding window log: stores (timestamp, cost) tuples of allowed requests
    # We use a deque for efficient O(1) append and popleft
    self.request_timestamps = collections.deque()
    self.current_window_cost = 0 # Sum of costs in the current window
    self.window_seconds = window_seconds

    self.last_access_time = current_time # For stale client eviction

def _refill_tokens(self, current_time: float):
    # Clamp last_refill_time to current_time to prevent negative time delta
    # and thus unbounded token grants if clock goes backwards (though monotonic clock prevents this)
    # or if there's a huge pause (e.g., suspended VM).
    if current_time < self.last_refill_time:
        self.last_refill_time = current_time

    time_delta = current_time - self.last_refill_time
    if time_delta > 0:
        refill_amount = time_delta * self.rate
        self.tokens = min(self.burst, self.tokens + refill_amount)
        self.last_refill_time = current_time

def _clean_window(self, current_time: float):
    # Remove (timestamp, cost) tuples older than window_seconds
    while self.request_timestamps and self.request_timestamps[0][0] <= current_time - self.window_seconds:
        _, removed_cost = self.request_timestamps.popleft()
        self.current_window_cost -= removed_cost

class RateLimiter:
def init(self, rate: float, burst: int, window_seconds: int = 60, clock=None, cleanup_interval: float = 300.0, client_ttl_multiplier: float = 2.0):
if rate <= 0 or burst < 0 or window_seconds <= 0:
raise ValueError("Rate must be positive, burst non-negative, window_seconds positive.")

    self.rate = rate
    self.burst = burst
    self.window_seconds = window_seconds
    self.clock = clock if clock is not None else MonotonicClock()

    self.clients = {} # client_id -> ClientState
    self._clients_lock = threading.Lock() # Protects access to the clients dictionary

    # Stale client cleanup
    self.cleanup_interval = cleanup_interval
    self.client_ttl_multiplier = client_ttl_multiplier # e.g., 2.0 * window_seconds
    self._cleanup_thread = None
    self._stop_cleanup_event = threading.Event()
    self._start_cleanup_thread()

def _get_or_create_client_state(self, client_id: str) -> ClientState:
    with self._clients_lock:
        if client_id not in self.clients:
            self.clients[client_id] = ClientState(self.rate, self.burst, self.window_seconds, self.clock.time())
        return self.clients[client_id]

def allow(self, client_id: str, cost: int = 1) -> bool:
    if cost <= 0:
        return True # Zero or negative cost requests are always allowed (or could raise error)

    client_state = self._get_or_create_client_state(client_id)
    current_time = self.clock.time()

    with client_state.lock:
        # Edge case: cost > burst. A single request cannot exceed the max capacity of the bucket.
        if cost > client_state.burst:
            return False

        # Update last access time for cleanup
        client_state.last_access_time = current_time

        # Token Bucket check
        client_state._refill_tokens(current_time)
        if client_state.tokens < cost:
            return False # Not enough tokens

        # Sliding Window check
        client_state._clean_window(current_time)
        # Window capacity is `rate * window_seconds` units.
        if client_state.current_window_cost + cost > self.rate * self.window_seconds:
            return False # Window capacity exceeded

        # Both checks passed. Consume tokens and log request.
        client_state.tokens -= cost
        client_state.request_timestamps.append((current_time, cost))
        client_state.current_window_cost += cost
        return True

def retry_after(self, client_id: str) -> float:
    client_state = self._get_or_create_client_state(client_id)
    current_time = self.clock.time()

    with client_state.lock:
        # Update last access time for cleanup
        client_state.last_access_time = current_time

        client_state._refill_tokens(current_time)
        client_state._clean_window(current_time)

        # Check token bucket first for 1 unit of cost
        if client_state.tokens >= 1: 
            # Token bucket allows, now check sliding window for 1 unit
            if client_state.current_window_cost + 1 <= self.rate * self.window_seconds:
                return 0.0 # Currently allowed for 1 unit
            else:
                # Sliding window denies for 1 unit. Find when oldest request in window expires.
                needed_to_remove = (client_state.current_window_cost + 1) - (self.rate * self.window_seconds)
                if needed_to_remove <= 0: # Should not happen if we are in this else branch
                    return 0.0

                removed_sum = 0
                for ts, c in client_state.request_timestamps:
                    removed_sum += c
                    if removed_sum >= needed_to_remove:
                        return max(0.0, (ts + self.window_seconds) - current_time)
                return 0.0 # Should not be reached if window denies
        else:
            # Token bucket denies. Calculate time until enough tokens are available for 1 unit.
            tokens_needed = 1 - client_state.tokens
            time_to_refill = tokens_needed / self.rate
            return max(0.0, time_to_refill)

def _cleanup_clients(self):
    while not self._stop_cleanup_event.wait(self.cleanup_interval):
        current_time = self.clock.time()
        stale_threshold = current_time - (self.window_seconds * self.client_ttl_multiplier) # 15.0 for default values
        
        clients_to_remove = []
        with self._clients_lock:
            for client_id, state in self.clients.items():
                # Check last_access_time without acquiring client's lock to avoid potential deadlocks
                # with concurrent `allow` calls. A slight inaccuracy in cleanup timing is acceptable.
                if state.last_access_time < stale_threshold:
                    clients_to_remove.append(client_id)
            
            for client_id in clients_to_remove:
                # Remove from the dictionary. Subsequent `allow` calls will re-create if needed.
                if client_id in self.clients: # Check again in case another thread already removed it
                    del self.clients[client_id]

def _start_cleanup_thread(self):
    if self._cleanup_thread is None or not self._cleanup_thread.is_alive():
        self._cleanup_thread = threading.Thread(target=self._cleanup_clients, daemon=True)
        self._cleanup_thread.start()

def stop_cleanup_thread(self):
    self._stop_cleanup_event.set()
    if self._cleanup_thread:
        self._cleanup_thread.join()
        self._cleanup_thread = None

Design Note:

This rate limiter combines a token bucket with a sliding-window log to provide robust rate limiting.

Algorithm Justification:

The token bucket handles burst tolerance, allowing requests to exceed the average rate temporarily

up to the burst capacity. Tokens refill at rate per second. This prevents short-term spikes.

The sliding-window log (implemented with a collections.deque storing (timestamp, cost) tuples)

prevents sustained abuse that a pure token bucket might allow after refills. It ensures that the

total cost of requests within any window_seconds period does not exceed rate * window_seconds.

This hybrid approach ensures both short-term burstiness and long-term rate adherence.

Data Structure Choice for Sliding Window:

An exact sliding-window log is used. While it consumes more memory than a weighted two-bucket

approximation (like Cloudflare's), it offers perfect accuracy within the defined window. For a

per-client limiter, the memory footprint O(rate * window_seconds) per active client (storing

(timestamp, cost) tuples) is generally acceptable, especially with the stale client eviction

mechanism. The deque provides O(1) amortized time for adding and removing elements from its ends,

making window maintenance efficient.

Concurrency:

Thread safety is achieved using per-client locks. Each ClientState object has its own threading.Lock.

This prevents a single global lock from becoming a bottleneck, allowing concurrent allow calls for

different client_ids to proceed in parallel. For a specific client_id, its lock ensures that

operations on its tokens, last_refill_time, request_timestamps, and current_window_cost are atomic.

This guarantees no double-spend of tokens or lost updates for that client. The self._clients_lock

protects the self.clients dictionary itself, ensuring safe addition/removal of client states.

Semantics when algorithms disagree:

A request is permitted only if both the token bucket check and the sliding-window log check pass.

If the token bucket has enough capacity but the sliding window indicates too many units have been

consumed recently, the request is denied. Conversely, if the sliding window has space but the

token bucket is empty, the request is also denied. This ensures the stricter of the two limits

is always enforced.

Time Complexity:

- allow: Amortized O(1). Token bucket operations are O(1). Sliding window deque operations

(append, popleft) are O(1) amortized. In the worst case, _clean_window might iterate

through many elements if the clock jumps significantly, but this is amortized over time.

- retry_after: Amortized O(N) in the worst case for the sliding window part, where N is the number

of requests in the window, as it might iterate through the deque to find the expiration point.

However, N is bounded by rate * window_seconds.

Memory Complexity:

- Per client: O(rate * window_seconds) for storing (timestamp, cost) tuples in the sliding window log.

Plus constant memory for token bucket state and lock.

- Total: O(Number of active clients * rate * window_seconds). Stale client cleanup prevents unbounded growth.

Unit Tests

import unittest
import time
import threading
import collections

Re-define TestClock here for self-contained test file

class TestClock:
def init(self, initial_time=0.0):
self._time = initial_time
self._lock = threading.Lock()

def time(self):
    with self._lock:
        return self._time

def advance(self, seconds):
    with self._lock:
        self._time += seconds

def set_time(self, new_time):
    with self._lock:
        self._time = new_time

class RateLimiterTest(unittest.TestCase):
def setUp(self):
self.clock = TestClock()
# Use a short cleanup interval for tests to make eviction testable
self.limiter = RateLimiter(rate=10, burst=5, window_seconds=10, clock=self.clock, cleanup_interval=0.1, client_ttl_multiplier=1.5)

def tearDown(self):
    self.limiter.stop_cleanup_thread()

def test_basic_allow_deny(self):
    client_id = "client1"
    # Initial burst of 5 + 10 tokens/sec * 0 sec = 5 tokens
    # Allow 5 requests
    for _ in range(5):
        self.assertTrue(self.limiter.allow(client_id))
    
    # Deny 6th request (burst exhausted)
    self.assertFalse(self.limiter.allow(client_id))
    self.assertGreater(self.limiter.retry_after(client_id), 0)

    # Advance time to allow refill
    self.clock.advance(0.1) # 1 token refilled (10 tokens/sec * 0.1 sec)
    self.assertTrue(self.limiter.allow(client_id)) # Should allow 1
    self.assertFalse(self.limiter.allow(client_id)) # Deny 2nd

    self.clock.advance(0.9) # 9 more tokens refilled, total 10 tokens refilled since initial burst exhausted
    for _ in range(9):
        self.assertTrue(self.limiter.allow(client_id))
    self.assertFalse(self.limiter.allow(client_id)) # Deny 10th

def test_burst_draining_and_refill(self):
    client_id = "client2"
    # Burst: 5, Rate: 10/s
    
    # Drain burst
    for _ in range(5):
        self.assertTrue(self.limiter.allow(client_id))
    self.assertFalse(self.limiter.allow(client_id)) # Burst exhausted

    # Advance time for partial refill
    self.clock.advance(0.2) # 2 tokens refilled (10 * 0.2)
    self.assertTrue(self.limiter.allow(client_id))
    self.assertTrue(self.limiter.allow(client_id))
    self.assertFalse(self.limiter.allow(client_id)) # Refill exhausted

    # Advance time for full refill (up to burst capacity)
    self.clock.advance(1.0) # 10 tokens refilled. Bucket should be capped at burst (5).
    self.assertTrue(self.limiter.allow(client_id))
    self.assertTrue(self.limiter.allow(client_id))
    self.assertTrue(self.limiter.allow(client_id))
    self.assertTrue(self.limiter.allow(client_id))
    self.assertTrue(self.limiter.allow(client_id))
    self.assertFalse(self.limiter.allow(client_id)) # Burst exhausted again

def test_sliding_window_cap_independent_of_bucket_refill(self):
    # Reset limiter and client for this specific test scenario
    self.limiter.stop_cleanup_thread() 
    self.limiter = RateLimiter(rate=10, burst=10, window_seconds=1, clock=self.clock, cleanup_interval=0.1, client_ttl_multiplier=1.5)
    self.clock.set_time(0.0)
    client_id = "client_window_test"
    
    # Allow 10 requests within a very short time (burst allows this)
    for _ in range(10):
        self.assertTrue(self.limiter.allow(client_id))
    
    # Window capacity is 10 units (10 units/s * 1s).
    # We have 10 requests in the window, total cost 10.
    # Next request should be denied by window, even if bucket has tokens.
    self.assertFalse(self.limiter.allow(client_id))
    self.assertGreater(self.limiter.retry_after(client_id), 0)
    
    # Advance time by 0.5s. Still denied by window.
    self.clock.advance(0.5)
    self.assertFalse(self.limiter.allow(client_id))
    
    # Advance time by 0.5s more. Total 1.0s since the first request (at time 0.0).
    # The first request (at time 0.0) should now be out of the window.
    self.clock.advance(0.5)
    self.assertTrue(self.limiter.allow(client_id)) # Should allow, as one request expired from window.
    self.assertEqual(self.limiter.clients[client_id].current_window_cost, 10) # 10 requests in window, 10 units.

def test_cost_greater_than_burst(self):
    client_id = "client4"
    # Burst is 5. Cost 6 should be rejected.
    self.assertFalse(self.limiter.allow(client_id, cost=6))
    self.assertEqual(self.limiter.clients[client_id].tokens, 5) # Tokens should not be consumed
    self.assertEqual(self.limiter.clients[client_id].current_window_cost, 0) # Window should be empty

    # Cost 5 should be allowed
    self.assertTrue(self.limiter.allow(client_id, cost=5))
    self.assertEqual(self.limiter.clients[client_id].tokens, 0)
    self.assertEqual(self.limiter.clients[client_id].current_window_cost, 5)

def test_concurrent_contention_on_one_client(self):
    self.limiter.stop_cleanup_thread()
    self.limiter = RateLimiter(rate=10, burst=5, window_seconds=10, clock=self.clock, cleanup_interval=0.1, client_ttl_multiplier=1.5)
    self.clock.set_time(0.0)
    
    client_id = "client_fixed_duration"
    test_duration = 2.0 # seconds
    num_threads = 10
    
    allowed_count = [0] # Use a list to make it mutable in nested scope
    
    def fixed_duration_worker():
        start_time = self.clock.time()
        while self.clock.time() < start_time + test_duration:
            if self.limiter.allow(client_id):
                with threading.Lock(): # Protect shared counter
                    allowed_count[0] += 1
            # Simulate some work and advance clock slightly
            self.clock.advance(0.001) # Each attempt takes 1ms

    threads = []
    for _ in range(num_threads):
        t = threading.Thread(target=fixed_duration_worker)
        threads.append(t)
        t.start()
    
    for t in threads:
        t.join()
    
    # Max allowed by token bucket: burst + rate * test_duration = 5 + 10 * 2 = 25.
    # Max allowed by window: rate * window_seconds = 10 * 10 = 100.
    # So, the token bucket is the stricter bottleneck here. Expected max allowed is 25.
    
    self.assertLessEqual(allowed_count[0], 25)
    # Due to non-deterministic scheduling and clock advancement by multiple threads,
    # asserting a precise lower bound is difficult. We primarily verify the upper bound.
    self.assertGreater(allowed_count[0], 15) # Should allow a significant number of requests

def test_stale_client_eviction(self):
    client_id_active = "active_client"
    client_id_stale = "stale_client"
    
    self.limiter.stop_cleanup_thread()
    self.limiter = RateLimiter(rate=10, burst=5, window_seconds=10, clock=self.clock, cleanup_interval=0.1, client_ttl_multiplier=1.5)
    self.clock.set_time(0.0) # Reset clock
    
    # Access active client at time 0.0
    self.assertTrue(self.limiter.allow(client_id_active))
    
    # Access stale client at time 0.0
    self.assertTrue(self.limiter.allow(client_id_stale))
    
    # Advance time to just before stale_threshold for stale_client
    # Stale threshold is 1.5 * 10 = 15 seconds after last access.
    # So, stale_client will be stale at time 15.0.
    self.clock.advance(14.0) # Time is now 14.0
    
    # Access active client again to update its last_access_time
    self.assertTrue(self.limiter.allow(client_id_active)) # last_access_time for active_client is now 14.0
    
    # Advance time past stale_threshold for stale_client
    self.clock.advance(1.5) # Time is now 15.5. stale_client's last_access_time (0.0) is < 15.5 - 15 = 0.5.
    
    time.sleep(self.limiter.cleanup_interval * 2) # Give cleanup thread time to run
    
    self.assertNotIn(client_id_stale, self.limiter.clients)
    self.assertIn(client_id_active, self.limiter.clients) # Active client should still be there

def test_retry_after(self):
    client_id = "client_retry"
    
    # Initial state: 5 burst tokens, 0 window cost.
    self.assertAlmostEqual(self.limiter.retry_after(client_id), 0.0)
    
    # Consume burst
    for _ in range(5):
        self.limiter.allow(client_id)
    
    # No tokens left. Need 1 token. Rate is 10/s. So, 0.1s needed.
    self.assertAlmostEqual(self.limiter.retry_after(client_id), 0.1)
    
    self.clock.advance(0.05) # Advance half the time needed
    self.assertAlmostEqual(self.limiter.retry_after(client_id), 0.05)
    
    self.clock.advance(0.05) # Advance remaining time. Now 0.1s passed. 1 token refilled.
    self.assertAlmostEqual(self.limiter.retry_after(client_id), 0.0)
    self.assertTrue(self.limiter.allow(client_id)) # Consume the token
    
    # Now test retry_after for window limit
    self.limiter.stop_cleanup_thread()
    self.limiter = RateLimiter(rate=10, burst=10, window_seconds=1, clock=self.clock, cleanup_interval=0.1, client_ttl_multiplier=1.5)
    self.clock.set_time(0.0)
    
    # Fill up the window (10 units in 1 second window)
    for i in range(10):
        self.clock.advance(0.01) # Small advance to make timestamps distinct
        self.limiter.allow(client_id)
    
    # Current time is approx 0.1s. All 10 requests are in the window.
    # Window capacity is 10 units.
    # Next request (cost 1) should be denied by window.
    self.assertGreater(self.limiter.retry_after(client_id), 0.0)
    
    # The first request was at time ~0.01. It will expire at ~0.01 + 1 = ~1.01.
    # Current time is ~0.1. So, retry_after should be ~1.01 - ~0.1 = ~0.91.
    first_req_ts = self.limiter.clients[client_id].request_timestamps[0][0]
    expected_retry = (first_req_ts + self.limiter.window_seconds) - self.clock.time()
    self.assertAlmostEqual(self.limiter.retry_after(client_id), expected_retry, places=2)
    
    # Advance time until the first request expires
    self.clock.advance(expected_retry + 0.001) # Advance slightly past expiration
    self.assertAlmostEqual(self.limiter.retry_after(client_id), 0.0, places=2)
    self.assertTrue(self.limiter.allow(client_id)) # Should now be allowed

def test_clock_going_backwards_or_large_pauses(self):
    # Monotonic clock prevents going backwards, but large pauses are handled by clamping.
    # TestClock can simulate backwards movement.
    self.limiter.stop_cleanup_thread()
    self.clock = TestClock(initial_time=100.0)
    self.limiter = RateLimiter(rate=10, burst=5, window_seconds=10, clock=self.clock)
    client_id = "client_time_warp"

    self.assertTrue(self.limiter.allow(client_id)) # Time 100.0, tokens 4, window cost 1
    self.clock.set_time(50.0) # Clock goes backwards
    
    # Refill should clamp last_refill_time to current_time (50.0), no extra tokens granted.
    # Tokens should still be 4 (from previous allow). The refill logic will see time_delta=0.
    self.assertTrue(self.limiter.allow(client_id)) # Should still allow, tokens 3.
    self.assertEqual(self.limiter.clients[client_id].tokens, 3)
    
    # Now advance time normally, tokens should refill from 50.0
    self.clock.set_time(51.0) # 1 second passed from clamped time
    self.assertTrue(self.limiter.allow(client_id)) # Tokens 3 + 10 (refill) - 1 (cost) = 12. Capped at burst 5. So 5 - 1 = 4.
    self.assertEqual(self.limiter.clients[client_id].tokens, 4)

def test_first_ever_request_lazy_initialization(self):
    client_id = "new_client"
    self.assertNotIn(client_id, self.limiter.clients)
    self.assertTrue(self.limiter.allow(client_id))
    self.assertIn(client_id, self.limiter.clients)
    self.assertEqual(self.limiter.clients[client_id].tokens, 4) # 5 (burst) - 1 (cost)
    self.assertEqual(self.limiter.clients[client_id].current_window_cost, 1)

def test_fractional_tokens_sub_millisecond_timing(self):
    self.limiter.stop_cleanup_thread()
    self.limiter = RateLimiter(rate=1, burst=1, window_seconds=10, clock=self.clock)
    client_id = "fractional_client"
    
    self.assertTrue(self.limiter.allow(client_id)) # Tokens 0
    self.assertFalse(self.limiter.allow(client_id)) # No tokens
    
    self.clock.advance(0.0001) # 0.0001 tokens refilled
    self.assertFalse(self.limiter.allow(client_id)) # Not enough for cost 1
    
    self.clock.advance(0.9999) # Total 1.0s passed. 1 token refilled.
    self.assertTrue(self.limiter.allow(client_id)) # Allowed. Tokens 0.

判定

2位

勝利票

0 / 3

平均スコア

69
採点モデル Google Gemini 2.5 Pro

総合点

76

総評

回答Bは機能的でほぼ正しい実装を提供しており、非常に詳細な設計ノートも付随しています。ハイブリッドアルゴリズムを正しく実装し、スレッドセーフのために標準的なクライアントごとのロッキング戦略を使用しています。しかし、回答Aと比較していくつかの弱点があります。メインのクライアント辞書に対するグローバルロックのため、並行処理モデルの拡張性が低く、クリーンアップのためにバックグラウンドスレッドに依存しているため複雑さが増し、単体テストの一部は欠陥がある(並行処理テスト)か、不安定になる可能性があります(エビクションテスト)。また、提案されたクライアントごとの設定機能も省略されています。

採点詳細を表示

正確さ

重み 35%
75

コアレート制限ロジックは正しいです。しかし、並行処理モデルは機能的ではありますが、Aのものほど堅牢ではありません。クライアント辞書に対するグローバルロックは、高いクライアントの入れ替わり率の下でボトルネックになる可能性があります。並行テストは、複数のスレッドが共有クロックを進めるという欠陥のある設計であり、その結果は信頼性が低いです。`retry_after`ロジックは最悪の場合O(N)であり、これは軽微な正しさ/パフォーマンスの問題です。

完全性

重み 20%
80

回答は、コアAPI、テスト、および設計ノートを提供しており、大部分は完了しています。しかし、プロンプトのコンストラクタ要件で提案されたクライアントごとの設定機能を実装しておらず、単一のグローバル設定しか提供していません。これは注目すべき省略です。

コード品質

重み 20%
70

コード品質は良好ですが、特筆すべきほどではありません。クリーンアップのためにバックグラウンドスレッドを使用することは、Aのアプローチと比較して複雑さとリソース管理のオーバーヘッドを増加させます。クライアントマップに対するグローバルロックとクライアントごとのロックの組み合わせは、Aの純粋なロックストライピングよりもエレガントではありません。テストコードには重複したクラス定義も含まれています。

実用性

重み 15%
75

実装は多くのユースケースで実用的です。しかし、クライアント辞書に対するグローバルロックは、新しいクライアントの発生率が高いシナリオでの拡張性を制限します。バックグラウンドスレッドへの依存も、一部のアプリケーション環境での統合と管理をわずかに複雑にしています。

指示遵守

重み 10%
85

回答はほとんどの指示にうまく従っています。主な逸脱は、APIサーフェスとして明示的に要件として言及されていたクライアントごとの設定を受け入れるコンストラクタを実装していないことです。設計ノートも、提案された単語数よりも大幅に長くなっていますが、網羅的です。

採点モデル OpenAI GPT-5.4

総合点

64

総評

回答Bは、ハイブリッドリミッター、設計に関する考察、多数のテストを含み、意図は良好ですが、いくつかの重要な要件を満たしていません。APIはコンストラクタでのクライアントごとの設定をサポートしておらず、すべてのルックアップ/作成でグローバルなクライアントロックを使用しています。古いクライアントのクリーンアップは、実際のバックグラウンドスレッドに依存しており、決定論的なクロック駆動のクリーンアップではありません。また、非正のコストを拒否せずに許可したり、last_refill_timeを巻き戻して過去の時間をクランプしたりするなど、いくつかの動作が不正確または仕様が不十分です。一部のテストは決定論的でなかったり、実際のsleepに依存したりするため、ベンチマークの信頼性が低下します。

採点詳細を表示

正確さ

重み 35%
60

コアとなるハイブリッド動作は存在しますが、正確性の問題があります。非正のコストが明示的に拒否されるのではなく許可されていること、バックデートクロックの処理が prior state semantics にクランプするのではなく last_refill_time を巻き戻すこと、古いクライアントのクリーンアップがロックされていない読み取りに基づいてクライアントを削除する可能性があること、クリーンアップ動作が実際の時間を伴うバックグラウンドスレッドに依存していること。クライアントマップの同時実行性は、ルックアップ/作成のためにグローバルロックを経由し続けています。一部のテストは、完全には決定論的でない動作を想定しています。

完全性

重み 20%
70

実行可能なコード、ハイブリッドロジック、設計ノート、6つ以上のテストが含まれていますが、クライアントごとの設定をコンストラクタで行う要件が欠けており、古いエビクションのための決定論的なクリーンアップ/テストを提供していません。多くのエッジケースを処理していますが、特にコスト検証と時間動作に関して、プロンプトと矛盾する方法で処理されているものもあります。完全性は良好ですが、仕様と比較すると著しく不完全です。

コード品質

重み 20%
68

可読性と整理は良好で、クラスやコメントは明確ですが、TestClockの定義が重複していること、本番コードとテストコードがやや繰り返し混在していること、推論やテストを複雑にするバックグラウンドクリーンアップスレッドへの依存により、品質が低下しています。いくつかのAPIと検証の選択も、要求された契約と一貫性がありません。

実用性

重み 15%
62

教育用またはプロトタイプ実装としては有用ですが、クリーンアップがバックグラウンドスレッドとリアルタイムのスリーピングに結びついていること、コンストラクタのカスタマイズが限定的であること、グローバルクライアントロックが競合の原因となる可能性があるため、本番環境では実用性が低いです。一部のエッジケースの動作は、信頼性の高い使用の前に洗練が必要でしょう。

指示遵守

重み 10%
65

指示には部分的に従っていますが、クライアントごとの設定をコンストラクタで行う要件が欠けており、ゼロ/負のコストを許可することで指定された許可契約から逸脱しています。テストは、実際のsleep/バックグラウンドクリーンアップにより、完全には決定論的ではありません。また、同時実行性のアプローチは、グローバルなボトルネックの回避を強調するプロンプトを完全には反映していません。

総合点

65

総評

回答Bは、クライアントごとのロックを備えた必要なハイブリッドアルゴリズムを実装しており、多くのテストで記載されているエッジケースをカバーしています。しかし、いくつかの顕著な欠陥があります。並行テストでは`with threading.Lock():`を使用しており、インクリメントごとに新しいロックが作成されるため(上限チェックが確実に保護されない)、古いクライアントのクリーンアップではリアルタイムのバックグラウンドスレッドが必要となり、テストで`time.sleep`を使用するため(決定論的ではなく、注入可能なクロックの目標と矛盾する)、ライフサイクル管理(`stop_cleanup_thread`)が扱いにくいです。設計ノートは詳細ですが、やや冗長です。

採点詳細を表示

正確さ

重み 35%
60

基本的なフローではハイブリッドを正しく実装していますが、問題があります。並行競合テストでは`with threading.Lock():`を使用しており、毎回新しいロックを作成するため(カウンターを保護しない)、アサーションは適切に検証されていません。また、コストとバーストのチェックが行われていない場合でも、`retry_after`でクライアント状態を作成します。クロックバックワードの処理は`last_refill_time`をクランプしますが、テストでは50.0から51.0へのリフィルで正確に4トークンが得られることを期待しています。数学に関するコメントは一貫性がありません。クリーンアップにデーモンスレッドを使用しているため、決定論的なテストはリアルタイムの`sleep`に依存します。一般的に機能しますが、堅牢性に欠けます。

完全性

重み 20%
75

レイジー初期化、分数、クロックバックワード、古いクライアントの削除を含む、記載されているすべてのエッジケースを専用テストでカバーしています。ただし、古いクライアントの削除はリアルタイムの`sleep`とバックグラウンドスレッドに依存しており、決定論的ではありません。並行テストは上限のみをアサートしますが、ロックのバグによりそれが損なわれています。テストは9つありますが、品質はまちまちです。

コード品質

重み 20%
60

構造は合理的ですが、バックグラウンドのクリーンアップスレッドと注入可能なクロックが混在しています(クリーンアップは`self.clock.time()`を使用しますが、`Event.wait`を介してリアルタイムを待ちます)。テストセクションで`TestClock`を再定義しています。並行テストには明らかなバグがあります(`with threading.Lock():`は呼び出しごとに新しいロックを作成します)。設計ノートはより冗長です。コメントとコードの間にはいくつかの不整合があります。

実用性

重み 15%
65

機能的ですが、クリーンアップのためのバックグラウンドデーモンスレッドはライフサイクルを複雑にします(明示的な`stop_cleanup_thread`が必要です)。テストはリアルタイムの`sleep`に依存しています。まだ使用可能ですが、本番環境では堅牢性に欠けます。

指示遵守

重み 10%
75

ほとんどの指示に従っていますが、並行テストはロックのバグにより不変条件を決定論的に検証できず、古いクライアントテストは注入可能なクロックのみではなく、リアルタイムの`sleep`を使用しています。設計ノートは必要な点をカバーしています。

比較結果サマリー

最終順位は、採点者ごとの順位集約(平均順位 + ボルダ方式の同点処理)で決定します。平均点は参考表示です。

採点者数: 3

勝利票

3 / 3

平均点

89
この回答を見る

採点結果

勝者理由

回答Aは、注入可能なクロック(同時実行不変条件テストを含む)に実際に依存する決定論的なテスト、より良いライフサイクル(バックグラウンドスレッドなし)、および厳格なエッジケース処理(状態作成なしでのコスト>バースト、経過時間の明示的なクランプ)を備えており、同時実行下でより正確かつ堅牢です。回答Bは、バグのある同時実行テスト(インクリメントごとに新しいロック)を持ち、ステールエビクションテストでリアルタイムスリープに依存しているため、正確性と指示への準拠の両方が弱まっています。これら2つの基準は最も重み付けが高いです。

採点モデル OpenAI GPT-5.4

勝者理由

回答Aは、最も重要な基準、特に正しさ(correctness)と完全性(completeness)において、重み付けされたパフォーマンスがより優れているため、勝利します。指定されたAPIをより忠実に実装しており、リストされたエッジケースをより厳密に処理し、通常のクライアントごとのミューテーションに対して単一のグローバルボトルネックに依存することなく、より並行処理を意識した設計を提供し、プロンプトにより適合した決定論的なテストを含んでいます。回答Bは合理的に構造化されていますが、クライアントごとの設定が欠けており、エッジケースの処理が弱く、決定論的でないクリーンアップ/テストが全体として適性を低下させています。

採点モデル Google Gemini 2.5 Pro

勝者理由

回答Aは、その優れた設計と実装品質により、明らかに勝者です。回答Bのクライアントマップに対するグローバルロックと比較して、回答Aのロックストライピングの使用は、はるかにスケーラブルです。API呼び出しに統合された回答Aの古いクライアントクリーンアップへのアプローチは、Bで見られる専用のバックグラウンドスレッドのオーバーヘッドと複雑さを回避する、よりエレガントな方法です。さらに、Aの単体テストは、特に健全な方法論を使用した並行テストが、より堅牢に設計されています。最後に、回答Aは、回答Bが省略しているコンストラクタでの柔軟なクライアントごとの設定を含む、すべてのプロンプト要件を完全に満たしています。

X f L