# [notebook export] 1 hidden cell
# [notebook export] 2 hidden cells
# Break of structure strategy backtest
import yfinance as yf
# Download weekly BTC-USD data starting from 2018-01-01
btc_data_W = yf.download('BTC-USD', start='2018-01-01', interval='1wk')
# Download daily BTC-USD data starting from 2020-01-01
btc_data_D = yf.download('BTC-USD', start='2020-01-01', interval='1d')
# Download hourly BTC-USD data starting from 2023-02-01
btc_data_1h = yf.download('BTC-USD', start='2023-02-01', interval='1h')
# Name the hourly index 'Date' (presumably to match the other frames -- TODO confirm)
btc_data_1h.index.rename('Date', inplace=True)
# Working copy of the daily data
df_btc = btc_data_D.copy()
# [notebook export] 1 hidden cell
import pandas as pd
def _last_swing_close(closes, i, find_low):
    """Return the most recent swing Close before position ``i``, or None.

    A swing low is a Close lower than both neighbouring Closes; a swing high is
    a Close higher than both. Candidates are positions ``i - 1`` down to ``1``,
    so the right-hand neighbour is never later than row ``i``.
    """
    for j in range(i - 1, 0, -1):
        before, pivot, after = closes[j - 1], closes[j], closes[j + 1]
        if find_low:
            is_swing = before > pivot and after > pivot
        else:
            is_swing = before < pivot and after < pivot
        if is_swing:
            return pivot
    return None


def identify_trend_shifts(df):
    """
    Identify trend shifts in a stock price dataframe.

    Rows are processed in positional order, starting from the second row. The
    state starts as ``'uptrend'`` with the first row's Close, High and Low as
    the reference levels. Then, for each row:

    * In an uptrend, a High above ``highest_high`` becomes the new
      ``highest_high`` and ``close_minmax`` moves to the most recent swing-low
      Close (unchanged if none exists). A Close below ``close_minmax`` flips
      the state to ``'downtrend'``.
    * In a downtrend, a Low below ``lowest_low`` becomes the new ``lowest_low``
      and ``close_minmax`` moves to the most recent swing-high Close (unchanged
      if none exists). A Close above ``close_minmax`` flips the state to
      ``'uptrend'``.

    Parameters
    ----------
    df : pandas.DataFrame
        Frame with ``Close``, ``High`` and ``Low`` columns. It is modified in
        place.

    Returns
    -------
    pandas.DataFrame
        The same ``df`` object with four object-dtype columns added: ``trend``,
        ``close_minmax``, ``highest_high`` and ``lowest_low``. The first row
        keeps ``None`` in all four. ``highest_high`` is ``None`` on downtrend
        rows and ``lowest_low`` is ``None`` on uptrend rows that follow a flip.
        An empty frame is returned with the columns added and no rows.
    """
    output_columns = ('trend', 'close_minmax', 'highest_high', 'lowest_low')
    for column in output_columns:
        df[column] = None

    n_rows = len(df)
    if n_rows == 0:
        # Nothing to seed the state from; previously this raised IndexError.
        return df

    # Read prices positionally once instead of building a row Series per step.
    closes = df['Close'].to_numpy()
    highs = df['High'].to_numpy()
    lows = df['Low'].to_numpy()

    close_minmax = closes[0]
    highest_high = highs[0]
    lowest_low = lows[0]
    trend = 'uptrend'

    # Row 0 is only used to seed the state, so its outputs stay None.
    recorded = {column: [None] * n_rows for column in output_columns}

    for i in range(1, n_rows):
        close = closes[i]

        # Determine the trend: a Close through the reference level flips it and
        # resets the extremes so the current row re-establishes them below.
        if trend == 'downtrend' and close_minmax < close:
            trend = 'uptrend'
            highest_high = float('-inf')
            close_minmax = float('-inf')
            lowest_low = None
        elif trend == 'uptrend' and close_minmax > close:
            trend = 'downtrend'
            highest_high = None
            close_minmax = float('inf')
            lowest_low = float('inf')

        # Process the current row. The None test comes first so a missing
        # extreme can never reach the numeric comparison.
        if trend == 'uptrend':
            if highest_high is None or highs[i] > highest_high:
                highest_high = highs[i]
                # Only move the reference level after a new high was made.
                swing = _last_swing_close(closes, i, find_low=True)
                if swing is not None:
                    close_minmax = swing
        elif trend == 'downtrend':
            if lowest_low is None or lows[i] < lowest_low:
                lowest_low = lows[i]
                swing = _last_swing_close(closes, i, find_low=False)
                if swing is not None:
                    close_minmax = swing

        recorded['trend'][i] = trend
        recorded['close_minmax'][i] = close_minmax
        recorded['highest_high'][i] = highest_high
        recorded['lowest_low'][i] = lowest_low

    # dtype=object keeps None placeholders instead of coercing them to NaN.
    for column in output_columns:
        df[column] = pd.Series(recorded[column], index=df.index, dtype=object)
    return df
