Skip to content

PoC: Real time proposal

Source: Notion | Last edited: 2025-12-29 | ID: 2d52d2dc-3ef...


Purpose: Comprehensive guide for understanding Alpha Forge signal generation flow and creating custom sink plugins.

Reference Examples:

  • examples/08_signal_generation/minimal_signal_generation.yaml
  • examples/08_signal_generation/bilstm_realtime_signal.yaml

Signal generation is a long-running streaming service that:

  • Receives real-time market data from Binance WebSocket (or ClickHouse polling)
  • Computes features on each new bar (candle close)
  • Runs ML model predictions
  • Emits predictions to configurable sinks (CSV, DynamoDB, ClickHouse)

CLI Command:

Terminal window
uv run alpha_forge signal examples/08_signal_generation/minimal_signal_generation.yaml

โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”
โ”‚ SIGNAL GENERATION FLOW โ”‚
โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜
DSL YAML
โ”‚
โ–ผ
โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”
โ”‚ PHASE 1: STARTUP (CLI signal.py) โ”‚
โ”‚ โ”‚
โ”‚ 1. Parse DSL โ”‚
โ”‚ 2. Extract signal_generation config โ”‚
โ”‚ 3. Create SignalEmitter from on_prediction sinks โ”‚
โ”‚ 4. Extract prediction column from model step โ”‚
โ”‚ 5. Compile DSL โ†’ EventFlowDAG โ”‚
โ”‚ 6. Initialize RealTimeRunner โ”‚
โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜
โ”‚
โ–ผ
โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”
โ”‚ PHASE 2: WARMUP โ”‚
โ”‚ โ”‚
โ”‚ 1. Start Binance WebSocket FIRST (buffer incoming events) โ”‚
โ”‚ 2. Fetch historical data ending at first WebSocket timestamp โ”‚
โ”‚ โ””โ”€โ”€ BinanceGaplessProvider: S3 bulk + HTTP gap-fill โ”‚
โ”‚ 3. Process warmup through transform steps (resample, align) โ”‚
โ”‚ 4. Process warmup through feature steps (momentum, RSI, etc.) โ”‚
โ”‚ 5. Seed frames into context (pre-warmed state) โ”‚
โ”‚ 6. Seed event tracker (prevent duplicate processing) โ”‚
โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜
โ”‚
โ–ผ
โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”
โ”‚ PHASE 3: STREAMING (Infinite Loop) โ”‚
โ”‚ โ”‚
โ”‚ โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ” โ”‚
โ”‚ โ”‚ FOR EACH WebSocket Event (bar close): โ”‚ โ”‚
โ”‚ โ”‚ โ”‚ โ”‚
โ”‚ โ”‚ 1. Receive MarketEvent from Binance WebSocket โ”‚ โ”‚
โ”‚ โ”‚ โ””โ”€โ”€ type: "market.bar_close", symbol: "BTCUSDT", ts: ... โ”‚ โ”‚
โ”‚ โ”‚ โ”‚ โ”‚
โ”‚ โ”‚ 2. Populate context frame (thread-safe with lock) โ”‚ โ”‚
โ”‚ โ”‚ โ””โ”€โ”€ Append OHLCV row to context["frames"]["ohlcv"] โ”‚ โ”‚
โ”‚ โ”‚ โ””โ”€โ”€ Deduplicate by (symbol, ts) โ”‚ โ”‚
โ”‚ โ”‚ โ”‚ โ”‚
โ”‚ โ”‚ 3. Route event to subscribed flows โ”‚ โ”‚
โ”‚ โ”‚ โ””โ”€โ”€ EventRouter finds flow_bar_1h_ohlcv โ”‚ โ”‚
โ”‚ โ”‚ โ”‚ โ”‚
โ”‚ โ”‚ 4. Execute flow (inside lock) โ”‚ โ”‚
โ”‚ โ”‚ โ””โ”€โ”€ FlowExecutor runs: features โ†’ model โ”‚ โ”‚
โ”‚ โ”‚ โ””โ”€โ”€ Each step sees FULL frame (warmup + new rows) โ”‚ โ”‚
โ”‚ โ”‚ โ”‚ โ”‚
โ”‚ โ”‚ 5. Emit predictions (OUTSIDE lock - non-blocking I/O) โ”‚ โ”‚
โ”‚ โ”‚ โ””โ”€โ”€ PredictionEmitter extracts per-symbol prediction โ”‚ โ”‚
โ”‚ โ”‚ โ””โ”€โ”€ SignalEmitter sends to ALL configured sinks โ”‚ โ”‚
โ”‚ โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜ โ”‚
โ”‚ โ”‚
โ”‚ Runs until Ctrl+C or error โ”‚
โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜

The signal_generation block configures the streaming behavior:

signal_generation:
mode: stream # Required: Execution mode
warmup: # Required: Historical data config
lookback_days:30 # Fetch N days for feature warmup
scheduling: # Optional: Timing config
interval: 1h # Prediction frequency
buffer_minutes:2 # Wait after bar close
on_prediction: # Required: Sink configuration
-using: sinks.csv_append
params:
path: outputs/predictions.csv
dedup_key:[ts, symbol]

