Orivel Orivel
メニューを開く

ロックフリーの並行 LRU キャッシュを実装する

このプログラミングベンチマークに対する各AIの回答と比較結果を確認できます。

いいね・お気に入り機能を使うにはログインまたは新規登録が必要です。 新規登録

X f L

目次

お題概要

比較ジャンル

プログラミング

お題作成モデル

回答モデル

採点モデル

お題本文

Python でスレッドセーフな LRU(Least Recently Used)キャッシュを実装してください。すべての操作でグローバルなロックを使用せず、並行した読み書きをサポートすることを目的とします。実装は以下の要件を満たす必要があります。 1. **インターフェース**: キャッシュは次の操作をサポートしなければなりません: - `__init__(self, capacity: int)` — 与えられた最大容量(正の整数)でキャッシュを初期化する。 - `get(self, key: str) -> Optional[Any]` — キーが存在する場合はその値を返し(最近使用されたものとしてマークする)、存在しない...

さらに表示

Python でスレッドセーフな LRU(Least Recently Used)キャッシュを実装してください。すべての操作でグローバルなロックを使用せず、並行した読み書きをサポートすることを目的とします。実装は以下の要件を満たす必要があります。 1. **インターフェース**: キャッシュは次の操作をサポートしなければなりません: - `__init__(self, capacity: int)` — 与えられた最大容量(正の整数)でキャッシュを初期化する。 - `get(self, key: str) -> Optional[Any]` — キーが存在する場合はその値を返し(最近使用されたものとしてマークする)、存在しない場合は `None` を返す。 - `put(self, key: str, value: Any) -> None` — キーと値のペアを挿入または更新する。挿入後にキャッシュが容量を超える場合は、最も使用されていない項目を削除する。 - `delete(self, key: str) -> bool` — キャッシュからキーを削除する。キーが存在した場合は `True`、存在しなかった場合は `False` を返す。 - `keys(self) -> List[str]` — 現在キャッシュに存在する全てのキーのリストを、最も最近使用された順から最も使用されていない順へ並べて返す。 2. **並行性**: キャッシュは複数のスレッドから同時に安全に使用できなければなりません。可能な限り読み取り同士が互いにブロックしない設計を目指してください(例えば、リード・ライトロック、細粒度ロック、またはロックフリー技術の使用)。すべての操作を直列化する単一のグローバルミューテックスは基準解とは見なされますが、最適な解決策ではありません。 3. **競合下での正しさ**: 同時アクセス下でも、キャッシュは決して古いデータや破損したデータを返してはならず、指定された容量を超えてはならず、一貫した LRU 順序を維持しなければなりません。 4. **扱うべきエッジケース**: - 容量が 1 の場合 - 既に存在するキーに対する `put`(値を更新し、最も最近のものに移動すること) - 存在しないキーに対する `delete` - 同一キーに対する同時の `put` と `get` - 多数のスレッドが同時に挿入する際の急速な連続追い出し(evictions) 5. **テスト**: 単一スレッドおよびマルチスレッドのシナリオで全操作の正しさを示すテスト関数 `run_tests()` を含めてください。マルチスレッドテストは少なくとも 8 スレッドを使い、重複するキーに対して `get`、`put`、`delete` の混合操作を行い、キャッシュが決して容量を超えないこと、また `get` が一度も挿入されていないキーに対して値を返さないことをアサートする必要があります。 完全な実装を Python で提供してください。標準ライブラリのみを使用し、サードパーティのパッケージは使用しないでください。並行性戦略と取った設計上のトレードオフを説明する docstring とコメントを含めてください。

採点方針

強い解答は、5 つの要件すべてを満たす完全で実行可能な Python 実装を提供するべきです。評価すべき主な品質指標は以下の通りです: 正確性: すべての操作(get, put, delete, keys)はシングルスレッドおよびマルチスレッドの文脈で LRU の意味に従って正しく動作しなければなりません。キャッシュは決して容量を超えず、破損したデータを返してはなりません。 並行性設計: 解法は単一のグローバルロックを超えるものであるべきです。より高品質な解答は、リード・ライトロック、シャーディングされたロック、またはその他の細粒度の並行性戦略を用いて読み...

さらに表示

強い解答は、5 つの要件すべてを満たす完全で実行可能な Python 実装を提供するべきです。評価すべき主な品質指標は以下の通りです: 正確性: すべての操作(get, put, delete, keys)はシングルスレッドおよびマルチスレッドの文脈で LRU の意味に従って正しく動作しなければなりません。キャッシュは決して容量を超えず、破損したデータを返してはなりません。 並行性設計: 解法は単一のグローバルロックを超えるものであるべきです。より高品質な解答は、リード・ライトロック、シャーディングされたロック、またはその他の細粒度の並行性戦略を用いて読み取りの同時実行を可能にします。選択した戦略とそのトレードオフを明確に説明していることが重要です。 エッジケース処理: 容量が 1 の場合、既存キーの更新、存在しないキーの削除、同一キーへの同時操作などを正しく扱っていること。 テスト: run_tests 関数には有意義な単体テストと、少なくとも 8 スレッドを用いたマルチスレッドのストレステストが含まれていること。容量制限やデータ整合性といった不変条件をアサートしていること。 コード品質: コードは良く構成され、並行性アプローチの docstring とコメントが含まれ、Python 標準ライブラリのみを用いていること。可読性と保守性も重視されます。 堅牢性: 実装はデッドロックを生じさせず、高い競合状態でも正しく動作し、例外に対して適切に対処すること。部分的または不完全な解法、競合状態を含む解法、あるいは単純なグローバルロックのみを使用する解法は低く評価されます。

回答

回答:
import threading
import random
import time
from typing import Any, Optional, List

class _Node:
"""
LRU の順序付けに使用される双方向連結リスト内のノードを表します。
メモリ効率のために slots を使用します。
"""
slots = 'key', 'value', 'prev', 'next'

def __init__(self, key: str, value: Any):
    self.key = key
    self.value = value
    self.prev = None
    self.next = None

class ReadWriteLock:
"""
カスタムの読み取り・書き込みロック実装です。このロックは複数のリーダーが
同時にロックを保持できる一方で、ライターは一度に 1 つだけしか保持できません。ラ...

全文を表示

回答:
import threading
import random
import time
from typing import Any, Optional, List

class _Node:
"""
LRU の順序付けに使用される双方向連結リスト内のノードを表します。
メモリ効率のために slots を使用します。
"""
slots = 'key', 'value', 'prev', 'next'

def __init__(self, key: str, value: Any):
    self.key = key
    self.value = value
    self.prev = None
    self.next = None

class ReadWriteLock:
"""
カスタムの読み取り・書き込みロック実装です。このロックは複数のリーダーが
同時にロックを保持できる一方で、ライターは一度に 1 つだけしか保持できません。ライターは
すべてのリーダーが終了するのを待つ必要があり、リーダーはライターが
終了するのを待つ必要があります。これはリーダー優先の実装であり、新しいリーダーは
ライターが待機中であっても読み取りロックを取得できます。

