PyPM — Python Portfolio Manager

A Professional Algorithmic Trading Framework for Backtesting & Optimization

Onepagecode
Dec 14, 2025

Download the source code and dataset using the button at the end of this article.

PyPM (Python Portfolio Manager) is a comprehensive quantitative trading system that enables you to backtest trading strategies, optimize parameters, and evaluate performance using professional-grade metrics. The framework includes advanced features like bootstrap resampling for robustness testing and machine learning integration for predictive trading.


🎯 What Can You Do With PyPM?

  • ✅ Backtest Trading Strategies — Test your ideas on historical data

  • ✅ Optimize Parameters — Find the best settings using grid search

  • ✅ Evaluate Performance — Professional metrics (Sharpe, CAGR, Drawdown)

  • ✅ Bootstrap Testing — Ensure strategies work across 1,000+ scenarios

  • ✅ Machine Learning — Integrate ML predictions into trading signals

  • ✅ Visualize Results — Equity curves, heatmaps, 3D surface plots

🚀 Quick Start Guide

Download Source Code:

Use the button at the end of this article to download the source code.

Step 1: Prerequisites

Ensure you have Python 3.7 or higher installed:

python --version  # Should show Python 3.7+

Step 2: Clone or Navigate to Project

cd /path/to/algorithmic-trading-with-python-master

Step 3: Create Virtual Environment

On macOS/Linux:

python3 -m venv venv
source venv/bin/activate

On Windows:

python -m venv venv
venv\Scripts\activate

You should see (venv) in your terminal prompt.

Step 4: Install Dependencies

pip install --upgrade pip
pip install -r requirements.txt

Step 5: Verify Installation

python -c "import pandas, numpy, sklearn, matplotlib; print('✅ All dependencies installed successfully!')"

📊 Running Your First Strategy

1. Basic Portfolio Simulation

Run a simple Bollinger Band strategy:

cd src
python simulate_portfolio.py

What it does:

  • Loads historical price data for all available stocks

  • Generates buy/sell signals using Bollinger Bands (20-day)

  • Ranks stocks by rolling Sharpe ratio (100-day)

  • Simulates trading with $10,000 starting capital

  • Shows position summaries, performance metrics, and equity curve

Expected Output:

Position summaries for each trade
Initial parameters (cash, position limits)
Performance metrics (CAGR, Sharpe, Max Drawdown)
Equity curve visualization
Benchmark comparison chart

2. Parameter Optimization

Find the best Bollinger Band and Sharpe ratio parameters:

cd src
python optimize_portfolio.py

What it does:

  • Tests 100 parameter combinations (10×10 grid)

  • Bollinger periods: 10, 20, 30, …, 100 days

  • Sharpe periods: 10, 20, 30, …, 100 days

  • Runs 100 backtests and finds optimal settings

Expected Output:

Simulating 1 / 100 ... 
Simulating 50 / 100 ... 150s remaining (3.0s avg)
Simulating 100 / 100 ...
Elapsed time: 300s
Done.
Best configuration by excess_cagr:
  bollinger_n  sharpe_n  excess_cagr
0          40        60        0.125
Summary statistics (mean, std, min, max for all metrics)
Heatmap visualization
3D surface plot

3. Bootstrap Robustness Testing ⭐

Test strategy across 1,000 alternative historical scenarios:

cd src
python bootstrap_portfolio.py

What it does:

  • Runs 1,000 simulations with bootstrap resampling

  • Each simulation creates a different “alternative history”

  • Finds parameters that work across ALL scenarios

  • Prevents overfitting to one specific historical path

Why Bootstrap?

  • Robustness: Strategies tested on 1,000 variations

  • Confidence: Statistical distribution of outcomes

  • Reliability: Avoids lucky one-time results

Expected Runtime: 15–30 minutes (1,000 simulations)

Expected Output:

Starting simulation ...
Simulating 1 / 1000 ...
Simulating 500 / 1000 ... 450s remaining (0.9s avg)
Simulating 1000 / 1000 ...
Elapsed time: 900s
Done.
Best configuration by excess_cagr
Summary statistics across all bootstrap trials
Distribution plot of performance

4. White Noise Testing

Test if your strategy beats random chance:

cd src
python white_noise_portfolio.py

What it does:

  • Replaces preference matrix with random noise

  • Runs 1,000 trials with random position selection

  • Compares results to your actual strategy

  • If your strategy doesn’t beat random noise, it’s not working!

5. Machine Learning Strategy

Step A: Train the ML Model

cd src
python fit_alternative_data_model.py

What it does:

  • Loads alternative data (revenue estimates)

  • Detects significant events in revenue data

  • Labels events based on future price movements

  • Trains Random Forest classifier

  • Saves model to ml_model.joblib

Expected Output:

Features and labels dataframe
Model training progress
Cross-validation scores
Model saved to ml_model.joblib

Step B: Run ML-Based Trading

python simulate_alternative_data_portfolio.py

What it does:

  • Loads the trained ML model

  • Generates predictions for each stock

  • Uses predictions as position preferences

  • Runs backtest with ML-driven allocation

📁 Project Structure

algorithmic-trading-with-python-master/
├── data/                          # Historical price data
│   ├── eod/                       # End-of-day stock prices (CSV files)
│   ├── alternative_data/          # Revenue/alternative data (CSV files)
│   └── SPY.csv                    # S&P 500 benchmark data
│
├── src/                           # Main strategy scripts
│   ├── simulate_portfolio.py      # Basic Bollinger Band strategy
│   ├── optimize_portfolio.py      # Parameter optimization
│   ├── bootstrap_portfolio.py     # Bootstrap robustness testing ⭐
│   ├── white_noise_portfolio.py   # Random baseline testing
│   ├── fit_alternative_data_model.py    # Train ML model
│   ├── simulate_alternative_data_portfolio.py  # ML trading
│   │
│   └── pypm/                      # Core framework library
│       ├── metrics.py             # Performance calculations
│       ├── indicators.py          # Technical indicators
│       ├── signals.py             # Signal generation
│       ├── portfolio.py           # Position tracking
│       ├── simulation.py          # Backtesting engine
│       ├── optimization.py        # Grid search framework
│       ├── data_io.py             # Data loading utilities
│       ├── weights.py             # Sample weighting
│       └── ml_model/              # Machine learning pipeline
│           ├── events.py          # Event detection
│           ├── labels.py          # Label generation
│           ├── features.py        # Feature engineering
│           └── model.py           # Model training
│
├── requirements.txt               # Python dependencies
└── README.md                      # Original project readme

🔧 Configuration Options

Simulation Parameters

Edit any strategy script to customize:

simulator = simulation.SimpleSimulator(
    initial_cash=10000,           # Starting capital ($)
    max_active_positions=5,       # Max concurrent positions
    percent_slippage=0.0005,      # 0.05% slippage per trade
    trade_fee=1,                  # $1 per trade
)

Strategy Parameters

bollinger_n = 20    # Bollinger Band lookback period (days)
sharpe_n = 100      # Sharpe ratio calculation window (days)

Optimization Ranges

optimizer.optimize(
    bollinger_n=range(10, 110, 10),  # Test 10, 20, 30, ..., 100
    sharpe_n=range(10, 110, 10),     # Test 10, 20, 30, ..., 100
)

📊 Understanding the Output

Performance Metrics

| Metric | Description | Good Value |
|---|---|---|
| CAGR | Compound Annual Growth Rate | > 10% |
| Excess CAGR | CAGR above benchmark | > 0% |
| Sharpe Ratio | Risk-adjusted return | > 1.0 |
| Sortino Ratio | Downside risk-adjusted return | > 1.5 |
| Max Drawdown | Largest peak-to-trough decline | < 20% |
| Calmar Ratio | CAGR / Max Drawdown | > 0.5 |
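
As a quick illustration, the sketch below (not repository code) applies the documented functions from src/pypm/metrics.py to a toy equity curve, assuming the `from pypm import metrics` import style used by the strategy scripts:

import numpy as np
import pandas as pd
from pypm import metrics  # import style assumed from the strategy scripts

# Toy equity curve: ~3 years of business days with small random daily returns
dates = pd.bdate_range('2017-01-01', periods=756)
daily_returns = np.random.normal(0.0004, 0.01, size=756)
equity = pd.Series(10_000 * np.cumprod(1 + daily_returns), index=dates)

print('CAGR:   ', metrics.calculate_cagr(equity))
print('Sharpe: ', metrics.calculate_sharpe_ratio(equity))
print('Sortino:', metrics.calculate_sortino_ratio(equity))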

Visualizations

  • Equity Curve: Portfolio value over time

  • Benchmark Comparison: Strategy vs. buy-and-hold S&P 500

  • Heatmaps: Parameter performance across grid

  • 3D Surface Plots: Two-parameter interactions

  • Violin Plots: Distribution of results

🎓 Learning Path

Beginner

  1. Run simulate_portfolio.py to understand basic backtesting

  2. Modify bollinger_n and sharpe_n parameters

  3. Observe how performance changes

Intermediate

  1. Run optimize_portfolio.py to find best parameters

  2. Analyze heatmaps and 3D plots

  3. Run white_noise_portfolio.py to validate strategy

Advanced

  1. Run bootstrap_portfolio.py for robustness testing

  2. Train ML model with fit_alternative_data_model.py

  3. Implement custom strategies by modifying the scripts

🛠️ Troubleshooting

Import Errors

ModuleNotFoundError: No module named 'pandas'

Solution: Activate virtual environment and install dependencies

source venv/bin/activate  # or venv\Scripts\activate on Windows
pip install -r requirements.txt

Data Not Found

AssertionError: No data available for AAPL

Solution: Ensure you’re in the src/ directory when running scripts

cd src
python simulate_portfolio.py

Slow Performance

Bootstrap and optimization can take 15–30 minutes. To speed up:

# Reduce bootstrap trials
optimizer.optimize(bootstrap_test_id=range(100))  # Instead of 1000
# Reduce grid search resolution
optimizer.optimize(
    bollinger_n=range(20, 60, 20),  # Test fewer values
    sharpe_n=range(20, 60, 20),
)

🔬 Advanced Usage

Creating Custom Strategies

  1. Define Signal Function (in pypm/signals.py)

def create_my_signal(series: pd.Series, n: int=20) -> pd.Series:
    # Your logic here
    return signal_series

  2. Define Preference Function (in pypm/metrics.py)

def calculate_my_metric(series: pd.Series) -> pd.Series:
    # Your logic here
    return metric_series

  3. Create Strategy Script

signal = prices.apply(signals.create_my_signal, args=(20,), axis=0)
preference = prices.apply(metrics.calculate_my_metric, axis=0)
simulator.simulate(prices, signal, preference)

Adding Custom Data

Place CSV files in data/eod/ with format:

date,open,high,low,close,volume
2020-01-01,100.0,105.0,99.0,103.0,1000000
2020-01-02,103.0,107.0,102.0,106.0,1200000
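
After saving a new file, a quick sanity check (a sketch, not repository code; MYTICK is a hypothetical symbol) confirms that the data_io helpers described later in this article pick it up:

from pypm import data_io

symbols = data_io.get_all_symbols()            # scans data/eod/ for CSV files
assert 'MYTICK' in symbols, 'File not found in data/eod/'
prices = data_io.load_eod_matrix(['MYTICK'])   # close prices indexed by date
print(prices.tail())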

📚 Additional Resources

  • Original Book: Algorithmic Trading with Python (2020) by Chris Conlan

  • Metrics Reference: See src/pypm/metrics.py for all performance calculations

  • Indicators Reference: See src/pypm/indicators.py for technical indicators

  • Examples: Explore listings/ directory for chapter-by-chapter tutorials

⚠️ Disclaimer

This software is for educational and research purposes only.

  • Past performance does not guarantee future results

  • Backtesting can overfit to historical data

  • Real trading involves additional costs, risks, and complexities

  • Always paper trade before risking real capital

  • Consult a financial advisor before making investment decisions

📝 License

See license.txt for details.

🤝 Contributing

See contributing.md for contribution guidelines.

💡 Quick Tips

  1. Always run bootstrap testing before trusting a strategy

  2. Compare to white noise baseline to ensure edge exists

  3. Use cross-validation when training ML models

  4. Monitor multiple metrics, not just returns

  5. Account for transaction costs in all backtests

  6. Start with simple strategies before adding complexity

Happy Trading! 📈

Remember: The best strategy is one that works consistently across many scenarios, not just one that worked once in the past.

File: bootstrap_portfolio.py

def bind_simulator(**sim_kwargs) -> Callable:
    symbols: List[str] = data_io.get_all_symbols()
    prices: pd.DataFrame = data_io.load_eod_matrix(symbols)
    _bollinger: Callable = signals.create_bollinger_band_signal
    bollinger_n = 20
    returns = metrics.calculate_return_series(prices)
    sharpe_n = 20

Creates and returns a simulation function configured with bootstrap-based portfolio optimization. This factory function binds market data and trading strategy parameters into a reusable simulation function, using a bootstrap resampling approach to create randomized variations of the Sharpe ratio calculation for robustness testing. It loads historical price data for all available symbols, sets up Bollinger Band signals for entry/exit timing, creates a bootstrapped Sharpe ratio calculator for stock preference ranking, and returns a configured simulation function that can be called repeatedly with different bootstrap seeds. The design loads all market data once at binding time rather than per simulation, creates nested functions that capture this data in their closure, and allows each simulation call to use the same data but different bootstrap samples for efficient repeated simulations without reloading data.

def bootstrap_rolling_sharpe_ratio(return_series: pd.Series) -> pd.Series:
        _series = return_series.iloc[1:]
        _series = _series.sample(n=return_series.shape[0], replace=True)
        _series.iloc[:1] = [np.nan]
        _series = pd.Series(_series.values, index=return_series.index)
        _windowed_series = _series.rolling(sharpe_n)
        return _windowed_series.mean() / _windowed_series.std()

Calculates a bootstrapped rolling Sharpe ratio for a return series by introducing controlled randomness through resampling with replacement. This function helps test the robustness of the trading strategy against different historical scenarios by resampling the return series to create alternative historical paths while preserving the first data point as NaN to maintain alignment. It computes rolling mean and standard deviation on the bootstrapped data and returns the Sharpe ratio as a measure of risk-adjusted returns. The implementation excludes the first return (usually NaN) from bootstrap sampling, randomly samples with replacement to create a new sequence, reinserts NaN at the beginning to maintain index alignment, and applies rolling window calculations for localized Sharpe ratio computation.
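
The standalone sketch below (synthetic data, not repository code) mirrors that resampling logic so you can see what a single bootstrapped "alternative history" looks like:

import numpy as np
import pandas as pd

# Synthetic daily returns with a leading NaN, like a real return series
idx = pd.bdate_range('2020-01-01', periods=250)
returns = pd.Series(np.random.normal(0.0005, 0.01, 250), index=idx)
returns.iloc[0] = np.nan

sharpe_n = 20
resampled = returns.iloc[1:].sample(n=returns.shape[0], replace=True)  # alternative history
resampled.iloc[:1] = [np.nan]                                 # keep the leading NaN
resampled = pd.Series(resampled.values, index=returns.index)  # restore the original dates

windowed = resampled.rolling(sharpe_n)
bootstrapped_sharpe = windowed.mean() / windowed.std()
print(bootstrapped_sharpe.dropna().head())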

def _simulate(bootstrap_test_id: int) -> Performance:
        signal = prices.apply(_bollinger, args=(bollinger_n,), axis=0)
        preference = returns.apply(_sharpe, axis=0)
        simulator = simulation.SimpleSimulator(**sim_kwargs)
        simulator.simulate(prices, signal, preference)
        return simulator.portfolio_history.get_performance_metric_data()

Executes a single portfolio simulation with the configured parameters using Bollinger Bands for signals and bootstrapped Sharpe ratios for preferences. This core simulation function gets called repeatedly during optimization to run one complete backtest of the trading strategy. It generates trading signals for all stocks using Bollinger Bands, calculates preference scores using the bootstrapped Sharpe ratio, runs the portfolio simulation with these signals and preferences, and extracts performance metrics. The function applies the signal generation function to each stock’s price series, applies the preference calculation to each stock’s return series, initializes the simulator with user-provided parameters like initial cash and position limits, runs the backtest simulation, and returns a dictionary containing performance metrics such as CAGR, Sharpe ratio, and maximum drawdown.
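
The driver that repeats this across 1,000 trials is not reproduced above. A hedged sketch of how it is likely wired, based on the bootstrap_test_id=range(...) call shown in the troubleshooting section and the GridSearchOptimizer class mentioned later, is:

# Sketch only: the GridSearchOptimizer constructor call is an assumption;
# the optimize() call matches the one shown earlier in this article.
from pypm import optimization

simulate = bind_simulator(
    initial_cash=10000,
    max_active_positions=5,
    percent_slippage=0.0005,
    trade_fee=1,
)

optimizer = optimization.GridSearchOptimizer(simulate)  # assumed constructor
optimizer.optimize(bootstrap_test_id=range(1000))       # 1,000 alternative histories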


File: optimize_portfolio.py

def bind_simulator(**sim_kwargs) -> Callable:
    symbols: List[str] = data_io.get_all_symbols()
    prices: pd.DataFrame = data_io.load_eod_matrix(symbols)
    _bollinger: Callable = signals.create_bollinger_band_signal
    _sharpe: Callable = metrics.calculate_rolling_sharpe_ratio

Creates a closure that binds static simulation data and configuration to a simulation function, enabling efficient parameter optimization. This function loads all market data once and creates references to signal and metric calculation functions, then returns a nested function that accepts only the parameters being optimized. The design follows the closure pattern to avoid repeatedly loading the same price data during grid search optimization, significantly improving performance when testing hundreds of parameter combinations. The returned function takes strategy parameters as inputs and returns performance metrics as output, making it compatible with the GridSearchOptimizer interface. This approach separates the expensive data loading operation from the parameter-dependent simulation logic, allowing the optimizer to rapidly iterate through different parameter values while reusing the same underlying market data.

def _simulate(bollinger_n: int, sharpe_n: int) -> Performance:
        
        signal = prices.apply(_bollinger, args=(bollinger_n,), axis=0)
        preference = prices.apply(_sharpe, args=(sharpe_n, ), axis=0)
        simulator = simulation.SimpleSimulator(**sim_kwargs)
        simulator.simulate(prices, signal, preference)
        return simulator.portfolio_history.get_performance_metric_data()

Executes a single backtest simulation with specified parameter values and returns performance metrics. This nested function has access to the price data and function references from its enclosing scope through closure, eliminating the need to pass or reload this data for each simulation run. The function applies the Bollinger Band signal generator across all symbols to determine buy and sell signals based on the bollinger_n parameter, then calculates rolling Sharpe ratios for each symbol using the sharpe_n parameter to establish position preference rankings. It instantiates a SimpleSimulator with the configuration passed to the parent function, runs the simulation with the generated signals and preferences, and extracts the resulting performance metrics. The return value is a dictionary containing key performance indicators like excess CAGR, Sharpe ratio, maximum drawdown, and other relevant statistics that the optimizer uses to evaluate this parameter combination’s effectiveness.
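
A hedged sketch of the grid-search driver follows; the optimizer constructor call is an assumption, while the parameter ranges match the "Optimization Ranges" section above:

from pypm import optimization

simulate = bind_simulator(
    initial_cash=10000,
    max_active_positions=5,
    percent_slippage=0.0005,
    trade_fee=1,
)

optimizer = optimization.GridSearchOptimizer(simulate)  # assumed constructor
optimizer.optimize(
    bollinger_n=range(10, 110, 10),  # 10, 20, ..., 100
    sharpe_n=range(10, 110, 10),     # 10 x 10 = 100 backtests
)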

File: simulate_alternative_data_portfolio.py

def simulate_portfolio():
    symbols, eod_data, alt_data = load_data()
    classifier = load(os.path.join(SRC_DIR, 'ml_model.joblib'))
    print('Calculating signals ...')
    signal = calculate_signals(classifier, symbols, eod_data, alt_data)
    first_signal_date = signal.first_valid_index()
    eod_data = eod_data[eod_data.index >= first_signal_date]
    print('Calculating preference matrix ...')
    preference = pd.DataFrame(
        np.random.random(eod_data.shape), 
        columns=eod_data.columns, 
        index=eod_data.index,
    )
    simulator = simulation.SimpleSimulator(
        initial_cash=10000,
        max_active_positions=10,
        percent_slippage=0.0005,
        trade_fee=1,
    )
    simulator.simulate(eod_data, signal, preference)
    simulator.portfolio_history.print_position_summaries()
    simulator.print_initial_parameters()
    simulator.portfolio_history.print_summary()
    simulator.portfolio_history.plot()
    simulator.portfolio_history.plot_benchmark_comparison()

Executes a complete backtest simulation of a portfolio driven by machine learning predictions based on alternative data. This function orchestrates the entire workflow from loading the pre-trained classifier and market data through signal generation to final portfolio simulation and performance reporting. The design follows a sequential pipeline where each stage depends on the output of the previous one. First, it loads all available symbols along with their end-of-day price data and alternative data like revenue figures. Then it loads the previously trained machine learning model from disk and uses it to generate trading signals by analyzing patterns in the alternative data combined with price features. The function aligns the price data with the signal timeline by removing any historical prices that predate the first valid signal, ensuring the simulation only trades when signals are available. It creates a preference matrix using random values to determine position priority when multiple signals occur simultaneously, giving newer trades preference. The simulation runs with realistic constraints including initial capital, position limits, slippage costs, and transaction fees. Finally, it produces comprehensive output including individual position summaries, overall portfolio statistics, equity curve visualizations, and benchmark comparisons to evaluate the strategy’s performance against a buy-and-hold baseline.

File: simulate_portfolio.py

def simulate_portfolio():
    bollinger_n = 20
    sharpe_n = 100
    symbols: List[str] = data_io.get_all_symbols()
    prices: pd.DataFrame = data_io.load_eod_matrix(symbols)
    _bollinger = signals.create_bollinger_band_signal
    signal = prices.apply(_bollinger, args=(bollinger_n,), axis=0)
    _sharpe = metrics.calculate_rolling_sharpe_ratio
    preference = prices.apply(_sharpe, args=(sharpe_n, ), axis=0)
    simulator = simulation.SimpleSimulator(
        initial_cash=10000,
        max_active_positions=5,
        percent_slippage=0.0005,
        trade_fee=1,
    )
    simulator.simulate(prices, signal, preference)
    simulator.portfolio_history.print_position_summaries()
    simulator.print_initial_parameters()
    simulator.portfolio_history.print_summary()
    simulator.portfolio_history.plot()
    simulator.portfolio_history.plot_benchmark_comparison()

Executes a backtest simulation of a portfolio trading strategy based on Bollinger Bands with position sizing determined by rolling Sharpe ratios. This function implements a complete end-to-end trading system that loads historical price data for multiple symbols, generates buy and sell signals using Bollinger Band crossovers, ranks potential positions by their risk-adjusted returns using rolling Sharpe ratios, and simulates realistic portfolio performance including transaction costs and slippage. The design uses a fixed parameter configuration with a 20-day Bollinger Band period for signal generation and a 100-day window for Sharpe ratio calculations. The strategy operates by identifying when prices cross outside the Bollinger Band boundaries, which typically indicates overbought or oversold conditions that may reverse. The preference matrix based on rolling Sharpe ratios ensures that when multiple signals occur simultaneously, the portfolio allocates capital to positions with the best recent risk-adjusted performance. The simulation enforces realistic constraints including a $10,000 starting capital, maximum of 5 concurrent positions, 0.05% slippage per trade, and $1 transaction fees. After running the simulation, the function produces comprehensive output including detailed summaries of each position taken, the initial simulation parameters, overall portfolio statistics like total return and Sharpe ratio, an equity curve visualization showing portfolio value over time, and a benchmark comparison chart demonstrating how the strategy performed relative to a simple buy-and-hold approach.

File: white_noise_portfolio.py

def bind_simulator(**sim_kwargs) -> Callable:
    symbols: List[str] = data_io.get_all_symbols()
    prices: pd.DataFrame = data_io.load_eod_matrix(symbols)
    _bollinger: Callable = signals.create_bollinger_band_signal
    bollinger_n = 20

Creates a closure that binds static simulation data and configuration to a simulation function designed for white noise testing of trading strategies. This function serves as a control experiment to establish a baseline performance distribution by using completely random preference values instead of calculated metrics. The design loads all market data and signal generation functions once, then returns a nested function that generates fresh random noise for each simulation run. This approach tests whether the trading strategy’s performance is genuinely driven by the signal logic or if random position selection could achieve similar results. The white noise preference matrix is generated from a normal distribution with mean zero and standard deviation one, ensuring no systematic bias in position selection. By running this simulation many times with different random seeds, the optimizer can build a distribution of expected returns under purely random position selection, which serves as a null hypothesis for evaluating whether more sophisticated preference calculations add real value. The returned function accepts a test ID parameter that doesn’t affect the simulation but allows the optimizer to track individual runs.

def _simulate(white_noise_test_id: int) -> Performance:
        
        signal = prices.apply(_bollinger, args=(bollinger_n,), axis=0)
        _noise = np.random.normal(loc=0, scale=1, size=prices.shape)
        _cols = prices.columns
        _index = prices.index
        preference = pd.DataFrame(_noise, columns=_cols, index=_index)
        simulator = simulation.SimpleSimulator(**sim_kwargs)
        simulator.simulate(prices, signal, preference)
        return simulator.portfolio_history.get_performance_metric_data()

Executes a single backtest simulation using Bollinger Band signals with a randomly generated preference matrix to test baseline performance expectations. This nested function has access to the price data and signal generator from its enclosing scope through closure, avoiding repeated data loading across multiple test runs. The function applies the Bollinger Band signal generator with a fixed 20-day lookback period to identify trading opportunities, then creates a preference matrix filled with random values drawn from a standard normal distribution. This random preference matrix means that when multiple signals occur simultaneously, position selection is completely arbitrary rather than based on any calculated metric. The simulation runs with the same realistic constraints as the main strategy including initial capital, position limits, slippage, and transaction fees. The white_noise_test_id parameter serves as a unique identifier for each run but doesn’t influence the simulation logic, allowing the optimizer to execute hundreds of independent trials. The return value contains performance metrics that collectively form a distribution representing what returns could be expected from random position selection, providing a critical baseline for evaluating whether calculated preference metrics actually improve performance.
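
One simple way to use the resulting distribution, shown as a sketch below (the helper and column names are hypothetical), is to ask what fraction of random-preference trials the real strategy beats:

import pandas as pd

def fraction_of_noise_beaten(strategy_excess_cagr: float,
                             noise_results: pd.DataFrame) -> float:
    # noise_results: one row of performance metrics per white-noise trial,
    # with an 'excess_cagr' column (hypothetical layout)
    return float((strategy_excess_cagr > noise_results['excess_cagr']).mean())

# e.g. a value of 0.95 would mean the real strategy beat 95% of random baselines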

File: pypm/data_io.py

def load_eod_data(ticker: str, data_dir: str=EOD_DATA_DIR) -> DataFrame:
    f_path = os.path.join(data_dir, f'{ticker}.csv')
    assert os.path.isfile(f_path), f'No data available for {ticker}'
    return pd.read_csv(f_path, parse_dates=['date'], index_col='date')

Loads end-of-day price data for a single stock ticker from a CSV file, returning a dataframe indexed by date. This function constructs the file path from the ticker symbol and data directory, validates that the file exists to provide clear error messages for missing data, then reads the CSV file while parsing the date column and setting it as the index. The design assumes CSV files are named with the ticker symbol and contain a date column along with price data columns like open, high, low, close, and volume. This function serves as the primary interface for loading individual stock data throughout the backtesting system.
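
A minimal usage example (the ticker is hypothetical; use any CSV present in data/eod/):

from pypm import data_io

df = data_io.load_eod_data('AWU')  # hypothetical ticker -> reads data/eod/AWU.csv
print(df[['open', 'high', 'low', 'close', 'volume']].tail())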

def load_spy_data() -> DataFrame:
    return load_eod_data('SPY', DATA_DIR)

Loads S&P 500 ETF price data as a convenience function for benchmark comparisons. This function wraps load_eod_data with the SPY ticker symbol and the base data directory, providing a simple interface for loading the market benchmark used throughout portfolio performance analysis. The design uses the base DATA_DIR rather than EOD_DATA_DIR to accommodate a different storage location for the benchmark data.

def _combine_columns(filepaths_by_symbol: Dict[str, str],
    attr: str='close') -> pd.DataFrame:
    data_frames = [
        pd.read_csv(
            filepath,
            index_col='date',
            usecols=['date', attr],
            parse_dates=['date'],
        ).rename(
            columns={
                'date': 'date',
                attr: symbol,
            }
        ) for symbol, filepath in filepaths_by_symbol.items()
    ]
    return pd.concat(data_frames, sort=True, axis=1)

Combines a specific attribute column from multiple CSV files into a single dataframe with symbols as column names. This internal helper function reads the specified attribute from each file, renames the column to the symbol name, then concatenates all dataframes horizontally to create a matrix where rows are dates and columns are symbols. The design uses a list comprehension to efficiently process all files, extracts only the date and specified attribute columns to minimize memory usage, parses dates for proper time series handling, and renames columns to use symbols as headers. The concatenation with sort=True ensures dates are aligned properly even if files have different date ranges, filling missing values with NaN. This function serves as the core data loading mechanism for creating the price matrices and alternative data matrices used throughout the backtesting system.

def load_eod_matrix(tickers: List[str], attr: str='close') -> pd.DataFrame:
    filepaths_by_symbol = {
        t: os.path.join(EOD_DATA_DIR, f'{t}.csv') for t in tickers
    }
    return _combine_columns(filepaths_by_symbol, attr)

Loads a matrix of end-of-day data for multiple tickers with a specified attribute as the values. This function constructs file paths for all requested tickers, then delegates to _combine_columns to create a dataframe where each column represents a ticker and each row represents a date. The design defaults to loading closing prices but allows specification of other attributes like open, high, low, or volume. This function is the primary interface for loading multi-stock price data used in portfolio simulations and signal generation.

def load_alternative_data_matrix(tickers: List[str]) -> pd.DataFrame:
    filepaths_by_symbol = {
        t: os.path.join(ALTERNATIVE_DATA_DIR, f'{t}.csv') for t in tickers
    }
    return _combine_columns(filepaths_by_symbol, 'value')

Loads a matrix of alternative data for multiple tickers from the alternative data directory. This function constructs file paths for all requested tickers in the alternative data directory, then delegates to _combine_columns to create a dataframe where each column represents a ticker and each row represents a date. The design assumes alternative data files contain a value column representing the alternative metric, which could be sentiment scores, revenue estimates, or other non-price data used for machine learning features.

def get_all_symbols() -> List[str]:
    return [v.strip('.csv') for v in os.listdir(EOD_DATA_DIR)]

Returns a list of all available stock symbols by scanning the end-of-day data directory for CSV files. This function lists all files in the EOD data directory and strips the .csv extension from each filename to produce the ticker symbols. The design provides a convenient way to discover what data is available without manually maintaining a symbol list, enabling dynamic portfolio construction and testing across all available stocks.

def build_eod_closes() -> None:
    filenames = os.listdir(EOD_DATA_DIR)
    filepaths_by_symbol = {
        v.strip('.csv'): os.path.join(EOD_DATA_DIR, v) for v in filenames
    }
    result = _combine_columns(filepaths_by_symbol)
    result.to_csv(os.path.join(DATA_DIR, 'eod_closes.csv'))

Builds a consolidated CSV file containing closing prices for all available stocks in a single matrix. This utility function scans the EOD data directory for all CSV files, constructs file paths for each symbol, combines all closing prices into a single dataframe using _combine_columns, then saves the result to a master CSV file. The design provides a convenient way to pre-process and cache the complete price matrix for faster loading in subsequent analyses, avoiding the need to read and combine individual files repeatedly.

def concatenate_metrics(df_by_metric: Dict[str, pd.DataFrame]) -> pd.DataFrame:
    to_concatenate = []
    tuples = []
    for key, df in df_by_metric.items():
        to_concatenate.append(df)
        tuples += [(s, key) for s in df.columns.values]
    df = pd.concat(to_concatenate, sort=True, axis=1)
    df.columns = pd.MultiIndex.from_tuples(tuples, names=['symbol', 'metric'])
    return df

Combines multiple dataframes with identical columns into a single hierarchical dataframe with multi-level column indices. This function accepts a dictionary mapping metric names to dataframes where each dataframe has the same column structure representing symbols, then concatenates them horizontally while creating a two-level column index with symbol as the first level and metric as the second level. The design constructs tuples pairing each symbol with its metric name, concatenates all dataframes along the column axis, then applies a MultiIndex to create the hierarchical structure. This enables efficient storage and access of multiple metrics for the same symbols, allowing code to retrieve specific symbol-metric combinations using tuple indexing. The function is essential for the simulation engine which needs to access price, signal, and preference data for each symbol in a unified structure that supports fast iteration.
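
A small self-contained sketch (toy symbols, not repository code) showing how the hierarchical frame is built and indexed:

import pandas as pd
from pypm import data_io

idx = pd.bdate_range('2020-01-01', periods=3)
prices = pd.DataFrame({'AAA': [10.0, 10.5, 10.2], 'BBB': [20.0, 19.8, 20.4]}, index=idx)
signal = pd.DataFrame({'AAA': [0, 1, 0], 'BBB': [0, 0, -1]}, index=idx)

combined = data_io.concatenate_metrics({'price': prices, 'signal': signal})
print(combined[('AAA', 'price')])  # tuple indexing on the (symbol, metric) MultiIndex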

File: pypm/filters.py

def calculate_non_uniform_lagged_change(series: pd.Series, n_days: int):
    _timedelta: pd.Timedelta = pd.Timedelta(days=n_days)
    _idx: pd.Series = series.index.searchsorted(series.index - _timedelta)
    _idx = _idx[_idx > 0]
    _series = series.iloc[-_idx.shape[0]:]
    _pad_length = series.shape[0] - _idx.shape[0]
    _na_pad = pd.Series(None, index=series.index[:_pad_length])
    _lagged_series = series.iloc[_idx]
    _diff = pd.Series(_series.values-_lagged_series.values, index=_series.index)
    return pd.concat([_na_pad, _diff])

Calculates the change in values over a specified number of calendar days for non-uniformly spaced time series data, handling gaps like weekends and holidays. This function addresses the challenge of measuring lagged differences in financial time series where data points don’t occur on every calendar day due to market closures. The design uses pandas searchsorted to efficiently find the index position of the date that is n_days before each point in the series, creating a mapping between current values and their lagged counterparts. It filters out indices that would reference before the start of the series, extracts the corresponding subset of current values, builds a padding of NA values for the initial period where no lagged data exists, retrieves the lagged values using the computed indices, then calculates the difference between current and lagged values. The result is a series of the same length as the input with NA values at the beginning where lagged data isn’t available, followed by the calculated changes. This approach correctly handles non-uniform spacing by using calendar time rather than observation count, ensuring that a 30-day lag represents 30 calendar days regardless of how many trading days occurred in that period.
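
A small example (synthetic data; the `from pypm import filters` path is assumed from the file name above) showing the calendar-day lag on a business-day index:

import pandas as pd
from pypm import filters

# 7 calendar days spans only ~5 business-day observations
idx = pd.bdate_range('2020-01-01', periods=15)
series = pd.Series(range(15), index=idx, dtype=float)

change = filters.calculate_non_uniform_lagged_change(series, n_days=7)
print(change)  # NaN-padded at the start, then the 7-calendar-day differences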

def calculate_cusum_events(series: pd.Series, 
    filter_threshold: float) -> pd.DatetimeIndex:
    event_dates = list()
    s_up = 0
    s_down = 0
    for date, price in series.items():
        s_up = max(0, s_up + price)
        s_down = min(0, s_down + price)
        if s_up > filter_threshold:
            s_up = 0
            event_dates.append(date)
        elif s_down < -filter_threshold:
            s_down = 0
            event_dates.append(date)
    return pd.DatetimeIndex(event_dates)

Identifies significant events in a time series using a symmetric CUSUM filter that detects when cumulative deviations exceed a threshold in either direction. This function implements a cumulative sum filter that tracks both positive and negative deviations from zero, triggering events when the accumulated sum crosses the specified threshold. The design maintains two running sums, one for upward movements and one for downward movements, updating both at each time point by adding the current value and applying bounds. The upward sum is bounded below by zero and triggers an event when it exceeds the positive threshold, while the downward sum is bounded above by zero and triggers an event when it falls below the negative threshold. When either threshold is crossed, the corresponding sum resets to zero and the date is recorded as an event. This symmetric approach detects both significant upward and downward movements in the series, making it useful for identifying regime changes, volatility spikes, or other notable market events. The returned DatetimeIndex contains all dates where events were triggered, which can be used for event-driven labeling in machine learning or for filtering trading signals to focus on significant market movements.
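
A usage sketch (synthetic prices; module paths assumed from the file names in this article) that runs the filter on log returns:

import numpy as np
import pandas as pd
from pypm import filters, metrics

# Hypothetical price path; the filter is typically applied to (log) returns
idx = pd.bdate_range('2020-01-01', periods=250)
prices = pd.Series(100 * np.cumprod(1 + np.random.normal(0, 0.01, 250)), index=idx)
log_returns = metrics.calculate_log_return_series(prices).dropna()

event_dates = filters.calculate_cusum_events(log_returns, filter_threshold=0.05)
print(f'{len(event_dates)} events detected')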

File: pypm/indicators.py

def calculate_simple_moving_average(series: pd.Series, n: int=20) -> pd.Series:
    return series.rolling(n).mean()

Calculates the simple moving average over a specified window period by taking the arithmetic mean of the most recent n values. This function applies a rolling window of size n to the input series and computes the average at each point, producing a smoothed version of the original data that filters out short-term fluctuations. The design uses pandas rolling window functionality for efficient computation, with the window size defaulting to 20 periods which is a common choice for technical analysis. The resulting series has NaN values for the first n-1 points where insufficient data exists to compute the average. This indicator is fundamental to many trading strategies and serves as a building block for more complex indicators like MACD and Bollinger Bands.

def calculate_simple_moving_sample_stdev(series: pd.Series, n: int=20) -> pd.Series:
    return series.rolling(n).std()

Calculates the rolling sample standard deviation over a specified window period to measure price volatility. This function applies a rolling window of size n to the input series and computes the standard deviation at each point, quantifying how much prices fluctuate around their moving average. The design uses pandas rolling window functionality with the sample standard deviation calculation that divides by n-1 for unbiased estimation. The window size defaults to 20 periods to match the typical moving average period. The resulting series has NaN values for the first n-1 points where insufficient data exists. This indicator is essential for volatility-based strategies and is used in constructing Bollinger Bands to create dynamic price envelopes.

def calculate_macd_oscillator(series: pd.Series,
    n1: int=5, n2: int=34) -> pd.Series:
    assert n1 < n2, f'n1 must be less than n2'
    return calculate_simple_moving_average(series, n1) - \
        calculate_simple_moving_average(series, n2)

Calculates the Moving Average Convergence Divergence oscillator by measuring the difference between a short-term and long-term moving average. This momentum indicator subtracts a longer moving average from a shorter one to identify trend direction and strength, with positive values indicating upward momentum and negative values indicating downward momentum. The design validates that the short period n1 is less than the long period n2 to ensure meaningful calculation, then computes both moving averages and returns their difference. The default periods of 5 and 34 are faster than traditional MACD settings, providing more responsive signals for shorter-term trading. The oscillator crosses zero when the two moving averages converge, signaling potential trend changes. This indicator is widely used for generating trading signals based on momentum shifts and trend identification.

def calculate_bollinger_bands(series: pd.Series, n: int=20) -> pd.DataFrame:
    sma = calculate_simple_moving_average(series, n)
    stdev = calculate_simple_moving_sample_stdev(series, n)
    return pd.DataFrame({
        'middle': sma,
        'upper': sma + 2 * stdev,
        'lower': sma - 2 * stdev
    })

Calculates Bollinger Bands which create a volatility-based price envelope consisting of a middle band, upper band, and lower band. This indicator computes a simple moving average as the middle band, then adds and subtracts two standard deviations to create upper and lower bands that expand and contract with market volatility. The design calculates both the moving average and standard deviation over the same window period n, then constructs a dataframe with three columns representing the bands. The upper band is positioned two standard deviations above the moving average, while the lower band is two standard deviations below, creating a channel that contains approximately 95% of price movements under normal distribution assumptions. The bands widen during volatile periods and narrow during quiet periods, making them useful for identifying overbought and oversold conditions. This indicator is commonly used for mean-reversion strategies where prices touching the outer bands suggest potential reversals.
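
For illustration, a mean-reversion signal could be derived from these bands roughly as sketched below; this is an assumption about the idea, not a reproduction of the repo's signals.create_bollinger_band_signal:

import pandas as pd
from pypm import indicators

def sketch_bollinger_signal(series: pd.Series, n: int = 20) -> pd.Series:
    # Illustrative rule only: +1 when price closes below the lower band
    # (oversold), -1 when it closes above the upper band (overbought), else 0.
    bands = indicators.calculate_bollinger_bands(series, n)
    signal = pd.Series(0, index=series.index)
    signal[series < bands['lower']] = 1
    signal[series > bands['upper']] = -1
    return signal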

def calculate_money_flow_volume_series(df: pd.DataFrame) -> pd.Series:
    mfv = df['volume'] * (2*df['close'] - df['high'] - df['low']) / \
                                    (df['high'] - df['low'])
    return mfv

Calculates the raw money flow volume series by weighting trading volume based on where the closing price falls within the day’s range. This function computes a volume-weighted measure that assigns positive values when the close is near the high and negative values when the close is near the low, indicating buying or selling pressure. The design uses the formula that multiplies volume by a ratio based on the relationship between close, high, and low prices, specifically (2*close — high — low) / (high — low). When the close equals the high, the ratio is +1 indicating maximum buying pressure, when the close equals the low, the ratio is -1 indicating maximum selling pressure, and when the close is at the midpoint, the ratio is 0. This raw series serves as the foundation for calculating money flow indicators that aggregate these values over time to assess accumulation and distribution patterns.

def calculate_money_flow_volume(df: pd.DataFrame, n: int=20) -> pd.Series:
    return calculate_money_flow_volume_series(df).rolling(n).sum()

Calculates the cumulative money flow volume over a rolling window by summing the raw money flow values. This function aggregates the money flow volume series over the specified period n to measure the net buying or selling pressure over that timeframe. The design applies a rolling sum to the money flow volume series, accumulating positive and negative flows to produce a measure of whether accumulation or distribution is dominant. Positive values indicate net buying pressure with closes consistently near the highs, while negative values indicate net selling pressure with closes near the lows. This aggregated measure provides the numerator for the Chaikin Money Flow indicator and helps identify sustained trends in buying or selling activity.

def calculate_chaikin_money_flow(df: pd.DataFrame, n: int=20) -> pd.Series:
    return calculate_money_flow_volume(df, n) / df['volume'].rolling(n).sum()

Calculates the Chaikin Money Flow indicator which normalizes money flow volume by total volume to measure buying and selling pressure. This oscillator divides the cumulative money flow volume by the total volume over the same period, producing a ratio that ranges approximately from -1 to +1. The design computes both the numerator (money flow volume) and denominator (total volume) over the same rolling window n, then divides them to create a normalized measure independent of absolute volume levels. Values near +1 indicate strong buying pressure with closes consistently near the highs, values near -1 indicate strong selling pressure with closes near the lows, and values near 0 indicate balanced buying and selling. This indicator is particularly useful for confirming trends and identifying divergences where price moves in one direction while money flow moves in another, potentially signaling reversals.

File: pypm/labels.py

def compute_triple_barrier_labels(
    price_series: pd.Series, 
    event_index: pd.Series, 
    time_delta_days: int, 
    upper_delta: float=None, 
    lower_delta: float=None, 
    vol_span: int=20, 
    upper_z: float=None,
    lower_z: float=None,
    upper_label: int=1, 
    lower_label: int=-1) -> Tuple[pd.Series, pd.Series]:
    timedelta = pd.Timedelta(days=time_delta_days)
    series = pd.Series(np.log(price_series.values), index=price_series.index)
    labels = list()
    label_dates = list()
    if upper_z or lower_z:
        volatility = series.ewm(span=vol_span).std()
        volatility *= np.sqrt(time_delta_days / vol_span)
    for event_date in event_index:
        date_barrier = event_date + timedelta
        start_price = series.loc[event_date]
        log_returns = series.loc[event_date:date_barrier] - start_price
        candidates: List[Tuple[int, pd.Timestamp]] = list()
        if upper_delta:
            _date = log_returns[log_returns > upper_delta].first_valid_index()
            if _date:
                candidates.append((upper_label, _date))
    
        if lower_delta:
            _date = log_returns[log_returns < lower_delta].first_valid_index()
            if _date:
                candidates.append((lower_label, _date))
        # Add the first upper_z and lower_z to candidates
        if upper_z:
            upper_barrier = upper_z * volatility[event_date]
            _date = log_returns[log_returns > upper_barrier].first_valid_index()
            if _date:
                candidates.append((upper_label, _date))
        if lower_z:
            lower_barrier = lower_z * volatility[event_date]
            _date = log_returns[log_returns < lower_barrier].first_valid_index()
            if _date:
                candidates.append((lower_label, _date))
        if candidates:
            label, label_date = min(candidates, key=lambda x: x[1])
        else:
            label, label_date = 0, date_barrier
        labels.append(label)
        label_dates.append(label_date)
    label_series = pd.Series(labels, index=event_index)
    event_spans = pd.Series(label_dates, index=event_index)
    return label_series, event_spans

Generates classification labels for machine learning using the triple-barrier method which assigns labels based on which of three barriers is touched first during a holding period. This function implements a sophisticated labeling approach for financial time series that addresses the problem of defining meaningful target variables for supervised learning in trading strategies. The design accepts a price series and event dates, then for each event it sets up three barriers: an upper barrier representing a profit target, a lower barrier representing a stop loss, and a vertical time barrier representing the maximum holding period. The barriers can be specified either as fixed log-return thresholds using upper_delta and lower_delta, or as volatility-adjusted thresholds using upper_z and lower_z multiplied by the exponentially weighted volatility. For each event, the function monitors price movements within the time window and determines which barrier is touched first, assigning a label of +1 for upper barrier (profit), -1 for lower barrier (loss), or 0 for time barrier (neutral). The volatility adjustment scales barriers based on recent price fluctuations, making them adaptive to changing market conditions and preventing unrealistic targets during low volatility periods or too-easy targets during high volatility. The function returns two series: one containing the labels indexed by event start dates for use as training targets, and one containing the event end dates indexed by start dates for calculating sample weights based on label overlap. This labeling methodology is superior to simple forward-return labeling because it incorporates realistic trading constraints like holding periods and stop losses, produces more balanced class distributions, and enables meta-labeling approaches where the model predicts whether a signal will be profitable rather than just price direction.
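
A usage sketch (synthetic data; module paths assumed from the file names in this article) combining CUSUM-detected events with volatility-scaled barriers over a 10-day window:

import numpy as np
import pandas as pd
from pypm import filters, labels, metrics

idx = pd.bdate_range('2019-01-01', periods=500)
prices = pd.Series(50 * np.cumprod(1 + np.random.normal(0.0003, 0.012, 500)), index=idx)

# Events from the CUSUM filter, then volatility-adjusted barriers
log_returns = metrics.calculate_log_return_series(prices).dropna()
events = filters.calculate_cusum_events(log_returns, filter_threshold=0.05)

label_series, event_spans = labels.compute_triple_barrier_labels(
    prices, events, time_delta_days=10, upper_z=1.0, lower_z=-1.0,
)
print(label_series.value_counts())  # distribution of +1 / 0 / -1 labels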

File: pypm/metrics.py

def calculate_return_series(series: pd.Series) -> pd.Series:
    shifted_series = series.shift(1, axis=0)
    return series / shifted_series - 1

Calculates the simple return series by computing the percentage change between consecutive values in a time series. This function shifts the series by one period to align each value with its previous value, then calculates the ratio minus one to produce returns. The design assumes the series is in date-ascending order and uses pandas shift operation for efficient computation. The first value in the returned series will always be NaN since there is no previous value to compare against. This return calculation is the foundation for many performance metrics including volatility and Sharpe ratio, providing a normalized measure of price changes that can be compared across different assets and time periods.

def calculate_log_return_series(series: pd.Series) -> pd.Series:
    shifted_series = series.shift(1, axis=0)
    return pd.Series(np.log(series / shifted_series))

Calculates the logarithmic return series by computing the natural log of the ratio between consecutive values. This function shifts the series by one period, calculates the ratio of current to previous values, then applies the natural logarithm to produce log returns. The design uses logarithmic returns instead of simple returns because they have superior mathematical properties including time additivity where multi-period returns can be calculated by simply summing single-period log returns. Log returns are also more symmetric around zero and better suited for statistical analysis and modeling. The first value will be NaN due to the shift operation. This metric is preferred for many financial calculations including volatility estimation and risk-adjusted performance measures.

def calculate_percent_return(series: pd.Series) -> float:
    return series.iloc[-1] / series.iloc[0] - 1

Calculates the total percentage return over the entire period by comparing the first and last values in the series. This function assumes the series is in date-ascending order and computes the simple return from start to finish by dividing the final value by the initial value and subtracting one. The design provides a quick summary metric showing overall performance without considering the path taken or time elapsed. This measure is useful for comparing total returns across different strategies or assets but doesn’t account for risk, volatility, or the time value of money.

def get_years_past(series: pd.Series) -> float:
    start_date = series.index[0]
    end_date = series.index[-1]
    return (end_date - start_date).days / 365.25

Calculates the number of years elapsed between the first and last dates in a time series for use in annualization calculations. This function extracts the first and last dates from the series index, computes the difference in days, then divides by 365.25 to account for leap years. The design provides accurate time period measurement needed for annualizing returns, volatility, and other performance metrics. Using 365.25 days per year ensures proper handling of leap years over long time periods.

def calculate_cagr(series: pd.Series) -> float:
    start_price = series.iloc[0]
    end_price = series.iloc[-1]
    value_factor = end_price / start_price
    year_past = get_years_past(series)
    return (value_factor ** (1 / year_past)) - 1

Calculates the Compound Annual Growth Rate which represents the annualized return assuming constant growth over the period. This function computes the ratio of ending value to starting value, then raises it to the power of one divided by years elapsed to determine the equivalent constant annual growth rate. The design normalizes returns across different time periods, enabling fair comparison of strategies with different durations. CAGR is superior to simple average returns because it accounts for compounding effects and provides the actual geometric mean return. This metric is essential for evaluating long-term investment performance and comparing strategies with different time horizons.

def calculate_annualized_volatility(return_series: pd.Series) -> float:
    years_past = get_years_past(return_series)
    entries_per_year = return_series.shape[0] / years_past
    return return_series.std() * np.sqrt(entries_per_year)

Calculates the annualized standard deviation of returns as a measure of risk and price fluctuation. This function computes the standard deviation of the return series, then scales it to annual terms by multiplying by the square root of the number of periods per year. The design works for any interval of date-indexed returns whether daily, weekly, or monthly by automatically determining the number of entries per year from the time span. The square root scaling follows from the statistical property that variance scales linearly with time while standard deviation scales with the square root of time. This metric quantifies the uncertainty or risk in returns and is essential for risk-adjusted performance measures like the Sharpe ratio.

def calculate_sharpe_ratio(price_series: pd.Series, 
    benchmark_rate: float=0) -> float:
    cagr = calculate_cagr(price_series)
    return_series = calculate_return_series(price_series)
    volatility = calculate_annualized_volatility(return_series)
    return (cagr - benchmark_rate) / volatility

Calculates the Sharpe ratio which measures risk-adjusted return by comparing excess return to volatility. This function computes the CAGR as the return measure, calculates annualized volatility as the risk measure, then divides the excess return above the benchmark by the volatility. The design defaults to a benchmark rate of zero representing a risk-free rate assumption, but can accept any benchmark for comparison. The Sharpe ratio provides a standardized measure of how much return is earned per unit of risk taken, enabling comparison across strategies with different risk profiles. Higher Sharpe ratios indicate better risk-adjusted performance, with values above 1.0 generally considered good and above 2.0 considered excellent.

def calculate_rolling_sharpe_ratio(price_series: pd.Series,
    n: float=20) -> pd.Series:
    rolling_return_series = calculate_return_series(price_series).rolling(n)
    return rolling_return_series.mean() / rolling_return_series.std()

Computes an approximation of the Sharpe ratio on a rolling basis for use as a position preference metric. This function calculates returns, applies a rolling window of size n, then divides the rolling mean return by the rolling standard deviation at each point. The design provides a time-varying measure of risk-adjusted performance that can be used to dynamically size positions or rank trading opportunities. Unlike the standard Sharpe ratio, this rolling version is not annualized and serves as a relative ranking metric rather than an absolute performance measure. Higher values indicate periods of better risk-adjusted returns, making this useful for preferring positions with more favorable recent performance characteristics.

def calculate_annualized_downside_deviation(return_series: pd.Series,
    benchmark_rate: float=0) -> float:
    years_past = get_years_past(return_series)
    entries_per_year = return_series.shape[0] / years_past
    adjusted_benchmark_rate = ((1+benchmark_rate) ** (1/entries_per_year)) - 1
    downside_series = adjusted_benchmark_rate - return_series
    downside_sum_of_squares = (downside_series[downside_series > 0] ** 2).sum()
    denominator = return_series.shape[0] - 1
    downside_deviation = np.sqrt(downside_sum_of_squares / denominator)
    return downside_deviation * np.sqrt(entries_per_year)

Calculates the downside deviation which measures volatility of negative returns for use in the Sortino ratio. This function computes a risk measure that only considers returns below a benchmark threshold, providing a more relevant risk metric for investors who care primarily about downside volatility rather than total volatility. The design adjusts the annualized benchmark rate to match the data frequency, calculates deviations below this threshold, squares only the negative deviations, then annualizes the result using the square root of periods per year. Unlike standard deviation which penalizes both upside and downside volatility equally, downside deviation focuses exclusively on unfavorable outcomes. This asymmetric risk measure is more aligned with investor preferences since upside volatility is generally desirable while downside volatility represents true risk.

def calculate_sortino_ratio(price_series: pd.Series,
    benchmark_rate: float=0) -> float:
    cagr = calculate_cagr(price_series)
    return_series = calculate_return_series(price_series)
    downside_deviation = calculate_annualized_downside_deviation(return_series)
    return (cagr - benchmark_rate) / downside_deviation

Calculates the Sortino ratio which measures risk-adjusted return using downside deviation instead of total volatility. This function computes CAGR as the return measure and downside deviation as the risk measure, then divides excess return by downside risk. The design provides a more appropriate risk-adjusted metric than the Sharpe ratio for strategies with asymmetric return distributions or when investors care primarily about downside risk. By penalizing only negative volatility, the Sortino ratio better reflects the true risk-return tradeoff for strategies that may have high upside volatility but controlled downside risk. Higher Sortino ratios indicate better downside risk-adjusted performance.
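
A small illustration of that asymmetry (synthetic data; the import path is an assumption): a series whose large moves are mostly to the upside tends to score a noticeably higher Sortino than Sharpe ratio.

import numpy as np
import pandas as pd
from pypm import metrics

dates = pd.bdate_range('2020-01-01', periods=252)
rng = np.random.default_rng(0)
# Mostly small moves, plus occasional large positive jumps
log_returns = rng.normal(0.0002, 0.005, 252) + (rng.random(252) < 0.02) * 0.05
prices = pd.Series(100 * np.exp(np.cumsum(log_returns)), index=dates)

print('Sharpe: ', metrics.calculate_sharpe_ratio(prices))
print('Sortino:', metrics.calculate_sortino_ratio(prices))   # typically higher, since upside jumps are not penalized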

def calculate_pure_profit_score(price_series: pd.Series) -> float:
    cagr = calculate_cagr(price_series)
    t: np.ndarray = np.arange(0, price_series.shape[0]).reshape(-1, 1)
    regression = LinearRegression().fit(t, price_series)
    r_squared = regression.score(t, price_series)
    return cagr * r_squared

Calculates the pure profit score which combines return and trend consistency by multiplying CAGR by the R-squared of a linear regression. This function fits a linear regression to the price series using time as the predictor, then multiplies the CAGR by the R-squared value to reward strategies with smooth, consistent trends. The design penalizes strategies with high volatility or erratic price movements even if they achieve good returns, favoring equity curves that follow a steady upward trajectory. The R-squared component measures how well a straight line fits the price series, with values near 1.0 indicating consistent trends and values near 0.0 indicating random or choppy movements. This metric is particularly useful for identifying strategies with sustainable, predictable growth patterns rather than those with volatile or luck-driven returns.
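
A brief sketch of the idea (synthetic series, illustration only): a smooth linear trend keeps essentially all of its CAGR, while a noisier path around the same trend is discounted by its lower R-squared.

import numpy as np
import pandas as pd
from pypm import metrics

dates = pd.bdate_range('2020-01-01', periods=504)           # roughly two years of trading days
smooth = pd.Series(np.linspace(100, 150, 504), index=dates)
choppy = smooth + np.random.normal(0, 10, 504)              # same trend, much noisier path

print(metrics.calculate_pure_profit_score(smooth))          # close to the raw CAGR
print(metrics.calculate_pure_profit_score(choppy))          # reduced by the lower R-squared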

def calculate_jensens_alpha(return_series: pd.Series, 
    benchmark_return_series: pd.Series) -> float:
    df = pd.concat([return_series, benchmark_return_series], sort=True, axis=1)
    df = df.dropna()
    clean_returns: pd.Series = df[df.columns.values[0]]
    clean_benchmarks = pd.DataFrame(df[df.columns.values[1]])
    regression = LinearRegression().fit(clean_benchmarks, y=clean_returns)
    return regression.intercept_

Calculates Jensen’s alpha which measures the excess return of a strategy after adjusting for market risk exposure. This function performs a linear regression of the strategy returns against benchmark returns to determine the alpha (intercept) and beta (slope), with alpha representing the return attributable to skill rather than market exposure. The design joins the two return series along their date indices, removes any NaN values to ensure clean data, then fits a linear regression where the benchmark returns predict the strategy returns. The intercept from this regression is Jensen’s alpha, representing the average return above or below what would be expected given the strategy’s beta exposure to the market. Positive alpha indicates outperformance after adjusting for systematic risk, while negative alpha indicates underperformance. This metric is superior to simple excess returns because it accounts for the strategy’s risk profile.
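
As a sanity-check sketch (synthetic data; the import path is an assumption), returns built with a known alpha and beta should recover roughly that alpha as the regression intercept:

import numpy as np
import pandas as pd
from pypm import metrics

dates = pd.bdate_range('2020-01-01', periods=252)
rng = np.random.default_rng(1)
benchmark_returns = pd.Series(rng.normal(0.0003, 0.01, 252), index=dates)

true_alpha, true_beta = 0.0005, 1.2
noise = pd.Series(rng.normal(0, 0.002, 252), index=dates)
strategy_returns = true_alpha + true_beta * benchmark_returns + noise

print(metrics.calculate_jensens_alpha(strategy_returns, benchmark_returns))   # lands near 0.0005 per period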

def calculate_jensens_alpha_v2(return_series: pd.Series) -> float:
    spy_data = load_spy_data()
    benchmark_return_series = calculate_log_return_series(spy_data['close'])
    return calculate_jensens_alpha(return_series, benchmark_return_series)

Calculates Jensen’s alpha using the S&P 500 as the default benchmark, automatically loading SPY data for convenience. This wrapper function loads the SPY ETF price data, calculates its log return series, then delegates to calculate_jensens_alpha to compute the alpha. The design provides a simplified interface when the benchmark is always the S&P 500, eliminating the need to manually load and prepare benchmark data. However, this function can be slow if called repeatedly due to repeated data loading, so it’s best used for one-off calculations rather than in tight loops or optimization routines.

def calculate_drawdown_series(series: pd.Series, method: str='log') -> pd.Series:
    assert method in DRAWDOWN_EVALUATORS, \
        f'Method "{method}" must be one of {list(DRAWDOWN_EVALUATORS.keys())}'
    evaluator = DRAWDOWN_EVALUATORS[method]
    return evaluator(series, series.cummax())

Calculates the drawdown series showing the decline from peak at each point in time using the specified calculation method. This function computes the running maximum (cummax) to track the highest value seen so far, then applies the selected evaluator function to measure the drawdown from that peak. The design supports three methods: dollar drawdown measuring absolute price decline, percent drawdown measuring relative decline as a fraction, and log drawdown using logarithmic differences for better statistical properties. The function validates that the method is one of the supported options before proceeding. The returned series shows how far below the peak the price is at each point, with higher values indicating larger drawdowns. This time series is useful for visualizing drawdown patterns and identifying periods of significant losses.
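
The DRAWDOWN_EVALUATORS mapping referenced here is defined elsewhere in pypm/metrics.py; a minimal sketch consistent with the three methods described above might look like this:

import numpy as np

# Sketch only: each evaluator compares the current price against the running peak
DRAWDOWN_EVALUATORS = {
    'dollar': lambda price, peak: peak - price,               # absolute decline in dollars
    'percent': lambda price, peak: -((price / peak) - 1),     # decline as a fraction of the peak
    'log': lambda price, peak: np.log(peak) - np.log(price),  # decline in log terms
}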

def calculate_max_drawdown(series: pd.Series, method: str='log') -> float:
    return calculate_drawdown_series(series, method).max()

Calculates the maximum drawdown as a single scalar value representing the worst peak-to-trough decline. This function computes the entire drawdown series using the specified method, then returns the maximum value which represents the largest loss from any peak. The design provides a simple summary statistic of the worst-case drawdown experienced, which is a key risk metric for evaluating strategies. Maximum drawdown is particularly important for understanding the potential for large losses and is often used in risk management and position sizing decisions.

def calculate_max_drawdown_with_metadata(series: pd.Series, 
    method: str='log') -> Dict[str, Any]:
    assert method in DRAWDOWN_EVALUATORS, \
        f'Method "{method}" must be one of {list(DRAWDOWN_EVALUATORS.keys())}'
    evaluator = DRAWDOWN_EVALUATORS[method]
    max_drawdown = 0
    local_peak_date = peak_date = trough_date = series.index[0]
    local_peak_price = peak_price = trough_price = series.iloc[0]
    for date, price in series.items():
        if price > local_peak_price:
            local_peak_date = date
            local_peak_price = price
        drawdown = evaluator(price, local_peak_price)
        if drawdown > max_drawdown:
            max_drawdown = drawdown
            peak_date = local_peak_date
            peak_price = local_peak_price
            trough_date = date
            trough_price = price
    return {
        'max_drawdown': max_drawdown,
        'peak_date': peak_date,
        'peak_price': peak_price,
        'trough_date': trough_date,
        'trough_price': trough_price
    }

Calculates the maximum drawdown along with detailed metadata about when and where it occurred, returning a dictionary with peak and trough information. This function iterates through the entire price series tracking the running maximum and computing drawdowns at each point, storing the dates and prices when the maximum drawdown is observed. The design validates the calculation method, initializes tracking variables for local peaks and global maximum drawdown, then updates these as it encounters new peaks or deeper drawdowns. When a new maximum drawdown is found, it records both the peak date and price where the decline started, and the trough date and price where the maximum loss was reached. The returned dictionary contains five fields: max_drawdown as the scalar loss value, peak_date and peak_price identifying when and where the decline began, and trough_date and trough_price identifying when and where the maximum loss occurred. This rich metadata is valuable for understanding the context and duration of the worst drawdown period, enabling deeper analysis of risk characteristics.
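
A short usage sketch (synthetic prices again; the import path is an assumption):

import numpy as np
import pandas as pd
from pypm import metrics

dates = pd.bdate_range('2020-01-01', periods=252)
prices = pd.Series(100 * np.exp(np.cumsum(np.random.normal(0, 0.01, 252))), index=dates)

dd = metrics.calculate_max_drawdown_with_metadata(prices, method='percent')
print(f"Max drawdown of {dd['max_drawdown']:.1%} "
      f"from {dd['peak_date'].date()} to {dd['trough_date'].date()}")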

def calculate_log_max_drawdown_ratio(series: pd.Series) -> float:
    log_drawdown = calculate_max_drawdown(series, method='log')
    log_return = np.log(series.iloc[-1]) - np.log(series.iloc[0])
    return log_return - log_drawdown

Calculates a risk-adjusted return metric by subtracting the maximum log drawdown from the total log return. This function computes the logarithmic return from start to finish, calculates the maximum log drawdown, then returns their difference to measure return after accounting for the worst drawdown. The design provides a simple metric that rewards strategies with high returns and low drawdowns, penalizing those with large losses even if they eventually recover. Higher values indicate better risk-adjusted performance with strong returns and controlled drawdowns.

def calculate_calmar_ratio(series: pd.Series, years_past: int=3) -> float:
    last_date = series.index[-1]
    three_years_ago = last_date - pd.Timedelta(days=years_past*365.25)
    series = series[series.index > three_years_ago]
    percent_drawdown = calculate_max_drawdown(series, method='percent')
    cagr = calculate_cagr(series)
    return cagr / percent_drawdown

Calculates the Calmar ratio which measures the ratio of CAGR to maximum percent drawdown over a specified time period, typically three years. This function filters the series to include only the most recent years_past years of data, computes the CAGR and maximum percent drawdown over that period, then divides return by drawdown. The design focuses on recent performance by default using a three-year lookback window, providing a risk-adjusted metric that is particularly relevant for evaluating current strategy performance. The Calmar ratio is similar to other risk-adjusted metrics but specifically uses maximum drawdown as the risk measure, making it intuitive for investors who think in terms of worst-case losses. Higher Calmar ratios indicate better return relative to maximum drawdown, with values above 1.0 generally considered good.

File: pypm/optimization.py

class OptimizationResult(object):

Encapsulates the results of a single optimization trial by storing both the input parameters and resulting performance metrics in a unified container. This class serves as a data structure that pairs each parameter combination tested during grid search with its corresponding performance outcomes. The design enforces a critical constraint that parameter names and performance metric names must not overlap, preventing ambiguity when combining these dictionaries. This validation occurs at initialization to catch naming conflicts early. The class provides a convenient property that merges parameters and performance into a single flat dictionary, which simplifies downstream analysis and dataframe construction. By separating the storage of inputs and outputs while providing easy access to both, this class maintains clear data organization throughout the optimization process while enabling efficient aggregation of results across multiple trials.

def __init__(self, parameters: Parameters, performance: Performance):
        assert len(parameters.keys() & performance.keys()) == 0, \
            'parameter name matches performance metric name'
        self.parameters = parameters
        self.performance = performance

Initializes an optimization result by storing the parameter configuration and corresponding performance metrics while validating that no naming conflicts exist between them. This constructor accepts a parameters dictionary containing the specific values used for this simulation run and a performance dictionary containing the resulting metrics like returns, Sharpe ratio, and drawdown. The design includes an assertion that checks for any overlapping keys between these two dictionaries, which would cause ambiguity when later combining them into a single flat structure. This validation is critical because the class’s primary purpose is to enable seamless merging of parameters and performance for tabular analysis. By enforcing this constraint at initialization, the class prevents subtle bugs that could arise from parameter names shadowing metric names or vice versa.

def as_dict(self) -> Dict[str, float]:
        return {**self.parameters, **self.performance}

Returns a unified dictionary containing both parameters and performance metrics merged into a single flat structure. This property provides convenient access to all data associated with this optimization trial in a format that’s ideal for creating dataframe rows or performing comparisons. The design leverages Python’s dictionary unpacking syntax to merge the two separate dictionaries efficiently. Because the constructor validates that no key collisions exist between parameters and performance, this merge operation is guaranteed to be safe and unambiguous. The resulting dictionary maps both parameter names and metric names to their respective numeric values, enabling straightforward conversion to dataframe rows where each column represents either an input parameter or an output metric.

class GridSearchOptimizer(object):

Implements a comprehensive grid search optimization framework for systematically exploring parameter spaces and evaluating trading strategy performance across all combinations. This class provides a complete solution for parameter optimization by accepting a simulation function and parameter ranges, then exhaustively testing every possible combination while tracking timing statistics and collecting results. The design separates concerns between simulation execution, result storage, and analysis capabilities. During optimization, it generates the Cartesian product of all parameter ranges to ensure complete coverage of the search space, executes the simulation function for each combination, and stores both parameters and performance metrics in structured result objects. The class includes sophisticated progress reporting that displays elapsed time, average simulation duration, and estimated time remaining, helping users monitor long-running optimizations. After completion, it offers multiple analysis methods including statistical summaries, sorting by performance metrics, and various visualization options ranging from simple histograms to interactive 3D surface plots. The plotting functionality intelligently dispatches to appropriate visualization types based on the number of parameters and metrics specified, supporting 1D histograms for metric distributions, 2D line or violin plots for single-parameter sweeps, and 3D mesh plots for exploring interactions between two parameters. This comprehensive toolkit enables efficient parameter tuning and deep understanding of how different configurations affect strategy performance.

def __init__(self, simulation_function: SimFunction):
        self.simulate = simulation_function
        self._results_list: List[OptimizationResult] = list()
        self._results_df = pd.DataFrame()
        self._optimization_finished = False

Initializes the grid search optimizer with a simulation function and sets up internal data structures for storing results. This constructor accepts a callable simulation function that must take parameter values as keyword arguments and return a performance metrics dictionary. The design initializes three key internal state variables: a list to accumulate optimization results as they’re generated, an empty dataframe that will be lazily populated when results are first accessed, and a flag tracking whether optimization has completed. The results list stores OptimizationResult objects that pair each parameter combination with its performance outcomes. The dataframe remains empty until explicitly requested through the results property, implementing lazy evaluation to avoid unnecessary computation if results aren’t needed. The completion flag serves as a guard to prevent accessing analysis methods before optimization runs, ensuring methods that depend on results can safely assume data exists.

def add_results(self, parameters: Parameters, performance: Performance):
        _results = OptimizationResult(parameters, performance)
        self._results_list.append(_results)

Appends a new optimization result to the internal results list by creating an OptimizationResult object from the provided parameters and performance metrics. This method serves as the primary interface for accumulating results during the optimization loop, encapsulating the creation of result objects and maintaining the results collection. The design wraps the raw parameter and performance dictionaries in an OptimizationResult object, which validates that no naming conflicts exist and provides convenient access to the combined data. By centralizing result addition through this method, the class maintains consistent data structures and ensures all results undergo the same validation process.

def optimize(self, **optimization_ranges: SimKwargs):
        assert optimization_ranges, 'Must provide non-empty parameters.'
        param_ranges = {k: list(v) for k, v in optimization_ranges.items()}
        self.param_names = param_names = list(param_ranges.keys())
        n = total_simulations = np.prod([len(r) for r in param_ranges.values()])
        total_time_elapsed = 0
        print(f'Starting simulation ...')
        print(f'Simulating 1 / {n} ...', end='\r')
        for i, params in enumerate(product(*param_ranges.values())):
            if i > 0:
                _avg = avg_time = total_time_elapsed / i
                _rem = remaining_time = (n - (i + 1)) * avg_time
                s = f'Simulating {i+1} / {n} ... '
                s += f'{_rem:.0f}s remaining ({_avg:.1f}s avg)'
                s += ' ' * 8
                print(s, end='\r')
            timer_start = default_timer()
            parameters = {n: param for n, param in zip(param_names, params)}
            results = self.simulate(**parameters)
            self.add_results(parameters, results)
            timer_end = default_timer()
            total_time_elapsed += timer_end - timer_start 
        print(f'Simulated {total_simulations} / {total_simulations} ...')
        print(f'Elapsed time: {total_time_elapsed:.0f}s')
        print(f'Done.')
        self._optimization_finished = True

Executes the complete grid search optimization by systematically testing every combination of parameter values and collecting performance results with detailed progress tracking. This method accepts parameter ranges as keyword arguments where each argument name corresponds to a parameter name and its value is an iterable of values to test for that parameter. The design first converts all iterables to lists for consistent indexing and calculates the total number of simulations by taking the product of all range lengths. It then uses itertools.product to generate the Cartesian product of all parameter ranges, ensuring exhaustive coverage of the parameter space. For each combination, it constructs a parameter dictionary by zipping parameter names with values, executes the simulation function with these parameters, and stores the results. The method includes sophisticated progress reporting that updates on each iteration, displaying the current simulation number, total count, estimated time remaining based on average simulation duration, and the average time per simulation. This real-time feedback is crucial for long-running optimizations that may test hundreds or thousands of parameter combinations. The method tracks total elapsed time across all simulations and sets a completion flag when finished, enabling downstream methods to verify that optimization has run before attempting to access results.
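
A hedged end-to-end sketch, with a toy simulation function standing in for a full backtest so the grid-search mechanics are visible in isolation (the import path and metric name mirror the rest of this article, but the simulation itself is made up):

from pypm.optimization import GridSearchOptimizer

def toy_simulation(bollinger_n: int, sharpe_n: int) -> dict:
    # A real simulation would run a backtest; this fabricates a single metric
    # whose peak sits at bollinger_n=40, sharpe_n=60
    score = -((bollinger_n - 40) ** 2 + (sharpe_n - 60) ** 2) / 1e4
    return {'excess_cagr': score}

optimizer = GridSearchOptimizer(toy_simulation)
optimizer.optimize(
    bollinger_n=range(10, 110, 10),
    sharpe_n=range(10, 110, 10),
)
optimizer.print_summary()
print(optimizer.get_best('excess_cagr').head())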

def _assert_finished(self):
        assert self._optimization_finished, \
            'Run self.optimize before accessing this method.'

Validates that optimization has completed before allowing access to result-dependent methods, raising an assertion error with a helpful message if called prematurely. This internal guard method serves as a precondition check for all analysis and visualization methods that require optimization results to exist. The design uses a simple assertion on the completion flag that was set at the end of the optimize method, providing clear feedback to users who attempt to access results before running optimization. By centralizing this check in a single method, the class avoids code duplication across multiple public methods while maintaining consistent error messaging.

def results(self) -> pd.DataFrame:
        self._assert_finished()
        if self._results_df.empty:
            _results_list = self._results_list
            self._results_df = pd.DataFrame([r.as_dict for r in _results_list])
            _columns = set(list(self._results_df.columns.values))
            _params = set(self.param_names)
            self.metric_names = list(_columns - _params)
        return self._results_df

Returns a pandas DataFrame containing all optimization results with parameters and performance metrics as columns, constructing it lazily on first access. This property provides the primary interface for accessing optimization results in a tabular format suitable for analysis, filtering, and visualization. The design implements lazy evaluation by checking if the dataframe is empty and only constructing it when first requested, avoiding unnecessary computation if results aren’t needed. Construction involves iterating through the results list and converting each OptimizationResult’s combined dictionary into a dataframe row. The method also determines which columns are performance metrics by computing the set difference between all column names and parameter names, storing this information for later use in analysis methods. This cached dataframe is returned on subsequent accesses, ensuring the potentially expensive conversion from list to dataframe only happens once.

def print_summary(self):
        df = self.results
        metric_names = self.metric_names
        print('Summary statistics')
        print(df[metric_names].describe().T)

Displays comprehensive summary statistics for all performance metrics across all optimization trials, including count, mean, standard deviation, min, max, and quartiles. This method provides a quick overview of the distribution of results, helping users understand the range and central tendencies of performance across the entire parameter space. The design extracts only the performance metric columns from the results dataframe, applies pandas’ describe method to compute standard statistical measures, and transposes the output for better readability with metrics as rows. This summary is particularly valuable for understanding the variability in strategy performance and identifying whether certain metrics show consistent patterns or high variance across different parameter combinations.

def get_best(self, metric_name: str) -> pd.DataFrame:
        self._assert_finished()
        results = self.results
        param_names = self.param_names
        metric_names = self.metric_names
        assert metric_name in metric_names, 'Not a performance metric'
        partial_df = self.results[param_names+[metric_name]]
        return partial_df.sort_values(metric_name, ascending=False)

Returns a sorted DataFrame showing parameter combinations ranked by a specified performance metric in descending order, with only relevant columns included. This method enables quick identification of the best-performing parameter configurations according to any chosen metric such as excess CAGR, Sharpe ratio, or maximum drawdown. The design validates that the requested metric name exists in the performance metrics, then creates a filtered dataframe containing only the parameter columns and the target metric column. It sorts this dataframe by the metric in descending order so the best-performing combinations appear first. This focused view eliminates clutter from other metrics and makes it easy to see which parameter values correlate with superior performance, facilitating both manual inspection and programmatic extraction of optimal configurations.

def plot_1d_hist(self, x, show=True):
        self.results.hist(x)
        if show:
            plt.show()

Generates a histogram showing the distribution of values for a single column, typically used to visualize the frequency distribution of a performance metric across all trials. This method creates a simple histogram plot that reveals the shape of the distribution, helping identify whether results are normally distributed, skewed, or multimodal. The design delegates to pandas’ built-in histogram functionality and optionally displays the plot immediately based on the show parameter, allowing users to either view it interactively or save it for later use. This visualization is particularly useful for understanding the range and concentration of metric values when exploring large parameter spaces.

def plot_2d_line(self, x, y, show=True, **filter_kwargs):
        _results = self.results
        for k, v in filter_kwargs.items():
            _results = _results[getattr(_results, k) == v]
        ax = _results.plot(x, y)
        if filter_kwargs:
            k_str = ', '.join([f'{k}={v}' for k,v in filter_kwargs.items()])
            ax.legend([f'{x} ({k_str})'])
        if show:
            plt.show()

Creates a 2D line plot showing the relationship between a parameter and a performance metric, with optional filtering to hold other parameters constant. This method visualizes how a single parameter affects performance while controlling for other variables, enabling focused analysis of individual parameter effects. The design accepts filter keyword arguments that specify fixed values for other parameters, applying these filters to the results dataframe before plotting. It creates a line plot with the parameter on the x-axis and the metric on the y-axis, and if filters were applied, adds a legend indicating which parameter values were held constant. This visualization is essential for understanding parameter sensitivity and identifying optimal ranges for individual parameters while accounting for interactions with other variables.

def plot_2d_violin(self, x, y, show=True):
        x_values = self.results[x].unique()
        x_values.sort()
        y_by_x = OrderedDict([(v, []) for v in x_values])
        for _, row in self.results.iterrows():
            y_by_x[row[x]].append(row[y])
        fig, ax = plt.subplots()
        ax.violinplot(dataset=list(y_by_x.values()), showmedians=True)
        ax.set_xlabel(x)
        ax.set_ylabel(y)
        ax.set_xticks(range(0, len(y_by_x)+1))
        ax.set_xticklabels([''] + list(y_by_x.keys()))
        if show:
            plt.show()

Creates violin plots showing the distribution of a performance metric for each value of a parameter, revealing both central tendency and variance across parameter settings. This method is particularly valuable when multiple trials exist for each parameter value, as it displays the full distribution shape rather than just summary statistics. The design groups all y-values by their corresponding x-values, creating an ordered dictionary that maps each unique parameter value to a list of metric values observed at that parameter setting. It then generates violin plots for each group, which show the probability density of the metric at different values, along with median markers. The x-axis shows parameter values while the y-axis shows the metric, with each violin’s width indicating the frequency of results at that metric level. This visualization excels at revealing whether certain parameter values produce consistent results or high variance, and whether distributions are symmetric or skewed.

def plot_3d_mesh(self, x, y, z, show=True, **filter_kwargs):
        _results = self.results
        fig = plt.figure()
        ax = Axes3D(fig)
        for k, v in filter_kwargs.items():
            _results = _results[getattr(_results, k) == v]
        X, Y, Z = [getattr(_results, attr) for attr in (x, y, z)]
        ax.plot_trisurf(X, Y, Z, cmap=cm.jet, linewidth=0.2)
        ax.set_xlabel(x)
        ax.set_ylabel(y)
        ax.set_zlabel(z)
        if show:
            plt.show()

Generates an interactive 3D surface plot showing how a performance metric varies with two parameters simultaneously, revealing complex interactions and optimal regions in the parameter space. This method creates a triangulated surface mesh where the x and y axes represent two different parameters and the z axis shows the resulting performance metric, with color mapping to enhance depth perception. The design accepts filter keyword arguments to hold any additional parameters constant, ensuring the visualization focuses on just two parameters at a time. It extracts the relevant columns from the results dataframe, creates a 3D axes object, and renders a surface using triangulation to handle potentially irregular grid spacing. The color mapping uses the jet colormap to provide visual cues about metric values, while thin lines delineate the mesh structure. This visualization is invaluable for identifying parameter interactions where the optimal value of one parameter depends on the value of another, and for locating global optima or performance plateaus in the two-parameter subspace.

def plot(self, *attrs: Tuple[str], show=True, 
        **filter_kwargs: Dict[str, Any]):
        self._assert_finished()
        param_names = self.param_names
        metric_names = self.metric_names
        if len(attrs) == 3:
            assert attrs[0] in param_names and attrs[1] in param_names, \
                'First two positional arguments must be parameter names.'
            assert attrs[2] in metric_names, \
                'Last positional argument must be a metric name.'
            assert len(filter_kwargs) + 2 == len(param_names), \
                'Must filter remaining parameters. e.g. p_three=some_number.'
            self.plot_3d_mesh(*attrs, show=show, **filter_kwargs)
        elif len(attrs) == 2:
            if len(param_names) == 1 or filter_kwargs:
                self.plot_2d_line(*attrs, show=show, **filter_kwargs)
            elif len(param_names) > 1:
                self.plot_2d_violin(*attrs, show=show)
        elif len(attrs) == 1:
            self.plot_1d_hist(*attrs, show=show)
        else:
            raise ValueError('Must pass between one and three column names.')

Intelligently dispatches to the appropriate plotting function based on the number of attributes provided and the optimization context, automatically selecting between histogram, line, violin, or 3D mesh visualizations. This method serves as a convenient high-level interface that eliminates the need to explicitly choose visualization types, instead inferring the user’s intent from the arguments. The design implements a decision tree based on the number of positional arguments: one attribute triggers a histogram showing metric distribution, two attributes trigger either a line plot if only one parameter was optimized or filters are provided, or a violin plot if multiple parameters were optimized without filters, and three attributes trigger a 3D mesh plot showing two parameters and one metric. For the three-attribute case, it validates that the first two arguments are parameter names and the third is a metric name, and requires that all other parameters be filtered to specific values. This intelligent dispatching simplifies the user experience by automatically selecting appropriate visualizations while maintaining flexibility through optional filter arguments that control which data subsets to display.
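
Continuing the grid-search sketch above (assuming the two-parameter optimization has already finished), the same optimizer instance can drive all the visualization modes:

optimizer.plot('excess_cagr')                              # one attribute: histogram of the metric
optimizer.plot('bollinger_n', 'excess_cagr')               # two attributes, two parameters: violin plot
optimizer.plot('bollinger_n', 'excess_cagr', sharpe_n=60)  # two attributes plus a filter: line plot
optimizer.plot('bollinger_n', 'sharpe_n', 'excess_cagr')   # three attributes: 3D surface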

File: pypm/portfolio.py

def _pdate(date: pd.Timestamp):
    return date.strftime(DATE_FORMAT_STR)

Formats a pandas Timestamp into a human-readable date string for display purposes, stripping away time information to show only the date portion. This utility function provides consistent date formatting throughout the portfolio reporting system, converting timestamps into a standardized string representation that includes the abbreviated weekday, abbreviated month, day number, and full year. The design uses a module-level format string constant to ensure uniform date presentation across all position summaries and reports.

class Position(object):

Represents a single long stock position with complete lifecycle tracking from entry through exit, maintaining price history and calculating performance metrics. This class encapsulates all data and operations related to buying and holding a fixed number of shares of a single asset, enforcing constraints that prevent short positions, variable share counts, and same-day trading. The design treats initialization as a buy operation and the exit method as a sell operation, creating a clear two-phase lifecycle for each position. Throughout the position’s lifetime, it records intermediate price updates to build a complete price series, enabling accurate portfolio valuation at any point in time. The class maintains both entry and exit information including dates and prices, calculates derived metrics like percent return and dollar profit, and provides a value series for equity curve construction. It implements caching for the pandas Series representation of price data to optimize performance when the series is accessed multiple times without changes. The position enforces uniqueness through a hash function based on entry date and symbol, preventing duplicate positions from being recorded in the portfolio history.

def __init__(self, symbol: Symbol, entry_date: pd.Timestamp,
                 entry_price: Dollars, shares: int):
        self.entry_date = entry_date
        assert entry_price > 0, 'Cannot buy asset with zero or negative price.'
        self.entry_price = entry_price
        assert shares > 0, 'Cannot buy zero or negative shares.'
        self.shares = shares
        self.symbol = symbol
        self.exit_date: pd.Timestamp = None
        self.exit_price: Dollars = None
        self.last_date: pd.Timestamp = None
        self.last_price: Dollars = None
        self._dict_series: Dict[pd.Timestamp, Dollars] = OrderedDict()
        self.record_price_update(entry_date, entry_price)
        self._price_series: pd.Series = None
        self._needs_update_pd_series: bool = True

Initializes a new position by recording the entry details and validating that the purchase parameters are valid, effectively executing a buy operation. This constructor accepts the stock symbol, entry date, entry price, and number of shares, then stores these values while enforcing critical business rules. The design validates that the entry price is positive to prevent nonsensical zero or negative price purchases, and ensures the share count is positive to prevent zero-share or short positions. It initializes exit-related fields to None since the position starts in an active state, and sets up tracking fields for the most recent price update to enable quick portfolio valuation. The constructor creates an ordered dictionary to maintain the chronological price history and records the initial entry price as the first data point. It also initializes caching infrastructure for the pandas Series representation of price data, using a flag to track when the cache needs invalidation. This initialization establishes all the state needed to track the position throughout its lifecycle from entry through intermediate price updates to eventual exit.

def exit(self, exit_date, exit_price):
        assert self.entry_date != exit_date, 'Churned a position same-day.'
        assert not self.exit_date, 'Position already closed.'
        self.record_price_update(exit_date, exit_price)
        self.exit_date = exit_date
        self.exit_price = exit_price

Closes the position by recording the exit date and price, effectively executing a sell operation while enforcing trading constraints. This method accepts the exit date and price, validates that the position hasn’t already been closed and that the exit doesn’t occur on the same day as entry, then records the final price update and marks the position as closed. The design prevents same-day trading by asserting the exit date differs from entry date, avoiding unrealistic churning behavior. It also ensures positions can only be exited once by checking that no exit date has been previously set. The method delegates to record_price_update to add the exit price to the price history before storing the exit details, maintaining consistency in how price data is tracked throughout the position’s lifecycle.

def record_price_update(self, date, price):
        self.last_date = date
        self.last_price = price
        self._dict_series[date] = price
        self._needs_update_pd_series = True

Records a price observation at a specific date to build the position’s price history, updating tracking fields and invalidating cached data structures. This method serves as the central mechanism for maintaining the chronological price series throughout the position’s lifetime, whether called during initialization, intermediate updates, or exit. The design updates the last_date and last_price fields to enable quick access to the most recent valuation without scanning the entire price history. It adds the new price to the ordered dictionary that maintains the complete chronological record, ensuring dates remain in sequence. The method also invalidates the cached pandas Series representation by setting a flag, ensuring that subsequent accesses to the price_series property will regenerate the Series with the updated data rather than returning stale cached values.

def price_series(self) -> pd.Series:
        if self._needs_update_pd_series or self._price_series is None:
            self._price_series = pd.Series(self._dict_series)
            self._needs_update_pd_series = False
        return self._price_series

Returns a cached pandas Series containing the complete price history of the position, lazily constructing it from the internal dictionary only when needed or when data has changed. This property implements a caching strategy to optimize performance by avoiding repeated conversions from dictionary to Series when the underlying data hasn’t changed. The design checks both the invalidation flag and whether the cache exists, regenerating the Series only if either condition indicates stale or missing cached data. After regeneration, it clears the invalidation flag to prevent unnecessary recomputation on subsequent accesses. This approach balances the convenience of the pandas Series interface with the efficiency of maintaining data in dictionary form during updates.

def last_value(self) -> Dollars:
        return self.last_price * self.shares

Calculates the current dollar value of the position by multiplying the most recent price by the number of shares held. This property provides quick access to the position’s current valuation without requiring iteration through the price history or conversion to pandas Series. The design leverages the last_price field that’s maintained by record_price_update to enable constant-time value calculation.

def is_active(self) -> bool:
        return self.exit_date is None

Indicates whether the position is currently open by checking if an exit date has been recorded. This property returns True for positions that have been entered but not yet exited, enabling filtering and status checks throughout the simulation. The design uses the presence or absence of an exit date as the definitive indicator of position status.

def is_closed(self) -> bool:
        return not self.is_active

Indicates whether the position has been closed by returning the logical inverse of is_active. This property provides a more semantically clear way to check for closed positions compared to negating is_active in calling code. The design simply inverts the active status rather than duplicating the exit date check.

def value_series(self) -> pd.Series:
        assert self.is_closed, 'Position must be closed to access this property'
        return self.shares * self.price_series[:-1]

Returns a pandas Series showing the dollar value of the position over time, excluding the exit date to avoid double-counting in equity curve calculations. This property multiplies the price series by the constant share count to convert from per-share prices to total position values. The design requires the position to be closed before accessing this property, ensuring all price data has been recorded. It excludes the final exit price by slicing off the last element, preventing the exit value from appearing in the equity curve where it would overlap with the cash received from the sale. This series is essential for constructing accurate equity curves that show portfolio value evolution over time.

def percent_return(self) -> float:
        return (self.exit_price / self.entry_price) - 1

Calculates the percentage return of the position by comparing exit price to entry price. This property computes the simple return as the ratio of exit to entry price minus one, expressing the gain or loss as a decimal fraction. The design provides a standard measure of position performance that’s independent of position size or holding period.

def entry_value(self) -> Dollars:
        return self.shares * self.entry_price

Calculates the total dollar amount invested when entering the position by multiplying entry price by share count. This property represents the initial capital committed to this position, useful for calculating returns and analyzing capital allocation.

def exit_value(self) -> Dollars:
        return self.shares * self.exit_price

Calculates the total dollar amount received when exiting the position by multiplying exit price by share count. This property represents the capital returned from closing the position, used in profit calculations and cash flow tracking.

def change_in_value(self) -> Dollars:
        return self.exit_value - self.entry_value

Calculates the absolute dollar profit or loss from the position by subtracting entry value from exit value. This property provides the net dollar gain or loss realized from the trade, offering a complementary view to percent_return that shows actual dollar impact on portfolio value.

def trade_length(self):
        return len(self._dict_series) - 1

Returns the number of days the position was held by counting price updates minus one. This property calculates holding period by examining the length of the price history dictionary, subtracting one because the entry and exit dates both appear in the series but represent a single holding period. The design provides a simple measure of trade duration for analysis and reporting.

def print_position_summary(self):
        _entry_date = _pdate(self.entry_date)
        _exit_date = _pdate(self.exit_date)
        _days = self.trade_length
        _entry_price = round(self.entry_price, 2)
        _exit_price = round(self.exit_price, 2)
        _entry_value = round(self.entry_value, 2)
        _exit_value = round(self.exit_value, 2)
        _return = round(100 * self.percent_return, 1)
        _diff = round(self.change_in_value, 2)
        print(f'{self.symbol:<5}     Trade summary')
        print(f'Date:     {_entry_date} - {_exit_date} [{_days} days]')
        print(f'Price:    ${_entry_price} - ${_exit_price} [{_return}%]')
        print(f'Value:    ${_entry_value} - ${_exit_value} [${_diff}]')
        print()

Displays a formatted summary of the position’s key metrics including dates, prices, values, and returns in a human-readable format. This method extracts all relevant position data, formats it for display with appropriate rounding and units, then prints a structured summary showing the trade’s entry and exit details along with performance metrics. The design formats dates using the _pdate utility function for consistent presentation, rounds prices and values to two decimal places for readability, and converts the percent return to a percentage for intuitive interpretation. The output includes the symbol, date range with holding period, price movement with percentage change, and value change with dollar profit or loss, providing a complete at-a-glance view of the trade’s performance.
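
A minimal lifecycle sketch (the symbol and prices are made up; assumes the class is importable from pypm.portfolio):

import pandas as pd
from pypm.portfolio import Position

position = Position('XYZ', pd.Timestamp('2020-01-02'), entry_price=50.0, shares=100)
position.record_price_update(pd.Timestamp('2020-01-03'), 52.0)   # intermediate valuation
position.exit(pd.Timestamp('2020-01-06'), exit_price=55.0)       # closes the position
position.print_position_summary()   # prints the dates held, the price move, and the 10% / $500 result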

def __hash__(self):
        return hash((self.entry_date, self.symbol))

Generates a unique hash for the position based on entry date and symbol to enable set membership and duplicate detection. This method defines position uniqueness as the combination of when and what was traded, aligning with the constraint that prevents multiple positions in the same symbol entered on the same date. The design hashes the tuple of entry_date and symbol, enabling positions to be stored in sets and checked for duplicates efficiently. This hash function supports the portfolio history’s duplicate prevention logic by providing a consistent identity for each position.

class PortfolioHistory(object):

Manages the complete history of portfolio positions and cash balances throughout a trading simulation, computing comprehensive performance metrics and generating visualizations. This class serves as the central repository for all trading activity, maintaining chronological records of closed positions and cash levels while providing access to derived time series like equity curves and performance statistics. The design separates data collection during simulation from analysis after completion, using a finish flag to enforce that certain computations only occur once all trading is complete. It stores closed positions in a list for chronological access and a set for duplicate detection, tracks cash balances over time in a dictionary, and lazily loads benchmark data for comparison. The class computes multiple time series including cash, portfolio value, total equity, and log returns, all derived from the underlying position and cash histories. It provides numerous performance metrics as properties including returns, CAGR, volatility, Sharpe ratio, maximum drawdown, and Jensen’s alpha, comparing portfolio performance against the S&P 500 benchmark. The visualization capabilities include equity curve plots, cash and portfolio value charts, and benchmark comparisons showing how the strategy performed relative to a simple buy-and-hold approach. This comprehensive tracking and analysis infrastructure enables thorough evaluation of trading strategies across multiple dimensions of performance.

def __init__(self):
        self.position_history: List[Position] = []
        self._logged_positions: Set[Position] = set()
        self.last_date: pd.Timestamp = pd.Timestamp.min
        self._cash_history: Dict[pd.Timestamp, Dollars] = dict()
        self._simulation_finished = False
        self._spy: pd.DataFrame = pd.DataFrame()
        self._spy_log_returns: pd.Series = pd.Series()

Initializes an empty portfolio history with data structures ready to record positions and cash balances throughout the simulation. This constructor sets up lists and dictionaries for tracking trading activity, initializes the last seen date to the minimum timestamp to ensure any real date will be later, and prepares empty containers for computed time series and benchmark data. The design creates both a list for maintaining chronological position order and a set for efficient duplicate detection, initializes a dictionary for cash history tracking, sets the simulation finished flag to False to prevent premature access to analysis methods, and creates empty dataframes for benchmark data that will be lazily loaded when needed.

def add_to_history(self, position: Position):
        _log = self._logged_positions
        assert position not in _log, 'Recorded the same position twice.'
        assert position.is_closed, 'Position is not closed.'
        self._logged_positions.add(position)
        self.position_history.append(position)
        self.last_date = max(self.last_date, position.last_date)

Adds a closed position to the portfolio history after validating it hasn’t been recorded before and is properly closed. This method serves as the primary interface for logging completed trades, enforcing data integrity by checking that the position is unique and in a closed state. The design uses a set to track previously logged positions for efficient duplicate detection, preventing the same position from being recorded multiple times. It validates that the position has an exit date before adding it to the history, ensuring only complete trades appear in the records. After validation, it adds the position to both the set for duplicate checking and the list for chronological access, then updates the last seen date to track the simulation’s temporal progress.

def record_cash(self, date, cash):
        self._cash_history[date] = cash
        self.last_date = max(self.last_date, date)

Records the cash balance at a specific date to build the cash history time series. This method maintains a dictionary mapping dates to cash amounts, enabling reconstruction of how available capital changed throughout the simulation. The design updates the last seen date to track temporal progress, ensuring the portfolio history knows the full extent of the simulation period.

    def _as_oseries(d: Dict[pd.Timestamp, Any]) -> pd.Series:
        return pd.Series(d).sort_index()

Converts a dictionary with timestamp keys into a sorted pandas Series for chronological time series representation. This static utility method provides a consistent way to transform the internal dictionary representations into pandas Series objects with dates in ascending order. The design sorts by index to ensure all time series maintain chronological ordering regardless of the insertion order in the source dictionary.

def _compute_cash_series(self):
        self._cash_series = self._as_oseries(self._cash_history)

Constructs the cash time series from the cash history dictionary by converting it to a sorted pandas Series. This internal method is called during the finish process to prepare the cash data for analysis and visualization. The design delegates to the _as_oseries utility to ensure consistent chronological ordering.

def cash_series(self) -> pd.Series:
        return self._cash_series

Returns the time series of cash balances throughout the simulation showing how available capital changed over time. This property provides read-only access to the computed cash series that was constructed during the finish process. The design assumes finish has been called, making the cash series available for analysis and plotting.

def _compute_portfolio_value_series(self):
        value_by_date = defaultdict(float)
        last_date = self.last_date
        for position in self.position_history:
            for date, value in position.value_series.items():
                value_by_date[date] += value
        for date in self.cash_series.index:
            value_by_date[date] += 0
        self._portfolio_value_series = self._as_oseries(value_by_date)

Constructs the portfolio value time series by aggregating the value of all positions across all dates. This internal method iterates through every position’s value series and accumulates the values by date using a defaultdict, then ensures all dates from the cash series are represented even if no positions were held. The design handles the complexity of overlapping positions by summing values at each date, creates a complete date range by adding zero to dates that appear in cash but not in positions, and converts the accumulated dictionary to a sorted pandas Series for chronological representation.

def portfolio_value_series(self):
        return self._portfolio_value_series

Returns the time series showing the total value of all held positions throughout the simulation. This property provides the market value of the portfolio’s stock holdings over time, excluding cash. The design computes this series during finish by aggregating all position values, enabling analysis of how capital was deployed in the market.

def _compute_equity_series(self):
        c_series = self.cash_series
        p_series = self.portfolio_value_series
        assert all(c_series.index == p_series.index), \
            'portfolio_series has dates not in cash_series'
        self._equity_series = c_series + p_series

Constructs the total equity time series by adding cash and portfolio value at each date. This internal method combines the two fundamental components of portfolio value to create the complete equity curve showing total account value over time. The design validates that both series have identical date indices to ensure proper alignment, then performs element-wise addition to compute total equity at each point.

def equity_series(self):
        return self._equity_series

Returns the time series of total portfolio equity combining cash and position values. This property provides the complete equity curve showing how total account value evolved throughout the simulation. The design sums cash and portfolio value to give the total capital available at each point in time.

def _compute_log_return_series(self):
        self._log_return_series = \
            metrics.calculate_log_return_series(self.equity_series)

Constructs the log return time series from the equity curve for use in performance metric calculations. This internal method delegates to the metrics module to compute logarithmic returns, which are preferred for many statistical calculations due to their mathematical properties. The design transforms the equity series into returns that can be used for volatility, Sharpe ratio, and other risk-adjusted metrics.

def log_return_series(self):
        return self._log_return_series

Returns the time series of logarithmic returns calculated from the equity curve. This property provides returns in log form which are used for various performance metrics including volatility and Jensen’s alpha. The design computes this during finish to make returns available for all subsequent metric calculations.

def _assert_finished(self):
        assert self._simulation_finished, \
            'Simulation must be finished by running self.finish() in order ' + \
            'to access this method or property.'

Validates that the simulation has completed and finish has been called before allowing access to computed metrics and series. This internal guard method prevents accessing properties that depend on complete data before all computations are done. The design provides a clear error message directing users to call finish before attempting to access analysis methods.

def finish(self):
        self._simulation_finished = True
        self._compute_cash_series()
        self._compute_portfolio_value_series()
        self._compute_equity_series()
        self._compute_log_return_series()
        self._assert_finished()

Marks the simulation as complete and computes all derived time series and metrics from the collected position and cash histories. This method serves as the transition point from data collection to analysis, triggering computation of cash series, portfolio value series, equity series, and log returns. The design enforces that this method must be called before accessing any analysis properties, ensuring all computations are performed once with complete data rather than incrementally during simulation. After calling this method, all performance metrics and visualization methods become available.
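
A compact end-to-end sketch of this workflow (hypothetical symbol and prices; assumes both classes are importable from pypm.portfolio):

import pandas as pd
from pypm.portfolio import Position, PortfolioHistory

dates = [pd.Timestamp(d) for d in ('2020-01-02', '2020-01-03', '2020-01-06')]

# One hypothetical trade: buy 100 shares at $50 out of $10,000 starting cash
position = Position('XYZ', dates[0], entry_price=50.0, shares=100)
position.record_price_update(dates[1], 52.0)
position.exit(dates[2], exit_price=55.0)

history = PortfolioHistory()
history.record_cash(dates[0], 5_000)            # $10,000 minus the $5,000 entry value
history.record_cash(dates[1], 5_000)
history.record_cash(dates[2], 5_000 + position.exit_value)   # sale proceeds return to cash
history.add_to_history(position)                # only closed positions may be logged
history.finish()                                # builds the cash, portfolio value, and equity series
print(history.equity_series)                    # 10,000 -> 10,200 -> 10,500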

def compute_portfolio_size_series(self) -> pd.Series:
        size_by_date = defaultdict(int)
        for position in self.position_history:
            for date in position.value_series.index:
                size_by_date[date] += 1
        return self._as_oseries(size_by_date)

Calculates a time series showing the number of active positions held on each date throughout the simulation. This method iterates through all positions and counts how many were active on each date by examining their value series indices. The design uses a defaultdict to accumulate counts and converts the result to a sorted pandas Series for chronological representation. This metric helps analyze how capital was distributed across positions over time.

def spy(self):
        if self._spy.empty:
            first_date = self.cash_series.index[0]
            _spy = data_io.load_spy_data()
            self._spy = _spy[_spy.index >= first_date]
        return self._spy

Returns S&P 500 price data starting from the simulation’s first date for benchmark comparison. This property lazily loads SPY data only when needed, filtering it to match the simulation’s timeframe. The design caches the loaded data to avoid repeated file access, and ensures the benchmark starts at the same time as the portfolio for fair comparison.

def spy_log_returns(self):
        if self._spy_log_returns.empty:
            close = self.spy['close']
            self._spy_log_returns = metrics.calculate_log_return_series(close)
        return self._spy_log_returns

Returns the logarithmic return series for the S&P 500 benchmark calculated from closing prices. This property lazily computes log returns from the SPY price data when first accessed, caching the result for subsequent use. The design provides returns in the same format as the portfolio’s log returns to enable direct comparison in metrics like Jensen’s alpha.

def percent_return(self):
        return metrics.calculate_percent_return(self.equity_series)

Calculates the total percentage return of the portfolio from start to finish. This property delegates to the metrics module to compute simple return from the equity curve, expressing overall performance as a decimal fraction.

def spy_percent_return(self):
        return metrics.calculate_percent_return(self.spy['close'])

Calculates the total percentage return of the S&P 500 benchmark over the same period. This property provides the buy-and-hold return for comparison against the portfolio’s active trading strategy.

def cagr(self):
        return metrics.calculate_cagr(self.equity_series)

Calculates the compound annual growth rate of the portfolio, annualizing returns to enable comparison across different time periods. This property delegates to the metrics module to compute CAGR from the equity curve, providing a time-normalized measure of performance.

def volatility(self):
        return metrics.calculate_annualized_volatility(self.log_return_series)

Calculates the annualized volatility of portfolio returns as a measure of risk. This property computes the standard deviation of log returns scaled to annual terms, quantifying how much the portfolio’s value fluctuated over time.

def sharpe_ratio(self):
        return metrics.calculate_sharpe_ratio(self.equity_series)

Calculates the Sharpe ratio measuring risk-adjusted returns by comparing excess returns to volatility. This property delegates to the metrics module to compute the ratio of average excess return to return standard deviation, providing a key measure of risk-adjusted performance.

def spy_cagr(self):
        return metrics.calculate_cagr(self.spy['close'])

Calculates the compound annual growth rate of the S&P 500 benchmark for comparison. This property provides the annualized return of a simple buy-and-hold strategy in the market index.

def excess_cagr(self):
        return self.cagr - self.spy_cagr

Calculates the excess CAGR by subtracting the benchmark’s CAGR from the portfolio’s CAGR. This property quantifies how much additional annualized return the trading strategy generated compared to simply holding the S&P 500, providing a clear measure of alpha generation.

def jensens_alpha(self):
        return metrics.calculate_jensens_alpha(
            self.log_return_series,
            self.spy_log_returns,
        )

Calculates Jensen’s alpha measuring the portfolio’s excess return after adjusting for systematic market risk. This property delegates to the metrics module to compute alpha by comparing the portfolio’s returns to what would be expected given its beta exposure to the market, providing a risk-adjusted measure of skill.
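
For reference, the textbook version of Jensen's alpha is the intercept of a regression of portfolio returns on benchmark returns; a minimal sketch of that calculation (not necessarily the exact implementation inside pypm.metrics) is:

import numpy as np

def jensens_alpha_sketch(portfolio_returns, benchmark_returns) -> float:
    # Fit portfolio returns = alpha + beta * benchmark returns; report the intercept.
    beta, alpha = np.polyfit(benchmark_returns, portfolio_returns, 1)
    return alpha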

def dollar_max_drawdown(self):
        return metrics.calculate_max_drawdown(self.equity_series, 'dollar')

Calculates the maximum peak-to-trough decline in dollar terms throughout the simulation. This property measures the largest absolute loss from a portfolio high to a subsequent low, quantifying worst-case capital loss.

def percent_max_drawdown(self):
        return metrics.calculate_max_drawdown(self.equity_series, 'percent')

Calculates the maximum peak-to-trough decline as a percentage of the peak value. This property measures the largest relative loss from a portfolio high, providing a normalized view of drawdown risk.

def log_max_drawdown_ratio(self):
        return metrics.calculate_log_max_drawdown_ratio(self.equity_series)

Calculates the ratio of maximum drawdown in logarithmic terms for a mathematically consistent risk measure. This property delegates to the metrics module to compute drawdown using log returns, providing a measure that’s consistent with other log-based calculations.

def number_of_trades(self):
        return len(self.position_history)

Returns the total count of positions taken throughout the simulation. This property simply counts the position history list, providing a basic measure of trading activity.

def average_active_trades(self):
        return self.compute_portfolio_size_series().mean()

Calculates the average number of positions held concurrently throughout the simulation. This property computes the mean of the portfolio size series, indicating typical capital deployment across positions.

def final_cash(self):
        self._assert_finished()
        return self.cash_series[-1]

Returns the cash balance at the end of the simulation after all positions are closed. This property accesses the last value in the cash series, requiring that finish has been called first.

def final_equity(self):
        self._assert_finished()
        return self.equity_series[-1]

Returns the total portfolio value at the end of the simulation including cash and any remaining positions. This property accesses the last value in the equity series, providing the final account value after all trading.

def get_performance_metric_data(self) -> PerformancePayload:
        props = self._PERFORMANCE_METRICS_PROPS
        return {prop: getattr(self, prop) for prop in props}

Returns a dictionary containing all performance metrics with property names as keys and computed values as values. This method iterates through the predefined list of performance metric property names, retrieves each value using getattr, and constructs a dictionary suitable for optimization analysis or reporting. The design provides a convenient way to extract all metrics at once for use in grid search optimization or batch analysis.
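
A hedged sketch of how this payload might be consumed in a grid search (the parameter names and the simulator variable below are illustrative, not part of the method itself):

import pandas as pd

results = []
metrics_dict = simulator.portfolio_history.get_performance_metric_data()
metrics_dict.update({'bollinger_n': 20, 'sharpe_n': 100})  # parameters under test
results.append(metrics_dict)

results_df = pd.DataFrame(results)  # one row per simulated configuration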

def print_position_summaries(self):
        for position in self.position_history:
            position.print_position_summary()

Prints detailed summaries for all positions in the portfolio history by iterating through each position and calling its print method. This method provides a complete trade-by-trade breakdown showing entry and exit details for every position taken during the simulation.

def print_summary(self):
        self._assert_finished()
        s = f'Equity: ${self.final_equity:.2f}\n' \
            f'Percent Return: {100 * self.percent_return:.2f}%\n' \
            f'S&P 500 Return: {100 * self.spy_percent_return:.2f}%\n\n' \
            f'Number of trades: {self.number_of_trades}\n' \
            f'Average active trades: {self.average_active_trades:.2f}\n\n' \
            f'CAGR: {100 * self.cagr:.2f}%\n' \
            f'S&P 500 CAGR: {100 * self.spy_cagr:.2f}%\n' \
            f'Excess CAGR: {100 * self.excess_cagr:.2f}%\n\n' \
            f'Annualized Volatility: {100 * self.volatility:.2f}%\n' \
            f'Sharpe Ratio: {self.sharpe_ratio:.2f}\n' \
            f'Jensen\'s Alpha: {self.jensens_alpha:.6f}\n\n' \
            f'Dollar Max Drawdown: ${self.dollar_max_drawdown:.2f}\n' \
            f'Percent Max Drawdown: {100 * self.percent_max_drawdown:.2f}%\n' \
            f'Log Max Drawdown Ratio: {self.log_max_drawdown_ratio:.2f}\n'
        print(s)

Displays a comprehensive formatted summary of all key portfolio performance metrics including returns, risk measures, and trading statistics. This method constructs a multi-line string containing final equity, percent returns for both portfolio and benchmark, trade counts, CAGR metrics, volatility, Sharpe ratio, Jensen’s alpha, and maximum drawdown measures. The design formats all values with appropriate precision and units, presenting a complete performance overview in a readable format. The method requires that finish has been called to ensure all metrics are available.

def plot(self, show=True) -> plt.Figure:
        self._assert_finished()
        figure, axes = plt.subplots(nrows=3, ncols=1)
        figure.tight_layout(pad=3.0)
        axes[0].plot(self.equity_series)
        axes[0].set_title('Equity')
        axes[0].grid()
        axes[1].plot(self.cash_series)
        axes[1].set_title('Cash')
        axes[1].grid()
        axes[2].plot(self.portfolio_value_series)
        axes[2].set_title('Portfolio Value')
        axes[2].grid()
        if show:
            plt.show()
        return figure

Creates a three-panel visualization showing equity, cash, and portfolio value curves over time. This method generates a matplotlib figure with three vertically stacked subplots, each displaying one of the key time series with gridlines for readability. The design uses tight layout to prevent label overlap, plots each series in its own axes with appropriate titles, and optionally displays the figure immediately based on the show parameter. The method returns the figure object to enable further customization or saving. This visualization provides a comprehensive view of how total value, available cash, and invested capital evolved throughout the simulation.

def plot_benchmark_comparison(self, show=True) -> plt.Figure:
        self._assert_finished()
        equity_curve = self.equity_series
        ax = equity_curve.plot()
        spy_closes = self.spy['close']
        initial_cash = self.cash_series[0]
        initial_spy = spy_closes[0]
        scaled_spy = spy_closes * (initial_cash / initial_spy)
        scaled_spy.plot()
        baseline = pd.Series(initial_cash, index=equity_curve.index)
        ax = baseline.plot(color='black')
        ax.grid()
        ax.legend(['Equity curve', 'S&P 500 portfolio'])
        if show:
            plt.show()

Creates a visualization comparing the portfolio’s equity curve against an equivalent investment in the S&P 500 and a baseline of initial capital. This method plots three lines: the portfolio equity curve, a scaled S&P 500 curve representing what would have happened with a buy-and-hold strategy using the same initial capital, and a horizontal line at the initial capital level showing the zero-return baseline. The design scales the S&P 500 prices to match the portfolio’s initial capital for fair comparison, adds a legend to identify each curve, includes gridlines for readability, and optionally displays the figure immediately. This visualization enables quick assessment of whether the trading strategy outperformed, underperformed, or matched the market benchmark.

File: pypm/signals.py

def create_macd_signal(series: pd.Series, n1: int=5, n2: int=34) -> pd.Series:
    macd = calculate_macd_oscillator(series, n1, n2)
    macd_sign = np.sign(macd)
    macd_shifted_sign = macd_sign.shift(1, axis=0)
    return macd_sign * (macd_sign != macd_shifted_sign)

Generates momentum-based trading signals using the MACD crossover strategy by detecting when the MACD oscillator crosses the zero line. This function implements a classic momentum trading approach where buy signals are generated when the MACD crosses above zero indicating strengthening upward momentum, and sell signals are generated when it crosses below zero indicating weakening or reversing momentum. The design calculates the MACD oscillator using two exponential moving average periods, extracts the sign of each MACD value to determine whether it’s positive or negative, then compares the current sign to the previous period’s sign to detect crossovers. When a crossover occurs, the function returns a non-zero signal value whose sign indicates the direction: positive for bullish crossovers above zero and negative for bearish crossovers below zero. The multiplication of the current sign by the crossover boolean creates a signal that’s zero when no crossover occurs, +1 when crossing above zero, and -1 when crossing below zero. This approach provides clear entry and exit signals based on momentum changes while filtering out noise from minor MACD fluctuations that don’t cross the zero threshold.
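
A small worked example with made-up MACD values shows how the sign comparison isolates the crossovers (note that the first bar compares against NaN, so it always registers as a flip):

import numpy as np
import pandas as pd

macd = pd.Series([-0.4, -0.1, 0.2, 0.5, 0.1, -0.3])   # illustrative oscillator values
macd_sign = np.sign(macd)                  # -1, -1,  1,  1,  1, -1
crossed = macd_sign != macd_sign.shift(1)  # True wherever the sign flips
signal = macd_sign * crossed               # -1,  0,  1,  0,  0, -1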

def create_bollinger_band_signal(series: pd.Series, n: int=20) -> pd.Series:
    bollinger_bands = calculate_bollinger_bands(series, n)
    sell = series > bollinger_bands['upper']
    buy = series < bollinger_bands['lower']
    return (1*buy - 1*sell)

Generates mean-reversion trading signals using Bollinger Bands by identifying when prices move outside the normal volatility range. This function implements a contrarian trading strategy based on the assumption that prices tend to revert to their mean after extreme movements. The design calculates Bollinger Bands with a specified lookback period, then generates buy signals when the price drops below the lower band indicating oversold conditions, and sell signals when the price rises above the upper band indicating overbought conditions. The strategy assumes that prices touching or crossing the outer bands represent temporary extremes that will likely reverse, creating profitable entry points for mean-reversion trades. The function returns a series of signal values where +1 indicates a buy signal triggered by price below the lower band, -1 indicates a sell signal triggered by price above the upper band, and 0 indicates no signal when price remains within the bands. This approach works best in ranging markets where prices oscillate around a stable mean, but may generate false signals in strongly trending markets where prices can remain outside the bands for extended periods.
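
A toy illustration of the signal arithmetic, with the bands supplied by hand rather than computed from a real series:

import pandas as pd

price = pd.Series([95.0, 100.0, 105.0])
lower = pd.Series([96.0, 96.0, 96.0])     # made-up lower band
upper = pd.Series([104.0, 104.0, 104.0])  # made-up upper band
buy = price < lower                        # True, False, False
sell = price > upper                       # False, False, True
signal = 1*buy - 1*sell                    # 1, 0, -1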

File: pypm/simulation.py

class SimpleSimulator(object):

Implements a realistic backtesting simulation engine that executes trading strategies with position limits, transaction costs, and slippage modeling. This class provides a complete framework for testing trading strategies by processing historical price data, signal matrices, and preference rankings to simulate realistic portfolio management. The design enforces practical constraints including maximum concurrent positions, transaction fees, and market impact through slippage, while maintaining detailed records of all trades and portfolio values over time. The simulator operates by iterating through historical data chronologically, executing buy and sell decisions based on signals, managing position entries and exits with realistic cost modeling, and tracking cash and equity throughout the simulation. It supports position swapping where higher-preference opportunities can replace lower-preference holdings when position limits are reached, enabling dynamic portfolio rebalancing. The class integrates with the PortfolioHistory system to maintain complete records of all positions, cash flows, and performance metrics, ultimately producing comprehensive performance statistics and visualizations for strategy evaluation.

def __init__(self, initial_cash: float=10000, max_active_positions: int=5,
        percent_slippage: float=0.0005, trade_fee: float=1):
        self.initial_cash = self.cash = initial_cash
        self.max_active_positions: int = max_active_positions
        self.percent_slippage = percent_slippage
        self.trade_fee = trade_fee
        self.active_positions_by_symbol: Dict[Symbol, Position] = OrderedDict()
        self.portfolio_history = PortfolioHistory()

Initializes the trading simulator with specified capital constraints and transaction cost parameters. This constructor sets up the simulation environment by establishing the starting cash balance, position limits, and cost assumptions that will govern all trading activity. The design accepts initial capital that determines buying power, a maximum position count that limits diversification, a slippage percentage that models market impact and adverse price movement during execution, and a fixed transaction fee charged on each trade. It initializes the cash balance to the starting capital, creates an ordered dictionary to track active positions by symbol, and instantiates a PortfolioHistory object to record all trading activity and portfolio values throughout the simulation. The slippage parameter models the realistic cost of market orders where actual fill prices differ from closing prices, while the trade fee represents brokerage commissions or exchange fees. These cost parameters ensure the simulation reflects real-world trading conditions rather than idealized frictionless markets.
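
A minimal construction sketch with explicit cost assumptions (the values shown are just examples, not recommendations):

simulator = SimpleSimulator(
    initial_cash=10_000,       # starting capital
    max_active_positions=5,    # at most five concurrent positions
    percent_slippage=0.0005,   # 5 basis points of adverse fill per trade
    trade_fee=1,               # flat $1 commission per trade
)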

def active_positions_count(self):
        return len(self.active_positions_by_symbol)

Returns the current number of open positions in the portfolio. This property provides a quick count of active holdings by measuring the length of the positions dictionary, enabling checks against position limits and capacity calculations.

def free_position_slots(self):
        return self.max_active_positions - self.active_positions_count

Calculates the number of additional positions that can be opened before reaching the maximum position limit. This property subtracts the current position count from the maximum allowed, providing the available capacity for new trades. This value determines how capital should be allocated among new opportunities.

def active_symbols(self) -> List[Symbol]:
        return list(self.active_positions_by_symbol.keys())

Returns a list of all symbols currently held in the portfolio. This property extracts the keys from the active positions dictionary, providing quick access to which assets are currently owned for iteration and decision-making during simulation.

def print_initial_parameters(self):
        s = f'Initial Cash: ${self.initial_cash} \n' \
            f'Maximum Number of Assets: {self.max_active_positions}\n'
        print(s)
        return s

Displays the simulation’s initial configuration parameters including starting capital and position limits. This method formats and prints the key constraints that govern the simulation, providing transparency about the trading environment’s setup. The method returns the formatted string for potential logging or further use.

def make_tuple_lookup(columns) -> Callable[[str, str], int]:
        tuple_lookup: Dict[Tuple[str, str], int] = { 
            col: i + 1 for i, col in enumerate(columns) 
        }
        return lambda symbol, metric: tuple_lookup[(symbol, metric)]

Creates a lookup function that maps hierarchical column indices to tuple positions for efficient data access during iteration. This static method addresses the performance optimization of using itertuples instead of iterrows by building a dictionary that translates symbol-metric pairs into integer indices. The design creates a mapping from each hierarchical column tuple to its position in the tuple row, then returns a lambda function that accepts a symbol and metric name and returns the corresponding index. This enables fast O(1) lookups during the simulation loop where accessing data by column name would be slower. The returned function allows code like _idx(symbol, ‘price’) to retrieve the tuple index for that symbol’s price column, making the iteration code both fast and readable.
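
A small sketch of the idea, assuming two hypothetical symbols and the three metrics used in the simulation (row position 0 is reserved for the date in itertuples):

columns = [('AAPL', 'price'), ('AAPL', 'signal'), ('AAPL', 'pref'),
           ('MSFT', 'price'), ('MSFT', 'signal'), ('MSFT', 'pref')]
tuple_lookup = {col: i + 1 for i, col in enumerate(columns)}
_idx = lambda symbol, metric: tuple_lookup[(symbol, metric)]

_idx('MSFT', 'price')  # 4: the position of MSFT's price within each row tuple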

def make_all_valid_lookup(_idx: Callable):
        return lambda row, symbol: (
            not pd.isna(row[_idx(symbol, 'pref')]) and \
            not pd.isna(row[_idx(symbol, 'signal')]) and \
            not pd.isna(row[_idx(symbol, 'price')])
        )

Creates a validation function that checks whether all required data fields are present and non-null for a given symbol at a specific time point. This static method returns a lambda function that uses the provided index lookup to check if preference, signal, and price data all exist and are not NaN for a symbol. The design ensures that trading decisions are only made when complete data is available, preventing errors from missing values or data gaps. The returned function takes a row tuple and symbol, then verifies that all three critical metrics have valid numeric values before allowing that symbol to be considered for trading.

def buy_to_open(self, symbol, date, price):
        cash_available = self.cash - self.trade_fee
        cash_to_spend = cash_available / self.free_position_slots
        
        purchase_price = (1 + self.percent_slippage) * price
        shares = cash_to_spend / purchase_price
        self.cash -= cash_to_spend + self.trade_fee
        assert self.cash >= 0, 'Spent cash you do not have.'
        self.portfolio_history.record_cash(date, self.cash)
        positions_by_symbol = self.active_positions_by_symbol
        assert not symbol in positions_by_symbol, 'Symbol already in portfolio.'
        position = Position(symbol, date, purchase_price, shares)
        positions_by_symbol[symbol] = position

Executes a buy order to open a new position by allocating available capital, calculating shares to purchase with slippage, and recording the position. This method implements the complete logic for entering a new trade including capital allocation, cost modeling, and position tracking. The design first determines available cash after accounting for the trade fee, then divides this among free position slots to calculate how much to spend on this particular position. It applies slippage to the closing price to model adverse price movement during execution, calculates the number of shares that can be purchased including fractional shares, then deducts the total cost from available cash. The method validates that sufficient cash exists, records the cash balance change in portfolio history, creates a new Position object with the entry details, and adds it to the active positions dictionary. The equal capital allocation across free slots ensures balanced diversification when multiple positions are held simultaneously.
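
A worked example with hypothetical numbers makes the allocation arithmetic concrete: with $9,000 of cash, three free slots, a $1 fee, 0.05% slippage, and a $100 closing price:

cash = 9_000.0
cash_available = cash - 1.0                 # 8,999.00 after the trade fee
cash_to_spend = cash_available / 3          # ~2,999.67 allocated to this position
purchase_price = (1 + 0.0005) * 100.0       # 100.05 effective fill price with slippage
shares = cash_to_spend / purchase_price     # ~29.98 fractional shares
cash -= cash_to_spend + 1.0                 # ~5,999.33 remaining for other positions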

def sell_to_close(self, symbol, date, price):
        positions_by_symbol = self.active_positions_by_symbol
        position = positions_by_symbol[symbol]
        position.exit(date, price)
        sale_value = position.last_value * (1 - self.percent_slippage)
        self.cash += sale_value
        self.portfolio_history.record_cash(date, self.cash)
        self.portfolio_history.add_to_history(position)
        del positions_by_symbol[symbol]

Executes a sell order to close an existing position by recording the exit price, recovering cash with slippage, and moving the position to history. This method implements the complete logic for exiting a trade including position closure, cash recovery with costs, and record keeping. The design retrieves the active position for the symbol, calls its exit method to record the exit date and price, calculates the sale proceeds by applying slippage to reduce the position value, adds the recovered cash to the available balance, and records the updated cash level. It then adds the completed position to the portfolio history for performance tracking and removes it from the active positions dictionary. The method will raise a KeyError if attempting to close a position that doesn’t exist, providing clear error feedback for logic bugs. The slippage on the sale models the adverse price movement when executing market sell orders.

def _assert_equal_columns(*args: Iterable[pd.DataFrame]):
        column_names = set(args[0].columns.values)
        for arg in args[1:]:
            assert set(arg.columns.values) == column_names, \
                'Found unequal column names in input dataframes.'

Validates that all provided dataframes have identical column names to ensure data alignment during simulation. This static method checks that the price, signal, and preference dataframes all contain the same set of symbols, preventing errors from mismatched data. The design extracts column names from the first dataframe, then iterates through remaining dataframes asserting that each has the same column set. This validation catches data preparation errors before simulation begins, ensuring all required data is present for every symbol.

def simulate(self, price: pd.DataFrame, signal: pd.DataFrame, 
        preference: pd.DataFrame):
        self._assert_equal_columns(price, signal, preference)
        df = data_io.concatenate_metrics({
            'price': price,
            'signal': signal,
            'pref': preference,
        })
        all_symbols = list(set(price.columns.values))
        _idx = self.make_tuple_lookup(df.columns)
        _all_valid = self.make_all_valid_lookup(_idx)
        active_positions_by_symbol = self.active_positions_by_symbol
        max_active_positions = self.max_active_positions
        for row in df.itertuples():
            date = row[0]
            symbols: List[str] = [s for s in all_symbols if _all_valid(row, s)]
            _active = self.active_symbols
            to_exit = [s for s in _active if row[_idx(s, 'signal')] == -1]
            for s in to_exit:
                sell_price = row[_idx(s, 'price')]
                self.sell_to_close(s, date, sell_price)
            to_buy = [
                s for s in symbols if \
                    row[_idx(s, 'signal')] == 1 and \
                    not s in active_positions_by_symbol
            ]
            to_buy.sort(key=lambda s: row[_idx(s, 'pref')], reverse=True)
            to_buy = to_buy[:max_active_positions]
            for s in to_buy:
                buy_price = row[_idx(s, 'price')]
                buy_preference = row[_idx(s, 'pref')]
                if self.active_positions_count < max_active_positions:
                    self.buy_to_open(s, date, buy_price)
                    continue
                _active = self.active_symbols
                active_prefs = [(s, row[_idx(s, 'pref')]) for s in _active]
                _min = min(active_prefs, key=lambda k: k[1])
                min_active_symbol, min_active_preference = _min
                if min_active_preference < buy_preference:
                    sell_price = row[_idx(min_active_symbol, 'price')]
                    self.sell_to_close(min_active_symbol, date, sell_price)
                    self.buy_to_open(s, date, buy_price)
            for s in self.active_symbols:
                price = row[_idx(s, 'price')]
                position = active_positions_by_symbol[s]
                position.record_price_update(date, price)
            self.portfolio_history.record_cash(date, self.cash)
        for s in self.active_symbols:
            self.sell_to_close(s, date, row[_idx(s, 'price')])
        self.portfolio_history.finish()

Executes the complete backtesting simulation by processing historical data chronologically and making trading decisions based on signals and preferences. This method orchestrates the entire simulation workflow from data preparation through final position closure and performance calculation. The design validates that all input dataframes have matching columns, combines them into a hierarchical dataframe for efficient iteration, creates lookup functions for fast data access, then iterates through each date executing the trading logic. On each date, it first identifies symbols with valid data, then processes sell signals by closing positions that have exit signals, followed by processing buy signals where it identifies candidates, sorts them by preference, and either opens new positions if slots are available or swaps lower-preference holdings for higher-preference opportunities. Throughout iteration, it updates price data for all active positions and records cash balances. After processing all historical data, it closes all remaining positions and calls finish on the portfolio history to compute final metrics. The use of itertuples provides significant performance benefits over iterrows, while the tuple lookup functions maintain code readability. The preference-based position swapping enables dynamic rebalancing where the portfolio continuously holds the most attractive opportunities within the position limit constraint.
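
Putting the pieces together, a hedged end-to-end sketch (assuming prices is a date-indexed dataframe of closing prices with one column per symbol; the preference matrix here is a simple stand-in ranking, not the rolling Sharpe ratio used in the bundled scripts):

signal = prices.apply(create_bollinger_band_signal, n=20)
preference = prices.apply(lambda s: s.pct_change().rolling(100).mean())  # stand-in ranking

simulator = SimpleSimulator(initial_cash=10_000, max_active_positions=5)
simulator.simulate(price=prices, signal=signal, preference=preference)
simulator.portfolio_history.print_summary()
simulator.portfolio_history.plot()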

File: pypm/weights.py

def calculate_uniqueness(event_spans: pd.Series, 
    price_index: pd.Series) -> pd.Series:
    df = pd.DataFrame(0, index=price_index, columns=range(event_spans.shape[0]))
    for i, (event_start, event_end) in enumerate(event_spans.items()):
        df[i].loc[event_start:event_end] += 1
    avg_uniquenesses = list()
    for i, (event_start, event_end) in enumerate(event_spans.items()):
        concurrency: pd.Series = df.loc[event_start:event_end].sum(axis=1)
        avg_uniqueness = 1 / hmean(concurrency)
        avg_uniquenesses.append(avg_uniqueness)
    return pd.Series(avg_uniquenesses, index=event_spans.index)

Calculates uniqueness weights for labeled training samples based on temporal overlap between events, addressing the problem of label concurrency in time series machine learning. This function implements a weighting scheme that reduces the influence of training samples whose time periods overlap heavily with other samples, preventing the model from overfitting to periods that appear multiple times in the training data. The design accepts event_spans as a series mapping event start dates to end dates, and price_index as the complete timeline of dates, then constructs a binary matrix indicating which events are active on each date. For each event, it computes the concurrency level at each point in its span by summing how many other events are simultaneously active, then calculates the harmonic mean of these concurrency values and inverts it to produce a uniqueness score. Events that occur in isolation receive high uniqueness weights approaching 1.0, while events that heavily overlap with many others receive lower weights, ensuring that the model training process gives appropriate emphasis to each unique time period rather than over-weighting frequently represented periods. The returned series of uniqueness values is indexed by event start dates and can be used directly as sample weights in machine learning algorithms, though they may need standardization depending on the specific training framework. This weighting approach is particularly important for financial time series where labels often span multiple days and overlapping events can create information leakage or bias in model training.
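
A hand-worked illustration: event A is active for eight days and event B for three, with all of B's days overlapping the tail of A:

from scipy.stats import hmean

concurrency_a = [1, 1, 1, 1, 1, 2, 2, 2]  # A is alone for five days, shared for three
concurrency_b = [2, 2, 2]                 # B is never alone

uniqueness_a = 1 / hmean(concurrency_a)   # ~0.81: mostly unique
uniqueness_b = 1 / hmean(concurrency_b)   # 0.50: fully overlapped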

File: pypm/ml_model/data_io.py

def load_data() -> Tuple[List[str], pd.DataFrame, pd.DataFrame]:
    symbols: List[str] = data_io.get_all_symbols()
    alt_data = data_io.load_alternative_data_matrix(symbols)
    eod_data = data_io.load_eod_matrix(symbols)
    eod_data = eod_data[eod_data.index >= alt_data.index.min()]
    return symbols, eod_data, alt_data

Loads the symbols, end-of-day price matrix, and alternative data matrix for use in the alternative data model, trimming the price data so it begins no earlier than the first alternative data observation.

File: pypm/ml_model/events.py

def calculate_events_for_revenue_series(series: pd.Series, 
    filter_threshold: float, lookback: int=365) -> pd.DatetimeIndex:
    series = np.log(series)
    series = filters.calculate_non_uniform_lagged_change(series, lookback)
    return filters.calculate_cusum_events(series, filter_threshold)

Applies a symmetric CUSUM filter to year-over-year changes in the log revenue series to generate event dates.

File: pypm/ml_model/features.py

def calculate_features(price_series, revenue_series) -> pd.DataFrame:
    log_revenue = np.log(revenue_series)
    log_prices = np.log(price_series)
    log_revenue_ma = _calc_ma(log_revenue, 10)
    log_prices_ma = _calc_ma(log_prices, 10)
    log_returns = _calc_log_return(price_series)
    features_by_name = dict()
    for i in [7, 30, 90, 180, 360]:
        rev_feature = _calc_delta(log_revenue_ma, i)
        price_feature = _calc_delta(log_prices_ma, i)
        vol_feature = _calc_rolling_vol(log_returns, i)
        features_by_name.update({
            f'{i}_day_revenue_delta': rev_feature,
            f'{i}_day_return': price_feature,
            f'{i}_day_vol': vol_feature,
        })
    features_df = pd.DataFrame(features_by_name)    
    return features_df

Calculate any and all potentially useful features. Return as a dataframe.

File: pypm/ml_model/labels.py

def calculate_labels(price_series, event_index) -> Tuple[pd.Series, pd.Series]:
    # Remove events that don't have a proper chance to materialize
    time_delta_days = 90
    max_date = price_series.index.max()
    cutoff = max_date - pd.Timedelta(days=time_delta_days)
    event_index = event_index[event_index <= cutoff]
    # Use triple barrier method
    event_labels, event_spans = labels.compute_triple_barrier_labels(
        price_series,
        event_index,
        time_delta_days=time_delta_days,
        # upper_delta=0.10,
        # lower_delta=-0.10,
        upper_z=1.8,
        lower_z=-1.8,
        lower_label=-1,
    )
    return event_labels, event_spans

Calculates labels using the triple barrier method. Returns a series of event labels indexed by event start date, and a series of event end dates indexed by event start date.
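
Since the article does not reproduce compute_triple_barrier_labels itself, here is a hedged conceptual sketch of the triple-barrier idea: an event gets +1 if the upside barrier is touched first, -1 if the downside barrier is touched first, and a neutral label if the time barrier expires first. The helper below uses fixed percentage barriers purely for illustration; the production code above scales its barriers by z-scores (upper_z / lower_z).

import pandas as pd

def toy_triple_barrier_label(prices: pd.Series, start: pd.Timestamp,
                             days: int, upper: float, lower: float) -> int:
    # Illustrative only: +1 if the upper barrier is hit first, -1 if the
    # lower barrier is hit first, 0 if the vertical (time) barrier expires.
    window = prices.loc[start:start + pd.Timedelta(days=days)]
    returns = window / window.iloc[0] - 1
    hit_upper = returns.index[returns >= upper]
    hit_lower = returns.index[returns <= lower]
    first_upper = hit_upper[0] if len(hit_upper) else None
    first_lower = hit_lower[0] if len(hit_lower) else None
    if first_upper is None and first_lower is None:
        return 0
    if first_lower is None or (first_upper is not None and first_upper < first_lower):
        return 1
    return -1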

File: pypm/ml_model/model.py

def _fit_and_score(classifier, X, y, w, train_index, test_index, i) -> float:
    X_train = X.iloc[train_index]
    X_test = X.iloc[test_index]
    y_train = y.iloc[train_index]
    y_test = y.iloc[test_index]
    w_train = w.iloc[train_index]
    w_test = w.iloc[test_index]
    classifier.fit(X_train, y_train, w_train)
    score = classifier.score(X_test, y_test, w_test)
    print(f'Finished {i} ({100*score:.1f}%)')
    return score

The worker function used by joblib to split, train, and score a single cross-validation fold.

def repeated_k_fold(classifier, X, y, w) -> np.ndarray:
    n_jobs = N_JOBS
    n_splits = N_SPLITS
    n_repeats = N_REPEATS
    total_fits = n_splits * n_repeats
    _k_fold = RepeatedKFold(n_splits=n_splits, n_repeats=n_repeats)
    print(f'Fitting {total_fits} models {n_jobs} at a time ...')
    print()
    parallel = Parallel(n_jobs=n_jobs)
    scores = parallel(
        delayed(_fit_and_score)(
            clone(classifier), X, y, w, train_index, test_index, i
        ) for i, (train_index, test_index) in enumerate(_k_fold.split(X))
    )
    return np.array(scores)

Performs repeated k-fold cross-validation on a classifier, spreading the fitting jobs across multiple CPU cores.

def calculate_model(df: pd.DataFrame) -> RandomForestClassifier:
    classifier = RandomForestClassifier(n_estimators=100)
    # Separate data
    predictor_columns = [
        c for c in df.columns.values if not c in ('y', 'weights')
    ]
    X = df[predictor_columns]
    y = df['y']
    w = df['weights']
    # Fit cross-validation
    scores = repeated_k_fold(classifier, X, y, w)
    # Get a full dataset fit for importance scores
    classifier.fit(X, y, w)
    # Compute diagnostics
    _imp = classifier.feature_importances_
    importance_series = pd.Series(_imp, index=predictor_columns)
    importance_series = importance_series.sort_values(ascending=False)
    # baseline accuracy is the best value achievable with a constant guess
    baseline = np.max(y.value_counts() / y.shape[0])
    # Compute a rough confidence interval for the improvement
    mean_score = scores.mean()
    std_score = scores.std()
    upper_bound = mean_score + 2 * std_score
    lower_bound = mean_score - 2 * std_score
    ibounds = (lower_bound - baseline, upper_bound - baseline)
    print()
    print('Feature importances')
    for col, imp in importance_series.items():
        print(f'{col:24} {imp:.3f}')
    print()
    print('Cross validation scores')
    print(np.round(100 * scores, 1))
    print()
    print(f'Baseline accuracy {100*baseline:.1f}%')
    print(f'OOS accuracy {100*mean_score:.1f}% +/- {200 * scores.std():.1f}%')
    print(f'Improvement {100*(ibounds[0]):.1f} to {100*(ibounds[1]):.1f}%')
    print()
    return classifier

Given a dataframe with a y column, a weights column, and predictor columns with arbitrary names, cross-validates and fits a classifier, then prints diagnostics.
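
A hedged sketch of how the input dataframe might be assembled from the earlier steps (the exact alignment logic in the packaged ml_model pipeline may differ; y and weights are the column names calculate_model expects):

features_df = calculate_features(price_series, revenue_series)
event_labels, event_spans = calculate_labels(price_series, event_index)
weights = calculate_uniqueness(event_spans, price_series.index)  # assumes the index is accepted

df = features_df.loc[event_labels.index].copy()  # one feature row per event start date
df['y'] = event_labels
df['weights'] = weights
df = df.dropna()

model = calculate_model(df)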
