ClickHouse Signal Integration - Implementation Specification
Source: Notion | Last edited: 2025-12-30 | ID: 2d62d2dc-3ef...
Date: 2025-12-27 Status: Draft Related: concepts.md, bug-analysis.md
1. Overview
This specification defines the implementation plan for integrating ClickHouse as a data source for Alpha Forge signal generation. Key clarifications:
- ClickHouse is READ-ONLY - Alpha Forge reads historical data from ClickHouse, does NOT write to it
- WebSocket Tunnel is external - Tunnel service (CCXT-based) is separate infrastructure, Alpha Forge connects as consumer
- Warmup from ClickHouse - Historical data for warmup fetched from ClickHouse warehouse
- Stream from Tunnel - Real-time data consumed from WebSocket tunnel Scope:
- ClickHouse Provider for warmup (historical gap-fill)
- Tunnel Consumer for streaming (real-time data)
- Gapless warmup-to-streaming transition
- Provider abstraction layer
Key Insight: The Tunnel buffer covers the gap between the ClickHouse latest_ts and the start of streaming. No ClickHouse gap query is needed; this avoids the race condition where ClickHouse might not yet have ingested the most recent data.
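The no-gap claim can be written down as a one-line invariant. A minimal sketch (the function and parameter names are illustrative, not from the codebase):

```python
import pandas as pd

def is_gapless(latest_ts: pd.Timestamp, first_buffer_ts: pd.Timestamp,
               bar: pd.Timedelta) -> bool:
    """True if the tunnel buffer joins up with ClickHouse history.

    ClickHouse covers bars up to latest_ts; the buffer covers bars from
    first_buffer_ts onward. There is no hole iff the first buffered bar
    follows the last warehouse bar by at most one bar interval
    (overlap is fine - deduplication by (symbol, ts) handles it).
    """
    return first_buffer_ts <= latest_ts + bar
```

Connecting the tunnel before the latest_ts query makes this hold in practice, because ClickHouse ingestion lags real time.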
2. Current State Analysis
2.1 Identified Issues
2.2 Codebase Gaps
Current Architecture:

```
Signal Generation

Binance WS ──┬── WarmupOrchestrator ──── WarmupDataService
             │            │                     │
             │            │            BinanceGaplessProvider
             │            │            (ONLY Binance Spot)
             │            ▼
             └── RealTimeRunner ──── FlowExecutor

Problems:
❌ No ClickHouse provider (for historical warmup)
❌ No Tunnel consumer (for streaming)
❌ Hardcoded to Binance direct connection
❌ Gap between warmup and streaming
```

2.3 Key Design Decisions
Q1: Should we detect the first WebSocket record to determine the lookback window?
Answer: No. The lookback window is determined by:
- Feature warmup_formula analysis during compilation → max_warmup_bars
- Querying ClickHouse for the latest available timestamp
- Fetching historical data from (latest_ts - lookback_bars) to latest_ts
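The lookback arithmetic might look like the following sketch (the helper name is illustrative; `pd.Timedelta` accepts `"1min"`-style strings, so exchange-style `"1m"` intervals are normalized first):

```python
import pandas as pd

def warmup_range(latest_ts: pd.Timestamp, max_warmup_bars: int,
                 interval: str) -> tuple[pd.Timestamp, pd.Timestamp]:
    """Compute (start_ts, end_ts) for the historical ClickHouse fetch."""
    # Normalize "1m"/"5m" to pandas-parsable "1min"/"5min"; "1h", "1d" pass through.
    bar = pd.Timedelta(interval.replace("m", "min"))
    return latest_ts - max_warmup_bars * bar, latest_ts
```

With the spec's example numbers (720 one-minute bars anchored at 2025-12-27 10:45:00 UTC), this yields a start of 22:45:00 UTC the previous day.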
Q2: Where does historical data come from?
Answer: ClickHouse warehouse. External infrastructure manages data ingestion. Alpha Forge only reads.
Q3: Where does streaming data come from?
Answer: External WebSocket Tunnel. CCXT-based tunnel service provides fan-out. Alpha Forge connects as consumer.
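The tunnel's fan-out is addressed by channel strings. Assuming the `market:{exchange}:{symbol}:{tf}` convention used in section 6, subscription channels could be built like so:

```python
def tunnel_channels(exchange: str, symbols: list[str], interval: str) -> list[str]:
    """Build one tunnel subscription channel per symbol."""
    return [f"market:{exchange}:{s}:{interval}" for s in symbols]
```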
3. Target Architecture
```
Target Architecture

EXTERNAL INFRASTRUCTURE (Not Alpha Forge)
═════════════════════════════════════════

  Exchanges ──▶ CCXT WS Tunnel ──▶ Redis Pub/Sub
                    :8080             :6379
                      │                 │
                      └────────┬────────┘
                               │
                               ▼
                      ┌────────────────┐
                      │   ClickHouse   │  (market_data table)
                      │  :8123/:9000   │
                      └────────────────┘

ALPHA FORGE (This Spec)
═══════════════════════

  WARMUP LAYER

    WarmupDataService ── ClickHouseProvider (Phase 2)
        │                       │
        │                       └── Query ClickHouse for
        │                           historical OHLCV
        │
        └── Find latest_ts → Fetch lookback bars
                 │
                 ▼
  STREAMING LAYER

    TunnelEventSource (Phase 3)
        │
        └── Connect to WS Tunnel :8080
            Subscribe to market:{exchange}:{symbol}:{tf}
            Receive real-time OHLCV
                 │
                 ▼
  SIGNAL LAYER

    RealTimeRunner ── FlowExecutor ── SignalEmitter
                                           │
                                      CSVAppendSink
                                      DynamoDBSink
```

4.1 New Warmup Flow (ClickHouse-based)
```
NEW WARMUP FLOW

Step 1: Connect to Tunnel FIRST (start buffering immediately)
═════════════════════════════════════════════════════════════

  Connect to Tunnel, start buffering ALL events.
  Buffer collects ALL bars that close after this moment.

Step 2: Query ClickHouse for latest available timestamp
═══════════════════════════════════════════════════════

  SELECT MAX(ts) AS latest_ts
  FROM market_data
  WHERE symbol IN ('BTCUSDT', 'ETHUSDT')
    AND interval = '1m'
    AND exchange = 'binance'

  Result: latest_ts = 2025-12-27 10:45:00 UTC

Step 3: Calculate warmup range and fetch historical
═══════════════════════════════════════════════════

  max_warmup_bars = 720 (from DAG config, based on feature analysis)
  interval        = 1m
  start_ts = latest_ts - (720 × 1m) = 2025-12-26 22:45:00 UTC (prev day)
  end_ts   = latest_ts              = 2025-12-27 10:45:00 UTC

  SELECT ts, symbol, open, high, low, close, volume
  FROM market_data
  WHERE symbol IN ('BTCUSDT', 'ETHUSDT')
    AND interval = '1m'
    AND ts BETWEEN start_ts AND end_ts
  ORDER BY symbol, ts

  Result: 720 bars × 2 symbols = 1440 rows

Step 4: NO GAP QUERY - Buffer covers gap!
═════════════════════════════════════════

  Since the Tunnel connected BEFORE the ClickHouse query:
  - Buffer has ALL bars from ~10:45:00 onwards
  - ClickHouse returned data up to 10:45:00
  - Buffer overlaps and covers any gap automatically
  - NO need to query ClickHouse again for the gap

  (This avoids the race condition where ClickHouse might not have
  ingested recent data yet due to ingestion delay)

Step 5: Merge and transition to streaming
═════════════════════════════════════════

  historical_df + buffer_df → merged_df
  Deduplicate by (symbol, ts) - handles overlap
  Seed to FrameLogs
  Switch tunnel from buffering to streaming mode
```

4.2 Problem: Gap Between Warmup and Streaming
The current flow creates a window in which bars can be lost:
```
Timeline (Current - BROKEN):

12:00:00  WS connects, first event buffered
12:00:01  Historical fetch starts (S3 + HTTP)
12:00:05  Historical fetch complete, buffer merged
12:00:05  *** WS CLOSED ***
12:00:06  Transform processing...
12:00:15  Feature processing...
12:00:25  *** 14:00 BAR CLOSES HERE - LOST! ***
12:00:30  FrameLog seeding...
12:00:31  *** WS RECONNECTS ***
```

4.3 Solution: Connect Tunnel FIRST, Buffer Covers Gap
```
Timeline (Proposed - GAPLESS):

12:00:00  Connect to Tunnel FIRST, start buffering
12:00:01  Query ClickHouse for latest_ts (returns 11:58:00)
12:00:02  Fetch historical from ClickHouse (up to 11:58:00)
12:00:05  Historical fetch complete
12:00:05  *** TUNNEL CONTINUES BUFFERING *** (no disconnect)
12:00:06  Transform processing (Tunnel still buffering)
12:00:15  Feature processing (Tunnel still buffering)
12:00:25  14:00 bar closes → buffered!
12:00:30  FrameLog seeding
12:00:31  Merge: historical + buffer (NO gap query needed!)
          Buffer contains: 11:59, 12:00, ..., 12:30 (all bars since
          the Tunnel connected at 12:00:00, BEFORE the ClickHouse query)
12:00:32  Deduplicate by (symbol, ts) - handles any overlap
12:00:33  Switch Tunnel from buffering to streaming mode
12:00:34  Streaming begins (no bars lost)
```
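The merge-and-deduplicate step from the timeline above can be exercised with toy data; `keep="first"` prefers the ClickHouse row wherever the buffer overlaps (column names follow the market_data schema, values are made up):

```python
import pandas as pd

historical = pd.DataFrame({
    "symbol": ["BTCUSDT", "BTCUSDT"],
    "ts": pd.to_datetime(["2025-12-27 11:57", "2025-12-27 11:58"], utc=True),
    "close": [100.0, 101.0],
})
# The buffer overlaps on 11:58 and extends past the warehouse.
buffer = pd.DataFrame({
    "symbol": ["BTCUSDT", "BTCUSDT"],
    "ts": pd.to_datetime(["2025-12-27 11:58", "2025-12-27 11:59"], utc=True),
    "close": [999.0, 102.0],
})

merged = pd.concat([historical, buffer], ignore_index=True)
# keep="first" retains the historical 11:58 row, dropping the buffer's copy.
merged = merged.drop_duplicates(subset=["symbol", "ts"], keep="first")
merged = merged.sort_values(["symbol", "ts"]).reset_index(drop=True)
# Three rows survive; the 11:58 close comes from historical (101.0).
```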
Key Insight: The Tunnel connected at 12:00:00 while ClickHouse latest_ts = 11:58:00. The buffer automatically covers the gap from 11:58 → now, with no race condition against ClickHouse ingestion delay.

4.4 Implementation: WarmupOrchestrator Changes
```python
import asyncio
from dataclasses import dataclass, field
from typing import Any

import pandas as pd


@dataclass
class WarmupResult:
    frames: dict[str, pd.DataFrame]
    latest_ts: pd.Timestamp | None
    symbols_per_frame: dict[str, list[str]]
    # NEW: Active tunnel connection for continued buffering
    tunnel_source: "TunnelEventSource | None" = None
    tunnel_buffer: list[Any] = field(default_factory=list)


async def fetch_warmup_data(self, ...) -> WarmupResult:
    """Gapless warmup with ClickHouse historical + Tunnel buffering.

    KEY INSIGHT: Connect Tunnel FIRST, then query ClickHouse.
    The buffer automatically covers any gap - no gap query needed.
    This avoids the race condition where ClickHouse might not have
    ingested recent data yet.
    """
    # Step 1: Connect to Tunnel FIRST (start buffering BEFORE ClickHouse query)
    tunnel_source = TunnelEventSource(...)
    await tunnel_source.connect()
    buffer_task = asyncio.create_task(self._buffer_events(tunnel_source))
    tunnel_connect_ts = pd.Timestamp.now(tz="UTC")

    # Step 2: Query ClickHouse for the latest available timestamp.
    # This will be BEFORE tunnel_connect_ts (ClickHouse ingestion has delay).
    latest_ts = await self._clickhouse_provider.get_latest_ts(
        symbols=symbols,
        interval=interval,
    )

    # Step 3: Fetch historical from ClickHouse (up to latest_ts)
    historical_df = await self._clickhouse_provider.fetch(
        symbols=symbols,
        interval=interval,
        start_ts=latest_ts - (max_warmup_bars * bar_duration),
        end_ts=latest_ts,
    )

    # Step 4: NO GAP QUERY NEEDED!
    # The buffer started at tunnel_connect_ts (Step 1).
    # ClickHouse latest_ts < tunnel_connect_ts (due to ingestion delay),
    # so the buffer ALREADY covers the gap: latest_ts → tunnel_connect_ts → now.

    # DON'T close the tunnel - return it for continued buffering
    return WarmupResult(
        frames={frame_name: historical_df},
        latest_ts=latest_ts,
        tunnel_source=tunnel_source,  # Keep alive
        tunnel_buffer=self._buffer,   # Shared reference
    )
```
```python
def merge_warmup_and_buffer(
    historical_df: pd.DataFrame,
    buffer: list[dict],
) -> pd.DataFrame:
    """Merge historical data with the buffer; deduplication handles overlap.

    Since the Tunnel connected BEFORE the ClickHouse query, the buffer
    covers the gap. Deduplication handles any overlap between the
    historical data and the buffer.
    """
    if not buffer:
        return historical_df

    buffer_df = pd.DataFrame(buffer)
    merged = pd.concat([historical_df, buffer_df], ignore_index=True)

    # Deduplicate - keep first (historical) for any overlap
    merged = merged.drop_duplicates(subset=["symbol", "ts"], keep="first")
    merged = merged.sort_values(["symbol", "ts"]).reset_index(drop=True)

    return merged
```

5. ClickHouse Provider (Phase 2)
5.1 Provider Interface
```python
import os
from datetime import datetime

import pandas as pd


class ClickHouseWarmupProvider:
    """Warmup data provider reading from the ClickHouse warehouse."""

    def __init__(
        self,
        host: str | None = None,
        port: int = 8123,
        database: str = "trading",
        table: str = "market_data",
    ):
        self._host = host or os.environ.get("CLICKHOUSE_HOST", "localhost")
        self._port = port
        self._database = database
        self._table = table
        self._client = None

    def _get_client(self):
        # Lazy client creation (sketch; assumes the clickhouse-connect driver,
        # whose query(parameters=...) API matches the calls below).
        if self._client is None:
            import clickhouse_connect
            self._client = clickhouse_connect.get_client(
                host=self._host, port=self._port
            )
        return self._client

    @property
    def name(self) -> str:
        return "clickhouse"

    async def get_latest_ts(
        self,
        symbols: list[str],
        interval: str,
        exchange: str = "binance",
    ) -> pd.Timestamp:
        """Get the latest available timestamp from ClickHouse.

        Used to determine the warmup end_ts (anchor point).
        """
        client = self._get_client()

        query = f"""
            SELECT MAX(ts) AS latest_ts
            FROM {self._database}.{self._table}
            WHERE symbol IN %(symbols)s
              AND interval = %(interval)s
              AND exchange = %(exchange)s
        """

        result = client.query(
            query,
            parameters={
                "symbols": symbols,
                "interval": interval,
                "exchange": exchange,
            },
        )

        latest_ts = result.first_row[0]
        if latest_ts is None:
            raise RuntimeError(
                f"No data in ClickHouse for {symbols} @ {interval}"
            )

        return pd.Timestamp(latest_ts, tz="UTC")

    def fetch(
        self,
        symbols: list[str],
        interval: str,
        start_ts: datetime,
        end_ts: datetime,
        exchange: str = "binance",
    ) -> pd.DataFrame:
        """Fetch historical OHLCV from ClickHouse."""
        client = self._get_client()

        query = f"""
            SELECT ts, symbol, open, high, low, close, volume
            FROM {self._database}.{self._table}
            WHERE symbol IN %(symbols)s
              AND interval = %(interval)s
              AND exchange = %(exchange)s
              AND ts >= %(start_ts)s
              AND ts < %(end_ts)s
            ORDER BY symbol, ts
        """

        result = client.query(
            query,
            parameters={
                "symbols": symbols,
                "interval": interval,
                "exchange": exchange,
                "start_ts": start_ts,
                "end_ts": end_ts,
            },
        )

        df = result.to_pandas()
        df["ts"] = pd.to_datetime(df["ts"], utc=True)
        return df
```
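Downstream, fetch() returns one flat row per (symbol, ts). If the warmup service wants one frame per symbol (an assumption about how WarmupResult.frames is keyed, not stated above), the split is a one-line groupby:

```python
import pandas as pd

# Stand-in for a fetch() result: two symbols, one bar each.
df = pd.DataFrame({
    "ts": pd.to_datetime(["2025-12-27 10:44", "2025-12-27 10:44"], utc=True),
    "symbol": ["BTCUSDT", "ETHUSDT"],
    "close": [100.0, 10.0],
})

# One DataFrame per symbol, each re-indexed from zero.
frames = {sym: g.reset_index(drop=True) for sym, g in df.groupby("symbol")}
```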
```python
# NOTE: No fetch_gap() method is needed!
# With Approach A (connect the Tunnel FIRST), the buffer automatically
# covers the gap between the ClickHouse latest_ts and streaming start.
# This avoids the race condition where ClickHouse might not have
# ingested recent data yet.
```

5.2 Provider Registry
```python
class ProviderRegistry:
    """Registry for warmup data providers."""

    _providers: dict[str, WarmupDataProvider] = {}
    _default_provider: str | None = None

    @classmethod
    def register(cls, provider: WarmupDataProvider, default: bool = False) -> None:
        """Register a provider."""
        cls._providers[provider.name] = provider
        if default:
            cls._default_provider = provider.name

    @classmethod
    def get(cls, name: str | None = None) -> WarmupDataProvider:
        """Get a provider by name, or the default."""
        if name is None:
            name = cls._default_provider
        if name not in cls._providers:
            raise ValueError(f"Provider not found: {name}")
        return cls._providers[name]
```
```python
# Registration at startup
ProviderRegistry.register(ClickHouseWarmupProvider(), default=True)
ProviderRegistry.register(BinanceGaplessProvider())  # Keep for backward compat
```

6. Tunnel Event Source (Phase 3)
6.1 Tunnel Connection
```python
import json
import os
from collections.abc import AsyncIterator

import pandas as pd
import websockets


class TunnelEventSource:
    """Event source consuming from the external WebSocket Tunnel."""

    def __init__(
        self,
        symbols: list[str],
        interval: str,
        exchange: str = "binance",
        tunnel_url: str | None = None,
        event_def_id: str = "tunnel",
    ):
        self._symbols = symbols
        self._interval = interval
        self._exchange = exchange
        self._tunnel_url = tunnel_url or os.environ.get(
            "WS_TUNNEL_URL", "ws://localhost:8080"
        )
        self._event_def_id = event_def_id
        self._ws = None
        self._sequence = 0

    async def connect(self) -> None:
        """Connect to the WebSocket tunnel."""
        # Build subscription channels
        channels = [
            f"market:{self._exchange}:{symbol}:{self._interval}"
            for symbol in self._symbols
        ]

        url = f"{self._tunnel_url}?channels={','.join(channels)}"
        self._ws = await websockets.connect(url)
        logger.info(f"Connected to Tunnel: {url}")

    async def stream(self) -> AsyncIterator[MarketEvent]:
        """Stream market events from the tunnel."""
        if self._ws is None:
            raise RuntimeError("Not connected. Call connect() first.")

        async for message in self._ws:
            data = json.loads(message)

            # Skip partial candles
            if not data.get("is_closed", False):
                continue

            self._sequence += 1
            yield MarketEvent(
                type="market.bar_close",
                ts=pd.Timestamp(data["ts"], tz="UTC"),
                symbol=data["symbol"],
                data_ref="",
                metadata={
                    "event_def_id": self._event_def_id,
                    "interval": self._interval,
                    "open": float(data["open"]),
                    "high": float(data["high"]),
                    "low": float(data["low"]),
                    "close": float(data["close"]),
                    "volume": float(data["volume"]),
                    "is_closed": True,
                },
                source="tunnel",
                sequence=self._sequence,
            )

    async def close(self) -> None:
        """Close the tunnel connection."""
        if self._ws:
            await self._ws.close()
            self._ws = None
```

7. References
- concepts.md - Signal Generation flow documentation
- bug-analysis.md - Identified bugs and fixes
- ADR-2025-12-06 - Warmup data service design