Pipeline

The main orchestrator. Runs the full sequence: load → build trades → classify → depth. Use Pipeline(format=...) for LOBSTER or other registered formats, or pass individual components (loader=, trade_source=) to override specific stages. Flow-toxicity metrics are computed after the run by calling compute_vpin / compute_kyle_lambda / order_flow_imbalance on result.trades.

Pipeline

Pipeline(
    config: PipelineConfig | None = None,
    *,
    format: Format | None = None,
    loader: EventLoader | None = None,
    trade_source: TradeSource | None = None,
    ctx: RunContext | None = None,
)

Configurable, composable order book analytics pipeline.

Each processing stage is handled by a pluggable component that satisfies the corresponding protocol. Pass your own implementations to override any stage.

Parameters:

Name | Type | Description | Default
--- | --- | --- | ---
config | PipelineConfig | Central configuration. Passed to default components when they are not explicitly provided. | None
format | Format | A format descriptor that provides default loader, trade source, writer, and config overrides. Explicit component arguments take precedence over format defaults. | None
loader | EventLoader | Loads raw events from a data source. Defaults to BitstampLoader. | None
trade_source | TradeSource | Builds the trades DataFrame. Defaults to BitstampTradeReader. | None
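
The precedence rule described above (explicit component first, then the format's default, then the library default) can be sketched in a few lines. This is only an illustration under stated assumptions, not the library's implementation: SketchFormat, its create_loader method, resolve_loader, and the three loader classes are hypothetical stand-ins, and events are plain dicts rather than DataFrames.

```python
from __future__ import annotations

from typing import Protocol


class EventLoader(Protocol):
    """Structural protocol: anything with a matching load() qualifies."""

    def load(self, source: str) -> list[dict]: ...


class DefaultLoader:
    """Hypothetical stand-in for the library default loader."""

    def load(self, source: str) -> list[dict]:
        return [{"loader": "default", "source": source}]


class FormatLoader:
    """Hypothetical loader supplied by a format descriptor."""

    def load(self, source: str) -> list[dict]:
        return [{"loader": "format", "source": source}]


class CustomLoader:
    """Hypothetical user-supplied override."""

    def load(self, source: str) -> list[dict]:
        return [{"loader": "custom", "source": source}]


class SketchFormat:
    """Hypothetical format descriptor exposing a loader factory."""

    def create_loader(self) -> EventLoader:
        return FormatLoader()


def resolve_loader(
    loader: EventLoader | None = None,
    fmt: SketchFormat | None = None,
) -> EventLoader:
    # 1. An explicitly passed component always wins.
    if loader is not None:
        return loader
    # 2. Otherwise fall back to the format's default, if a format was given.
    if fmt is not None:
        return fmt.create_loader()
    # 3. Otherwise use the library-wide default.
    return DefaultLoader()


chosen = resolve_loader(loader=CustomLoader(), fmt=SketchFormat())
print(chosen.load("events.csv")[0]["loader"])
```

Because the protocol is structural, CustomLoader does not need to inherit from anything; it only has to provide a compatible load method.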

writer property

writer: DataWriter | None

The format-provided writer, if any.

from_format classmethod

from_format(
    name: str,
    *,
    ctx: RunContext | None = None,
    **kwargs: Any,
) -> Pipeline

Create a pipeline from a registered format name.

Parameters:

Name | Type | Description | Default
--- | --- | --- | ---
name | str | Registered format name (case-insensitive), e.g. "bitstamp" or "lobster". | required
ctx | RunContext | Per-run parameters (e.g. trading_date) forwarded to Format.create_* factories. | None
**kwargs | Any | Passed to the Format constructor. | {}

run

run(
    source: Any, *, ctx: RunContext | None = None
) -> PipelineResult

Execute the full pipeline on source and return results.

Parameters:

Name | Type | Description | Default
--- | --- | --- | ---
source | Any | Data source for the loader (typically a file path). | required
ctx | RunContext | Override the pipeline's default RunContext for this single call. When None, the ctx provided at construction (or the default empty context) is used. | None

Returns:

Type | Description
--- | ---
PipelineResult | Frozen dataclass with events, trades, depth, depth_summary, and config.

Steps
  1. Load events (EventLoader.load)
  2. Build trades (TradeSource.load)
  3. Classify order types
  4. Compute price-level depth
  5. Compute depth metrics
  6. Compute order aggressiveness
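
The step order above, together with the per-call ctx override, can be sketched as follows. This is a rough illustration under stated assumptions, not the library's code: SketchContext, SketchPipeline, and the step names in STEP_NAMES are hypothetical, and each stage only records that it ran instead of transforming DataFrames.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass(frozen=True)
class SketchContext:
    """Hypothetical stand-in for RunContext."""

    trading_date: str | None = None


# Hypothetical names mirroring the six documented steps, in order.
STEP_NAMES = (
    "load_events",
    "build_trades",
    "classify_order_types",
    "compute_depth",
    "compute_depth_metrics",
    "compute_aggressiveness",
)


class SketchPipeline:
    def __init__(self, ctx: SketchContext | None = None) -> None:
        # With no ctx at construction, fall back to an empty context.
        self._ctx = ctx if ctx is not None else SketchContext()

    def run(self, source: str, *, ctx: SketchContext | None = None) -> dict:
        # A ctx passed to run() applies to this call only; the stored
        # context is left untouched for later calls.
        active_ctx = ctx if ctx is not None else self._ctx
        executed: list[str] = []
        for name in STEP_NAMES:
            # A real stage would consume the previous stage's output here.
            executed.append(name)
        return {"source": source, "ctx": active_ctx, "steps": executed}


pipeline = SketchPipeline(SketchContext(trading_date="2024-01-02"))
default_run = pipeline.run("events.csv")
override_run = pipeline.run(
    "events.csv", ctx=SketchContext(trading_date="2024-01-03")
)
print(default_run["ctx"].trading_date, override_run["ctx"].trading_date)
```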

PipelineResult dataclass

PipelineResult(
    events: DataFrame,
    trades: DataFrame,
    depth: DataFrame,
    depth_summary: DataFrame,
    config: PipelineConfig,
)

Immutable container for the core outputs of a pipeline run.

Analytic outputs (VPIN, OFI, Kyle's λ) are intentionally not stored here — compute them post-pipeline from trades and append them to the gallery model's analytics (build panels with the *_panel helpers).

Attributes:

Name | Type | Description
--- | --- | ---
events, trades, depth, depth_summary | DataFrame | Core pipeline tables.
config | PipelineConfig | The configuration used for the run.
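
To illustrate why analytics are derived outside the result rather than stored on it, here is a small sketch of a frozen dataclass plus a metric computed from its trades. Everything in it is an assumption for illustration: SketchResult is a hypothetical stand-in for PipelineResult, trades are (direction, volume) tuples instead of a DataFrame, and simple_imbalance is a generic signed-volume ratio, not the library's order_flow_imbalance.

```python
from __future__ import annotations

from dataclasses import FrozenInstanceError, dataclass


@dataclass(frozen=True)
class SketchResult:
    """Hypothetical stand-in for PipelineResult, holding only trades."""

    trades: tuple[tuple[str, float], ...]


def simple_imbalance(trades: tuple[tuple[str, float], ...]) -> float:
    """Return (buy volume - sell volume) / total volume, or 0.0 if empty."""
    buy = sum(volume for direction, volume in trades if direction == "buy")
    sell = sum(volume for direction, volume in trades if direction == "sell")
    total = buy + sell
    return 0.0 if total == 0 else (buy - sell) / total


result = SketchResult(trades=(("buy", 3.0), ("sell", 1.0), ("buy", 2.0)))

# The metric is computed from the result and kept alongside it.
imbalance = simple_imbalance(result.trades)

# Assigning to a field of a frozen dataclass raises FrozenInstanceError.
try:
    result.trades = ()
    is_frozen = False
except FrozenInstanceError:
    is_frozen = True

print(round(imbalance, 4), is_frozen)
```

Keeping the container immutable means a derived metric can never silently overwrite a core table; it lives in its own variable or structure.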

plot

plot(
    concept: str,
    level: Any = None,
    *,
    backend: str = "matplotlib",
    volume_scale: float | None = None,
    **overrides: Any,
) -> Any

Render one plot concept from this result in a single call.

Thin convenience wrapper over ob_analytics.visualization.plot_result, e.g. result.plot("depth_heatmap", col_bias=0.1). See ob_analytics.visualization.available_concepts for what a given result can plot (it varies by format).

Format registry

register_format

register_format(
    name: str, format_cls: type[Format]
) -> None

Register a Format implementation under name for lookup via Pipeline.from_format.

list_formats

list_formats() -> list[str]

Return a sorted list of registered format names.
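
A name-keyed registry with case-insensitive lookup, as described for register_format, list_formats, and Pipeline.from_format, might look roughly like the sketch below. It is not the library's implementation: the sketch_* function names and the two placeholder format classes are hypothetical, and lower-casing keys is just one assumed way to get case-insensitive matching.

```python
from __future__ import annotations

# Module-level registry mapping a normalized name to a format class.
_REGISTRY: dict[str, type] = {}


def sketch_register_format(name: str, format_cls: type) -> None:
    """Store format_cls under a lower-cased key."""
    _REGISTRY[name.lower()] = format_cls


def sketch_list_formats() -> list[str]:
    """Return the registered names in sorted order."""
    return sorted(_REGISTRY)


def sketch_lookup(name: str) -> type:
    """Resolve a name regardless of case, with a readable error on a miss."""
    try:
        return _REGISTRY[name.lower()]
    except KeyError:
        known = ", ".join(sketch_list_formats()) or "none"
        raise KeyError(f"unknown format {name!r}; registered: {known}") from None


class LobsterLike:
    """Placeholder format class for the demo."""


class BitstampLike:
    """Placeholder format class for the demo."""


sketch_register_format("LOBSTER", LobsterLike)
sketch_register_format("bitstamp", BitstampLike)
print(sketch_list_formats())
```

Normalizing at registration time as well as at lookup time keeps the two paths symmetric, so "LOBSTER", "Lobster", and "lobster" all resolve to the same class.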