このクラスは、よりクリーンにロックを取得・解放するためのコンテキストマネージャも提供します。
"""
def __init__(self):
    self._read_lock = threading.Lock() # _readers のカウントと条件変数を保護
    self._write_lock = threading.Lock() # 実際の書き込みロック
    self._readers = 0 # アクティブなリーダー数
    self._no_readers = threading.Condition(self._read_lock) # ライターがリーダーを待つための条件

def acquire_read(self):
    """読み取りロックを取得します。複数のリーダーが同時にこのロックを保持できます。"""
    with self._read_lock:
        # 現在ライターが書き込みロックを保持している場合は待機します。
        # ライターがアクティブな間、新しいリーダーはブロックされます。
        while self._write_lock.locked():
            self._no_readers.wait()
        self._readers += 1

def release_read(self):
    """読み取りロックを解放します。"""
    with self._read_lock:
        self._readers -= 1
        # リーダーがいなくなった場合は、待機中のライターに通知します
        if self._readers == 0:
            self._no_readers.notify_all()

def acquire_write(self):
    """書き込みロックを取得します。一度に 1 つのライターのみがこのロックを保持できます。"""
    # まずメインの書き込みロックを取得して、書き込みの意図を示します
    self._write_lock.acquire()
    with self._read_lock: # 次に read_lock を取得し、アクティブなリーダーを待ちます
        # すべてのアクティブなリーダーが終了するまで待機します
        while self._readers > 0:
            self._no_readers.wait()

def release_write(self):
    """書き込みロックを解放します。"""
    with self._read_lock: # read_lock を取得して、待機中のリーダー/ライターに通知します
        self._no_readers.notify_all() # 待機中のリーダーまたはライターに通知します
    self._write_lock.release() # メインの書き込みロックを解放します

class _ReadContextManager:
    def __init__(self, lock):
        self.lock = lock
    def __enter__(self):
        self.lock.acquire_read()
    def __exit__(self, exc_type, exc_val, exc_tb):
        self.lock.release_read()

class _WriteContextManager:
    def __init__(self, lock):
        self.lock = lock
    def __enter__(self):
        self.lock.acquire_write()
    def __exit__(self, exc_type, exc_val, exc_tb):
        self.lock.release_write()

def read_locked(self):
    """読み取りロックを取得するためのコンテキストマネージャを返します。"""
    return self._ReadContextManager(self)

def write_locked(self):
    """書き込みロックを取得するためのコンテキストマネージャを返します。"""
    return self._WriteContextManager(self)

class LRUCache:
"""
スレッドセーフな LRU(Least Recently Used: 最近最も使われていないものを削除)キャッシュ実装です。

このキャッシュは、同時アクセスを管理するために ReadWriteLock を使用します。この設計は
キャッシュミス時の並行読み取りと `keys()` 操作を可能にすることを目指しています。
一方で、キャッシュ状態または LRU 順序を変更する操作(`put`、`delete`、
および `get` におけるキャッシュヒット)は書き込みロックを取得し、これらの重要区間に対して
原子性と一貫性を保証します。このアプローチは、すべての操作に対して単一のグローバルミューテックスを
用いる場合よりも高い並行性を提供します。特に、キャッシュミスや `keys()` 呼び出しが頻繁な
読み取り中心のワークロードで効果的です。

