Data Flow in Chorey: Single Input, Single Output

Chorey's data flow is designed for simplicity and predictability: every individual Step takes a single input and produces a single output.

The Step Contract

Every function wrapped in a Step must adhere to a strict contract:

StepFunc = Callable[[InputType], Awaitable[OutputType]]

# example
async def example_step(data: int) -> str:
    return str(data)

This makes every data transformation explicit: a step receives exactly one value of one type and returns exactly one value of one type.
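A step that logically needs several values therefore has to receive them bundled into one object. The following is a minimal sketch of that idea using only the standard library; `ResizeRequest` and `describe_resize` are hypothetical names made up for illustration and are not part of Chorey.

```python
import asyncio
from dataclasses import dataclass
from typing import Awaitable, Callable, TypeVar

InputType = TypeVar("InputType")
OutputType = TypeVar("OutputType")

# The contract from above, written as a generic type alias.
StepFunc = Callable[[InputType], Awaitable[OutputType]]


@dataclass(frozen=True)
class ResizeRequest:
    """Hypothetical bundle: several values travel as one input."""

    width: int
    height: int


async def describe_resize(request: ResizeRequest) -> str:
    # One input (the bundle), one output (a string).
    return f"{request.width}x{request.height}"


# The annotation lets a type checker confirm the function fits the contract.
checked: StepFunc[ResizeRequest, str] = describe_resize

result = asyncio.run(describe_resize(ResizeRequest(width=640, height=480)))
print(result)
```

A frozen dataclass is only one option here; a tuple or a `TypedDict` would satisfy the same single-input rule.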

Why Single Input/Output?

I chose this for very simple reasons:

  1. Clarity: Each step's purpose is clear and unambiguous. You know exactly what to expect in and out.
  2. Strict Type Safety: This design allows type checkers to enforce that Step A's output type matches Step B's input type, catching mismatches at development time.
  3. Visualization: It simplifies the generation of visual representations of the pipeline, making it easier to understand complex flows.
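To illustrate the type-safety point without relying on Chorey itself, here is a small sketch in which a hypothetical `chain` helper joins two async functions. Its type variables force the first function's output type to equal the second function's input type, which is the same kind of check described above. The helper and the function names are assumptions for illustration only.

```python
import asyncio
from typing import Awaitable, Callable, TypeVar

A = TypeVar("A")
B = TypeVar("B")
C = TypeVar("C")


def chain(
    first: Callable[[A], Awaitable[B]],
    second: Callable[[B], Awaitable[C]],
) -> Callable[[A], Awaitable[C]]:
    """Hypothetical helper: B must be the same type on both sides of the join."""

    async def run(value: A) -> C:
        return await second(await first(value))

    return run


async def count_chars(text: str) -> int:
    return len(text)


async def halve(length: int) -> float:
    return length / 2.0


# Accepted: count_chars returns int and halve expects int.
ok = chain(count_chars, halve)

# A static checker such as mypy or pyright should reject the reversed order,
# because halve returns float while count_chars expects str:
# bad = chain(halve, count_chars)

half = asyncio.run(ok("Hello"))
print(half)
```

The mismatch is reported by the type checker before the code runs; Python itself would only fail once the bad chain was executed.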

What is Considered a Pipeline?

A pipeline is the last Feeder object (a Step, a Branch, or another Feeder type) in a chain of Feeders. Calling .feed() on it executes the whole chain, from the first step to the last.

Building a Pipeline: Fluent Chaining and Type Introspection

I wanted to keep things simple, so I designed the pipeline to be built using a fluent interface with methods like .next(), .branch(), and .route().

Here's how you can build a simple pipeline:

from chorey import step

async def a(text: str) -> int:
    return len(text)

async def b(length: int) -> float:
    return length / 2.0

async def c(half_length: float) -> str:
    return f"Half length is {half_length}"

pipeline = step(a).next(b).next(c)

It is that simple. By looking at exactly one line, you can understand the entire flow of data through the pipeline. In this case, the pipeline is the Step wrapping c (the last Feeder of the chain), and calling pipeline.feed("Hello, World!") will execute steps a, b, and c in order.

Understanding the Pipeline Type Signature

When you define a pipeline, the resulting Step object carries three generic type arguments that provide end-to-end information about the entire chain.

A type checker or editor reports the type of pipeline as:

(variable) pipeline: Step[str, float, str]

In order, these generics tell you:

  1. The First Input Type (e.g., str): This is the required input type for the very first step in the entire chain (step(a)). This is the type you must provide to the .feed() method to start the pipeline.
  2. The Last Input Type (e.g., float): This represents the input type of the current step (c). It is primarily used internally by the framework to ensure the next .next() call has the correct input type. You can generally ignore it.
  3. The Final Output Type (e.g., str): This is the output type of the current step, and therefore the final return type when you call the entire pipeline (pipeline.feed(...)).

This means you can always know what to give the pipeline and what to expect back, without needing to read through all the intermediate steps. In this example, the type checker will tell you to give a str to pipeline.feed() and expect a str back.
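The way three generics can evolve along a chain is easier to see in a toy model. The sketch below is not Chorey's implementation: `ToyStep` and `toy_step` are hypothetical stand-ins, and making `feed` a coroutine is an assumption of this sketch. It only shows how a `.next()` method can carry the first input type forward while the previous output type becomes the new "last input" type.

```python
import asyncio
from typing import Any, Awaitable, Callable, Generic, Optional, TypeVar

FirstIn = TypeVar("FirstIn")
LastIn = TypeVar("LastIn")
Out = TypeVar("Out")
NextOut = TypeVar("NextOut")


class ToyStep(Generic[FirstIn, LastIn, Out]):
    """Toy stand-in for a chained step; NOT Chorey's real class."""

    def __init__(
        self,
        func: Callable[[LastIn], Awaitable[Out]],
        previous: Optional["ToyStep[FirstIn, Any, LastIn]"] = None,
    ) -> None:
        self.func = func
        self.previous = previous

    def next(
        self, func: Callable[[Out], Awaitable[NextOut]]
    ) -> "ToyStep[FirstIn, Out, NextOut]":
        # FirstIn is carried along unchanged; the old Out becomes the new LastIn.
        return ToyStep(func, previous=self)

    async def feed(self, value: FirstIn) -> Out:
        # Run everything upstream first, then this step's own function.
        if self.previous is None:
            current: Any = value
        else:
            current = await self.previous.feed(value)
        return await self.func(current)


def toy_step(func: Callable[[FirstIn], Awaitable[Out]]) -> ToyStep[FirstIn, FirstIn, Out]:
    # For the first step, the first input and the last input are the same type.
    return ToyStep(func)


async def a(text: str) -> int:
    return len(text)


async def b(length: int) -> float:
    return length / 2.0


async def c(half_length: float) -> str:
    return f"Half length is {half_length}"


# Inferred by a type checker as ToyStep[str, float, str].
pipeline = toy_step(a).next(b).next(c)
result = asyncio.run(pipeline.feed("Hello, World!"))
print(result)
```

In this toy, the variable `pipeline` is the last node of the chain, and it reaches the earlier steps through its `previous` link.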

What Happens When You Call .feed()?

When you call .feed(), the pipeline executes each step in sequence, passing the output of one step as the input to the next. The final output is returned to you.
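The same hand-off can be sketched without the library. The hypothetical `run_in_sequence` helper below awaits each function in turn and records every intermediate value, so you can see each output becoming the next input. It models the behaviour described here for the a, b, c example and makes no claim about how Chorey implements .feed() internally.

```python
import asyncio
from typing import Any, Awaitable, Callable, List, Sequence


async def a(text: str) -> int:
    return len(text)


async def b(length: int) -> float:
    return length / 2.0


async def c(half_length: float) -> str:
    return f"Half length is {half_length}"


async def run_in_sequence(
    funcs: Sequence[Callable[[Any], Awaitable[Any]]], value: Any
) -> List[Any]:
    """Hypothetical helper: returns every value seen along the way."""
    seen = [value]
    for func in funcs:
        value = await func(value)  # this output is the next function's input
        seen.append(value)
    return seen


trace = asyncio.run(run_in_sequence([a, b, c], "Hello, World!"))
print(trace)  # ['Hello, World!', 13, 6.5, 'Half length is 6.5']
```

The last element of the trace corresponds to the value the pipeline would hand back to the caller.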