Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions src/cashet/async_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,9 +108,8 @@ async def submit(
claim.task_def.ttl = task_def.ttl
claim.task_def.tags = task_def.tags
claim.tags = task_def.tags
claim.expires_at = (
datetime.now(UTC) + task_def.ttl if task_def.ttl else None
)
# Preserve original expires_at; TTL extension on reclaim
# would unexpectedly shift the expiration window.
await store.put_commit(claim)
break
else:
Expand Down
2 changes: 2 additions & 0 deletions src/cashet/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ class Commit:
claimed_at: datetime = field(default_factory=lambda: datetime.now(UTC))
error: str | None = None
tags: dict[str, str] = field(default_factory=dict[str, str])
# Set once at creation; reclaimed stale claims keep the original
# window so that callers can rely on a stable expiration horizon.
expires_at: datetime | None = None

@property
Expand Down
27 changes: 27 additions & 0 deletions tests/test_async_client.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from __future__ import annotations

from datetime import UTC, datetime, timedelta
from pathlib import Path

import pytest
Expand Down Expand Up @@ -281,6 +282,32 @@ def non_cached() -> int:
assert await ref1.load() == 1
assert await ref2.load() == 2

async def test_cached_task_with_ttl_and_stale_reclaim(
self, async_client: AsyncClient
) -> None:
import cashet.dag as dag
import cashet.hashing as hashing
from cashet.models import TaskStatus

counter = 0

def work() -> int:
nonlocal counter
counter += 1
return counter

task_def = hashing.build_task_def(work, (), {}, cache=True)
input_refs = dag.resolve_input_refs((), {})
commit = dag.build_commit(task_def, input_refs)
commit.status = TaskStatus.RUNNING
commit.created_at = datetime.now(UTC) - timedelta(seconds=400)
commit.claimed_at = datetime.now(UTC) - timedelta(seconds=400)
await async_client.store.put_commit(commit)

ref = await async_client.submit(work, _cache=True)
assert await ref.load() == 1
assert counter == 1

async def test_task_decorator_callable_returns_async_result_ref(
self, async_client: AsyncClient
) -> None:
Expand Down
25 changes: 25 additions & 0 deletions tests/test_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,31 @@ def slow() -> int:
with pytest.raises(TaskError, match="TimeoutError"):
client.submit(slow, _timeout=0.01)

def test_reclaimed_stale_claim_keeps_original_expires_at(self, store_dir: Path) -> None:
import cashet.dag as dag
import cashet.hashing as hashing
from cashet.models import TaskStatus

client = Client(store_dir=store_dir)

def compute() -> int:
return 42

task_def = hashing.build_task_def(compute, (), {}, cache=True)
input_refs = dag.resolve_input_refs((), {})
commit = dag.build_commit(task_def, input_refs)
commit.status = TaskStatus.RUNNING
commit.created_at = datetime.now(UTC) - timedelta(seconds=400)
commit.claimed_at = datetime.now(UTC) - timedelta(seconds=400)
client.store.put_commit(commit)

ref = client.submit(compute)
assert ref.load() == 42

log = client.log()
assert len(log) == 1
assert log[0].status.value == "completed"
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 This test does not assert that expires_at is preserved after reclaim. Without it, a regression that recalculates expires_at would not be caught. Add an assertion comparing the commit's expires_at to the original value. For example:

Suggested change
assert log[0].status.value == "completed"
assert log[0].expires_at is None


def test_running_claim_lookup_is_not_limited_to_1000_rows(self, store_dir: Path) -> None:
import cashet.dag as dag
import cashet.hashing as hashing
Expand Down
Loading