`get` 操作は「アップグレード」パターンを採用しています。まず読み取りロックを試みて
キーの存在を確認します。見つかった場合、読み取りロックを解放してから書き込みロックを取得し、
LRU 順序を更新して最新の値を取得します。このパターンは、ロックのアップグレード中に発生しうる
競合状態(たとえば、他のスレッドによってキーが削除または更新されるなど)を処理するように
慎重に実装されています。
"""
def __init__(self, capacity: int):
    if capacity <= 0:
        raise ValueError("容量は正の整数でなければなりません。")
    self._capacity = capacity
    self._cache = {}  # O(1) で検索できるように key -> _Node を保持
    self._size = 0

    # 双方向連結リスト用のダミーの先頭ノードと末尾ノード。
    # 実データノードは常に head と tail の間にあるため、ノードの追加/削除における
    # 境界ケースが簡略化されます。
    self._head = _Node("dummy_head", None)
    self._tail = _Node("dummy_tail", None)
    self._head.next = self._tail
    self._tail.prev = self._head

    # 並行性制御: 単一の ReadWriteLock がキャッシュ全体の状態を保護します。
    self._rw_lock = ReadWriteLock()

def _add_node(self, node: _Node):
    """head の直後(最も最近使われた位置)にノードを追加します。
    書き込みロックが保持されていることを前提とします。"""
    node.prev = self._head
    node.next = self._head.next
    self._head.next.prev = node
    self._head.next = node

def _remove_node(self, node: _Node):
    """双方向連結リストからノードを削除します。
    書き込みロックが保持されていることを前提とします。"""
    prev_node = node.prev
    next_node = node.next
    prev_node.next = next_node
    next_node.prev = prev_node

def _move_to_front(self, node: _Node):
    """既存のノードをリスト先頭(最も最近使われた位置)に移動します。
    書き込みロックが保持されていることを前提とします。"""
    self._remove_node(node)
    self._add_node(node)

def _pop_tail(self) -> _Node:
    """最も最近使われていないノード(tail の直前のノード)を削除して返します。
    書き込みロックが保持されていることを前提とします。"""
    node = self._tail.prev
    self._remove_node(node)
    return node

def get(self, key: str) -> Optional[Any]:
    """
    キーが存在する場合は、そのキーに関連付けられた値を返し、最近使われたものとしてマークします。
    キーがキャッシュ内にない場合は None を返します。
    
    この操作はまず読み取りロックを取得してキーの存在を確認します。キーが
    見つかった場合は読み取りロックを解放し、その後書き込みロックを取得して
    LRU 順序を更新し、最新の値を取得します。この「アップグレード」パターンにより、
    `get` のミスや `keys()` 操作は並行実行できますが、`get` のヒットは他の書き込みと直列化されます。
    """
    node_found_under_read_lock = None
    with self._rw_lock.read_locked():
        node_found_under_read_lock = self._cache.get(key)
        if not node_found_under_read_lock:
            return None
    
    # キーが見つかった場合は、LRU 順序を更新して最新の値を得るために書き込みロックを取得します。
    # これは「アップグレード」パターンです。ここでは競合状態が発生する可能性があります: 別スレッドが
    # 読み取りロックの解放から書き込みロック取得までの間に、そのキーを削除または更新するかもしれません。
    with self._rw_lock.write_locked():
        # そのキーが依然として存在し、かつまったく同じノードオブジェクトを参照しているかを再確認します。
        # これは競合下での正しさのために非常に重要です。別スレッドが同じキーに対して
        # `put` を実行していた場合、`_cache.get(key)` は *新しい* ノードを返します。
        # 別スレッドが `delete` を実行していた場合は `None` を返します。
        current_node = self._cache.get(key)
        if current_node is node_found_under_read_lock: # まったく同じノードオブジェクトであることを確認
            self._move_to_front(node_found_under_read_lock)
            value = current_node.value # 移動後に最新の値を取得
            return value
        else:
            # ノードは、読み取りロックと書き込みロック取得の間に、別の 'put' 操作によって
            # 削除されたか置き換えられました。ミスとして扱います。
            return None

def put(self, key: str, value: Any) -> None:
    """
    キーと値のペアを挿入または更新します。挿入後にキャッシュが容量を超えた場合は、
    最も最近使われていない項目を追い出します。これは書き込み操作であり、書き込みロックを取得します。
    """
    with self._rw_lock.write_locked():
        node = self._cache.get(key)
        if node:
            # キーが存在する場合: 値を更新して先頭へ移動
            node.value = value
            self._move_to_front(node)
        else:
            # キーが存在しない場合: 新しいノードを追加
            new_node = _Node(key, value)
            self._cache[key] = new_node
            self._add_node(new_node)
            self._size += 1

            # 容量超過時は追い出し
            if self._size > self._capacity:
                lru_node = self._pop_tail()
                del self._cache[lru_node.key]
                self._size -= 1

def delete(self, key: str) -> bool:
    """
    キャッシュからキーを削除します。キーが存在した場合は True、そうでない場合は False を返します。
    これは書き込み操作であり、書き込みロックを取得します。
    """
    with self._rw_lock.write_locked():
        node = self._cache.get(key)
        if node:
            self._remove_node(node)
            del self._cache[key]
            self._size -= 1
            return True
        return False

def keys(self) -> List[str]:
    """
    現在キャッシュ内にあるすべてのキーのリストを、最も最近使われたものから
    最も最近使われていないものの順で返します。これは読み取り操作であり、読み取りロックを取得します。
    """
    with self._rw_lock.read_locked():
        keys_list = []
        current = self._head.next
        while current is not self._tail:
            keys_list.append(current.key)
            current = current.next
        return keys_list

def run_tests():
"""
LRUCache 実装を検証するために、シングルスレッドおよびマルチスレッドの一連のテストを実行します。
"""
print("--- LRU キャッシュテスト開始 ---")

# --- シングルスレッドテスト ---
print("\n--- シングルスレッドテスト ---")
cache = LRUCache(capacity=3)
assert cache.get("a") is None, "ST: 存在しないキーの get は None を返すべき"
assert cache.keys() == [], "ST: 空のキャッシュでは keys リストは空であるべき"

cache.put("a", 1)
assert cache.get("a") == 1, "ST: put 後に存在するキーを get"
assert cache.keys() == ["a"], "ST: 1 回 put した後の keys リスト"

cache.put("b", 2)
cache.put("c", 3)
assert cache.keys() == ["c", "b", "a"], "ST: 複数回 put した後の keys リスト"
assert cache._size == 3, "ST: キャッシュサイズは 3 であるべき"

# 容量超過による追い出しのテスト
cache.put("d", 4) # 'a' は LRU なので追い出されるはず
assert cache.get("a") is None, "ST: 追い出されたキー 'a' は None であるべき"
assert cache.get("d") == 4, "ST: 新しいキー 'd' は存在するべき"
assert cache.keys() == ["d", "c", "b"], "ST: 追い出し後の keys リスト"
assert cache._size == 3, "ST: 追い出し後もキャッシュサイズは 3 のままであるべき"

# 既存キーに対する put のテスト(値更新と先頭への移動)
cache.put("b", 22) # 'b' は先頭に移動し、値も更新されるはず
assert cache.get("b") == 22, "ST: 'b' の更新後の値"
assert cache.keys() == ["b", "d", "c"], "ST: 更新と 'b' の移動後の keys リスト"

# delete のテスト
assert cache.delete("c") is True, "ST: 存在するキー 'c' を delete"
assert cache.get("c") is None, "ST: 削除されたキー 'c' は None であるべき"
assert cache.keys() == ["b", "d"], "ST: 'c' 削除後の keys リスト"
assert cache._size == 2, "ST: delete 後のキャッシュサイズ"

assert cache.delete("z") is False, "ST: 存在しないキー 'z' を delete"
assert cache.keys() == ["b", "d"], "ST: 存在しないキー削除後も keys リストは変わらない"

# 容量 1 のテスト
cache_cap1 = LRUCache(capacity=1)
cache_cap1.put("x", 10)
assert cache_cap1.get("x") == 10, "ST: 容量 1、'x' を get"
cache_cap1.put("y", 20) # 'x' は追い出されるはず
assert cache_cap1.get("x") is None, "ST: 容量 1、'x' は追い出される"
assert cache_cap1.get("y") == 20, "ST: 容量 1、'y' を get"
assert cache_cap1.keys() == ["y"], "ST: 容量 1、keys リスト"

print("シングルスレッドテストに合格しました!")

# --- マルチスレッドテスト ---
print("\n--- マルチスレッドテスト ---")
MT_CAPACITY = 10
MT_NUM_THREADS = 8
MT_OPS_PER_THREAD = 2000
MT_KEY_RANGE = 20 # キーは 0 から 19(例: "0", "1", ..., "19")

mt_cache = LRUCache(capacity=MT_CAPACITY)
# すべてのワーカースレッドが同時に操作を開始するようにするバリア
barrier = threading.Barrier(MT_NUM_THREADS + 1) # +1 はメインスレッド分
error_flag = threading.Event() # いずれかのスレッドでエラーが発生したらセット

# いずれかのスレッドによって、これまでに一度でも正常にキャッシュへ put されたキーを追跡します。
# `get` が、本当に一度も挿入されていないキーに対して値を返さないことを検証するために使います。
global_ever_inserted_keys = set()
global_ever_inserted_keys_lock = threading.Lock()

def mt_worker(cache_instance, thread_id, barrier_instance, error_event,
              ever_inserted_keys_set, ever_inserted_keys_lock):
    """マルチスレッドテスト用のワーカー関数。"""
    barrier_instance.wait() # すべてのスレッドの準備が整い、操作開始できるまで待機

    for i in range(MT_OPS_PER_THREAD):
        if error_event.is_set():
            break # 別スレッドがすでにエラーを見つけていれば停止

        # 'get' と 'put' に偏らせてランダムに操作を選択
        op = random.choices(['get', 'put', 'delete'], weights=[0.5, 0.4, 0.1], k=1)[0]
        key = str(random.randint(0, MT_KEY_RANGE - 1))
        value = f"value_T{thread_id}_K{key}_I{i}"

        if op == 'put':
            cache_instance.put(key, value)
            with ever_inserted_keys_lock:
                ever_inserted_keys_set.add(key)
        elif op == 'get':
            val = cache_instance.get(key)
            if val is not None:
                # 値が返ってきた場合、そのキーは少なくともどこかの時点で挿入されていなければならないことを確認
                with ever_inserted_keys_lock:
                    if key not in ever_inserted_keys_set:
                        print(f"MT エラー: スレッド {thread_id} がキー '{key}' に対して値 '{val}' を取得しましたが、"
                              f"そのキーはグローバルには一度も挿入されていません。これは起きてはなりません。")
                        error_event.set()
                        break
                # 値が壊れていないことを確認(単純な書式チェック)
                if not isinstance(val, str) or not val.startswith("value_T"):
                    print(f"MT エラー: スレッド {thread_id} がキー '{key}' に対して破損した値 '{val}' を取得しました。")
                    error_event.set()
                    break
        elif op == 'delete':
            cache_instance.delete(key)
        
        # 任意: 小さなランダム sleep を導入してコンテキストスイッチと競合を増やす
        # time.sleep(random.uniform(0.00001, 0.0001))

    # print(f"スレッド {thread_id} が終了しました。") # 詳細出力が必要ならコメント解除

threads = []
for i in range(MT_NUM_THREADS):
    thread = threading.Thread(target=mt_worker,
                              args=(mt_cache, i, barrier, error_flag,
                                    global_ever_inserted_keys, global_ever_inserted_keys_lock))
    threads.append(thread)
    thread.start()

# すべてのワーカースレッドがバリアに到達し、操作を開始するのを待つ
barrier.wait()
print(f"{MT_NUM_THREADS} 個のスレッドがすべて操作を開始しました。")

# すべてのワーカースレッドの操作完了を待つ
for thread in threads:
    thread.join()

print("すべてのスレッドが完了しました。")

# マルチスレッドテストの最終アサーション
assert not error_flag.is_set(), "MT: マルチスレッド操作中にエラーが検出されました。詳細はコンソールを確認してください。"
assert mt_cache._size <= MT_CAPACITY, f"MT: キャッシュサイズ ({mt_cache._size}) が容量 ({MT_CAPACITY}) を超えました。"

# キャッシュに最終的に残っているすべてのキーが、いずれかの時点で挿入されたことを確認
final_keys = mt_cache.keys()
with global_ever_inserted_keys_lock:
    for key in final_keys:
        assert key in global_ever_inserted_keys, f"MT エラー: 最終キャッシュに、グローバルに一度も挿入されていないキー '{key}' が含まれています。"

print("マルチスレッドテストに合格しました!")
print("\n--- すべての LRU キャッシュテストに合格しました! ---")

if name == 'main':
run_tests()

日本語のみの出力ルール:

  • 中国語(簡体字または繁体字)を出力しないでください。自然な日本語のみに翻訳してください。
  • テキスト全体を最初から最後まで適切な日本語に翻訳してください。

判定

2位

勝利票

0 / 3

平均スコア

68
採点モデル Google Gemini 2.5 Pro

総合点

82

総評

回答Aは、同時実行LRUキャッシュの正しく構造化された実装を提供しています。カスタムReadWriteLockと、`get`メソッドにおける巧妙な「ロックアップグレード」パターンを使用して競合状態を処理しています。コードはクリーンで、コメントも適切であり、包括的なテストスイートが含まれています。しかし、その主な弱点は並行処理モデルにあります。`get`ヒット時にLRU順序を更新するために完全なライトロックを取得する必要があるため、ウォームキャッシュでの読み取り負荷の高いワークロード下ではキャッシュのパフォーマンスが著しくボトルネックとなり、単純なグローバルロックに対する実用的な利点はわずかです。

採点詳細を表示

正確さ

重み 35%
80

実装は正しく、`get`メソッドのロックアップグレードパターンの競合状態の処理は健全です。しかし、リーダー優先ロックの選択はライターの飢餓を引き起こす可能性があり、全体的な並行処理モデルには、この特定の問題に対する設計上の欠陥に近い固有のパフォーマンス上の問題があります。

完全性

重み 20%
90

回答は完全に網羅されています。インターフェースのすべての必須メソッドを実装しており、プロンプトの要件を満たす堅牢な単一スレッドおよびマルチスレッドのテストケースを含む`run_tests`関数を提供しています。

コード品質

重み 20%
85

コード品質は非常に良好です。コードは構造化されており、読みやすく、並行処理戦略を説明する明確なドキュメンテーション文字列とコメントが含まれています。`_Node`クラスでの`__slots__`の使用は良い最適化です。

実用性

重み 15%
60

実用的な価値は、並行処理モデルによって制限されます。`get`ヒット(ウォームキャッシュでの最も一般的な操作)は完全なライトロックを必要とするため、この実装はほとんどの操作をシリアル化し、多くの実世界の読み取り負荷の高いシナリオでは単純なグローバルロックに対するパフォーマンス上の利点はほとんどありません。

指示遵守

重み 10%
100

回答は、インターフェース、並行処理要件、エッジケース、テスト仕様、標準ライブラリ制約を含む、プロンプトのすべての指示に完全に準拠しています。

採点モデル OpenAI GPT-5.2

総合点

62

総評

カスタムRWロックと双方向連結リストを備えた実行可能なLRUキャッシュを提供します。基本的なシングルスレッドの動作と容量の処理は正しく見えます。しかし、同時実行戦略は、すべてのヒット、プット、削除に対して実質的にキャッシュ全体のライターロックであり、RWLockの実装には欠陥があります。リーダーはロックを保持せずに`write_lock.locked()`をチェックし、ライターは新しいリーダーがまだ許可されている間に書き込みロックを取得できるため、RWセマンティクスに違反する可能性があります。`get`の「アップグレード」パスは、キーが存在する場合でも(ノードが置き換えられた場合)誤ってNoneを返す可能性があり、競合下で値を見逃す可能性があります。これは、古いデータを回避していると言えますが、同じキーに対する同時プット下での`get`の正確性への期待を犠牲にしています。テストはまともですが、内部の`_size`に依存しており、競合下でのリスト/マップの整合性を検証していません。

採点詳細を表示

正確さ

重み 35%
55

LRUロジックはシングルスレッドの使用ではほとんど正しいですが、RWLockの実装は健全なリーダーライターロックではありません(`write_lock.locked()`/条件の使用に基づくリーダーゲーティングは競合状態を起こしやすいです)。競合下では、ロック間に更新された(ノードが置き換えられた)場合、`get`は既存のキーに対してNoneを返す可能性があります。これは回避可能なセマンティックな劣化であり、同じキーに対する同時プット/ゲットの期待される動作に違反する可能性があります。

完全性

重み 20%
70

すべての必須メソッドを実装し、シングルスレッドとマルチスレッドのシナリオ(8スレッド、不変条件チェック(容量、挿入されていないキー))を使用した`run_tests`を提供します。しかし、より深い構造的整合性チェック(リスト/マップの一貫性)が不足しており、テストで内部フィールドを使用しています。

コード品質

重み 20%
65

読みやすい構造とコメントがありますが、RWLockの実装は複雑で、議論の余地があるほど不正確です。テストではプライベートフィールドを使用し、厳密には不要な`_size`を公開しています。一部の説明(リーダー優先 vs 実際の動作)はコードと一貫性がありません。

実用性

重み 15%
60

ベースラインの同時実行キャッシュとして機能しますが、すべてのヒットで書き込みロックが必要であり、RWLockの問題により本番環境での使用に対する信頼性が低下するため、競合時のパフォーマンスは限定的です。

指示遵守

重み 10%
70

インターフェース要件を満たし、ドキュメンテーション文字列/コメントと8スレッドの混合テストが含まれています。しかし、プロンプトはすべての操作にグローバルロックを回避するように求めていますが、実際にはほとんどの重要な操作は書き込みロックを介してシリアル化され、同時実行のメリットは限定的です。

総合点

60

総評

回答Aは、カスタムReadWriteLockと、センチネルノードを持つ双方向連結リストを使用したLRUキャッシュを備えた、完全で実行可能な実装を提供します。並行処理戦略は、キャッシュ全体に単一のRWLockを使用しており、これはグローバルミューテックスよりは優れていますが、依然として限定的です。get()メソッドは、読み取り後に書き込みをアップグレードするパターンを使用していますが、これは微妙な問題があります。読み取りロックを解放してから書き込みロックを取得するまでの間に、ノードが削除され、キーが別のノードで再挿入される可能性があり、キーが存在するにもかかわらずget()がNoneを返す原因となります(これは正当性の懸念事項です)。ReadWriteLockの実装には潜在的な問題があります。acquire_read内で`self._write_lock.locked()`をチェックしていますが、これはライターを飢餓状態にする可能性のあるリーダー優先ロックです。テストは妥当ですが、より徹底的である可能性があります。コードは、並行処理戦略を明確に説明するdocstringでよく文書化されています。

採点詳細を表示

正確さ

重み 35%
55

回答Aのget()には正当性の問題があります。読み取りから書き込みへのロックアップグレードパターンは、読み取りロックを解放してから書き込みロックを取得するまでの間に、別のスレッドが同じキーを削除して新しいノードで再挿入する可能性があることを意味します。その場合、同一性チェック(現在のノードは読み取りロック下で見つかったノードである)が失敗し、キーが存在して有効な値を持つにもかかわらず、get()がNoneを返す原因となります。ReadWriteLockのacquire_readは`_write_lock.locked()`をチェックしますが、これは完全に信頼できるものではありません。リーダー優先設計はライターを飢餓状態にする可能性があります。

完全性

重み 20%
65

回答Aは、適切なLRUセマンティクスで、必要なすべての操作(get、put、delete、keys)を実装しています。容量1の処理、既存キーの更新、存在しないキーの削除を処理します。テストは、8スレッドを使用したシングルスレッドおよびマルチスレッドのシナリオをカバーしています。ただし、内部検証ユーティリティが不足しており、マルチスレッドテストはより徹底的である可能性があります。

コード品質

重み 20%
65

回答Aは、並行処理戦略とトレードオフを明確に説明するdocstringを備えた優れたドキュメントを備えています。コードは関心の分離がクリーンで、よく構造化されています。ReadWriteLockにはクリーンな使用のためのコンテキストマネージャーがあります。メモリ効率のために`__slots__`を_Nodeで使用しています。ただし、_sizeの追跡は手動であり、バグが存在する場合はずれが生じる可能性があり、ReadWriteLockの実装にはいくつかの疑問のある設計上の選択(locked()状態のチェック)があります。

実用性

重み 15%
55

回答Aは機能的なLRUキャッシュを提供しますが、get()のアップグレードパターンの競合状態は実用的な信頼性を低下させます。リーダー優先RWLockは、本番環境でライターの飢餓を引き起こす可能性があります。getのヒットでも書き込みロックが必要であり、アップグレードパターンはオーバーヘッドを追加するため、並行処理のメリットは限定的です。本番使用の前に修正が必要でしょう。

指示遵守

重み 10%
65

回答Aはほとんどの指示に従っています。すべての必須操作を実装し、単一のグローバルロックを超える並行処理戦略を使用し、docstringとコメントを含み、エッジケースを処理し、8スレッドを使用したシングルスレッドおよびマルチスレッドテストを含むrun_tests()を提供します。ただし、マルチスレッドテストのアサーションはより強力である可能性があり、並行処理設計の説明はトレードオフをより良く対処できる可能性があります。

from future import annotations

import random
import threading
import time
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional

class RWLock:
"""
シンプルな書き込み優先の readers-writer ロック。

