Resilience: Retries and Failure Callbacks

Chorey lets you make individual steps resilient to transient errors. A step can automatically retry a failed operation and, once every attempt has failed, run a custom side-effect function. Both behaviors are configured per step.

Failure Configuration

The core of Chorey's resilience is configured using two fluent methods on any Step:

1. Automatic Retries with .retry()

The .retry() method configures the step to automatically attempt re-execution upon an exception. This is ideal for handling transient errors like network timeouts or temporary database unavailability.

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| max_attempts | int | 3 | The maximum number of times the step will be executed (including the initial attempt). Must be ≥ 1. |
| delay_seconds | float | 1.0 | The time (in seconds) to wait between retry attempts. Must be ≥ 0.0. |

Example: Retrying a Network Call

async def fetch_api_data(url: str) -> ...:
    # this step might raise a network exception
    # ...
    pass

pipeline = (
    step(fetch_api_data)
    # retry up to 5 times, waiting 2 seconds between each attempt
    .retry(max_attempts=5, delay_seconds=2.0) 
    .next(...)
)

Execution Flow

The Step.__call__ method runs the wrapped function in a loop. If the function raises an exception, the loop checks whether max_attempts has been reached. If not, it waits for delay_seconds and calls the function again; otherwise the step fails.
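The loop described above can be sketched in plain asyncio. This is a minimal illustration under stated assumptions, not Chorey's actual implementation: call_with_retry and flaky_double are hypothetical names, and only the max_attempts and delay_seconds semantics come from the parameter table.

```python
import asyncio
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")
R = TypeVar("R")


async def call_with_retry(
    func: Callable[[T], Awaitable[R]],
    value: T,
    max_attempts: int = 3,
    delay_seconds: float = 1.0,
) -> R:
    """Hypothetical stand-in for the retry loop; not Chorey's own code."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be >= 1")
    if delay_seconds < 0.0:
        raise ValueError("delay_seconds must be >= 0.0")

    for attempt in range(1, max_attempts + 1):
        try:
            return await func(value)
        except Exception:
            if attempt == max_attempts:
                raise  # attempts exhausted: let the last exception propagate
            await asyncio.sleep(delay_seconds)  # wait before the next attempt
    raise AssertionError("unreachable")


# Demo: a function that fails twice, then succeeds on the third attempt.
attempt_log = []


async def flaky_double(x: int) -> int:
    attempt_log.append(x)
    if len(attempt_log) < 3:
        raise ConnectionError("transient failure")
    return x * 2


result = asyncio.run(
    call_with_retry(flaky_double, 21, max_attempts=5, delay_seconds=0.0)
)
print(result, len(attempt_log))  # prints: 42 3
```

Note that max_attempts counts the initial call, so max_attempts=1 means no retries at all.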

2. Side Effects on Failure with .on_failure_do()

The .on_failure_do() method allows you to register a function to be executed only when the step fails after all retry attempts have been exhausted. This is designed for side effects like custom logging, sending alerts, or pushing the failed input to a dead-letter queue (DLQ).

The provided callback function, defined by the OnFailureFunc type, must accept the following two arguments:

  1. input: The data input that caused the final failure.
  2. exception: The specific Exception that terminated the step.

The callback function can be either synchronous or asynchronous but must not return a value (its return type is None | Awaitable[None]).
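To make the callback contract concrete, here is a rough sketch of what a type alias with that shape could look like and how a runner might accept both synchronous and asynchronous callbacks. The alias definition and the run_failure_callback helper are assumptions reconstructed from the description above; Chorey's real definitions may differ.

```python
import asyncio
import inspect
from typing import Awaitable, Callable, TypeVar, Union

T = TypeVar("T")

# Assumed shape of the callback type: (input, exception) -> None or an awaitable of None.
OnFailureFunc = Callable[[T, Exception], Union[None, Awaitable[None]]]


async def run_failure_callback(
    callback: OnFailureFunc[T], value: T, exc: Exception
) -> None:
    """Hypothetical helper: call the callback and await it only if needed."""
    outcome = callback(value, exc)
    if inspect.isawaitable(outcome):
        await outcome


# Demo: one synchronous and one asynchronous callback.
seen = []


def sync_callback(value: int, exc: Exception) -> None:
    seen.append(f"sync:{value}:{exc}")


async def async_callback(value: int, exc: Exception) -> None:
    seen.append(f"async:{value}:{exc}")


async def demo_callbacks() -> None:
    err = ValueError("boom")
    await run_failure_callback(sync_callback, 7, err)
    await run_failure_callback(async_callback, 7, err)


asyncio.run(demo_callbacks())
print(seen)  # prints: ['sync:7:boom', 'async:7:boom']
```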

Example: Logging and Alerting on Final Failure

async def log_failure_alert(input: int, e: Exception) -> None:
    print(f"FATAL: Step failed after all retries for input: {input}")
    print(f"Exception: {type(e).__name__}: {e}")
    # logic to send an alert or write to a database

async def process_data(data: int) -> str:
    # ... logic that is expected to fail sometimes
    pass

pipeline = (
    step(process_data)
    # 1. configure for retries
    .retry(max_attempts=3)
    # 2. add the callback for final failure (its input parameter is
    #    type-checked against the step's input type)
    .on_failure_do(log_failure_alert)
    .next(...)
)

Error Propagation

If a step fails, either on the first attempt when .retry() is not configured or after the final configured attempt, the original exception is wrapped in a PipelineError and raised up the execution chain.

The PipelineError provides context about which step failed and what input caused the failure, making debugging easier.

Note: The .on_failure_do() callback is guaranteed to execute before the final PipelineError is raised.
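The ordering described in this section (callback first, then the wrapped error) can be illustrated with a small standalone sketch. The PipelineError class below is a hypothetical stand-in, and its attribute names (step_name, input_value, original) are assumptions rather than Chorey's documented API. The run_step helper is likewise invented for illustration and handles only synchronous callbacks.

```python
import asyncio
from typing import Any


class PipelineError(Exception):
    """Hypothetical stand-in; attribute names are assumptions."""

    def __init__(self, step_name: str, input_value: Any, original: Exception):
        super().__init__(
            f"step {step_name!r} failed for input {input_value!r}: {original!r}"
        )
        self.step_name = step_name
        self.input_value = input_value
        self.original = original


async def run_step(func, value, on_failure=None):
    """Hypothetical single-attempt runner: callback first, then raise."""
    try:
        return await func(value)
    except Exception as exc:
        if on_failure is not None:
            on_failure(value, exc)  # synchronous callbacks only in this sketch
        # Wrap the original exception and keep it as __cause__.
        raise PipelineError(func.__name__, value, exc) from exc


# Demo: record the order of events when a step fails.
events = []


async def always_fails(x: int) -> str:
    raise RuntimeError("database unavailable")


def record_failure(value: int, exc: Exception) -> None:
    events.append("callback")


async def demo_propagation():
    try:
        await run_step(always_fails, 5, on_failure=record_failure)
    except PipelineError as err:
        events.append("raised")
        return err


caught = asyncio.run(demo_propagation())
print(events)  # prints: ['callback', 'raised']
```

Chaining with `raise ... from exc` keeps the original exception reachable through `__cause__`, which is one common way to preserve the root cause when wrapping errors.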