# ChoreyContext: A Data Structure for Managing Pipeline State
In a complex pipeline, you may need to share state or context between steps (e.g., configuration or a database session). Chorey provides a built-in way to manage this shared state: the `ChoreyContext` class.
## Structure of ChoreyContext
`ChoreyContext[T, Context]` is a simple generic dataclass:
```python
from dataclasses import dataclass
from typing import Generic, TypeVar

T = TypeVar("T")
R = TypeVar("R")
Context = TypeVar("Context")


@dataclass
class ChoreyContext(Generic[T, Context]):
    data: T
    context: Context

    def with_data(self, new_data: R) -> "ChoreyContext[R, Context]":
        return ChoreyContext(data=new_data, context=self.context)

    @property
    def parts(self) -> tuple[T, Context]:
        return (self.data, self.context)
```
Given how simple it is, you might be tempted to skip it or roll your own equivalent. However, `ChoreyContext` is tightly integrated with Chorey's type system and pipeline mechanics, making it a powerful tool for managing shared state in a type-safe manner. In addition, when visualizing a pipeline that uses `ChoreyContext`, the context type is omitted from the diagram, keeping it clean and focused on the main data flow.
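To make the mechanics concrete, here is a minimal sketch of how `.parts` and `.with_data()` behave, using only the dataclass defined above (the payload and context values are illustrative):

```python
# Wrap some data together with an illustrative context object
wrapped = ChoreyContext(data="hello world", context={"request_id": "abc-123"})

# .parts unpacks the main data and the shared context in one step
text, shared = wrapped.parts

# .with_data() swaps the data (here str -> int) while carrying the context along
word_count = wrapped.with_data(len(text.split()))
print(word_count.data)     # 2
print(word_count.context)  # {'request_id': 'abc-123'}
```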
## Using ChoreyContext
Let's revisit the earlier example from the routing section:
```python
from dataclasses import dataclass
from typing import TypeVar

from chorey import ChoreyContext, step


@dataclass(frozen=True)
class Context:
    force_full_analysis: bool
    db_session: str  # Placeholder for an actual DB session object


@dataclass
class Document:
    id: int
    content: str


@dataclass
class Summary:
    document_id: int
    summary_text: str


@dataclass
class FullAnalysis:
    document_id: int
    word_count: int
    sentiment: str


# Helper types to make the signatures cleaner
T = TypeVar("T")
Input = ChoreyContext[T, Context]
Output = Input


async def fetch_document(input: Input[int]) -> Output[Document]:
    # Unpack the ChoreyContext to work with its parts
    doc_id, ctx = input.parts
    # Simulate fetching a document from a database or API.
    # Use .with_data() to keep the context while changing the data.
    return input.with_data(Document(id=doc_id, content="This is a sample document content."))


async def summarize_document(input: Input[Document]) -> Output[Summary]:
    doc, ctx = input.parts
    # Simulate summarizing the document
    return input.with_data(Summary(document_id=doc.id, summary_text="This is a summary."))


async def analyze_document(input: Input[Document]) -> Output[FullAnalysis]:
    doc, ctx = input.parts
    # Simulate performing a full analysis of the document
    return input.with_data(
        FullAnalysis(document_id=doc.id, word_count=len(doc.content.split()), sentiment="Positive")
    )


async def store_processing_result(input: Input[Summary] | Input[FullAnalysis]) -> None:
    result, ctx = input.parts
    # Simulate storing the result
    print(f"Storing result: {result}")
    print(f"Type of result: {type(result)}")


def selector(input: Input[Document]) -> int:
    doc, ctx = input.parts
    if ctx.force_full_analysis:
        return 1  # Force the full-analysis branch if the context says so
    # Same length check as before
    return 0 if len(doc.content) < 50 else 1


pipeline = (
    step(fetch_document)
    .route(
        step(summarize_document),
        step(analyze_document),
        selector=selector,
        decision_label="Document Length Check",
    )
    .next(store_processing_result)
)
```
In this example, we define a `Context` dataclass that holds shared state for the pipeline. Each step now accepts and returns a `ChoreyContext`, which wraps both the step's main data and the shared context.
When implementing each step, unpack the `ChoreyContext` to access both the main data and the context. After processing, use the `.with_data()` method to create a new `ChoreyContext` with updated data while preserving the context.
This approach ensures that all steps in the pipeline have access to the shared context, allowing for more flexible and powerful data processing workflows.
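For example, the same pipeline can be steered by the context alone. The following sketch (construction only; the field values are illustrative) builds two inputs that the `selector` above would route to different branches:

```python
# With force_full_analysis=True, selector returns 1 regardless of document
# length, so routing always goes to analyze_document.
forced = ChoreyContext(data=1, context=Context(force_full_analysis=True, db_session="session-1"))

# With force_full_analysis=False, routing falls back to the length check.
default = ChoreyContext(data=2, context=Context(force_full_analysis=False, db_session="session-1"))
```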
## CLI Limitation
Since the `context` component often holds non-serializable runtime objects (such as database connections), the generic command-line interface cannot safely construct it.

For complex, production-ready pipelines, you must create a dedicated Python entry point that manages the context lifecycle and explicitly injects it via Python code.
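Such an entry point might look like the sketch below. The session handling is a stand-in, and the plain `await pipeline(...)` call is an assumption about how pipelines are executed; substitute whatever execution API your version of Chorey exposes.

```python
import asyncio


async def main() -> None:
    # Manage the context lifecycle explicitly: acquire resources up front...
    db_session = "opened-db-session"  # stand-in for a real connection/session
    ctx = Context(force_full_analysis=False, db_session=db_session)
    try:
        # ...inject the context alongside the initial data (a document id here)...
        initial = ChoreyContext(data=42, context=ctx)
        # NOTE: assumption -- a plain call is used here for illustration;
        # invoke the pipeline with whatever execution API Chorey provides.
        await pipeline(initial)
    finally:
        # ...and release resources once the run is over, e.g. close the session.
        pass


if __name__ == "__main__":
    asyncio.run(main())
```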