Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
135 changes: 100 additions & 35 deletions src/aws_durable_execution_sdk_python/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,22 @@
BAD_REQUEST_ERROR: int = 400
TOO_MANY_REQUESTS_ERROR: int = 429
SERVICE_ERROR: int = 500
INVALID_PARAMETER_VALUE_EXCEPTION: str = "InvalidParameterValueException"
INVALID_CHECKPOINT_TOKEN_PREFIX: str = "Invalid Checkpoint Token"

# Non-retryable customer error codes that arrive as non-4xx (e.g. HTTP 502) from Lambda.
# Unlike typical 5xx errors, these require customer intervention (e.g., fixing
# a KMS key configuration) and will never succeed on retry.
# Add new non-retryable error codes here — they are automatically classified
# as EXECUTION (non-retryable) by _classify_error_category().
_NON_RETRYABLE_CUSTOMER_ERROR_CODES: frozenset[str] = frozenset(
{
"KMSAccessDeniedException",
"KMSDisabledException",
"KMSInvalidStateException",
"KMSNotFoundException",
}
)

if TYPE_CHECKING:
import datetime
Expand Down Expand Up @@ -77,6 +93,14 @@ def __init__(
):
super().__init__(message, termination_reason)

def is_retryable(self) -> bool:
"""Whether this error is retryable. Returns True by default.

Subclasses override to implement classification logic based on
error codes and HTTP status codes.
"""
return True


class CallbackError(ExecutionError):
"""Error in callback handling."""
Expand All @@ -86,27 +110,98 @@ def __init__(self, message: str, callback_id: str | None = None):
self.callback_id = callback_id


class DurableApiErrorCategory(Enum):
INVOCATION = "INVOCATION"
EXECUTION = "EXECUTION"
Comment thread
zhongkechen marked this conversation as resolved.


# Backward-compatible alias
CheckpointErrorCategory = DurableApiErrorCategory


class BotoClientError(InvocationError):
"""Error from a Lambda API call (e.g., CheckpointDurableExecution, GetDurableExecutionState).

Extends InvocationError because the default behavior for API failures is to retry
the Lambda invocation. However, some errors are non-retryable (e.g., 4xx client errors,
KMS key misconfiguration) and should fail the execution instead. The error_category field
and is_retryable() method distinguish these cases at runtime.
Comment thread
zhongkechen marked this conversation as resolved.
"""

def __init__(
self,
message: str,
error_category: DurableApiErrorCategory = DurableApiErrorCategory.INVOCATION,
error: AwsErrorObj | None = None,
response_metadata: AwsErrorMetadata | None = None,
termination_reason=TerminationReason.INVOCATION_ERROR,
):
super().__init__(message=message, termination_reason=termination_reason)
self.error: AwsErrorObj | None = error
self.response_metadata: AwsErrorMetadata | None = response_metadata
self.error_category: DurableApiErrorCategory = error_category

@classmethod
def from_exception(cls, exception: Exception) -> Self:
response = getattr(exception, "response", {})
response_metadata = response.get("ResponseMetadata")
error = response.get("Error")
error_category = BotoClientError._classify_error_category(
error, response_metadata
)
return cls(
message=str(exception), error=error, response_metadata=response_metadata
message=str(exception),
error_category=error_category,
error=error,
response_metadata=response_metadata,
)

@staticmethod
def _classify_error_category(
error: AwsErrorObj | None,
response_metadata: AwsErrorMetadata | None,
) -> DurableApiErrorCategory:
"""Classify a Durable API error as retryable (INVOCATION) or non-retryable (EXECUTION).

Classification rules:
- Non-retryable customer error codes (e.g., KMS key issues) → EXECUTION
These arrive as HTTP 502 but require customer intervention to fix.
- 4xx errors → EXECUTION, except:
- 429 (TooManyRequests) → INVOCATION (throttling is transient)
- InvalidParameterValueException with "Invalid Checkpoint Token" → INVOCATION
(stale token from a concurrent checkpoint; next invocation gets a fresh token)
- 5xx, network errors → INVOCATION
"""
error_code: str | None = (error and error.get("Code")) or None
if error_code and error_code in _NON_RETRYABLE_CUSTOMER_ERROR_CODES:
return DurableApiErrorCategory.EXECUTION

status_code: int | None = (
response_metadata and response_metadata.get("HTTPStatusCode")
) or None
if (
status_code
and BAD_REQUEST_ERROR <= status_code < SERVICE_ERROR
and status_code != TOO_MANY_REQUESTS_ERROR
and error
and not (
(error.get("Code") or "") == INVALID_PARAMETER_VALUE_EXCEPTION
and (error.get("Message") or "").startswith(
INVALID_CHECKPOINT_TOKEN_PREFIX
)
)
):
return DurableApiErrorCategory.EXECUTION

