bug-analysis
Source: Notion | Last edited: 2025-12-29 | ID: 2d82d2dc-3ef...
Signal Generation Flow - Bug Analysis
Date: 2025-12-27 (Updated: 2025-12-29)
Status: Investigation Complete - Issue #4 expanded with Lock 2 analysis
Severity: HIGH (potential data loss in streaming)
1. Executive Summary
Four issues were identified in the Signal Generation streaming flow:

- Bug #1: Gap between warmup and streaming - bars that close during warmup processing are permanently lost
- Bug #2: First-event timing inefficiency - startup can be delayed up to 60 minutes waiting for a closed candle
- Bug #3: Event row lost after feature execution - a fix was added; verification is pending
- Issue #4: Unnecessary context locking - overhead and misleading comments with no actual protection
2. Bug #1: Gap Between Warmup and Streaming
2.1 Problem Description
The warmup flow creates a window where bars can be permanently lost:
```
Timeline:
12:00:00  First WS event arrives (13:00 bar closes)
12:00:01  WarmupOrchestrator.fetch_warmup_data() fetches historical data
12:00:02  *** WebSocket CLOSED *** (lines 272-278, warmup_orchestrator.py)
12:00:03  RealTimeRunner processes transforms
12:00:10  RealTimeRunner processes features
12:00:30  RealTimeRunner seeds frames to FrameLogs
12:00:31  EventLoopManager.connect_sources() *** WebSocket RECONNECTS ***
```
PROBLEM: If the 14:00 bar closes at 12:00:25 (during processing), it's LOST.
2.2 Code Trace
File: `orchestrator/services/warmup_orchestrator.py`
```python
# Line 262-278: WebSocket is closed AFTER warmup data merge
# Step 5: Stop buffering and close connections
buffering_active = False

for task in buffer_tasks:
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass

for event_id, source in ws_sources:  # <-- CLOSES WebSocket
    try:
        await source.close()
        logger.debug(f"Closed warmup WebSocket for '{event_id}'")
    except Exception as e:
        logger.warning(f"Error closing warmup WebSocket '{event_id}': {e}")
```
File: `orchestrator/real_time_runner.py`
```python
# Line 473-501: Processing happens AFTER warmup returns
await self._fetch_warmup_data_gapless()      # <-- WS closed here

# These can take significant time (seconds to minutes for complex features)
self._process_warmup_through_transforms()    # <-- No WS connection
self._process_warmup_through_features()      # <-- No WS connection
self._seed_frames_to_frame_logs()            # <-- No WS connection
self._seed_event_tracker_from_warmup()       # <-- No WS connection

# Only NOW does WebSocket reconnect
await self._event_loop_manager.connect_sources(self._event_sources)  # <-- Reconnect
```
2.3 Impact
- Lost bars: Any bar that closes between WebSocket close and reconnect is permanently lost
- Silent failure: No error is raised, the gap goes undetected
- Inconsistent predictions: ML model receives incomplete data
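Until a structural fix lands, the silent-failure mode can at least be made loud. Below is a minimal sketch of a gap check at the warmup-to-streaming handoff; `detect_gap` and its arguments are hypothetical names for illustration, not the existing API:

```python
import pandas as pd

def detect_gap(last_warmup_ts: pd.Timestamp,
               first_live_ts: pd.Timestamp,
               interval: pd.Timedelta) -> bool:
    """Return True if bars were lost between warmup and streaming.

    For contiguous data, the first live bar should be exactly one
    interval after the last warmup bar; anything larger means loss.
    """
    return (first_live_ts - last_warmup_ts) > interval

# Example matching the timeline above: the 14:00 bar closed while the
# WebSocket was down, so the first live bar is 15:00.
last_warmup = pd.Timestamp("2025-12-27 13:00", tz="UTC")
first_live = pd.Timestamp("2025-12-27 15:00", tz="UTC")
print(detect_gap(last_warmup, first_live, pd.Timedelta(hours=1)))  # True
```

Logging (or raising) on a positive result would convert the silent gap into an observable error.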
2.4 Proposed Fix
Option A: Keep WebSocket open during processing (Recommended)
Instead of closing WebSocket after warmup buffer, keep it open and continue buffering during processing:
```python
# In WarmupOrchestrator.fetch_warmup_data():
# DON'T close the WebSocket - let RealTimeRunner manage the lifecycle

# Return buffer task handles so RealTimeRunner can:
# 1. Continue buffering during transform/feature processing
# 2. Merge buffered events after processing completes
# 3. Then transition to streaming mode
```
Option B: Parallel processing with continued buffering
Continue buffering in a separate task while processing warmup data, then merge accumulated events before starting streaming.
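Both options converge on the same idea: keep events flowing into a buffer while the blocking warmup processing runs. Below is a minimal sketch, assuming the blocking work is pushed to a worker thread via `asyncio.to_thread` so the event loop stays free to buffer; `fake_stream` and `process_warmup` are illustrative stand-ins, not the real API:

```python
import asyncio
import time

async def fake_stream(queue: asyncio.Queue) -> None:
    """Stand-in for the WebSocket: emits one event every 10 ms."""
    for i in range(20):
        await queue.put({"ts": i})
        await asyncio.sleep(0.01)

def process_warmup() -> None:
    """Stand-in for the blocking transform/feature processing."""
    time.sleep(0.1)  # simulate seconds of synchronous work

async def warmup_with_buffering() -> list:
    queue: asyncio.Queue = asyncio.Queue()
    buffer_task = asyncio.create_task(fake_stream(queue))
    # Run blocking processing in a worker thread so the event loop
    # stays free to keep buffering incoming events.
    await asyncio.to_thread(process_warmup)
    buffer_task.cancel()
    # Merge everything buffered while processing ran, then stream.
    buffered = []
    while not queue.empty():
        buffered.append(queue.get_nowait())
    return buffered

events = asyncio.run(warmup_with_buffering())
print(len(events))  # events that arrived during processing were NOT lost
```

Note the thread is essential here: if the processing ran synchronously on the event loop, the buffering coroutine could not make progress (the same single-threaded property analyzed in Issue #4).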
3. Bug #2: First Event Timing Inefficiency
3.1 Problem Description
The warmup flow waits for a closed candle event to establish the timeline anchor. For hourly bars, this can delay startup by up to 60 minutes.
3.2 Code Trace
File: `orchestrator/services/warmup_orchestrator.py`
```python
# Line 144-152: buffer_events() buffers ALL events (including partial)
async def buffer_events(source: "BinanceWebSocketEventStream") -> None:
    nonlocal buffering_active
    await source.connect()
    async for event in source.stream():
        if buffering_active:
            ws_buffer.append(event)
            if not ws_started.is_set():
                ws_started.set()  # <-- Triggers on ANY event
        else:
            break
```
```python
# Line 194: First buffered event's timestamp used as anchor
ws_first_event_ts = ws_buffer[0].ts
```
The issue is that `ws_started.set()` triggers on the first event (which could be partial), but the timestamp is still taken from that first event, which might not be the timeline anchor we want.
3.3 Analysis
Current behavior:
- The first event (partial or closed) triggers `ws_started.set()`
- `ws_first_event_ts = ws_buffer[0].ts` uses that event's timestamp
- The historical fetch ends at this timestamp
Concerns:
- If first event is partial (e.g., at 12:30:45 during 1h bar), timestamp is mid-bar
- Historical data fetched ends at 12:30:45, potentially creating misalignment
- 60-minute wait for closed candle is unnecessary if partial events are acceptable
3.4 Impact
- Startup delay: Up to 60 minutes for 1h bars
- Potential misalignment: If partial event timestamp is used, historical data may not align with bar boundaries
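If partial-event timestamps do end up serving as the anchor, the misalignment concern can be mitigated by flooring the anchor to the bar boundary. A small illustration using pandas (`"1h"` matches the hourly-bar example above; the variable names are illustrative):

```python
import pandas as pd

# A partial event arrives mid-bar at 12:30:45 during the 12:00-13:00 bar.
first_event_ts = pd.Timestamp("2025-12-27 12:30:45", tz="UTC")

# Floor to the bar boundary so the historical fetch range ends on a
# closed-candle boundary instead of mid-bar.
anchor = first_event_ts.floor("1h")
print(anchor)  # 2025-12-27 12:00:00+00:00
```

This keeps Option A's fast startup while avoiding the mid-bar fetch boundary.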
3.5 Proposed Fix
Option A: Use current time as anchor (Fastest startup)

```python
# Use current time; let WarmupDataService fetch up to "now"
ws_first_event_ts = pd.Timestamp.now(tz="UTC")
```

Option B: Wait for first closed candle only (Current, but clarify)
If close_time alignment is critical, document that startup delay is expected and add logging:
```python
# Only use closed-candle events for the anchor
async def buffer_events(source):
    async for event in source.stream():
        if buffering_active:
            ws_buffer.append(event)
            if not ws_started.is_set() and event.metadata.get("is_closed", False):
                ws_started.set()  # Only trigger on closed candle
```
4. Bug #3: Event Row Lost After Feature Execution
4.1 Problem Description
When a new WebSocket event arrives:
1. The event row is appended to `context["frames"][frame_name]`
2. The feature plugin executes and calls `handle_feature_result()`
3. `handle_feature_result()` replaces `context["frames"][frame_name]` with `frame_log.window()`
4. The newly appended row is LOST because it was never added to the FrameLog
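The mechanism above is easy to reproduce in isolation. A toy model of the pattern, where `ToyFrameLog` is a simplified stand-in for the real FrameLog (not its actual API):

```python
class ToyFrameLog:
    """Minimal append-only log with a window() view, mimicking the pattern."""
    def __init__(self):
        self._rows = []

    def append(self, row: dict) -> None:
        self._rows.append(row)

    def window(self) -> list:
        return list(self._rows)

frame_log = ToyFrameLog()
frame_log.append({"ts": 1, "close": 100.0})  # seeded during warmup

frames = {"bars": frame_log.window()}

# New event arrives: appended to frames but NOT to frame_log (the bug)
frames["bars"] = frames["bars"] + [{"ts": 2, "close": 101.0}]

# handle_feature_result() replaces the frame with frame_log.window()
frames["bars"] = frame_log.window()
print(len(frames["bars"]))  # 1 -> the ts=2 row is gone

# The fix: also append the event row to the FrameLog before the replacement
frame_log.append({"ts": 2, "close": 101.0})
frames["bars"] = frame_log.window()
print(len(frames["bars"]))  # 2 -> the row survives the replacement
```

Any mutation made only to the working frame is discarded the next time the window view is installed, which is exactly why the fix must write to the FrameLog.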
4.2 Code Trace
File: `orchestrator/real_time_runner.py`
```python
# Line 558-592: Event row appended to context["frames"]
async with self._context_lock:
    # Initialize frame if first event
    if self._context["frames"][frame_name] is None:
        self._context["frames"][frame_name] = pd.DataFrame([event_row])
    else:
        # Append event data to existing frame
        self._context["frames"][frame_name] = pd.concat(
            [self._context["frames"][frame_name], pd.DataFrame([event_row])],
            ignore_index=True
        )
    # ... dedup and trim ...

    # CRITICAL: Also append to FrameLog to keep them in sync
    # After handle_feature_result, context["frames"] is replaced with frame_log.window()
    # So the FrameLog must have the new row, or it will be lost
    if "_frame_logs" in self._context and frame_name in self._context["_frame_logs"]:
        self._context["_frame_logs"][frame_name].append(event_row)  # <-- FIX ADDED
```
File: `orchestrator/services/result_handler.py`
```python
# Line 365: context["frames"] replaced with frame_log.window()
# Real-time mode: update working frame with window view
context["frames"][input_frame_name] = frame_log.window()  # <-- This loses unsynced rows
```
4.3 Current Fix Status
A fix was added at lines 583-587 of `real_time_runner.py`:
```python
# CRITICAL: Also append to FrameLog to keep them in sync
if "_frame_logs" in self._context and frame_name in self._context["_frame_logs"]:
    self._context["_frame_logs"][frame_name].append(event_row)
```
4.4 Verification Required
The fix appends the `event_row` dict to the FrameLog, but needs verification that:
- The `event_row` format matches what FrameLog expects (a dict with a "ts" key)
- The append happens before any feature execution
- Deduplication in FrameLog handles potential duplicates correctly
4.5 Potential Issue with Fix
Looking at `frame_log.py`:
```python
def append(self, row: dict) -> None:
    if "ts" not in row:
        raise ValueError("Row must contain 'ts' key")
    self._rows.append(row)
    self._columns.update(row.keys())
    self._trim_if_needed()
```
The fix looks correct - `event_row` contains a "ts" key (lines 549-554 in `real_time_runner.py`).
5. Issue #4: Unnecessary Context Locking
5.1 Problem Description
The `_process_event_source()` method uses `asyncio.Lock()` to protect context mutations, but the lock provides no protection because:
- Python asyncio is single-threaded cooperative multitasking
- Context switches only happen at `await` points
- There are no `await` calls inside the locked sections

The lock adds overhead without preventing any race conditions.
5.2 Code Trace
File: `orchestrator/real_time_runner.py`
```python
# Lock 1: Append event to frame (lines 559-592)
async with self._context_lock:                              # <-- Lock acquired
    # ALL operations below are SYNCHRONOUS (no await)
    self._context["frames"][frame_name] = pd.concat(...)        # Sync
    self._context["frames"][frame_name].drop_duplicates(...)    # Sync
    self._context["_frame_logs"][frame_name].append(event_row)  # Sync
                                                            # <-- Lock released
```
```python
# Lock 2: Execute flows (lines 603-680+)
async with self._context_lock:                 # <-- Lock acquired
    if self._event_tracker.is_duplicate(...):  # Sync
        continue
    self._event_tracker.mark_processed(...)    # Sync
    result = self._executor.execute(...)       # Sync - FlowExecutor has NO async methods
                                               # <-- Lock released
```
Lock 2 Detailed Analysis (lines 603-680+):
The second lock holds for the ENTIRE flow execution including event chaining:
```python
async with self._context_lock:  # Lock acquired here
    # 1. Deduplication check - SYNC
    if self._event_tracker.is_duplicate(...):
        continue
    self._event_tracker.mark_processed(...)  # SYNC

    # 2. Flow execution loop - ALL SYNC
    for flow in flows:
        result = self._executor.execute(flow, event, self._context)  # SYNC

    # 3. Event bus operations - SYNC
    for emitted_event in result.emitted_events:
        self._event_bus.publish(emitted_event)  # SYNC

    # 4. Event chaining loop - ALL SYNC (up to 100 iterations!)
    while chain_depth < MAX_EVENT_CHAIN_DEPTH:
        emitted_events = self._event_bus.drain()  # SYNC
        if not emitted_events:
            break

        for emitted_event in emitted_events:
            # Dedup check - SYNC
            if self._event_tracker.is_duplicate(...):
                continue
            self._event_tracker.mark_processed(...)  # SYNC

            # Route and execute - SYNC
            emitted_flows = self._router.route(emitted_event)  # SYNC
            for emitted_flow in emitted_flows:
                emitted_result = self._executor.execute(...)  # SYNC

                for newly_emitted in emitted_result.emitted_events:
                    self._event_bus.publish(newly_emitted)  # SYNC
# Lock released here (after ALL chained events processed!)
```
Operations audit:
ALL operations are synchronous. No await = no context switch = no race condition possible.
5.3 Concurrency Model Analysis
```
asyncio.gather(*tasks)
  │
  ├── _process_event_source("bar_1m_btc", ws_btc) ──┐
  ├── _process_event_source("bar_1m_eth", ws_eth) ──┼── Concurrent tasks
  └── _process_event_source("bar_1h_all", ws_all) ──┘
```
Each task runs in the SAME thread. Context switches happen ONLY at `await` points.

Key observation: The locked sections contain only synchronous operations:
Without await inside the lock, no other coroutine can run, so no race condition is possible.
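This claim is directly testable. The sketch below runs two coroutines whose critical sections contain no `await` and verifies they never interleave, with no lock involved at all:

```python
import asyncio

log = []

async def worker(name: str) -> None:
    for _ in range(3):
        # Synchronous "critical section": no await, so no other
        # coroutine can run until we reach the sleep below.
        log.append(f"{name}-start")
        log.append(f"{name}-end")
        await asyncio.sleep(0)  # the ONLY point where a switch can happen

async def main() -> None:
    await asyncio.gather(worker("A"), worker("B"))

asyncio.run(main())

# Every start is immediately followed by its matching end:
pairs_ok = all(
    log[i].split("-")[0] == log[i + 1].split("-")[0]
    for i in range(0, len(log), 2)
)
print(pairs_ok)  # True - the sync sections never interleaved
```

Moving the `sleep` inside the "critical section" would break the pairing, which is exactly the condition under which a lock becomes necessary (see 5.5).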
5.4 Impact
- Performance overhead: Lock acquire/release cycles on every event
- Potential bottleneck: Multiple symbols waiting for lock unnecessarily
- Code complexity: Misleading - suggests concurrency issues that don’t exist
- No actual protection: The lock does nothing because there is no `await` inside
- Excessive lock hold time: Lock 2 is held during the entire event chain (up to 100 iterations)
- Misleading comment: The code says "v1.9.1: Lock BEFORE deduplication check to prevent race conditions", but the race is impossible without `await`

Why the lock "appears" to work:
asyncio Model (Single-threaded):
```
Task A (BTCUSDT)                Task B (ETHUSDT)
      │                               │
      ▼                               │
async with lock ──────┐               │
      │               │               │
      ├─ is_duplicate │ SYNC          │ (cannot run - no await)
      ├─ execute()    │ SYNC          │ (cannot run - no await)
      ├─ chain events │ SYNC          │ (cannot run - no await)
      │               │               │
release lock ─────────┘               │
      │                               ▼
    await ◄──────────────────── async with lock (NOW can acquire)
```
Even WITHOUT the lock, Task B cannot interleave because there is no `await` inside.
The lock only adds overhead without changing behavior.
5.5 When Lock WOULD Be Needed
The lock would be necessary if:
- Operations inside used `await` (which allows a context switch)
- Thread pool executors were used (`asyncio.to_thread`)
- Multiple OS threads accessed the context (not the case)
5.6 Proposed Fix
Lock locations in the codebase:
- Line 195: `self._context_lock = asyncio.Lock()` created in `__init__`
- Lines 557-591: Lock 1 wraps the frame append
- Lines 601-690+: Lock 2 wraps flow execution and event chaining
Fix 1: Remove Lock from __init__ (Line 195)
```python
# BEFORE (Line 195):
self._context_lock = asyncio.Lock()

# AFTER: Remove this line entirely
# (no lock needed - all operations are synchronous)
```
Fix 2: Remove Lock 1 - Append Event (Lines 557-591)
```python
# =============================================================================
# BEFORE (Lines 557-591): Unnecessary lock around frame append
# =============================================================================
# Protect shared context mutations with lock
async with self._context_lock:
    # Initialize frame if first event
    if self._context["frames"][frame_name] is None:
        self._context["frames"][frame_name] = pd.DataFrame([event_row])
    else:
        # Append event data to existing frame
        self._context["frames"][frame_name] = pd.concat(
            [self._context["frames"][frame_name], pd.DataFrame([event_row])],
            ignore_index=True
        )
    # Deduplicate and trim...
    self._context["frames"][frame_name] = (
        self._context["frames"][frame_name]
        .drop_duplicates(subset=["symbol", "ts"], keep="last")
        .reset_index(drop=True)
    )
    # CRITICAL: Also append to FrameLog
    if "_frame_logs" in self._context and frame_name in self._context["_frame_logs"]:
        self._context["_frame_logs"][frame_name].append(event_row)

# =============================================================================
# AFTER: Same code without lock wrapper (same thread-safety, less overhead)
# =============================================================================
# Initialize frame if first event (no lock needed - all sync operations)
if self._context["frames"][frame_name] is None:
    self._context["frames"][frame_name] = pd.DataFrame([event_row])
else:
    # Append event data to existing frame
    self._context["frames"][frame_name] = pd.concat(
        [self._context["frames"][frame_name], pd.DataFrame([event_row])],
        ignore_index=True
    )
# Deduplicate and trim...
self._context["frames"][frame_name] = (
    self._context["frames"][frame_name]
    .drop_duplicates(subset=["symbol", "ts"], keep="last")
    .reset_index(drop=True)
)
# CRITICAL: Also append to FrameLog
if "_frame_logs" in self._context and frame_name in self._context["_frame_logs"]:
    self._context["_frame_logs"][frame_name].append(event_row)
```
Fix 3: Remove Lock 2 - Execute Flows (Lines 601-690+)
```python
# =============================================================================
# BEFORE (Lines 601-690+): Unnecessary lock around flow execution
# =============================================================================
if is_closed:
    # Execute flows (thread-safe - protects context mutations)
    # v1.9.1: Lock BEFORE deduplication check to prevent race conditions
    async with self._context_lock:
        # ADR-2025-12-06: Use ProcessedEventTracker for deduplication
        # CRITICAL: Check+add must be atomic (inside lock) to prevent race condition
        event_def_str = event_def_id or ""
        symbol_str = event.symbol or ""
        ts_str = str(event.ts)
        if self._event_tracker.is_duplicate(event_def_str, symbol_str, ts_str):
            logger.debug(f"Skipping duplicate event...")
            continue
        self._event_tracker.mark_processed(event_def_str, symbol_str, ts_str)

        for flow in flows:
            result = self._executor.execute(flow, event, self._context)
        # ... event bus publish ...
        # ... event chaining loop (up to 100 iterations) ...

# =============================================================================
# AFTER: Same code without lock wrapper
# Note: Comment updated to reflect actual thread-safety model
# =============================================================================
if is_closed:
    # Execute flows (no lock needed - asyncio is single-threaded,
    # all operations below are synchronous with no await points,
    # so no other coroutine can interleave)
    event_def_str = event_def_id or ""
    symbol_str = event.symbol or ""
    ts_str = str(event.ts)
    if self._event_tracker.is_duplicate(event_def_str, symbol_str, ts_str):
        logger.debug(f"Skipping duplicate event...")
        continue
    self._event_tracker.mark_processed(event_def_str, symbol_str, ts_str)

    for flow in flows:
        result = self._executor.execute(flow, event, self._context)
    # ... event bus publish ...
    # ... event chaining loop (up to 100 iterations) ...
```
Summary of Changes
- Remove `self._context_lock` from `__init__` (line 195)
- Remove the Lock 1 wrapper around the frame append (lines 557-591)
- Remove the Lock 2 wrapper around flow execution (lines 601-690+)
- Update the misleading v1.9.1 comment to reflect the actual concurrency model

5.7 Verification
Checked for thread pool usage in orchestrator:
```shell
grep -r "to_thread\|run_in_executor\|ThreadPool" orchestrator/
# Only result: warmup_data_service.py:162 (warmup phase only, not streaming)
```
Checked FlowExecutor for async operations:
```shell
grep -E "async def|await " flow_executor.py
# No matches - FlowExecutor is purely synchronous
```
Conclusion: The lock can be safely removed.
6. References
- `orchestrator/real_time_runner.py` - Main streaming orchestration
- `orchestrator/services/warmup_orchestrator.py` - Warmup data fetching
- `orchestrator/services/result_handler.py` - Feature result handling
- `orchestrator/frame_log.py` - Append-only log for frame data