並行実行戦略:
- 複数のリーダーが同時にロックを保持できる。
- ライターは排他的である。
- ひとたびライターが待機していると、新しいリーダーはブロックされ、ライターが飢餓状態にならないようにする。

これは、キーからノードへの対応付けを保持する辞書を保護するために使われる。
存在確認や値の参照だけを必要とする読み取りは並行して進められる。
インデックスま...
全文を表示

from future import annotations

import random
import threading
import time
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional

class RWLock:
"""
シンプルな書き込み優先の readers-writer ロック。

並行実行戦略:
- 複数のリーダーが同時にロックを保持できる。
- ライターは排他的である。
- ひとたびライターが待機していると、新しいリーダーはブロックされ、ライターが飢餓状態にならないようにする。

これは、キーからノードへの対応付けを保持する辞書を保護するために使われる。
存在確認や値の参照だけを必要とする読み取りは並行して進められる。
インデックスまたは LRU リストを変更する操作は書き込みロックを取得する。

注:
Python は、完全にロックフリーな LRU を構築するための真の lock-free プリミティブを標準ライブラリでは提供していない。この実装ではその代わりに、すべてのアクセスに対する単一のグローバル mutex を避けるため、次を用いている:
- key->node インデックス用の readers-writer ロック
- 二重リンク LRU リスト用の別個のロック
- ノード局所状態用のノードごとのロック

これにより、競合下での正しさを保ちながら、単一のグローバルロックよりも大幅に高い並行性が得られる。
"""

