Answer A: Google Gemini 2.5 Flash
import threading
import random
import time
from typing import Any, Optional, List
class _Node:
"""
Represents a node in the doubly linked list used for LRU ordering.
Uses slots for memory efficiency.
"""
slots = 'key', 'value', 'prev', 'next'
def __init__(self, key: str, value: Any):
self.key = key
self.value = value
self.prev = None
self.next = None
class ReadWriteLock:
"""
A custom Read-Write Lock implementation. This lock allows multiple readers
to hold the lock concurrently, but only one writer at a time. A writer
must wait for...
Show Full Answer ▼
import threading
import random
import time
from typing import Any, Optional, List
class _Node:
"""
Represents a node in the doubly linked list used for LRU ordering.
Uses slots for memory efficiency.
"""
slots = 'key', 'value', 'prev', 'next'
def __init__(self, key: str, value: Any):
self.key = key
self.value = value
self.prev = None
self.next = None
class ReadWriteLock:
"""
A custom Read-Write Lock implementation. This lock allows multiple readers
to hold the lock concurrently, but only one writer at a time. A writer
must wait for all readers to finish, and readers must wait for a writer
to finish. This is a readers-preferring implementation, meaning new readers
can acquire the read lock even if a writer is waiting.
This class also provides context managers for cleaner lock acquisition and release.
"""
def __init__(self):
self._read_lock = threading.Lock() # Protects _readers count and condition variable
self._write_lock = threading.Lock() # The actual write lock
self._readers = 0 # Number of active readers
self._no_readers = threading.Condition(self._read_lock) # Condition for writers to wait for readers
def acquire_read(self):
"""Acquire a read lock. Multiple readers can hold this lock concurrently."""
with self._read_lock:
# Wait if a writer currently holds the write lock.
# New readers will block if a writer is active.
while self._write_lock.locked():
self._no_readers.wait()
self._readers += 1
def release_read(self):
"""Release a read lock."""
with self._read_lock:
self._readers -= 1
# If no readers left, notify any waiting writers
if self._readers == 0:
self._no_readers.notify_all()
def acquire_write(self):
"""Acquire a write lock. Only one writer can hold this lock at a time."""
# Acquire the main write lock first to signal intent to write
self._write_lock.acquire()
with self._read_lock: # Then acquire read_lock to wait for active readers
# Wait until all active readers have finished
while self._readers > 0:
self._no_readers.wait()
def release_write(self):
"""Release a write lock."""
with self._read_lock: # Acquire read_lock to notify potential readers/writers
self._no_readers.notify_all() # Notify any waiting readers or writers
self._write_lock.release() # Release the main write lock
class _ReadContextManager:
def __init__(self, lock):
self.lock = lock
def __enter__(self):
self.lock.acquire_read()
def __exit__(self, exc_type, exc_val, exc_tb):
self.lock.release_read()
class _WriteContextManager:
def __init__(self, lock):
self.lock = lock
def __enter__(self):
self.lock.acquire_write()
def __exit__(self, exc_type, exc_val, exc_tb):
self.lock.release_write()
def read_locked(self):
"""Returns a context manager for acquiring a read lock."""
return self._ReadContextManager(self)
def write_locked(self):
"""Returns a context manager for acquiring a write lock."""
return self._WriteContextManager(self)
class LRUCache:
"""
A thread-safe LRU (Least Recently Used) cache implementation.
This cache uses a ReadWriteLock to manage concurrent access. The design aims
to allow concurrent reads for cache misses and the `keys()` operation.
However, operations that modify the cache state or LRU order (`put`, `delete`,
and cache hits in `get`) acquire a write lock, ensuring atomicity and consistency
for these critical sections. This approach provides better concurrency than a
single global mutex for all operations, especially for read-heavy workloads
with frequent cache misses or `keys()` calls.
The `get` operation employs an "upgrade" pattern: it first attempts a read lock
to check for key existence. If found, it releases the read lock and acquires a
write lock to update the LRU order and retrieve the latest value. This pattern
is carefully implemented to handle race conditions that might occur during the
lock upgrade (e.g., key being deleted or updated by another thread).
"""
def __init__(self, capacity: int):
if capacity <= 0:
raise ValueError("Capacity must be a positive integer.")
self._capacity = capacity
self._cache = {} # Stores key -> _Node for O(1) lookup
self._size = 0
# Dummy head and tail nodes for the doubly linked list.
# This simplifies edge cases for adding/removing nodes, as actual data nodes
# are always between head and tail.
self._head = _Node("dummy_head", None)
self._tail = _Node("dummy_tail", None)
self._head.next = self._tail
self._tail.prev = self._head
# Concurrency control: A single ReadWriteLock protects the entire cache state.
self._rw_lock = ReadWriteLock()
def _add_node(self, node: _Node):
"""Adds a node right after the head (most recently used position).
Assumes write lock is held."""
node.prev = self._head
node.next = self._head.next
self._head.next.prev = node
self._head.next = node
def _remove_node(self, node: _Node):
"""Removes a node from the doubly linked list.
Assumes write lock is held."""
prev_node = node.prev
next_node = node.next
prev_node.next = next_node
next_node.prev = prev_node
def _move_to_front(self, node: _Node):
"""Moves an existing node to the front of the list (most recently used).
Assumes write lock is held."""
self._remove_node(node)
self._add_node(node)
def _pop_tail(self) -> _Node:
"""Removes and returns the least recently used node (the one just before the tail).
Assumes write lock is held."""
node = self._tail.prev
self._remove_node(node)
return node
def get(self, key: str) -> Optional[Any]:
"""
Returns the value associated with the key if it exists, and marks it as recently used.
Returns None if the key is not in the cache.
This operation first acquires a read lock to check for key existence. If the key
is found, it releases the read lock and then acquires a write lock to update the
LRU order and retrieve the latest value. This "upgrade" pattern allows concurrent
`get` misses and `keys()` operations, but serializes `get` hits with other writes.
"""
node_found_under_read_lock = None
with self._rw_lock.read_locked():
node_found_under_read_lock = self._cache.get(key)
if not node_found_under_read_lock:
return None
# If key found, acquire write lock to update LRU order and get the latest value.
# This is an "upgrade" pattern. A race condition can occur here: another thread
# might delete or update the key between releasing the read lock and acquiring the write lock.
with self._rw_lock.write_locked():
# Re-check if the key still exists and refers to the exact same node object.
# This is crucial for correctness under contention. If another thread performed
# a `put` for the same key, `_cache.get(key)` would return a *new* node.
# If another thread performed a `delete`, it would return `None`.
current_node = self._cache.get(key)
if current_node is node_found_under_read_lock: # Ensure it's the exact same node object
self._move_to_front(node_found_under_read_lock)
value = current_node.value # Get the latest value after moving
return value
else:
# The node was either deleted or replaced by another 'put' operation
# between our read-lock and write-lock acquisition. Treat as a miss.
return None
def put(self, key: str, value: Any) -> None:
"""
Inserts or updates the key-value pair. If the cache exceeds capacity after insertion,
evicts the least recently used item. This is a write operation and acquires a write lock.
"""
with self._rw_lock.write_locked():
node = self._cache.get(key)
if node:
# Key exists: update value and move to front
node.value = value
self._move_to_front(node)
else:
# Key does not exist: add new node
new_node = _Node(key, value)
self._cache[key] = new_node
self._add_node(new_node)
self._size += 1
# Evict if capacity exceeded
if self._size > self._capacity:
lru_node = self._pop_tail()
del self._cache[lru_node.key]
self._size -= 1
def delete(self, key: str) -> bool:
"""
Removes the key from the cache. Returns True if the key was present, False otherwise.
This is a write operation and acquires a write lock.
"""
with self._rw_lock.write_locked():
node = self._cache.get(key)
if node:
self._remove_node(node)
del self._cache[key]
self._size -= 1
return True
return False
def keys(self) -> List[str]:
"""
Returns a list of all keys currently in the cache, ordered from most recently used
to least recently used. This is a read operation and acquires a read lock.
"""
with self._rw_lock.read_locked():
keys_list = []
current = self._head.next
while current is not self._tail:
keys_list.append(current.key)
current = current.next
return keys_list
def run_tests():
"""
Runs a series of single-threaded and multi-threaded tests to verify the LRUCache implementation.
"""
print("--- Starting LRU Cache Tests ---")
# --- Single-threaded Tests ---
print("\n--- Single-threaded Tests ---")
cache = LRUCache(capacity=3)
assert cache.get("a") is None, "ST: Get non-existent key should return None"
assert cache.keys() == [], "ST: Empty cache should have empty keys list"
cache.put("a", 1)
assert cache.get("a") == 1, "ST: Get existing key after put"
assert cache.keys() == ["a"], "ST: Keys list after one put"
cache.put("b", 2)
cache.put("c", 3)
assert cache.keys() == ["c", "b", "a"], "ST: Keys list after multiple puts"
assert cache._size == 3, "ST: Cache size should be 3"
# Test capacity eviction
cache.put("d", 4) # 'a' should be evicted as it's LRU
assert cache.get("a") is None, "ST: Evicted key 'a' should be None"
assert cache.get("d") == 4, "ST: New key 'd' should be present"
assert cache.keys() == ["d", "c", "b"], "ST: Keys list after eviction"
assert cache._size == 3, "ST: Cache size should remain 3 after eviction"
# Test put with existing key (update value and move to front)
cache.put("b", 22) # 'b' should move to front, value updated
assert cache.get("b") == 22, "ST: Updated value for 'b'"
assert cache.keys() == ["b", "d", "c"], "ST: Keys list after update and move 'b'"
# Test delete
assert cache.delete("c") is True, "ST: Delete existing key 'c'"
assert cache.get("c") is None, "ST: Deleted key 'c' should be None"
assert cache.keys() == ["b", "d"], "ST: Keys list after deleting 'c'"
assert cache._size == 2, "ST: Cache size after delete"
assert cache.delete("z") is False, "ST: Delete non-existent key 'z'"
assert cache.keys() == ["b", "d"], "ST: Keys list unchanged after deleting non-existent key"
# Test capacity 1
cache_cap1 = LRUCache(capacity=1)
cache_cap1.put("x", 10)
assert cache_cap1.get("x") == 10, "ST: Capacity 1, get 'x'"
cache_cap1.put("y", 20) # 'x' should be evicted
assert cache_cap1.get("x") is None, "ST: Capacity 1, 'x' evicted"
assert cache_cap1.get("y") == 20, "ST: Capacity 1, get 'y'"
assert cache_cap1.keys() == ["y"], "ST: Capacity 1, keys list"
print("Single-threaded tests passed!")
# --- Multi-threaded Tests ---
print("\n--- Multi-threaded Tests ---")
MT_CAPACITY = 10
MT_NUM_THREADS = 8
MT_OPS_PER_THREAD = 2000
MT_KEY_RANGE = 20 # Keys from 0 to 19 (e.g., "0", "1", ..., "19")
mt_cache = LRUCache(capacity=MT_CAPACITY)
# Barrier to ensure all worker threads start their operations simultaneously
barrier = threading.Barrier(MT_NUM_THREADS + 1) # +1 for the main thread
error_flag = threading.Event() # Set if any thread encounters an error
# Tracks keys that have *ever* been successfully put into the cache by any thread.
# Used to verify that 'get' never returns a value for a key that was truly never inserted.
global_ever_inserted_keys = set()
global_ever_inserted_keys_lock = threading.Lock()
def mt_worker(cache_instance, thread_id, barrier_instance, error_event,
ever_inserted_keys_set, ever_inserted_keys_lock):
"""Worker function for multi-threaded tests."""
barrier_instance.wait() # Wait for all threads to be ready to start operations
for i in range(MT_OPS_PER_THREAD):
if error_event.is_set():
break # Stop if another thread already found an error
# Randomly choose an operation with a bias towards 'get' and 'put'
op = random.choices(['get', 'put', 'delete'], weights=[0.5, 0.4, 0.1], k=1)[0]
key = str(random.randint(0, MT_KEY_RANGE - 1))
value = f"value_T{thread_id}_K{key}_I{i}"
if op == 'put':
cache_instance.put(key, value)
with ever_inserted_keys_lock:
ever_inserted_keys_set.add(key)
elif op == 'get':
val = cache_instance.get(key)
if val is not None:
# Assert that if a value is returned, the key must have been inserted at some point.
with ever_inserted_keys_lock:
if key not in ever_inserted_keys_set:
print(f"MT Error: Thread {thread_id} got value '{val}' for key '{key}' "
f"which was never inserted globally. This should not happen.")
error_event.set()
break
# Assert that the value is not corrupted (simple format check)
if not isinstance(val, str) or not val.startswith("value_T"):
print(f"MT Error: Thread {thread_id} got corrupted value '{val}' for key '{key}'.")
error_event.set()
break
elif op == 'delete':
cache_instance.delete(key)
# Optional: Introduce a small random sleep to increase context switching and contention
# time.sleep(random.uniform(0.00001, 0.0001))
# print(f"Thread {thread_id} finished.") # Uncomment for verbose output
threads = []
for i in range(MT_NUM_THREADS):
thread = threading.Thread(target=mt_worker,
args=(mt_cache, i, barrier, error_flag,
global_ever_inserted_keys, global_ever_inserted_keys_lock))
threads.append(thread)
thread.start()
# Wait for all worker threads to reach the barrier and start operations
barrier.wait()
print(f"All {MT_NUM_THREADS} threads started operations.")
# Wait for all worker threads to complete their operations
for thread in threads:
thread.join()
print("All threads completed.")
# Final assertions for multi-threaded tests
assert not error_flag.is_set(), "MT: Errors detected during multi-threaded operations. Check console for details."
assert mt_cache._size <= MT_CAPACITY, f"MT: Cache size ({mt_cache._size}) exceeded capacity ({MT_CAPACITY})."
# Check that all keys remaining in the cache were at some point inserted
final_keys = mt_cache.keys()
with global_ever_inserted_keys_lock:
for key in final_keys:
assert key in global_ever_inserted_keys, f"MT Error: Final cache contains key '{key}' that was never globally inserted."
print("Multi-threaded tests passed!")
print("\n--- All LRU Cache Tests Passed! ---")
if name == 'main':
run_tests()
Result
Winning Votes
0 / 3
Average Score
Total Score
Overall Comments
Answer A provides a correct and well-structured implementation of a concurrent LRU cache. It uses a custom ReadWriteLock and a clever "lock upgrade" pattern in the `get` method to handle race conditions. The code is clean, well-commented, and includes a comprehensive test suite. However, its primary weakness lies in its concurrency model. Because a `get` hit requires acquiring a full write lock to update the LRU order, the cache's performance is severely bottlenecked under read-heavy workloads with a warm cache, making its practical advantage over a simple global lock marginal.
View Score Details ▼
Correctness
Weight 35%The implementation is correct, and the handling of the race condition in the `get` method's lock-upgrade pattern is sound. However, the choice of a readers-preferring lock could lead to writer starvation, and the overall concurrency model has inherent performance issues that border on a design flaw for this specific problem.
Completeness
Weight 20%The answer is fully complete. It implements all required methods of the interface and provides a `run_tests` function with solid single-threaded and multi-threaded test cases that meet the prompt's requirements.
Code Quality
Weight 20%The code quality is very good. The code is well-structured, readable, and includes clear docstrings and comments explaining its concurrency strategy. The use of `__slots__` in the `_Node` class is a nice optimization.
Practical Value
Weight 15%The practical value is limited by the concurrency model. Since `get` hits (the most common operation on a warm cache) require a full write lock, the implementation serializes most operations and offers little performance benefit over a simple global lock in many real-world, read-heavy scenarios.
Instruction Following
Weight 10%The answer perfectly follows all instructions in the prompt, including the interface, concurrency requirements, edge cases, testing specifications, and standard library constraint.
Total Score
Overall Comments
Provides a runnable LRU cache with a custom RW lock and a doubly linked list. Basic single-threaded behavior and capacity handling look correct. However, the concurrency strategy is effectively a single cache-wide writer lock for all hits, puts, and deletes, and the RWLock implementation is flawed: readers check `write_lock.locked()` without holding it, and writers can acquire the write lock while new readers are still allowed, potentially violating RW semantics. The `get` “upgrade” path can incorrectly return None even if the key exists (replaced node) and can miss values under contention; while arguably avoiding stale data, it sacrifices correctness expectations for `get` under concurrent put on the same key. Tests are decent but rely on internal `_size` and don’t validate list/map integrity under contention.
View Score Details ▼
Correctness
Weight 35%LRU logic is mostly correct in single-threaded use, but the RWLock implementation is not a sound readers-writer lock (reader gating based on `write_lock.locked()`/condition usage is race-prone). Under contention, `get` may return None for an existing key if it was updated (node replaced) between locks, which is an avoidable semantic degradation and can violate expected behavior for concurrent put/get on same key.
Completeness
Weight 20%Implements all required methods and provides run_tests with single- and multi-threaded scenarios using 8 threads and invariant checks (capacity, never-inserted key). However, it lacks deeper structural integrity checks (list/map consistency) and uses internal fields in tests.
Code Quality
Weight 20%Readable structure and comments, but RWLock implementation is complex and arguably incorrect; uses private fields in tests and exposes `_size` that is not strictly necessary. Some explanations (readers-preferring vs actual behavior) are inconsistent with code.
Practical Value
Weight 15%Works as a baseline concurrent cache, but contention performance is limited because all hits require write locking and RWLock concerns reduce confidence for production use.
Instruction Following
Weight 10%Meets interface requirements and includes docstrings/comments and an 8-thread mixed test. However, the prompt asks to avoid a global lock for every operation; in practice most meaningful operations serialize via the write lock, and concurrency benefits are limited.
Total Score
Overall Comments
Answer A provides a complete, runnable implementation with a custom ReadWriteLock and LRU cache using a doubly linked list with sentinel nodes. The concurrency strategy uses a single RWLock for the entire cache, which is better than a global mutex but still limited. The get() method uses a read-then-write upgrade pattern, which has a subtle issue: between releasing the read lock and acquiring the write lock, the node could be deleted and the key re-inserted with a different node, causing the get to return None even though the key exists (a correctness concern). The ReadWriteLock implementation has a potential issue: it checks `self._write_lock.locked()` inside acquire_read, but this is a readers-preferring lock that could starve writers. The tests are reasonable but could be more thorough. The code is well-documented with clear docstrings explaining the concurrency strategy.
View Score Details ▼
Correctness
Weight 35%Answer A's get() has a correctness issue: the read-to-write lock upgrade pattern means that between releasing the read lock and acquiring the write lock, another thread could delete and re-insert the same key with a new node. The identity check (current_node is node_found_under_read_lock) would then fail, causing get() to return None even though the key exists with a valid value. The ReadWriteLock's acquire_read checks _write_lock.locked() which is not fully reliable. The readers-preferring design can starve writers.
Completeness
Weight 20%Answer A implements all required operations (get, put, delete, keys) with proper LRU semantics. It handles capacity of 1, updating existing keys, and deleting nonexistent keys. The tests cover single-threaded and multi-threaded scenarios with 8 threads. However, it lacks internal validation utilities and the multi-threaded test could be more thorough.
Code Quality
Weight 20%Answer A has good documentation with clear docstrings explaining the concurrency strategy and trade-offs. The code is well-structured with clean separation of concerns. The ReadWriteLock has context managers for clean usage. Uses __slots__ on _Node for memory efficiency. However, the _size tracking is manual and could drift if bugs exist, and the ReadWriteLock implementation has some questionable design choices (checking locked() state).
Practical Value
Weight 15%Answer A provides a functional LRU cache but the get() upgrade pattern race condition reduces practical reliability. The readers-preferring RWLock could cause writer starvation in production. The concurrency benefit is limited since get hits still require a write lock, and the upgrade pattern adds overhead. Would need fixes before production use.
Instruction Following
Weight 10%Answer A follows most instructions: implements all required operations, uses a concurrency strategy beyond a single global lock, includes docstrings and comments, handles edge cases, and provides run_tests() with single-threaded and multi-threaded tests using 8 threads. However, the multi-threaded test assertions could be stronger, and the concurrency design explanation could better address trade-offs.