ChoreyContext: A Data Structure for Managing Pipeline State

In a complex pipeline, you often need to share state or context between steps (e.g., configuration or a database session). Chorey provides a built-in way to manage this shared state: the ChoreyContext class.

Structure of ChoreyContext

ChoreyContext[T, Context] is a basic generic dataclass:

from dataclasses import dataclass
from typing import Generic, TypeVar

T = TypeVar("T")
R = TypeVar("R")
Context = TypeVar("Context")


@dataclass
class ChoreyContext(Generic[T, Context]):
    data: T
    context: Context

    def with_data(self, new_data: R) -> "ChoreyContext[R, Context]":
        # swap the payload; the shared context is carried over unchanged
        return ChoreyContext(data=new_data, context=self.context)

    @property
    def parts(self) -> tuple[T, Context]:
        # convenience accessor for `data, context = input.parts` unpacking
        return (self.data, self.context)

Given how simple it is, you might wonder whether it is worth using at all, or whether you could just implement something similar yourself. However, ChoreyContext is tightly integrated with Chorey's type system and pipeline mechanics, making it a powerful tool for managing shared state in a type-safe manner. As a bonus, when visualizing a pipeline that uses ChoreyContext, the context type is omitted from the diagram, keeping it clean and focused on the main data flow.
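
For example, here is a quick standalone sketch (the literal values are just placeholders) of the two operations you will use in every step:

ctx = ChoreyContext(data=42, context="shared config")

# .with_data() swaps the payload while carrying the context forward
next_ctx = ctx.with_data(str(ctx.data))  # ChoreyContext[str, str]

# .parts unpacks both halves in one assignment
data, shared = next_ctx.parts
print(data, shared)  # 42 shared config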

Using ChoreyContext

Let's revisit the earlier example from the routing section:

from dataclasses import dataclass
from typing import TypeVar

from chorey import ChoreyContext, step


@dataclass(frozen=True)
class Context:
    force_full_analysis: bool
    db_session: str  # Placeholder for an actual DB session object


@dataclass
class Document:
    id: int
    content: str


@dataclass
class Summary:
    document_id: int
    summary_text: str


@dataclass
class FullAnalysis:
    document_id: int
    word_count: int
    sentiment: str


# helper types to make the signatures cleaner
T = TypeVar("T")
Input = ChoreyContext[T, Context]
Output = Input


async def fetch_document(input: Input[int]) -> Output[Document]:
    # unpack the ChoreyContext to work with its parts
    doc_id, ctx = input.parts

    # simulate fetching a document from a database or API

    # use .with_data() to keep the context while changing the data
    return input.with_data(Document(id=doc_id, content="This is a sample document content."))


async def summarize_document(input: Input[Document]) -> Output[Summary]:
    # unpack the ChoreyContext to work with its parts
    doc, ctx = input.parts

    # simulate summarizing the document

    return input.with_data(Summary(document_id=doc.id, summary_text="This is a summary."))


async def analyze_document(input: Input[Document]) -> Output[FullAnalysis]:
    doc, ctx = input.parts

    # simulate performing a full analysis of the document

    return input.with_data(FullAnalysis(document_id=doc.id, word_count=len(doc.content.split()), sentiment="Positive"))


async def store_processing_result(input: Input[Summary] | Input[FullAnalysis]) -> None:
    result, ctx = input.parts

    # simulate storing the result
    print(f"Storing result: {result}")
    print(f"Type of result: {type(result)}")


def selector(input: Input[Document]) -> int:
    doc, ctx = input.parts

    if ctx.force_full_analysis:
        return 1  # force full analysis if the context says so

    # same as before
    return 0 if len(doc.content) < 50 else 1


pipeline = (
    step(fetch_document)
    .route(step(summarize_document), step(analyze_document), selector=selector, decision_label="Document Length Check")
    .next(store_processing_result)
)

In this example, we define a Context dataclass that holds shared state for the pipeline. Each step now accepts and returns a ChoreyContext, which wraps both the step's main data and the shared context.

When implementing each step, you can unpack the ChoreyContext to access both the main data and the context. After processing, use the .with_data() method to create a new ChoreyContext with updated data while preserving the context.

This approach ensures that every step in the pipeline has access to the shared context without you having to thread it through each function's signature by hand.
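
Note that .with_data() only ever replaces the data half. If a step also needs to update the shared state, one option, since Context is frozen, is to build a modified copy and wrap it in a new ChoreyContext directly. The step below is a hypothetical sketch that uses plain dataclasses.replace rather than any Chorey-specific helper:

import dataclasses


async def enable_full_analysis(input: Input[Document]) -> Output[Document]:
    doc, ctx = input.parts

    # Context is frozen, so create a modified copy instead of mutating it
    new_ctx = dataclasses.replace(ctx, force_full_analysis=True)

    # construct the ChoreyContext by hand, since with_data() keeps the old context
    return ChoreyContext(data=doc, context=new_ctx)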

CLI Limitation

Since the context component often holds non-serializable runtime objects (such as database connections), the generic command-line interface cannot safely construct it.

For complex, production-ready pipelines, you must create a dedicated Python entry point that manages the context lifecycle and injects it explicitly.
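
A minimal sketch of such an entry point follows. The .run() call is an assumed invocation API (check your Chorey version for the actual one), and the session string stands in for a real resource that you would open before the pipeline starts and close once it finishes:

import asyncio


async def main() -> None:
    # acquire runtime resources here; a real DB session would typically be
    # opened via a context manager and released after the pipeline completes
    ctx = Context(force_full_analysis=False, db_session="fake-session")

    # explicitly inject the context alongside the initial input (a document id)
    await pipeline.run(ChoreyContext(data=1, context=ctx))  # `run` is an assumed method name


if __name__ == "__main__":
    asyncio.run(main())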