# Label the weekly and the daily frames with the trend-shift columns.
# reset_index() turns the downloaded index into a regular column first.
sample_data = btc_data_W.copy().reset_index()
df_sample_W = identify_trend_shifts(pd.DataFrame(sample_data))

sample_data = btc_data_D.copy().reset_index()
df_sample_D = identify_trend_shifts(pd.DataFrame(sample_data))

# Preview both labelled frames
df_sample_W.head(), df_sample_D.head()
# Check if 'Date' column exists in both dataframes
if 'Date' not in df_sample_W.columns or 'Date' not in df_sample_D.columns:
    raise KeyError("The 'Date' column is missing from one of the dataframes")
# First, ensure that the 'Date' column in both dataframes is set as the index
df_sample_W.set_index('Date', inplace=True)
df_sample_D.set_index('Date', inplace=True)
# Shift both the weekly and the daily trend by one period, so each row carries
# the trend computed on the previous bar (presumably to avoid look-ahead)
df_sample_W['trend'] = df_sample_W['trend'].shift(1)
df_sample_D['trend'] = df_sample_D['trend'].shift(1)
# Merge the shifted weekly trend into the daily dataframe; the weekly column
# arrives as 'trend_weekly' because the daily frame already has 'trend'
df_sample_D = df_sample_D.merge(
    df_sample_W[['trend']],
    how='left',
    left_index=True,
    right_index=True,
    suffixes=('', '_weekly'),
)
# Days without a matching weekly row inherit the last weekly trend seen
df_sample_D['trend_weekly'] = df_sample_D['trend_weekly'].ffill()
# Reset the index if needed
df_sample_D.reset_index(inplace=True)
df_sample_D
# Working copy used by the backtest below
df_sample = df_sample_D.copy()
# Trade signal per row: +1 when the daily and weekly trends are both 'uptrend',
# -1 when both are 'downtrend', 0 in every other case (including missing trends).
df_sample['trade'] = [
    1 if daily == 'uptrend' and weekly == 'uptrend'
    else -1 if daily == 'downtrend' and weekly == 'downtrend'
    else 0
    for daily, weekly in zip(df_sample['trend'], df_sample['trend_weekly'])
]
#df_sample['trade'] = df_sample['trade'].shift(1).fillna(0)
df_sample.head()
# Calculate buy and hold return
# Buy-and-hold return between the first and the last Close, as a fraction and in percent
buy_hold = (
    df_sample['Close'].iloc[-1] - df_sample['Close'].iloc[0]
) / df_sample['Close'].iloc[0]
buy_hold_return = 100 * buy_hold
# Per-row Close-to-Close return, and the strategy return after applying the signal
df_sample['return'] = df_sample['Close'].pct_change()
df_sample['ROI'] = df_sample['trade'] * df_sample['return']
# Compound the per-row returns for the strategy and for buy-and-hold
df_sample['cumulative_return_mine'] = df_sample['ROI'].add(1).cumprod().sub(1)
df_sample['cumulative_return_B&H'] = df_sample['return'].add(1).cumprod().sub(1)
df_sample
# Define the trading fee as a percentage (e.g., 0.1% fee per trade)
trading_fee_percentage = 0.001
# Calculate buy and hold return with fees: one fee on the entry price and one on the exit price
buy_hold_return_after_fees = (
    df_sample.iloc[-1]['Close'] * (1 - trading_fee_percentage)
    / (df_sample.iloc[0]['Close'] * (1 + trading_fee_percentage))
    - 1
) * 100
# Apply a fee on every row where the signal differs from the previous row's
# signal (the first row is compared against 0)
# NOTE(review): a direct flip between 1 and -1 is charged a single fee here --
# confirm whether it should count as two trades
df_sample['trade_shift'] = df_sample['trade'].shift(1).fillna(0)
df_sample['fee'] = (
    df_sample['trade'].ne(df_sample['trade_shift']).astype(float)
    * trading_fee_percentage
)
# Adjust ROI for fees
df_sample['ROI_after_fees'] = df_sample['ROI'] - df_sample['fee']
# Calculate the cumulative strategy return after fees
df_sample['cumulative_return_mine_after_fees'] = (1 + df_sample['ROI_after_fees']).cumprod() - 1
df_sample.iloc[-1]
import plotly.graph_objects as go
from plotly.subplots import make_subplots
# Create subplots: two stacked panels sharing the x axis
fig = make_subplots(rows=2, cols=1, shared_xaxes=True, vertical_spacing=0.1)
# Add traces for the first plot: strategy, strategy after fees, and buy-and-hold
fig.add_trace(
    go.Scatter(
        x=df_sample['Date'],
        y=df_sample['cumulative_return_mine'],
        mode='lines',
        name='Cumulative Return (Mine)',
    ),
    row=1, col=1,
)
fig.add_trace(
    go.Scatter(
        x=df_sample['Date'],
        y=df_sample['cumulative_return_mine_after_fees'],
        mode='lines',
        name='Cumulative Return (Mine After Fees)',
        line=dict(dash='dash'),
    ),
    row=1, col=1,
)
fig.add_trace(
    go.Scatter(
        x=df_sample['Date'],
        y=df_sample['cumulative_return_B&H'],
        mode='lines',
        name='Cumulative Return (B&H)',
        line=dict(dash='dot'),
    ),
    row=1, col=1,
)
# Add trace for the second plot: buy-and-hold on its own
fig.add_trace(
    go.Scatter(
        x=df_sample['Date'],
        y=df_sample['cumulative_return_B&H'],
        mode='lines',
        name='Cumulative Return (B&H)',
        line=dict(color='orange'),
    ),
    row=2, col=1,
)
# Update layout
fig.update_layout(
    title='Cumulative Return: Mine vs Mine After Fees vs B&H',
    xaxis_title='Date',
    yaxis_title='Cumulative Return',
    legend=dict(orientation='h', yanchor='bottom', y=1.02, xanchor='right', x=1),
    height=800,
    xaxis2_rangeslider=dict(visible=True),  # Add the range slider
)
# Update x-axis for the second plot
fig.update_xaxes(title_text='Date', row=2, col=1)
# Show plot
fig.show()
import matplotlib.pyplot as plt
# Build a separate frame indexed by a datetime 'Date', leaving df_sample untouched
df_sample_copy = df_sample.assign(Date=pd.to_datetime(df_sample['Date'])).set_index('Date')
# Monthly ROI in percent: per-row ROI summed within each calendar month, times 100,
# with 'Date' restored as a regular column
# NOTE(review): this sums simple returns rather than compounding them -- confirm that is intended
monthly_ROI = df_sample_copy['ROI'].resample('M').sum().mul(100).reset_index()
# Histogram of the monthly ROI values
plt.figure(figsize=(10, 6))
plt.hist(monthly_ROI['ROI'], bins=20, edgecolor='k', alpha=0.7)
plt.title('Distribution of Monthly ROI')
plt.xlabel('Monthly ROI (%)')
plt.ylabel('Frequency')
plt.grid(True)
plt.show()
# Display the monthly ROI
monthly_ROI
# Count the number of non-null and non-zero fees
# Rows on which a fee was actually charged (present and different from zero)
non_null_non_zero_fees = df_sample_copy['fee'].ne(0) & df_sample_copy['fee'].notna()
fee_count = non_null_non_zero_fees.sum()
# Halve the count (presumably one entry plus one exit per round trip -- TODO confirm)
result = fee_count / 2
result
# Label each row with its "<daily trend>_<weekly trend>" pair
df_sample_copy['trend_combination'] = (
    df_sample_copy['trend'] + '_' + df_sample_copy['trend_weekly']
)
# Define a function to calculate the average positive and negative return for each trend combination
def calculate_average_returns(df, trend_combination):
    """
    Average the positive and the negative returns of one trend combination.

    Only rows whose 'trend_combination' equals ``trend_combination`` are used.
    Returns ``(avg_positive_return, avg_negative_return)``; a side with no
    matching rows is reported as 0. Zero returns count towards neither side.
    """
    matching = df['trend_combination'] == trend_combination
    returns = df.loc[matching, 'return']
    gains = returns[returns > 0]
    losses = returns[returns < 0]
    avg_positive_return = gains.mean() if len(gains) > 0 else 0
    avg_negative_return = losses.mean() if len(losses) > 0 else 0
    return avg_positive_return, avg_negative_return