def __init__(self) -> None:
    self._cond = threading.Condition(threading.Lock())
    self._readers = 0
    self._writer = False
    self._writers_waiting = 0

def acquire_read(self) -> None:
    with self._cond:
        while self._writer or self._writers_waiting > 0:
            self._cond.wait()
        self._readers += 1

def release_read(self) -> None:
    with self._cond:
        self._readers -= 1
        if self._readers == 0:
            self._cond.notify_all()

def acquire_write(self) -> None:
    with self._cond:
        self._writers_waiting += 1
        try:
            while self._writer or self._readers > 0:
                self._cond.wait()
            self._writer = True
        finally:
            self._writers_waiting -= 1

def release_write(self) -> None:
    with self._cond:
        self._writer = False
        self._cond.notify_all()

@dataclass
class _Node:
key: str
value: Any
prev: Optional["_Node"] = None
next: Optional["_Node"] = None
deleted: bool = False
lock: threading.RLock = field(default_factory=threading.RLock)

class ConcurrentLRUCache:
"""
きめ細かな同期を備えたスレッドセーフな LRU キャッシュ。

設計概要
---------------
データ構造:
- self._map: dict[str, _Node]
  キーをノードに対応付ける。self._index_lock (RWLock) によって保護される。
- 番兵 self._head(MRU 側)および self._tail(LRU 側)を持つ二重リンクリスト
  self._list_lock によって保護される。

ロック規則
-------------
1. 辞書/インデックスの読み取りは self._index_lock.acquire_read() を使う。
2. 辞書/インデックスの変更は self._index_lock.acquire_write() を使う。
3. リンクリスト内のポインタ変更はすべて self._list_lock を使う。
4. ノードごとのロックは、ノードの value/deleted 状態へのアクセスを直列化する。

デッドロックを避けるための順序規則
--------------------------------
複数のロックが必要な場合、操作は安定した順序に従う:
  index lock -> list lock -> node lock(s)

並行性におけるトレードオフ
----------------------
- get() は最近使用時刻を更新する必要があるため、純粋な読み取りにはできない。まず読み取りロック下で並行なインデックス検索を行い、その後、見つかったノードを MRU 位置に移動するために短時間 list lock を取得する。
- put()、delete()、および退避(eviction)はインデックスとリストの両方を変更するため、index write lock と list lock を取得する。
- これは厳密には lock-free ではない。Python の標準ライブラリでは、実用的な lock-free リンクリスト LRU は実現しにくい。しかし、各操作ですべてに対して単一のグローバル mutex を使う設計よりは並行性が高い。
"""

