order.py
Source: Notion | Last edited: 2024-11-10 | ID: 1372d2dc-3ef...
2024-11-07 enhanced tracking
FULL-RECORD TERMINAL MEMORY LOG
MEMORY LOG ONLY
claude-3-5-sonnet-20241022
Let me analyze the memory usage patterns from the output:
- Initial Dataset Size:
Dataset Overview
Rows: 20,662
Columns: 6
Total Memory: 2.29 MB
- Memory Growth Pattern:
Time      RSS(MB)   Δ RSS
22:11:57  730.4     -48.5   # Base memory
22:15:02  751.2     +22.0   # Processing start
22:15:05  778.9     +27.7
22:16:12  858.4     +42.4
22:35:16  1016.0    +33.0
22:35:18  1102.5    +86.5
22:35:20  1088.7    -13.8   # Small fluctuation
22:35:37  1181.5    +90.8
22:35:39  1285.5    +104.0
22:35:41  1405.5    +120.0  # Peak memory
- Final Memory State:
Final RSS: 923.9 MB
Final USS: 918.6 MB
VMS: 3640.0 MB
Memory Efficiency (USS/RSS): 99.4%
- Key Memory Patterns:
- The initial dataset (20,662 rows) only uses 2.29 MB
- During processing, memory grows non-linearly
- There’s a feature construction step that creates a 3D array which significantly impacts memory usage:
moving_features.shape == (7025, 30, 84)
- Memory Growth Analysis: If we increase the input CSV rows:
- Base Data Impact: Linear growth (very small)
- Current: 20,662 rows → 2.29 MB
- Double rows → ~4.6 MB
- Processing Memory Impact: Non-linear growth
- The feature construction creates a 3D array (7025, 30, 84)
- Memory peaks at ~1.4 GB during processing
- The relationship appears to be O(n²) or worse due to:
- Feature calculations
- Moving windows operations
- Intermediate array storage
- Projected Impact of Increasing Rows: If you double the input rows:
- Base CSV memory: ~4.6 MB (2x)
- Peak processing memory: Likely 3-4 GB (>2x)
- Final RSS: Potentially 2-2.5 GB
The memory growth is not linear because:
- Moving window calculations create overlapping data copies
- Feature engineering creates multiple intermediate arrays
- The 3D array structure (n, 30, 84) grows with input size
- Matrix operations during feature calculation may require temporary copies
Recommendations:
- Consider batch processing for large datasets
- Implement memory-efficient rolling windows
- Use generators where possible
- Add memory checkpoints to release intermediate results
- Monitor USS (Unique Set Size), as it shows the actual memory owned by the process
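The "memory-efficient rolling windows" recommendation can be sketched with NumPy's strided views. The shapes below mirror the (7025, 30, 84) array from the log, but the zero-filled input and the variable names are stand-ins, not the script's actual data:

```python
import numpy as np
from numpy.lib.stride_tricks import sliding_window_view

# Stand-in for the script's input: enough rows to yield 7025 windows of 30.
n_rows, n_features, window = 7_054, 84, 30
data = np.zeros((n_rows, n_features))

# Naive approach: materialise every window as its own copy.
naive = np.stack([data[i:i + window] for i in range(n_rows - window + 1)])

# Strided view: same logical shape, but it shares the input's buffer (no copy).
view = sliding_window_view(data, window, axis=0).transpose(0, 2, 1)

assert naive.shape == view.shape == (7_025, 30, 84)
print(f"naive copy: {naive.nbytes / 1e6:.0f} MB")  # ~142 MB at float64
print(f"view is a view: {view.base is not None}")  # True
```

At float64 the copied stack alone is ~142 MB, on the order of the jumps visible in the RSS trace, while the strided view adds essentially nothing; downstream operations that need contiguous data will still copy, so the view only defers that cost.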
o1-mini
Memory Usage Analysis of ml_feature_set/demo.py
Overview
Section titled “Overview”The provided output logs offer insights into the memory consumption patterns of the ml_feature_set/demo.py script during its execution. The analysis focuses on understanding how the script’s memory usage scales with the size of the CSV input, particularly the number of rows.
Memory Usage Logs
Initial Execution
- Dataset Overview:
┏━━━━━━━━━━━━━━┳━━━━━━━━━┓
┃ Metric       ┃ Value   ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━┩
│ Rows         │ 20,662  │
│ Columns      │ 6       │
│ Total Memory │ 2.29 MB │
└──────────────┴─────────┘
- Process Memory Summary:
WARNING Final RSS (Resident Set Size): 923.9 MB
WARNING Final USS (Unique Set Size): 918.6 MB
WARNING Final VMS (Virtual Memory Size): 3640.0 MB
Subsequent Monitoring
- Memory Growth Over Time:
┃ PID     ┃ CPU% ┃ MEM% ┃ RSS(MB) ┃ Δ RSS ┃ COMMAND                       ┃
│ 2368624 │100.0 │ 1.1  │  581.0  │ +17.5 │ python -m ml_feature_set.demo │
│ 2368624 │100.0 │ 1.2  │  620.7  │ +39.8 │ python -m ml_feature_set.demo │
│ 2368624 │100.0 │ 1.1  │  665.2  │ +44.5 │ python -m ml_feature_set.demo │
│ 2368624 │100.0 │ 1.1  │  696.4  │ +31.2 │ python -m ml_feature_set.demo │
│ 2368624 │100.0 │ 1.1  │  707.2  │ +10.8 │ python -m ml_feature_set.demo │
│ 2368624 │100.0 │ 1.1  │  729.2  │ +8.5  │ python -m ml_feature_set.demo │
│ 2368624 │100.0 │ 1.1  │  751.2  │ +22.0 │ python -m ml_feature_set.demo │
│ 2368624 │100.0 │ 1.1  │  778.9  │ +27.7 │ python -m ml_feature_set.demo │
│ 2368624 │100.0 │ 1.1  │  730.4  │ -48.5 │ python -m ml_feature_set.demo │
│ 2368624 │100.0 │ 1.1  │  742.2  │ +13.8 │ python -m ml_feature_set.demo │
│ 2368624 │100.0 │ 1.1  │  858.4  │ +42.4 │ python -m ml_feature_set.demo │
│ 2368624 │100.0 │ 1.3  │  889.1  │ +30.7 │ python -m ml_feature_set.demo │
│ 2368624 │100.0 │ 1.4  │  923.9  │ +34.8 │ python -m ml_feature_set.demo │
- Final Memory Report:
WARNING Final RSS (Resident Set Size): 923.9 MB
WARNING Final USS (Unique Set Size): 918.6 MB
WARNING Final VMS (Virtual Memory Size): 3640.0 MB
WARNING Memory Efficiency (USS/RSS): 99.4%
WARNING Shared Memory Usage: 59.2 MB
Analysis
- Initial Memory Consumption:
- The script starts by loading a CSV file with 20,662 rows and 6 columns, consuming approximately 2.29 MB of memory for the dataset.
- The Resident Set Size (RSS) reaches 923.9 MB by the end of the run, i.e. the total memory occupied by the process in RAM far exceeds the dataset itself.
- Memory Growth During Execution:
- RSS Increase: Observations show a gradual increase in RSS from 581.0 MB to 923.9 MB over the execution timeline.
- Δ RSS: The memory increment (Δ RSS) ranges from +8.5 MB to +44.5 MB in successive monitoring intervals.
- USS (Unique Set Size): Remains consistently high at 918.6 MB, suggesting efficient memory use with minimal duplication.
- Memory Efficiency: The USS/RSS ratio is 99.4%, indicating that almost all the resident memory is uniquely used by the process without significant shared memory overhead.
- Garbage Collection Impact:
- Garbage Collection (GC): No significant memory was freed post-GC, as indicated by:
WARNING RSS: +0.0 MB
WARNING USS: +0.0 MB
WARNING VMS: +0.0 MB
- This suggests that most memory allocations persist until the process terminates, with negligible garbage collection during execution.
- Process Termination:
- The script terminated with exit code 1, indicating a validation failure:
WARNING Only 0/1 files validated successfully
Deduction on Increasing CSV Rows
Based on the observed memory usage patterns, the impact of increasing the number of rows in the CSV input can be inferred as follows:
- Linear Memory Scaling:
- Data Loading: The initial memory footprint for the dataset is 2.29 MB for 20,662 rows. Assuming linear scaling:
- Doubling the number of rows to 41,324 would approximately double the memory consumption for the dataset to 4.58 MB.
- Data Processing: The script’s memory usage grows beyond the dataset size due to data processing activities. Since RSS reaches 923.9 MB for 20,662 rows, scaling up the rows would proportionally increase memory consumption.
- For instance, increasing rows by 10x to 206,620 could potentially raise RSS to around 9.24 GB, subject to processing complexity.
- Memory Efficiency Considerations:
- High USS/RSS Ratio (99.4%): The script efficiently utilizes memory with minimal overlap. This efficiency implies that scaling the dataset size will lead to a roughly proportional increase in unique memory usage without significant overhead.
- Potential Bottlenecks:
- Process Monitor Insights: Persistent high CPU usage (100% CPU for PID 2368624) suggests intensive processing, which, when scaled, might lead to increased memory consumption and potential performance issues.
- Validation Failures: The current failure in validation (0/1 successful) indicates possible inefficiencies or errors in handling larger datasets, which could exacerbate memory issues when scaling.
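As a sanity check on these projections, the raw size of the (n_windows, 30, 84) float64 feature array can be computed directly. The helper below is illustrative, not part of the script; the window and feature counts are taken from the log:

```python
# Estimate the (n_windows, 30, 84) float64 feature array size in MB.
def feature_array_mb(n_rows: int, window: int = 30, n_features: int = 84) -> float:
    n_windows = max(n_rows - window + 1, 0)   # rows consumed by the rolling window
    return n_windows * window * n_features * 8 / 1024 / 1024  # 8 bytes per float64

current = feature_array_mb(7_054)    # the logged (7025, 30, 84) array: ~135 MB
doubled = feature_array_mb(14_108)   # ~271 MB: the array itself scales linearly
print(f"{current:.0f} MB -> {doubled:.0f} MB")
```

The 3D array itself grows only linearly with row count; the larger, super-linear RSS peak observed in the log therefore points at intermediate copies made during feature calculation rather than at the final array alone.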
Recommendations
- Optimize Data Processing:
- Incremental Processing: Implement batch processing or data streaming to handle large datasets without loading the entire dataset into memory.
- Memory Profiling: Utilize tools like memory_profiler to identify and optimize memory-intensive operations.
- Enhance Garbage Collection:
- Explicit GC Calls: Introduce explicit garbage collection after significant memory allocations to free up unused memory promptly.
- Refactor Code: Ensure that references to large objects are deleted or go out of scope to facilitate garbage collection.
- Validation Checks:
- Error Handling: Investigate the root cause of validation failures to ensure that scaling up the dataset doesn’t introduce new errors.
- Resource Allocation: Adjust resource allocations or constraints to accommodate larger datasets effectively.
- Scaling Infrastructure:
- Hardware Considerations: For significantly larger datasets, consider deploying the script on machines with higher RAM capacities or leveraging distributed computing frameworks.
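The incremental-processing recommendation could look like the following sketch built on pandas' `chunksize` parameter; the in-memory CSV here is a stand-in for the real sample data:

```python
import io
import pandas as pd

# Stand-in CSV: a header plus ten data rows.
csv = io.StringIO("open,high,low,close,volume,ts\n" + "1,2,0,1,10,0\n" * 10)

total_rows = 0
for chunk in pd.read_csv(csv, chunksize=4):
    # Each chunk is a small DataFrame; process it, then let it be collected
    # before the next one is read, keeping peak memory bounded by chunk size.
    total_rows += len(chunk)

print(total_rows)  # 10
```

With `chunksize`, `read_csv` returns an iterator instead of one DataFrame, so peak memory is governed by the chunk size rather than the full file.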
Conclusion
The analysis indicates that memory usage in the ml_feature_set/demo.py script scales approximately linearly with the number of rows in the CSV input. Increasing the dataset size will proportionally increase both the data memory footprint and the overall process memory consumption. To handle larger datasets efficiently, optimizing data processing methods, enhancing memory management, and addressing validation issues are crucial.
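The explicit-GC recommendation above can be illustrated with the standard library's tracemalloc (used here instead of psutil so the sketch is self-contained); the list is a stand-in for a large intermediate array:

```python
import gc
import tracemalloc

tracemalloc.start()

intermediate = [0.0] * 1_000_000              # stand-in large temporary (~8 MB)
peak_with = tracemalloc.get_traced_memory()[1]

del intermediate                              # drop the only reference...
gc.collect()                                  # ...and collect promptly
current_after = tracemalloc.get_traced_memory()[0]

assert current_after < peak_with              # the temporary's memory was released
tracemalloc.stop()
```

In CPython the `del` alone frees the list via reference counting; `gc.collect()` additionally breaks reference cycles, which is what the script relies on after processing each file.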
Enhanced logging demo.py script 2024-11-07
from ml_feature_set.utils.logger_setup import Console, get_logger, Table, Text, print

logger = get_logger(__name__, "DEBUG", show_path=False, rich_tracebacks=True)

import logging
import os.path
import sys
import psutil
import time
from typing import Dict, Any

import pandas as pd

try:
    from memory_profiler import profile
    MEMORY_PROFILER_AVAILABLE = True
except ImportError:
    MEMORY_PROFILER_AVAILABLE = False

    def profile(func):
        return func

tqdm = None

from ml_feature_set.validate_feature_set import validate
import subprocess
import threading
from datetime import datetime
def get_process_memory() -> Dict[str, Any]:
    try:
        process = psutil.Process()
        memory_info = process.memory_info()
        memory_full_info = process.memory_full_info() if hasattr(process, 'memory_full_info') else None

        # Get system memory info
        system_memory = psutil.virtual_memory()

        return {
            'rss': memory_info.rss / (1024 * 1024),  # MB
            'vms': memory_info.vms / (1024 * 1024),  # MB
            'shared': getattr(memory_info, 'shared', 0) / (1024 * 1024),  # MB
            'data': getattr(memory_info, 'data', 0) / (1024 * 1024),  # MB
            'uss': memory_full_info.uss / (1024 * 1024) if memory_full_info else 0,  # MB
            'system_total': system_memory.total / (1024 * 1024),  # MB
            'system_available': system_memory.available / (1024 * 1024),  # MB
            'system_percent': system_memory.percent  # Percentage used
        }
    except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.TimeoutExpired) as e:
        logging.error(f"Error getting memory info: {e}")
        return {k: 0 for k in ['rss', 'vms', 'shared', 'data', 'uss',
                               'system_total', 'system_available', 'system_percent']}
def log_memory_usage(logger: logging.Logger, stage: str) -> None:
    memory_stats = get_process_memory()
    table = Table(title=f"Memory Usage at {stage}")

    table.add_column("Category", style="cyan", no_wrap=True)
    table.add_column("Metric", style="magenta")
    table.add_column("Value", style="green")

    table.add_section()
    table.add_row("Process Memory", "RSS (Resident Set Size)", f"{memory_stats['rss']:.1f} MB")
    table.add_row("Process Memory", "VMS (Virtual Memory Size)", f"{memory_stats['vms']:.1f} MB")
    table.add_row("Process Memory", "USS (Unique Set Size)", f"{memory_stats['uss']:.1f} MB")
    table.add_row("Process Memory", "Shared Memory", f"{memory_stats['shared']:.1f} MB")
    table.add_row("Process Memory", "Data Memory", f"{memory_stats['data']:.1f} MB")

    table.add_section()
    table.add_row("System Memory", "Total", f"{memory_stats['system_total']:.1f} MB")
    table.add_row("System Memory", "Available", f"{memory_stats['system_available']:.1f} MB")
    table.add_row("System Memory", "Usage", f"{memory_stats['system_percent']}%")

    Console().print(table)
def get_dataframe_memory_info(df: pd.DataFrame) -> Dict[str, float]:
    memory_usage = df.memory_usage(deep=True)
    total_memory = memory_usage.sum() / 1024 / 1024  # Convert to MB
    per_column = {col: mem / 1024 / 1024 for col, mem in memory_usage.items()}
    dtypes_info = {col: str(dtype) for col, dtype in df.dtypes.items()}

    return {
        'total_mb': total_memory,
        'per_column_mb': per_column,
        'dtypes': dtypes_info,
        'num_rows': len(df),
        'num_cols': len(df.columns)
    }
@profile
def run_validation(feature_set_path: str, sample_data_path: str, logger: logging.Logger) -> bool:
    start_time = time.time()
    start_memory = get_process_memory()

    Console().print(Text("\n" + '='*20 + " Starting Validation " + '='*20, style="bold blue"))
    log_memory_usage(logger, "Before validation")

    try:
        try:
            import talib
            logger.warning(f"TA-Lib version: {talib.__version__}")
            logger.warning(f"TA-Lib binary path: {talib.__file__}")
        except ImportError as e:
            logger.critical(f"Failed to import TA-Lib: {str(e)}")
            logger.critical("Please ensure TA-Lib is properly installed")
            logger.critical("Try: pip uninstall TA-Lib && pip install TA-Lib==0.4.24")
            return False
        except Exception as e:
            logger.critical(f"TA-Lib initialization error: {str(e)}")
            logger.critical("This might indicate binary incompatibility between NumPy and TA-Lib")
            logger.critical("Recommended fix:")
            logger.critical("1. pip uninstall numpy TA-Lib")
            logger.critical("2. pip install numpy==1.21.6")
            logger.critical("3. pip install TA-Lib==0.4.24")
            return False

        is_valid = validate(feature_set_path, sample_data_path, logger)

        end_time = time.time()
        end_memory = get_process_memory()

        execution_time = end_time - start_time
        memory_diff = {k: end_memory[k] - start_memory[k] for k in start_memory}

        Console().print(Text("\n" + '='*20 + " Validation Complete " + '='*20, style="bold green"))
        logger.warning(f"Execution time: {execution_time:.2f} seconds")

        memory_table = Table(title="Memory Changes")
        memory_table.add_column("Metric", style="cyan")
        memory_table.add_column("Change", style="magenta")

        for metric, change in memory_diff.items():
            if metric.startswith('system'):
                continue
            memory_table.add_row(metric.upper(), f"{change:+.1f} MB")

        Console().print(memory_table)
        log_memory_usage(logger, "After validation")
        return is_valid

    except Exception as e:
        logger.critical(f"Validation failed: {str(e)}")
        logger.critical("Stack trace:", exc_info=True)
        return False
def format_command(cmd: str) -> str:
    """Shorten and format command for display"""
    if '/home/tca/.cursor-server/' in cmd:
        return cmd.split('cursor-server/')[-1]
    return cmd
def monitor_process_usage(stop_event: threading.Event, log_dir: str = "logs") -> None:
    # Ensure log directory exists
    os.makedirs(log_dir, exist_ok=True)
    log_file = os.path.join(log_dir, f"process_monitor_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log")

    # Store previous readings for comparison
    previous_stats = {}
    first_run = True

    def has_significant_changes(old: dict, new: dict, threshold: float = 0.1) -> bool:
        """Check if memory or CPU values changed significantly"""
        if not old:
            return True
        return any([
            abs(float(new['cpu']) - float(old['cpu'])) > threshold,
            abs(float(new['mem']) - float(old['mem'])) > threshold,
            abs(float(new['rss']) - float(old['rss'])) > 0.5,  # 0.5 MB threshold
            abs(float(new['vsz']) - float(old['vsz'])) > 1.0   # 1 MB threshold
        ])

    while not stop_event.is_set():
        try:
            cmd = "ps -eo user,pid,pcpu,pmem,vsz,rss,stat,start,time,cmd --sort -rss | head -6"
            output = subprocess.check_output(cmd, shell=True, text=True)
            processes = [line.split(None, 9) for line in output.strip().split('\n')[1:]]

            current_stats = {}
            changed_processes = []
            timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')

            # Process current readings
            for proc in processes:
                pid = proc[1]
                current_stats[pid] = {
                    'user': proc[0],
                    'cpu': proc[2],
                    'mem': proc[3],
                    'vsz': float(proc[4]) / 1024,  # Convert to MB
                    'rss': float(proc[5]) / 1024,  # Convert to MB
                    'stat': proc[6],
                    'start': proc[7],
                    'time': proc[8],
                    'cmd': format_command(proc[9])
                }

                if first_run or has_significant_changes(previous_stats.get(pid, {}), current_stats[pid]):
                    changed_processes.append(pid)

            if changed_processes:
                # Create Rich table
                table = Table(
                    title=f"[bold cyan]Process Monitor {timestamp}[/]",
                    show_header=True,
                    header_style="bold yellow",
                    border_style="cyan",
                    padding=(0, 1),
                    collapse_padding=True
                )

                # Add columns with specific formatting
                table.add_column("PID", justify="right", style="cyan", no_wrap=True)
                table.add_column("CPU%", justify="right", style="magenta")
                table.add_column("MEM%", justify="right", style="magenta")
                table.add_column("RSS(MB)", justify="right", style="blue")
                table.add_column("Δ RSS", justify="right", style="green")
                table.add_column("COMMAND", style="white")

                # Prepare log entry
                log_entries = [f"\n=== Process Monitor {timestamp} ==="]
                log_entries.append(f"{'PID':>8} {'CPU%':>6} {'MEM%':>6} {'RSS(MB)':>8} {'Δ RSS':>8} COMMAND")

                for pid in changed_processes:
                    stats = current_stats[pid]
                    old_stats = previous_stats.get(pid, {})

                    # Calculate RSS change
                    if old_stats:
                        delta = stats['rss'] - old_stats['rss']
                        rss_change = f"{delta:+.1f}"
                        rss_change_display = f"[{'red' if delta > 0 else 'green'}]{delta:+.1f}[/]"
                    else:
                        rss_change = "N/A"
                        rss_change_display = "N/A"

                    # Add row to Rich table
                    table.add_row(
                        pid,
                        f"{float(stats['cpu']):5.1f}",
                        f"{float(stats['mem']):5.1f}",
                        f"{stats['rss']:7.1f}",
                        rss_change_display,
                        stats['cmd']
                    )

                    # Add to log entries
                    log_entries.append(
                        f"{pid:>8} {float(stats['cpu']):6.1f} {float(stats['mem']):6.1f} "
                        f"{stats['rss']:8.1f} {rss_change:>8} {stats['cmd']}"
                    )

                # Display table
                Console().print("\n")
                Console().print(table)
                Console().print("\n")

                # Write to log file
                with open(log_file, 'a') as f:
                    f.write('\n'.join(log_entries) + '\n')

            previous_stats = current_stats
            first_run = False
            time.sleep(2)

        except subprocess.SubprocessError:
            logger.warning("Process monitoring is only available on Unix-like systems")
            break
        except Exception as e:
            logger.warning(f"Process monitoring error: {str(e)}")
            break
def main() -> int:
    logger = get_logger(__name__, "DEBUG", show_path=False, rich_tracebacks=True)

    try:
        import numpy as np
        logger.warning(f"Python version: {sys.version}")
        logger.warning(f"NumPy version: {np.__version__}")
        logger.warning(f"NumPy include dir: {np.get_include()}")
        logger.warning(f"NumPy dtype size: {np.dtype(np.float64).itemsize}")
        logger.warning(f"Platform: {sys.platform}")
    except Exception as e:
        logger.critical(f"Failed to get system information: {str(e)}")

    sample_data_dir = os.path.normpath(os.path.join(__file__, "..", "sample_data"))

    # Initial memory snapshot
    logger.warning("\n=== Initial Memory State ===")
    log_memory_usage(logger, "startup")

    if not MEMORY_PROFILER_AVAILABLE:
        logger.warning("memory_profiler and/or tqdm not installed. Install with:")
        logger.warning("pip install memory_profiler tqdm")

    sample_data_paths = [
        f"{sample_data_dir}/resampled_binance_BTC-2h.csv",
    ]

    # Verify sample data paths exist
    for path in sample_data_paths:
        if not os.path.exists(path):
            logger.critical(f"Sample data file not found: {path}")
            return 1

    feature_set_path = "ohlcv_size84_indicators_v2"

    results = {}

    # Start process monitoring with log directory
    stop_monitoring = threading.Event()
    monitor_thread = None

    if sys.platform != "win32":
        try:
            log_dir = os.path.join(os.path.dirname(__file__), "logs")
            monitor_thread = threading.Thread(
                target=monitor_process_usage,
                args=(stop_monitoring, log_dir),
                daemon=True,
                name="ProcessMonitor"
            )
            monitor_thread.start()
        except Exception as e:
            logger.warning(f"Failed to start process monitoring: {e}")
    total_start_time = time.time()

    for i, data_path in enumerate(sample_data_paths, 1):
        logger.warning(f"\n{'='*20} Processing File {i}/{len(sample_data_paths)} {'='*20}")
        logger.warning(f"File: {os.path.basename(data_path)}")

        file_start_time = time.time()

        try:
            df = pd.read_csv(data_path)
            df_memory_info = get_dataframe_memory_info(df)

            # Dataset Overview Table
            overview_table = Table(title="Dataset Overview", show_header=True, header_style="bold magenta")
            overview_table.add_column("Metric", style="cyan")
            overview_table.add_column("Value", style="green")

            overview_table.add_row("Rows", f"{df_memory_info['num_rows']:,}")
            overview_table.add_row("Columns", str(df_memory_info['num_cols']))
            overview_table.add_row("Total Memory", f"{df_memory_info['total_mb']:.2f} MB")

            Console().print(overview_table)

        except Exception as e:
            logger.critical(f"Error loading {data_path}: {str(e)}")
            logger.critical("Stack trace:", exc_info=True)
            continue

        is_valid = run_validation(feature_set_path, data_path, logger)
        results[data_path] = is_valid

        file_execution_time = time.time() - file_start_time
        logger.warning(f"\nFile processing completed in {file_execution_time:.2f} seconds")

        # Force garbage collection and log memory
        import gc
        gc.collect()
        log_memory_usage(logger, f"After processing {os.path.basename(data_path)}")
    total_execution_time = time.time() - total_start_time

    logger.warning("\nValidation Results Summary:")
    logger.warning('='*50)
    logger.warning(f"Total execution time: {total_execution_time:.2f} seconds")

    success_count = sum(1 for result in results.values() if result)
    if success_count < len(sample_data_paths):
        logger.warning(f"Only {success_count}/{len(sample_data_paths)} files validated successfully")

    # Enhanced final memory report
    logger.warning("\n" + "="*20 + " FINAL MEMORY REPORT " + "="*20)

    # Get current memory state
    final_memory = get_process_memory()

    # System memory summary
    logger.warning("\nSystem Memory Summary:")
    logger.warning(f"  Total System Memory: {final_memory['system_total']:.1f} MB")
    logger.warning(f"  Available Memory: {final_memory['system_available']:.1f} MB")
    logger.warning(f"  System Memory Usage: {final_memory['system_percent']}%")

    # Process memory summary
    logger.warning("\nProcess Memory Summary:")
    logger.warning(f"  Final RSS (Resident Set Size): {final_memory['rss']:.1f} MB")
    logger.warning(f"  Final USS (Unique Set Size): {final_memory['uss']:.1f} MB")
    logger.warning(f"  Final VMS (Virtual Memory Size): {final_memory['vms']:.1f} MB")

    # Memory efficiency metrics
    memory_efficiency = (final_memory['uss'] / final_memory['rss']) * 100 if final_memory['rss'] > 0 else 0
    logger.warning("\nMemory Efficiency Metrics:")
    logger.warning(f"  Memory Efficiency (USS/RSS): {memory_efficiency:.1f}%")
    logger.warning(f"  Shared Memory Usage: {final_memory['shared']:.1f} MB")

    # Force final garbage collection
    import gc
    gc.collect()

    # Memory after garbage collection
    post_gc_memory = get_process_memory()
    memory_freed = {k: final_memory[k] - post_gc_memory[k] for k in ['rss', 'uss', 'vms']}

    logger.warning("\nGarbage Collection Impact:")
    for metric, change in memory_freed.items():
        if not metric.startswith('system'):
            logger.warning(f"  {metric.upper()}: {change:+.1f} MB")

    logger.warning("\nPer-File Memory Stats:")
    for path in sample_data_paths:
        if path in results:
            logger.warning(f"  {os.path.basename(path)}: {'✓' if results[path] else '✗'}")

    logger.warning("="*60)

    # Stop monitoring before exit
    stop_monitoring.set()

    return 0 if all(results.values()) else 1


if __name__ == "__main__":
    sys.exit(main())