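# Notebook export residue: "Skip to content"
# Setup (run once in a shell or notebook cell): pip install ccxt
# Notebook export residue: "Hidden output"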
import ccxt
import pandas as pd
import talib
from datetime import datetime
def fetch_all_candles(exchange, symbol, timeframe, start_date, end_date=None):
    """Download OHLCV candles page by page and return them as a DataFrame.

    Parameters:
    - exchange: Object exposing ``fetch_ohlcv(symbol, timeframe, since=...)``,
      called with ``since`` in milliseconds (presumably a ccxt exchange).
    - symbol (str): Market symbol, passed through to ``fetch_ohlcv``.
    - timeframe (str): Candle timeframe, passed through to ``fetch_ohlcv``.
    - start_date (datetime): First instant to request.
    - end_date (datetime, optional): Last candle timestamp to keep (inclusive).
      Defaults to ``datetime.now()``.

    Returns:
    DataFrame indexed by ``timestamp`` with columns open, high, low, close
    and volume. Each candle is read as a 6-item sequence whose first item is
    the timestamp in milliseconds.
    """
    since = int(start_date.timestamp() * 1000)
    if not end_date:
        end_date = datetime.now()
    end_timestamp = int(end_date.timestamp() * 1000)

    all_candles = []
    # Spacing between candles; refined as soon as a page holds two candles.
    # Until then advance by 1 ms, which is enough to request "the next candle".
    step_ms = 1

    while True:
        new_candles = exchange.fetch_ohlcv(symbol, timeframe, since=since)
        if not new_candles:
            break

        last_timestamp = new_candles[-1][0]
        if last_timestamp >= end_timestamp:
            # Final page: keep only candles up to and including the end bound.
            all_candles.extend(
                candle for candle in new_candles if candle[0] <= end_timestamp
            )
            break

        # A single-candle page has no second element to measure the spacing
        # from, so only update the step when it can actually be computed.
        if len(new_candles) > 1:
            step_ms = new_candles[1][0] - new_candles[0][0]

        next_since = last_timestamp + step_ms
        if next_since <= since:
            # The exchange did not move past the requested start; asking
            # again would repeat the same page forever.
            break

        all_candles.extend(new_candles)
        since = next_since

    df = pd.DataFrame(
        all_candles,
        columns=['timestamp', 'open', 'high', 'low', 'close', 'volume'],
    )
    df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms')
    df.set_index('timestamp', inplace=True)
    return df
# Download hourly BTC/USDT candles from KuCoin for a fixed backtest window,
# save them to CSV and show them in the notebook.
kucoin = ccxt.kucoin()
start_date = datetime(2023, 5, 1, 0, 0, 0)
end_date = datetime(2023, 10, 21, 0, 0, 0)
# end_date was defined but never passed, so the download silently ran up to
# "now" and the dataset changed on every run; pass it so the window is fixed.
btc_usdt_df = fetch_all_candles(kucoin, 'BTC/USDT', '1h', start_date, end_date)
btc_usdt_df.to_csv('btc_usdt_data.csv')
display(btc_usdt_df)
def compute_tci(dataframe, n1, n2):
    """Compute a smoothed channel-index oscillator from high/low/close prices.

    Parameters:
    - dataframe (DataFrame): Must contain 'high', 'low' and 'close' columns.
    - n1 (int): EMA period for the price average and for its absolute deviation.
    - n2 (int): EMA period for the final smoothing of the channel index.

    Returns:
    The result of ``talib.EMA`` applied to the channel index.
    """
    typical_price = dataframe[['high', 'low', 'close']].mean(axis=1)
    smoothed_price = talib.EMA(typical_price, timeperiod=n1)

    # Distance of the price from its EMA, scaled by the EMA of its own magnitude.
    deviation = typical_price - smoothed_price
    mean_abs_deviation = talib.EMA(abs(deviation), timeperiod=n1)
    raw_index = deviation / (0.015 * mean_abs_deviation)

    return talib.EMA(raw_index, timeperiod=n2)
def oscillator_change_in_direction(df, osc_column, buy_level, sell_level, first_signal=True):
    """
    Pick rows where an oscillator reverses direction while beyond a threshold.

    A buy (LONG) row is one where the oscillator's one-step change is positive,
    the previous change was negative, and the oscillator is below buy_level.
    A sell (SHORT) row is the mirror image above sell_level.

    Parameters:
    - df (DataFrame): The data; it is copied, not modified.
    - osc_column (str): The column name of the oscillator.
    - buy_level (float): Buy threshold.
    - sell_level (float): Sell threshold.
    - first_signal (bool): If True, keep only the first reversal of each group,
      where a new group starts on every row at or beyond the level on the
      "wrong" side (>= buy_level for buys, <= sell_level for sells).
      If False, keep every reversal.

    Returns:
    DataFrame with the open/high/low/close/volume columns of the selected rows
    plus a 'signal' column ('LONG' or 'SHORT'), sorted by index.
    """
    price_columns = ['open', 'high', 'low', 'close', 'volume']
    slope_column = f'{osc_column}_dt'

    work = df.copy()
    work[slope_column] = work[osc_column].diff()

    def select_signals(frame, raw_condition, level, label, side):
        raw_column = f'{label}_signal_initial'
        final_column = f'{label}_signal'
        frame[raw_column] = raw_condition

        if not first_signal:
            frame[final_column] = frame[raw_column]
        else:
            # The group counter advances on every row that is back across the level.
            if side == 'LONG':
                resets = frame[osc_column] >= level
            else:
                resets = frame[osc_column] <= level
            frame['group'] = resets.cumsum()
            # Running count of raw signals inside each group; 1 marks the first one.
            frame[f'first_{label}'] = frame.groupby('group')[raw_column].cumsum()
            frame[final_column] = frame[raw_column] & (frame[f'first_{label}'] == 1)

        selected = frame.loc[frame[final_column], price_columns].copy()
        selected['signal'] = side
        return selected

    # Buy: slope flips from falling to rising while below the buy level.
    turning_up = work[slope_column] > 0
    was_falling = work[slope_column].shift(1) < 0
    below_buy_level = work[osc_column] < buy_level
    longs = select_signals(
        work, turning_up & was_falling & below_buy_level, buy_level, 'buy', 'LONG'
    )

    # Sell: slope flips from rising to falling while above the sell level.
    turning_down = work[slope_column] < 0
    was_rising = work[slope_column].shift(1) > 0
    above_sell_level = work[osc_column] > sell_level
    shorts = select_signals(
        work, turning_down & was_rising & above_sell_level, sell_level, 'sell', 'SHORT'
    )

    return pd.concat([longs, shorts]).sort_index()
def evaluate_strategy(df, filtered_df, take_profit, stop_loss, trailing_stop_loss=False):
    """Walk forward from each signal until its take-profit or stop-loss is hit.

    Parameters:
    - df (DataFrame): Price bars with 'open', 'high' and 'low' columns.
    - filtered_df (DataFrame): Signals; each row needs 'close' (entry price) and
      'signal' ('LONG' or 'SHORT'), and its index label must exist in df.
    - take_profit (float): Take-profit distance in percent.
    - stop_loss (float): Stop-loss distance in percent.
    - trailing_stop_loss (bool): If True, the stop follows favourable bars.

    Returns:
    DataFrame with one row per signal: signal_type, entry_time, entry_price,
    exit_time, exit_price, net_percentage. The exit fields and net_percentage
    are None when neither level is reached in the remaining bars.
    """
    profit_factor = 1 + take_profit/100.0
    loss_factor = 1 - stop_loss/100.0

    def net_return_pct(entry_level, exit_level, side):
        if exit_level is None:
            return None
        if side == 'LONG':
            return (exit_level / entry_level - 1) * 100
        if side == 'SHORT':
            return (entry_level / exit_level - 1) * 100

    results = pd.DataFrame(columns=['signal_type', 'entry_time', 'entry_price', 'exit_time', 'exit_price', 'net_percentage'])

    for entry_time, signal in filtered_df.iterrows():
        entry_price = signal['close']
        side = signal['signal']
        is_long = side == 'LONG'
        exit_price = exit_time = None

        if is_long:
            target_price = entry_price * profit_factor
            target_stop = entry_price * loss_factor
        elif side == 'SHORT':
            target_price = entry_price / profit_factor
            target_stop = entry_price / loss_factor
        else:
            target_price = None

        # Only bars strictly after the signal bar can close the trade.
        later_bars = df.iloc[df.index.get_loc(entry_time) + 1:]
        for bar_time, bar in later_bars.iterrows():
            if is_long:
                hit_target = bar['high'] >= target_price
                hit_stop = bar['low'] <= target_stop
            else:
                hit_target = bar['low'] <= target_price
                hit_stop = bar['high'] >= target_stop

            if hit_target or hit_stop:
                if hit_target and hit_stop:
                    # Both levels lie inside the bar: take the one nearer the open.
                    distance_to_target = abs(bar['open'] - target_price)
                    distance_to_stop = abs(bar['open'] - target_stop)
                    if distance_to_target < distance_to_stop:
                        exit_price = target_price
                    else:
                        exit_price = target_stop
                elif hit_target:
                    exit_price = target_price
                else:
                    exit_price = target_stop
                exit_time = bar_time
                break

            if trailing_stop_loss:
                if is_long:
                    # Candidate stop sits below the higher of entry and this bar's
                    # high; the stop is only ever raised.
                    candidate = max(entry_price, bar['high']) * loss_factor
                    target_stop = max(target_stop, candidate)
                else:
                    # Candidate stop sits above the lower of entry and this bar's
                    # low; the stop is only ever lowered.
                    candidate = min(entry_price, bar['low']) / loss_factor
                    target_stop = min(target_stop, candidate)

        net_percentage = net_return_pct(entry_price, exit_price, side)
        results.loc[len(results)] = [side, entry_time, entry_price, exit_time, exit_price, net_percentage]

    return results
def evaluate_strategy(df, filtered_df,
                      take_profit_long, stop_loss_long, trailing_stop_loss_long,
                      take_profit_short, stop_loss_short, trailing_stop_loss_short,
                      market_taker_fee=0.0, slippage=0.0):
    """Walk forward from each signal with separate LONG and SHORT settings.

    Parameters:
    - df (DataFrame): Price bars with 'open', 'high' and 'low' columns.
    - filtered_df (DataFrame): Signals; each row needs 'close' and 'signal'
      ('LONG' or 'SHORT'), and its index label must exist in df.
    - take_profit_long / take_profit_short (float): Take-profit distance in percent.
    - stop_loss_long / stop_loss_short (float): Initial stop-loss distance in percent.
    - trailing_stop_loss_long / trailing_stop_loss_short (float): Trailing-stop
      distance in percent; a falsy value (0, False, None) disables trailing.
    - market_taker_fee (float): Fee in percent; scales net_percentage by
      (1 - fee/100).
    - slippage (float): Percent applied to the entry price (LONGs enter higher,
      SHORTs enter lower).

    Returns:
    DataFrame with one row per signal: signal_type, entry_time, entry_price,
    exit_time, exit_price, net_percentage. The exit fields and net_percentage
    are None when neither level is reached in the remaining bars.

    Raises:
    ValueError: If a signal is neither 'LONG' nor 'SHORT'.
    """
    market_taker_fee_factor = 1 - market_taker_fee/100.0
    slippage_factor_long = 1 + slippage/100.0
    slippage_factor_short = 1 - slippage/100.0

    def calculate_net_percentage(entry, exit_level, signal_type):
        # A trade that never exits has no result; return None instead of
        # multiplying None by the fee factor (which raised TypeError).
        if exit_level is None:
            return None
        if signal_type == 'LONG':
            gross = (exit_level / entry - 1) * 100
        else:
            gross = (entry / exit_level - 1) * 100
        # NOTE(review): scaling the percentage by the fee factor also shrinks
        # losses when the fee is non-zero; kept as-is to preserve existing
        # results, but the fee model should be confirmed.
        return gross * market_taker_fee_factor

    evaluated_strategy = pd.DataFrame(columns=['signal_type', 'entry_time', 'entry_price', 'exit_time', 'exit_price', 'net_percentage'])

    for i, signal_row in filtered_df.iterrows():
        signal_type = signal_row['signal']
        exit_price = None
        exit_time = None

        if signal_type == 'LONG':
            entry_price = signal_row['close'] * slippage_factor_long
            target_price = entry_price * (1 + take_profit_long/100.0)
            target_stop = entry_price * (1 - stop_loss_long/100.0)
        elif signal_type == 'SHORT':
            entry_price = signal_row['close'] * slippage_factor_short
            target_price = entry_price / (1 + take_profit_short/100.0)
            target_stop = entry_price / (1 - stop_loss_short/100.0)
        else:
            # Previously this fell through with unbound or stale targets.
            raise ValueError(f"Unknown signal type: {signal_type!r}")

        is_long = signal_type == 'LONG'

        # Only bars strictly after the signal bar can close the trade.
        for j, subsequent_row in df.iloc[df.index.get_loc(i) + 1:].iterrows():
            if is_long:
                reached_target = subsequent_row['high'] >= target_price
                reached_stop = subsequent_row['low'] <= target_stop
            else:
                reached_target = subsequent_row['low'] <= target_price
                reached_stop = subsequent_row['high'] >= target_stop

            if reached_target and reached_stop:
                # Both levels lie inside the bar: take the one nearer the open.
                closer_to_target = abs(subsequent_row['open'] - target_price) < abs(subsequent_row['open'] - target_stop)
                exit_price = target_price if closer_to_target else target_stop
                exit_time = j
                break
            elif reached_target:
                exit_price = target_price
                exit_time = j
                break
            elif reached_stop:
                exit_price = target_stop
                exit_time = j
                break

            # Trail the stop behind favourable bars; it never moves backwards.
            if is_long and trailing_stop_loss_long:
                new_stop = max(entry_price, subsequent_row['high']) * (1 - trailing_stop_loss_long/100.0)
                target_stop = max(target_stop, new_stop)
            elif not is_long and trailing_stop_loss_short:
                new_stop = min(entry_price, subsequent_row['low']) / (1 - trailing_stop_loss_short/100.0)
                target_stop = min(target_stop, new_stop)

        net_percentage = calculate_net_percentage(entry_price, exit_price, signal_type)
        evaluated_strategy.loc[len(evaluated_strategy)] = [signal_type, i, entry_price, exit_time, exit_price, net_percentage]

    return evaluated_strategy
# Add the indicator columns to the price frame; the signals below are
# generated from the TCI column.
btc_usdt_df['TCI'] = compute_tci(btc_usdt_df, n1=12, n2=27)
btc_usdt_df['RSI'] = talib.RSI(btc_usdt_df['close'], timeperiod=14)
btc_usdt_df['RSI_MA'] = talib.MA(btc_usdt_df['RSI'], timeperiod=6)

# Take-profit / stop-loss distances in percent (evaluate_strategy divides by 100).
TAKE_PROFIT = 8
STOP_LOSS = 6

filtered_df = oscillator_change_in_direction(btc_usdt_df, 'TCI', -45, 80, first_signal=True)

# The evaluate_strategy bound at this point is the later definition, which
# takes separate LONG/SHORT settings; the old four-argument call raised
# TypeError. Use the same levels on both sides and leave trailing stops off.
df_result = evaluate_strategy(
    btc_usdt_df,
    filtered_df,
    take_profit_long=TAKE_PROFIT,
    stop_loss_long=STOP_LOSS,
    trailing_stop_loss_long=False,
    take_profit_short=TAKE_PROFIT,
    stop_loss_short=STOP_LOSS,
    trailing_stop_loss_short=False,
)
display(df_result)
# Per-side scoreboard: wins, losses, summed gains and losses, and the number
# of trades with no result.
trades_by_side = df_result.groupby('signal_type')
summary = trades_by_side.apply(
    lambda trades: pd.Series({
        'wins': len(trades[trades['net_percentage'] > 0]),
        'losses': len(trades[trades['net_percentage'] < 0]),
        'net_gain_%': trades[trades['net_percentage'] > 0]['net_percentage'].sum(),
        'net_loss_%': trades[trades['net_percentage'] < 0]['net_percentage'].sum(),
        # Trades whose net_percentage is missing
        'nulls': trades['net_percentage'].isnull().sum(),
    })
)
summary = summary.reset_index().set_index('signal_type')

# Only trades with a non-zero result count towards the win rate and the total.
decided_trades = summary['wins'] + summary['losses']
summary['win_rate'] = (summary['wins'] / decided_trades) * 100
summary['total_trades'] = decided_trades

net_percentage_sum = df_result['net_percentage'].sum()

display(summary)
print("\nSum of net_percentage:", net_percentage_sum)
import plotly.graph_objects as go
from plotly.subplots import make_subplots

# Running total of per-trade results, plotted against entry time.
df_result['cumulative_sum'] = df_result['net_percentage'].cumsum()

# One marker colour per trade, chosen by side.
side_palette = {'SHORT': 'orange', 'LONG': 'purple'}
colors = df_result['signal_type'].map(side_palette).tolist()

cumulative_trace = go.Scatter(
    x=df_result['entry_time'],
    y=df_result['cumulative_sum'],
    mode='lines+markers',
    name='Cumulative Sum of Net Percentage',
    marker=dict(color=colors),
)

fig = go.Figure()
fig.add_trace(cumulative_trace)
fig.update_layout(
    title='Accumulated Sum of Net Percentage Over Time',
    xaxis_title='Date',
    yaxis_title='Accumulated Net Percentage',
)
fig.show()
# Turn every trade into two events: +1 at its entry time, -1 at its exit time.
entry_df = (
    df_result[['entry_time']]
    .rename(columns={'entry_time': 'time'})
    .assign(trade_count=1)
)
exit_df = (
    df_result[['exit_time']]
    .rename(columns={'exit_time': 'time'})
    .assign(trade_count=-1)
)

# Order all events by time; the running sum is the number of open trades.
combined_df = pd.concat([entry_df, exit_df]).sort_values('time')
combined_df['cumulative_open_trades'] = combined_df['trade_count'].cumsum()

# Plotting
open_trades_trace = go.Scatter(
    x=combined_df['time'],
    y=combined_df['cumulative_open_trades'],
    mode='lines+markers',
    name='Open Trades',
)
fig = go.Figure()
fig.add_trace(open_trades_trace)
fig.update_layout(
    title='Number of Open Trades Over Time',
    xaxis_title='Date',
    yaxis_title='Number of Open Trades',
)
fig.show()
import plotly.graph_objects as go
from plotly.subplots import make_subplots

# Candlestick chart of the price data with every evaluated trade drawn on top.
fig = make_subplots(rows=1, cols=1, shared_xaxes=True)
price_candles = go.Candlestick(
    x=btc_usdt_df.index,
    open=btc_usdt_df['open'],
    high=btc_usdt_df['high'],
    low=btc_usdt_df['low'],
    close=btc_usdt_df['close'],
)
fig.add_trace(price_candles, row=1, col=1)

############################################################
# One rectangle per completed trade, spanning entry to exit in time and price.
required_fields = ('entry_time', 'exit_time', 'entry_price', 'exit_price')
for _, row in df_result.iterrows():
    # Trades with any missing entry/exit value cannot be drawn.
    if any(row[field] is None or pd.isna(row[field]) for field in required_fields):
        continue

    # Red for a negative result, green otherwise.
    if row['net_percentage'] < 0:
        color = 'red'
    else:
        color = 'green'

    lower_price = min(row['entry_price'], row['exit_price'])
    upper_price = max(row['entry_price'], row['exit_price'])
    fig.add_shape(
        type="rect",
        xref="x",
        yref="y",
        x0=row['entry_time'],
        y0=lower_price,
        x1=row['exit_time'],
        y1=upper_price,
        fillcolor=color,
        opacity=0.3,
        line=dict(color='black', width=2),
    )

############################################################
# Split the trades by side so each gets its own entry-marker style.
long_entries = df_result[df_result['signal_type'] == 'LONG']
short_entries = df_result[df_result['signal_type'] == 'SHORT']


def _hollow_marker(outline_color):
    """Marker with a fully transparent fill and a coloured outline."""
    return dict(
        size=15,
        color='rgba(255,0,0,0)',  # alpha 0, so only the outline is visible
        line=dict(width=2, color=outline_color),
    )


# LONG entries: green outline.
fig.add_trace(
    go.Scatter(
        x=long_entries['entry_time'],
        y=long_entries['entry_price'],
        mode='markers',
        marker=_hollow_marker('green'),
        name='LONG - ENTRY',
    )
)

# SHORT entries: red outline.
fig.add_trace(
    go.Scatter(
        x=short_entries['entry_time'],
        y=short_entries['entry_price'],
        mode='markers',
        marker=_hollow_marker('red'),
        name='SHORT - ENTRY',
    )
)

# Exits: blue markers, only for trades that have both an exit time and price.
valid_entries = df_result.dropna(subset=['exit_time', 'exit_price'])
fig.add_trace(
    go.Scatter(
        x=valid_entries['exit_time'],
        y=valid_entries['exit_price'],
        mode='markers',
        marker=dict(color='blue'),
        name='EXIT',
    )
)

############################################################
fig.update_layout(xaxis_rangeslider_visible=False, height=800, width=1000)
fig.show()