PoC: Real time proposal
Source: Notion | Last edited: 2025-12-29 | ID: 2d52d2dc-3ef...
Signal Generation: Core Concepts
Section titled โSignal Generation: Core ConceptsโPurpose: Comprehensive guide for understanding Alpha Forge signal generation flow and creating custom sink plugins.
Reference Examples:
examples/08_signal_generation/minimal_signal_generation.yamlexamples/08_signal_generation/bilstm_realtime_signal.yaml
1. What is Signal Generation?
Section titled โ1. What is Signal Generation?โSignal generation is a long-running streaming service that:
- Receives real-time market data from Binance WebSocket (or ClickHouse polling)
- Computes features on each new bar (candle close)
- Runs ML model predictions
- Emits predictions to configurable sinks (CSV, DynamoDB, ClickHouse)
CLI Command:
uv run alpha_forge signal examples/08_signal_generation/minimal_signal_generation.yaml2. Complete Signal Flow
Section titled โ2. Complete Signal Flowโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ SIGNAL GENERATION FLOW โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
DSL YAML โ โผโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ PHASE 1: STARTUP (CLI signal.py) โโ โโ 1. Parse DSL โโ 2. Extract signal_generation config โโ 3. Create SignalEmitter from on_prediction sinks โโ 4. Extract prediction column from model step โโ 5. Compile DSL โ EventFlowDAG โโ 6. Initialize RealTimeRunner โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ โ โผโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ PHASE 2: WARMUP โโ โโ 1. Start Binance WebSocket FIRST (buffer incoming events) โโ 2. Fetch historical data ending at first WebSocket timestamp โโ โโโ BinanceGaplessProvider: S3 bulk + HTTP gap-fill โโ 3. Process warmup through transform steps (resample, align) โโ 4. Process warmup through feature steps (momentum, RSI, etc.) โโ 5. Seed frames into context (pre-warmed state) โโ 6. Seed event tracker (prevent duplicate processing) โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ โ โผโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ PHASE 3: STREAMING (Infinite Loop) โโ โโ โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ โโ โ FOR EACH WebSocket Event (bar close): โ โโ โ โ โโ โ 1. Receive MarketEvent from Binance WebSocket โ โโ โ โโโ type: "market.bar_close", symbol: "BTCUSDT", ts: ... โ โโ โ โ โโ โ 2. Populate context frame (thread-safe with lock) โ โโ โ โโโ Append OHLCV row to context["frames"]["ohlcv"] โ โโ โ โโโ Deduplicate by (symbol, ts) โ โโ โ โ โโ โ 3. Route event to subscribed flows โ โโ โ โโโ EventRouter finds flow_bar_1h_ohlcv โ โโ โ โ โโ โ 4. Execute flow (inside lock) โ โโ โ โโโ FlowExecutor runs: features โ model โ โโ โ โโโ Each step sees FULL frame (warmup + new rows) โ โโ โ โ โโ โ 5. Emit predictions (OUTSIDE lock - non-blocking I/O) โ โโ โ โโโ PredictionEmitter extracts per-symbol prediction โ โโ โ โโโ SignalEmitter sends to ALL configured sinks โ โโ โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ โโ โโ Runs until Ctrl+C or error โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ3. DSL Structure: signal_generation Section
Section titled โ3. DSL Structure: signal_generation SectionโThe signal_generation block configures the streaming behavior:
signal_generation:mode: stream # Required: Execution mode
warmup: # Required: Historical data configlookback_days:30 # Fetch N days for feature warmup
scheduling: # Optional: Timing configinterval: 1h # Prediction frequencybuffer_minutes:2 # Wait after bar close
on_prediction: # Required: Sink configuration-using: sinks.csv_appendparams:path: outputs/predictions.csvdedup_key:[ts, symbol]3.1 Parameter Reference
Section titled โ3.1 Parameter Referenceโ4. DSL Analysis: minimal_signal_generation.yaml
Section titled โ4. DSL Analysis: minimal_signal_generation.yamlโname: minimal_signal_generation
# ============ Capability Dependencies ============using:capabilities:- shared # Data loaders, transforms, sinks- middlefreq # Features, models, strategies
# ============ Signal Generation Configuration ============signal_generation:mode: stream # Real-time streaming mode
warmup:lookback_days:30 # 30 days history for feature warmup # Calculation: 30 days ร 24 hours = 720 bars (for 1h)
scheduling:interval: 1h # Predict every hour (on bar close)buffer_minutes:2 # Wait 2 min for data to arrive
on_prediction:-using: sinks.csv_append # Output to CSV fileparams:path: outputs/predictions/minimal_predictions.csvdedup_key:[ts, symbol] # Prevent duplicates on reconnect
# ============ Execution Pipeline ============pipeline: # Step 1: Data source (Binance WebSocket in streaming mode)-data:using: data.binance_futuresoutputs:frame: ohlcv # Frame name for downstream stepsparams:universe: # Symbols to track- BTCUSDT- ETHUSDTinterval: 1h # Bar interval
# Step 2: Feature calculation-features:using: features.momentuminputs:column: ohlcv.close # Input: close price from ohlcv frameoutputs:columns:- mom_20 # Output column nameparams:window:20 # 20-bar lookback
-features:using: features.rsiinputs:column: ohlcv.closeoutputs:columns:- rsi_14params:window:14
# Step 3: Model prediction-model:using: models.pytorch_bilstm_predictorinputs:frame: ohlcv # Input frame with featuresmodel_ref: my_model.unifiedoutputs:columns:- model.prediction # โ This is the prediction columnparams:strategy: unifiedwindow_size:30 # Model looks at 30 barsoutput_col: model.prediction # โ Extracted by CLI for emissioncheckpoint_dir: tmp/models/my_checkpointHow Parameters Affect Execution
Section titled โHow Parameters Affect Executionโ5. DSL Analysis: bilstm_realtime_signal.yaml
Section titled โ5. DSL Analysis: bilstm_realtime_signal.yamlโThis is a production-grade example with complex feature engineering:
name: bilstm_realtime_signal_generation
# ============ Parameter Expansion ============expand:rsi_windows:[6,12,24] # Creates 3 RSI featuresma_windows:[5,10,20,60] # Creates 4 MA featureslevel_windows:[10,20,60] # Creates 3 support/resistance features
# ============ Signal Generation Configuration ============signal_generation:mode: stream
warmup:lookback_days:90 # 90 days for MA(60) + window_size(30) stabilization # Calculation: Need 60 bars for MA(60), plus 30 for model # At 2h bars: 90 days ร 12 bars/day = 1080 bars
scheduling:interval: 2h # Predict on 2h barsbuffer_minutes:5 # Wait 5 min for data
on_prediction:-using: sinks.csv_appendparams:path: outputs/predictions/bilstm_improved_realtime.csvdedup_key:[ts, symbol]
# ============ Execution Pipeline ============pipeline: # Step 1: Stream 15m data (higher resolution)-data:using: data.binance_futuresoutputs:frame: ohlcv_15m # 15-minute barsparams:universe:- BTCUSDT- ETHUSDT- SOLUSDT- XRPUSDTinterval: 15m
# Step 2: Resample 15m โ 2h bars-transform:using: transforms.resampleinputs:frame: ohlcv_15moutputs:frame: ohlcv_2h # 2-hour bars for featuresparams:target_freq: 2h
# Step 3: Features with parameter expansion-features:using: features.rsiinputs:column: ohlcv_2h.closeoutputs:columns:- rsi{window} # Expands to: rsi6, rsi12, rsi24params:window: ${expand.rsi_windows} # [6, 12, 24]
-features:using: features.ma_rocpinputs:column: ohlcv_2h.closeoutputs:columns:- ma{window}rocp # Expands to: ma5rocp, ma10rocp, ma20rocp, ma60rocpparams:window: ${expand.ma_windows} # [5, 10, 20, 60]rocp_period:1rocp_factor:10
# ... many more features ...
# Final: Model prediction-model:using: models.pytorch_bilstm_predictorinputs:frame: ohlcv_2hmodel_ref: bilstm_elnigma_unified_improved.unifiedoutputs:columns:- model.predictionparams:strategy: unifiedwindow_size:30output_col: model.predictioncheckpoint_dir: tmp/models/el_nigma_equiv_unified_improvedKey Differences from Minimal Example
Section titled โKey Differences from Minimal ExampleโParameter Expansion Flow
Section titled โParameter Expansion Flowโ# DSL Definitionexpand:rsi_windows:[6,12,24]
# Template-features:using: features.rsioutputs:columns:- rsi{window}params:window: ${expand.rsi_windows}
# Expands to 3 separate feature steps:-features:{using: features.rsi,params:{window:6},outputs:{columns:[rsi6]}}-features:{using: features.rsi,params:{window:12},outputs:{columns:[rsi12]}}-features:{using: features.rsi,params:{window:24},outputs:{columns:[rsi24]}}6. Data Flow Diagram
Section titled โ6. Data Flow Diagramโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ DATA FLOW โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
Binance WebSocket Context Frames โโโโโโโโโโโโโโโ โโโโโโโโโโโโโโ
wss://stream.binance.com context["frames"]["ohlcv"] โ โ โ 1h bar close event โ โผ โผ โโโโโโโโโโโโโโโโโโโ โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ โ MarketEvent โ โ DataFrame (Panel format) โ โ โ โ โ โ type: bar_close โ โโโโโโโโโโโโโโโโโโถ โ ts โ symbol โ close โ โ ts: 2025-12-26 โ Append row โ 12:00:00 โ BTCUSDTโ 41000 โ โ symbol: BTCUSDT โ โ 12:00:00 โ ETHUSDTโ 2200 โ โ metadata: โ โ 13:00:00 โ BTCUSDTโ 41100 โ โ NEW โ open: 41000 โ โ 13:00:00 โ ETHUSDTโ 2210 โ โ NEW โ high: 41200 โ โ ... โ ... โ ... โ โ low: 40900 โ โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ โ close: 41100 โ โ โ volume: 1000 โ โ โโโโโโโโโโโโโโโโโโโ โ โผ โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ โ Feature Execution โ โ โ โ momentum(window=20) โ โ rsi(window=14) โ โ โ โ Input: ALL rows (720+) โ โ Output: Feature columns โ โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ โ โผ โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ โ Model Execution โ โ โ โ bilstm_predictor โ โ window_size: 30 โ โ โ โ Input: Frame with features โ โ Output: model.prediction โ โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ โ โผ โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ โ PredictionEmitter โ โ โ โ Extract per symbol: โ โ BTCUSDT: latest row โ 0.45 โ โ ETHUSDT: latest row โ -0.23 โ โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ โ โผ โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ โ SignalEmitter โ โ โ โ emit_prediction( โ โ symbol="BTCUSDT", โ โ ts=2025-12-26T13:00:00, โ โ prediction=0.45, โ โ metadata={...} โ โ ) โ โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ โ โโโโโโโโโโโโโโโโผโโโโโโโโโโโโโโโ โผ โผ โผ โโโโโโโโโโโโโ โโโโโโโโโโโโโ โโโโโโโโโโโโโโโโ โ CSVSink โ โ DynamoDB โ โ ClickHouse โ โ โ โ Sink โ โ Sink โ โโโโโโโโโโโโโ โโโโโโโโโโโโโ โโโโโโโโโโโโโโโโ7. Phase 2: Warmup Deep Dive
Section titled โ7. Phase 2: Warmup Deep Diveโ7.1 Why Warmup is Needed
Section titled โ7.1 Why Warmup is NeededโFeatures like moving averages need historical data:
momentum(window=20)needs 20 previous barsrsi(window=14)needs 14 previous barsma(window=60)needs 60 previous bars- BiLSTM
window_size=30needs 30 previous rows
Without warmup: First predictions would be NaN or incorrect.
7.2 Warmup Calculation
Section titled โ7.2 Warmup CalculationโRequired warmup bars = max(feature_windows) ร buffer_multiplier
Example (bilstm_realtime_signal.yaml):- MA(60) = 60 bars- window_size = 30 bars- At 2h interval: 90 days ร 12 bars/day = 1080 bars- Buffer: 1080 ร 1.1 = 1188 bars fetched7.3 Gapless Warmup Architecture
Section titled โ7.3 Gapless Warmup ArchitectureโThe warmup phase uses a WebSocket-first gapless approach to guarantee zero gaps between historical data and live stream:
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ PHASE 2: WARMUP ORCHESTRATION โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
RealTimeRunner.run() โ โผ โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ โ Step 1: _fetch_warmup_data_gapless() โ โ โโโ Delegates to WarmupOrchestrator.fetch_warmup_data() โ โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ โ โผ โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ โ WarmupOrchestrator โ โ โโโโโโโโโโโโโโโโโโ โ โ โ โ โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ โ โ โ Step 1.1: Start WebSocket and buffer events โ โ โ โ โ โ โ โ โข Connect to Binance WebSocket โ โ โ โ โข Start async task: buffer_events(source) โ โ โ โ โข Buffer all incoming MarketEvents to ws_buffer[] โ โ โ โ โข Set ws_started flag when first event arrives โ โ โ โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ โ โ โ โ โ โผ โ โ โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ โ โ โ Step 1.2: Wait for first closed bar event โ โ โ โ โ โ โ โ โข await ws_started.wait() with 120s timeout โ โ โ โ โข If timeout: raise RuntimeError โ โ โ โ โข On success: ws_first_event_ts = ws_buffer[0].ts โ โ โ โ โข This timestamp becomes the "anchor" for historical fetch โ โ โ โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ โ โ โ โ โ โผ โ โ โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ โ โ โ Step 1.3: Fetch historical data via WarmupDataService โ โ โ โ โ โ โ โ warmup_service.fetch_warmup_data( โ โ โ โ dag=dag, โ โ โ โ ws_first_event_ts=ws_first_event_ts โ end_ts anchor โ โ โ โ ) โ โ โ โ โ โ โ โ Data flow: โ โ โ โ โโโโโโโโโโโโโโโ โโโโโโโโโโโโโโโ โโโโโโโโโโโโโโโ โ โ โ โ โ S3 Bulk โ โโถ โ HTTP Gap โ โโถ โ Merged โ โ โ โ โ โ (older data)โ โ Fill โ โ DataFrame โ โ โ โ โ โโโโโโโโโโโโโโโ โโโโโโโโโโโโโโโ โโโโโโโโโโโโโโโ โ โ โ โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ โ โ โ โ โ โผ โ โ โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ โ โ โ Step 1.4: Merge historical + WebSocket buffer โ โ โ โ โ โ โ โ โข Convert ws_buffer events to DataFrame rows โ โ โ โ โข pd.concat([historical_df, ws_df]) โ โ โ โ โข Deduplicate: drop_duplicates(["symbol", "ts"], keep="last")โ โ โ โ โข WebSocket data wins on conflict (fresher) โ โ โ โ โข Sort by (symbol, ts) for proper ordering โ โ โ โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ โ โ โ โ โ โผ โ โ โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ โ โ โ Step 1.5: Trim to effective_bars per symbol โ โ โ โ โ โ โ โ effective_bars = max_warmup_bars ร resample_ratio โ โ โ โ โ โ โ โ For each symbol: โ โ โ โ symbol_df = warmup_df[warmup_df["symbol"] == symbol] โ โ โ โ symbol_df = symbol_df.tail(effective_bars) โ โ โ โ โ โ โ โ Return: WarmupResult(frames={frame_name: trimmed_df}) โ โ โ โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ โ โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ โ โผ โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ โ Step 2: _process_warmup_through_transforms() โ โ โโโ Delegates to WarmupProcessor.process_transforms() โ โ โ โ Purpose: Run transform steps (e.g., resample 15m โ 2h) โ โ on warmup data to populate target frames โ โ โ โ Example: โ โ โข Source frame: ohlcv_15m (720 bars ร 4 symbols = 2880 rows) โ โ โข Transform: resample(target_freq="2h") โ โ โข Target frame: ohlcv_2h (90 bars ร 4 symbols = 360 rows) โ โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ โ โผ โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ โ Step 3: _process_warmup_through_features() โ โ โโโ Delegates to WarmupProcessor.process_features() โ โ โ โ Purpose: Compute feature columns on all warmup rows โ โ so ML models receive complete feature vectors โ โ โ โ Example: โ โ โข Input: ohlcv_2h with [ts, symbol, open, high, low, close, vol] โ โ โข Features: momentum(20), rsi(14), ma_rocp(60) โ โ โข Output: ohlcv_2h with [..., mom_20, rsi_14, ma60rocp] โ โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ โ โผ โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ โ Step 4: _seed_frames_to_frame_logs() โ โ โโโ Delegates to FramePopulator.seed_for_realtime() โ โ โ โ Purpose: Convert DataFrames to FrameLogs (ring buffers) โ โ for bounded memory in real-time mode โ โ โ โ FrameLog: โ โ โข max_size = max_warmup_bars ร FRAME_LOG_SIZE_MULTIPLIER โ โ โข Automatically trims oldest rows when limit exceeded โ โ โข Single source of truth during streaming โ โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ โ โผ โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ โ Step 5: _seed_event_tracker_from_warmup() โ โ โ โ Purpose: Mark all warmup (symbol, ts) pairs as "already processed" โ โ to prevent duplicate predictions on WebSocket reconnect โ โ โ โ After warmup, WebSocket reconnects and may re-deliver recent bars. โ โ ProcessedEventTracker filters these as duplicates. โ โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ โ โผ โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ โ Step 6: Connect event sources and create streaming tasks โ โ โโโ EventLoopManager.connect_sources() + create_tasks() โ โ โ โ โข WebSocket sources: Reconnect (were closed after warmup buffer) โ โ โข Timer sources: Connect โ โ โข Create asyncio tasks for _process_event_source() โ โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ โ โผ PHASE 3: STREAMING7.4 WarmupOrchestrator Services
Section titled โ7.4 WarmupOrchestrator Servicesโ7.5 Key Configuration
Section titled โ7.5 Key Configurationโ8. Phase 3: Streaming Deep Dive
Section titled โ8. Phase 3: Streaming Deep Diveโ8.1 RealTimeRunner Event Loop
Section titled โ8.1 RealTimeRunner Event LoopโAfter warmup completes, the streaming phase begins. RealTimeRunner orchestrates an infinite event loop:
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ PHASE 3: STREAMING EVENT LOOP โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
asyncio.gather(*tasks) โ โ Each task runs _process_event_source(event_id, source) โ โผ โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ โ _process_event_source(event_id, source) โ โ โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ โ โ โ โ async for event in source.stream(): โโโโ Infinite async iterator โ โ โ โ โ โโโ 1. Log event receipt โ โ โ "Received event: market.bar_close @ 13:00 (BTCUSDT)" โ โ โ โ โ โโโ 2. Memory observability โ โ โ _memory_observer.observe(event_count) โ โ โ Logs RSS every 100 events โ โ โ โ โ โโโ 3. Populate context frame (if closed candle) โ โ โ โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ โ โ โ โ async with _context_lock: โ โ โ โ โ โข Extract OHLCV from event.metadata โ โ โ โ โ โข Append row to context["frames"] โ โ โ โ โ โข Deduplicate by (symbol, ts) โ โ โ โ โ โข Trim to max_frame_rows โ โ โ โ โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ โ โ โ โ โ โโโ 4. Route event to subscribed flows โ โ โ flows = _router.route(event) โ โ โ Returns: [FlowDefinition, ...] โ โ โ โ โ โโโ 5. Check if closed candle (skip partial updates) โ โ โ if not is_closed: skip flow execution โ โ โ โ โ โโโ 6. Execute flows (inside lock) โ โ โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ โ โ โ async with _context_lock: โ โ โ โ โข Deduplication check (skip if dup) โ โ โ โ โข Mark event as processed โ โ โ โ โข For each flow: โ โ โ โ โโโ _executor.execute(flow, ...) โ โ โ โ โข Handle emitted events (chain) โ โ โ โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ โ โ โ โ โ โผ โ โ โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ โ โ โ 7. Emit predictions (outside lock) โ โ โ โ _emit_predictions(event) โ โ โ โ โโโ PredictionEmitter.emit_if_readyโ โ โ โ โโโ SignalEmitter.emit_prediction โ โ โ โโโ Sink.emit() ร N sinks โ โ โ โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ โ โ โ โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ8.2 Event Processing Details
Section titled โ8.2 Event Processing DetailsโStep 3: Frame Population
Section titled โStep 3: Frame PopulationโWhen a closed candle event arrives, the OHLCV data is appended to the context frame:
# Event metadata โ DataFrame rowevent_row = { "ts": event.ts, # e.g., 2025-12-26 13:00:00 UTC "symbol": event.symbol, # e.g., "BTCUSDT" "open": event.metadata["open"], # e.g., 41000.0 "high": event.metadata["high"], # e.g., 41200.0 "low": event.metadata["low"], # e.g., 40900.0 "close": event.metadata["close"], # e.g., 41100.0 "volume": event.metadata["volume"] # e.g., 1000.5}
# Thread-safe append with lockasync with _context_lock: context["frames"][frame_name] = pd.concat([ context["frames"][frame_name], pd.DataFrame([event_row]) ]) # Deduplicate to prevent duplicate predictions context["frames"][frame_name] = ( context["frames"][frame_name] .drop_duplicates(subset=["symbol", "ts"], keep="last") )Step 4: Event Routing
Section titled โStep 4: Event RoutingโThe EventRouter matches events to subscribed flows based on the on field:
Event: market.bar_close @ 13:00 (BTCUSDT) event_def_id = "data.binance_1h"
DAG Flows: flow_bar_1h_ohlcv: on: ["data.binance_1h"] โ MATCHES steps: [features.momentum, features.rsi, model.bilstm]
flow_bar_15m_ohlcv: on: ["data.binance_15m"] โ Does not match
Result: route() returns [flow_bar_1h_ohlcv]Step 6: Flow Execution
Section titled โStep 6: Flow ExecutionโThe FlowExecutor runs each step in dependency order:
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ FLOW EXECUTION โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
FlowExecutor.execute(flow, event, context) โ โผ โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ โ For each step in flow.steps: โ โ โ โ Step 1: features.momentum โ โ โโโโโโโโโโโโโโโโโโโโโโโโโ โ โ โข plugin_fn = plugin_loader.load_plugin("features.momentum") โ โ โข inputs = InputResolver.resolve(step.inputs, ...) โ โ โโโ Reads: context["frames"]["ohlcv"]["close"] โ โ โโโ Returns: pd.Series with 721 close prices โ โ โข result = plugin_fn(series=inputs["column"], window=20, ...) โ โ โโโ Computes momentum on ALL 721 rows โ โ โข ResultHandler.handle_result(result, ...) โ โ โโโ Adds "mom_20" column to ohlcv frame โ โ โ โ Step 2: features.rsi โ โ โโโโโโโโโโโโโโโโโโโโ โ โ โข Same pattern, adds "rsi_14" column โ โ โ โ Step 3: models.pytorch_bilstm_predictor โ โ โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ โ โ โข inputs = InputResolver.resolve(...) โ โ โโโ Reads: context["frames"]["ohlcv"] (with mom_20, rsi_14) โ โ โข result = plugin_fn(panel_df=inputs["frame"], window_size=30, ...) โ โ โโโ Model sees 30-bar window, outputs prediction โ โ โข ResultHandler.handle_result(result, ...) โ โ โโโ Adds "model.prediction" column to ohlcv frame โ โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ โ โผ ExecutionResult(emitted_events=[...], context=updated_context)8.3 Event Deduplication
Section titled โ8.3 Event DeduplicationโThe ProcessedEventTracker prevents duplicate predictions:
ProcessedEventTrackerโโโโโโโโโโโโโโโโโโโ
Internal state: Set of (event_def_id, symbol, ts) tuples
is_duplicate(event_def_id, symbol, ts): key = (event_def_id, symbol, str(ts)) return key in self._processed_events
mark_processed(event_def_id, symbol, ts): key = (event_def_id, symbol, str(ts)) self._processed_events.add(key)
Why needed: โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ โ 1. WebSocket reconnection may re-deliver recent bars โ โ 2. Multiple symbols can trigger same flow โ โ 3. Event chaining could cause loops โ โ โ โ Without deduplication: โ โ 13:00 BTCUSDT bar โ prediction = 0.45 โ emit to CSV โ โ 13:00 BTCUSDT bar (re-delivered) โ prediction = 0.45 โ DUPLICATE! โ โ โ โ With deduplication: โ โ 13:00 BTCUSDT bar โ prediction = 0.45 โ emit to CSV โ โ 13:00 BTCUSDT bar (re-delivered) โ is_duplicate=True โ SKIP โ โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ8.4 Prediction Emission
Section titled โ8.4 Prediction EmissionโAfter flow execution, predictions are emitted via PredictionEmitter:
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ PREDICTION EMISSION FLOW โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
_emit_predictions(event) โ โผ PredictionEmitter.emit_if_ready(event, context, dag_name) โ โ 1. Extract prediction column from context โ prediction_column = "model.prediction" โ df = context["frames"]["ohlcv"] โ โ 2. Get latest row for triggering symbol โ symbol = event.symbol # e.g., "BTCUSDT" โ latest = df[df["symbol"] == symbol].iloc[-1] โ prediction = latest["model.prediction"] # e.g., 0.45 โ โ 3. Build metadata โ metadata = { โ "frame": "ohlcv", โ "column": "model.prediction", โ "dag_name": "minimal_signal_generation" โ } โ โผ SignalEmitter.emit_prediction(symbol, ts, prediction, metadata) โ โ For each sink in self._sinks: โ sink.emit(symbol, ts, prediction, metadata) โ โโโ CSVAppendSink.emit() โ Append to outputs/predictions.csv โโโ DynamoDBSink.emit() โ Put item to DynamoDB โโโ ClickHouseSink.emit() โ Insert to ClickHouse table8.5 Event Chaining
Section titled โ8.5 Event ChainingโPlugins can emit events that trigger downstream flows:
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ EVENT CHAINING โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
Flow execution may emit events (e.g., "signal.generated") โ โผ โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ โ Event chaining loop (max depth = 100): โ โ โ โ while chain_depth < MAX_EVENT_CHAIN_DEPTH: โ โ โ โ โ โโโ Drain EventBus for emitted events โ โ โ emitted_events = _event_bus.drain() โ โ โ โ โ โโโ If no events, break โ โ โ โ โ โโโ For each emitted event: โ โ โโโ Check deduplication โ โ โโโ Route to subscribed flows โ โ โโโ Execute flows โ โ โโโ Publish newly emitted events โ โ โ โ Depth guard: Raises FlowExecutionError if depth >= 100 โ โ (Prevents infinite loops from circular event dependencies) โ โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ8.6 Thread Safety
Section titled โ8.6 Thread SafetyโThe streaming phase uses asyncio.Lock to protect shared context:
8.7 Streaming Services
Section titled โ8.7 Streaming Servicesโ9. Sink System Architecture
Section titled โ9. Sink System Architectureโ9.1 Overview
Section titled โ9.1 OverviewโThe sink system provides a pluggable architecture for outputting predictions to various destinations. It follows the dependency injection pattern:
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ SINK SYSTEM ARCHITECTURE โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
DSL on_prediction SignalEmitter โโโโโโโโโโโโโโโโโ โโโโโโโโโโโโโ
signal_generation: โโโโโโโโโโโโโโโโโโโโโโโโ on_prediction: โ SignalEmitter โ - using: sinks.csv_append โโโโถ โ โ params: โ _sinks: [ โ path: out.csv โ CSVAppendSink, โ โ ClickHouseSink, โ - using: sinks.clickhouse โโโโถ โ ] โ params: โ โ table_name: predictions โ emit_prediction() โ โ โโโ sink[0].emit()โ โ โโโ sink[1].emit()โ โโโโโโโโโโโโโโโโโโโโโโโโ โ โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโผโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ โผ โผ โผ โโโโโโโโโโโโโโโโโโ โโโโโโโโโโโโโโโโโโ โโโโโโโโโโโโโโโโโโ โ CSVAppendSink โ โ DynamoDBSink โ โ ClickHouseSink โ โ โ โ โ โ โ โ emit() โ โ emit() โ โ emit() โ โ close() โ โ close() โ โ close() โ โโโโโโโโโโโโโโโโโโ โโโโโโโโโโโโโโโโโโ โโโโโโโโโโโโโโโโโโ9.2 Sink Lifecycle
Section titled โ9.2 Sink Lifecycleโ- Instantiation:
create_emitter_from_dsl()parses DSL and usessink_factoryto create sink instances - Dependency Injection: Sink instances are passed to
SignalEmitter.__init__() - Emission:
emit()called per-symbol when prediction is ready - Cleanup:
close()called on shutdown or completion
9.3 Error Isolation Philosophy
Section titled โ9.3 Error Isolation PhilosophyโSignalEmitter attempts ALL sinks even if some fail. This ensures one failing sink doesnโt prevent other sinks from receiving data:
def emit_prediction(self, symbol, ts, prediction, metadata): errors: list[tuple[str, Exception]] = []
# Emit to all sinks with isolation for i, sink in enumerate(self._sinks): try: sink.emit(symbol, ts, prediction, metadata) except Exception as e: sink_name = self._sink_names[i] errors.append((sink_name, e))
# Raise aggregated error if any sink failed if errors: raise SinkEmitError(...)9.4 Key Design Principles
Section titled โ9.4 Key Design Principlesโ10. Key Code Locations
Section titled โ10. Key Code Locationsโ11. Execution Timeline Example
Section titled โ11. Execution Timeline ExampleโScenario: Run minimal_signal_generation.yaml at 2025-12-26 12:30:00
12:30:00 CLI starts โโโ Parse DSL โโโ Create CSVAppendSink โโโ Compile to EventFlowDAG โโโ Initialize RealTimeRunner
12:30:01 Phase 2: Warmup โโโ Connect Binance WebSocket (1h klines for BTCUSDT, ETHUSDT) โโโ First WS event: 12:00:00 bar (buffered) โโโ Fetch S3 history: 30 days ending at 12:00:00 โโโ HTTP gap-fill if needed โโโ Process 720 bars through momentum, RSI features โโโ Seed frames with warmed data
12:30:15 Phase 3: Streaming begins โโโ Log: "Starting real-time signal generation (Ctrl+C to stop)"
13:00:00 Event: BTCUSDT 13:00 bar closes โโโ Append row to ohlcv frame (721 rows now) โโโ Execute momentum(window=20) on 721 rows โโโ Execute rsi(window=14) on 721 rows โโโ Execute model(window_size=30) โ prediction=0.45 โโโ Emit: symbol=BTCUSDT, ts=13:00, pred=0.45 โ CSV
13:00:01 Event: ETHUSDT 13:00 bar closes โโโ Append row to ohlcv frame (722 rows now) โโโ Execute features on 722 rows โโโ Execute model โ prediction=-0.23 โโโ Emit: symbol=ETHUSDT, ts=13:00, pred=-0.23 โ CSV
14:00:00 Event: BTCUSDT 14:00 bar closes โโโ ... same flow ...
[Ctrl+C] Shutdown โโโ Close WebSocket โโโ Close sinks โโโ Log: "Signal generation completed"