
order.py

Source: Notion | Last edited: 2024-11-10 | ID: 1372d2dc-3ef...


2024-11-07 enhanced tracking


MEMORY LOG ONLY


Let me analyze the memory usage patterns from the output:

  1. Initial Dataset Size:
Dataset Overview
Rows: 20,662
Columns: 6
Total Memory: 2.29 MB
  2. Memory Growth Pattern:
Time      RSS(MB)   Δ RSS
22:11:57   730.4    -48.5  # Base memory
22:15:02   751.2    +22.0  # Processing start
22:15:05   778.9    +27.7
22:16:12   858.4    +42.4
22:35:16  1016.0    +33.0
22:35:18  1102.5    +86.5
22:35:20  1088.7    -13.8  # Small fluctuation
22:35:37  1181.5    +90.8
22:35:39  1285.5   +104.0
22:35:41  1405.5   +120.0  # Peak memory
  3. Final Memory State:
Final RSS: 923.9 MB
Final USS: 918.6 MB
VMS: 3640.0 MB
Memory Efficiency (USS/RSS): 99.4%
  4. Key Memory Patterns:
  • The initial dataset (20,662 rows) only uses 2.29 MB
  • During processing, memory grows non-linearly
  • A feature-construction step creates a large 3D array, moving_features.shape == (7025, 30, 84), which drives most of the memory growth
  5. Memory Growth Analysis: if the number of input CSV rows increases:
  • Base Data Impact: Linear growth (very small)
    • Current: 20,662 rows → 2.29 MB
    • Double rows → ~4.6 MB
  • Processing Memory Impact: Non-linear growth
    • The feature construction creates a 3D array (7025, 30, 84)
    • Memory peaks at ~1.4 GB during processing
    • The growth appears super-linear (possibly O(n²)) due to:
      • Feature calculations
      • Moving windows operations
      • Intermediate array storage
  6. Projected Impact of Increasing Rows: if the input rows are doubled:
  • Base CSV memory: ~4.6 MB (2x)
  • Peak processing memory: likely 3-4 GB (>2x)
  • Final RSS: potentially 2-2.5 GB

The memory growth is not linear because:
  1. Moving window calculations create overlapping data copies

  2. Feature engineering creates multiple intermediate arrays

  3. The 3D array structure (n, 30, 84) grows with input size

  4. Matrix operations during feature calculation may require temporary copies

Recommendations:

  1. Consider batch processing for large datasets

  2. Implement memory-efficient rolling windows

  3. Use generators where possible

  4. Add memory checkpoints to release intermediate results

  5. Monitor USS (Unique Set Size), as it shows the memory actually owned by the process
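To put numbers on the feature-construction step and on recommendation 2, here is a minimal sketch (assuming float64 elements, which is consistent with the observed sizes): it computes the footprint of a (7025, 30, 84) array and shows that NumPy's sliding_window_view yields the same windows as a zero-copy view.

```python
import numpy as np
from numpy.lib.stride_tricks import sliding_window_view

# Footprint of the (7025, 30, 84) array reported as moving_features.shape,
# assuming float64 elements (8 bytes each):
n_windows, window, n_features = 7025, 30, 84
copy_mb = n_windows * window * n_features * 8 / 1024**2
print(f"materialized windows: {copy_mb:.1f} MB")  # ~135 MB per full copy

# The same windows as a strided view over the source matrix: no new data
# buffer is allocated, so stacked window intermediates stop multiplying RSS.
source = np.zeros((n_windows + window - 1, n_features))
windows = sliding_window_view(source, window_shape=window, axis=0)
# sliding_window_view appends the window axis last, giving (7025, 84, 30);
# transpose to match the (n_windows, window, n_features) layout in the log.
windows = windows.transpose(0, 2, 1)
print(windows.shape)                      # (7025, 30, 84)
print(np.shares_memory(windows, source))  # True: still a view
```

Any operation that forces a copy (np.ascontiguousarray, fancy indexing, many reshape patterns) re-materializes the ~135 MB, so reductions such as windows.mean(axis=1) should be applied directly to the view.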

Memory Usage Analysis of ml_feature_set/demo.py


The provided output logs offer insights into the memory consumption patterns of the ml_feature_set/demo.py script during its execution. The analysis focuses on understanding how the script’s memory usage scales with the size of the CSV input, particularly the number of rows.

  • Dataset Overview:
┏━━━━━━━━━━━━━━┳━━━━━━━━━┓
┃ Metric ┃ Value ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━┩
│ Rows │ 20,662 │
│ Columns │ 6 │
│ Total Memory │ 2.29 MB │
└──────────────┴─────────┘
  • Process Memory Summary:
WARNING Final RSS (Resident Set Size): 923.9 MB
WARNING Final USS (Unique Set Size): 918.6 MB
WARNING Final VMS (Virtual Memory Size): 3640.0 MB
  • Memory Growth Over Time:
┃ PID ┃ CPU% ┃ MEM% ┃RSS(MB) ┃Δ RSS ┃COMMAND ┃
│ 2368624 │100.0 │ 1.1 │ 581.0 │+17.5 │python -m ml_feature_set.demo │
│ 2368624 │100.0 │ 1.2 │ 620.7 │+39.8 │python -m ml_feature_set.demo │
│ 2368624 │100.0 │ 1.1 │ 665.2 │+44.5 │python -m ml_feature_set.demo │
│ 2368624 │100.0 │ 1.1 │ 696.4 │+31.2 │python -m ml_feature_set.demo │
│ 2368624 │100.0 │ 1.1 │ 707.2 │+10.8 │python -m ml_feature_set.demo │
│ 2368624 │100.0 │ 1.1 │ 729.2 │+8.5 │python -m ml_feature_set.demo │
│ 2368624 │100.0 │ 1.1 │ 751.2 │+22.0 │python -m ml_feature_set.demo │
│ 2368624 │100.0 │ 1.1 │ 778.9 │+27.7 │python -m ml_feature_set.demo │
│ 2368624 │100.0 │ 1.1 │ 730.4 │-48.5 │python -m ml_feature_set.demo │
│ 2368624 │100.0 │ 1.1 │ 742.2 │+13.8 │python -m ml_feature_set.demo │
│ 2368624 │100.0 │ 1.1 │ 858.4 │+42.4 │python -m ml_feature_set.demo │
│ 2368624 │100.0 │ 1.3 │ 889.1 │+30.7 │python -m ml_feature_set.demo │
│ 2368624 │100.0 │ 1.4 │ 923.9 │+34.8 │python -m ml_feature_set.demo │
  • Final Memory Report:
WARNING Final RSS (Resident Set Size): 923.9 MB
WARNING Final USS (Unique Set Size): 918.6 MB
WARNING Final VMS (Virtual Memory Size): 3640.0 MB
WARNING Memory Efficiency (USS/RSS): 99.4%
WARNING Shared Memory Usage: 59.2 MB
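The "Memory Efficiency" figure in the report is simply the USS/RSS ratio; a quick check against the reported values reproduces it:

```python
# Values taken from the final memory report above (in MB)
final_uss_mb, final_rss_mb = 918.6, 923.9
efficiency = final_uss_mb / final_rss_mb * 100
print(f"{efficiency:.1f}%")  # 99.4%
```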
  1. Initial Memory Consumption:
  • The script starts by loading a CSV file with 20,662 rows and 6 columns; the dataset itself occupies approximately 2.29 MB.
  • The first monitored Resident Set Size (RSS) is 581.0 MB, reflecting interpreter and library overhead well beyond the dataset itself; by termination the RSS reaches 923.9 MB.
  2. Memory Growth During Execution:
  • RSS Increase: Observations show a gradual increase in RSS from 581.0 MB to 923.9 MB over the execution timeline.
    • Δ RSS: The memory increment (Δ RSS) ranges from +8.5 MB to +44.5 MB in successive monitoring intervals.
  • USS (Unique Set Size): Remains consistently high at 918.6 MB, suggesting efficient memory use with minimal duplication.
  • Memory Efficiency: The USS/RSS ratio is 99.4%, indicating that almost all the resident memory is uniquely used by the process without significant shared memory overhead.
  3. Garbage Collection Impact:
  • Garbage Collection (GC): No significant memory was freed post-GC, as indicated by:
WARNING RSS: +0.0 MB
WARNING USS: +0.0 MB
WARNING VMS: +0.0 MB
  • This suggests that most memory allocations persist until the process terminates, with negligible garbage collection during execution.
  4. Process Termination:
  • The script terminated with exit code 1, indicating a validation failure:
WARNING Only 0/1 files validated successfully

Based on the observed memory usage patterns, the impact of increasing the number of rows in the CSV input can be inferred as follows:

  1. Linear Memory Scaling:
  • Data Loading: The initial memory footprint for the dataset is 2.29 MB for 20,662 rows. Assuming linear scaling:
    • Doubling the number of rows to 41,324 would approximately double the memory consumption for the dataset to 4.58 MB.
  • Data Processing: The script’s memory usage grows well beyond the dataset size due to processing. Since RSS reaches 923.9 MB for 20,662 rows, scaling up the rows will increase memory consumption at least proportionally.
    • For instance, increasing rows 10x to 206,620 could raise RSS to roughly 9 GB under a linear assumption, and more if the feature-construction step scales super-linearly.
  2. Memory Efficiency Considerations:
  • High USS/RSS Ratio (99.4%): The script efficiently utilizes memory with minimal overlap. This efficiency implies that scaling the dataset size will lead to a roughly proportional increase in unique memory usage without significant overhead.
  3. Potential Bottlenecks:
  • Process Monitor Insights: Persistent high CPU usage (100% CPU for PID 2368624) suggests intensive processing, which, when scaled, might lead to increased memory consumption and potential performance issues.
  • Validation Failures: The current failure in validation (0/1 successful) indicates possible inefficiencies or errors in handling larger datasets, which could exacerbate memory issues when scaling.
Recommendations:

  1. Optimize Data Processing:
  • Incremental Processing: Implement batch processing or data streaming to handle large datasets without loading the entire dataset into memory.
  • Memory Profiling: Utilize tools like memory_profiler to identify and optimize memory-intensive operations.
  2. Enhance Garbage Collection:
  • Explicit GC Calls: Introduce explicit garbage collection after significant memory allocations to free up unused memory promptly.
  • Refactor Code: Ensure that references to large objects are deleted or go out of scope to facilitate garbage collection.
  3. Validation Checks:
  • Error Handling: Investigate the root cause of validation failures to ensure that scaling up the dataset doesn’t introduce new errors.
  • Resource Allocation: Adjust resource allocations or constraints to accommodate larger datasets effectively.
  4. Scaling Infrastructure:
  • Hardware Considerations: For significantly larger datasets, consider deploying the script on machines with higher RAM capacities or leveraging distributed computing frameworks.
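A minimal sketch of the incremental-processing recommendation, using pandas' chunksize streaming; the per-batch work and the "close" column are placeholders, not the script's actual feature pipeline:

```python
import gc
import pandas as pd

def process_in_batches(csv_path: str, chunk_rows: int = 5000) -> list:
    """Stream the CSV in fixed-size row batches instead of loading it whole,
    so peak memory is bounded by one batch plus its intermediates."""
    results = []
    for chunk in pd.read_csv(csv_path, chunksize=chunk_rows):
        results.append(chunk["close"].mean())  # placeholder per-batch work
        del chunk     # drop the reference so the batch is collectible
        gc.collect()  # explicit collection between batches (recommendation 2)
    return results
```

One caveat for this script: moving-window features need window-1 rows of overlap between consecutive batches, so a real implementation would carry that tail of each batch forward into the next.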

The analysis indicates that memory usage in the ml_feature_set/demo.py script scales approximately linearly with the number of rows in the CSV input. Increasing the dataset size will proportionally increase both the data memory footprint and the overall process memory consumption. To handle larger datasets efficiently, optimizing data processing methods, enhancing memory management, and addressing validation issues are crucial.

from ml_feature_set.utils.logger_setup import Console, get_logger, Table, Text, print

logger = get_logger(__name__, "DEBUG", show_path=False, rich_tracebacks=True)

import logging
import os.path
import sys
import psutil
import time
from typing import Dict, Any
import pandas as pd

try:
    from memory_profiler import profile
    MEMORY_PROFILER_AVAILABLE = True
except ImportError:
    MEMORY_PROFILER_AVAILABLE = False

    def profile(func):
        # Fallback no-op decorator when memory_profiler is unavailable
        return func

tqdm = None
from ml_feature_set.validate_feature_set import validate
import subprocess
import threading
from datetime import datetime
def get_process_memory() -> Dict[str, Any]:
    try:
        process = psutil.Process()
        memory_info = process.memory_info()
        memory_full_info = process.memory_full_info() if hasattr(process, 'memory_full_info') else None
        # Get system memory info
        system_memory = psutil.virtual_memory()
        return {
            'rss': memory_info.rss / (1024 * 1024),  # MB
            'vms': memory_info.vms / (1024 * 1024),  # MB
            'shared': getattr(memory_info, 'shared', 0) / (1024 * 1024),  # MB
            'data': getattr(memory_info, 'data', 0) / (1024 * 1024),  # MB
            'uss': memory_full_info.uss / (1024 * 1024) if memory_full_info else 0,  # MB
            'system_total': system_memory.total / (1024 * 1024),  # MB
            'system_available': system_memory.available / (1024 * 1024),  # MB
            'system_percent': system_memory.percent  # Percentage used
        }
    except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.TimeoutExpired) as e:
        logging.error(f"Error getting memory info: {e}")
        return {k: 0 for k in ['rss', 'vms', 'shared', 'data', 'uss', 'system_total', 'system_available', 'system_percent']}
def log_memory_usage(logger: logging.Logger, stage: str) -> None:
    memory_stats = get_process_memory()
    table = Table(title=f"Memory Usage at {stage}")
    table.add_column("Category", style="cyan", no_wrap=True)
    table.add_column("Metric", style="magenta")
    table.add_column("Value", style="green")
    table.add_section()
    table.add_row("Process Memory", "RSS (Resident Set Size)", f"{memory_stats['rss']:.1f} MB")
    table.add_row("Process Memory", "VMS (Virtual Memory Size)", f"{memory_stats['vms']:.1f} MB")
    table.add_row("Process Memory", "USS (Unique Set Size)", f"{memory_stats['uss']:.1f} MB")
    table.add_row("Process Memory", "Shared Memory", f"{memory_stats['shared']:.1f} MB")
    table.add_row("Process Memory", "Data Memory", f"{memory_stats['data']:.1f} MB")
    table.add_section()
    table.add_row("System Memory", "Total", f"{memory_stats['system_total']:.1f} MB")
    table.add_row("System Memory", "Available", f"{memory_stats['system_available']:.1f} MB")
    table.add_row("System Memory", "Usage", f"{memory_stats['system_percent']}%")
    Console().print(table)
def get_dataframe_memory_info(df: pd.DataFrame) -> Dict[str, float]:
    memory_usage = df.memory_usage(deep=True)
    total_memory = memory_usage.sum() / 1024 / 1024  # Convert to MB
    per_column = {col: mem / 1024 / 1024 for col, mem in memory_usage.items()}
    dtypes_info = {col: str(dtype) for col, dtype in df.dtypes.items()}
    return {
        'total_mb': total_memory,
        'per_column_mb': per_column,
        'dtypes': dtypes_info,
        'num_rows': len(df),
        'num_cols': len(df.columns)
    }
@profile
def run_validation(feature_set_path: str, sample_data_path: str, logger: logging.Logger) -> bool:
    start_time = time.time()
    start_memory = get_process_memory()
    Console().print(Text("\n" + '='*20 + " Starting Validation " + '='*20, style="bold blue"))
    log_memory_usage(logger, "Before validation")
    try:
        try:
            import talib
            logger.warning(f"TA-Lib version: {talib.__version__}")
            logger.warning(f"TA-Lib binary path: {talib.__file__}")
        except ImportError as e:
            logger.critical(f"Failed to import TA-Lib: {str(e)}")
            logger.critical("Please ensure TA-Lib is properly installed")
            logger.critical("Try: pip uninstall TA-Lib && pip install TA-Lib==0.4.24")
            return False
        except Exception as e:
            logger.critical(f"TA-Lib initialization error: {str(e)}")
            logger.critical("This might indicate binary incompatibility between NumPy and TA-Lib")
            logger.critical("Recommended fix:")
            logger.critical("1. pip uninstall numpy TA-Lib")
            logger.critical("2. pip install numpy==1.21.6")
            logger.critical("3. pip install TA-Lib==0.4.24")
            return False
        is_valid = validate(feature_set_path, sample_data_path, logger)
        end_time = time.time()
        end_memory = get_process_memory()
        execution_time = end_time - start_time
        memory_diff = {k: end_memory[k] - start_memory[k] for k in start_memory}
        Console().print(Text("\n" + '='*20 + " Validation Complete " + '='*20, style="bold green"))
        logger.warning(f"Execution time: {execution_time:.2f} seconds")
        memory_table = Table(title="Memory Changes")
        memory_table.add_column("Metric", style="cyan")
        memory_table.add_column("Change", style="magenta")
        for metric, change in memory_diff.items():
            if metric.startswith('system'):
                continue
            memory_table.add_row(metric.upper(), f"{change:+.1f} MB")
        Console().print(memory_table)
        log_memory_usage(logger, "After validation")
        return is_valid
    except Exception as e:
        logger.critical(f"Validation failed: {str(e)}")
        logger.critical("Stack trace:", exc_info=True)
        return False
def format_command(cmd: str) -> str:
    """Shorten and format command for display"""
    if '/home/tca/.cursor-server/' in cmd:
        return cmd.split('cursor-server/')[-1]
    return cmd
def monitor_process_usage(stop_event: threading.Event, log_dir: str = "logs") -> None:
    # Ensure log directory exists
    os.makedirs(log_dir, exist_ok=True)
    log_file = os.path.join(log_dir, f"process_monitor_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log")
    # Store previous readings for comparison
    previous_stats = {}
    first_run = True

    def has_significant_changes(old: dict, new: dict, threshold: float = 0.1) -> bool:
        """Check if memory or CPU values changed significantly"""
        if not old:
            return True
        return any([
            abs(float(new['cpu']) - float(old['cpu'])) > threshold,
            abs(float(new['mem']) - float(old['mem'])) > threshold,
            abs(float(new['rss']) - float(old['rss'])) > 0.5,  # 0.5 MB threshold
            abs(float(new['vsz']) - float(old['vsz'])) > 1.0  # 1 MB threshold
        ])
    while not stop_event.is_set():
        try:
            cmd = "ps -eo user,pid,pcpu,pmem,vsz,rss,stat,start,time,cmd --sort -rss | head -6"
            output = subprocess.check_output(cmd, shell=True, text=True)
            processes = [line.split(None, 9) for line in output.strip().split('\n')[1:]]
            current_stats = {}
            changed_processes = []
            timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
            # Process current readings
            for proc in processes:
                pid = proc[1]
                current_stats[pid] = {
                    'user': proc[0],
                    'cpu': proc[2],
                    'mem': proc[3],
                    'vsz': float(proc[4]) / 1024,  # Convert to MB
                    'rss': float(proc[5]) / 1024,  # Convert to MB
                    'stat': proc[6],
                    'start': proc[7],
                    'time': proc[8],
                    'cmd': format_command(proc[9])
                }
                if first_run or has_significant_changes(
                    previous_stats.get(pid, {}), current_stats[pid]
                ):
                    changed_processes.append(pid)
            if changed_processes:
                # Create Rich table
                table = Table(
                    title=f"[bold cyan]Process Monitor {timestamp}[/]",
                    show_header=True,
                    header_style="bold yellow",
                    border_style="cyan",
                    padding=(0, 1),
                    collapse_padding=True
                )
                # Add columns with specific formatting
                table.add_column("PID", justify="right", style="cyan", no_wrap=True)
                table.add_column("CPU%", justify="right", style="magenta")
                table.add_column("MEM%", justify="right", style="magenta")
                table.add_column("RSS(MB)", justify="right", style="blue")
                table.add_column("Δ RSS", justify="right", style="green")
                table.add_column("COMMAND", style="white")
                # Prepare log entry
                log_entries = [f"\n=== Process Monitor {timestamp} ==="]
                log_entries.append(f"{'PID':>8} {'CPU%':>6} {'MEM%':>6} {'RSS(MB)':>8} {'Δ RSS':>8} COMMAND")
                for pid in changed_processes:
                    stats = current_stats[pid]
                    old_stats = previous_stats.get(pid, {})
                    # Calculate RSS change
                    if old_stats:
                        delta = stats['rss'] - old_stats['rss']
                        rss_change = f"{delta:+.1f}"
                        rss_change_display = f"[{'red' if delta > 0 else 'green'}]{delta:+.1f}[/]"
                    else:
                        rss_change = "N/A"
                        rss_change_display = "N/A"
                    # Add row to Rich table
                    table.add_row(
                        pid,
                        f"{float(stats['cpu']):5.1f}",
                        f"{float(stats['mem']):5.1f}",
                        f"{stats['rss']:7.1f}",
                        rss_change_display,
                        stats['cmd']
                    )
                    # Add to log entries
                    log_entries.append(
                        f"{pid:>8} {float(stats['cpu']):6.1f} {float(stats['mem']):6.1f} "
                        f"{stats['rss']:8.1f} {rss_change:>8} {stats['cmd']}"
                    )
                # Display table
                Console().print("\n")
                Console().print(table)
                Console().print("\n")
                # Write to log file
                with open(log_file, 'a') as f:
                    f.write('\n'.join(log_entries) + '\n')
            previous_stats = current_stats
            first_run = False
            time.sleep(2)
        except subprocess.SubprocessError:
            logger.warning("Process monitoring is only available on Unix-like systems")
            break
        except Exception as e:
            logger.warning(f"Process monitoring error: {str(e)}")
            break
def main() -> int:
    logger = get_logger(__name__, "DEBUG", show_path=False, rich_tracebacks=True)
    try:
        import numpy as np
        logger.warning(f"Python version: {sys.version}")
        logger.warning(f"NumPy version: {np.__version__}")
        logger.warning(f"NumPy include dir: {np.get_include()}")
        logger.warning(f"NumPy dtype size: {np.dtype(np.float64).itemsize}")
        logger.warning(f"Platform: {sys.platform}")
    except Exception as e:
        logger.critical(f"Failed to get system information: {str(e)}")
    sample_data_dir = os.path.normpath(os.path.join(__file__, "..", "sample_data"))
    # Initial memory snapshot
    logger.warning("\n=== Initial Memory State ===")
    log_memory_usage(logger, "startup")
    if not MEMORY_PROFILER_AVAILABLE:
        logger.warning("memory_profiler and/or tqdm not installed. Install with:")
        logger.warning("pip install memory_profiler tqdm")
    sample_data_paths = [
        f"{sample_data_dir}/resampled_binance_BTC-2h.csv",
    ]
    # Verify sample data paths exist
    for path in sample_data_paths:
        if not os.path.exists(path):
            logger.critical(f"Sample data file not found: {path}")
            return 1
    feature_set_path = "ohlcv_size84_indicators_v2"
    results = {}
    # Start process monitoring with log directory
    stop_monitoring = threading.Event()
    monitor_thread = None
    if sys.platform != "win32":
        try:
            log_dir = os.path.join(os.path.dirname(__file__), "logs")
            monitor_thread = threading.Thread(
                target=monitor_process_usage,
                args=(stop_monitoring, log_dir),
                daemon=True,
                name="ProcessMonitor"
            )
            monitor_thread.start()
        except Exception as e:
            logger.warning(f"Failed to start process monitoring: {e}")
    total_start_time = time.time()
    for i, data_path in enumerate(sample_data_paths, 1):
        logger.warning(f"\n{'='*20} Processing File {i}/{len(sample_data_paths)} {'='*20}")
        logger.warning(f"File: {os.path.basename(data_path)}")
        file_start_time = time.time()
        try:
            df = pd.read_csv(data_path)
            df_memory_info = get_dataframe_memory_info(df)
            # Dataset Overview Table
            overview_table = Table(title="Dataset Overview", show_header=True, header_style="bold magenta")
            overview_table.add_column("Metric", style="cyan")
            overview_table.add_column("Value", style="green")
            overview_table.add_row("Rows", f"{df_memory_info['num_rows']:,}")
            overview_table.add_row("Columns", str(df_memory_info['num_cols']))
            overview_table.add_row("Total Memory", f"{df_memory_info['total_mb']:.2f} MB")
            Console().print(overview_table)
        except Exception as e:
            logger.critical(f"Error loading {data_path}: {str(e)}")
            logger.critical("Stack trace:", exc_info=True)
            continue
        is_valid = run_validation(feature_set_path, data_path, logger)
        results[data_path] = is_valid
        file_execution_time = time.time() - file_start_time
        logger.warning(f"\nFile processing completed in {file_execution_time:.2f} seconds")
        # Force garbage collection and log memory
        import gc
        gc.collect()
        log_memory_usage(logger, f"After processing {os.path.basename(data_path)}")
    total_execution_time = time.time() - total_start_time
    logger.warning("\nValidation Results Summary:")
    logger.warning('='*50)
    logger.warning(f"Total execution time: {total_execution_time:.2f} seconds")
    success_count = sum(1 for result in results.values() if result)
    if success_count < len(sample_data_paths):
        logger.warning(f"Only {success_count}/{len(sample_data_paths)} files validated successfully")
    # Enhanced final memory report
    logger.warning("\n" + "="*20 + " FINAL MEMORY REPORT " + "="*20)
    # Get current memory state
    final_memory = get_process_memory()
    # System memory summary
    logger.warning("\nSystem Memory Summary:")
    logger.warning(f"  Total System Memory: {final_memory['system_total']:.1f} MB")
    logger.warning(f"  Available Memory: {final_memory['system_available']:.1f} MB")
    logger.warning(f"  System Memory Usage: {final_memory['system_percent']}%")
    # Process memory summary
    logger.warning("\nProcess Memory Summary:")
    logger.warning(f"  Final RSS (Resident Set Size): {final_memory['rss']:.1f} MB")
    logger.warning(f"  Final USS (Unique Set Size): {final_memory['uss']:.1f} MB")
    logger.warning(f"  Final VMS (Virtual Memory Size): {final_memory['vms']:.1f} MB")
    # Memory efficiency metrics
    memory_efficiency = (final_memory['uss'] / final_memory['rss']) * 100 if final_memory['rss'] > 0 else 0
    logger.warning("\nMemory Efficiency Metrics:")
    logger.warning(f"  Memory Efficiency (USS/RSS): {memory_efficiency:.1f}%")
    logger.warning(f"  Shared Memory Usage: {final_memory['shared']:.1f} MB")
    # Force final garbage collection
    import gc
    gc.collect()
    # Memory after garbage collection
    post_gc_memory = get_process_memory()
    memory_freed = {k: final_memory[k] - post_gc_memory[k] for k in ['rss', 'uss', 'vms']}
    logger.warning("\nGarbage Collection Impact:")
    for metric, change in memory_freed.items():
        if not metric.startswith('system'):
            logger.warning(f"  {metric.upper()}: {change:+.1f} MB")
    logger.warning("\nPer-File Memory Stats:")
    for path in sample_data_paths:
        if path in results:
            logger.warning(f"  {os.path.basename(path)}: {'PASSED' if results[path] else 'FAILED'}")
    logger.warning("="*60)
    # Stop monitoring before exit
    stop_monitoring.set()
    return 0 if all(results.values()) else 1
if __name__ == "__main__":
    sys.exit(main())