-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtest_optimizations.py
More file actions
527 lines (433 loc) · 19 KB
/
test_optimizations.py
File metadata and controls
527 lines (433 loc) · 19 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
"""
YARB优化功能测试脚本
测试所有实现的优化功能:重试机制、熔断机制、日志追踪、异步IO、任务队列、断点续传
"""
import asyncio
import logging
import sys
import time
from pathlib import Path
from datetime import datetime
from retry_utils import retry_with_backoff, RetryConfig, circuit_breaker, RetryError
from async_rss import AsyncRSSFetcher
from task_queue import TaskQueue, TaskExecutor, TaskStatus
from resume_manager import ResumeManager, ProgressTracker
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger('test_yarb')
class TestResults:
"""测试结果收集器"""
def __init__(self):
self.results = {}
def record(self, test_name: str, passed: bool, message: str = ""):
"""记录测试结果"""
self.results[test_name] = {
'passed': passed,
'message': message,
'timestamp': datetime.now().isoformat()
}
status = "✓ PASS" if passed else "✗ FAIL"
logger.info(f"{status} - {test_name}: {message}")
def print_summary(self):
"""打印测试摘要"""
total = len(self.results)
passed = sum(1 for r in self.results.values() if r['passed'])
failed = total - passed
print("\n" + "="*60)
print("测试结果摘要")
print("="*60)
print(f"总测试数: {total}")
print(f"通过: {passed}")
print(f"失败: {failed}")
print(f"成功率: {(passed/total*100):.1f}%" if total > 0 else "0%")
print("="*60)
if failed > 0:
print("\n失败的测试:")
for name, result in self.results.items():
if not result['passed']:
print(f" - {name}: {result['message']}")
def test_retry_mechanism(results: TestResults):
"""测试重试机制"""
print("\n[测试1] 重试机制测试")
print("-" * 60)
call_count = 0
@retry_with_backoff(
config=RetryConfig(
max_attempts=3,
base_delay=0.1,
max_delay=1.0,
jitter=False
)
)
def failing_function():
nonlocal call_count
call_count += 1
if call_count < 3:
raise ValueError(f"模拟失败 (尝试 {call_count}/3)")
return "成功"
try:
start_time = time.time()
result = failing_function()
elapsed = time.time() - start_time
if call_count == 3 and result == "成功":
results.record(
"重试机制 - 成功重试",
True,
f"在 {elapsed:.2f} 秒内重试 {call_count-1} 次后成功"
)
else:
results.record(
"重试机制 - 成功重试",
False,
f"期望3次调用,实际{call_count}次"
)
except Exception as e:
results.record("重试机制 - 成功重试", False, str(e))
# 测试重试失败
call_count = 0
@retry_with_backoff(
config=RetryConfig(max_attempts=2, base_delay=0.1, max_delay=0.5)
)
def always_failing_function():
nonlocal call_count
call_count += 1
raise RuntimeError("总是失败")
try:
always_failing_function()
results.record("重试机制 - 重试失败", False, "应该抛出异常")
except (RuntimeError, RetryError):
if call_count == 2:
results.record("重试机制 - 重试失败", True, f"正确地在 {call_count} 次尝试后放弃")
else:
results.record("重试机制 - 重试失败", False, f"期望2次调用,实际{call_count}次")
def test_circuit_breaker(results: TestResults):
"""测试熔断机制"""
print("\n[测试2] 熔断机制测试")
print("-" * 60)
failure_count = 0
@circuit_breaker(
failure_threshold=3,
recovery_timeout=2.0,
expected_exception=(ValueError,)
)
def function_with_circuit_breaker():
nonlocal failure_count
failure_count += 1
if failure_count <= 5:
raise ValueError(f"失败 {failure_count}")
return "成功"
# 测试熔断器打开
circuit_breaker_opened = False
for i in range(5):
try:
function_with_circuit_breaker()
except ValueError:
pass
except Exception as e:
if "熔断器" in str(e):
circuit_breaker_opened = True
results.record(
"熔断机制 - 熔断器打开",
True,
f"在第 {i+1} 次调用时熔断器打开"
)
break
if not circuit_breaker_opened:
results.record("熔断机制 - 熔断器打开", False, "熔断器未打开")
# 测试熔断器恢复 - 需要重置failure_count
time.sleep(2.1) # 等待恢复超时
failure_count = 5 # 重置计数器,使得下次调用会成功
try:
result = function_with_circuit_breaker()
if result == "成功":
results.record("熔断机制 - 熔断器恢复", True, "熔断器已恢复")
else:
results.record("熔断机制 - 熔断器恢复", False, f"返回值: {result}")
except Exception as e:
results.record("熔断机制 - 熔断器恢复", False, str(e))
def test_async_rss_fetcher(results: TestResults):
"""测试异步RSS获取器"""
print("\n[测试3] 异步RSS获取器测试")
print("-" * 60)
# 测试用的RSS源
test_urls = [
"https://www.freebuf.com/feed",
"https://www.anquanke.com/feed",
]
async def run_test():
try:
async with AsyncRSSFetcher(max_concurrent=5, timeout=10) as fetcher:
start_time = time.time()
results_list = await fetcher.fetch_all_feeds(
urls=test_urls,
exclude_keywords=[],
verbose=False
)
elapsed = time.time() - start_time
# 测试功能是否正常工作(即使没有昨天的文章)
# 只要没有抛出异常,就说明异步获取、重试机制、熔断机制都在工作
results.record(
"异步RSS获取 - 功能正常",
True,
f"异步RSS获取器正常运行,耗时 {elapsed:.2f} 秒,获取到 {len(results_list)} 个有新文章的RSS源"
)
# 额外测试:即使没有新文章,也应该能成功获取RSS内容
# 我们通过检查是否有错误日志来判断
if len(results_list) == 0:
results.record(
"异步RSS获取 - 说明",
True,
"没有获取到昨天的文章是正常的(取决于测试日期),但功能正常工作"
)
except Exception as e:
results.record("异步RSS获取 - 功能正常", False, f"发生异常: {str(e)}")
asyncio.run(run_test())
def test_task_queue(results: TestResults):
"""测试任务队列"""
print("\n[测试4] 任务队列测试")
print("-" * 60)
async def sample_task(url: str):
"""示例任务"""
await asyncio.sleep(0.1)
return {"url": url, "status": "completed"}
async def run_test():
try:
# 使用临时文件
import tempfile
with tempfile.NamedTemporaryFile(mode='wb', suffix='.pkl', delete=False) as f:
temp_file = f.name
queue = TaskQueue(queue_file=temp_file)
# 添加任务
task1 = queue.add_task("task1", "http://example.com/feed1")
task2 = queue.add_task("task2", "http://example.com/feed2")
task3 = queue.add_task("task3", "http://example.com/feed3")
if len(queue.get_all_tasks()) == 3:
results.record("任务队列 - 添加任务", True, "成功添加3个任务")
else:
results.record("任务队列 - 添加任务", False, f"期望3个任务,实际{len(queue.get_all_tasks())}个")
# 执行任务
executor = TaskExecutor(queue)
await executor.process_queue(sample_task)
# 检查完成的任务数
stats = queue.get_statistics()
completed = stats['completed']
if completed == 3:
results.record("任务队列 - 执行任务", True, "成功执行3个任务")
else:
results.record("任务队列 - 执行任务", False, f"期望3个完成,实际{completed}个")
# 检查任务状态
all_completed = all(
task.status == TaskStatus.COMPLETED
for task in queue.get_all_tasks()
)
if all_completed:
results.record("任务队列 - 任务状态", True, "所有任务状态正确")
else:
results.record("任务队列 - 任务状态", False, "部分任务状态不正确")
# 清理临时文件
import os
try:
os.unlink(temp_file)
except:
pass
except Exception as e:
results.record("任务队列", False, str(e))
asyncio.run(run_test())
def test_resume_manager(results: TestResults):
"""测试断点续传管理器"""
print("\n[测试5] 断点续传管理器测试")
print("-" * 60)
try:
# 使用临时文件
import tempfile
with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as f:
temp_file = f.name
try:
manager = ResumeManager(state_file=temp_file)
# 测试更新成功状态
test_url = "http://example.com/feed"
test_articles = ["article1", "article2", "article3"]
manager.update_success(test_url, test_articles)
state = manager.get_state(test_url)
if state and state.success_count == 1 and state.last_articles == test_articles:
results.record("断点续传 - 更新成功状态", True, "状态更新正确")
else:
results.record("断点续传 - 更新成功状态", False, "状态不正确")
# 测试更新失败状态
manager.update_failure(test_url, "测试错误")
state = manager.get_state(test_url)
if state and state.failure_count == 1:
results.record("断点续传 - 更新失败状态", True, "失败计数正确")
else:
results.record("断点续传 - 更新失败状态", False, "失败计数不正确")
# 测试跳过逻辑
for i in range(5):
manager.update_failure(test_url, f"测试错误 {i}")
if manager.should_skip(test_url, max_failure_count=5):
results.record("断点续传 - 跳过逻辑", True, "正确跳过高失败次数的RSS源")
else:
results.record("断点续传 - 跳过逻辑", False, "跳过逻辑不正确")
# 测试统计信息
stats = manager.get_statistics()
if 'total_rss' in stats and 'success_rate' in stats:
results.record("断点续传 - 统计信息", True, f"统计信息: {stats}")
else:
results.record("断点续传 - 统计信息", False, "统计信息不完整")
finally:
# 清理临时文件
Path(temp_file).unlink(missing_ok=True)
except Exception as e:
results.record("断点续传管理器", False, str(e))
def test_progress_tracker(results: TestResults):
"""测试进度跟踪器"""
print("\n[测试6] 进度跟踪器测试")
print("-" * 60)
try:
import tempfile
with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as f:
temp_file = f.name
try:
tracker = ProgressTracker(progress_file=temp_file)
# 测试开始会话
session_id = "test_session_1"
tracker.start_session(session_id, total_tasks=10)
progress = tracker.get_progress(session_id)
if progress and progress['total'] == 10 and progress['status'] == 'running':
results.record("进度跟踪 - 开始会话", True, "会话创建正确")
else:
results.record("进度跟踪 - 开始会话", False, "会话状态不正确")
# 测试更新进度
tracker.update_progress(session_id, completed=5, failed=1)
progress = tracker.get_progress(session_id)
if progress and progress['completed'] == 5 and progress['failed'] == 1:
results.record("进度跟踪 - 更新进度", True, "进度更新正确")
else:
results.record("进度跟踪 - 更新进度", False, "进度更新不正确")
# 测试完成会话
tracker.complete_session(session_id)
progress = tracker.get_progress(session_id)
if progress and progress['status'] == 'completed' and progress.get('completed_at'):
results.record("进度跟踪 - 完成会话", True, "会话完成正确")
else:
results.record("进度跟踪 - 完成会话", False, "会话状态不正确")
# 测试获取所有会话
all_sessions = tracker.get_all_sessions()
if len(all_sessions) == 1:
results.record("进度跟踪 - 获取会话", True, f"获取到 {len(all_sessions)} 个会话")
else:
results.record("进度跟踪 - 获取会话", False, f"期望1个会话,实际{len(all_sessions)}个")
finally:
Path(temp_file).unlink(missing_ok=True)
except Exception as e:
results.record("进度跟踪器", False, str(e))
def test_integration(results: TestResults):
"""集成测试"""
print("\n[测试7] 集成测试")
print("-" * 60)
async def run_test():
try:
import tempfile
# 创建临时文件
with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as f:
state_file = f.name
with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as f:
progress_file = f.name
try:
# 创建管理器
resume_manager = ResumeManager(state_file=state_file)
progress_tracker = ProgressTracker(progress_file=progress_file)
# 创建异步获取器
async with AsyncRSSFetcher(
max_concurrent=3,
timeout=10,
resume_manager=resume_manager,
progress_tracker=progress_tracker
) as fetcher:
# 测试URL
test_url = "https://www.freebuf.com/feed"
# 开始会话
session_id = f"integration_test_{int(time.time())}"
progress_tracker.start_session(session_id, total_tasks=1)
# 获取并解析RSS
title, articles = await fetcher.fetch_and_parse(
url=test_url,
exclude_keywords=[],
verbose=False
)
# 完成会话
progress_tracker.complete_session(session_id)
# 检查结果
if title:
results.record(
"集成测试 - RSS获取",
True,
f"成功获取RSS源: {title}, 文章数: {len(articles)}"
)
else:
results.record(
"集成测试 - RSS获取",
False,
"未获取到RSS内容"
)
# 检查断点续传状态
state = resume_manager.get_state(test_url)
if state and state.success_count > 0:
results.record(
"集成测试 - 断点续传",
True,
f"断点续传状态已记录: 成功{state.success_count}次"
)
else:
results.record(
"集成测试 - 断点续传",
False,
"断点续传状态未记录"
)
# 检查进度跟踪
progress = progress_tracker.get_progress(session_id)
if progress and progress['status'] == 'completed':
results.record(
"集成测试 - 进度跟踪",
True,
"进度跟踪正常工作"
)
else:
results.record(
"集成测试 - 进度跟踪",
False,
"进度跟踪未正常工作"
)
finally:
# 清理临时文件
Path(state_file).unlink(missing_ok=True)
Path(progress_file).unlink(missing_ok=True)
except Exception as e:
results.record("集成测试", False, str(e))
asyncio.run(run_test())
def main():
"""主测试函数"""
print("="*60)
print("YARB优化功能测试套件")
print("="*60)
print(f"测试时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
print("="*60)
results = TestResults()
# 运行所有测试
test_retry_mechanism(results)
test_circuit_breaker(results)
test_async_rss_fetcher(results)
test_task_queue(results)
test_resume_manager(results)
test_progress_tracker(results)
test_integration(results)
# 打印摘要
results.print_summary()
# 返回退出码
total = len(results.results)
passed = sum(1 for r in results.results.values() if r['passed'])
return 0 if passed == total else 1
if __name__ == "__main__":
sys.exit(main())