Skip to content
New Workbook
Sign up
btc
pip install ccxt
Hidden output
import ccxt
import pandas as pd
import talib
from datetime import datetime
def fetch_all_candles(exchange, symbol, timeframe, start_date, end_date=None):
    since = int(start_date.timestamp() * 1000)

    if not end_date:
        end_date = datetime.now()
    end_timestamp = int(end_date.timestamp() * 1000)
    
    all_candles = []
    while True:
        new_candles = exchange.fetch_ohlcv(symbol, timeframe, since=since)
        if not new_candles:
            break
        if new_candles[-1][0] >= end_timestamp:
            all_candles.extend([candle for candle in new_candles if candle[0] <= end_timestamp])
            break
        all_candles.extend(new_candles)
        timeframe_ms = new_candles[1][0] - new_candles[0][0]
        since = new_candles[-1][0] + timeframe_ms

    df = pd.DataFrame(all_candles, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume'])
    df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms')
    df.set_index('timestamp', inplace=True)
    return df

kucoin = ccxt.kucoin()
start_date = datetime(2023, 5, 1, 0, 0, 0)
end_date = datetime(2023, 10, 21, 0, 0, 0)
btc_usdt_df = fetch_all_candles(kucoin, 'BTC/USDT', '1h', start_date)
btc_usdt_df.to_csv('btc_usdt_data.csv')
display(btc_usdt_df)
def compute_tci(dataframe, n1, n2):
    average_price = dataframe[['high', 'low', 'close']].mean(axis=1)
    esa = talib.EMA(average_price, timeperiod=n1)
    divergence = talib.EMA(abs(average_price - esa), timeperiod=n1)
    channel_index = (average_price - esa) / (0.015 * divergence)
    trend_channel_index = talib.EMA(channel_index, timeperiod=n2)
    
    return trend_channel_index
def oscillator_change_in_direction(df, osc_column, buy_level, sell_level, first_signal=True):
    """
    Filters signals based on an oscillator's change in direction (slope) when values are beyond defined thresholds.
    
    Parameters:
    - df (DataFrame): The data.
    - osc_column (str): The column name of the oscillator.
    - buy_level (float): Buy threshold.
    - sell_level (float): Sell threshold.
    - first_signal (bool): If True, only the first occurrence is considered within each group.
                           If False, all occurrences are considered.
    
    Returns:
    DataFrame with filtered signals.
    """
    
    df_copy = df.copy()
    df_copy[f'{osc_column}_dt'] = df_copy[osc_column].diff()

    def get_filtered_df(df, condition_initial, level, signal_name, signal_value):
        
        df[f'{signal_name}_signal_initial'] = condition_initial
        
        if first_signal:
            # Group by consecutive periods
            df['group'] = (df[osc_column] >= level if signal_value == 'LONG' else df[osc_column] <= level).cumsum()
            
            # Identify the first signal in each group
            df[f'first_{signal_name}'] = df.groupby('group')[f'{signal_name}_signal_initial'].cumsum()
            
            # Define final signal
            df[f'{signal_name}_signal'] = (df[f'{signal_name}_signal_initial']) & (df[f'first_{signal_name}'] == 1)
        else:
            df[f'{signal_name}_signal'] = df[f'{signal_name}_signal_initial']
        
        columns_to_retain = ['open', 'high', 'low', 'close', 'volume']
        filtered_df = df[df[f'{signal_name}_signal']][columns_to_retain].copy()
        filtered_df['signal'] = signal_value
        
        return filtered_df

    buy_condition_initial = (df_copy[f'{osc_column}_dt'] > 0) & (df_copy[f'{osc_column}_dt'].shift(1) < 0) & (df_copy[osc_column] < buy_level)
    filtered_buy_df = get_filtered_df(df_copy, buy_condition_initial, buy_level, 'buy', 'LONG')
    
    sell_condition_initial = (df_copy[f'{osc_column}_dt'] < 0) & (df_copy[f'{osc_column}_dt'].shift(1) > 0) & (df_copy[osc_column] > sell_level)
    filtered_sell_df = get_filtered_df(df_copy, sell_condition_initial, sell_level, 'sell', 'SHORT')
    
    filtered_df = pd.concat([filtered_buy_df, filtered_sell_df])
    filtered_df = filtered_df.sort_index()

    return filtered_df
def evaluate_strategy(df, filtered_df, take_profit, stop_loss, trailing_stop_loss=False):
    TAKE_PROFIT = 1 + take_profit/100.0
    STOP_LOSS = 1 - stop_loss/100.0

    def calculate_net_percentage(entry, exit, signal_type):
        if exit is None:
            return None
        if signal_type == 'LONG':
            return (exit / entry - 1) * 100
        elif signal_type == 'SHORT':
            return (entry / exit - 1) * 100

    evaluated_strategy = pd.DataFrame(columns=['signal_type', 'entry_time', 'entry_price', 'exit_time', 'exit_price', 'net_percentage'])

    for i, signal_row in filtered_df.iterrows():
        entry_price = signal_row['close']
        exit_price = None
        exit_time = None
        signal_type = signal_row['signal']

        if signal_type == 'LONG':
            target_price = entry_price * TAKE_PROFIT
            target_stop = entry_price * STOP_LOSS
        elif signal_type == 'SHORT':
            target_price = entry_price / TAKE_PROFIT
            target_stop = entry_price / STOP_LOSS
        else:
            target_price = None

        for j, subsequent_row in df.iloc[df.index.get_loc(i) + 1:].iterrows():
            reached_target = subsequent_row['high'] >= target_price if signal_type == 'LONG' else subsequent_row['low'] <= target_price
            reached_stop = subsequent_row['low'] <= target_stop if signal_type == 'LONG' else subsequent_row['high'] >= target_stop

            if reached_target and reached_stop:
                closer_to_target = abs(subsequent_row['open'] - target_price) < abs(subsequent_row['open'] - target_stop)
                exit_price = target_price if closer_to_target else target_stop
                exit_time = j
                break
            elif reached_target:
                exit_price = target_price
                exit_time = j
                break
            elif reached_stop:
                exit_price = target_stop
                exit_time = j
                break

            # Adjust target_stop for trailing stop loss
            if trailing_stop_loss:
                if signal_type == 'LONG':
                    # Adjust stop-loss upwards but ensure it remains below the entry price or the current high
                    new_stop = max(entry_price, subsequent_row['high']) * STOP_LOSS
                    # Only adjust if the new stop is higher than the previous stop
                    target_stop = max(target_stop, new_stop)
                else:
                    # Adjust stop-loss downwards but ensure it remains above the entry price or the current low
                    new_stop = min(entry_price, subsequent_row['low']) / STOP_LOSS
                    # Only adjust if the new stop is lower than the previous stop
                    target_stop = min(target_stop, new_stop)

        net_percentage = calculate_net_percentage(entry_price, exit_price, signal_type)
        evaluated_strategy.loc[len(evaluated_strategy)] = [signal_type, i, entry_price, exit_time, exit_price, net_percentage]

    return evaluated_strategy
def evaluate_strategy(df, filtered_df, 
                      take_profit_long, stop_loss_long, trailing_stop_loss_long,
                      take_profit_short, stop_loss_short, trailing_stop_loss_short,
                      market_taker_fee=0.0, slippage=0.0):
    
    market_taker_fee_factor = 1 - market_taker_fee/100.0
    slippage_factor_long = 1 + slippage/100.0
    slippage_factor_short = 1 - slippage/100.0

    def calculate_net_percentage(entry, exit, signal_type):
        if exit is None:
            return None
        if signal_type == 'LONG':
            return (exit / entry - 1) * 100
        elif signal_type == 'SHORT':
            return (entry / exit - 1) * 100

    evaluated_strategy = pd.DataFrame(columns=['signal_type', 'entry_time', 'entry_price', 'exit_time', 'exit_price', 'net_percentage'])

    for i, signal_row in filtered_df.iterrows():
        entry_price = signal_row['close'] * (slippage_factor_long if signal_row['signal'] == 'LONG' else slippage_factor_short)
        exit_price = None
        exit_time = None
        signal_type = signal_row['signal']

        if signal_type == 'LONG':
            target_price = entry_price * (1 + take_profit_long/100.0)
            target_stop = entry_price * (1 - stop_loss_long/100.0)
        elif signal_type == 'SHORT':
            target_price = entry_price / (1 + take_profit_short/100.0)
            target_stop = entry_price / (1 - stop_loss_short/100.0)

        for j, subsequent_row in df.iloc[df.index.get_loc(i) + 1:].iterrows():
            reached_target = subsequent_row['high'] >= target_price if signal_type == 'LONG' else subsequent_row['low'] <= target_price
            reached_stop = subsequent_row['low'] <= target_stop if signal_type == 'LONG' else subsequent_row['high'] >= target_stop

            if reached_target and reached_stop:
                closer_to_target = abs(subsequent_row['open'] - target_price) < abs(subsequent_row['open'] - target_stop)
                exit_price = target_price if closer_to_target else target_stop
                exit_time = j
                break
            elif reached_target:
                exit_price = target_price
                exit_time = j
                break
            elif reached_stop:
                exit_price = target_stop
                exit_time = j
                break

            # Adjust target_stop for trailing stop loss
            if signal_type == 'LONG' and trailing_stop_loss_long:
                new_stop = max(entry_price, subsequent_row['high']) * (1 - trailing_stop_loss_long/100.0)
                target_stop = max(target_stop, new_stop)
            elif signal_type == 'SHORT' and trailing_stop_loss_short:
                new_stop = min(entry_price, subsequent_row['low']) / (1 - trailing_stop_loss_short/100.0)
                target_stop = min(target_stop, new_stop)

        net_percentage = calculate_net_percentage(entry_price, exit_price, signal_type) * market_taker_fee_factor
        evaluated_strategy.loc[len(evaluated_strategy)] = [signal_type, i, entry_price, exit_time, exit_price, net_percentage]

    return evaluated_strategy
btc_usdt_df['TCI'] = compute_tci(btc_usdt_df, n1=12, n2=27)
btc_usdt_df['RSI'] = talib.RSI(btc_usdt_df['close'], timeperiod=14)
btc_usdt_df['RSI_MA'] = talib.MA(btc_usdt_df['RSI'], timeperiod=6)
TAKE_PROFIT = 8
STOP_LOSS = 6

filtered_df = oscillator_change_in_direction(btc_usdt_df, 'TCI', -45, 80, first_signal=True)
df_result = evaluate_strategy(btc_usdt_df, filtered_df, TAKE_PROFIT, STOP_LOSS)
display(df_result)
summary = df_result.groupby('signal_type').apply(lambda group: pd.Series({
    'wins': len(group[group['net_percentage'] > 0]),
    'losses': len(group[group['net_percentage'] < 0]),
    'net_gain_%': group[group['net_percentage'] > 0]['net_percentage'].sum(),
    'net_loss_%': group[group['net_percentage'] < 0]['net_percentage'].sum(),
    'nulls': group['net_percentage'].isnull().sum()  # Counting the NaN values for net_percentage
})).reset_index().set_index('signal_type') 

summary['win_rate'] = (summary['wins'] / (summary['wins'] + summary['losses'])) * 100
net_percentage_sum = df_result['net_percentage'].sum()
summary['total_trades'] = summary['wins'] + summary['losses']

display(summary)
print("\nSum of net_percentage:", net_percentage_sum)
import plotly.graph_objects as go
from plotly.subplots import make_subplots

df_result['cumulative_sum'] = df_result['net_percentage'].cumsum()
colors = df_result['signal_type'].map({'SHORT': 'orange', 'LONG': 'purple'}).tolist()

fig = go.Figure()

fig.add_trace(go.Scatter(x=df_result['entry_time'], 
                         y=df_result['cumulative_sum'],
                         mode='lines+markers',
                         name='Cumulative Sum of Net Percentage',
                         marker=dict(color=colors)))

fig.update_layout(title='Accumulated Sum of Net Percentage Over Time',
                   xaxis_title='Date',
                   yaxis_title='Accumulated Net Percentage')

fig.show()

# Create a dataframe for entry and exit times
entry_df = df_result[['entry_time']].copy()
entry_df['trade_count'] = 1
entry_df.rename(columns={'entry_time': 'time'}, inplace=True)

exit_df = df_result[['exit_time']].copy()
exit_df['trade_count'] = -1
exit_df.rename(columns={'exit_time': 'time'}, inplace=True)

# Concatenate the dataframes, sort by time, and calculate cumulative sum
combined_df = pd.concat([entry_df, exit_df]).sort_values('time')
combined_df['cumulative_open_trades'] = combined_df['trade_count'].cumsum()

# Plotting
fig = go.Figure()

fig.add_trace(go.Scatter(x=combined_df['time'], y=combined_df['cumulative_open_trades'],
                    mode='lines+markers',
                    name='Open Trades'))

fig.update_layout(title='Number of Open Trades Over Time',
                   xaxis_title='Date',
                   yaxis_title='Number of Open Trades')

fig.show()
import plotly.graph_objects as go
from plotly.subplots import make_subplots

fig = make_subplots(rows=1, cols=1, shared_xaxes=True)

fig.add_trace(go.Candlestick(x=btc_usdt_df.index, open=btc_usdt_df['open'], high=btc_usdt_df['high'], low=btc_usdt_df['low'], close=btc_usdt_df['close']), row=1, col=1)

############################################################
# Add rectangles for each row in df_result
for _, row in df_result.iterrows():
    # Skip if there's a null value in the current row
    if row['entry_time'] is None or pd.isna(row['entry_time']) or \
       row['exit_time'] is None or pd.isna(row['exit_time']) or \
       row['entry_price'] is None or pd.isna(row['entry_price']) or \
       row['exit_price'] is None or pd.isna(row['exit_price']):
        continue

    # Determine the color based on the net_percentage
    color = 'red' if row['net_percentage'] < 0 else 'green'
    
    fig.add_shape(
        type="rect",
        xref="x",
        yref="y",
        x0=row['entry_time'],
        y0=min(row['entry_price'], row['exit_price']),
        x1=row['exit_time'],
        y1=max(row['entry_price'], row['exit_price']),
        fillcolor=color,
        opacity=0.3,
        line=dict(
            color='black',
            width=2
        )
    )
############################################################
# Filter data
long_entries = df_result[df_result['signal_type'] == 'LONG']
short_entries = df_result[df_result['signal_type'] == 'SHORT']

# Plotting LONG entry points with green marker
fig.add_trace(
    go.Scatter(
        x=long_entries['entry_time'],
        y=long_entries['entry_price'],
        mode='markers',
        marker=dict(
            size=15, # Adjust this for the desired size
            color='rgba(255,0,0,0)', # This makes the fill transparent (red but 0 alpha)
            line=dict(
                width=2, # Adjust this for the desired border thickness
                color='green'
            )
        ),
        name='LONG - ENTRY'
    )
)
# Plotting SHORT entry points with red marker
fig.add_trace(
    go.Scatter(
        x=short_entries['entry_time'],
        y=short_entries['entry_price'],
        mode='markers',
        marker=dict(
            size=15, # Adjust this for the desired size
            color='rgba(255,0,0,0)', # This makes the fill transparent (red but 0 alpha)
            line=dict(
                width=2, # Adjust this for the desired border thickness
                color='red'
            )
        ),
        name='SHORT - ENTRY'
    )
)
# Plotting exit prices with blue marker
# Filter rows with non-null exit_time and exit_price
valid_entries = df_result.dropna(subset=['exit_time', 'exit_price'])

fig.add_trace(
    go.Scatter(
        x=valid_entries['exit_time'],
        y=valid_entries['exit_price'],
        mode='markers',
        marker=dict(color='blue'),
        name='EXIT'
    )
)
############################################################

fig.update_layout(xaxis_rangeslider_visible=False, height=800, width=1000)
fig.show()