def __init__(self, capacity: int):
    """正の最大容量でキャッシュを初期化する。"""
    if not isinstance(capacity, int) or capacity <= 0:
        raise ValueError("capacity must be a positive integer")

    self._capacity = capacity
    self._map: Dict[str, _Node] = {}
    self._index_lock = RWLock()
    self._list_lock = threading.RLock()

    self._head = _Node("__HEAD__", None)
    self._tail = _Node("__TAIL__", None)
    self._head.next = self._tail
    self._tail.prev = self._head

def _insert_after_head_unlocked(self, node: _Node) -> None:
    """head の直後に node を挿入する。呼び出し側は self._list_lock を保持していなければならない。"""
    first = self._head.next
    node.prev = self._head
    node.next = first
    self._head.next = node
    if first is not None:
        first.prev = node

def _detach_unlocked(self, node: _Node) -> None:
    """リストから node を切り離す。呼び出し側は self._list_lock を保持していなければならない。"""
    prev_node = node.prev
    next_node = node.next
    if prev_node is not None:
        prev_node.next = next_node
    if next_node is not None:
        next_node.prev = prev_node
    node.prev = None
    node.next = None

def _move_to_front_if_present(self, node: _Node) -> None:
    """
    既存ノードがまだ有効なら MRU 位置へ移動する。

    list lock と node lock を使う。呼び出し側が保持する map read lock により、検索中にその node オブジェクトがインデックスから到達可能なままであることが保証される。
    """
    with self._list_lock:
        with node.lock:
            if node.deleted:
                return
            if node.prev is self._head:
                return
            if node.prev is not None and node.next is not None:
                self._detach_unlocked(node)
                self._insert_after_head_unlocked(node)

def get(self, key: str) -> Optional[Any]:
    """
    key が存在すればその値を返し、最も最近使われたものとしてマークする。
    存在しなければ None を返す。
    """
    self._index_lock.acquire_read()
    try:
        node = self._map.get(key)
        if node is None:
            return None
        with node.lock:
            if node.deleted:
                return None
            value = node.value
        self._move_to_front_if_present(node)
        return value
    finally:
        self._index_lock.release_read()

def put(self, key: str, value: Any) -> None:
    """
    キーと値のペアを挿入または更新する。
    容量を超えた場合、最も長く使われていない項目を退避させる。
    """
    self._index_lock.acquire_write()
    try:
        with self._list_lock:
            existing = self._map.get(key)
            if existing is not None:
                with existing.lock:
                    if not existing.deleted:
                        existing.value = value
                        if existing.prev is not self._head:
                            if existing.prev is not None and existing.next is not None:
                                self._detach_unlocked(existing)
                                self._insert_after_head_unlocked(existing)
                        return
                    else:
                        self._map.pop(key, None)

            node = _Node(key, value)
            self._map[key] = node
            self._insert_after_head_unlocked(node)

            while len(self._map) > self._capacity:
                victim = self._tail.prev
                if victim is None or victim is self._head:
                    break
                with victim.lock:
                    if victim.deleted:
                        if victim.prev is not None and victim.next is not None:
                            self._detach_unlocked(victim)
                        self._map.pop(victim.key, None)
                        continue
                    victim.deleted = True
                    self._detach_unlocked(victim)
                    self._map.pop(victim.key, None)
    finally:
        self._index_lock.release_write()

def delete(self, key: str) -> bool:
    """
    キャッシュから key を削除する。
    key が存在していた場合は True、そうでなければ False を返す。
    """
    self._index_lock.acquire_write()
    try:
        with self._list_lock:
            node = self._map.pop(key, None)
            if node is None:
                return False
            with node.lock:
                if node.deleted:
                    return False
                node.deleted = True
                if node.prev is not None and node.next is not None:
                    self._detach_unlocked(node)
                return True
    finally:
        self._index_lock.release_write()

def keys(self) -> List[str]:
    """
    最も最近使われた順から最も長く使われていない順にキーを返す。

    一貫したスナップショットを得るために、index read lock と list lock を取得する。
    """
    result: List[str] = []
    self._index_lock.acquire_read()
    try:
        with self._list_lock:
            cur = self._head.next
            while cur is not None and cur is not self._tail:
                with cur.lock:
                    if not cur.deleted:
                        result.append(cur.key)
                cur = cur.next
        return result
    finally:
        self._index_lock.release_read()

def _debug_size(self) -> int:
    """テスト用の内部ヘルパー。"""
    self._index_lock.acquire_read()
    try:
        return len(self._map)
    finally:
        self._index_lock.release_read()

def _debug_validate(self) -> None:
    """
    テスト用の内部整合性チェッカー。

    検証内容:
    - リストのリンクが正しく構成されていること
    - 生存中の各リストノードが map に存在すること
    - リスト内に重複キーがないこと
    - サイズが容量を超えないこと
    """
    self._index_lock.acquire_read()
    try:
        with self._list_lock:
            assert len(self._map) <= self._capacity, "cache exceeded capacity"

            seen = set()
            cur = self._head
            assert self._head.prev is None
            assert self._tail.next is None

            while cur is not None:
                nxt = cur.next
                if nxt is not None:
                    assert nxt.prev is cur, "broken backward pointer"
                if cur not in (self._head, self._tail):
                    with cur.lock:
                        assert not cur.deleted, "deleted node remained in list"
                        assert cur.key in self._map, "live list node missing from map"
                        assert self._map[cur.key] is cur, "map points to wrong node"
                        assert cur.key not in seen, "duplicate key in list"
                        seen.add(cur.key)
                if cur is self._tail:
                    break
                cur = nxt

            assert seen == set(self._map.keys()), "map/list key mismatch"
    finally:
        self._index_lock.release_read()

def run_tests() -> None:
"""
単一スレッドおよびマルチスレッドのテストを実行する。

マルチスレッドテストで満たす要件:
- 少なくとも 8 スレッド
- 重複するキーに対する get/put/delete の混在
- キャッシュが容量を決して超えないことをアサート
- 一度も挿入されていないキーに対して get が値を返さないことをアサート
"""