# Get unique trend combinations
trend_combinations = df_sample_copy['trend_combination'].unique()
# Calculate the average positive and negative return for each trend combination
average_returns = {
    trend: calculate_average_returns(df_sample_copy, trend)
    for trend in trend_combinations
}
# Display the results
average_returns
# Turn 'Date' back into a regular column for the plot below
df_sample_copy.reset_index(inplace=True)
import plotly.graph_objects as go
# Create a new figure
fig = go.Figure()
# Plot the actual closing prices
fig.add_trace(
    go.Scatter(
        x=df_sample_copy['Date'],
        y=df_sample_copy['Close'],
        mode='lines',
        name='Actual Price',
        line=dict(color='blue'),
    )
)
# Plot the trades: every row with trade == 1 gets a buy marker and every row
# with trade == -1 a sell marker (not only the rows where the signal changes)
buy_signals = df_sample_copy[df_sample_copy['trade'] == 1]
sell_signals = df_sample_copy[df_sample_copy['trade'] == -1]
fig.add_trace(
    go.Scatter(
        x=buy_signals['Date'],
        y=buy_signals['Close'],
        mode='markers',
        name='Buy Signal',
        marker=dict(symbol='triangle-up', color='green', size=10),
    )
)
fig.add_trace(
    go.Scatter(
        x=sell_signals['Date'],
        y=sell_signals['Close'],
        mode='markers',
        name='Sell Signal',
        marker=dict(symbol='triangle-down', color='red', size=10),
    )
)
# Set the title and labels
fig.update_layout(
    title='Actual Price vs Trades',
    xaxis_title='Date',
    yaxis_title='Price',
    legend=dict(x=0, y=1),
)
# Show the plot
fig.show()
# [notebook export] 2 hidden cells