Resilience: Retries and Failure Callbacks
Chorey allows you to configure individual steps to be resilient to transient errors by automatically retrying failed operations and executing custom side-effect functions on final failure. This resilience logic is defined on a per-step basis.
Failure Configuration
The core of Chorey's resilience is configured using two fluent methods available on any Step: .retry() and .on_failure_do().
1. Automatic Retries with .retry()
The .retry() method configures the step to automatically attempt re-execution upon an exception. This is ideal for handling transient errors like network timeouts or temporary database unavailability.
| Parameter | Type | Default | Description |
|---|---|---|---|
| max_attempts | int | 3 | The maximum number of times the step will be executed (including the initial attempt). Must be ≥ 1. |
| delay_seconds | float | 1.0 | The time (in seconds) to wait between retry attempts. Must be ≥ 0.0. |
Example: Retrying a Network Call
async def fetch_api_data(url: str) -> ...:
    # this step might raise a network exception
    # ...
    pass

pipeline = (
    step(fetch_api_data)
    # retry up to 5 times, waiting 2 seconds between each attempt
    .retry(max_attempts=5, delay_seconds=2.0)
    .next(...)
)
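To get a feel for what these settings cost in the worst case, the total waiting time can be estimated with a little arithmetic. The helper below is hypothetical (it is not part of Chorey) and assumes that no delay is applied after the final attempt, which matches the execution flow described in this section.

```python
def worst_case_wait(max_attempts: int, delay_seconds: float) -> float:
    """Total time spent waiting if every attempt fails (excludes the step's own run time)."""
    # A delay only occurs between attempts, so there is one fewer delay than attempts.
    return (max_attempts - 1) * delay_seconds


# Settings from the example: 5 attempts, 2 seconds apart.
total = worst_case_wait(max_attempts=5, delay_seconds=2.0)
# Default settings: 3 attempts, 1 second apart.
default_total = worst_case_wait(max_attempts=3, delay_seconds=1.0)
```

With five attempts and a two-second delay, a step that never succeeds spends 8 seconds waiting before the failure is reported; with the defaults it spends 2 seconds.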
Execution Flow
The Step.__call__ method runs in a loop. If an exception is caught, it checks whether max_attempts has been reached. If not, it waits for delay_seconds and repeats the function call.
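The loop described here can be sketched roughly as follows. This is a minimal illustration, not Chorey's actual implementation: call_with_retry and flaky are hypothetical names, and the sketch assumes the step function is async and takes a single input.

```python
import asyncio
from typing import Any, Awaitable, Callable


async def call_with_retry(
    func: Callable[[Any], Awaitable[Any]],
    value: Any,
    max_attempts: int = 3,
    delay_seconds: float = 1.0,
) -> Any:
    """Hypothetical helper (not Chorey code): run func up to max_attempts times."""
    if max_attempts < 1 or delay_seconds < 0.0:
        raise ValueError("max_attempts must be >= 1 and delay_seconds >= 0.0")
    for attempt in range(1, max_attempts + 1):
        try:
            return await func(value)
        except Exception:
            if attempt == max_attempts:
                raise  # attempts exhausted: let the last exception propagate
            await asyncio.sleep(delay_seconds)  # wait, then try again


calls = 0


async def flaky(value: int) -> int:
    """Fails on the first two calls, then succeeds, to imitate a transient error."""
    global calls
    calls += 1
    if calls < 3:
        raise ConnectionError("transient failure")
    return value * 2


# delay_seconds=0.0 keeps the demonstration instant.
result = asyncio.run(call_with_retry(flaky, 21, max_attempts=5, delay_seconds=0.0))
```

In this run the function is called three times in total: two failures followed by a success, so the remaining two attempts are never used.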
2. Side Effects on Failure with .on_failure_do()
The .on_failure_do() method allows you to register a function to be executed only when the step fails after all retry attempts have been exhausted. This is designed for side effects like custom logging, sending alerts, or pushing the failed input to a dead-letter queue (DLQ).
The provided callback function, defined by the OnFailureFunc type, must accept the following two arguments:
input: The data input that caused the final failure.
exception: The specific Exception that terminated the step.
The callback function can be either synchronous or asynchronous but must not return a value (its return type is None | Awaitable[None]).
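One common way to support both synchronous and asynchronous callbacks is to call the function and await the result only if it is awaitable. The sketch below illustrates that pattern; FailureCallback and run_failure_callback are hypothetical names, and Chorey's real OnFailureFunc type and dispatch logic may differ.

```python
import asyncio
import inspect
from typing import Any, Awaitable, Callable, Union

# Assumed shape of the callback type described above; the real OnFailureFunc may differ.
FailureCallback = Callable[[Any, Exception], Union[None, Awaitable[None]]]


async def run_failure_callback(callback: FailureCallback, value: Any, exc: Exception) -> None:
    """Hypothetical helper: invoke the callback, awaiting its result only when needed."""
    outcome = callback(value, exc)
    if inspect.isawaitable(outcome):
        await outcome


seen = []


def sync_callback(input: int, exception: Exception) -> None:
    seen.append(f"sync:{input}:{exception}")


async def async_callback(input: int, exception: Exception) -> None:
    seen.append(f"async:{input}:{exception}")


async def main() -> None:
    error = ValueError("boom")
    await run_failure_callback(sync_callback, 1, error)
    await run_failure_callback(async_callback, 2, error)


asyncio.run(main())
```

Both callbacks receive the same two arguments, the failing input and the exception, and neither returns a value.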
Example: Logging and Alerting on Final Failure
async def log_failure_alert(input: int, e: Exception) -> None:
    print(f"FATAL: Step failed after all retries for input: {input}")
    print(f"Exception: {type(e).__name__}: {e}")
    # logic to send an alert or write to a database

async def process_data(data: int) -> str:
    # ... logic that is expected to fail sometimes
    pass

pipeline = (
    step(process_data)
    # 1. configure for retries
    .retry(max_attempts=3)
    # 2. add the callback for final failure (type checked to the step input type)
    .on_failure_do(log_failure_alert)
    .next(...)
)
Error Propagation
If a step fails, either on the first attempt without .retry() configured or after the final configured attempt, the original exception is wrapped in a PipelineError and raised up the execution chain.
The PipelineError provides context about which step failed and what input caused the failure, making debugging easier.
Note: The .on_failure_do() callback is guaranteed to execute before the final PipelineError is raised.
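The ordering and wrapping behavior can be illustrated with a small self-contained sketch. StepFailure is a hypothetical stand-in for PipelineError (the real class may expose different attributes), and run_step is an assumed helper, not Chorey's API. It shows the callback running first and the original exception being preserved through exception chaining.

```python
import asyncio
from typing import Any


class StepFailure(Exception):
    """Hypothetical stand-in for PipelineError; the real class may differ."""

    def __init__(self, step_name: str, step_input: Any, cause: Exception) -> None:
        super().__init__(f"step {step_name!r} failed for input {step_input!r}: {cause}")
        self.step_name = step_name
        self.step_input = step_input


events = []


def on_failure(step_input: Any, exc: Exception) -> None:
    events.append("callback")


async def run_step(func, step_input, callback):
    """Assumed helper: run one step, call the callback on failure, then wrap and raise."""
    try:
        return await func(step_input)
    except Exception as exc:
        callback(step_input, exc)  # the side effect runs before anything is raised
        raise StepFailure(func.__name__, step_input, exc) from exc


async def parse(value: str) -> int:
    return int(value)  # raises ValueError for non-numeric input


async def main():
    try:
        await run_step(parse, "abc", on_failure)
    except StepFailure as err:
        events.append("raised")
        return err


error = asyncio.run(main())
```

Using raise ... from keeps the original exception available as __cause__, so a caller can inspect both the wrapper's context (step name and input) and the underlying error.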