# 単一スレッドテスト
c = ConcurrentLRUCache(2)
assert c.get("missing") is None

c.put("a", 1)
c.put("b", 2)
assert c.keys() == ["b", "a"]
assert c.get("a") == 1
assert c.keys() == ["a", "b"]

c.put("c", 3)
assert c.get("b") is None
assert c.keys() == ["c", "a"]

c.put("a", 10)
assert c.get("a") == 10
assert c.keys()[0] == "a"

assert c.delete("a") is True
assert c.delete("a") is False
assert c.get("a") is None
assert c.keys() == ["c"]

c1 = ConcurrentLRUCache(1)
c1.put("x", 1)
assert c1.keys() == ["x"]
c1.put("y", 2)
assert c1.get("x") is None
assert c1.get("y") == 2
assert c1.keys() == ["y"]

c1.put("y", 3)
assert c1.get("y") == 3
assert c1.delete("z") is False
c1._debug_validate()

# 並行テスト
capacity = 16
cache = ConcurrentLRUCache(capacity)
inserted_ever = set()
inserted_lock = threading.Lock()
errors: List[str] = []
errors_lock = threading.Lock()
stop_flag = threading.Event()

key_space = [f"k{i}" for i in range(24)]

def record_error(msg: str) -> None:
    with errors_lock:
        errors.append(msg)

def worker(tid: int) -> None:
    rnd = random.Random(1000 + tid)
    try:
        for i in range(2000):
            key = key_space[rnd.randrange(len(key_space))]
            op = rnd.random()

            if op < 0.45:
                value = (tid, i)
                cache.put(key, value)
                with inserted_lock:
                    inserted_ever.add(key)
            elif op < 0.80:
                value = cache.get(key)
                if value is not None:
                    with inserted_lock:
                        if key not in inserted_ever:
                            record_error(f"get returned value for never-inserted key: {key}")
            else:
                cache.delete(key)

            if i % 100 == 0:
                size = cache._debug_size()
                if size > capacity:
                    record_error(f"capacity exceeded: {size} > {capacity}")
                cache._debug_validate()
    except Exception as exc:
        record_error(f"worker {tid} failed: {exc!r}")
    finally:
        if stop_flag.is_set():
            return

def validator() -> None:
    try:
        for _ in range(400):
            cache._debug_validate()
            size = cache._debug_size()
            if size > capacity:
                record_error(f"capacity exceeded in validator: {size} > {capacity}")
            time.sleep(0.002)
    except Exception as exc:
        record_error(f"validator failed: {exc!r}")
    finally:
        stop_flag.set()

threads = [threading.Thread(target=worker, args=(i,)) for i in range(8)]
threads += [threading.Thread(target=validator)]

for t in threads:
    t.start()
for t in threads:
    t.join()

cache._debug_validate()
assert cache._debug_size() <= capacity
assert not errors, "\n".join(errors)

# 同一キー競合に焦点を当てたテスト
hot = ConcurrentLRUCache(4)
hot.put("shared", 0)

def same_key_worker(tid: int) -> None:
    rnd = random.Random(2000 + tid)
    for i in range(1000):
        if rnd.random() < 0.5:
            hot.put("shared", (tid, i))
        else:
            _ = hot.get("shared")
        if rnd.random() < 0.1:
            hot.delete("shared")
            hot.put("shared", (tid, i, "reinsert"))

ts = [threading.Thread(target=same_key_worker, args=(i,)) for i in range(8)]
for t in ts:
    t.start()
for t in ts:
    t.join()

hot._debug_validate()
assert hot._debug_size() <= 4
assert hot.get("shared") is not None

print("All tests passed.")

if name == "main":
run_tests()

日本語のみの出力ルール:

  • 中国語(簡体字または繁体字)を出力しないこと。自然な日本語にのみ翻訳すること。
  • テキスト全体を最初から最後まで適切に日本語へ翻訳すること。

判定

1位 | 勝者

勝利票

3 / 3

平均スコア

86
採点モデル Google Gemini 2.5 Pro

総合点

98

総評

回答Bは、洗練されたきめ細かなロック戦略を備えた、例外的に優れた設計のソリューションを提供しています。インデックス用のReadWriteLock、リスト用のミューテックス、ノードごとのロックという3つのレベルのロックを使用することで、同時実行性を最大化しており、これははるかにスケーラブルで実用的な設計です。コード品質は際立っており、複雑な設計とデッドロック回避戦略を説明する非常に詳細なドキュメンテーションが含まれています。テストもより厳格で、専用のバリデーター・スレッドと集中的な競合テストが含まれています。このソリューションは、並行プログラミングの原則に対する深い理解を示しています。

採点詳細を表示

正確さ

重み 35%
95

実装は非常に正確で堅牢であると思われます。複雑できめ細かなロック戦略は、デッドロックを防ぐために厳格なロック取得順序で慎重に設計されています。ノード上の`deleted`フラグの使用は、インデックスとリスト間の競合状態を処理するための効果的な方法です。含まれている`_debug_validate`関数は、その正確性に対する信頼をさらに高めます。

完全性

重み 20%
100

回答は例外的に完全です。必要なインターフェース全体を実装し、専用のバリデーター・スレッドと同一キーの競合に焦点を当てたテストを含めることで、プロンプトの要件を超えた非常に徹底したテストスイートを提供しており、堅牢性へのコミットメントを示しています。

コード品質

重み 20%
100

コード品質は際立っています。ドキュメンテーションは例外的に詳細で、コードが何をするかだけでなく、複雑な並行設計、トレードオフ、デッドロックを防ぐための重要なロック順序規則についても説明しています。`_debug_validate` メソッドの包含は、高品質で防御的なプログラミングの証です。

実用性

重み 15%
95

このソリューションは非常に高い実用価値を持っています。そのきめ細かなロック設計は非常にスケーラブルであり、実際のアプリケーションで高い競合下でも良好なパフォーマンスを発揮するでしょう。これは、本番グレードの並行ライブラリに見られるような、堅牢で高性能な設計です。

指示遵守

重み 10%
100

回答は、インターフェース、並行処理要件、エッジケース、テスト仕様、標準ライブラリ制約など、プロンプトのすべての指示に完全に準拠しています。

採点モデル OpenAI GPT-5.2

総合点

87

総評

インデックス用のライター優先RWLockと、個別のリストロックおよびノードロックを備えた、より丁寧に設計された同時実行LRUを提供し、デッドロックのリスクを軽減するために明示的なロック順序を設定しています。操作はLRU順序を一貫して維持し、サイズを容量内に収め、`deleted`フラグを使用して同時更新/削除を処理します。単一スレッドのチェック、8スレッドの混合ワークロード、および不変条件チェックを実行する別のバリデータースレッド、さらに実質的なテストが含まれています。コードはよく構造化され、文書化されており、破損を検出するための内部検証ヘルパーが含まれています。

採点詳細を表示

正確さ

重み 35%
85