return DurableApiErrorCategory.INVOCATION

def is_retryable(self) -> bool:
"""Whether this error is retryable based on error_category."""
return self.error_category == DurableApiErrorCategory.INVOCATION

# Backward-compatible alias
is_retriable = is_retryable

def build_logger_extras(self) -> dict:
extras: dict = {}
# preserve PascalCase to be consistent with other langauges
Expand All @@ -125,55 +220,23 @@ def __init__(self, message: str, step_id: str | None = None):
self.step_id = step_id


class CheckpointErrorCategory(Enum):
INVOCATION = "INVOCATION"
EXECUTION = "EXECUTION"


class CheckpointError(BotoClientError):
"""Failure to checkpoint. Will terminate the lambda."""

def __init__(
self,
message: str,
error_category: CheckpointErrorCategory,
error_category: DurableApiErrorCategory = DurableApiErrorCategory.INVOCATION,
error: AwsErrorObj | None = None,
response_metadata: AwsErrorMetadata | None = None,
):
super().__init__(
message,
error_category,
error,
response_metadata,
termination_reason=TerminationReason.CHECKPOINT_FAILED,
)
self.error_category: CheckpointErrorCategory = error_category

@classmethod
def from_exception(cls, exception: Exception) -> CheckpointError:
base = BotoClientError.from_exception(exception)
metadata: AwsErrorMetadata | None = base.response_metadata
error: AwsErrorObj | None = base.error
error_category: CheckpointErrorCategory = CheckpointErrorCategory.INVOCATION

# 4xx errors (except 429) are permanent failures (EXECUTION), unless it's an
# InvalidParameterValueException with "Invalid Checkpoint Token" which is retriable (INVOCATION).
# 5xx, 429, and network errors are retriable (INVOCATION).
status_code: int | None = (metadata and metadata.get("HTTPStatusCode")) or None
if (
status_code
and BAD_REQUEST_ERROR <= status_code < SERVICE_ERROR
and status_code != TOO_MANY_REQUESTS_ERROR
and error
and not (
(error.get("Code") or "") == "InvalidParameterValueException"
and (error.get("Message") or "").startswith("Invalid Checkpoint Token")
)
):
error_category = CheckpointErrorCategory.EXECUTION
return CheckpointError(str(exception), error_category, error, metadata)

def is_retriable(self):
return self.error_category == CheckpointErrorCategory.INVOCATION


class ValidationError(DurableExecutionsError):
Expand All @@ -186,11 +249,13 @@ class GetExecutionStateError(BotoClientError):
def __init__(
self,
message: str,
error_category: DurableApiErrorCategory = DurableApiErrorCategory.INVOCATION,
error: AwsErrorObj | None = None,
response_metadata: AwsErrorMetadata | None = None,
):
super().__init__(
message,
error_category,
error,
response_metadata,
termination_reason=TerminationReason.INVOCATION_ERROR,
Expand Down
57 changes: 46 additions & 11 deletions src/aws_durable_execution_sdk_python/execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -260,11 +260,26 @@ def wrapper(event: Any, context: LambdaContext) -> MutableMapping[str, Any]:
else ReplayStatus.NEW,
)

execution_state.fetch_paginated_operations(
invocation_input.initial_execution_state.operations,
invocation_input.checkpoint_token,
invocation_input.initial_execution_state.next_marker,
)
try:
execution_state.fetch_paginated_operations(
Comment thread
denzyll marked this conversation as resolved.
invocation_input.initial_execution_state.operations,
invocation_input.checkpoint_token,
invocation_input.initial_execution_state.next_marker,
)
except BotoClientError as e:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shouldn't this error handling logic be in the method fetch_paginated_operations? fetch_paginated_operations is called in two places

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All decisions to fail the execution or retry the invocation are made in the execution file.

# Non-retryable Durable API errors (e.g., customer configuration issues,
# 4xx client errors) will never succeed on retry — fail the execution immediately.
if not e.is_retryable():
logger.exception(
"Non-retryable Durable API error during initial state fetch. Must fail execution "
"without retry.",
extra=e.build_logger_extras(),
)
return DurableExecutionInvocationOutput(
status=InvocationStatus.FAILED,
error=ErrorObject.from_exception(e),
).to_dict()
raise

raw_input_payload: str | None = execution_state.get_input_payload()

