-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathtest_thread_safety.py
More file actions
221 lines (185 loc) · 8.13 KB
/
Copy pathtest_thread_safety.py
File metadata and controls
221 lines (185 loc) · 8.13 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
"""Thread-safety contention tests for lock-protected paths.
Intentional OS-thread stress of lock-protected paths; production uses
event-loop cooperative scheduling + snapshot reads.
NOTE: FakePool is not thread-safe. These tests do not call pool methods
concurrently; they only stress ISOProber._stats_lock and Scheduler._health_lock.
"""
from __future__ import annotations
import threading
import time
from unittest.mock import MagicMock
from paperscout.models import CycleStatus
from paperscout.monitor import Scheduler, _compute_probe_success_rate
from paperscout.protocols import SOURCE_ISO_PROBE, SOURCE_WG21_INDEX
from paperscout.sources import ISOProber, WG21Index
from paperscout.storage import ProbeState, UserWatchlist
from tests.conftest import make_test_settings
# Exact key set that ISOProber.snapshot_stats() is expected to expose.
_STATS_KEYS = frozenset(ISOProber._STATS_TEMPLATE.keys())
# Exact key set that Scheduler.health_snapshot() is expected to expose.
_HEALTH_SNAPSHOT_KEYS = frozenset(
    (
        "last_updated",
        "poll_count",
        "last_successful_poll",
        "last_cycle_status",
        "last_cycle_error",
        "probe_stats",
        "probe_success_rate",
    )
)
# Join timeout (seconds) used by _join_threads.
THREAD_JOIN_TIMEOUT = 5.0
# Loop counts for the ISOProber stats contention workers.
RESET_ITERATIONS = 1_000
SNAPSHOT_ITERATIONS = 1_000
BUMPER_ITERATIONS = 1_000
# Loop counts for the Scheduler health-snapshot writer/readers.
PUBLISH_ITERATIONS = 500
READ_ITERATIONS = 500
def _make_prober(fake_pool) -> ISOProber:
    """Return an ISOProber backed by ``fake_pool`` whose mocked watchlist reports no papers."""
    wg21_index = WG21Index(fake_pool)
    probe_state = ProbeState(fake_pool)
    watchlist = MagicMock(spec=UserWatchlist)
    watchlist.get_all_watched_paper_nums.return_value = set()
    return ISOProber(
        wg21_index,
        probe_state,
        user_watchlist=watchlist,
        cfg=make_test_settings(),
    )
def _make_scheduler(fake_pool) -> Scheduler:
    """Return a Scheduler wired to mocked WG21-index and ISO-probe sources and an empty watchlist."""
    def _mock_source(source_id):
        # Plain MagicMock tagged with the given source_id.
        source = MagicMock()
        source.source_id = source_id
        return source
    wg21_source = _mock_source(SOURCE_WG21_INDEX)
    iso_source = _mock_source(SOURCE_ISO_PROBE)
    watchlist = MagicMock(spec=UserWatchlist)
    watchlist.matches_for_users.return_value = {}
    return Scheduler(
        sources=[wg21_source, iso_source],
        user_watchlist=watchlist,
        state=ProbeState(fake_pool),
        cfg=make_test_settings(),
    )
def _join_threads(
    threads: list[threading.Thread], timeout: float = THREAD_JOIN_TIMEOUT
) -> None:
    """Wait for already-started threads and fail if any is still running.

    All threads share one deadline, so the total wait is bounded by
    ``timeout`` rather than growing to ``len(threads) * timeout``. The
    assertion also names every thread still alive, not just the first one
    found.

    Args:
        threads: Threads that have already been started.
        timeout: Total seconds to wait for all of them.

    Raises:
        AssertionError: If any thread is still alive once the deadline passes.
    """
    deadline = time.monotonic() + timeout
    for thread in threads:
        # Clamp at zero: once the deadline has passed, just poll the remaining threads.
        thread.join(timeout=max(0.0, deadline - time.monotonic()))
    stragglers = [thread.name for thread in threads if thread.is_alive()]
    assert not stragglers, f"threads {stragglers} did not finish within {timeout}s"