name: minimal_signal_generation
# ============ Capability Dependencies ============
using:
capabilities:
- shared # Data loaders, transforms, sinks
- middlefreq # Features, models, strategies
# ============ Signal Generation Configuration ============
signal_generation:
mode: stream # Real-time streaming mode
warmup:
lookback_days:30 # 30 days history for feature warmup
# Calculation: 30 days ร— 24 hours = 720 bars (for 1h)
scheduling:
interval: 1h # Predict every hour (on bar close)
buffer_minutes:2 # Wait 2 min for data to arrive
on_prediction:
-using: sinks.csv_append # Output to CSV file
params:
path: outputs/predictions/minimal_predictions.csv
dedup_key:[ts, symbol] # Prevent duplicates on reconnect
# ============ Execution Pipeline ============
pipeline:
# Step 1: Data source (Binance WebSocket in streaming mode)
-data:
using: data.binance_futures
outputs:
frame: ohlcv # Frame name for downstream steps
params:
universe: # Symbols to track
- BTCUSDT
- ETHUSDT
interval: 1h # Bar interval
# Step 2: Feature calculation
-features:
using: features.momentum
inputs:
column: ohlcv.close # Input: close price from ohlcv frame
outputs:
columns:
- mom_20 # Output column name
params:
window:20 # 20-bar lookback
-features:
using: features.rsi
inputs:
column: ohlcv.close
outputs:
columns:
- rsi_14
params:
window:14
# Step 3: Model prediction
-model:
using: models.pytorch_bilstm_predictor
inputs:
frame: ohlcv # Input frame with features
model_ref: my_model.unified
outputs:
columns:
- model.prediction # โ† This is the prediction column
params:
strategy: unified
window_size:30 # Model looks at 30 bars
output_col: model.prediction # โ† Extracted by CLI for emission
checkpoint_dir: tmp/models/my_checkpoint

This is a production-grade example with complex feature engineering:

name: bilstm_realtime_signal_generation
# ============ Parameter Expansion ============
expand:
rsi_windows:[6,12,24] # Creates 3 RSI features
ma_windows:[5,10,20,60] # Creates 4 MA features
level_windows:[10,20,60] # Creates 3 support/resistance features
# ============ Signal Generation Configuration ============
signal_generation:
mode: stream
warmup:
lookback_days:90 # 90 days for MA(60) + window_size(30) stabilization
# Calculation: Need 60 bars for MA(60), plus 30 for model
# At 2h bars: 90 days ร— 12 bars/day = 1080 bars
scheduling:
interval: 2h # Predict on 2h bars
buffer_minutes:5 # Wait 5 min for data
on_prediction:
-using: sinks.csv_append
params:
path: outputs/predictions/bilstm_improved_realtime.csv
dedup_key:[ts, symbol]
# ============ Execution Pipeline ============
pipeline:
# Step 1: Stream 15m data (higher resolution)
-data:
using: data.binance_futures
outputs:
frame: ohlcv_15m # 15-minute bars
params:
universe:
- BTCUSDT
- ETHUSDT
- SOLUSDT
- XRPUSDT
interval: 15m
# Step 2: Resample 15m โ†’ 2h bars
-transform:
using: transforms.resample
inputs:
frame: ohlcv_15m
outputs:
frame: ohlcv_2h # 2-hour bars for features
params:
target_freq: 2h
# Step 3: Features with parameter expansion
-features:
using: features.rsi
inputs:
column: ohlcv_2h.close
outputs:
columns:
- rsi{window} # Expands to: rsi6, rsi12, rsi24
params:
window: ${expand.rsi_windows} # [6, 12, 24]
-features:
using: features.ma_rocp
inputs:
column: ohlcv_2h.close
outputs:
columns:
- ma{window}rocp # Expands to: ma5rocp, ma10rocp, ma20rocp, ma60rocp
params:
window: ${expand.ma_windows} # [5, 10, 20, 60]
rocp_period:1
rocp_factor:10
# ... many more features ...
# Final: Model prediction
-model:
using: models.pytorch_bilstm_predictor
inputs:
frame: ohlcv_2h
model_ref: bilstm_elnigma_unified_improved.unified
outputs:
columns:
- model.prediction
params:
strategy: unified
window_size:30
output_col: model.prediction
checkpoint_dir: tmp/models/el_nigma_equiv_unified_improved
# DSL Definition
expand:
rsi_windows:[6,12,24]
# Template
-features:
using: features.rsi
outputs:
columns:
- rsi{window}
params:
window: ${expand.rsi_windows}
# Expands to 3 separate feature steps:
-features:{using: features.rsi,params:{window:6},outputs:{columns:[rsi6]}}
-features:{using: features.rsi,params:{window:12},outputs:{columns:[rsi12]}}
-features:{using: features.rsi,params:{window:24},outputs:{columns:[rsi24]}}

โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”
โ”‚ DATA FLOW โ”‚
โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜
Binance WebSocket Context Frames
โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ• โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•
wss://stream.binance.com context["frames"]["ohlcv"]
โ”‚ โ”‚
โ”‚ 1h bar close event โ”‚
โ–ผ โ–ผ
โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ” โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”
โ”‚ MarketEvent โ”‚ โ”‚ DataFrame (Panel format) โ”‚
โ”‚ โ”‚ โ”‚ โ”‚
โ”‚ type: bar_close โ”‚ โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ–ถ โ”‚ ts โ”‚ symbol โ”‚ close โ”‚
โ”‚ ts: 2025-12-26 โ”‚ Append row โ”‚ 12:00:00 โ”‚ BTCUSDTโ”‚ 41000 โ”‚
โ”‚ symbol: BTCUSDT โ”‚ โ”‚ 12:00:00 โ”‚ ETHUSDTโ”‚ 2200 โ”‚
โ”‚ metadata: โ”‚ โ”‚ 13:00:00 โ”‚ BTCUSDTโ”‚ 41100 โ”‚ โ† NEW
โ”‚ open: 41000 โ”‚ โ”‚ 13:00:00 โ”‚ ETHUSDTโ”‚ 2210 โ”‚ โ† NEW
โ”‚ high: 41200 โ”‚ โ”‚ ... โ”‚ ... โ”‚ ... โ”‚
โ”‚ low: 40900 โ”‚ โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜
โ”‚ close: 41100 โ”‚ โ”‚
โ”‚ volume: 1000 โ”‚ โ”‚
โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜ โ”‚
โ–ผ
โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”
โ”‚ Feature Execution โ”‚
โ”‚ โ”‚
โ”‚ momentum(window=20) โ”‚
โ”‚ rsi(window=14) โ”‚
โ”‚ โ”‚
โ”‚ Input: ALL rows (720+) โ”‚
โ”‚ Output: Feature columns โ”‚
โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜
โ”‚
โ–ผ
โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”
โ”‚ Model Execution โ”‚
โ”‚ โ”‚
โ”‚ bilstm_predictor โ”‚
โ”‚ window_size: 30 โ”‚
โ”‚ โ”‚
โ”‚ Input: Frame with features โ”‚
โ”‚ Output: model.prediction โ”‚
โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜
โ”‚
โ–ผ
โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”
โ”‚ PredictionEmitter โ”‚
โ”‚ โ”‚
โ”‚ Extract per symbol: โ”‚
โ”‚ BTCUSDT: latest row โ†’ 0.45 โ”‚
โ”‚ ETHUSDT: latest row โ†’ -0.23 โ”‚
โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜
โ”‚
โ–ผ
โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”
โ”‚ SignalEmitter โ”‚
โ”‚ โ”‚
โ”‚ emit_prediction( โ”‚
โ”‚ symbol="BTCUSDT", โ”‚
โ”‚ ts=2025-12-26T13:00:00, โ”‚
โ”‚ prediction=0.45, โ”‚
โ”‚ metadata={...} โ”‚
โ”‚ ) โ”‚
โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜
โ”‚
โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ผโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”
โ–ผ โ–ผ โ–ผ
โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ” โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ” โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”
โ”‚ CSVSink โ”‚ โ”‚ DynamoDB โ”‚ โ”‚ ClickHouse โ”‚
โ”‚ โ”‚ โ”‚ Sink โ”‚ โ”‚ Sink โ”‚
โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜ โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜ โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜

Features like moving averages need historical data:

  • momentum(window=20) needs 20 previous bars
  • rsi(window=14) needs 14 previous bars
  • ma(window=60) needs 60 previous bars
  • BiLSTM window_size=30 needs 30 previous rows

Without warmup: First predictions would be NaN or incorrect.

Required warmup bars = max(feature_windows) ร— buffer_multiplier
Example (bilstm_realtime_signal.yaml):
- MA(60) = 60 bars
- window_size = 30 bars
- At 2h interval: 90 days ร— 12 bars/day = 1080 bars
- Buffer: 1080 ร— 1.1 = 1188 bars fetched

The warmup phase uses a WebSocket-first gapless approach to guarantee zero gaps between historical data and live stream:

โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”
โ”‚ PHASE 2: WARMUP ORCHESTRATION โ”‚
โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜
RealTimeRunner.run()
โ”‚
โ–ผ
โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”
โ”‚ Step 1: _fetch_warmup_data_gapless() โ”‚
โ”‚ โ””โ”€โ”€ Delegates to WarmupOrchestrator.fetch_warmup_data() โ”‚
โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜
โ”‚
โ–ผ
โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”
โ”‚ WarmupOrchestrator โ”‚
โ”‚ โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ• โ”‚
โ”‚ โ”‚
โ”‚ โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ” โ”‚
โ”‚ โ”‚ Step 1.1: Start WebSocket and buffer events โ”‚ โ”‚
โ”‚ โ”‚ โ”‚ โ”‚
โ”‚ โ”‚ โ€ข Connect to Binance WebSocket โ”‚ โ”‚
โ”‚ โ”‚ โ€ข Start async task: buffer_events(source) โ”‚ โ”‚
โ”‚ โ”‚ โ€ข Buffer all incoming MarketEvents to ws_buffer[] โ”‚ โ”‚
โ”‚ โ”‚ โ€ข Set ws_started flag when first event arrives โ”‚ โ”‚
โ”‚ โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜ โ”‚
โ”‚ โ”‚ โ”‚
โ”‚ โ–ผ โ”‚
โ”‚ โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ” โ”‚
โ”‚ โ”‚ Step 1.2: Wait for first closed bar event โ”‚ โ”‚
โ”‚ โ”‚ โ”‚ โ”‚
โ”‚ โ”‚ โ€ข await ws_started.wait() with 120s timeout โ”‚ โ”‚
โ”‚ โ”‚ โ€ข If timeout: raise RuntimeError โ”‚ โ”‚
โ”‚ โ”‚ โ€ข On success: ws_first_event_ts = ws_buffer[0].ts โ”‚ โ”‚
โ”‚ โ”‚ โ€ข This timestamp becomes the "anchor" for historical fetch โ”‚ โ”‚
โ”‚ โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜ โ”‚
โ”‚ โ”‚ โ”‚
โ”‚ โ–ผ โ”‚
โ”‚ โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ” โ”‚
โ”‚ โ”‚ Step 1.3: Fetch historical data via WarmupDataService โ”‚ โ”‚
โ”‚ โ”‚ โ”‚ โ”‚
โ”‚ โ”‚ warmup_service.fetch_warmup_data( โ”‚ โ”‚
โ”‚ โ”‚ dag=dag, โ”‚ โ”‚
โ”‚ โ”‚ ws_first_event_ts=ws_first_event_ts โ† end_ts anchor โ”‚ โ”‚
โ”‚ โ”‚ ) โ”‚ โ”‚
โ”‚ โ”‚ โ”‚ โ”‚
โ”‚ โ”‚ Data flow: โ”‚ โ”‚
โ”‚ โ”‚ โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ” โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ” โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ” โ”‚ โ”‚
โ”‚ โ”‚ โ”‚ S3 Bulk โ”‚ โ”€โ–ถ โ”‚ HTTP Gap โ”‚ โ”€โ–ถ โ”‚ Merged โ”‚ โ”‚ โ”‚
โ”‚ โ”‚ โ”‚ (older data)โ”‚ โ”‚ Fill โ”‚ โ”‚ DataFrame โ”‚ โ”‚ โ”‚
โ”‚ โ”‚ โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜ โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜ โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜ โ”‚ โ”‚
โ”‚ โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜ โ”‚
โ”‚ โ”‚ โ”‚
โ”‚ โ–ผ โ”‚
โ”‚ โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ” โ”‚
โ”‚ โ”‚ Step 1.4: Merge historical + WebSocket buffer โ”‚ โ”‚
โ”‚ โ”‚ โ”‚ โ”‚
โ”‚ โ”‚ โ€ข Convert ws_buffer events to DataFrame rows โ”‚ โ”‚
โ”‚ โ”‚ โ€ข pd.concat([historical_df, ws_df]) โ”‚ โ”‚
โ”‚ โ”‚ โ€ข Deduplicate: drop_duplicates(["symbol", "ts"], keep="last")โ”‚ โ”‚
โ”‚ โ”‚ โ€ข WebSocket data wins on conflict (fresher) โ”‚ โ”‚
โ”‚ โ”‚ โ€ข Sort by (symbol, ts) for proper ordering โ”‚ โ”‚
โ”‚ โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜ โ”‚
โ”‚ โ”‚ โ”‚
โ”‚ โ–ผ โ”‚
โ”‚ โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ” โ”‚
โ”‚ โ”‚ Step 1.5: Trim to effective_bars per symbol โ”‚ โ”‚
โ”‚ โ”‚ โ”‚ โ”‚
โ”‚ โ”‚ effective_bars = max_warmup_bars ร— resample_ratio โ”‚ โ”‚
โ”‚ โ”‚ โ”‚ โ”‚
โ”‚ โ”‚ For each symbol: โ”‚ โ”‚
โ”‚ โ”‚ symbol_df = warmup_df[warmup_df["symbol"] == symbol] โ”‚ โ”‚
โ”‚ โ”‚ symbol_df = symbol_df.tail(effective_bars) โ”‚ โ”‚
โ”‚ โ”‚ โ”‚ โ”‚
โ”‚ โ”‚ Return: WarmupResult(frames={frame_name: trimmed_df}) โ”‚ โ”‚
โ”‚ โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜ โ”‚
โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜
โ”‚
โ–ผ
โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”
โ”‚ Step 2: _process_warmup_through_transforms() โ”‚
โ”‚ โ””โ”€โ”€ Delegates to WarmupProcessor.process_transforms() โ”‚
โ”‚ โ”‚
โ”‚ Purpose: Run transform steps (e.g., resample 15m โ†’ 2h) โ”‚
โ”‚ on warmup data to populate target frames โ”‚
โ”‚ โ”‚
โ”‚ Example: โ”‚
โ”‚ โ€ข Source frame: ohlcv_15m (720 bars ร— 4 symbols = 2880 rows) โ”‚
โ”‚ โ€ข Transform: resample(target_freq="2h") โ”‚
โ”‚ โ€ข Target frame: ohlcv_2h (90 bars ร— 4 symbols = 360 rows) โ”‚
โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜
โ”‚
โ–ผ
โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”
โ”‚ Step 3: _process_warmup_through_features() โ”‚
โ”‚ โ””โ”€โ”€ Delegates to WarmupProcessor.process_features() โ”‚
โ”‚ โ”‚
โ”‚ Purpose: Compute feature columns on all warmup rows โ”‚
โ”‚ so ML models receive complete feature vectors โ”‚
โ”‚ โ”‚
โ”‚ Example: โ”‚
โ”‚ โ€ข Input: ohlcv_2h with [ts, symbol, open, high, low, close, vol] โ”‚
โ”‚ โ€ข Features: momentum(20), rsi(14), ma_rocp(60) โ”‚
โ”‚ โ€ข Output: ohlcv_2h with [..., mom_20, rsi_14, ma60rocp] โ”‚
โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜
โ”‚
โ–ผ
โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”
โ”‚ Step 4: _seed_frames_to_frame_logs() โ”‚
โ”‚ โ””โ”€โ”€ Delegates to FramePopulator.seed_for_realtime() โ”‚
โ”‚ โ”‚
โ”‚ Purpose: Convert DataFrames to FrameLogs (ring buffers) โ”‚
โ”‚ for bounded memory in real-time mode โ”‚
โ”‚ โ”‚
โ”‚ FrameLog: โ”‚
โ”‚ โ€ข max_size = max_warmup_bars ร— FRAME_LOG_SIZE_MULTIPLIER โ”‚
โ”‚ โ€ข Automatically trims oldest rows when limit exceeded โ”‚
โ”‚ โ€ข Single source of truth during streaming โ”‚
โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜
โ”‚
โ–ผ
โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”
โ”‚ Step 5: _seed_event_tracker_from_warmup() โ”‚
โ”‚ โ”‚
โ”‚ Purpose: Mark all warmup (symbol, ts) pairs as "already processed" โ”‚
โ”‚ to prevent duplicate predictions on WebSocket reconnect โ”‚
โ”‚ โ”‚
โ”‚ After warmup, WebSocket reconnects and may re-deliver recent bars. โ”‚
โ”‚ ProcessedEventTracker filters these as duplicates. โ”‚
โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜
โ”‚
โ–ผ
โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”
โ”‚ Step 6: Connect event sources and create streaming tasks โ”‚
โ”‚ โ””โ”€โ”€ EventLoopManager.connect_sources() + create_tasks() โ”‚
โ”‚ โ”‚
โ”‚ โ€ข WebSocket sources: Reconnect (were closed after warmup buffer) โ”‚
โ”‚ โ€ข Timer sources: Connect โ”‚
โ”‚ โ€ข Create asyncio tasks for _process_event_source() โ”‚
โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜
โ”‚
โ–ผ
PHASE 3: STREAMING

