
ClickHouse Signal Integration - Implementation Specification

Source: Notion | Last edited: 2025-12-30 | ID: 2d62d2dc-3ef...


Date: 2025-12-27 | Status: Draft | Related: concepts.md, bug-analysis.md


This specification defines the implementation plan for integrating ClickHouse as a data source for Alpha Forge signal generation. Key clarifications:

  • ClickHouse is READ-ONLY - Alpha Forge reads historical data from ClickHouse, does NOT write to it
  • WebSocket Tunnel is external - Tunnel service (CCXT-based) is separate infrastructure, Alpha Forge connects as consumer
  • Warmup from ClickHouse - Historical data for warmup fetched from ClickHouse warehouse
  • Stream from Tunnel - Real-time data consumed from WebSocket tunnel

Scope:

  1. ClickHouse Provider for warmup (historical gap-fill)
  2. Tunnel Consumer for streaming (real-time data)
  3. Gapless warmup-to-streaming transition
  4. Provider abstraction layer

Key Insight: The Tunnel buffer covers the gap between ClickHouse latest_ts and streaming start. No ClickHouse gap query is needed, which avoids the race condition where ClickHouse might not yet have ingested recent data.

Current Architecture:
┌──────────────────────────────────────────────────────────────────┐
│ Signal Generation │
│ │
│ Binance WS ──┬── WarmupOrchestrator ──── WarmupDataService │
│ │ │ │ │
│ │ │ BinanceGaplessProvider │
│ │ │ (ONLY Binance Spot) │
│ │ ▼ │
│ └── RealTimeRunner ──── FlowExecutor │
│ │
│ Problems: │
│ ❌ No ClickHouse provider (for historical warmup) │
│ ❌ No Tunnel consumer (for streaming) │
│ ❌ Hardcoded to Binance direct connection │
│ ❌ Gap between warmup and streaming │
└──────────────────────────────────────────────────────────────────┘

Q1: Should we detect first WebSocket record to determine lookback window?

Answer: No. The lookback window is determined by:

  1. Feature warmup_formula analysis during compilation → max_warmup_bars
  2. Query ClickHouse for latest available timestamp
  3. Fetch historical from (latest_ts - lookback_bars) to latest_ts
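
The arithmetic behind these three steps can be sketched as follows. `warmup_range` is a hypothetical helper (not from the spec); the values are the illustrative ones used throughout this document.

```python
# A sketch of the lookback arithmetic described above. `warmup_range` is a
# hypothetical helper; max_warmup_bars comes from warmup_formula analysis
# during compilation.
import pandas as pd

def warmup_range(
    latest_ts: pd.Timestamp, max_warmup_bars: int, interval: str
) -> tuple[pd.Timestamp, pd.Timestamp]:
    """Return (start_ts, end_ts) for the historical warmup fetch."""
    bar_duration = pd.Timedelta(interval)  # e.g. "1m" -> one minute
    return latest_ts - max_warmup_bars * bar_duration, latest_ts

# 720 one-minute bars = 12 hours of history ending at latest_ts,
# so start_ts lands on the previous day
start_ts, end_ts = warmup_range(
    pd.Timestamp("2025-12-27 10:45", tz="UTC"), 720, "1m"
)
```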

Q2: Where does historical data come from?

Answer: ClickHouse warehouse. External infrastructure manages data ingestion. Alpha Forge only reads.

Q3: Where does streaming data come from?

Answer: External WebSocket Tunnel. CCXT-based tunnel service provides fan-out. Alpha Forge connects as consumer.


┌──────────────────────────────────────────────────────────────────────────┐
│ Target Architecture │
│ │
│ EXTERNAL INFRASTRUCTURE (Not Alpha Forge) │
│ ═════════════════════════════════════════ │
│ │
│ ┌─────────────────────────────────────────────────────────────────┐ │
│ │ Exchanges ──▶ CCXT WS Tunnel ──▶ Redis Pub/Sub │ │
│ │ :8080 :6379 │ │
│ │ │ │ │ │
│ │ └────────┬─────────┘ │ │
│ │ │ │ │
│ │ ▼ │ │
│ │ ┌────────────────┐ │ │
│ │ │ ClickHouse │ (market_data table) │ │
│ │ │ :8123/:9000 │ │ │
│ │ └────────────────┘ │ │
│ └─────────────────────────────────────────────────────────────────┘ │
│ │
│ ALPHA FORGE (This Spec) │
│ ═══════════════════════ │
│ │
│ ┌─────────────────────────────────────────────────────────────────┐ │
│ │ │ │
│ │ ┌─────────────────────────────────────────────────────────┐ │ │
│ │ │ WARMUP LAYER │ │ │
│ │ │ │ │ │
│ │ │ WarmupDataService ── ClickHouseProvider (Phase 2) │ │ │
│ │ │ │ │ │ │ │
│ │ │ │ └── Query ClickHouse for │ │ │
│ │ │ │ historical OHLCV │ │ │
│ │ │ │ │ │ │
│ │ │ └── Find latest_ts → Fetch lookback bars │ │ │
│ │ └─────────────────────────────────────────────────────────┘ │ │
│ │ │ │ │
│ │ ▼ │ │
│ │ ┌─────────────────────────────────────────────────────────┐ │ │
│ │ │ STREAMING LAYER │ │ │
│ │ │ │ │ │
│ │ │ TunnelEventSource (Phase 3) │ │ │
│ │ │ │ │ │ │
│ │ │ └── Connect to WS Tunnel :8080 │ │ │
│ │ │ Subscribe to market:{exchange}:{symbol}:{tf} │ │ │
│ │ │ Receive real-time OHLCV │ │ │
│ │ └─────────────────────────────────────────────────────────┘ │ │
│ │ │ │ │
│ │ ▼ │ │
│ │ ┌─────────────────────────────────────────────────────────┐ │ │
│ │ │ SIGNAL LAYER │ │ │
│ │ │ │ │ │
│ │ │ RealTimeRunner ── FlowExecutor ── SignalEmitter │ │ │
│ │ │ │ │ │ │
│ │ │ CSVAppendSink │ │ │
│ │ │ DynamoDBSink │ │ │
│ │ └─────────────────────────────────────────────────────────┘ │ │
│ └─────────────────────────────────────────────────────────────────┘ │
└──────────────────────────────────────────────────────────────────────────┘

┌──────────────────────────────────────────────────────────────────────────┐
│ NEW WARMUP FLOW │
│ │
│ Step 1: Connect to Tunnel FIRST (start buffering immediately) │
│ ═════════════════════════════════════════════════════════════ │
│ │
│ Connect to Tunnel, start buffering ALL events │
│ Buffer collects ALL bars that close after this moment │
│ │
│ Step 2: Query ClickHouse for latest available timestamp │
│ ═══════════════════════════════════════════════════════ │
│ │
│ SELECT MAX(ts) as latest_ts │
│ FROM market_data │
│ WHERE symbol IN ('BTCUSDT', 'ETHUSDT') │
│ AND interval = '1m' │
│ AND exchange = 'binance' │
│ │
│ Result: latest_ts = 2025-12-27 10:45:00 UTC │
│ │
│ Step 3: Calculate warmup range and fetch historical │
│ ═══════════════════════════════════════════════════ │
│ │
│ max_warmup_bars = 720 (from DAG config, based on feature analysis) │
│ interval = 1m │
│ start_ts = latest_ts - (720 × 1m) = 2025-12-26 22:45:00 UTC (prev day) │
│ end_ts = latest_ts = 2025-12-27 10:45:00 UTC │
│ │
│ SELECT ts, symbol, open, high, low, close, volume │
│ FROM market_data │
│ WHERE symbol IN ('BTCUSDT', 'ETHUSDT') │
│ AND interval = '1m' │
│ AND ts BETWEEN start_ts AND end_ts │
│ ORDER BY symbol, ts │
│ │
│ Result: 720 bars × 2 symbols = 1440 rows │
│ │
│ Step 4: NO GAP QUERY - Buffer covers gap! │
│ ═════════════════════════════════════════ │
│ │
│ Since Tunnel connected BEFORE ClickHouse query: │
│ - Buffer has ALL bars from ~10:45:00 onwards │
│ - ClickHouse returned data up to 10:45:00 │
│ - Buffer overlaps and covers any gap automatically │
│ - NO need to query ClickHouse again for gap │
│ │
│ (This avoids race condition where ClickHouse might not have │
│ ingested recent data yet due to ingestion delay) │
│ │
│ Step 5: Merge and transition to streaming │
│ ═════════════════════════════════════════ │
│ │
│ historical_df + buffer_df → merged_df │
│ Deduplicate by (symbol, ts) - handles overlap │
│ Seed to FrameLogs │
│ Switch tunnel from buffering to streaming mode │
│ │
└──────────────────────────────────────────────────────────────────────────┘

4.2 Problem: Gap Between Warmup and Streaming


Current flow creates a gap where bars can be lost:

Timeline (Current - BROKEN):
┌──────────────────────────────────────────────────────────────────────────┐
│ 12:00:00 WS connects, first event buffered │
│ 12:00:01 Historical fetch starts (S3 + HTTP) │
│ 12:00:05 Historical fetch complete, buffer merged │
│ 12:00:05 *** WS CLOSED *** │
│ 12:00:06 Transform processing... │
│ 12:00:15 Feature processing... │
│ 12:00:25 *** A BAR CLOSES HERE - LOST! *** │
│ 12:00:30 FrameLog seeding... │
│ 12:00:31 *** WS RECONNECTS *** │
└──────────────────────────────────────────────────────────────────────────┘

4.3 Solution: Connect Tunnel FIRST, Buffer Covers Gap

Timeline (Proposed - GAPLESS):
┌──────────────────────────────────────────────────────────────────────────┐
│ 12:00:00 Connect to Tunnel FIRST, start buffering │
│ 12:00:01 Query ClickHouse for latest_ts (returns 11:58:00) │
│ 12:00:02 Fetch historical from ClickHouse (up to 11:58:00) │
│ 12:00:05 Historical fetch complete │
│ 12:00:05 *** TUNNEL CONTINUES BUFFERING *** (no disconnect) │
│ 12:00:06 Transform processing (Tunnel still buffering) │
│ 12:00:15 Feature processing (Tunnel still buffering) │
│ 12:00:25 A bar closes → buffered! │
│ 12:00:30 FrameLog seeding │
│ 12:00:31 Merge: historical + buffer (NO gap query needed!) │
│ Buffer contains every bar closed since the Tunnel │
│ connected at 12:00:00 (before the ClickHouse query ran) │
│ 12:00:32 Deduplicate by (symbol, ts) - handles any overlap │
│ 12:00:33 Switch Tunnel from buffering to streaming mode │
│ 12:00:34 Streaming begins (no bars lost) │
└──────────────────────────────────────────────────────────────────────────┘
Key Insight: Tunnel connected at 12:00:00, ClickHouse latest_ts = 11:58:00
Buffer automatically covers gap from 11:58 → now
No race condition with ClickHouse ingestion delay!

4.4 Implementation: WarmupOrchestrator Changes

orchestrator/services/warmup_orchestrator.py
import asyncio
from dataclasses import dataclass, field
from typing import Any

import pandas as pd

from orchestrator.events.sources.tunnel import TunnelEventSource


@dataclass
class WarmupResult:
    frames: dict[str, pd.DataFrame]
    latest_ts: pd.Timestamp | None
    symbols_per_frame: dict[str, list[str]]
    # NEW: active tunnel connection for continued buffering
    tunnel_source: "TunnelEventSource | None" = None
    tunnel_buffer: list[Any] = field(default_factory=list)


async def fetch_warmup_data(self, ...) -> WarmupResult:
    """Gapless warmup with ClickHouse historical + Tunnel buffering.

    KEY INSIGHT: Connect the Tunnel FIRST, then query ClickHouse.
    The buffer automatically covers any gap, so no gap query is needed.
    This avoids the race condition where ClickHouse might not have
    ingested recent data yet.
    """
    # Step 1: Connect to the Tunnel FIRST (start buffering BEFORE the
    # ClickHouse query)
    tunnel_source = TunnelEventSource(...)
    await tunnel_source.connect()
    buffer_task = asyncio.create_task(self._buffer_events(tunnel_source))
    tunnel_connect_ts = pd.Timestamp.now(tz="UTC")

    # Step 2: Query ClickHouse for the latest available timestamp.
    # This will be BEFORE tunnel_connect_ts (ClickHouse ingestion has delay).
    latest_ts = await self._clickhouse_provider.get_latest_ts(
        symbols=symbols,
        interval=interval,
    )

    # Step 3: Fetch historical data from ClickHouse (up to latest_ts)
    historical_df = await self._clickhouse_provider.fetch(
        symbols=symbols,
        interval=interval,
        start_ts=latest_ts - (max_warmup_bars * bar_duration),
        end_ts=latest_ts,
    )

    # Step 4: NO GAP QUERY NEEDED!
    # The buffer started at tunnel_connect_ts (Step 1), and ClickHouse
    # latest_ts < tunnel_connect_ts (due to ingestion delay), so the
    # buffer already covers the gap: latest_ts → tunnel_connect_ts → now.

    # DON'T close the tunnel - return it for continued buffering
    return WarmupResult(
        frames={frame_name: historical_df},
        latest_ts=latest_ts,
        tunnel_source=tunnel_source,  # keep alive
        tunnel_buffer=self._buffer,   # shared reference
    )


def merge_warmup_and_buffer(
    historical_df: pd.DataFrame,
    buffer: list[dict],
) -> pd.DataFrame:
    """Merge historical data with the tunnel buffer.

    Since the Tunnel connected BEFORE the ClickHouse query, the buffer
    covers the gap; deduplication handles any overlap between the two.
    """
    if not buffer:
        return historical_df
    buffer_df = pd.DataFrame(buffer)
    merged = pd.concat([historical_df, buffer_df], ignore_index=True)
    # Deduplicate - keep first (historical) for any overlap
    merged = merged.drop_duplicates(subset=["symbol", "ts"], keep="first")
    merged = merged.sort_values(["symbol", "ts"]).reset_index(drop=True)
    return merged
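
Step 1 references `_buffer_events`, which the spec does not show. A minimal sketch of what it might look like, assuming the source exposes the async `stream()` iterator and the buffer is a plain list:

```python
import asyncio

async def buffer_events(source, buffer: list) -> None:
    """Append every tunnel event to the warmup buffer until cancelled.

    Cancellation is the normal shutdown path, once the orchestrator
    merges the buffer and switches the tunnel to streaming mode.
    """
    try:
        async for event in source.stream():
            buffer.append(event)
    except asyncio.CancelledError:
        pass  # expected on the warmup-to-streaming handoff
```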


orchestrator/services/warmup_providers/clickhouse_provider.py
import os
from datetime import datetime

import pandas as pd


class ClickHouseWarmupProvider:
    """Warmup data provider reading from the ClickHouse warehouse."""

    def __init__(
        self,
        host: str | None = None,
        port: int = 8123,
        database: str = "trading",
        table: str = "market_data",
    ):
        self._host = host or os.environ.get("CLICKHOUSE_HOST", "localhost")
        self._port = port
        self._database = database
        self._table = table
        self._client = None

    @property
    def name(self) -> str:
        return "clickhouse"

    async def get_latest_ts(
        self,
        symbols: list[str],
        interval: str,
        exchange: str = "binance",
    ) -> pd.Timestamp:
        """Get the latest available timestamp from ClickHouse.

        Used to determine the warmup end_ts (anchor point).
        """
        client = self._get_client()
        query = f"""
            SELECT MAX(ts) AS latest_ts
            FROM {self._database}.{self._table}
            WHERE symbol IN %(symbols)s
              AND interval = %(interval)s
              AND exchange = %(exchange)s
        """
        result = client.query(
            query,
            parameters={
                "symbols": symbols,
                "interval": interval,
                "exchange": exchange,
            },
        )
        latest_ts = result.first_row[0]
        if latest_ts is None:
            raise RuntimeError(
                f"No data in ClickHouse for {symbols} @ {interval}"
            )
        return pd.Timestamp(latest_ts, tz="UTC")

    async def fetch(  # async: fetch_warmup_data awaits this call
        self,
        symbols: list[str],
        interval: str,
        start_ts: datetime,
        end_ts: datetime,
        exchange: str = "binance",
    ) -> pd.DataFrame:
        """Fetch historical OHLCV from ClickHouse."""
        client = self._get_client()
        query = f"""
            SELECT ts, symbol, open, high, low, close, volume
            FROM {self._database}.{self._table}
            WHERE symbol IN %(symbols)s
              AND interval = %(interval)s
              AND exchange = %(exchange)s
              AND ts >= %(start_ts)s
              AND ts < %(end_ts)s
            ORDER BY symbol, ts
        """
        result = client.query(
            query,
            parameters={
                "symbols": symbols,
                "interval": interval,
                "exchange": exchange,
                "start_ts": start_ts,
                "end_ts": end_ts,
            },
        )
        df = result.to_pandas()
        df["ts"] = pd.to_datetime(df["ts"], utc=True)
        return df

# NOTE: No fetch_gap() method is needed!
# With Approach A (connect the Tunnel FIRST), the buffer automatically
# covers the gap between ClickHouse latest_ts and the streaming start.
# This avoids the race condition where ClickHouse might not have
# ingested recent data yet.
orchestrator/services/warmup_providers/registry.py
class ProviderRegistry:
    """Registry for warmup data providers."""

    _providers: dict[str, WarmupDataProvider] = {}
    _default_provider: str | None = None

    @classmethod
    def register(cls, provider: WarmupDataProvider, default: bool = False) -> None:
        """Register a provider."""
        cls._providers[provider.name] = provider
        if default:
            cls._default_provider = provider.name

    @classmethod
    def get(cls, name: str | None = None) -> WarmupDataProvider:
        """Get a provider by name, or the default."""
        if name is None:
            name = cls._default_provider
        if name not in cls._providers:
            raise ValueError(f"Provider not found: {name}")
        return cls._providers[name]


# Registration at startup
ProviderRegistry.register(ClickHouseWarmupProvider(), default=True)
ProviderRegistry.register(BinanceGaplessProvider())  # keep for backward compat
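
The registry is typed against `WarmupDataProvider`, which is not defined in this spec. A structural sketch of the contract, inferred from the provider code above (names and signatures are assumptions, not confirmed by the spec):

```python
# Inferred contract both ClickHouseWarmupProvider and
# BinanceGaplessProvider would satisfy.
from datetime import datetime
from typing import Protocol, runtime_checkable

import pandas as pd

@runtime_checkable
class WarmupDataProvider(Protocol):
    @property
    def name(self) -> str: ...

    async def get_latest_ts(
        self, symbols: list[str], interval: str, exchange: str = "binance"
    ) -> pd.Timestamp: ...

    async def fetch(
        self,
        symbols: list[str],
        interval: str,
        start_ts: datetime,
        end_ts: datetime,
        exchange: str = "binance",
    ) -> pd.DataFrame: ...
```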

orchestrator/events/sources/tunnel.py
import json
import logging
import os
from collections.abc import AsyncIterator

import pandas as pd
import websockets

logger = logging.getLogger(__name__)


class TunnelEventSource:
    """Event source consuming from the external WebSocket Tunnel."""

    def __init__(
        self,
        symbols: list[str],
        interval: str,
        exchange: str = "binance",
        tunnel_url: str | None = None,
        event_def_id: str = "tunnel",
    ):
        self._symbols = symbols
        self._interval = interval
        self._exchange = exchange
        self._tunnel_url = tunnel_url or os.environ.get(
            "WS_TUNNEL_URL", "ws://localhost:8080"
        )
        self._event_def_id = event_def_id
        self._ws = None
        self._sequence = 0

    async def connect(self) -> None:
        """Connect to the WebSocket tunnel."""
        # Build subscription channels
        channels = [
            f"market:{self._exchange}:{symbol}:{self._interval}"
            for symbol in self._symbols
        ]
        url = f"{self._tunnel_url}?channels={','.join(channels)}"
        self._ws = await websockets.connect(url)
        logger.info(f"Connected to Tunnel: {url}")

    async def stream(self) -> AsyncIterator[MarketEvent]:
        """Stream market events from the tunnel."""
        if self._ws is None:
            raise RuntimeError("Not connected. Call connect() first.")
        async for message in self._ws:
            data = json.loads(message)
            # Skip partial candles
            if not data.get("is_closed", False):
                continue
            self._sequence += 1
            yield MarketEvent(
                type="market.bar_close",
                ts=pd.Timestamp(data["ts"], tz="UTC"),
                symbol=data["symbol"],
                data_ref="",
                metadata={
                    "event_def_id": self._event_def_id,
                    "interval": self._interval,
                    "open": float(data["open"]),
                    "high": float(data["high"]),
                    "low": float(data["low"]),
                    "close": float(data["close"]),
                    "volume": float(data["volume"]),
                    "is_closed": True,
                },
                source="tunnel",
                sequence=self._sequence,
            )

    async def close(self) -> None:
        """Close the tunnel connection."""
        if self._ws:
            await self._ws.close()
            self._ws = None
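
A sketch of how a consumer might drive this source. The real wiring lives in RealTimeRunner; `consume` and `handler` are illustrative names (the handler stands in for FlowExecutor dispatch):

```python
import asyncio

async def consume(source, handler) -> None:
    """Connect, forward each closed-bar event to `handler`, then close."""
    await source.connect()
    try:
        async for event in source.stream():
            handler(event)  # e.g. hand off to FlowExecutor
    finally:
        await source.close()
```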

  • concepts.md - Signal Generation flow documentation
  • bug-analysis.md - Identified bugs and fixes
  • ADR-2025-12-06 - Warmup data service design