def _assert_valid_probe_stats(stats: dict[str, int]) -> None:
    """Assert ``stats`` has exactly the ISOProber template keys, each mapped to a non-negative int."""
    assert set(stats) == _STATS_KEYS
    for count in stats.values():
        assert isinstance(count, int) and count >= 0
def _assert_health_snapshot_consistent(snap: dict) -> None:
    """Assert that a single health_snapshot() result is internally consistent.

    Checks the exact key set and the type of every field. Probe stats must
    be non-negative ints, matching the rule in _assert_valid_probe_stats.
    ``probe_success_rate`` must agree with the ``probe_stats`` captured in the
    same snapshot. A snapshot that mixed fields from two different publishes
    would likely break that agreement.
    """
    assert set(snap.keys()) == _HEALTH_SNAPSHOT_KEYS
    poll_count = snap["poll_count"]
    assert isinstance(poll_count, int) and poll_count >= 0
    probe_stats = snap["probe_stats"]
    assert isinstance(probe_stats, dict)
    # Stats are counters; they should never be negative.
    assert all(isinstance(v, int) and v >= 0 for v in probe_stats.values())
    rate = snap["probe_success_rate"]
    assert rate == _compute_probe_success_rate(probe_stats)
    assert rate is None or isinstance(rate, float)
    for field in ("last_cycle_status", "last_cycle_error", "last_successful_poll"):
        value = snap[field]
        assert value is None or isinstance(value, str), (
            f"{field} must be None or str, got {value!r}"
        )
    last_updated = snap["last_updated"]
    assert last_updated is None or (isinstance(last_updated, str) and len(last_updated) > 0)
class TestISOProberStatsContention:
    """Intentional OS-thread stress of ISOProber._stats_lock."""
    def test_concurrent_bump_stat_totals(self, fake_pool):
        """Concurrent _bump_stat() calls produce correct totals.

        Worker exceptions are collected and reported explicitly, as in the
        other contention tests, instead of only surfacing as a wrong total.
        """
        prober = _make_prober(fake_pool)
        n_threads = 32
        bumps_per_thread = 100
        barrier = threading.Barrier(n_threads)
        errors: list[Exception] = []
        errors_lock = threading.Lock()
        def worker() -> None:
            try:
                # Release every worker at once to maximise contention on the lock.
                barrier.wait()
                for _ in range(bumps_per_thread):
                    prober._bump_stat("miss")
            except Exception as exc:
                with errors_lock:
                    errors.append(exc)
        # daemon=True: if a worker hangs, _join_threads() fails the test and the
        # stuck thread cannot also block interpreter shutdown.
        threads = [
            threading.Thread(target=worker, name=f"bump-{i}", daemon=True)
            for i in range(n_threads)
        ]
        for thread in threads:
            thread.start()
        _join_threads(threads)
        assert not errors, f"thread errors: {errors!r}"
        assert prober.snapshot_stats()["miss"] == n_threads * bumps_per_thread
    def test_snapshot_stats_consistent_under_concurrent_reset(self, fake_pool):
        """snapshot_stats() concurrent with _reset_stats() returns consistent state.

        One resetter, two snapshotters and two bumpers contend on the stats
        lock. Every snapshot must carry exactly the template keys with
        non-negative int values.
        """
        prober = _make_prober(fake_pool)
        errors: list[Exception] = []
        errors_lock = threading.Lock()
        def record_error(exc: Exception) -> None:
            with errors_lock:
                errors.append(exc)
        # The workers below read `barrier` only when they run, i.e. after it is
        # assigned further down (before any thread is started).
        def resetter() -> None:
            try:
                barrier.wait()
                for _ in range(RESET_ITERATIONS):
                    prober._reset_stats()
            except Exception as exc:
                record_error(exc)
        def snapshotter() -> None:
            try:
                barrier.wait()
                for _ in range(SNAPSHOT_ITERATIONS):
                    _assert_valid_probe_stats(prober.snapshot_stats())
            except Exception as exc:
                record_error(exc)
        def bumper() -> None:
            try:
                barrier.wait()
                for _ in range(BUMPER_ITERATIONS):
                    prober._bump_stat("miss")
            except Exception as exc:
                record_error(exc)
        threads = [
            threading.Thread(target=resetter, name="resetter", daemon=True),
            threading.Thread(target=snapshotter, name="snapshot-0", daemon=True),
            threading.Thread(target=snapshotter, name="snapshot-1", daemon=True),
            threading.Thread(target=bumper, name="bumper-0", daemon=True),
            threading.Thread(target=bumper, name="bumper-1", daemon=True),
        ]
        # Size the start barrier from the thread list so the party count can
        # never drift from the number of workers (a mismatch would hang them).
        barrier = threading.Barrier(len(threads))
        for thread in threads:
            thread.start()
        _join_threads(threads)
        assert not errors, f"thread errors: {errors!r}"
