
bug-analysis

Source: Notion | Last edited: 2025-12-29 | ID: 2d82d2dc-3ef...


Date: 2025-12-27 (Updated: 2025-12-29)
Status: Investigation Complete - Issue #4 expanded with Lock 2 analysis
Severity: HIGH (potential data loss in streaming)


Four issues identified in the Signal Generation streaming flow:


2. Bug #1: Gap Between Warmup and Streaming


The warmup flow creates a window where bars can be permanently lost:

Timeline:
12:00:00 First WS event arrives (13:00 bar closes)
12:00:01 WarmupOrchestrator.fetch_warmup_data() fetches historical data
12:00:02 *** WebSocket CLOSED *** (line 272-278 warmup_orchestrator.py)
12:00:03 RealTimeRunner processes transforms
12:00:10 RealTimeRunner processes features
12:00:30 RealTimeRunner seeds frames to FrameLogs
12:00:31 EventLoopManager.connect_sources() *** WebSocket RECONNECTS ***
PROBLEM: If 14:00 bar closes at 12:00:25 (during processing), it's LOST.

File: orchestrator/services/warmup_orchestrator.py

# Line 262-278: WebSocket is closed AFTER warmup data merge
# Step 5: Stop buffering and close connections
buffering_active = False
for task in buffer_tasks:
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
for event_id, source in ws_sources:  # <-- CLOSES WebSocket
    try:
        await source.close()
        logger.debug(f"Closed warmup WebSocket for '{event_id}'")
    except Exception as e:
        logger.warning(f"Error closing warmup WebSocket '{event_id}':{e}")

File: orchestrator/real_time_runner.py

# Line 473-501: Processing happens AFTER warmup returns
await self._fetch_warmup_data_gapless() # <-- WS closed here
# These can take significant time (seconds to minutes for complex features)
self._process_warmup_through_transforms() # <-- No WS connection
self._process_warmup_through_features() # <-- No WS connection
self._seed_frames_to_frame_logs() # <-- No WS connection
self._seed_event_tracker_from_warmup() # <-- No WS connection
# Only NOW does WebSocket reconnect
await self._event_loop_manager.connect_sources(self._event_sources) # <-- Reconnect
Impact:

  • Lost bars: Any bar that closes between WebSocket close and reconnect is permanently lost
  • Silent failure: No error is raised, so the gap goes undetected (a detection sketch follows this list)
  • Inconsistent predictions: ML model receives incomplete data
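The silent-failure point could at least be mitigated with a continuity check after reconnect. A minimal sketch (detect_bar_gaps is a hypothetical helper, not existing project code), assuming frames carry a ts column of bar timestamps:

import pandas as pd

def detect_bar_gaps(frame: pd.DataFrame, expected_interval: pd.Timedelta) -> list:
    """Return timestamps that follow a larger-than-expected gap (hypothetical helper)."""
    ts = pd.to_datetime(frame["ts"]).sort_values().reset_index(drop=True)
    deltas = ts.diff()  # NaT for the first row, which never compares as a gap
    return list(ts[deltas > expected_interval])

# Example: after connect_sources(), warn if any hourly bar went missing during processing.
# The frame name "bar_1h" is illustrative only.
# gaps = detect_bar_gaps(self._context["frames"]["bar_1h"], pd.Timedelta(hours=1))
# if gaps:
#     logger.warning(f"{len(gaps)} bar gap(s) detected after warmup/stream transition: {gaps}")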

Option A: Keep WebSocket open during processing (Recommended)

Instead of closing WebSocket after warmup buffer, keep it open and continue buffering during processing:

# In WarmupOrchestrator.fetch_warmup_data():
# DON'T close WebSocket - let RealTimeRunner manage lifecycle
# Return buffer task handles so RealTimeRunner can:
# 1. Continue buffering during transform/feature processing
# 2. Merge buffered events after processing completes
# 3. Then transition to streaming mode

Option B: Parallel processing with continued buffering

Continue buffering in a separate task while processing warmup data, then merge accumulated events before starting streaming.
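A rough sketch of Option B (names such as keep_ws_open, stop_buffering, adopt_sources, and _apply_buffered_event are illustrative placeholders, not the existing API):

# Warmup returns the still-open sockets and the running buffer task (hypothetical API)
warmup_result = await self._fetch_warmup_data_gapless(keep_ws_open=True)

# Heavy synchronous processing; the buffer task keeps accumulating live events meanwhile
self._process_warmup_through_transforms()
self._process_warmup_through_features()
self._seed_frames_to_frame_logs()
self._seed_event_tracker_from_warmup()

# Merge whatever closed bars arrived while processing, then hand the already-open
# sockets to the event loop manager instead of reconnecting (hypothetical calls)
buffered_events = await warmup_result.stop_buffering()
for event in buffered_events:
    self._apply_buffered_event(event)  # append + dedup, mirroring the normal event path
await self._event_loop_manager.adopt_sources(warmup_result.ws_sources)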


3. Bug #2: First Event Timing Inefficiency


The warmup flow waits for a closed candle event to establish the timeline anchor. For hourly bars, this can delay startup by up to 60 minutes.

File: orchestrator/services/warmup_orchestrator.py

# Line 144-152: buffer_events() buffers ALL events (including partial)
async def buffer_events(source: "BinanceWebSocketEventStream") -> None:
    nonlocal buffering_active
    await source.connect()
    async for event in source.stream():
        if buffering_active:
            ws_buffer.append(event)
            if not ws_started.is_set():
                ws_started.set()  # <-- Triggers on ANY event
        else:
            break

# Line 194: First buffered event's timestamp used as anchor
ws_first_event_ts = ws_buffer[0].ts

The issue is that ws_started.set() triggers on the first event (which may be a partial candle), and that first event's timestamp becomes the anchor even though it may fall mid-bar rather than on a bar boundary.

Current behavior:

  • First event (partial or closed) triggers ws_started.set()
  • ws_first_event_ts = ws_buffer[0].ts uses that timestamp
  • Historical fetch ends at this timestamp

Concerns:

  1. If the first event is partial (e.g., at 12:30:45 during a 1h bar), the timestamp is mid-bar
  2. Historical data fetched ends at 12:30:45, potentially creating misalignment
  3. The 60-minute wait for a closed candle is unnecessary if partial events are acceptable

Impact:

  • Startup delay: Up to 60 minutes for 1h bars
  • Potential misalignment: If a partial event timestamp is used, historical data may not align with bar boundaries

Option A: Use current time as anchor (Fastest startup)

# Use current time, let WarmupDataService fetch up to "now"
ws_first_event_ts = pd.Timestamp.now(tz="UTC")
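If bar-boundary alignment still matters under Option A, the anchor could also be floored to the bar interval so the historical fetch ends exactly on a boundary. A minimal sketch, assuming a 1h interval (in practice the interval would come from the event definition):

# Floor "now" to the bar interval so the historical fetch ends on a bar boundary
bar_interval = "1h"  # assumption: derive from the event definition in practice
ws_first_event_ts = pd.Timestamp.now(tz="UTC").floor(bar_interval)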

Option B: Wait for first closed candle only (Current, but clarify)

If close_time alignment is critical, document that startup delay is expected and add logging:

# Only use closed candle events for anchor
async def buffer_events(source):
    async for event in source.stream():
        if buffering_active:
            ws_buffer.append(event)
            if not ws_started.is_set() and event.metadata.get("is_closed", False):
                ws_started.set()  # Only trigger on closed candle
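Note that event.metadata.get("is_closed", False) assumes the closed-candle flag is exposed under that metadata key; if BinanceWebSocketEventStream uses a different field name, the check would need to be adapted.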

4. Bug #3: Event Row Lost After Feature Execution


When a new WebSocket event arrives:

  1. Event row is appended to context["frames"][frame_name]
  2. Feature plugin executes and calls handle_feature_result()
  3. handle_feature_result() replaces context["frames"][frame_name] with frame_log.window()
  4. The newly appended row is LOST because it was never added to FrameLog

File: orchestrator/real_time_runner.py

# Line 558-592: Event row appended to context["frames"]
async with self._context_lock:
    # Initialize frame if first event
    if self._context["frames"][frame_name] is None:
        self._context["frames"][frame_name] = pd.DataFrame([event_row])
    else:
        # Append event data to existing frame
        self._context["frames"][frame_name] = pd.concat(
            [self._context["frames"][frame_name], pd.DataFrame([event_row])],
            ignore_index=True
        )
    # ... dedup and trim ...

    # CRITICAL: Also append to FrameLog to keep them in sync
    # After handle_feature_result, context["frames"] is replaced with frame_log.window()
    # So the FrameLog must have the new row, or it will be lost
    if "_frame_logs" in self._context and frame_name in self._context["_frame_logs"]:
        self._context["_frame_logs"][frame_name].append(event_row)  # <-- FIX ADDED

File: orchestrator/services/result_handler.py

# Line 365: context["frames"] replaced with frame_log.window()
# Real-time mode: update working frame with window view
context["frames"][input_frame_name] = frame_log.window() # <-- This loses unsynced rows

A fix was added at lines 583-587 of real_time_runner.py:

# CRITICAL: Also append to FrameLog to keep them in sync
if "_frame_logs" in self._context and frame_name in self._context["_frame_logs"]:
    self._context["_frame_logs"][frame_name].append(event_row)

The fix appends the event_row dict to FrameLog, but needs verification that:

  1. event_row format matches what FrameLog expects (a dict with a "ts" key)
  2. The append happens before any feature execution
  3. Deduplication in FrameLog handles potential duplicates correctly

Looking at frame_log.py:

def append(self, row: dict) -> None:
    if "ts" not in row:
        raise ValueError("Row must contain 'ts' key")
    self._rows.append(row)
    self._columns.update(row.keys())
    self._trim_if_needed()

The fix looks correct - event_row contains a "ts" key (lines 549-554 in real_time_runner.py).
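Point 3 (deduplication) still deserves a targeted check, because FrameLog.append() itself does not deduplicate; whether duplicate (symbol, ts) rows are tolerated depends on window() and downstream consumers. A pytest-style sketch of such a check (the FrameLog import path and no-argument constructor are assumptions; adapt to the real API):

import pandas as pd
from orchestrator.frame_log import FrameLog  # assumed import path

def test_frame_log_duplicate_append():
    log = FrameLog()  # assumption: constructible with defaults
    row = {"ts": pd.Timestamp("2025-12-27 13:00", tz="UTC"), "symbol": "BTCUSDT", "close": 100.0}
    log.append(row)
    log.append(row)  # same (symbol, ts) appended twice, as can happen on reconnect

    window = log.window()
    dup_rows = window.duplicated(subset=["symbol", "ts"]).sum()
    # Pin down the intended policy here: 0 if FrameLog should dedup on append,
    # 1 if duplicates are allowed and handled downstream by drop_duplicates.
    assert dup_rows in (0, 1)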


5. Bug #4: Unnecessary Lock in _process_event_source()

The _process_event_source() method uses asyncio.Lock() to protect context mutations, but the lock provides no protection because:

  1. Python asyncio is single-threaded cooperative multitasking
  2. Context switches only happen at await points
  3. There are no await calls inside the locked sections

The lock adds overhead without preventing any race conditions.

File: orchestrator/real_time_runner.py

# Lock 1: Append event to frame (lines 559-592)
async with self._context_lock:  # <-- Lock acquired
    # ALL operations below are SYNCHRONOUS (no await)
    self._context["frames"][frame_name] = pd.concat(...)        # Sync
    self._context["frames"][frame_name].drop_duplicates(...)    # Sync
    self._context["_frame_logs"][frame_name].append(event_row)  # Sync
# <-- Lock released

# Lock 2: Execute flows (lines 603-680+)
async with self._context_lock:  # <-- Lock acquired
    if self._event_tracker.is_duplicate(...): continue          # Sync
    self._event_tracker.mark_processed(...)                     # Sync
    result = self._executor.execute(...)  # Sync - FlowExecutor has NO async methods
# <-- Lock released

Lock 2 Detailed Analysis (lines 603-680+):

The second lock is held for the ENTIRE flow execution, including event chaining:

async with self._context_lock:  # Lock acquired here
    # 1. Deduplication check - SYNC
    if self._event_tracker.is_duplicate(...):
        continue
    self._event_tracker.mark_processed(...)  # SYNC

    # 2. Flow execution loop - ALL SYNC
    for flow in flows:
        result = self._executor.execute(flow, event, self._context)  # SYNC

        # 3. Event bus operations - SYNC
        for emitted_event in result.emitted_events:
            self._event_bus.publish(emitted_event)  # SYNC

    # 4. Event chaining loop - ALL SYNC (up to 100 iterations!)
    while chain_depth < MAX_EVENT_CHAIN_DEPTH:
        emitted_events = self._event_bus.drain()  # SYNC
        if not emitted_events:
            break
        for emitted_event in emitted_events:
            # Dedup check - SYNC
            if self._event_tracker.is_duplicate(...):
                continue
            self._event_tracker.mark_processed(...)  # SYNC
            # Route and execute - SYNC
            emitted_flows = self._router.route(emitted_event)  # SYNC
            for emitted_flow in emitted_flows:
                emitted_result = self._executor.execute(...)  # SYNC
                for newly_emitted in emitted_result.emitted_events:
                    self._event_bus.publish(newly_emitted)  # SYNC
# Lock released here (after ALL chained events processed!)

Operations audit: ALL operations inside both locked sections are synchronous. No await = no context switch = no race condition possible.

asyncio.gather(*tasks)
├── _process_event_source("bar_1m_btc", ws_btc) ──┐
├── _process_event_source("bar_1m_eth", ws_eth) ──┼── Concurrent tasks
└── _process_event_source("bar_1h_all", ws_all) ──┘
Each task runs in the SAME thread. Context switches ONLY at await points.

Key observation: the locked sections contain only synchronous operations. Without an await inside the lock, no other coroutine can run, so no race condition is possible.

  • Performance overhead: Lock acquire/release cycles on every event
  • Potential bottleneck: Multiple symbols waiting for the lock unnecessarily
  • Code complexity: Misleading - suggests concurrency issues that don't exist
  • No actual protection: The lock does nothing because there is no await inside
  • Excessive lock hold time: Lock 2 is held during the entire event chain (up to 100 iterations)
  • Misleading comment: The code says "v1.9.1: Lock BEFORE deduplication check to prevent race conditions" but the race is impossible without await

Why the lock "appears" to work:
asyncio Model (Single-threaded):
Task A (BTCUSDT) Task B (ETHUSDT)
│ │
▼ │
async with lock ──────┐ │
│ │ │
├─ is_duplicate │ SYNC │ (cannot run - no await)
├─ execute() │ SYNC │ (cannot run - no await)
├─ chain events │ SYNC │ (cannot run - no await)
│ │ │
release lock ─────────┘ │
│ ▼
await ◄─────────────────── async with lock (NOW can acquire)

Even WITHOUT the lock, Task B cannot interleave because there’s no await inside. The lock only adds overhead without changing behavior.
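A self-contained toy demonstration of this point (not project code): two tasks increment shared state through a purely synchronous read-modify-write with no await in between, so asyncio never switches tasks mid-update and the result is deterministic even without a lock.

import asyncio

counter = {"value": 0}

async def bump(times: int) -> None:
    for _ in range(times):
        current = counter["value"]      # read (sync)
        counter["value"] = current + 1  # write (sync) - no await in between

async def main() -> None:
    # Two "concurrent" tasks, same thread, no await inside the critical section
    await asyncio.gather(bump(100_000), bump(100_000))
    print(counter["value"])             # always 200_000 - no lost updates, no lock

asyncio.run(main())

Insert a single await asyncio.sleep(0) between the read and the write and updates can be lost, which is exactly the condition under which the lock would start doing real work.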

Lock would be necessary if:

  1. Operations inside used await (allows a context switch) - see the sketch after this list
  2. Thread pool executors were used (asyncio.to_thread)
  3. Multiple OS threads accessed context (not the case)
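For contrast, an illustrative (non-project) sketch of case 1, where an await between the duplicate check and the mark-processed step makes the lock genuinely necessary:

import asyncio

processed: set[str] = set()
lock = asyncio.Lock()

async def handle(event_id: str) -> None:
    async with lock:            # lock is REQUIRED here...
        if event_id in processed:
            return
        await asyncio.sleep(0)  # ...because this await yields control; without the lock,
                                # another task could pass the check above
        processed.add(event_id) # before this line runs, double-processing the event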

Lock locations in codebase:


Fix 1: Remove Lock from __init__ (Line 195)

# BEFORE (Line 195):
self._context_lock = asyncio.Lock()
# AFTER: Remove this line entirely
# (no lock needed - all operations are synchronous)

Fix 2: Remove Lock 1 - Append Event (Lines 557-591)

# =============================================================================
# BEFORE (Lines 557-591): Unnecessary lock around frame append
# =============================================================================
# Protect shared context mutations with lock
async with self._context_lock:
    # Initialize frame if first event
    if self._context["frames"][frame_name] is None:
        self._context["frames"][frame_name] = pd.DataFrame([event_row])
    else:
        # Append event data to existing frame
        self._context["frames"][frame_name] = pd.concat(
            [self._context["frames"][frame_name], pd.DataFrame([event_row])],
            ignore_index=True
        )
    # Deduplicate and trim...
    self._context["frames"][frame_name] = (
        self._context["frames"][frame_name]
        .drop_duplicates(subset=["symbol", "ts"], keep="last")
        .reset_index(drop=True)
    )
    # CRITICAL: Also append to FrameLog
    if "_frame_logs" in self._context and frame_name in self._context["_frame_logs"]:
        self._context["_frame_logs"][frame_name].append(event_row)

# =============================================================================
# AFTER: Same code without lock wrapper (same thread-safety, less overhead)
# =============================================================================
# Initialize frame if first event (no lock needed - all sync operations)
if self._context["frames"][frame_name] is None:
    self._context["frames"][frame_name] = pd.DataFrame([event_row])
else:
    # Append event data to existing frame
    self._context["frames"][frame_name] = pd.concat(
        [self._context["frames"][frame_name], pd.DataFrame([event_row])],
        ignore_index=True
    )
# Deduplicate and trim...
self._context["frames"][frame_name] = (
    self._context["frames"][frame_name]
    .drop_duplicates(subset=["symbol", "ts"], keep="last")
    .reset_index(drop=True)
)
# CRITICAL: Also append to FrameLog
if "_frame_logs" in self._context and frame_name in self._context["_frame_logs"]:
    self._context["_frame_logs"][frame_name].append(event_row)

Fix 3: Remove Lock 2 - Execute Flows (Lines 601-690+)

# =============================================================================
# BEFORE (Lines 601-690+): Unnecessary lock around flow execution
# =============================================================================
if is_closed:
    # Execute flows (thread-safe - protects context mutations)
    # v1.9.1: Lock BEFORE deduplication check to prevent race conditions
    async with self._context_lock:
        # ADR-2025-12-06: Use ProcessedEventTracker for deduplication
        # CRITICAL: Check+add must be atomic (inside lock) to prevent race condition
        event_def_str = event_def_id or ""
        symbol_str = event.symbol or ""
        ts_str = str(event.ts)
        if self._event_tracker.is_duplicate(event_def_str, symbol_str, ts_str):
            logger.debug(f"Skipping duplicate event...")
            continue
        self._event_tracker.mark_processed(event_def_str, symbol_str, ts_str)
        for flow in flows:
            result = self._executor.execute(flow, event, self._context)
        # ... event bus publish ...
        # ... event chaining loop (up to 100 iterations) ...

# =============================================================================
# AFTER: Same code without lock wrapper
# Note: Comment updated to reflect actual thread-safety model
# =============================================================================
if is_closed:
    # Execute flows (no lock needed - asyncio is single-threaded,
    # all operations below are synchronous with no await points,
    # so no other coroutine can interleave)
    event_def_str = event_def_id or ""
    symbol_str = event.symbol or ""
    ts_str = str(event.ts)
    if self._event_tracker.is_duplicate(event_def_str, symbol_str, ts_str):
        logger.debug(f"Skipping duplicate event...")
        continue
    self._event_tracker.mark_processed(event_def_str, symbol_str, ts_str)
    for flow in flows:
        result = self._executor.execute(flow, event, self._context)
    # ... event bus publish ...
    # ... event chaining loop (up to 100 iterations) ...

Checked for thread pool usage in orchestrator:

grep -r "to_thread\|run_in_executor\|ThreadPool" orchestrator/
# Only result: warmup_data_service.py:162 (warmup phase only, not streaming)

Checked FlowExecutor for async operations:

grep -E "async def|await " flow_executor.py
# No matches - FlowExecutor is purely synchronous

Conclusion: Lock can be safely removed.
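To keep that conclusion from silently rotting, a lightweight regression guard could assert that FlowExecutor stays fully synchronous (sketch only; the import path is an assumption):

import inspect
from orchestrator.flow_executor import FlowExecutor  # assumed import path

def test_flow_executor_stays_synchronous():
    # If an async method (i.e. an await point) is ever added to FlowExecutor,
    # the lock-free reasoning in _process_event_source() no longer holds.
    async_methods = [
        name
        for name, member in inspect.getmembers(FlowExecutor, inspect.isfunction)
        if inspect.iscoroutinefunction(member)
    ]
    assert async_methods == [], f"FlowExecutor gained async methods: {async_methods}"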


Related files:

  • orchestrator/real_time_runner.py - Main streaming orchestration
  • orchestrator/services/warmup_orchestrator.py - Warmup data fetching
  • orchestrator/services/result_handler.py - Feature result handling
  • orchestrator/frame_log.py - Append-only log for frame data