After warmup completes, the streaming phase begins. RealTimeRunner orchestrates an infinite event loop:

โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”
โ”‚ PHASE 3: STREAMING EVENT LOOP โ”‚
โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜
asyncio.gather(*tasks)
โ”‚
โ”‚ Each task runs _process_event_source(event_id, source)
โ”‚
โ–ผ
โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”
โ”‚ _process_event_source(event_id, source) โ”‚
โ”‚ โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ• โ”‚
โ”‚ โ”‚
โ”‚ async for event in source.stream(): โ—€โ”€โ”€โ”€ Infinite async iterator โ”‚
โ”‚ โ”‚ โ”‚
โ”‚ โ”œโ”€โ”€ 1. Log event receipt โ”‚
โ”‚ โ”‚ "Received event: market.bar_close @ 13:00 (BTCUSDT)" โ”‚
โ”‚ โ”‚ โ”‚
โ”‚ โ”œโ”€โ”€ 2. Memory observability โ”‚
โ”‚ โ”‚ _memory_observer.observe(event_count) โ”‚
โ”‚ โ”‚ Logs RSS every 100 events โ”‚
โ”‚ โ”‚ โ”‚
โ”‚ โ”œโ”€โ”€ 3. Populate context frame (if closed candle) โ”‚
โ”‚ โ”‚ โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ” โ”‚
โ”‚ โ”‚ โ”‚ async with _context_lock: โ”‚ โ”‚
โ”‚ โ”‚ โ”‚ โ€ข Extract OHLCV from event.metadata โ”‚ โ”‚
โ”‚ โ”‚ โ”‚ โ€ข Append row to context["frames"] โ”‚ โ”‚
โ”‚ โ”‚ โ”‚ โ€ข Deduplicate by (symbol, ts) โ”‚ โ”‚
โ”‚ โ”‚ โ”‚ โ€ข Trim to max_frame_rows โ”‚ โ”‚
โ”‚ โ”‚ โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜ โ”‚
โ”‚ โ”‚ โ”‚
โ”‚ โ”œโ”€โ”€ 4. Route event to subscribed flows โ”‚
โ”‚ โ”‚ flows = _router.route(event) โ”‚
โ”‚ โ”‚ Returns: [FlowDefinition, ...] โ”‚
โ”‚ โ”‚ โ”‚
โ”‚ โ”œโ”€โ”€ 5. Check if closed candle (skip partial updates) โ”‚
โ”‚ โ”‚ if not is_closed: skip flow execution โ”‚
โ”‚ โ”‚ โ”‚
โ”‚ โ””โ”€โ”€ 6. Execute flows (inside lock) โ”‚
โ”‚ โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ” โ”‚
โ”‚ โ”‚ async with _context_lock: โ”‚ โ”‚
โ”‚ โ”‚ โ€ข Deduplication check (skip if dup) โ”‚ โ”‚
โ”‚ โ”‚ โ€ข Mark event as processed โ”‚ โ”‚
โ”‚ โ”‚ โ€ข For each flow: โ”‚ โ”‚
โ”‚ โ”‚ โ””โ”€โ”€ _executor.execute(flow, ...) โ”‚ โ”‚
โ”‚ โ”‚ โ€ข Handle emitted events (chain) โ”‚ โ”‚
โ”‚ โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜ โ”‚
โ”‚ โ”‚ โ”‚
โ”‚ โ–ผ โ”‚
โ”‚ โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ” โ”‚
โ”‚ โ”‚ 7. Emit predictions (outside lock) โ”‚ โ”‚
โ”‚ โ”‚ _emit_predictions(event) โ”‚ โ”‚
โ”‚ โ”‚ โ””โ”€โ”€ PredictionEmitter.emit_if_readyโ”‚ โ”‚
โ”‚ โ”‚ โ””โ”€โ”€ SignalEmitter.emit_prediction โ”‚
โ”‚ โ”‚ โ””โ”€โ”€ Sink.emit() ร— N sinks โ”‚ โ”‚
โ”‚ โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜ โ”‚
โ”‚ โ”‚
โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜

