Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 13 additions & 3 deletions src/crawlee/crawlers/_basic/_basic_crawler.py
Original file line number Diff line number Diff line change
Expand Up @@ -692,8 +692,9 @@ async def run(

Args:
requests: The requests to be enqueued before the crawler starts.
purge_request_queue: If this is `True` and the crawler is not being run for the first time, the default
request queue will be purged.
purge_request_queue: If this is `True` and the crawler is not being run for the first time, the request
queue will be purged. Named request queues are considered persistent and are never purged
implicitly.
"""
if self._running:
raise RuntimeError(
Expand All @@ -717,7 +718,16 @@ async def run(

if purge_request_queue:
request_manager = await self.get_request_manager()
await request_manager.purge()
# A `ThrottlingRequestManager` delegates `purge` to the manager it wraps, so inspect the wrapped
# manager when deciding whether the purge would hit a named queue.
inner_manager = (
request_manager.inner if isinstance(request_manager, ThrottlingRequestManager) else request_manager
)
# Named storages are persistent and shared across runs, so they are never purged implicitly
# (the same named-storage exemption as in `StorageClient._purge_if_needed`).
is_named_queue = isinstance(inner_manager, RequestQueue) and inner_manager.name is not None
if not is_named_queue:
await request_manager.purge()

if requests is not None:
await self.add_requests(requests)
Expand Down
5 changes: 5 additions & 0 deletions src/crawlee/request_loaders/_throttling_request_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,11 @@ def __init__(
"""Set whenever a request is added or reclaimed. Lets `fetch_next_request` wake from a throttle
wait early when fresh work appears, instead of sleeping for the full computed cooldown."""

@property
def inner(self) -> TRequestManager:
"""The wrapped request manager that stores requests for non-throttled domains."""
return self._inner

@override
async def drop(self) -> None:
await asyncio.gather(self._inner.drop(), *(sm.drop() for sm in self._sub_managers.values()))
Expand Down
39 changes: 39 additions & 0 deletions tests/unit/crawlers/_basic/test_basic_crawler.py
Original file line number Diff line number Diff line change
Expand Up @@ -1073,6 +1073,45 @@ async def handler(context: BasicCrawlingContext) -> None:
}


async def test_consecutive_runs_do_not_purge_named_request_queue() -> None:
"""A second `run()` must not purge a user-supplied named request queue, as named storages persist across runs."""
queue = await RequestQueue.open(name='persistent-queue')
crawler = BasicCrawler(request_manager=queue)
visited = list[str]()

@crawler.router.default_handler
async def handler(context: BasicCrawlingContext) -> None:
visited.append(context.request.url)

await crawler.run(['https://a.placeholder.com'])

# Simulate new work added to the persistent queue between runs.
await queue.add_request('https://b.placeholder.com')
await crawler.run()

assert visited == ['https://a.placeholder.com', 'https://b.placeholder.com']


async def test_consecutive_runs_do_not_purge_named_queue_wrapped_in_throttling_manager() -> None:
"""A second `run()` must not purge a named request queue wrapped in a `ThrottlingRequestManager`."""
queue = await RequestQueue.open(name='persistent-queue')
throttler = ThrottlingRequestManager(inner=queue, domains=[], request_manager_opener=RequestQueue.open)
crawler = BasicCrawler(request_manager=throttler)
visited = list[str]()

@crawler.router.default_handler
async def handler(context: BasicCrawlingContext) -> None:
visited.append(context.request.url)

await crawler.run(['https://a.placeholder.com'])

# Simulate new work added to the persistent queue between runs.
await queue.add_request('https://b.placeholder.com')
await crawler.run()

assert visited == ['https://a.placeholder.com', 'https://b.placeholder.com']


@pytest.mark.skipif(os.name == 'nt' and 'CI' in os.environ, reason='Skipped in Windows CI')
@pytest.mark.parametrize(
('statistics_log_format'),
Expand Down
Loading