Expand Down Expand Up @@ -356,11 +371,20 @@ def wrapper(event: Any, context: LambdaContext) -> MutableMapping[str, Any]:
"Checkpoint processing failed",
extra=bg_error.source_exception.build_logger_extras(),
)
# Non-retryable Durable API errors (e.g., customer configuration issues,
# 4xx client errors) will never succeed on retry — fail the execution immediately.
if not bg_error.source_exception.is_retryable():
logger.exception(
"Non-retryable Durable API error from background thread. Must fail execution "
"without retry.",
extra=bg_error.source_exception.build_logger_extras(),
)
return DurableExecutionInvocationOutput(
status=InvocationStatus.FAILED,
error=ErrorObject.from_exception(bg_error.source_exception),
).to_dict()
else:
logger.exception("Checkpoint processing failed")
# handle the original exception
if isinstance(bg_error.source_exception, CheckpointError):
return handle_checkpoint_error(bg_error.source_exception).to_dict()
raise bg_error.source_exception from bg_error

except SuspendExecution:
Expand All @@ -377,12 +401,23 @@ def wrapper(event: Any, context: LambdaContext) -> MutableMapping[str, Any]:
extra=e.build_logger_extras(),
)
return handle_checkpoint_error(e).to_dict()
except InvocationError:
except InvocationError as e:
# Non-retryable Durable API errors (e.g., customer configuration issues,
# 4xx client errors) will never succeed on retry — fail the execution immediately.
if not e.is_retryable():
logger.exception(
"Non-retryable Durable API error. Must fail execution without retry.",
extra=e.build_logger_extras(), # type: ignore[attr-defined]
)
return DurableExecutionInvocationOutput(
status=InvocationStatus.FAILED,
error=ErrorObject.from_exception(e),
).to_dict()
logger.exception("Invocation error. Must terminate.")
# Throw the error to trigger Lambda retry
raise
except ExecutionError as e:
logger.exception("Execution error. Must terminate without retry.")
logger.exception("Execution error. Must fail execution without retry.")
return DurableExecutionInvocationOutput(
status=InvocationStatus.FAILED,
error=ErrorObject.from_exception(e),
Expand Down Expand Up @@ -428,7 +463,7 @@ def wrapper(event: Any, context: LambdaContext) -> MutableMapping[str, Any]:


def handle_checkpoint_error(error: CheckpointError) -> DurableExecutionInvocationOutput:
if error.is_retriable():
if error.is_retryable():
raise error from None # Terminate Lambda immediately and have it be retried
return DurableExecutionInvocationOutput(
status=InvocationStatus.FAILED, error=ErrorObject.from_exception(error)
Expand Down
37 changes: 28 additions & 9 deletions src/aws_durable_execution_sdk_python/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
BackgroundThreadError,
CallableRuntimeError,
DurableExecutionsError,
GetExecutionStateError,
OrphanedChildException,
)
from aws_durable_execution_sdk_python.lambda_service import (
Expand Down Expand Up @@ -275,20 +276,38 @@ def fetch_paginated_operations(
initial_operations: initial operations to be added to ExecutionState
checkpoint_token: checkpoint token used to call Durable Functions API.
next_marker: a marker indicates that there are paginated operations.

Raises:
GetExecutionStateError: If the API call fails. The error is logged
with structured extras before re-raising. Callers are responsible
for deciding whether to fail the execution or allow Lambda retry
based on is_retryable().
"""
all_operations: list[Operation] = (
initial_operations.copy() if initial_operations else []
)
while next_marker:
output: StateOutput = self._service_client.get_execution_state(
durable_execution_arn=self.durable_execution_arn,
checkpoint_token=checkpoint_token,
next_marker=next_marker,
try:
while next_marker:
output: StateOutput = self._service_client.get_execution_state(
durable_execution_arn=self.durable_execution_arn,
checkpoint_token=checkpoint_token,
next_marker=next_marker,
)
all_operations.extend(output.operations)
next_marker = output.next_marker
except GetExecutionStateError as e:
logger.exception(
"Durable API error during state fetch.",
extra=e.build_logger_extras(),
)
all_operations.extend(output.operations)
next_marker = output.next_marker
with self._operations_lock:
self.operations.update({op.operation_id: op for op in all_operations})
raise
finally:
# Always store whatever operations we successfully fetched
if all_operations:
with self._operations_lock:
self.operations.update(
{op.operation_id: op for op in all_operations}
)

def get_input_payload(self) -> str | None:
# It is possible that backend will not provide an execution operation
Expand Down
Loading
Loading