When a closed candle event arrives, the OHLCV data is appended to the context frame:

# Event metadata โ†’ DataFrame row
event_row = {
"ts": event.ts, # e.g., 2025-12-26 13:00:00 UTC
"symbol": event.symbol, # e.g., "BTCUSDT"
"open": event.metadata["open"], # e.g., 41000.0
"high": event.metadata["high"], # e.g., 41200.0
"low": event.metadata["low"], # e.g., 40900.0
"close": event.metadata["close"], # e.g., 41100.0
"volume": event.metadata["volume"] # e.g., 1000.5
}
# Thread-safe append with lock
async with _context_lock:
context["frames"][frame_name] = pd.concat([
context["frames"][frame_name],
pd.DataFrame([event_row])
])
# Deduplicate to prevent duplicate predictions
context["frames"][frame_name] = (
context["frames"][frame_name]
.drop_duplicates(subset=["symbol", "ts"], keep="last")
)

The EventRouter matches events to subscribed flows based on the on field:

Event: market.bar_close @ 13:00 (BTCUSDT)
event_def_id = "data.binance_1h"
DAG Flows:
flow_bar_1h_ohlcv:
on: ["data.binance_1h"] โ† MATCHES
steps: [features.momentum, features.rsi, model.bilstm]
flow_bar_15m_ohlcv:
on: ["data.binance_15m"] โ† Does not match
Result: route() returns [flow_bar_1h_ohlcv]

The FlowExecutor runs each step in dependency order:

โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”
โ”‚ FLOW EXECUTION โ”‚
โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜
FlowExecutor.execute(flow, event, context)
โ”‚
โ–ผ
โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”
โ”‚ For each step in flow.steps: โ”‚
โ”‚ โ”‚
โ”‚ Step 1: features.momentum โ”‚
โ”‚ โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€ โ”‚
โ”‚ โ€ข plugin_fn = plugin_loader.load_plugin("features.momentum") โ”‚
โ”‚ โ€ข inputs = InputResolver.resolve(step.inputs, ...) โ”‚
โ”‚ โ””โ”€โ”€ Reads: context["frames"]["ohlcv"]["close"] โ”‚
โ”‚ โ””โ”€โ”€ Returns: pd.Series with 721 close prices โ”‚
โ”‚ โ€ข result = plugin_fn(series=inputs["column"], window=20, ...) โ”‚
โ”‚ โ””โ”€โ”€ Computes momentum on ALL 721 rows โ”‚
โ”‚ โ€ข ResultHandler.handle_result(result, ...) โ”‚
โ”‚ โ””โ”€โ”€ Adds "mom_20" column to ohlcv frame โ”‚
โ”‚ โ”‚
โ”‚ Step 2: features.rsi โ”‚
โ”‚ โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€ โ”‚
โ”‚ โ€ข Same pattern, adds "rsi_14" column โ”‚
โ”‚ โ”‚
โ”‚ Step 3: models.pytorch_bilstm_predictor โ”‚
โ”‚ โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€ โ”‚
โ”‚ โ€ข inputs = InputResolver.resolve(...) โ”‚
โ”‚ โ””โ”€โ”€ Reads: context["frames"]["ohlcv"] (with mom_20, rsi_14) โ”‚
โ”‚ โ€ข result = plugin_fn(panel_df=inputs["frame"], window_size=30, ...) โ”‚
โ”‚ โ””โ”€โ”€ Model sees 30-bar window, outputs prediction โ”‚
โ”‚ โ€ข ResultHandler.handle_result(result, ...) โ”‚
โ”‚ โ””โ”€โ”€ Adds "model.prediction" column to ohlcv frame โ”‚
โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜
โ”‚
โ–ผ
ExecutionResult(emitted_events=[...], context=updated_context)