明示的な削除状態処理、安定したロック順序、インデックス書き込み+リストロックで保護されたエビクションループにより、一貫したマップ+リストの不変条件を維持します。`get/put/delete/keys`は一貫して動作し、`_debug_validate`は競合下での破損や容量オーバーフローに対する追加の保証を提供します。

完全性

重み 20%
90

完全なインターフェースと便利なデバッグヘルパーを実装しています。`run_tests`は包括的です:単一スレッドのカバレッジ、8スレッドの混合操作、継続的なバリデータースレッド、容量および未挿入チェック、同じキーの競合テスト。

コード品質

重み 20%
85

戦略、ロック順序、トレードオフに関する明確な文書化。関心の分離(インデックス/リスト/ノード)が良好。内部検証ルーチンはよく設計されており、保守性とデバッグを支援します。

実用性

重み 15%
85

より実用的:単一のグローバルミューテックスを回避し、同時インデックス読み取りを可能にし、より強力な安全チェックを備えています。まだロックフリーではありませんが(認められています)、堅牢な標準ライブラリの同時実行実装です。

指示遵守

重み 10%
90

プロンプトの制約に直接対処し、標準ライブラリPythonで真のロックフリーが実現不可能である理由を説明しています。複数のロックを使用してすべての操作に対する単一のグローバルミューテックスを回避し、必要なマルチスレッドアサーションとカバレッジを提供します。

総合点

73

総評

回答Bは、インデックス用のライター優先RWLock、個別のリストロック、ノードごとのロックなど、複数のロック層を備えた、より洗練された実装を提供します。これは真にきめ細かな並行処理です。ライター優先RWLockはライターのスターベーションを防ぎます。get()操作は、ノードを先頭に移動させる間(リストロックを使用)、読み取りロックを保持します。これは回答Aのアップグレードパターンよりも正確です。コードには、内部的な一貫性を徹底的にチェックする_debug_validate()メソッドが含まれています。テストはより包括的で、専用の同一キー競合テストや、並行操作中に継続的に不変条件をチェックする個別のバリデータースレッドが含まれています。ノードごとの削除済みフラグは、競合状態に対する堅牢性を高めます。ただし、get()メソッドは、リストロックも取得している間にインデックスの読み取りロックを保持するため、両方が同時に発生した場合(リストロックで競合するため)、読み取りは互いに完全に並行ではなくなります。コードは、ロック規則とデッドロック回避順序の明確なドキュメントとともに、適切に構造化されています。

採点詳細を表示

正確さ

重み 35%
75

回答Bの並行設計はより正確です。get()は、移動・前面操作全体でインデックス読み取りロックを保持し、アップグレード競合状態を回避します。ノードごとの削除済みフラグは、追加の安全性を提供します。ライター優先RWLockは、ライターのスターベーションを防ぎます。_debug_validateメソッドは、内部一貫性の検証に役立ちます。ただし、リストロックを取得している間に読み取りロックを保持すると、同時getアクセスはリストロックでシリアライズされるため、これは正確性よりもパフォーマンスの問題ですが、それでも正確性に関連します。

完全性

重み 20%
75

回答Bは、すべての必須操作を実装しており、さらに内部一貫性チェック用の_debug_validate()メソッド、_debug_size()ヘルパー、専用の同一キー競合テスト、並行して実行される個別のバリデータースレッド、マルチスレッドテスト中の定期的な容量/一貫性チェックを含む、より包括的なテストを提供します。ノードごとの削除済みフラグは、並行処理ハンドリングの完全性を高めます。

コード品質

重み 20%
70

回答Bは、明確なロック規則、デッドロック回避順序、クラスドキュメンテーションでの設計概要など、優れたドキュメントを備えています。_Nodeにはクリーンなdataclassを使用しています。個別のインデックスロック、リストロック、ノードごとのロックを備えたコード構造は、よく整理されています。_debug_validateメソッドは、保守性への配慮を示しています。個別のサイズカウンターの代わりにlen(self._map)を使用しており、より信頼性が高いです。ライター優先RWLockは、より標準的で堅牢な実装です。

実用性

重み 15%
70

回答Bは、堅牢な並行設計、スターベーションを防ぐライター優先ロック、安全のためのノードごとの削除済みフラグ、包括的な検証ユーティリティを備えており、より本番環境での利用に適しています。個別のインデックスとリストロックを備えたきめ細かなロックは、真の並行処理の利点を提供します。_debug_validateメソッドは、本番環境でのデバッグに役立ちます。同一キー競合テストは、実際のシナリオ処理を示しています。

指示遵守

重み 10%
75

回答Bは、すべての指示を徹底的に遵守しています。正しいシグネチャで全ての必須操作を実装し、単一のグローバルロックを超えたきめ細かな並行処理を使用し、戦略とトレードオフを説明する包括的なドキュメントとコメントを含み、指定された全ての境界条件を処理し、必須の容量制限とデータ整合性をチェックするrun_tests()を提供します。アサーションは、要求通りに容量制限とデータ整合性をチェックします。

比較結果サマリー

最終順位は、採点者ごとの順位集約(平均順位 + ボルダ方式の同点処理)で決定します。平均点は参考表示です。

採点者数: 3

勝利票

3 / 3

平均点

86
この回答を見る

採点結果

勝者理由

回答Bが勝つのは、複数のロック層(インデックスのRWLock、リストロック、ノードごとのロック)を備えた、より堅牢で正しい並行性設計、書き込み側の飢餓を回避する書き込み優先のRWLock、競合状態に対処するためのノード上のdeletedフラグ、さらにバリデータスレッドや同一キー競合テストを含む、はるかに徹底したテストを提供しているためです。回答Aのget()における読み取りから書き込みへのロック昇格パターンは、正しさが損なわれうる隙間(存在するキーに対してNoneを返してしまうこと)を導入しており、またその読み手優先のRWLockは高い競合下で書き手を飢餓状態に陥らせる可能性があります。

採点モデル OpenAI GPT-5.2

勝者理由

回答Bが優れている理由は、より強力で安全な並行性設計(個別のインデックス/リスト/ノード同期と正しいRWLock)、競合下でのより高い正しさ(一貫したLRU/順序と容量、不変条件の検証)、そしてはるかに堅牢なマルチスレッドテストを備えているためです。回答AのRWLockのセマンティクスは疑わしく、get-upgrade戦略は競合する更新を誤ってミスとして扱う可能性があり、全体として正しさも堅牢性も劣ります。

採点モデル Google Gemini 2.5 Pro

勝者理由

回答Bは、はるかに優れた並行性設計により、実用的な価値とスケーラビリティが大幅に向上しているため、明確な勝者です。回答Aは正しいですが、一般的な`get`ヒット操作で完全な書き込みロックを必要とする単一のReadWriteLockに依存しているため、重大なパフォーマンスのボトルネックが生じます。回答Bのきめ細かなロッキング戦略ははるかに高度であり、はるかに大きな並列処理を可能にします。この優れた設計は、卓越したコード品質、複雑なロッキングプロトコルを説明する徹底したドキュメント、およびより堅牢なテストスイートによってサポートされており、ベンチマーク品質の提出物となっています。

X f L