import time
import threading
import collections
import heapq
import uuid
from dataclasses import dataclass, field
from typing import Dict, List, Tuple, Any, Optional
# --- 設定 ---
# 各ティアのレート制限とウィンドウサイズを定義します。
# window_seconds が小さいほど、レート制限は厳しくなります。
TIER_CONFIG = {
"free": {"max_requests": 5, "window_seconds": 60},
"standard": {"max_requests": 50, "window_seconds": 60},
"premium": {"max_requests": 200, "window_seconds": 60},
}
# アクティビティがない場合にクライアントデータを削除するまでの保持期間。
CLEANUP_THRESHOLD_SECONDS = 300
# --- データ...
全文を表示 ▼
import time
import threading
import collections
import heapq
import uuid
from dataclasses import dataclass, field
from typing import Dict, List, Tuple, Any, Optional
# --- 設定 ---
# 各ティアのレート制限とウィンドウサイズを定義します。
# window_seconds が小さいほど、レート制限は厳しくなります。
TIER_CONFIG = {
"free": {"max_requests": 5, "window_seconds": 60},
"standard": {"max_requests": 50, "window_seconds": 60},
"premium": {"max_requests": 200, "window_seconds": 60},
}
# アクティビティがない場合にクライアントデータを削除するまでの保持期間。
CLEANUP_THRESHOLD_SECONDS = 300
# --- データ構造 ---
@dataclass(order=True)
class Request:
"""優先度付きリクエストを表します。優先度キュー用です。"""
# heapq が優先度でソートできるように、優先度を最初の要素として使用します。
# 数値が小さいほど優先度が高くなります。
priority: int
# 優先度のタイブレークと安定ソートを保証するために、一意のIDを使用します。
request_id: str = field(default_factory=lambda: str(uuid.uuid4()))
client_id: str
timestamp: float
payload: Any
def __lt__(self, other):
"""heapq のカスタム比較: 優先度でソートし、次に request_id で安定ソートします。"""
if self.priority != other.priority:
return self.priority < other.priority
return self.request_id < other.request_id
class SlidingWindowCounter:
"""スライディングウィンドウカウンターをレート制限のために実装します。"""
def __init__(self, max_requests: int, window_seconds: float):
self.max_requests = max_requests
self.window_seconds = window_seconds
# ウィンドウ内のリクエストのタイムスタンプを格納するために deque を使用します。
# deque は両端からの追加と削除が効率的です。
self.request_timestamps: collections.deque[float] = collections.deque()
# request_timestamps へのスレッドセーフなアクセス用のロック。
self.lock = threading.Lock()
def record_request(self) -> bool:
"""リクエストを記録し、制限内であれば True を、そうでなければ False を返します。"""
with self.lock:
current_time = time.time()
# ウィンドウより古いタイムスタンプを削除します。
while self.request_timestamps and self.request_timestamps[0] <= current_time - self.window_seconds:
self.request_timestamps.popleft()
# 新しいリクエストを追加すると制限を超えるかどうかを確認します。
if len(self.request_timestamps) < self.max_requests:
self.request_timestamps.append(current_time)
return True
else:
return False
def get_current_count(self) -> int:
"""ウィンドウ内の現在リクエスト数を返します。"""
with self.lock:
current_time = time.time()
# カウントを返す前に古いタイムスタンプをクリーンアップします。
while self.request_timestamps and self.request_timestamps[0] <= current_time - self.window_seconds:
self.request_timestamps.popleft()
return len(self.request_timestamps)
class RateLimiter:
"""複数のティア、スライディングウィンドウ、優先度付きキューを備えたスレッドセーフなレートリミッターです。"""
def __init__(self, tier_config: Dict[str, Dict[str, Any]], cleanup_threshold_seconds: float):
self.tier_config = tier_config
self.cleanup_threshold_seconds = cleanup_threshold_seconds
# 各ティアのリクエストキュー(遅延リクエスト用)を格納します。
self.tier_queues: Dict[str, List[Request]] = collections.defaultdict(list)
# クリーンアップ用の各クライアントの最終アクティビティタイムスタンプを格納します。
self.client_last_activity: Dict[str, float] = {}
# 各クライアントに割り当てられたティアを格納します。
self.client_tiers: Dict[str, str] = {}
# スライディングウィンドウカウンターを格納します。
self.tier_counters: Dict[str, SlidingWindowCounter] = {}
# クライアントティア、クライアント最終アクティビティ、ティアキューなどの共有データ構造を保護するためのメインロック。
# SlidingWindowCounter は独自の内部ロックを持っています。
self.global_lock = threading.Lock()
# ティアカウンターを初期化します。
for tier_name, config in tier_config.items():
self.tier_counters[tier_name] = SlidingWindowCounter(
max_requests=config["max_requests"],
window_seconds=config["window_seconds"]
)
# クリーンアップ用のバックグラウンドスレッドを開始します。
self.cleanup_thread = threading.Thread(target=self._cleanup_task, daemon=True)
self.cleanup_thread.start()
def register_client(self, client_id: str, tier: str) -> bool:
"""クライアントを指定されたティアで登録します。成功した場合は True を、クライアントが既に存在するかティアが無効な場合は False を返します。"""
if tier not in self.tier_config:
print(f"[!] エラー: ティア '{tier}' は存在しません。")
return False
with self.global_lock:
if client_id in self.client_tiers:
print(f"[!] 警告: クライアント '{client_id}' は既に登録されています。")
return False
self.client_tiers[client_id] = tier
self.client_last_activity[client_id] = time.time()
print(f"[*] クライアント '{client_id}' がティア '{tier}' で登録されました。")
return True
def _get_client_tier(self, client_id: str) -> Optional[str]:
"""指定された client_id のティアを安全に取得します。"""
with self.global_lock:
return self.client_tiers.get(client_id)
def _update_client_activity(self, client_id: str):
"""クライアントの最終アクティビティタイムスタンプを更新します。"""
with self.global_lock:
self.client_last_activity[client_id] = time.time()
def allow_request(self, client_id: str) -> bool:
"""クライアントのティアのレート制限に基づいて、client_id からのリクエストが許可されるかどうかを確認します。"""
tier = self._get_client_tier(client_id)
if not tier:
print(f"[!] エラー: クライアント '{client_id}' は登録されていません。")
return False
counter = self.tier_counters.get(tier)
if not counter:
# tier_config が有効で初期化が正しければ、これは発生しないはずです。
print(f"[!] 内部エラー: ティア '{tier}' のカウンターが見つかりません。")
return False
if counter.record_request():
self._update_client_activity(client_id)
# print(f"[*] リクエスト許可: クライアント '{client_id}' (ティア: {tier})。現在のカウント: {counter.get_current_count()}")
return True
else:
# print(f"[*] リクエスト拒否: クライアント '{client_id}' (ティア: {tier})。現在のカウント: {counter.get_current_count()}")
return False
def enqueue_request(self, client_id: str, priority: int, payload: Any):
"""レート制限されたリクエストを適切な優先度付きキューにエンキューします。"""
tier = self._get_client_tier(client_id)
if not tier:
print(f"[!] エラー: クライアント '{client_id}' は登録されていません。エンキューできません。")
return
request = Request(priority=priority, client_id=client_id, timestamp=time.time(), payload=payload)
with self.global_lock:
heapq.heappush(self.tier_queues[tier], request)
print(f"[*] リクエストエンキュー: クライアント '{client_id}' (ティア: {tier}, 優先度: {priority})。キューサイズ: {len(self.tier_queues[tier])}")
self._update_client_activity(client_id)
def dequeue_and_process(self, client_id: str) -> Optional[Tuple[Request, bool]]:
"""クライアントのティアで利用可能な容量がある場合、最優先度リクエストをデキューして処理しようとします。"""
tier = self._get_client_tier(client_id)
if not tier:
print(f"[!] エラー: クライアント '{client_id}' は登録されていません。デキューできません。")
return None
counter = self.tier_counters.get(tier)
if not counter:
print(f"[!] 内部エラー: ティア '{tier}' のカウンターが見つかりません。")
return None
# デキューを試みる前に、容量が利用可能かどうかを確認します。
# 容量がない場合に不要なロックとヒープ操作を防ぎます。
if counter.get_current_count() < counter.max_requests:
with self.global_lock:
queue = self.tier_queues[tier]
if queue:
# ロック内で容量を再確認します。競合状態を処理するためです。
# 他のスレッドがちょうど容量を使い切った可能性があります。
if counter.get_current_count() < counter.max_requests:
highest_priority_request = heapq.heappop(queue)
# リクエストを再度記録しようとします。容量があれば成功するはずです。
if counter.record_request():
self._update_client_activity(highest_priority_request.client_id)
print(f"[*] リクエストデキューと処理完了: クライアント '{highest_priority_request.client_id}' (ティア: {tier}, 優先度: {highest_priority_request.priority})。キューサイズ: {len(queue)}")
return highest_priority_request, True
else:
# これはまれな競合状態です: 容量は利用可能でしたが、record_request が失敗しました。
# リクエストを戻し、エラーを記録します。
heapq.heappush(queue, highest_priority_request)
print(f"[!] 競合状態: {highest_priority_request.client_id} のデキューされたリクエストの記録に失敗しました。戻しました。")
return highest_priority_request, False
else:
# 外側のチェックと内側のチェックの間に、別のスレッドが容量を占有しました。
# ポップされた場合、リクエストをキューに戻します。
heapq.heappush(queue, highest_priority_request)
# print(f"[*] {client_id} のデキュー試行失敗: 容量が他のスレッドによって満たされました。")
return highest_priority_request, False
else:
# キューは空です。
# print(f"[*] {client_id} のデキュー試行: キューは空です。")
return None, False
else:
# 容量が利用できません。
# print(f"[*] {client_id} のデキュー試行: 容量が利用できません。")
return None, False
def _cleanup_task(self):
"""非アクティブなクライアントデータを削除するバックグラウンドタスクです。"""
while True:
time.sleep(self.cleanup_threshold_seconds / 2) # 定期的にチェック
current_time = time.time()
clients_to_remove = []
with self.global_lock:
for client_id, last_activity in self.client_last_activity.items():
if current_time - last_activity > self.cleanup_threshold_seconds:
clients_to_remove.append(client_id)
for client_id in clients_to_remove:
tier = self.client_tiers.pop(client_id, None)
if tier:
self.client_last_activity.pop(client_id, None)
# 必要に応じて、このクライアントのキューをクリアすることもできますが、通常は行われません。
# 簡単のため、エンキューされたリクエストはそのままにしておきます。
print(f"[*] 非アクティブなクライアント '{client_id}' をクリーンアップしました。")
# --- デモンストレーションスクリプト ---
def simulate_requests(rate_limiter: RateLimiter, client_id: str, num_requests: int, delay: float, priority_offset: int = 0):
"""単一クライアントからのリクエストのバーストをシミュレートします。"""
print(f"--- クライアント '{client_id}' から {num_requests} 件のリクエストをシミュレート中 ---")
for i in range(num_requests):
# エンキューされたリクエストの優先度を変化させます
priority = i % 5 + priority_offset # 数値が小さいほど優先度が高い
payload = f"data_{client_id}_{i}"
if rate_limiter.allow_request(client_id):
print(f"[スレッド {threading.current_thread().name}] リクエスト {i+1}/{num_requests} 許可: '{client_id}'。ペイロード: {payload}")
else:
print(f"[スレッド {threading.current_thread().name}] リクエスト {i+1}/{num_requests} 拒否: '{client_id}'。優先度 {priority} でエンキューします。")
rate_limiter.enqueue_request(client_id, priority, payload)
time.sleep(delay)
def process_deferred_requests(rate_limiter: RateLimiter, client_id: str, num_attempts: int):
"""クライアントの遅延リクエストを処理しようと定期的に試みます。"""
print(f"--- クライアント '{client_id}' の遅延リクエストの処理を試行中 ---")
for i in range(num_attempts):
time.sleep(1.5) # 容量が解放されるのを少し待ちます
print(f"[プロセッサースレッド] '{client_id}' のデキュー試行 {i+1}/{num_attempts}...")
result = rate_limiter.dequeue_and_process(client_id)
if result:
request, success = result
if success:
print(f"[プロセッサースレッド] 正常に処理されました: {request.payload} (優先度: {request.priority})")
else:
print(f"[プロセッサースレッド] 処理に失敗しました (容量の問題?): {request.payload} (優先度: {request.priority})。再エンキューしました。")
else:
print(f"[プロセッサースレッド] '{client_id}' の遅延リクエストはこの試行では処理されませんでした (キュー空または容量なし)。")
if __name__ == "__main__":
print("--- レートリミッターデモンストレーション ---")
# レートリミッターを初期化します
limiter = RateLimiter(tier_config=TIER_CONFIG, cleanup_threshold_seconds=CLEANUP_THRESHOLD_SECONDS)
# クライアントを登録します
clients = {
"client_A": "free",
"client_B": "standard",
"client_C": "premium",
"client_D": "free",
"client_E": "standard",
}
for cid, tier in clients.items():
limiter.register_client(cid, tier)
print("\n--- 同時リクエストのシミュレーション ---")
# 複数のスレッドからリクエストのバーストをシミュレートします
threads = []
# クライアント A (無料ティア) - レート制限に達する可能性が高い
t1 = threading.Thread(target=simulate_requests, args=(limiter, "client_A", 15, 0.1, 0), name="Thread-A")
threads.append(t1)
# クライアント B (標準ティア) - より多くの容量
t2 = threading.Thread(target=simulate_requests, args=(limiter, "client_B", 60, 0.05, 1), name="Thread-B")
threads.append(t2)
# クライアント C (プレミアムティア) - 最も高い容量
t3 = threading.Thread(target=simulate_requests, args=(limiter, "client_C", 250, 0.01, 2), name="Thread-C")
threads.append(t3)
# クライアント D (無料ティア) - 同じ制限に達する別のクライアント
t4 = threading.Thread(target=simulate_requests, args=(limiter, "client_D", 10, 0.15, 3), name="Thread-D")
threads.append(t4)
# シミュレーションスレッドを開始します
for t in threads:
t.start()
# 初期リクエストとエンキューに時間を置きます
time.sleep(2)
print("\n--- 遅延リクエストの処理を試行中 ---")
# レート制限された可能性のあるクライアントの遅延リクエストを処理するスレッドを開始します
# クライアント A と D (無料ティア) について複数回試行します。
processor_threads = []
tp1 = threading.Thread(target=process_deferred_requests, args=(limiter, "client_A", 5), name="Processor-A")
processor_threads.append(tp1)
tp2 = threading.Thread(target=process_deferred_requests, args=(limiter, "client_D", 5), name="Processor-D")
processor_threads.append(tp2)
# クライアント B についても試行します。
tp3 = threading.Thread(target=process_deferred_requests, args=(limiter, "client_B", 3), name="Processor-B")
processor_threads.append(tp3)
for tp in processor_threads:
tp.start()
# すべてのシミュレーションスレッドが完了するのを待ちます
for t in threads:
t.join()
# プロセッサースレッドが試行を完了するのを待ちます
for tp in processor_threads:
tp.join()
print("\n--- デモンストレーション完了 ---")
print("最終状態チェック (カウントはタイミングにより概算です):")
for tier_name, counter in limiter.tier_counters.items():
print(f"ティア '{tier_name}': 現在のウィンドウ内のリクエスト数: {counter.get_current_count()} (最大: {counter.max_requests})")
for tier_name, queue in limiter.tier_queues.items():
print(f"ティア '{tier_name}': キュー内の遅延リクエスト数: {len(queue)}")
# --- 設計上の選択に関する説明 ---
# 1. スライディングウィンドウの実装:
# - ウィンドウ内のリクエストのタイムスタンプを格納するために `collections.deque` を使用します。
# - `deque` は、新しいタイムスタンプの追加と古いタイムスタンプの左からの削除に O(1) の計算量を提供します。
# - `record_request` が呼び出されると、まず `current_time - window_seconds` より古いすべてのタイムスタンプを削除します。
# - これにより、`len(self.request_timestamps)` が現在のスライディングウィンドウ内のリクエスト数を正確に反映することが保証されます。
# - トレードオフ: このアプローチは正確ですが、多数のリクエストが短時間で到着し、その後停止した場合、`record_request` で複数のポップが発生する可能性があります。ただし、時間の経過とともに deque のサイズは管理されます。
# - スレッドセーフティ: 各 `SlidingWindowCounter` インスタンスは、`request_timestamps` deque を保護するための独自の `threading.Lock` を持ち、同じティアに対する `record_request` または `get_current_count` の同時呼び出しがシリアル化されることを保証します。
# 2. 複数のティア:
# - `RateLimiter` クラスは、ティア名をレート制限パラメータ (`max_requests`, `window_seconds`) にマッピングする辞書 `tier_config` を保持します。
# - また、各ティアに対して個別の `SlidingWindowCounter` インスタンスと優先度付きキュー (`tier_queues`) を保持します。
# - クライアントティアの割り当ては `client_tiers` に格納されます。
# - これにより、異なるクライアントタイプ間で柔軟な設定とレート制限の分離が可能になります。
# 3. 遅延リクエスト用の優先度付きキュー:
# - 各ティアについて、標準の Python リストが `heapq` モジュールによって管理される最小ヒープとして使用されます。
# - `Request` dataclass は、`priority` をプライマリソートキーとして設計されています (値が小さいほど優先度が高い)。
# - 一意の `request_id` が含まれており、安定ソート (同じ優先度の場合は FIFO) を保証し、タイムスタンプが同一の場合の比較エラーを防ぎます。
# - `enqueue_request` は `heapq.heappush` を使用してリクエストを追加し、ヒーププロパティを維持します。
# - `dequeue_and_process` は `heapq.heappop` を使用して最優先度リクエストを取得します。
# - 重要なのは、`dequeue_and_process` はヒープからポップしようとする前に、まず利用可能な容量を確認することです。容量が利用可能な場合、グローバルロックを取得し、容量を再確認し (競合状態を処理するため)、リクエストをポップし、`record_request` を再度呼び出します。`record_request` が成功すると、リクエストが処理されます。失敗した場合 (ちょうど容量が埋められた競合状態のため)、リクエストはヒープに戻されます。
# - トレードオフ: Python のリスト + heapq を使用することは、適度な数の遅延リクエストに対して効率的です。非常に高いボリュームの場合、より専門的な並列優先度キュー実装が検討されるかもしれませんが、それは複雑さを増します。
# 4. スレッドセーフティ:
# - `SlidingWindowCounter`: 内部の `threading.Lock` を使用して `request_timestamps` deque を保護します。
# - `RateLimiter`:
# - `global_lock` (`threading.Lock`): 共有クライアント状態 (`client_tiers`, `client_last_activity`) とティア優先度キュー (`tier_queues`) を保護します。このロックは、`register_client`、`enqueue_request`、および `dequeue_and_process` 内でキューにアクセス/変更する操作のために取得されます。
# - `allow_request`: スレッドセーフな `counter.record_request()` を呼び出します。また、グローバルロックの下でクライアントアクティビティを更新します。
# - `dequeue_and_process`: ロックを慎重に管理します。まずグローバルロックなしで容量を確認します。容量が利用可能である可能性がある場合、グローバルロックを取得し、容量を再確認し、ヒープ操作を実行し、次にスレッドセーフな `counter.record_request()` を再度呼び出します。
# - トレードオフ: 複数のロック (カウンターごとに 1 つ + グローバルロック 1 つ) の使用は、すべての操作に対して単一のグローバルロックを使用する場合と比較して、競合を減らすことを目的としています。ただし、クライアント状態とティア状態にまたがる操作 (デキューなど) では、慎重なロック順序付けまたはグローバルロックの取得が必要です。
# 5. クリーンアップ:
# - バックグラウンドスレッド (`_cleanup_task`) が `client_last_activity` を定期的にスキャンします。
# - 最終アクティビティタイムスタンプが `cleanup_threshold_seconds` より古いクライアントを特定します。
# - これらのクライアントは、`global_lock` の下で `client_tiers` と `client_last_activity` から削除されます。
# - これにより、`RateLimiter` が非アクティブなクライアントの状態を無限に蓄積するのを防ぎます。
# - トレードオフ: クリーンアップは即時ではありません。定期的に実行されます。`cleanup_threshold_seconds` は、メモリ使用量とクリーンアップの応答性の間のトレードオフを決定します。
# 6. エッジケースの処理:
# - クライアントの重複登録: `register_client` は False を返し、警告をログに記録します。
# - 未登録クライアント: `allow_request` および `enqueue_request` は `client_tiers` をチェックし、エラーを返します/ログに記録します。
# - 空の優先度付きキュー: `dequeue_and_process` は、`self.tier_queues[tier]` が空の場合のケースを処理します。
# - 同時変更: 適切な `threading.Lock` の使用によって対処されます。
# - 時計の精度: `time.time()` が使用されますが、これは一般的に十分です。非常に高い精度が要求される場合は、プラットフォーム固有の単調クロックが検討されるかもしれませんが、`time.time()` はこの種のアプリケーションでは標準です。
# - デキュー時の競合状態: 容量の再確認と、`dequeue_and_process` のクリティカルセクション内でのリクエストの再記録によって処理されます。
判定
勝利票
1 / 3
平均スコア
総合点
総評
回答Aは、5つの機能すべて、優れたドキュメント、デモ用スクリプトを備えた完全な実装を提供しています。しかし、重大な設計上の欠陥があります。スライディングウィンドウカウンターがクライアントごとではなくティアごとに共有されているため、同じティアのすべてのクライアントが単一のリクエストカウントを共有します。これにより、クライアントA(無料ティア)が5つのリクエストを使用した場合、クライアントD(同じく無料ティア)は独自のクォータを持つはずなのにブロックされてしまいます。これは根本的な正しさの問題です。優先度付きキューもクライアントごとではなくティアごとに存在するため、特定のクライアントのdequeue_and_processが別のクライアントのリクエストを処理する可能性があります。dequeue_and_processメソッドには、else句の`if queue:`ブロックの外で`highest_priority_request`を参照するバグがあり、UnboundLocalErrorが発生します。また、`time.monotonic()`の代わりに`time.time()`を使用しているため、システムクロックの変更に対して脆弱です。デモ用スクリプトは、合理的なデモ時間内にスライディングウィンドウが実際に機能していることを確認するのが難しい、大きなティア設定(60秒ウィンドウ)を使用しています。コメントは豊富ですが、やや冗長です。
採点詳細を表示 ▼
正確さ
重み 35%回答Aには重大な正しさの欠陥があります。スライディングウィンドウカウンターがクライアントごとではなくティアごとであることです。ティアを共有するすべてのクライアントが1つのカウンターを共有するため、クライアントAのリクエストがクライアントDのクォータを消費します。dequeue_and_processメソッドには、潜在的なUnboundLocalErrorがあります(else節で定義されていない可能性のあるhighest_priority_requestを参照しています)。time.monotonic()の代わりにtime.time()を使用しており、クロックジャンプに対して脆弱です。スライディングウィンドウ自体(タイムスタンプのdeque)は単独では正しく実装されていますが、誤った粒度で適用されています。
完全性
重み 20%回答Aは、スライディングウィンドウ、複数のティア、優先度付きキュー、スレッドセーフ、クリーンアップ(バックグラウンドスレッド経由)の5つの機能をすべて実装しています。しかし、クライアントごとではなくティアごとのレート制限により、スライディングウィンドウ機能が意図したとおりに機能しません。デモ用スクリプトが存在し、複数のスレッドを使用しています。クリーンアップはバックグラウンドデーモンスレッドとして実行されます。必要なすべてのコンポーネント(Requestデータクラス、RateLimiterクラス、デモ)が存在します。
コード品質
重み 20%回答Aは、個別のSlidingWindowCounterクラスを備えた合理的な構造を持っていますが、カウンターをティアごとに共有するというアーキテクチャ上の決定は設計上の欠陥です。Requestデータクラスは、order=Trueと__lt__の両方を定義しており、冗長です。global_lockは広範囲に使用されており、並行性の利点を低下させています。コメントは豊富ですが冗長です。コードには、コメントアウトされたprint文がいくつか残っています。dequeue_and_processメソッドは、ネストされた条件と潜在的なバグを備えた複雑なものです。
実用性
重み 15%回答Aのティアごとのレート制限は、各クライアントが独立したレート制限を持つべき実際のユースケースには実用的ではありません。デモでは60秒のウィンドウを使用しているため、合理的な時間でスライディングウィンドウの動作を観察するのが困難です。バックグラウンドクリーンアップスレッドは良い点ですが、コア機能が不正確であるため、実用的な価値が大幅に制限されます。
指示遵守
重み 10%回答Aはほとんどの指示に従っています。RateLimiterクラス、Requestデータクラスを実装し、エッジケース(重複登録、未登録クライアント)を処理し、複数のスレッドを使用したデモを提供します。しかし、クライアントごとではなくティアごとのレート制限は、「クライアントは登録時にティアが割り当てられる」という要件を、解釈を誤っている可能性があります。ティアは制限を定義しますが、各クライアントは個別に追跡されるべきです。デモでは、リクエストが許可され、エンキューされる様子が示されています。設計上の選択はコメントで説明されています。
総合点
総評
dequeを使用したワーキングスライディングウィンドウリミッターを実装し、heapqベースのティアごとの遅延キュー、マルチスレッドデモ、および詳細な設計コメントを備えています。しかし、クライアントごとのレート制限のコア設計は誤っています。スライディングウィンドウカウンターはティアごと(同じティアのすべてのクライアントで共有)であるため、あるクライアントのトラフィックが他のクライアントに影響を与え、これは典型的な「クライアントはティアに割り当てられる」というセマンティクスや、プロンプトのクライアントごとの処理の期待に違反します。また、競合状態/ロジックの問題もあります。enqueue_requestはglobal_lockを保持したまま_update_client_activityを呼び出しますが、_update_client_activityはglobal_lockを再取得します(デッドロック)。dequeue_and_processには、内部チェック後に容量が利用できなくなった場合のhighest_priority_requestを参照するバグパスがあります(変数が未定義の可能性があります)。クリーンアップはクライアント登録を削除しますが、キューに入れられたリクエストは(クライアントティアのルックアップが失敗するため)デキューできなくなり、残ります。
採点詳細を表示 ▼
正確さ
重み 35%スライディングウィンドウロジック自体は問題ありませんが、クライアントごとではなくティアごとに実装されているため、同じティアのクライアントが互いにスロットリングします。enqueue_requestはデッドロックします(global_lockが保持されたまま_update_client_activityがglobal_lockを再取得します)。dequeue_and_processには、あるブランチに未定義変数バグがあり、クリーンアップはクライアント登録を削除することでキューに入れられたリクエストを孤立させる可能性があります。
完全性
重み 20%RateLimiter、Request dataclass、マルチティア設定、ティアごとの優先度付きキュー、バックグラウンドクリーンアップスレッド、マルチスレッドデモが含まれています。しかし、主要な操作は(デッドロックにより)不完全であり、クリーンアップは期限切れの追跡だけでなくクライアントを削除するため、遅延アイテムに一貫性がなくなります。
コード品質
重み 20%可読性が高く、コメントも豊富ですが、深刻な構造上の問題があります。ネストされたロッキングによるデッドロック、一貫性のないデキュー戻り値(None vs (None, False))、未初期化変数に参照するバグのあるブランチがあります。また、dataclass(order=True)とカスタム__lt__が不必要に混在しています。
実用性
重み 15%実際にはデッドロックによりハングする可能性があり、ティア全体の制限はクライアントベースのレート制限には不向きです。クリーンアップは、削除されたクライアントに対して遅延キューを使用不能にする可能性があります。デモは豊富ですが、確実に完了しない可能性があります。
指示遵守
重み 10%ほとんどの指示(スライディングウィンドウ、ティア、優先度付きキュー、スレッドセーフ性の主張、デモ、コメント)に従っていますが、クライアントベースの制限に関する主要な意図に違反しており、エッジケースの処理が壊れる可能性があります(デッドロック、孤立したキューに入れられたリクエスト)。
総合点
総評
回答Aは、同時実行レートリミッターの非常に強力で、正確かつ実用的な実装を提供しています。特に`SlidingWindowCounter`を独自のクラスに分離した設計上の選択や、共有ティアキューからの遅延リクエストを処理するための効率的なロジックは堅実です。スレッドセーフの実装は堅牢で、dequeueメソッドにおける競合状態の処理も慎重に行われています。自動化されたバックグラウンドクリーンアップスレッドと包括的なデモスクリプトの組み込みは、その品質をさらに向上させています。クライアントデータとキューに単一のグローバルロックを使用している点はわずかなトレードオフですが、全体的なソリューションはよく設計されており、本番環境での使用に適しています。
採点詳細を表示 ▼
正確さ
重み 35%すべての機能にわたって実装は正確です。スライディングウィンドウロジックは標準的で効果的です。優先度付きキューとdequeueロジックは特にうまく処理されており、競合状態に対する堅牢なチェックも含まれています。全体的なシステムは、同時実行下で期待どおりに動作します。
完全性
重み 20%要求された5つの機能はすべて完全に実装されています。このソリューションは、クリーンアップメカニズムを自律的なバックグラウンドスレッドとして実装することで、さらに一歩進んでおり、単純な手動メソッドよりも完全で堅牢なソリューションとなっています。
コード品質
重み 20%関心の分離(例:`SlidingWindowCounter`クラス)が優れており、コード品質は高いです。APIはクリーンで直感的です。キューに単一のグローバルロックを使用している点はわずかな簡略化ですが、正しく実装されています。コードはよくコメントされており、読みやすいです。
実用性
重み 15%このソリューションは非常に実用的です。設計はスケーラブルで効率的であり、最小限の変更で本番環境で使用できる可能性があります。自動化されたクリーンアップと堅牢なdequeueロジックにより、信頼性の高いコンポーネントとなっています。
指示遵守
重み 10%回答は、プロンプトのすべての指示に完全に準拠しています。要求されたすべての機能を実装し、`Request`データクラスを提供し、包括的なデモスクリプトを含み、設計上の選択に関する詳細な説明を提供しています。