The ProcessedEventTracker prevents duplicate predictions:

ProcessedEventTracker
โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•
Internal state: Set of (event_def_id, symbol, ts) tuples
is_duplicate(event_def_id, symbol, ts):
key = (event_def_id, symbol, str(ts))
return key in self._processed_events
mark_processed(event_def_id, symbol, ts):
key = (event_def_id, symbol, str(ts))
self._processed_events.add(key)
Why needed:
โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”
โ”‚ 1. WebSocket reconnection may re-deliver recent bars โ”‚
โ”‚ 2. Multiple symbols can trigger same flow โ”‚
โ”‚ 3. Event chaining could cause loops โ”‚
โ”‚ โ”‚
โ”‚ Without deduplication: โ”‚
โ”‚ 13:00 BTCUSDT bar โ†’ prediction = 0.45 โ†’ emit to CSV โ”‚
โ”‚ 13:00 BTCUSDT bar (re-delivered) โ†’ prediction = 0.45 โ†’ DUPLICATE! โ”‚
โ”‚ โ”‚
โ”‚ With deduplication: โ”‚
โ”‚ 13:00 BTCUSDT bar โ†’ prediction = 0.45 โ†’ emit to CSV โ”‚
โ”‚ 13:00 BTCUSDT bar (re-delivered) โ†’ is_duplicate=True โ†’ SKIP โ”‚
โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜

After flow execution, predictions are emitted via PredictionEmitter:

โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”
โ”‚ PREDICTION EMISSION FLOW โ”‚
โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜
_emit_predictions(event)
โ”‚
โ–ผ
PredictionEmitter.emit_if_ready(event, context, dag_name)
โ”‚
โ”‚ 1. Extract prediction column from context
โ”‚ prediction_column = "model.prediction"
โ”‚ df = context["frames"]["ohlcv"]
โ”‚
โ”‚ 2. Get latest row for triggering symbol
โ”‚ symbol = event.symbol # e.g., "BTCUSDT"
โ”‚ latest = df[df["symbol"] == symbol].iloc[-1]
โ”‚ prediction = latest["model.prediction"] # e.g., 0.45
โ”‚
โ”‚ 3. Build metadata
โ”‚ metadata = {
โ”‚ "frame": "ohlcv",
โ”‚ "column": "model.prediction",
โ”‚ "dag_name": "minimal_signal_generation"
โ”‚ }
โ”‚
โ–ผ
SignalEmitter.emit_prediction(symbol, ts, prediction, metadata)
โ”‚
โ”‚ For each sink in self._sinks:
โ”‚ sink.emit(symbol, ts, prediction, metadata)
โ”‚
โ”œโ”€โ”€ CSVAppendSink.emit() โ†’ Append to outputs/predictions.csv
โ”œโ”€โ”€ DynamoDBSink.emit() โ†’ Put item to DynamoDB
โ””โ”€โ”€ ClickHouseSink.emit() โ†’ Insert to ClickHouse table

Plugins can emit events that trigger downstream flows:

โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”
โ”‚ EVENT CHAINING โ”‚
โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜
Flow execution may emit events (e.g., "signal.generated")
โ”‚
โ–ผ
โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”
โ”‚ Event chaining loop (max depth = 100): โ”‚
โ”‚ โ”‚
โ”‚ while chain_depth < MAX_EVENT_CHAIN_DEPTH: โ”‚
โ”‚ โ”‚ โ”‚
โ”‚ โ”œโ”€โ”€ Drain EventBus for emitted events โ”‚
โ”‚ โ”‚ emitted_events = _event_bus.drain() โ”‚
โ”‚ โ”‚ โ”‚
โ”‚ โ”œโ”€โ”€ If no events, break โ”‚
โ”‚ โ”‚ โ”‚
โ”‚ โ””โ”€โ”€ For each emitted event: โ”‚
โ”‚ โ”œโ”€โ”€ Check deduplication โ”‚
โ”‚ โ”œโ”€โ”€ Route to subscribed flows โ”‚
โ”‚ โ”œโ”€โ”€ Execute flows โ”‚
โ”‚ โ””โ”€โ”€ Publish newly emitted events โ”‚
โ”‚ โ”‚
โ”‚ Depth guard: Raises FlowExecutionError if depth >= 100 โ”‚
โ”‚ (Prevents infinite loops from circular event dependencies) โ”‚
โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜

