Control Flow: Parallel Execution with .branch()

While the .next() method enforces sequential execution, the .branch() method runs multiple steps concurrently on the same input. This is the pipeline's mechanism for parallelism.

A Branch is a special Feeder that takes the output of the preceding step and feeds it simultaneously to a collection of independent steps. It then collects all their individual outputs and returns them as a single, ordered tuple.
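
The fan-out and ordered collection described above behaves much like asyncio.gather from the standard library. The sketch below shows the idea in plain asyncio. It is an analogy, not chorey's actual implementation, and the names double, describe, and fan_out are hypothetical.

```python
import asyncio


async def double(x: int) -> int:
    await asyncio.sleep(0.02)  # the slower task
    return x * 2


async def describe(x: int) -> str:
    await asyncio.sleep(0.01)  # the faster task, finishes first
    return f"value={x}"


async def fan_out(x: int) -> tuple[int, str]:
    # The same input is handed to both coroutines. gather returns results
    # in argument order, regardless of which coroutine finishes first.
    first, second = await asyncio.gather(double(x), describe(x))
    return first, second


result = asyncio.run(fan_out(21))
print(result)
```

Although describe completes before double, its result still lands in the second position, which mirrors the ordered tuple a Branch returns.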

Parallel Execution Example

import asyncio
from dataclasses import dataclass

from chorey import step


@dataclass
class Client:
    host: str
    port: int


@dataclass
class Request:
    short_url_id: str
    client: Client
    headers: dict[str, str]


@dataclass
class ClickEvent:
    short_url_id: str
    ip_address: str
    user_agent: str
    referer: str | None


@dataclass
class GeoInfo:
    country: str
    city: str


@dataclass
class ProcessedClickEvent:
    click_event: ClickEvent
    geo_info: GeoInfo
    is_bot: bool


async def clean_data(request: Request) -> ClickEvent:
    return ClickEvent(
        short_url_id=request.short_url_id,
        ip_address=request.client.host,
        user_agent=request.headers.get("User-Agent", ""),
        referer=request.headers.get("Referer"),
    )


# combine_results also needs the click_event, so we pass it along in a tuple.
# A cleaner approach is possible, but a tuple keeps the example short.
async def fetch_geo_info(click_event: ClickEvent) -> tuple[GeoInfo, ClickEvent]:
    # Simulate an async call to a geo IP service
    await asyncio.sleep(0.5)
    return GeoInfo(country="HU", city="Szeged"), click_event


async def fetch_bot_score(click_event: ClickEvent) -> int:
    import random

    # Simulate an async call to a bot detection service
    await asyncio.sleep(0.2)

    return random.randint(0, 100)


async def identify_bot(bot_score: int) -> bool:
    return bot_score > 50


async def combine_results(results: tuple[tuple[GeoInfo, ClickEvent], bool]) -> ProcessedClickEvent:
    # Unpack in branch order: first branch output, then second branch output
    geo_pack, is_bot = results
    geo_info, click_event = geo_pack

    return ProcessedClickEvent(
        click_event=click_event,
        geo_info=geo_info,
        is_bot=is_bot,
    )

pipeline = (
    step(clean_data)
    .branch(
        step(fetch_geo_info),
        step(fetch_bot_score).next(identify_bot),
    )
    .merge(combine_results)
)

Merging Branch Outputs with .merge()

The .branch() method produces a Branch object, which is responsible for parallel execution. Since a pipeline must continue sequentially, you must use .merge() to consume the tuple output of the Branch and convert it back into a standard, single-output Step.

The function provided to .merge():

  1. Must accept the entire output tuple of the branch as its single input argument.
  2. Performs the final consolidation or aggregation of the parallel results.
  3. Determines the output type for the next sequential step in the pipeline.

In the example above, combine_results is the merge function. It takes the nested tuple[tuple[GeoInfo, ClickEvent], bool] and returns a single ProcessedClickEvent object, after which the pipeline can continue or terminate.

Type Safety and the Merge Signature

The Branch guarantees the type and order of its output tuple, which dictates the strict input type for the subsequent .merge() step.

In this example:

  1. Branch 1 Output Type: tuple[GeoInfo, ClickEvent]
  2. Branch 2 Output Type: bool (from identify_bot)

Therefore, the entire Branch output is strictly tuple[tuple[GeoInfo, ClickEvent], bool], and the combine_results function must accept exactly this type. If the branch structure changes, the type checker immediately flags the mismatch.
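
One way to keep such a merge signature readable is to name the branch output type with an alias. The following is a minimal sketch using simplified stand-ins for the dataclasses above. BranchOutput is a hypothetical name chosen for illustration and is not part of chorey.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class ClickEvent:
    short_url_id: str


@dataclass
class GeoInfo:
    country: str


@dataclass
class ProcessedClickEvent:
    click_event: ClickEvent
    geo_info: GeoInfo
    is_bot: bool


# Hypothetical alias: first branch output, then second branch output
BranchOutput = tuple[tuple[GeoInfo, ClickEvent], bool]


async def combine_results(results: BranchOutput) -> ProcessedClickEvent:
    # Nested unpacking follows the shape of the alias exactly
    (geo_info, click_event), is_bot = results
    return ProcessedClickEvent(
        click_event=click_event,
        geo_info=geo_info,
        is_bot=is_bot,
    )


sample: BranchOutput = ((GeoInfo("HU"), ClickEvent("abc123")), False)
processed = asyncio.run(combine_results(sample))
print(processed)
```

If the two branches were swapped, the alias would become tuple[bool, tuple[GeoInfo, ClickEvent]], and a static type checker would be expected to reject the unpacking inside combine_results until it is updated.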

When To Use .branch()

Use .branch() when:

  1. Inputs are Identical: All parallel tasks require the exact same input data (the output of the preceding step).
  2. Tasks are Independent: The execution of one task does not depend on the result of any other task within the branch.
  3. Performance is Critical: You have multiple I/O-bound tasks that can benefit from running concurrently.
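
The benefit for I/O-bound tasks can be seen with a small timing comparison. This sketch uses plain asyncio rather than chorey, with asyncio.sleep standing in for network calls. The helper names fake_io, run_sequentially, and run_concurrently are hypothetical.

```python
import asyncio
import time


async def fake_io(delay: float) -> float:
    # Stand-in for an I/O-bound call such as an HTTP request
    await asyncio.sleep(delay)
    return delay


async def run_sequentially() -> float:
    start = time.perf_counter()
    await fake_io(0.2)
    await fake_io(0.1)
    return time.perf_counter() - start


async def run_concurrently() -> float:
    start = time.perf_counter()
    # Both waits overlap, so the total is bounded by the longer one
    await asyncio.gather(fake_io(0.2), fake_io(0.1))
    return time.perf_counter() - start


sequential_elapsed = asyncio.run(run_sequentially())
concurrent_elapsed = asyncio.run(run_concurrently())
print(f"sequential: {sequential_elapsed:.2f}s, concurrent: {concurrent_elapsed:.2f}s")
```

The sequential run takes roughly the sum of the two delays, while the concurrent run takes roughly the longer delay. CPU-bound work would not see this gain on a single event loop, which is why the third criterion refers specifically to I/O-bound tasks.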