
Plug in custom components

Every pipeline stage is defined by a Protocol. Implement the matching method signature on any object and pass it in; no inheritance is required. This page covers the two most common cases, a custom event loader and a custom trade source. For the full extension surface (formats, writers, plot backends, live capturers, registries), see Extending ob-analytics.

Custom event loader

For non-Bitstamp, non-LOBSTER data, implement the EventLoader protocol. Your load method must return a DataFrame with the columns subsequent stages consume:

| Column | Type | Notes |
|---|---|---|
| id | int / str | Exchange-assigned order identifier |
| event_id | int | Sequential, 1-based, unique per event |
| original_number | int | Original input row number (stable event order) |
| timestamp | datetime64 | Local receive time |
| exchange_timestamp | datetime64 | Exchange server timestamp |
| price | float | Order price |
| volume | float | Remaining order size after the event |
| action | category | created / changed / deleted |
| direction | category | bid / ask |
| fill | float | Volume executed by this event (0 for non-fills) |

The columns above are the contract the pipeline reads; see Data Contracts for the canonical column lists and the validate_* helpers.
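Before handing a custom loader's output to the pipeline, it can help to check it against the table above. The helper below, `event_contract_problems`, is hypothetical and is not one of the library's validate_* functions; it only checks column presence, timestamp dtypes, the allowed action/direction labels, and event_id uniqueness.

```python
import pandas as pd

# Column names copied from the table above.
REQUIRED_EVENT_COLUMNS = [
    "id", "event_id", "original_number", "timestamp", "exchange_timestamp",
    "price", "volume", "action", "direction", "fill",
]
ALLOWED_LABELS = {
    "action": {"created", "changed", "deleted"},
    "direction": {"bid", "ask"},
}


def event_contract_problems(events: pd.DataFrame) -> list[str]:
    """Hypothetical helper: return human-readable problems (empty list if none)."""
    problems = [f"missing column: {c}"
                for c in REQUIRED_EVENT_COLUMNS if c not in events.columns]
    for col in ("timestamp", "exchange_timestamp"):
        if col in events.columns and not pd.api.types.is_datetime64_any_dtype(events[col]):
            problems.append(f"{col} is not datetime64")
    for col, allowed in ALLOWED_LABELS.items():
        if col in events.columns:
            unexpected = set(events[col].dropna().unique()) - allowed
            if unexpected:
                problems.append(f"{col} has unexpected values: {sorted(unexpected)}")
    if "event_id" in events.columns and not events["event_id"].is_unique:
        problems.append("event_id is not unique")
    return problems


times = pd.to_datetime(["2024-01-01 00:00:00", "2024-01-01 00:00:01"])
events = pd.DataFrame({
    "id": [7, 7], "event_id": [1, 2], "original_number": [1, 2],
    "timestamp": times, "exchange_timestamp": times,
    "price": [100.0, 100.0], "volume": [5.0, 3.0],
    "action": pd.Categorical(["created", "changed"]),
    "direction": pd.Categorical(["bid", "bid"]),
    "fill": [0.0, 2.0],
})
print(event_contract_problems(events))                       # []
print(event_contract_problems(events.drop(columns="fill")))  # ['missing column: fill']
```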

Generic CSV loader

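from pathlib import Path
import pandas as pd
from ob_analytics import Pipeline, PipelineConfig
from ob_analytics.bitstamp import BitstampTradeReader

class GenericCsvLoader:
    """Load events from a CSV with different column names."""

    # Source column -> contract column (price and volume are assumed to
    # already use the contract names)
    COLUMN_MAP = {
        "order_id": "id",
        "event_time": "timestamp",
        "server_time": "exchange_timestamp",
        "side": "direction",
        "type": "action",
    }
    # Partial and full fills both become "changed" events, so in this
    # source a "deleted" event is always a cancellation.
    ACTION_MAP = {"new": "created", "partial_fill": "changed",
                  "fill": "changed", "cancel": "deleted"}
    DIRECTION_MAP = {"buy": "bid", "sell": "ask"}

    def __init__(self, config: PipelineConfig | None = None):
        self.config = config or PipelineConfig()

    def load(self, source: str | Path) -> pd.DataFrame:
        df = pd.read_csv(source)
        df = df.rename(columns=self.COLUMN_MAP)
        df["action"] = df["action"].map(self.ACTION_MAP).astype("category")
        df["direction"] = df["direction"].map(self.DIRECTION_MAP).astype("category")
        df["timestamp"] = pd.to_datetime(df["timestamp"])
        df["exchange_timestamp"] = pd.to_datetime(df["exchange_timestamp"])

        # Event IDs: assumes the file is already in event order
        df["original_number"] = range(1, len(df) + 1)
        df["event_id"] = df["original_number"]

        # Fill deltas (simplified; see BitstampLoader for reference):
        # "volume" is the remaining size, so a fill is the drop in an
        # order's volume since its previous event. Only "changed" events
        # can fill; creations and cancellations get 0.
        prev_volume = df.groupby("id")["volume"].shift()
        df["fill"] = (
            (prev_volume - df["volume"])
            .where(df["action"] == "changed", 0.0)
            .clip(lower=0)
            .fillna(0.0)
        )
        return df

result = Pipeline(
    loader=GenericCsvLoader(),
    trade_source=BitstampTradeReader(),
).run("my_exchange_data/orders.csv")  # requires sibling trades.csv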
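One pitfall with this style of loader: `Series.map` with a dict turns any label missing from the dict into NaN, so an unexpected source value (here a made-up `expire` event type) silently becomes a missing action. A pandas-only sketch of a guard you could run after the mapping step; `unmapped_labels` is an illustrative helper, not part of ob-analytics.

```python
import io

import pandas as pd

ACTION_MAP = {"new": "created", "partial_fill": "changed",
              "fill": "changed", "cancel": "deleted"}


def unmapped_labels(raw: pd.Series, mapping: dict[str, str]) -> list[str]:
    """Return source labels that `raw.map(mapping)` would turn into NaN."""
    mapped = raw.map(mapping)
    # Ignore labels that were already missing in the source.
    return sorted(raw[mapped.isna() & raw.notna()].unique().tolist())


# Toy input with one label ("expire") that ACTION_MAP does not cover.
csv = io.StringIO("order_id,type\n1,new\n1,partial_fill\n2,new\n2,expire\n")
df = pd.read_csv(csv)
bad = unmapped_labels(df["type"], ACTION_MAP)
if bad:
    print(f"unmapped action labels: {bad}")  # unmapped action labels: ['expire']
```

Raising an error instead of printing is usually the safer choice in a real loader, since a NaN action would otherwise travel silently into later stages.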

Cryptofeed L3 adapter (conceptual)

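from pathlib import Path
import pandas as pd
from ob_analytics import PipelineConfig

class CryptofeedLoader:
    """Load events from a cryptofeed L3 order book log.

    Size of 0 means deletion. This adapter tracks state to infer
    actions and compute fills.
    """

    def __init__(self, config: PipelineConfig | None = None):
        self.config = config or PipelineConfig()

    def load(self, source: str | Path) -> pd.DataFrame:
        raw = pd.read_parquet(source)
        known_orders: dict[str, float] = {}  # order id -> last known size
        rows = []

        for pos, row in enumerate(raw.to_dict("records"), start=1):
            oid = row["order_id"]
            size = row["size"]
            prev_size = known_orders.get(oid)

            if prev_size is None:
                if size == 0:
                    continue  # removal of an order placed before the log began
                action, fill = "created", 0.0
            elif size == 0:
                # Simplification: an L3 removal doesn't say whether the order
                # was filled or cancelled; here every removal is a full fill.
                action, fill = "deleted", prev_size
            else:
                # A size increase is an amendment, not a fill.
                action, fill = "changed", max(prev_size - size, 0.0)

            if size > 0:
                known_orders[oid] = size
            else:
                known_orders.pop(oid, None)

            rows.append({
                "id": oid,
                "original_number": pos,  # position in the raw log
                "timestamp": row["receipt_timestamp"],
                "exchange_timestamp": row["exchange_timestamp"],
                "price": row["price"],
                "volume": size,
                "action": action,
                # cryptofeed's side constants are lowercase ("bid" / "ask")
                "direction": "bid" if str(row["side"]).lower() == "bid" else "ask",
                "fill": fill,
            })

        events = pd.DataFrame(rows)
        # Assumes float epoch seconds (cryptofeed's convention); drop this
        # conversion if the log already stores datetimes.
        for col in ("timestamp", "exchange_timestamp"):
            events[col] = pd.to_datetime(events[col], unit="s")
        events["action"] = events["action"].astype("category")
        events["direction"] = events["direction"].astype("category")
        events["event_id"] = range(1, len(events) + 1)
        return events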

Note

A production adapter would also handle out-of-order messages, reconnection gaps, and exchange-specific quirks. The key point is that any object whose load method returns a DataFrame matching the column contract works; no subclassing is required.
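As one illustration of the out-of-order and reconnection-gap concerns, here is a sketch that puts events back in exchange-time order and flags long silences as possible reconnection gaps. It is a heuristic under stated assumptions: the function name and the 5-second default threshold are arbitrary choices for illustration, and many feeds carry sequence numbers that detect gaps more reliably than timing does.

```python
import pandas as pd


def order_and_flag_gaps(events: pd.DataFrame, max_silence: str = "5s") -> pd.DataFrame:
    """Sort by exchange time and flag events that follow a long silence.

    Heuristic sketch: a silence longer than `max_silence` is treated as a
    possible reconnection gap, after which tracked book state may be stale.
    """
    # A stable sort keeps the original relative order of simultaneous events.
    ordered = events.sort_values("exchange_timestamp", kind="stable").reset_index(drop=True)
    silence = ordered["exchange_timestamp"].diff()  # first row is NaT
    # NaT compares as False, so the first event is never flagged.
    ordered["after_gap"] = silence > pd.Timedelta(max_silence)
    return ordered


events = pd.DataFrame({
    "id": ["a", "b", "a", "c"],
    "exchange_timestamp": pd.to_datetime([
        "2024-01-01 00:00:01", "2024-01-01 00:00:00",  # received out of order
        "2024-01-01 00:00:02", "2024-01-01 00:00:30",  # 28 s of silence before "c"
    ]),
})
print(order_and_flag_gaps(events)[["id", "after_gap"]])
```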

Custom trade source

Implement TradeSource (a load(events, source) -> DataFrame method) when trades come from an API, a database, or a non-CSV layout:

import pandas as pd
from ob_analytics import Pipeline, PipelineConfig, sample_csv_path

class ApiTradeSource:
    def load(self, events: pd.DataFrame, source: object) -> pd.DataFrame:
        # Fetch trades from your API and build the canonical trades DataFrame
        # (timestamp, price, volume, direction, maker_event_id,
        # taker_event_id, maker, taker, …)
        raise NotImplementedError

# Skeleton only: run() raises NotImplementedError until load() is implemented.
result = Pipeline(
    config=PipelineConfig(),
    trade_source=ApiTradeSource(),
).run(sample_csv_path())
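For the database case, a minimal sketch using the standard-library `sqlite3` module and `pandas.read_sql_query`. The `SqliteTradeSource` class, the `trades` table, its column layout, and the constructor arguments are assumptions for illustration. The sketch ignores `events` and `source` and returns only the columns named in the ApiTradeSource comment, so check Data Contracts for the full canonical list before using something like it with the pipeline.

```python
import sqlite3

import pandas as pd


class SqliteTradeSource:
    """Hypothetical TradeSource that reads already-matched trades from SQLite."""

    # Columns named in the ApiTradeSource comment; the canonical list may be longer.
    COLUMNS = ["timestamp", "price", "volume", "direction",
               "maker_event_id", "taker_event_id", "maker", "taker"]

    def __init__(self, conn: sqlite3.Connection, table: str = "trades"):
        self.conn = conn
        self.table = table  # interpolated into SQL, so it must be a trusted name

    def load(self, events: pd.DataFrame, source: object) -> pd.DataFrame:
        # Trades are assumed to be matched to event IDs already, so this
        # sketch uses neither `events` nor `source`.
        query = f"SELECT {', '.join(self.COLUMNS)} FROM {self.table} ORDER BY timestamp"
        return pd.read_sql_query(query, self.conn, parse_dates=["timestamp"])


# Usage with a throwaway in-memory database and one made-up trade row.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE trades (timestamp TEXT, price REAL, volume REAL, direction TEXT,"
    " maker_event_id INTEGER, taker_event_id INTEGER, maker TEXT, taker TEXT)"
)
conn.execute(
    "INSERT INTO trades VALUES ('2024-01-01 00:00:00', 100.0, 0.5, 'buy', 1, 2, 'a', 'b')"
)
trades = SqliteTradeSource(conn).load(pd.DataFrame(), source=None)
print(trades)
```

Passing the connection in through the constructor keeps `load` matching the two-argument TradeSource shape while still letting each instance point at a different database.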

To package these components as reusable defaults, bundle them in a Format subclass; see Protocols.