The streaming phase uses asyncio.Lock to protect shared context:


The sink system provides a pluggable architecture for outputting predictions to various destinations. It follows the dependency injection pattern:

โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”
โ”‚ SINK SYSTEM ARCHITECTURE โ”‚
โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜
DSL on_prediction SignalEmitter
โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ• โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•
signal_generation: โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”
on_prediction: โ”‚ SignalEmitter โ”‚
- using: sinks.csv_append โ”€โ”€โ”€โ–ถ โ”‚ โ”‚
params: โ”‚ _sinks: [ โ”‚
path: out.csv โ”‚ CSVAppendSink, โ”‚
โ”‚ ClickHouseSink, โ”‚
- using: sinks.clickhouse โ”€โ”€โ”€โ–ถ โ”‚ ] โ”‚
params: โ”‚ โ”‚
table_name: predictions โ”‚ emit_prediction() โ”‚
โ”‚ โ”œโ”€โ”€ sink[0].emit()โ”‚
โ”‚ โ””โ”€โ”€ sink[1].emit()โ”‚
โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜
โ”‚
โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ผโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”
โ–ผ โ–ผ โ–ผ
โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ” โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ” โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”
โ”‚ CSVAppendSink โ”‚ โ”‚ DynamoDBSink โ”‚ โ”‚ ClickHouseSink โ”‚
โ”‚ โ”‚ โ”‚ โ”‚ โ”‚ โ”‚
โ”‚ emit() โ”‚ โ”‚ emit() โ”‚ โ”‚ emit() โ”‚
โ”‚ close() โ”‚ โ”‚ close() โ”‚ โ”‚ close() โ”‚
โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜ โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜ โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜
  1. Instantiation: create_emitter_from_dsl() parses DSL and uses sink_factory to create sink instances
  2. Dependency Injection: Sink instances are passed to SignalEmitter.__init__()
  3. Emission: emit() called per-symbol when prediction is ready
  4. Cleanup: close() called on shutdown or completion

SignalEmitter attempts ALL sinks even if some fail. This ensures one failing sink doesnโ€™t prevent other sinks from receiving data:

def emit_prediction(self, symbol, ts, prediction, metadata):
errors: list[tuple[str, Exception]] = []
# Emit to all sinks with isolation
for i, sink in enumerate(self._sinks):
try:
sink.emit(symbol, ts, prediction, metadata)
except Exception as e:
sink_name = self._sink_names[i]
errors.append((sink_name, e))
# Raise aggregated error if any sink failed
if errors:
raise SinkEmitError(...)


Scenario: Run minimal_signal_generation.yaml at 2025-12-26 12:30:00

12:30:00 CLI starts
โ”œโ”€โ”€ Parse DSL
โ”œโ”€โ”€ Create CSVAppendSink
โ”œโ”€โ”€ Compile to EventFlowDAG
โ””โ”€โ”€ Initialize RealTimeRunner
12:30:01 Phase 2: Warmup
โ”œโ”€โ”€ Connect Binance WebSocket (1h klines for BTCUSDT, ETHUSDT)
โ”œโ”€โ”€ First WS event: 12:00:00 bar (buffered)
โ”œโ”€โ”€ Fetch S3 history: 30 days ending at 12:00:00
โ”œโ”€โ”€ HTTP gap-fill if needed
โ”œโ”€โ”€ Process 720 bars through momentum, RSI features
โ””โ”€โ”€ Seed frames with warmed data
12:30:15 Phase 3: Streaming begins
โ””โ”€โ”€ Log: "Starting real-time signal generation (Ctrl+C to stop)"
13:00:00 Event: BTCUSDT 13:00 bar closes
โ”œโ”€โ”€ Append row to ohlcv frame (721 rows now)
โ”œโ”€โ”€ Execute momentum(window=20) on 721 rows
โ”œโ”€โ”€ Execute rsi(window=14) on 721 rows
โ”œโ”€โ”€ Execute model(window_size=30) โ†’ prediction=0.45
โ””โ”€โ”€ Emit: symbol=BTCUSDT, ts=13:00, pred=0.45 โ†’ CSV
13:00:01 Event: ETHUSDT 13:00 bar closes
โ”œโ”€โ”€ Append row to ohlcv frame (722 rows now)
โ”œโ”€โ”€ Execute features on 722 rows
โ”œโ”€โ”€ Execute model โ†’ prediction=-0.23
โ””โ”€โ”€ Emit: symbol=ETHUSDT, ts=13:00, pred=-0.23 โ†’ CSV
14:00:00 Event: BTCUSDT 14:00 bar closes
โ””โ”€โ”€ ... same flow ...
[Ctrl+C] Shutdown
โ”œโ”€โ”€ Close WebSocket
โ”œโ”€โ”€ Close sinks
โ””โ”€โ”€ Log: "Signal generation completed"