Ansar Ali - Performance Evaluation
Source: Notion | Last edited: 2023-10-30 | ID: d623216f-5ca...
2023-07-01 Terry Li terry@eonlabs.com Fri, Jun 23, 8:03 PM (8 days ago) to ansar.ali, Georgiana
Hi Ali,
I hope this email finds you well.
Firstly, I’d like to thank you for the comprehensive update and extended backtest information. It makes sense to me that you have to start the backtest from July 2020 due to the available data of the full universe of baskets on Binance. Your commitment to ensuring a fair model by using a complete dataset for the backtest is commendable.
I understand that many of the active coins in your basket were not listed on Binance before July 2020, thus setting the benchmark for your dataset. The meticulous detail in your explanation is very much appreciated, and it further clarifies the situation for me.
Moreover, your insights on the Average ITH of 25.89, even after penalization for not trading on weekends, are very valuable. It helps in getting a clearer picture of the model’s performance.
However, there is one aspect I’d like to delve into a little further. It seems that the glaring characteristics remain with the outlying and extended ITH in the last leg, which seems to be unable to make the crossover. Can you share your thoughts on that and possibly explain any implications or potential adjustments needed for our model?
Looking forward to hearing your thoughts.
Best Regards, Terry
"""ITH (Investment Time Horizon) Fitness Analysis Script
HOW TO ANALYZE YOUR OWN TRADING DATA:====================================
1. PREPARE YOUR CSV FILE: - Required columns: "Date", "NAV" - Optional column: "PnL" (will be calculated if missing) - Date format: Any standard format (YYYY-MM-DD recommended) - NAV: Net Asset Value or cumulative returns starting from 1.0
2. DROP YOUR FILE: - Run this script once to create the folder structure - Place your CSV file in: ~/Documents/ith-fitness/nav_data_custom/ - Multiple CSV files are supported
3. RUN THE ANALYSIS: - Execute: python ith.py - Your custom data will be processed first - Results saved as interactive HTML charts in the same folder - Summary report opens automatically in your browser
4. INTERPRET RESULTS: - Green "Qualified" count: Datasets meeting ITH fitness criteria - Red "Disqualified" count: Datasets not meeting criteria - HTML files contain detailed Investment Time Horizon epoch analysis and visualizations
EXAMPLE CSV FORMAT:Date,NAV2020-01-01,1.00002020-01-02,1.01232020-01-03,0.9987...
The script will automatically bypass strict thresholds for your custom data,ensuring your trading performance gets analyzed regardless of synthetic criteria."""
from pathlib import Pathimport sysfrom datetime import datetimeimport platformdirsfrom loguru import loggerfrom rich.console import Consolefrom rich.traceback import install
# Install rich traceback for better error displayinstall(show_locals=True)
# Initialize console for critical user messages onlyconsole = Console()
sys.path.insert( 0, str( next((p for p in Path.cwd().parents if p.name == "ml_feature_set"), Path.cwd()) ),)__package__ = Path.cwd().parent.name
# Initialize project directories using platformdirsAPP_NAME = "ith-fitness"APP_AUTHOR = "Eon Labs Ltd."
# Get standardized directories - use Documents for main data to keep it accessibledocuments_dir = Path(platformdirs.user_documents_dir()) / APP_NAMEconfig_dir = Path(platformdirs.user_config_dir(APP_NAME, APP_AUTHOR))cache_dir = Path(platformdirs.user_cache_dir(APP_NAME, APP_AUTHOR))data_dir = documents_dir # Use Documents instead of hidden Application Supportlogs_dir = Path(platformdirs.user_log_dir(APP_NAME, APP_AUTHOR))
# Create directories idempotentlyfor directory in [config_dir, cache_dir, data_dir, logs_dir]: directory.mkdir(parents=True, exist_ok=True)
# Show user where data will be storedconsole.print(f"🏠 [bold cyan]Project directories initialized:[/bold cyan]")console.print(f" Data: {data_dir}")console.print(f" Logs: {logs_dir}")console.print(f" Config: {config_dir}")console.print(f" Cache: {cache_dir}")
# Configure Loguru - set appropriate log levels# Remove default handler and add custom oneslogger.remove()
# Add console handler with WARNING level (minimal console output)logger.add( sys.stderr, level="WARNING", format="<level>{level}</level>: <level>{message}</level>",)
# Add file handler with DEBUG level for detailed logginglogger.add( logs_dir / f"ith_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log", level="DEBUG", rotation="10 MB", retention="30 days", format="{time:YYYY-MM-DD HH:mm:ss} | {level: <8} | {name}:{function}:{line} - {message}",)# -------------------------------------------------------------------------------------------------------------------------------------------------------------------------#import pandas as pdimport webbrowser, os, glob, subprocess, platform, json, shutilimport numpy as np, pandas as pd, plotly.graph_objs as gofrom plotly.subplots import make_subplotsfrom scipy import statsfrom scipy.stats import gmeanfrom plotly.graph_objs.layout import XAxis, YAxisfrom typing import NamedTuplefrom numba import njitfrom rich.progress import ( Progress, SpinnerColumn, TimeElapsedColumn, BarColumn, TextColumn,)
class IthConfig(NamedTuple): delete_everything: bool = False output_dir: Path = data_dir / "synth_ithes" TMAEG_dynamically_determined_by: str = "mdd" TMAEG: float = 0.05 date_initiate: str = "2020-01-30" date_conclude: str = "2023-07-25" ith_epochs_lower_bound: int = 10 ith_epochs_upper_bound: int = 100000 sr_lower_bound: float = 0.5 sr_upper_bound: float = 9.9 aggcv_low_bound: float = 0 aggcv_up_bound: float = 0.70 qualified_results: int = 0 required_qualified_results: int = 15
class SyntheticNavParams(NamedTuple): start_date: str = "2020-01-30" # Start date for NAV data end_date: str = "2023-07-25" # End date for NAV data avg_daily_return: float = ( 0.00010123 # Average daily return; higher values increase the overall upward trend ) daily_return_volatility: float = ( 0.009 # Daily return volatility; higher values increase the daily fluctuations ) df: int = ( 5 # Degrees of freedom for the t-distribution; lower values increase the likelihood of extreme returns ) drawdown_prob: float = ( 0.05 # Probability of entering a drawdown; higher values increase the frequency of drawdowns ) drawdown_magnitude_low: float = ( 0.001 # Lower bound of drawdown magnitude; higher values increase the minimum drawdown size ) drawdown_magnitude_high: float = ( 0.003 # Upper bound of drawdown magnitude; higher values increase the maximum drawdown size ) drawdown_recovery_prob: float = ( 0.02 # Probability of recovering from a drawdown; higher values increase the likelihood of recovery )
class ProcessingParams(NamedTuple): trading_year_days_crypto: int = 365 trading_year_days_other: int = 252 retries: int = 20 delay: int = 3
class PlotConfig(NamedTuple): margin: dict = {"l": 50, "r": 50, "b": 50, "t": 50, "pad": 4} paper_bgcolor: str = "DarkSlateGrey" plot_bgcolor: str = "Black" legend_font_family: str = "Courier New" legend_font_size: int = 12 legend_font_color: str = "White" legend_bgcolor: str = "DarkSlateGrey" legend_bordercolor: str = "White" legend_borderwidth: int = 2 global_font_family: str = "Monospace" global_font_size: int = 12 global_font_color: str = "White" annotation_font_size: int = 16 annotation_font_color: str = "White" gridcolor: str = "dimgray" xaxis_tickmode: str = "array" xaxis_gridwidth: float = 0.5 yaxis_gridwidth: float = 0.5 auto_open_index_html: bool = ( True # New attribute to control auto-opening of index HTML ) auto_open_nav_html: bool = ( False # New attribute to control auto-opening of NAV HTML ) auto_open_live_view: bool = ( False # New attribute to control auto-opening of live view HTML )
# Use the configurationconfig = IthConfig()synthetic_nav_params = SyntheticNavParams()processing_params = ProcessingParams()plot_config = PlotConfig()
# ===== HELPER FUNCTIONS TO ELIMINATE DRY VIOLATIONS =====
def setup_directories(config: IthConfig) -> tuple[Path, Path]: """Setup output directories with consistent logic.""" if config.delete_everything: if config.output_dir.exists(): shutil.rmtree(config.output_dir) console.print( "🚀 [bold red]Sayonara, files![/bold red] Your files have left the chat" ) logger.warning(f"Deleted directory: {config.output_dir}") else: console.print("🤔 [yellow]Hmm, nothing to delete here.[/yellow]") logger.info("No existing directory to delete")
# Create anew after the great purge config.output_dir.mkdir(parents=True, exist_ok=True) nav_dir = config.output_dir / "nav_data_synthetic" nav_dir.mkdir(parents=True, exist_ok=True) logger.info(f"Created output directories: {config.output_dir} and {nav_dir}") return config.output_dir, nav_dir
def create_progress_bar(console: Console) -> Progress: """Create a standardized progress bar configuration.""" return Progress( SpinnerColumn(), TimeElapsedColumn(), BarColumn(bar_width=25), TextColumn("[progress.percentage]{task.percentage:>3.0f}%"), TextColumn( "[red]Disqualified:{task.fields[disqualified]}[/red] [green]Qualified:{task.fields[qualified]}[/green]" ), console=console, transient=False, refresh_per_second=4, )
def load_and_validate_csv(csv_file: Path) -> pd.DataFrame: """Load and validate CSV file with consistent error handling.""" try: nav_data = pd.read_csv(csv_file, index_col="Date", parse_dates=True)
if nav_data.empty: raise ValueError(f"Empty CSV file: {csv_file}")
if "NAV" not in nav_data.columns: raise ValueError(f"No NAV column in {csv_file}")
if "PnL" not in nav_data.columns: logger.info(f"Calculating PnL for {csv_file}") nav_data = pnl_from_nav(nav_data).nav_data
return nav_data
except pd.errors.EmptyDataError: raise ValueError(f"Empty or corrupt CSV file: {csv_file}") except Exception as e: raise ValueError(f"Error processing {csv_file}: {e}")
def process_csv_batch( csv_files: list[Path], output_dir: Path, nav_dir: Path, config: IthConfig, bypass_thresholds: bool = False, data_source: str = "Synthetic",) -> tuple[int, int, int]: """Process a batch of CSV files with consistent logic.""" total_files = len(csv_files) processed_files = 0 disqualified_files = 0 qualified_results = config.qualified_results
for csv_file in csv_files: try: logger.info(f"Processing CSV: {csv_file.name}") nav_data = load_and_validate_csv(csv_file)
TMAEG = determine_tmaeg(nav_data, config.TMAEG_dynamically_determined_by) if pd.isna(TMAEG): logger.error(f"Invalid TMAEG calculated for {csv_file}") disqualified_files += 1 continue
result = process_nav_data( nav_data, output_dir, qualified_results, nav_dir, TMAEG, bypass_thresholds, data_source, ) qualified_results = result.qualified_results
if result.filename is not None and result.fig is not None: # Only save files for synthetic data, not for custom data to avoid duplication should_save_files = data_source != "Custom" if should_save_files: if save_files( result.fig, result.filename, nav_dir, nav_data, result.uid, source_file=csv_file if bypass_thresholds else None, ): processed_files += 1 else: disqualified_files += 1 else: # For custom data, we still count as processed but don't save files processed_files += 1 else: logger.warning(f"No output generated for {csv_file}") disqualified_files += 1
except Exception as e: logger.error(f"Error processing {csv_file}: {e}") disqualified_files += 1 continue
return processed_files, disqualified_files, qualified_results
def print_processing_summary(title: str, total: int, processed: int, disqualified: int): """Print processing summary with consistent formatting.""" if total > 0: console.print(f"📈 [bold blue]{title}[/bold blue]") console.print( f" Total: {total} | Processed: {processed} | Disqualified: {disqualified}" ) logger.info( f"{title}: Total={total}, Processed={processed}, Disqualified={disqualified}" )
# ===== END HELPER FUNCTIONS =====
# Setup directories using helper functionoutput_dir, nav_dir = setup_directories(config)
# Set constantsdate_duration = ( pd.to_datetime(config.date_conclude) - pd.to_datetime(config.date_initiate)).daysith_epochs_lower_bound = int(np.floor(date_duration / 28 / 6))logger.debug(f"{ith_epochs_lower_bound=}")ith_epochs_upper_bound = config.ith_epochs_upper_boundsr_lower_bound = config.sr_lower_boundsr_upper_bound = config.sr_upper_boundaggcv_low_bound = config.aggcv_low_boundaggcv_up_bound = config.aggcv_up_boundqualified_results = config.qualified_results
# Try to load existing dataexisting_csv_files = glob.glob(str(nav_dir / "*.csv"))
logger.debug(f"{existing_csv_files=}")
def load_config(filename="config.json"): """ config.json file contains the following: { "api_key": "key_string_here" } """ with open(filename, "r") as file: config = json.load(file) return config
from typing import NamedTuple
def generate_synthetic_nav(params: SyntheticNavParams): dates = pd.date_range(params.start_date, params.end_date) walk = stats.t.rvs( params.df, loc=params.avg_daily_return, scale=params.daily_return_volatility, size=len(dates), ) walk = np.cumsum(walk) drawdown = False for i in range(len(dates)): if drawdown: walk[i] -= np.random.uniform( params.drawdown_magnitude_low, params.drawdown_magnitude_high ) if np.random.rand() < params.drawdown_recovery_prob: drawdown = False elif np.random.rand() < params.drawdown_prob: drawdown = True walk = walk - walk[0] + 1 # Normalize the series so that it starts with 1 nav = pd.DataFrame(data=walk, index=dates, columns=["NAV"]) nav.index.name = "Date" nav["PnL"] = nav["NAV"].diff() nav["PnL"] = nav["PnL"].fillna( nav["NAV"].iloc[0] - 1 ) # Adjust the first PnL value accordingly return nav
@njitdef _sharpe_ratio_numba_helper( returns: np.ndarray, nperiods: float, rf: float = 0.0, annualize: bool = True) -> float: valid_returns = returns[~np.isnan(returns)] n = len(valid_returns) if n < 2: return np.nan mean_returns = np.mean(valid_returns) std_dev = np.sqrt(np.sum((valid_returns - mean_returns) ** 2) / (n - 1)) if std_dev == 0: return np.nan mean_diff = mean_returns - rf return ( np.sqrt(nperiods) * (mean_diff / std_dev) if annualize else mean_diff / std_dev )
def sharpe_ratio_numba( returns: np.ndarray, granularity: str, market_type: str = "crypto", rf: float = 0.0, annualize: bool = True,) -> float: trading_year_days = ( processing_params.trading_year_days_crypto if market_type == "crypto" else processing_params.trading_year_days_other ) if "d" in granularity: nperiods = trading_year_days / int(granularity.replace("d", "")) elif "m" in granularity: nperiods = (trading_year_days * 24 * 60) / int(granularity.replace("m", "")) else: raise ValueError( "Invalid granularity format. Use '1d', '2d', ..., '1m', '2m', ..." ) return _sharpe_ratio_numba_helper(returns, nperiods, rf, annualize)
class MaxDrawdownResult(NamedTuple): max_drawdown: float
def max_drawdown(nav_values) -> MaxDrawdownResult: max_drawdown = np.max(1 - nav_values / np.maximum.accumulate(nav_values)) return MaxDrawdownResult(max_drawdown=max_drawdown)
def save_files(fig, filename, output_dir, nav_data, uid, source_file=None): """Save HTML and CSV files with optional source file name in output filename""" try: output_dir = Path(output_dir).resolve() if not output_dir.exists(): logger.warning(f"Creating missing output directory: {output_dir}") output_dir.mkdir(parents=True, exist_ok=True)
# Add source filename to output if provided if source_file: source_name = Path(source_file).stem filename = filename.replace(".html", f"_{source_name}.html")
# Remove '_nav_data_synthetic' suffix if present filename = filename.replace("_nav_data_synthetic.html", ".html")
html_path = output_dir / filename csv_path = output_dir / filename.replace(".html", ".csv")
logger.debug(f"Saving files to: HTML={html_path}, CSV={csv_path}")
# Verify write permissions before attempting to save if not os.access(str(output_dir), os.W_OK): logger.error(f"No write permission for directory: {output_dir}") return False
# Save files with explicit error handling try: fig.write_html(str(html_path), include_plotlyjs="cdn", full_html=False) except Exception as e: logger.error(f"Error saving HTML file: {e}") return False
try: nav_data.to_csv(str(csv_path)) except Exception as e: logger.error(f"Error saving CSV file: {e}") return False
# Verify files were created with correct size if not html_path.exists() or html_path.stat().st_size == 0: logger.error(f"HTML file not created or empty: {html_path}") return False
if not csv_path.exists() or csv_path.stat().st_size == 0: logger.error(f"CSV file not created or empty: {csv_path}") return False
logger.success(f"Successfully saved files to {output_dir}") return True
except Exception as e: logger.error(f"Error in save_files: {e}") return False
class DeepestTroughsResult(NamedTuple): deepest_troughs_after_new_high: pd.Series new_high_flag: pd.Series
def deepest_troughs_and_new_highs(nav_values, running_max_nav) -> DeepestTroughsResult: deepest_troughs_after_new_high = pd.Series(index=nav_values.index, dtype=float) new_high_flag = pd.Series(index=nav_values.index, dtype=int) current_max = running_max_nav[0] current_trough = nav_values[0] for i in range(1, len(nav_values)): if running_max_nav[i] > current_max: current_max = running_max_nav[i] current_trough = nav_values[i] new_high_flag[i] = 1 elif nav_values[i] < current_trough: current_trough = nav_values[i] deepest_troughs_after_new_high[i] = current_trough return DeepestTroughsResult(deepest_troughs_after_new_high, new_high_flag)
class MaxDDPointsResult(NamedTuple): max_dd_points: pd.Series
def max_dd_points_after_new_high(drawdowns, new_high_flag) -> MaxDDPointsResult: max_dd_points = pd.Series(np.zeros(len(drawdowns)), index=drawdowns.index) current_max_dd = 0 max_dd_index = -1 for i in range(1, len(drawdowns)): if new_high_flag[i] == 1: if max_dd_index != -1: max_dd_points[max_dd_index] = current_max_dd current_max_dd = 0 max_dd_index = -1 else: if drawdowns[i] > current_max_dd: current_max_dd = drawdowns[i] max_dd_index = i if max_dd_index != -1: max_dd_points[max_dd_index] = current_max_dd return MaxDDPointsResult(max_dd_points)
class GeometricMeanDrawdownResult(NamedTuple): geometric_mean: float
def geometric_mean_of_drawdown(nav_values) -> GeometricMeanDrawdownResult: running_max_nav = nav_values.cummax() deepest_troughs_result = deepest_troughs_and_new_highs(nav_values, running_max_nav) drawdowns_to_deepest_troughs = ( running_max_nav - deepest_troughs_result.deepest_troughs_after_new_high ) max_dd_points_result = max_dd_points_after_new_high( drawdowns_to_deepest_troughs, deepest_troughs_result.new_high_flag ) max_dd_points_fraction = max_dd_points_result.max_dd_points / running_max_nav spike_values = max_dd_points_fraction[max_dd_points_fraction > 0] if spike_values.empty: # Check if spike_values is empty geometric_mean = np.nan # Return NaN or some other appropriate value else: geometric_mean = gmean(spike_values) return GeometricMeanDrawdownResult(geometric_mean=geometric_mean)
class ExcessGainLossResult(NamedTuple): excess_gains: np.ndarray excess_losses: np.ndarray num_of_ith_epochs: int ith_epochs: np.ndarray ith_intervals_cv: float
@njitdef _excess_gain_excess_loss_numba(nav, hurdle): excess_gain = excess_loss = 0 excess_gains = [0] excess_losses = [0] excess_gains_at_ith_epoch = [0] last_reset_state = False ith_epochs = [False] * len(nav) endorsing_crest = endorsing_nadir = candidate_crest = candidate_nadir = nav[0] for i, (equity, next_equity) in enumerate(zip(nav[:-1], nav[1:])): if next_equity > candidate_crest: excess_gain = ( next_equity / endorsing_crest - 1 if endorsing_crest != 0 else 0 ) candidate_crest = next_equity if next_equity < candidate_nadir: excess_loss = 1 - next_equity / endorsing_crest candidate_nadir = next_equity reset_candidate_nadir_excess_gain_and_excess_loss = ( excess_gain > abs(excess_loss) and excess_gain > hurdle and candidate_crest >= endorsing_crest ) if reset_candidate_nadir_excess_gain_and_excess_loss: endorsing_crest = candidate_crest endorsing_nadir = candidate_nadir = equity excess_gains_at_ith_epoch.append(excess_gain if not last_reset_state else 0) else: endorsing_nadir = min(endorsing_nadir, equity) excess_gains_at_ith_epoch.append(0) last_reset_state = reset_candidate_nadir_excess_gain_and_excess_loss excess_gains.append(excess_gain) excess_losses.append(excess_loss) if reset_candidate_nadir_excess_gain_and_excess_loss: excess_gain = excess_loss = 0 ith_epoch_condition = ( len(excess_gains) > 1 and excess_gains[-1] > excess_losses[-1] and excess_gains[-1] > hurdle ) ith_epochs[i + 1] = ith_epoch_condition num_of_ith_epochs = ith_epochs.count(True) ith_interval_separators = [i for i, x in enumerate(ith_epochs) if x] ith_interval_separators.insert(0, 0) ith_intervals = np.diff( np.array(ith_interval_separators) ) # Convert to NumPy array before using np.diff ith_intervals_cv = ( np.std(ith_intervals) / np.mean(ith_intervals) if len(ith_intervals) > 0 else np.nan ) return ExcessGainLossResult( excess_gains=np.array(excess_gains), excess_losses=np.array(excess_losses), num_of_ith_epochs=num_of_ith_epochs, ith_epochs=np.array(ith_epochs), ith_intervals_cv=ith_intervals_cv, )
def excess_gain_excess_loss_numba(hurdle, nav): original_df = ( nav.copy() if isinstance(nav, pd.DataFrame) and "NAV" in nav.columns else None ) nav = nav["NAV"].values if original_df is not None else nav.values result = _excess_gain_excess_loss_numba(nav, hurdle) if original_df is not None: original_df["Excess Gains"] = result.excess_gains original_df["Excess Losses"] = result.excess_losses original_df["ITHEs"] = result.ith_epochs return original_df else: return result
def get_first_non_zero_digits(num, digit_count): # Get first 12 non-zero, non-decimal, non-negative digits non_zero_digits = "".join([i for i in str(num) if i not in ["0", ".", "-"]][:12]) # Then trim or pad to the desired length uid = (non_zero_digits + "0" * digit_count)[:digit_count] return uid
class PnLResult(NamedTuple): nav_data: pd.DataFrame
def pnl_from_nav(nav_data) -> PnLResult: """Calculate PnL from NAV as a fractional percentage.""" try: nav_copy = nav_data.copy() # Create explicit copy to avoid chained assignment nav_copy.loc[:, "PnL"] = nav_copy["NAV"].diff() / nav_copy["NAV"].shift(1) nav_copy.loc[nav_copy.index[0], "PnL"] = 0 # Set first row PnL return PnLResult(nav_data=nav_copy) except Exception as e: logger.error(f"Error calculating PnL: {e}") raise
class ProcessNavDataResult(NamedTuple): qualified_results: int sharpe_ratio: float num_of_ith_epochs: int filename: str uid: str fig: go.Figure # Add fig to the NamedTuple recent_3_max_days: float # Add recent 3 maximum days
def log_results(results): headers = [ "TMAEG", "Sharpe<br/>Ratio", "ITH<br/>Epochs<br/>Count", "EL CV", "ITH CV", "AGG CV", "Avg Days<br/>Per Epoch", "Recent 3<br/>Max Days", "Filename", "Data Source", ] df = pd.DataFrame(results, columns=headers)
# Filter out rows where Filename is None df = df[df["Filename"].notna()]
# Add a rank column for AGG CV (convert to integer) df["AGG CV Rank"] = df["AGG CV"].rank(method="min").astype(int)
# Add a rank column for Sharpe Ratio (largest value ranks as #1, convert to integer) df["Sharpe<br/>Ratio Rank"] = ( df["Sharpe<br/>Ratio"].rank(method="min", ascending=False).astype(int) )
# Add a rank column for Recent 3 Max Days (smallest value ranks as #1, convert to integer) df["Recent 3<br/>Max Days Rank"] = ( df["Recent 3<br/>Max Days"].rank(method="min", ascending=True).astype(int) )
# Create a new column with HTML links df["Link"] = df["Filename"].apply( lambda x: f'<a href="synth_ithes/{x}" target="_blank">Open</a>' )
# Reorder columns to put Data Source before Link, and exclude Filename columns = [ col for col in df.columns if col not in ["Filename", "Data Source", "Link"] ] columns.extend(["Data Source", "Link"]) df = df[columns]
# Get the index of the Recent 3 Max Days Rank column for default sorting recent_3_rank_col_index = df.columns.get_loc("Recent 3<br/>Max Days Rank")
html_table = df.to_html( classes="table table-striped", index=False, table_id="results_table", escape=False, )
# Add DataTables script to enable sorting and set page length html_output = f""" <html> <head> <title>ITH Fitness Analysis Results</title> <link rel="stylesheet" type="text/css" href="https://cdn.datatables.net/1.10.21/css/jquery.dataTables.css"> <link rel="stylesheet" href="https://classless.de/classless.css"> <style> body {{ font-family: Osaka, 'SF Mono', Monaco, Menlo, Consolas, 'Cascadia Code', 'JetBrains Mono', 'Fira Code', 'Source Code Pro', 'Liberation Mono', 'DejaVu Sans Mono', 'Courier New', monospace !important; margin: 0 !important; padding: 20px !important; }} .container {{ max-width: 100% !important; margin: 0 auto !important; padding: 0 15px !important; }} table {{ width: 100% !important; border-collapse: collapse !important; margin: 0 auto !important; font-family: inherit !important; }} th, td {{ padding: 8px 12px !important; border-bottom: 1px solid #ddd !important; font-family: inherit !important; }} th {{ background-color: #f8f9fa !important; font-weight: 600 !important; text-align: right !important; line-height: 1.2 !important; }} td {{ text-align: right !important; }} /* Left-align text columns */ th:nth-last-child(2), td:nth-last-child(2) {{ text-align: left !important; }} /* Data Source */ th:nth-last-child(1), td:nth-last-child(1) {{ text-align: left !important; }} /* Link */ /* Multi-line header support */ th {{ vertical-align: middle !important; }} .dataTables_wrapper {{ width: 100% !important; font-family: inherit !important; }} h1 {{ font-family: inherit !important; }} </style> <script type="text/javascript" charset="utf8" src="https://code.jquery.com/jquery-3.5.1.js"></script> <script type="text/javascript" charset="utf8" src="https://cdn.datatables.net/1.10.21/js/jquery.dataTables.js"></script> <script> $(document).ready(function() {{ $('#results_table').DataTable({{ "pageLength": 200, "lengthMenu": [[200, 500, 1000, -1], [200, 500, 1000, "All"]], "order": [[ {recent_3_rank_col_index}, "asc" ]] }}); }}); </script> </head> <body> <div class="container"> <h1>ITH Fitness Analysis Results</h1> {html_table} </div> </body> </html> """
html_file_path = data_dir / "results.html" with open(html_file_path, "w") as file: file.write(html_output)
logger.success("Results have been written to results.html") console.print(f"📊 [bold green]Results saved to:[/bold green] {html_file_path}")
# Automatically open the HTML file in the default web browser if enabled if plot_config.auto_open_index_html: webbrowser.open(f"file://{html_file_path.resolve()}") console.print("🌐 [blue]Opening results in browser...[/blue]")
# Initialize a list to store resultsresults = []
def process_nav_data( nav_data, output_dir, qualified_results, nav_dir, TMAEG, bypass_thresholds=False, data_source="Synthetic",) -> ProcessNavDataResult:
# Extract the first six non-zero digits from the first two rows of the NAV column uid_part1 = get_first_non_zero_digits(nav_data["NAV"].iloc[0], 6) uid_part2 = get_first_non_zero_digits(nav_data["NAV"].iloc[1], 6) # Concatenate the two parts to form the UID uid = uid_part1 + uid_part2 logger.debug(f"{uid=}")
# Initialize filename with a default value filename = None ith_durations = None excess_losses_at_ithes = None fig = None # Initialize fig to None
# Calculate days_elapsed here days_elapsed = (nav_data.index[-1] - nav_data.index[0]).days
sharpe_ratio = sharpe_ratio_numba(nav_data["PnL"].dropna().values, "1d") calculated_nav = excess_gain_excess_loss_numba(TMAEG, nav_data)
if isinstance(calculated_nav, pd.DataFrame): ith_epochs = calculated_nav[calculated_nav["ITHEs"]].index num_of_ith_epochs = len(ith_epochs) else: ith_epochs = nav_data.index[calculated_nav.ith_epochs] num_of_ith_epochs = calculated_nav.num_of_ith_epochs
# Add detailed logging for threshold checks logger.debug(f"Threshold check details: bypass_thresholds={bypass_thresholds}") logger.debug(f"SR check: {sr_lower_bound} < {sharpe_ratio} < {sr_upper_bound}") logger.debug( f"Investment Time Horizon epochs check: {ith_epochs_lower_bound} < {num_of_ith_epochs} < {ith_epochs_upper_bound}" )
if bypass_thresholds or ( sr_lower_bound < sharpe_ratio < sr_upper_bound and ith_epochs_lower_bound < num_of_ith_epochs < ith_epochs_upper_bound ): if bypass_thresholds: logger.debug("Bypassing thresholds for custom CSV.") else: logger.debug("Thresholds met for synthetic data.")
logger.debug(f"Found {num_of_ith_epochs=}, {sharpe_ratio=}")
# Get actual ITH epoch dates (confirmed epochs only) ith_epoch_dates = calculated_nav[calculated_nav["ITHEs"]].index
# Create complete timeline with start, epochs, and end for interval calculation # Only include the end date if it's NOT already an epoch (to avoid double counting) timeline_dates = ith_epoch_dates.insert(0, calculated_nav.index[0]) if not calculated_nav["ITHEs"].iloc[-1]: # Only add end if it's not an epoch timeline_dates = timeline_dates.append(pd.Index([calculated_nav.index[-1]]))
logger.debug(f"timeline_dates: {timeline_dates}")
# Calculate actual confirmed ITH epoch count (excluding artificial boundaries) ithe_ct = len(ith_epoch_dates) # Only count real epochs
# Calculate intervals in days between timeline points timeline_indices = [ calculated_nav.index.get_loc(date) for date in timeline_dates ] ith_durations_indices = np.diff(timeline_indices)
# Convert index differences to actual days date_diffs = [ (timeline_dates[i + 1] - timeline_dates[i]).days for i in range(len(timeline_dates) - 1) ] ith_durations_days = np.array(date_diffs)
logger.debug(f"ith_durations_indices: {ith_durations_indices}") logger.debug(f"ith_durations_days: {ith_durations_days}")
# Calculate average days to ITH epoch (traditional metric) days_taken_to_ithe = days_elapsed / ithe_ct if ithe_ct > 0 else np.nan
# Calculate maximum of recent 3 timespan durations in days recent_3_max_days = ( np.max(ith_durations_days[-3:]) if len(ith_durations_days) >= 3 else np.max(ith_durations_days) if len(ith_durations_days) > 0 else np.nan )
# Calculate ITH CV using index-based durations for consistency ith_cv = ( np.std(ith_durations_indices) / np.mean(ith_durations_indices) if len(ith_durations_indices) > 0 else np.nan )
# Calculate the coefficient of variation for Excess Losses at ITH Epochs excess_losses_at_ithes = calculated_nav[calculated_nav["ITHEs"]][ "Excess Losses" ] excess_losses_at_ithes = excess_losses_at_ithes[ excess_losses_at_ithes != 0 ] # Exclude zero values last_excess_loss = calculated_nav["Excess Losses"].iloc[ -1 ] # Include the last value of Excess Losses (even if it is not flagged with ITH Epoch True), unless it's already included if not calculated_nav["ITHEs"].iloc[ -1 ]: # Check if the last value of ITH Epoch is False excess_losses_at_ithes = pd.concat( [ excess_losses_at_ithes, pd.Series([last_excess_loss], index=[calculated_nav.index[-1]]), ] ) if excess_losses_at_ithes.empty: # Check if excess_losses_at_ithes is empty el_cv = np.nan # Return NaN or some other appropriate value else: el_cv = np.std(excess_losses_at_ithes) / np.mean(excess_losses_at_ithes)
aggcv = max(el_cv, ith_cv) logger.debug(f"{aggcv=}")
# Add logging for aggcv check logger.debug(f"AGGCV check: {aggcv_low_bound} < {aggcv} < {aggcv_up_bound}")
if bypass_thresholds or (aggcv_low_bound < aggcv < aggcv_up_bound): if bypass_thresholds: source_name = Path(nav_dir).stem if nav_dir else "unknown" filename = ( f"nav_ming_xu_" f"EL_{el_cv:.5f}_" f"ITHC_{ith_cv:.5f}_" f"TMAEG_{TMAEG:.5f}_" f"ITHEs_{ithe_ct}_" f"D2ITHE_{days_taken_to_ithe:.2f}_" f"SR_{sharpe_ratio:.4f}_" f"UID_{uid}.html" ) else: filename = ( f"EL_{el_cv:.5f}_" f"ITHC_{ith_cv:.5f}_" f"TMAEG_{TMAEG:.5f}_" f"ITHEs_{ithe_ct}_" f"D2ITHE_{days_taken_to_ithe:.2f}_" f"SR_{sharpe_ratio:.4f}_" f"UID_{uid}.html" )
fig = make_subplots( rows=2, cols=1, shared_xaxes=True, vertical_spacing=0.01, subplot_titles=("NAV", "Excess Gains & Losses"), )
ith_epochs = calculated_nav[calculated_nav["ITHEs"]].index num_of_ith_epochs = len(ith_epochs) ithes_dir = output_dir / f"ITH_Epochs_{num_of_ith_epochs}" ithes_dir.mkdir(parents=True, exist_ok=True) crossover_epochs = calculated_nav.loc[ith_epochs] fig.add_trace( go.Scatter( x=crossover_epochs.index, y=crossover_epochs["NAV"], mode="markers", name="ITH Epochs on NAV", marker=dict(color="darkgoldenrod", size=20), ), row=1, col=1, ) fig.add_trace( go.Scatter( x=crossover_epochs.index, y=crossover_epochs["Excess Gains"], mode="markers", name="ITH Epochs on Excess Gains", marker=dict(color="blue", size=20), ), row=2, col=1, ) fig.add_trace( go.Scatter( x=calculated_nav.index, y=calculated_nav["NAV"], mode="lines", name="NAV", ), row=1, col=1, ) fig.add_trace( go.Scatter( x=calculated_nav.index, y=calculated_nav["Excess Gains"], mode="lines", name="Excess Gains", line=dict(color="green"), ), row=2, col=1, ) fig.add_trace( go.Scatter( x=calculated_nav.index, y=calculated_nav["Excess Losses"], mode="lines", name="Excess Losses", line=dict(color="red"), ), row=2, col=1, ) fig.update_layout( title=f"{num_of_ith_epochs} Investment Time Horizon Epochs -- {filename}", autosize=True, # Enable autosize for responsive layout margin=plot_config.margin, paper_bgcolor=plot_config.paper_bgcolor, plot_bgcolor=plot_config.plot_bgcolor, legend=dict( x=0.01, y=0.98, traceorder="normal", font=dict( family=plot_config.legend_font_family, size=plot_config.legend_font_size, color=plot_config.legend_font_color, ), bgcolor=plot_config.legend_bgcolor, bordercolor=plot_config.legend_bordercolor, borderwidth=plot_config.legend_borderwidth, ), font=dict( family=plot_config.global_font_family, size=plot_config.global_font_size, color=plot_config.global_font_color, ), annotations=[ dict( x=0.5, y=0.95, xref="paper", yref="paper", text="NAV<br>", showarrow=False, font=dict( size=plot_config.annotation_font_size, color=plot_config.annotation_font_color, ), ), dict( x=0.5, y=0.45, xref="paper", yref="paper", text="Excess Gains & Losses<br>", showarrow=False, font=dict( size=plot_config.annotation_font_size, color=plot_config.annotation_font_color, ), ), ], ) fig.update_yaxes( gridcolor=plot_config.gridcolor, type="linear", row=1, col=1 ) fig.update_yaxes(gridcolor=plot_config.gridcolor, row=2, col=1) fig.update_xaxes(gridcolor=plot_config.gridcolor, row=1, col=1) fig.update_xaxes(gridcolor=plot_config.gridcolor, row=2, col=1)
# Generate monthly ticks between the minimum and maximum dates monthly_ticks = pd.date_range( nav_data.index.min(), nav_data.index.max(), freq="MS" ) monthly_tick_labels = monthly_ticks.strftime("%Y-%m")
# Customize X-axis grid lines custom_xaxis = XAxis( tickmode=plot_config.xaxis_tickmode, tickvals=monthly_ticks, # Set to monthly_ticks showgrid=True, # Show vertical grid gridwidth=plot_config.xaxis_gridwidth, # Vertical grid width )
custom_yaxis = YAxis( showgrid=True, # Show vertical grid gridwidth=plot_config.yaxis_gridwidth, # Vertical grid width ) fig.update_layout(xaxis=custom_xaxis.to_plotly_json()) fig.update_layout(yaxis=custom_yaxis.to_plotly_json())
# Automatically open the live view version of the HTML if enabled if plot_config.auto_open_live_view: fig.show()
save_files( fig, filename, output_dir, nav_data, uid, source_file=None if not bypass_thresholds else nav_dir, ) save_files( fig, filename, output_dir, nav_data, uid, source_file=None if not bypass_thresholds else nav_dir, )
# Increment qualified_results if not bypassing thresholds if not bypass_thresholds: qualified_results += 1
logger.debug( f"Generated {TMAEG=}, {sharpe_ratio=}, {num_of_ith_epochs=}, {el_cv=},{ith_cv=}, {aggcv=}, {days_taken_to_ithe=}" )
# Append the results to the list only if filename is not None if filename: results.append( [ TMAEG, sharpe_ratio, num_of_ith_epochs, el_cv, ith_cv, aggcv, days_taken_to_ithe, recent_3_max_days, filename, data_source, ] )
# Automatically open the NAV HTML file in the default web browser if enabled if plot_config.auto_open_nav_html and filename: file_path = str(output_dir / filename) webbrowser.open(f"file://{os.path.abspath(file_path)}")
else: pass # Removed repetitive warning logs
if ith_durations is not None: logger.debug(f"ith_durations in process_nav_data: {ith_durations}")
logger.debug( f"excess_losses_at_ithes in process_nav_data: {excess_losses_at_ithes}" )
return ProcessNavDataResult( qualified_results=qualified_results, sharpe_ratio=sharpe_ratio, num_of_ith_epochs=num_of_ith_epochs, filename=filename, uid=uid, fig=fig, # Return the fig object recent_3_max_days=( recent_3_max_days if "recent_3_max_days" in locals() else np.nan ), )
# Initialize countersqualified_results = 0counter = 0 # Add counter initialization here
# Try to load existing dataexisting_csv_files = glob.glob(str(nav_dir / "*.csv"))
logger.debug(f"{existing_csv_files=}")
def determine_tmaeg(nav_data, method): if method == "geomean": return geometric_mean_of_drawdown(nav_data["NAV"]).geometric_mean elif method == "mdd": return max_drawdown(nav_data["NAV"]).max_drawdown elif method == "fixed": return TMAEG return None
# Process existing CSV filesif existing_csv_files: existing_csv_paths = [Path(f) for f in existing_csv_files] processed, disqualified, qualified_results = process_csv_batch( existing_csv_paths, config.output_dir, nav_dir, config, bypass_thresholds=False, data_source="Synthetic", ) print_processing_summary( "Existing CSV Processing Complete", len(existing_csv_files), processed, disqualified, )
# Generate new NAV data if necessaryconsole.print(f"🔄 [bold yellow]Starting synthetic data generation...[/bold yellow]")with create_progress_bar(console) as progress: task = progress.add_task( "Generating synthetic data...", total=config.required_qualified_results, disqualified=0, qualified=qualified_results, )
while qualified_results < config.required_qualified_results: counter += 1 synthetic_nav = generate_synthetic_nav(synthetic_nav_params) TMAEG = determine_tmaeg(synthetic_nav, config.TMAEG_dynamically_determined_by) result = process_nav_data( synthetic_nav, config.output_dir, qualified_results, nav_dir, TMAEG, bypass_thresholds=False, data_source="Generated", ) qualified_results = result.qualified_results
if result.filename is not None: logger.debug( f"Processing synthetic data {counter:4}: {result.filename=}, {result.uid=}, {TMAEG=}, {result.sharpe_ratio=}, newly generated." ) if result.fig is not None: save_files( result.fig, result.filename, nav_dir, synthetic_nav, result.uid ) progress.update(task, advance=1, qualified=qualified_results) else: # Increment disqualified count without logging warning progress.update( task, disqualified=progress.tasks[0].fields["disqualified"] + 1 )
if qualified_results >= config.required_qualified_results: console.print( f"🎯 [bold green]Target reached![/bold green] Found {config.required_qualified_results} qualified results" ) logger.info( f"Required number of qualified results ({config.required_qualified_results}) reached." ) break
# Ensure clean line after progress barconsole.print("")
# After the config declarations, add this function to handle custom CSV filesdef get_date_range_from_csvs(csv_folder: Path) -> tuple[str, str]: """Get the earliest start date and latest end date from all CSVs in folder""" if not csv_folder.exists(): logger.warning( f"Custom CSV folder not found: {csv_folder}\n" "The folder will be created automatically. Place your custom CSV files there with columns: Date, NAV, PnL (optional)." ) return synthetic_nav_params.start_date, synthetic_nav_params.end_date
csv_files = list(csv_folder.glob("*.csv")) if not csv_files: logger.info( f"No custom CSV files found in {csv_folder} - using default date range" ) return synthetic_nav_params.start_date, synthetic_nav_params.end_date
all_dates = [] processed_files = 0 for csv_file in csv_files: try: df = pd.read_csv(csv_file, parse_dates=["Date"]) if df.empty: logger.warning(f"Empty CSV file: {csv_file}") continue if "Date" not in df.columns: logger.error(f"No Date column in {csv_file}") continue all_dates.extend(df["Date"].tolist()) processed_files += 1 except pd.errors.EmptyDataError: logger.error(f"Empty or corrupt CSV file: {csv_file}") continue except Exception as e: logger.error(f"Failed to process {csv_file}: {e}") continue
if not all_dates: logger.warning("No valid dates found in any CSV files") return synthetic_nav_params.start_date, synthetic_nav_params.end_date
if processed_files < len(csv_files): logger.warning( f"Only processed {processed_files} out of {len(csv_files)} CSV files" )
min_date = min(all_dates).strftime("%Y-%m-%d") max_date = max(all_dates).strftime("%Y-%m-%d") logger.info(f"Using date range from custom CSVs: {min_date} to {max_date}") return min_date, max_date
# * IMPORTANT: For first time users, replace this path with your own performance record folder# * The folder should contain CSV files with columns: Date, NAV, PnL (optional)# ^ Example path: Path('/Users/your_username/trading_records')custom_csv_folder = data_dir / "nav_data_custom"
# Create the custom CSV folder if it doesn't existcustom_csv_folder.mkdir(parents=True, exist_ok=True)console.print(f"📁 [bold cyan]Custom CSV folder ready:[/bold cyan] {custom_csv_folder}")if not any(custom_csv_folder.glob("*.csv")): console.print( f"💡 [yellow]Tip:[/yellow] Place your trading CSV files (with Date, NAV columns) in the folder above to analyze your own performance data!" )
start_date, end_date = get_date_range_from_csvs(custom_csv_folder)
# Update synthetic params with custom date rangesynthetic_nav_params = synthetic_nav_params._replace( start_date=start_date, end_date=end_date)
# Process custom CSV files if they existcustom_csvs = list(custom_csv_folder.glob("*.csv"))if custom_csvs: console.print( f"📈 [bold blue]Processing {len(custom_csvs)} custom CSV file(s)...[/bold blue]" ) processed_files, disqualified_files, qualified_results = process_csv_batch( custom_csvs, config.output_dir, nav_dir, config, bypass_thresholds=True, data_source="Custom", ) print_processing_summary( "Custom CSV Processing Complete", len(custom_csvs), processed_files, disqualified_files, )else: console.print( f"📂 [dim]No custom CSV files to process - continuing with synthetic data generation[/dim]" )
# The rest of the synthetic data generation code remains the same# but will now use the updated date range from custom CSVs if they exist
# Log the results in a tabulated format - do this at the very end after all processinglog_results(results)