Hooks
PipelineRunner and BatchRunner expose hook points that let you observe and react
to pipeline events without modifying the pipeline itself.
PipelineRunner hooks
on_run_start
Called once before any steps execute.
```python
runner = PipelineRunner("pipeline.yaml")

@runner.on_run_start
def log_start(input_data: dict) -> None:
    print(f"Starting run with {len(input_data)} input keys")

result = runner.run({"document": "..."})
```
Signature: fn(input_data: dict) -> None
on_run_end
Called once after the run completes, whether it succeeded or failed.
```python
@runner.on_run_end
def log_end(rctx: RunContext) -> None:
    summary = rctx.summary()
    print(f"Run complete: {summary.steps_run} steps, {summary.elapsed_seconds:.2f}s")
    if rctx.failed:
        # alert() stands in for your own notification helper
        alert(f"Pipeline failed: {rctx.failure_state}")
```
Signature: fn(rctx: RunContext) -> None
on_step_end
Called after each step completes (including failed steps).
```python
@runner.on_step_end
def track_step(step_name: str, result, rctx: RunContext) -> None:
    metrics.record(step_name, rctx.steps[step_name].status)
```
Signature: fn(step_name: str, result: Any, rctx: RunContext) -> None
on_llm_call
Called after each LLM API call (including retries).
```python
@runner.on_llm_call
def log_llm(step_name: str, model: str, response) -> None:
    print(f"LLM call: step={step_name} model={model}")
```
Signature: fn(step_name: str, model: str, response: Any) -> None
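Because on_llm_call fires once per API call, retries included, a hook that counts invocations per step gives a rough view of retry overhead. The sketch below is plain Python with no dependency on the runner: `llm_calls` and `count_llm_calls` are illustrative names, the model strings are placeholders, and the direct calls at the bottom only simulate the runner invoking the hook.

```python
from collections import Counter
from typing import Any

# Hypothetical tally keyed by (step name, model); not part of the runner API.
llm_calls: Counter = Counter()


def count_llm_calls(step_name: str, model: str, response: Any) -> None:
    """on_llm_call-compatible hook: one increment per LLM API call."""
    llm_calls[(step_name, model)] += 1


# Simulated invocations standing in for the runner: "extract" made an
# initial call plus one retry, "summarise" made a single call.
count_llm_calls("extract", "model-a", response=None)
count_llm_calls("extract", "model-a", response=None)
count_llm_calls("summarise", "model-b", response=None)

print(llm_calls[("extract", "model-a")])  # 2
```

With a real runner you would register it with the method form, runner.on_llm_call(count_llm_calls). Since the hook only sees top-level steps, calls made inside parallel branches would not appear in the tally.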
BatchRunner hooks
on_batch_item_end
Called after each item completes, in the thread that processed it.
```python
batch = BatchRunner("pipeline.yaml", max_workers=4)

@batch.on_batch_item_end
def save(item_id, rctx: RunContext) -> None:
    if not rctx.failed:
        db.save(item_id, rctx.steps["extract"].value)
```
Signature: fn(item_id: Any, rctx: RunContext) -> None
Thread safety
on_batch_item_end may be called concurrently from multiple worker threads, so
your callback must be thread-safe: for example, guard shared state with a lock
or hand results off through a thread-safe queue.
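One way to meet this requirement is to guard shared state with a threading.Lock. The sketch below is self-contained and makes several assumptions: `FakeRunContext` is a hypothetical stand-in for RunContext that models only the `failed` attribute, the `results` dict replaces the `db` object from the example above, and a ThreadPoolExecutor plays the role of BatchRunner's workers calling the hook concurrently.

```python
import threading
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass
from typing import Any


@dataclass
class FakeRunContext:
    """Hypothetical stand-in for RunContext; only `failed` is modelled."""
    failed: bool


results: dict = {}
failures: list = []
lock = threading.Lock()  # guards both `results` and `failures`


def save(item_id: Any, rctx: FakeRunContext) -> None:
    """on_batch_item_end-compatible hook that may run on many threads at once."""
    with lock:
        if rctx.failed:
            failures.append(item_id)
        else:
            results[item_id] = f"processed {item_id}"


# Simulate four workers firing the hook for 100 items; every tenth item fails.
with ThreadPoolExecutor(max_workers=4) as pool:
    for i in range(100):
        pool.submit(save, i, FakeRunContext(failed=(i % 10 == 0)))

print(len(results), len(failures))  # 90 10
```

Holding the lock for the whole callback keeps the two collections consistent with each other; if the callback does slow I/O, pushing results onto a queue.Queue consumed by a single writer thread avoids serialising the workers on the lock.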
Behaviour
- Hooks registered with the decorator form (@runner.on_run_end) or the method form (runner.on_run_end(fn)) are equivalent; both return the original function.
- Multiple hooks of the same type are called in registration order.
- Hook errors are caught, logged as warnings, and do not abort the run or batch.
- Hooks run synchronously in the calling thread; long-running hooks block step execution.
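To make these rules concrete, here is a toy hook registry that follows them: the decorator and method forms share one code path and return the original function, hooks fire synchronously in registration order, and an exception in one hook is logged as a warning instead of propagating. `HookRegistry`, `fire`, and the sample hooks are hypothetical names; this is a sketch of the documented behaviour, not PipelineRunner's actual implementation.

```python
import logging
from typing import Any, Callable, Dict, List

logger = logging.getLogger("hooks_sketch")

Hook = Callable[..., None]


class HookRegistry:
    """Toy model of the documented hook semantics (hypothetical, not the real runner)."""

    def __init__(self) -> None:
        self._hooks: Dict[str, List[Hook]] = {}

    def on_run_end(self, fn: Hook) -> Hook:
        # Works both as @registry.on_run_end and as registry.on_run_end(fn);
        # either way the original function is returned unchanged.
        self._hooks.setdefault("on_run_end", []).append(fn)
        return fn

    def fire(self, name: str, *args: Any) -> None:
        # Call hooks synchronously, in registration order; a failing hook is
        # logged as a warning and does not stop the remaining hooks.
        for fn in self._hooks.get(name, []):
            try:
                fn(*args)
            except Exception:
                logger.warning("hook %s failed", fn.__name__, exc_info=True)


calls: List[str] = []
registry = HookRegistry()


@registry.on_run_end
def first(rctx: Any) -> None:
    calls.append("first")


def broken(rctx: Any) -> None:
    raise RuntimeError("boom")


def last(rctx: Any) -> None:
    calls.append("last")


print(registry.on_run_end(broken) is broken)  # True: method form returns fn
registry.on_run_end(last)

registry.fire("on_run_end", None)  # logs a warning for `broken`
print(calls)  # ['first', 'last']
```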
on_llm_call and parallel steps
on_llm_call fires for LLM calls in top-level steps only. LLM calls made
inside a type: parallel branch are not currently surfaced to this hook.
Use on_step_end to observe parallel step results.
Combining hooks
```python
import time

runner = PipelineRunner("pipeline.yaml")
started: dict[str, float] = {}

@runner.on_run_start
def record_start(data):
    started["t"] = time.monotonic()

@runner.on_run_end
def record_end(rctx):
    start = started.pop("t", None)
    if start is None:
        return  # no start time was recorded; skip rather than emit a bogus duration
    metrics.histogram("pipeline.duration", time.monotonic() - start)

@runner.on_step_end
def per_step(name, value, rctx):
    metrics.increment(f"step.{name}.{rctx.steps[name].status}")
```