class TestSchedulerHealthContention:
    """Intentional OS-thread stress of Scheduler._health_lock."""
    def test_health_snapshot_consistent_under_concurrent_publish(self, fake_pool):
        """health_snapshot() from non-event-loop threads returns consistent data.

        One writer republishes the health snapshot while several readers poll
        health_snapshot(). A shared start barrier makes them overlap. Without
        it, the writer (started first) could finish every publish before any
        reader runs, so the readers would never race it.
        """
        scheduler = _make_scheduler(fake_pool)
        scheduler._poll_count = 0
        scheduler._last_probe_stats = {}
        scheduler._last_cycle_status = None
        scheduler._last_successful_poll = None
        scheduler._publish_health_snapshot()
        n_readers = 6
        # Writer plus every reader start together.
        barrier = threading.Barrier(1 + n_readers)
        errors: list[Exception] = []
        errors_lock = threading.Lock()
        def record_error(exc: Exception) -> None:
            with errors_lock:
                errors.append(exc)
        def writer() -> None:
            try:
                barrier.wait()
                # Writer thread is the sole mutator; attribute writes happen-before
                # _publish_health_snapshot() reads on the same thread. _health_lock
                # only guards _health_snapshot assignment vs reader threads.
                for i in range(PUBLISH_ITERATIONS):
                    scheduler._poll_count = i + 1
                    scheduler._last_probe_stats = {
                        "miss": i % 10,
                        "error": i % 3,
                        "hit_recent": i % 5,
                        "hit_old": 0,
                        "hit_no_lm": 0,
                        "skipped_discovered": 0,
                        "skipped_in_index": 0,
                    }
                    scheduler._last_cycle_status = (
                        CycleStatus.SUCCESS if i % 2 == 0 else CycleStatus.EMPTY
                    )
                    scheduler._last_successful_poll = time.time()
                    scheduler._publish_health_snapshot()
            except Exception as exc:
                record_error(exc)
        def reader() -> None:
            try:
                barrier.wait()
                for _ in range(READ_ITERATIONS):
                    _assert_health_snapshot_consistent(scheduler.health_snapshot())
            except Exception as exc:
                record_error(exc)
        # daemon=True: a hung thread fails _join_threads() without also
        # blocking interpreter shutdown.
        writer_thread = threading.Thread(target=writer, name="writer", daemon=True)
        reader_threads = [
            threading.Thread(target=reader, name=f"reader-{i}", daemon=True)
            for i in range(n_readers)
        ]
        writer_thread.start()
        for thread in reader_threads:
            thread.start()
        _join_threads([writer_thread, *reader_threads])
        assert not errors, f"thread errors: {errors!r}"