Skip to content
btc
  • AI Chat
  • Code
  • Report
  • pip install ccxt
    Hidden output
    import ccxt
    import pandas as pd
    import talib
    from datetime import datetime
    def fetch_all_candles(exchange, symbol, timeframe, start_date, end_date=None):
        """
        Downloads all OHLCV candles between two dates by paging through the exchange.

        Parameters:
        - exchange: Object exposing fetch_ohlcv(symbol, timeframe, since=...) that returns
                    rows of [timestamp_ms, open, high, low, close, volume], oldest first.
        - symbol (str): Market symbol, passed through to fetch_ohlcv.
        - timeframe (str): Candle interval, passed through to fetch_ohlcv.
        - start_date (datetime): First instant to request.
        - end_date (datetime): Last instant to keep (inclusive). Defaults to datetime.now().

        Returns:
        DataFrame indexed by candle timestamp with columns open, high, low, close, volume.
        """
        since = int(start_date.timestamp() * 1000)

        if end_date is None:
            end_date = datetime.now()
        end_timestamp = int(end_date.timestamp() * 1000)

        all_candles = []
        last_seen = None
        while True:
            page = exchange.fetch_ohlcv(symbol, timeframe, since=since) or []

            # The cursor is advanced by only 1 ms (see below), so an exchange that rounds
            # `since` down may send the boundary candle again; drop anything already stored.
            # This also guarantees termination: a page with nothing new ends the loop.
            if last_seen is not None:
                page = [candle for candle in page if candle[0] > last_seen]
            if not page:
                break

            if page[-1][0] >= end_timestamp:
                all_candles.extend(candle for candle in page if candle[0] <= end_timestamp)
                break

            all_candles.extend(page)
            last_seen = page[-1][0]
            # Do not infer the candle width from the first two rows of the page: that fails
            # on a single-candle page and skips data when those two rows straddle a gap.
            since = last_seen + 1

        df = pd.DataFrame(all_candles, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume'])
        df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms')
        df.set_index('timestamp', inplace=True)
        return df
    
    # Download hourly BTC/USDT candles from KuCoin for a fixed window and cache them as CSV.
    kucoin = ccxt.kucoin()
    start_date = datetime(2023, 5, 1, 0, 0, 0)
    end_date = datetime(2023, 10, 21, 0, 0, 0)
    # end_date was previously defined but never passed, so the download ran up to "now"
    # and the dataset (and every backtest result below) changed on each run.
    btc_usdt_df = fetch_all_candles(kucoin, 'BTC/USDT', '1h', start_date, end_date)
    btc_usdt_df.to_csv('btc_usdt_data.csv')
    display(btc_usdt_df)
    def compute_tci(dataframe, n1, n2):
        """
        Computes the trend channel index (TCI) oscillator from high/low/close prices.

        Parameters:
        - dataframe (DataFrame): Must contain 'high', 'low' and 'close' columns.
        - n1 (int): EMA period for both the smoothed price and the smoothed absolute deviation.
        - n2 (int): EMA period of the final smoothing pass.

        Returns:
        The talib.EMA output of the channel index smoothed over n2 periods.
        """
        # Per-row mean of high, low and close.
        typical_price = dataframe[['high', 'low', 'close']].mean(axis=1)
        smoothed_price = talib.EMA(typical_price, timeperiod=n1)

        # Distance of price from its EMA, scaled by the EMA of its absolute size.
        deviation = typical_price - smoothed_price
        smoothed_abs_deviation = talib.EMA(abs(deviation), timeperiod=n1)
        raw_index = deviation / (0.015 * smoothed_abs_deviation)

        return talib.EMA(raw_index, timeperiod=n2)
    def oscillator_change_in_direction(df, osc_column, buy_level, sell_level, first_signal=True):
        """
        Picks the rows where an oscillator turns around while it is beyond a threshold.

        A LONG signal is a row where the oscillator is below buy_level and its slope has just
        flipped from negative to positive. A SHORT signal is a row where the oscillator is
        above sell_level and its slope has just flipped from positive to negative.

        Parameters:
        - df (DataFrame): The data; must contain osc_column and the open/high/low/close/volume columns.
        - osc_column (str): The column name of the oscillator.
        - buy_level (float): Buy threshold.
        - sell_level (float): Sell threshold.
        - first_signal (bool): If True, only the first turn of each excursion beyond the
                               threshold is kept. If False, every turn is kept.

        Returns:
        DataFrame with the open/high/low/close/volume values of the signal rows plus a
        'signal' column ('LONG' or 'SHORT'), sorted by index. The input is not modified.
        """
        oscillator = df[osc_column]
        slope = oscillator.diff()
        previous_slope = slope.shift(1)
        ohlcv_columns = ['open', 'high', 'low', 'close', 'volume']

        def select_signals(candidates, back_inside, label):
            if first_signal:
                # The id increases on every row that is back inside the threshold, so all
                # rows of one excursion beyond it share the same id.
                excursion_id = back_inside.cumsum()
                # Running count of candidate turns within the excursion.
                turn_number = candidates.groupby(excursion_id).cumsum()
                keep = candidates & (turn_number == 1)
            else:
                keep = candidates

            selected = df.loc[keep, ohlcv_columns].copy()
            selected['signal'] = label
            return selected

        turned_up = (slope > 0) & (previous_slope < 0) & (oscillator < buy_level)
        longs = select_signals(turned_up, oscillator >= buy_level, 'LONG')

        turned_down = (slope < 0) & (previous_slope > 0) & (oscillator > sell_level)
        shorts = select_signals(turned_down, oscillator <= sell_level, 'SHORT')

        return pd.concat([longs, shorts]).sort_index()
    def evaluate_strategy(df, filtered_df, take_profit, stop_loss, trailing_stop_loss=False):
        """
        Simulates one trade per signal with a percentage take-profit and stop-loss.

        Each trade is entered at the signal row's close and the following rows of df are
        scanned until the take-profit or the stop level falls inside a bar's high/low range.
        If both levels fall inside the same bar, the level closer to that bar's open is used.

        Parameters:
        - df (DataFrame): Price data with open/high/low columns; its index must contain
                          every index label of filtered_df.
        - filtered_df (DataFrame): Signal rows with 'close' and 'signal' ('LONG'/'SHORT').
        - take_profit (float): Take-profit distance in percent.
        - stop_loss (float): Stop-loss distance in percent.
        - trailing_stop_loss (bool): If True, the stop follows favourable bar extremes
                                     at the stop_loss distance.

        Returns:
        DataFrame with one row per signal: signal_type, entry_time, entry_price,
        exit_time, exit_price, net_percentage. Exit fields and net_percentage are None
        when no later bar reaches either level.
        """
        gain_factor = 1 + take_profit/100.0
        loss_factor = 1 - stop_loss/100.0

        def net_change_pct(entry, exit_, side):
            if exit_ is None:
                return None
            if side == 'LONG':
                return (exit_ / entry - 1) * 100
            if side == 'SHORT':
                return (entry / exit_ - 1) * 100
            return None

        evaluated_strategy = pd.DataFrame(columns=['signal_type', 'entry_time', 'entry_price', 'exit_time', 'exit_price', 'net_percentage'])

        for entry_time, signal in filtered_df.iterrows():
            entry_price = signal['close']
            side = signal['signal']
            is_long = side == 'LONG'
            exit_price, exit_time = None, None

            if is_long:
                target_price = entry_price * gain_factor
                target_stop = entry_price * loss_factor
            elif side == 'SHORT':
                target_price = entry_price / gain_factor
                target_stop = entry_price / loss_factor
            else:
                target_price = None

            first_bar = df.index.get_loc(entry_time) + 1
            for bar_time, bar in df.iloc[first_bar:].iterrows():
                if is_long:
                    hit_target = bar['high'] >= target_price
                    hit_stop = bar['low'] <= target_stop
                else:
                    hit_target = bar['low'] <= target_price
                    hit_stop = bar['high'] >= target_stop

                if hit_target or hit_stop:
                    if hit_target and hit_stop:
                        # Both levels are inside this bar: assume the one nearer the open filled first.
                        to_target = abs(bar['open'] - target_price)
                        to_stop = abs(bar['open'] - target_stop)
                        exit_price = target_price if to_target < to_stop else target_stop
                    elif hit_target:
                        exit_price = target_price
                    else:
                        exit_price = target_stop
                    exit_time = bar_time
                    break

                if not trailing_stop_loss:
                    continue

                # Trailing stop: it only ever moves in the trade's favour.
                if is_long:
                    candidate_stop = max(entry_price, bar['high']) * loss_factor
                    target_stop = max(target_stop, candidate_stop)
                else:
                    candidate_stop = min(entry_price, bar['low']) / loss_factor
                    target_stop = min(target_stop, candidate_stop)

            net_percentage = net_change_pct(entry_price, exit_price, side)
            evaluated_strategy.loc[len(evaluated_strategy)] = [side, entry_time, entry_price, exit_time, exit_price, net_percentage]

        return evaluated_strategy
    
    def evaluate_strategy(df, filtered_df,
                          take_profit_long, stop_loss_long, trailing_stop_loss_long=None,
                          take_profit_short=None, stop_loss_short=None, trailing_stop_loss_short=None,
                          market_taker_fee=0.0, slippage=0.0):
        """
        Simulates one trade per signal with separate long/short exits, fees and slippage.

        Each trade is entered at the signal row's close (worsened by slippage) and the
        following rows of df are scanned until the take-profit or the stop level falls
        inside a bar's high/low range. If both levels fall inside the same bar, the level
        closer to that bar's open is used.

        Parameters:
        - df (DataFrame): Price data with open/high/low columns; its index must contain
                          every index label of filtered_df.
        - filtered_df (DataFrame): Signal rows with 'close' and 'signal' ('LONG'/'SHORT').
        - take_profit_long / stop_loss_long (float): Exit distances for longs, in percent.
        - trailing_stop_loss_long (float): Trailing distance for longs in percent;
                                           any falsy value disables trailing.
        - take_profit_short / stop_loss_short (float): Exit distances for shorts, in percent.
                                           When omitted (None) the long values are used, so
                                           evaluate_strategy(df, signals, tp, sl) works.
        - trailing_stop_loss_short (float): Trailing distance for shorts in percent;
                                           any falsy value disables trailing.
        - market_taker_fee (float): Fee in percent, charged on both the entry and the exit fill.
        - slippage (float): Entry slippage in percent (longs pay more, shorts receive less).

        Returns:
        DataFrame with one row per signal: signal_type, entry_time, entry_price,
        exit_time, exit_price, net_percentage. Exit fields and net_percentage are missing
        (NaN/NaT) when no later bar reaches either level.

        Raises:
        ValueError: If a signal is neither 'LONG' nor 'SHORT'.
        """
        if take_profit_short is None:
            take_profit_short = take_profit_long
        if stop_loss_short is None:
            stop_loss_short = stop_loss_long

        # A round trip pays the taker fee twice: once on entry and once on exit.
        fee_factor = 1 - market_taker_fee/100.0
        round_trip_fee_factor = fee_factor * fee_factor
        slippage_factor_long = 1 + slippage/100.0
        slippage_factor_short = 1 - slippage/100.0

        columns = ['signal_type', 'entry_time', 'entry_price', 'exit_time', 'exit_price', 'net_percentage']
        records = []

        for entry_time, signal_row in filtered_df.iterrows():
            signal_type = signal_row['signal']
            if signal_type == 'LONG':
                is_long = True
                entry_price = signal_row['close'] * slippage_factor_long
                target_price = entry_price * (1 + take_profit_long/100.0)
                target_stop = entry_price * (1 - stop_loss_long/100.0)
                trailing_pct = trailing_stop_loss_long
            elif signal_type == 'SHORT':
                is_long = False
                entry_price = signal_row['close'] * slippage_factor_short
                target_price = entry_price / (1 + take_profit_short/100.0)
                target_stop = entry_price / (1 - stop_loss_short/100.0)
                trailing_pct = trailing_stop_loss_short
            else:
                # Previously this fell through and reused the previous trade's targets.
                raise ValueError(f"Unknown signal type: {signal_type!r}")

            exit_price = None
            exit_time = None

            first_bar = df.index.get_loc(entry_time) + 1
            for bar_time, bar in df.iloc[first_bar:].iterrows():
                if is_long:
                    reached_target = bar['high'] >= target_price
                    reached_stop = bar['low'] <= target_stop
                else:
                    reached_target = bar['low'] <= target_price
                    reached_stop = bar['high'] >= target_stop

                if reached_target or reached_stop:
                    if reached_target and reached_stop:
                        # Both levels are inside this bar: assume the one nearer the open filled first.
                        closer_to_target = abs(bar['open'] - target_price) < abs(bar['open'] - target_stop)
                        exit_price = target_price if closer_to_target else target_stop
                    elif reached_target:
                        exit_price = target_price
                    else:
                        exit_price = target_stop
                    exit_time = bar_time
                    break

                # Trailing stop: it only ever moves in the trade's favour.
                if trailing_pct:
                    if is_long:
                        new_stop = max(entry_price, bar['high']) * (1 - trailing_pct/100.0)
                        target_stop = max(target_stop, new_stop)
                    else:
                        new_stop = min(entry_price, bar['low']) / (1 - trailing_pct/100.0)
                        target_stop = min(target_stop, new_stop)

            if exit_price is None:
                # Still open at the end of the data: there is no result to report.
                net_percentage = None
            else:
                growth = exit_price / entry_price if is_long else entry_price / exit_price
                # Fees reduce the value of the position; they must not shrink a loss.
                net_percentage = (growth * round_trip_fee_factor - 1) * 100

            records.append([signal_type, entry_time, entry_price, exit_time, exit_price, net_percentage])

        # Build the frame once instead of appending row by row, so numeric columns get
        # numeric dtypes and missing exits become NaN/NaT.
        return pd.DataFrame(records, columns=columns)
    # Indicators computed on the hourly candles.
    btc_usdt_df['TCI'] = compute_tci(btc_usdt_df, n1=12, n2=27)
    btc_usdt_df['RSI'] = talib.RSI(btc_usdt_df['close'], timeperiod=14)
    btc_usdt_df['RSI_MA'] = talib.MA(btc_usdt_df['RSI'], timeperiod=6)
    # Exit distances in percent, shared by long and short trades.
    TAKE_PROFIT = 8
    STOP_LOSS = 6

    # TCI turning up below -45 gives LONG signals; turning down above 80 gives SHORT signals.
    filtered_df = oscillator_change_in_direction(btc_usdt_df, 'TCI', -45, 80, first_signal=True)
    # The evaluate_strategy definition in effect is the long/short one, so the old
    # four-argument call raised TypeError. Pass both sides explicitly, trailing disabled.
    df_result = evaluate_strategy(
        btc_usdt_df, filtered_df,
        take_profit_long=TAKE_PROFIT, stop_loss_long=STOP_LOSS, trailing_stop_loss_long=None,
        take_profit_short=TAKE_PROFIT, stop_loss_short=STOP_LOSS, trailing_stop_loss_short=None,
    )
    display(df_result)
    # Per-direction statistics: winning/losing trade counts, the summed gains and losses,
    # and how many trades have no result yet.
    summary = df_result.groupby('signal_type').apply(
        lambda trades: pd.Series({
            'wins': (trades['net_percentage'] > 0).sum(),
            'losses': (trades['net_percentage'] < 0).sum(),
            'net_gain_%': trades.loc[trades['net_percentage'] > 0, 'net_percentage'].sum(),
            'net_loss_%': trades.loc[trades['net_percentage'] < 0, 'net_percentage'].sum(),
            'nulls': trades['net_percentage'].isna().sum(),  # trades without a net_percentage
        })
    )
    summary = summary.reset_index().set_index('signal_type')

    # Overall result across both directions.
    net_percentage_sum = df_result['net_percentage'].sum()

    # Only trades with a non-zero result count towards the win rate and the trade total.
    summary['win_rate'] = summary['wins'] / (summary['wins'] + summary['losses']) * 100
    summary['total_trades'] = summary['wins'] + summary['losses']

    display(summary)
    print("\nSum of net_percentage:", net_percentage_sum)
    
    import plotly.graph_objects as go
    from plotly.subplots import make_subplots
    
    # Running total of the per-trade net percentages, plotted against each trade's entry
    # time; marker colour tells the trade direction.
    df_result['cumulative_sum'] = df_result['net_percentage'].cumsum()
    colors = df_result['signal_type'].map({'SHORT': 'orange', 'LONG': 'purple'}).tolist()

    fig = go.Figure(data=[
        go.Scatter(
            x=df_result['entry_time'],
            y=df_result['cumulative_sum'],
            mode='lines+markers',
            name='Cumulative Sum of Net Percentage',
            marker=dict(color=colors),
        ),
    ])
    fig.update_layout(
        title='Accumulated Sum of Net Percentage Over Time',
        xaxis_title='Date',
        yaxis_title='Accumulated Net Percentage',
    )
    fig.show()
    
    # Create a dataframe for entry and exit times: +1 when a trade opens, -1 when it closes
    entry_df = df_result[['entry_time']].copy()
    entry_df['trade_count'] = 1
    entry_df.rename(columns={'entry_time': 'time'}, inplace=True)

    # Trades that never reached take-profit or stop-loss have no exit_time. Drop them here
    # so they stay counted as open instead of adding a -1 with a null timestamp.
    exit_df = df_result[['exit_time']].dropna().copy()
    exit_df['trade_count'] = -1
    exit_df.rename(columns={'exit_time': 'time'}, inplace=True)

    # Concatenate the dataframes, sort by time, and calculate cumulative sum.
    # A stable sort keeps entries ahead of exits that share the same timestamp.
    combined_df = pd.concat([entry_df, exit_df]).sort_values('time', kind='mergesort')
    combined_df['cumulative_open_trades'] = combined_df['trade_count'].cumsum()

    # Plotting
    fig = go.Figure()

    fig.add_trace(go.Scatter(x=combined_df['time'], y=combined_df['cumulative_open_trades'],
                        mode='lines+markers',
                        name='Open Trades'))

    fig.update_layout(title='Number of Open Trades Over Time',
                       xaxis_title='Date',
                       yaxis_title='Number of Open Trades')

    fig.show()
    import plotly.graph_objects as go
    from plotly.subplots import make_subplots
    
    fig = make_subplots(rows=1, cols=1, shared_xaxes=True)

    # Price candles.
    fig.add_trace(
        go.Candlestick(
            x=btc_usdt_df.index,
            open=btc_usdt_df['open'],
            high=btc_usdt_df['high'],
            low=btc_usdt_df['low'],
            close=btc_usdt_df['close'],
        ),
        row=1, col=1,
    )

    ############################################################
    # One translucent rectangle per closed trade, spanning entry to exit in time and price.
    for _, row in df_result.iterrows():
        # Trades with any missing corner (e.g. never closed) cannot be drawn.
        trade_corners = (row['entry_time'], row['exit_time'], row['entry_price'], row['exit_price'])
        if any(pd.isna(value) for value in trade_corners):
            continue

        # Losing trades are red, everything else green.
        if row['net_percentage'] < 0:
            color = 'red'
        else:
            color = 'green'

        fig.add_shape(
            type="rect",
            xref="x",
            yref="y",
            x0=row['entry_time'],
            x1=row['exit_time'],
            y0=min(row['entry_price'], row['exit_price']),
            y1=max(row['entry_price'], row['exit_price']),
            fillcolor=color,
            opacity=0.3,
            line=dict(color='black', width=2),
        )
    ############################################################
    # Entry markers: hollow circles, green border for LONG and red border for SHORT.
    long_entries = df_result[df_result['signal_type'] == 'LONG']
    short_entries = df_result[df_result['signal_type'] == 'SHORT']

    for entries, border_color, trace_name in (
        (long_entries, 'green', 'LONG - ENTRY'),
        (short_entries, 'red', 'SHORT - ENTRY'),
    ):
        fig.add_trace(
            go.Scatter(
                x=entries['entry_time'],
                y=entries['entry_price'],
                mode='markers',
                marker=dict(
                    size=15,  # marker diameter
                    color='rgba(255,0,0,0)',  # alpha 0 makes the fill transparent
                    line=dict(width=2, color=border_color),  # border thickness and colour
                ),
                name=trace_name,
            )
        )

    # Exit markers in blue, only for trades that have both an exit time and an exit price.
    valid_entries = df_result[df_result[['exit_time', 'exit_price']].notna().all(axis=1)]
    fig.add_trace(
        go.Scatter(
            x=valid_entries['exit_time'],
            y=valid_entries['exit_price'],
            mode='markers',
            marker=dict(color='blue'),
            name='EXIT',
        )
    )
    ############################################################

    fig.update_layout(xaxis_rangeslider_visible=False, height=800, width=1000)
    fig.show()