import streamlit as st
import pandas as pd
import numpy as np
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import io
import matplotlib.pyplot as plt
import time
import traceback
class DataFetcher:
    """Loads tabular price data from CSV files."""

    @staticmethod
    def get_data(file_path: str):
        """
        Read ``file_path`` with pandas and return only its complete rows.

        ``reset_index`` moves the row index into an ``index`` column first;
        afterwards every row that contains a missing value is dropped.
        """
        raw_frame = pd.read_csv(file_path)
        indexed_frame = raw_frame.reset_index()
        complete_rows = indexed_frame.dropna()
        return complete_rows
class FeatureAdder:
    """Derives indicator columns from open/high/low/close prices."""

    @staticmethod
    def add_features(df: pd.DataFrame, symbol: str, rsi_window: int, ma_window: int):
        """
        Append the indicator columns to ``df`` in place and return it.

        Columns written, in order: ``ha_close``, ``ha_open``, ``ha_signal``,
        ``signal_change``, ``ma<ma_window>`` and ``rsi``. ``symbol`` is
        accepted but not read by this function.
        """
        opens = df['open']
        closes = df['close']

        # Average of the four prices, and midpoint of the previous bar's open/close.
        df['ha_close'] = (opens + df['high'] + df['low'] + closes) / 4
        df['ha_open'] = (opens.shift(1) + closes.shift(1)) / 2

        # +1 when ha_close is at or above ha_open, otherwise -1 (including the
        # first row, where ha_open is NaN and the comparison is False).
        is_bullish = df['ha_close'] >= df['ha_open']
        df['ha_signal'] = np.where(is_bullish, 1, -1)
        df['signal_change'] = df['ha_signal'].diff().fillna(0)

        ma_column = f'ma{ma_window}'
        df[ma_column] = closes.rolling(ma_window).mean()

        # RSI from rolling simple means of the up-moves and down-moves.
        price_change = closes.diff()
        up_moves = price_change.where(price_change > 0, 0)
        down_moves = -price_change.where(price_change < 0, 0)
        avg_gain = up_moves.rolling(window=rsi_window).mean()
        avg_loss = down_moves.rolling(window=rsi_window).mean()
        relative_strength = avg_gain / avg_loss
        df['rsi'] = 100 - (100 / (1 + relative_strength))
        return df
class SignalGenerator:
    """Turns the ``ha_signal`` and ``rsi`` columns into a trade signal."""

    @staticmethod
    def add_signal(df: pd.DataFrame, symbol: str, rsi_buy_threshold: int, rsi_sell_threshold: int):
        """
        Write a ``signal`` column to ``df`` in place and return ``df``.

        Values: ``1`` where ``ha_signal`` has just become 1 and RSI is above
        ``rsi_buy_threshold``; ``0`` where RSI has just dropped below
        ``rsi_sell_threshold`` from above it; ``-1`` everywhere else. When both
        conditions hold on the same row the ``0`` wins. ``symbol`` is accepted
        but not read by this function.
        """
        ha_signal = df['ha_signal']
        rsi = df['rsi']

        turned_bullish = (ha_signal == 1) & (ha_signal.shift(1) != 1)
        buy_rows = turned_bullish & (rsi > rsi_buy_threshold)
        sell_rows = (rsi < rsi_sell_threshold) & (rsi.shift(1) > rsi_sell_threshold)

        # Start from the default and layer the rules; the sell rule is applied
        # last so it overrides a buy on the same row.
        signal = np.full(len(df), -1, dtype=np.int64)
        signal[buy_rows.to_numpy()] = 1
        signal[sell_rows.to_numpy()] = 0
        df['signal'] = signal
        return df
class TradingStrategy:
    """
    Backtest of the Heikin-Ashi / RSI signal on a single CSV price file.

    Typical use is ``TradingStrategy(path).run()``. The indicator parameters
    (``rsi_window``, ``rsi_buy_threshold``, ``rsi_sell_threshold``,
    ``ma_window``) are plain attributes that callers may overwrite between
    construction and ``run``.
    """

    def __init__(self, file_path: str, start_date='2020-01-01', columns=None, long_only=False):
        """
        Args:
            file_path: CSV file handed to ``DataFetcher.get_data``.
            start_date: Rows dated before this value are discarded.
            columns: Two column names; the first supplies entry prices and the
                second exit prices. Defaults to ``['open', 'close']``.
            long_only: When true, sell signals only close longs and never
                open a short.
        """
        self.file_path = file_path
        self.start_date = start_date
        self.columns = columns or ['open', 'close']
        self.long_only = long_only
        self.data = None
        self.trades = []
        self.strategy_returns = []
        self.rsi_window = 8
        self.rsi_buy_threshold = 45
        self.rsi_sell_threshold = 60
        self.ma_window = 15  # Default value

    def prepare_data(self):
        """Load the CSV, normalise the date column, filter by start date and add features/signals."""
        data = DataFetcher.get_data(self.file_path).rename(columns={'timestamp': 'date'})
        data['date'] = pd.to_datetime(data['date'], utc=True).dt.tz_localize(None)
        # Copy the filtered rows so the column assignments done by the feature
        # and signal helpers act on an independent frame, not on a slice.
        data = data[data['date'] >= self.start_date].copy()
        data = FeatureAdder.add_features(data, self.file_path, self.rsi_window, self.ma_window)
        self.data = SignalGenerator.add_signal(
            data, self.file_path, self.rsi_buy_threshold, self.rsi_sell_threshold)

    def _record_trade(self, entry_date, entry_price, exit_date, exit_price, trade_return):
        """Store one closed trade together with its fractional return."""
        self.strategy_returns.append(trade_return)
        self.trades.append({'buy_date': entry_date, 'buy_price': entry_price,
                            'sell_date': exit_date, 'sell_price': exit_price,
                            'return': trade_return})

    def execute_trades(self):
        """
        Walk the signal column and fill ``self.trades`` / ``self.strategy_returns``.

        A signal of 1 opens a long (closing an open short first); a signal of 0
        closes an open long and, unless ``long_only`` is set, opens a short.
        Entries are priced from ``columns[0]`` and exits from ``columns[1]``.
        For short trades ``buy_date`` / ``buy_price`` hold the short entry.

        Both result lists are reset on every call so that re-running does not
        double-count trades. Errors such as a missing price column propagate
        to the caller instead of being printed and ignored row by row.
        """
        self.trades = []
        self.strategy_returns = []

        entry_col, exit_col = self.columns[0], self.columns[1]
        signals = self.data['signal'].to_numpy()
        dates = self.data['date'].tolist()
        entry_prices = self.data[entry_col].to_numpy()
        exit_prices = self.data[exit_col].to_numpy()

        position = None      # 'long', 'short' or None when flat
        entry_date = None
        entry_price = None

        for signal, date, entry_quote, exit_quote in zip(signals, dates, entry_prices, exit_prices):
            if signal == 1:
                if position == 'long':
                    continue  # already long; repeated buy signals are ignored
                if position == 'short':
                    # Cover the short: profit when the exit is below the entry.
                    self._record_trade(entry_date, entry_price, date, exit_quote,
                                       (entry_price - exit_quote) / entry_price)
                entry_date, entry_price, position = date, entry_quote, 'long'
            elif signal == 0:
                if position == 'long':
                    self._record_trade(entry_date, entry_price, date, exit_quote,
                                       (exit_quote - entry_price) / entry_price)
                    # NOTE(review): the source lost its indentation; the reset
                    # is read as belonging to the close-long branch only.
                    position = None
                if not self.long_only and position != 'short':
                    entry_date, entry_price, position = date, entry_quote, 'short'

    def calculate_returns(self):
        """
        Return ``(dollar_strategy, buy_and_hold_return)``.

        ``dollar_strategy`` is the value of 1 unit compounded through every
        closed trade; ``buy_and_hold_return`` is the fractional change between
        the first and last ``close``.
        """
        dollar_strategy = 1
        for ret in self.strategy_returns:
            dollar_strategy *= (1 + ret)
        initial_price = self.data['close'].iloc[0]
        final_price = self.data['close'].iloc[-1]
        buy_and_hold_return = (final_price - initial_price) / initial_price
        return dollar_strategy, buy_and_hold_return

    def generate_reports(self, export_report=False):
        """
        Write the trades CSV, add a ``returns`` column and return summary metrics.

        The trade list is always saved to ``<file_path>_trades.csv``;
        ``export_report`` is currently a placeholder with no effect.

        Returns:
            dict with ``'Cumulative Return'`` (sum of the per-row returns) and
            ``'Max Drawdown'`` (most negative peak-to-trough decline of the
            curve obtained by compounding those returns; zero or negative).
        """
        trades_df = pd.DataFrame(self.trades)
        trades_df.to_csv(f'{self.file_path}_trades.csv', index=False)

        # Per-row close-to-close returns, forced to 0 on rows with a sell signal.
        returns = self.data['close'].pct_change()
        returns = returns.mask(self.data['signal'] == 0, 0)
        self.data['returns'] = returns

        # Drawdown is measured against the running peak of the compounded
        # curve, not taken as the single worst period return.
        equity = (1 + returns.fillna(0)).cumprod()
        drawdown = equity / equity.cummax() - 1

        # Calculate basic metrics
        metrics = {
            'Cumulative Return': returns.sum(),
            'Max Drawdown': drawdown.min(),
            # Add more metrics as needed
        }
        if export_report:
            # Export logic if needed
            pass
        return metrics

    def plot_signals(self):
        """Build and return a two-row Plotly figure: close price with buy/sell markers, and RSI."""
        buy_signals = self.data[self.data['signal'] == 1]
        sell_signals = self.data[self.data['signal'] == 0]
        fig = make_subplots(rows=2, cols=1, shared_xaxes=True,
                            vertical_spacing=0.03, row_heights=[0.7, 0.3])
        # Price and signals plot
        fig.add_trace(go.Scatter(x=self.data['date'], y=self.data['close'],
                                 mode='lines', name='Close Price',
                                 line=dict(color='grey', width=1)),
                      row=1, col=1)
        fig.add_trace(go.Scatter(x=buy_signals['date'], y=buy_signals['close'],
                                 mode='markers', name='Buy Signal',
                                 marker=dict(symbol='triangle-up', size=10, color='green')),
                      row=1, col=1)
        fig.add_trace(go.Scatter(x=sell_signals['date'], y=sell_signals['close'],
                                 mode='markers', name='Sell Signal',
                                 marker=dict(symbol='triangle-down', size=10, color='red')),
                      row=1, col=1)
        # RSI plot
        fig.add_trace(go.Scatter(x=self.data['date'], y=self.data['rsi'],
                                 mode='lines', name='RSI',
                                 line=dict(color='blue', width=1)),
                      row=2, col=1)
        fig.add_hline(y=self.rsi_buy_threshold, line_dash="dash", line_color="green", row=2, col=1)
        fig.add_hline(y=self.rsi_sell_threshold, line_dash="dash", line_color="red", row=2, col=1)
        fig.update_layout(title=f'Close Price with Buy/Sell Signals and RSI for {self.file_path}',
                          xaxis_title='Date',
                          yaxis_title='Close Price',
                          height=800,
                          showlegend=True)
        fig.update_yaxes(title_text="RSI", row=2, col=1)
        return fig

    def print_results(self, dollar_strategy, buy_and_hold_return):
        """Print the strategy and buy-and-hold returns as percentages."""
        print('-----------------')
        print(f'Strategy for {self.file_path}')
        print('Total Strategy Return:')
        print(f'{(dollar_strategy - 1) * 100:.2f}%')
        print('Buy and Hold Return:')
        print(f'{buy_and_hold_return * 100:.2f}%')

    def run(self, show_plot=False, export_report=False):
        """
        Run the full pipeline and return ``(dollar_strategy, buy_and_hold_return, report)``.

        The signal figure is only built (and displayed with ``Figure.show``)
        when ``show_plot`` is true, so batch callers do not pay for a figure
        that would be thrown away.
        """
        self.prepare_data()
        self.execute_trades()
        dollar_strategy, buy_and_hold_return = self.calculate_returns()
        report = self.generate_reports(export_report)
        if show_plot:
            self.plot_signals().show()
        self.print_results(dollar_strategy, buy_and_hold_return)
        return dollar_strategy, buy_and_hold_return, report
# Grid search over the RSI and moving-average parameters of TradingStrategy.
def grid_search(file_path, start_date, end_date, long_only, param_grid):
    """
    Backtest every parameter combination in ``param_grid`` and tabulate the results.

    ``param_grid`` must provide sized iterables under the keys ``rsi_window``,
    ``rsi_buy_threshold``, ``rsi_sell_threshold`` and ``ma_window``. Progress
    is reported through a Streamlit progress bar, a status line and a table
    refreshed every 10 runs and after the last one. ``end_date`` is accepted
    but not used by this function.

    Returns:
        A DataFrame with one row per combination, or ``None`` when any run
        raised (the error and traceback are shown with ``st.error``).
    """
    collected_rows = []

    total_combinations = 1
    for grid_key in ('rsi_window', 'rsi_buy_threshold', 'rsi_sell_threshold', 'ma_window'):
        total_combinations *= len(param_grid[grid_key])

    progress_bar = st.progress(0)
    status_text = st.empty()
    results_area = st.empty()

    try:
        completed = 0
        for rsi_window in param_grid['rsi_window']:
            for rsi_buy_threshold in param_grid['rsi_buy_threshold']:
                for rsi_sell_threshold in param_grid['rsi_sell_threshold']:
                    for ma_window in param_grid['ma_window']:
                        param_dict = {
                            'rsi_window': rsi_window,
                            'rsi_buy_threshold': rsi_buy_threshold,
                            'rsi_sell_threshold': rsi_sell_threshold,
                            'ma_window': ma_window
                        }

                        # The grid keys double as the strategy attribute names.
                        strategy = TradingStrategy(file_path, start_date=start_date, long_only=long_only)
                        for attr_name, attr_value in param_dict.items():
                            setattr(strategy, attr_name, attr_value)

                        dollar_strategy, buy_and_hold_return, report = strategy.run(
                            show_plot=False, export_report=False)

                        row = dict(param_dict)
                        row['strategy_return'] = (dollar_strategy - 1) * 100
                        row['buy_and_hold_return'] = buy_and_hold_return * 100
                        row['cumulative_return'] = report.get('Cumulative Return', 0) * 100
                        row['max_drawdown'] = report.get('Max Drawdown', 0) * 100
                        collected_rows.append(row)

                        # Report progress after each finished run.
                        completed += 1
                        progress_bar.progress(completed / total_combinations)
                        status_text.text(f"Progress: {completed}/{total_combinations}")

                        # Refresh the live table every 10 runs and at the very end.
                        if completed % 10 == 0 or completed == total_combinations:
                            results_area.dataframe(pd.DataFrame(collected_rows))
                            # NOTE(review): the source lost its indentation; the
                            # pause is read as part of this refresh branch.
                            time.sleep(0.1)  # Small delay to allow Streamlit to update the UI
    except Exception as e:
        st.error(f"An error occurred during grid search: {str(e)}")
        st.error(traceback.format_exc())
        return None

    return pd.DataFrame(collected_rows)
# Streamlit UI: page title followed by the inputs shared by both run modes.
st.title('Backtest Trading Strategy UI')

# CSV file containing the price history to backtest.
file_path = st.text_input(label='Enter File Path', value='BTC-USD_1d_max.csv')

# Date range pickers.
start_date = st.date_input(label='Start Date', value=pd.to_datetime('2024-01-01'))
end_date = st.date_input(label='End Date', value=pd.to_datetime('today'))

# When ticked, the strategy is run with long_only=True.
long_only = st.checkbox(label='Long Only', value=False)

# Indicator parameters for a single backtest, rendered in this order:
# (label, minimum, maximum, default).
rsi_window, rsi_buy_threshold, rsi_sell_threshold, ma_window = [
    st.slider(slider_label, min_value=lowest, max_value=highest, value=initial)
    for slider_label, lowest, highest, initial in (
        ('RSI Window', 2, 30, 8),
        ('RSI Buy Threshold', 0, 100, 45),
        ('RSI Sell Threshold', 0, 100, 60),
        ('Moving Average Window', 5, 50, 15),
    )
]
if st.button('Run Single Backtest'):
    # Configure a strategy from the widgets above and run it once.
    strategy = TradingStrategy(file_path, start_date=start_date.strftime('%Y-%m-%d'), long_only=long_only)
    for attr_name, attr_value in (
        ('rsi_window', rsi_window),
        ('rsi_buy_threshold', rsi_buy_threshold),
        ('rsi_sell_threshold', rsi_sell_threshold),
        ('ma_window', ma_window),
    ):
        setattr(strategy, attr_name, attr_value)
    dollar_strategy, buy_and_hold_return, report = strategy.run(show_plot=False, export_report=False)

    # Headline returns.
    st.subheader('Backtest Results')
    st.write(f'Strategy Return: {(dollar_strategy - 1) * 100:.2f}%')
    st.write(f'Buy and Hold Return: {buy_and_hold_return * 100:.2f}%')

    # Key statistics taken from the report dict.
    st.subheader('Key Statistics')
    col1, col2 = st.columns(2)

    def get_metric(key, default="N/A"):
        """Format ``report[key]`` for display; the two return metrics are shown as percentages."""
        value = report.get(key, default)
        if not isinstance(value, (int, float)):
            return str(value)
        if key in ['Cumulative Return', 'Max Drawdown']:
            return f"{value * 100:.2f}%"
        return f"{value:.2f}"

    with col1:
        for metric_name, metric_caption in (
            ("Cumulative Return",
             "The total return of the strategy for the entire period."),
            ("Max Drawdown",
             "The maximum observed loss from a peak to a trough, before a new peak is attained."),
        ):
            st.metric(metric_name, get_metric(metric_name))
            st.caption(metric_caption)

    # Full report as a one-row table, plus a CSV download of it.
    st.subheader('Detailed Strategy Stats')
    stats_df = pd.DataFrame(report, index=[0])
    st.dataframe(stats_df)

    csv_buffer = io.StringIO()
    stats_df.to_csv(csv_buffer, index=True)
    st.download_button(
        label="Download Strategy Stats CSV",
        data=csv_buffer.getvalue(),
        file_name=f"{strategy.file_path}_strategy_stats.csv",
        mime="text/csv"
    )

    # Price / RSI chart with the signal markers.
    st.subheader('Trade Signals')
    st.plotly_chart(strategy.plot_signals(), use_container_width=True)

    # Per-trade table and summary.
    st.subheader('Trade Data')
    if not strategy.trades:
        st.write("No trades were executed during this backtest.")
    else:
        trades_df = pd.DataFrame(strategy.trades)
        for date_column in ('buy_date', 'sell_date'):
            trades_df[date_column] = pd.to_datetime(trades_df[date_column])
        held_for = trades_df['sell_date'] - trades_df['buy_date']
        trades_df['holding_period'] = held_for.dt.days
        trades_df['return'] = trades_df['return'] * 100  # fraction -> percentage

        # Human-readable headers, then rounding for display.
        display_names = {
            'buy_date': 'Buy Date',
            'buy_price': 'Buy Price',
            'sell_date': 'Sell Date',
            'sell_price': 'Sell Price',
            'return': 'Return (%)',
            'holding_period': 'Holding Period (days)'
        }
        display_decimals = {
            'Buy Price': 2,
            'Sell Price': 2,
            'Return (%)': 2,
            'Holding Period (days)': 0
        }
        trades_df = trades_df.rename(columns=display_names).round(display_decimals)

        st.dataframe(trades_df)

        # CSV download of the formatted trades.
        csv_buffer = io.StringIO()
        trades_df.to_csv(csv_buffer, index=False)
        st.download_button(
            label="Download Trades CSV",
            data=csv_buffer.getvalue(),
            file_name=f"{strategy.file_path}_trades.csv",
            mime="text/csv"
        )

        # Summary figures computed from the rounded table.
        st.subheader('Trade Statistics')
        trade_returns = trades_df['Return (%)']
        total_trades = len(trades_df)
        winning_trades = int((trade_returns > 0).sum())
        losing_trades = int((trade_returns < 0).sum())
        avg_return = trade_returns.mean()
        avg_holding_period = trades_df['Holding Period (days)'].mean()

        col1, col2 = st.columns(2)
        with col1:
            st.metric("Total Trades", total_trades)
            st.metric("Winning Trades", winning_trades)
            st.metric("Losing Trades", losing_trades)
        with col2:
            st.metric("Average Return", f"{avg_return:.2f}%")
            st.metric("Average Holding Period", f"{avg_holding_period:.1f} days")
# Grid search section, rendered below the single-backtest controls.
st.subheader('Grid Search')

# The range inputs are only shown once the user opts in.
use_grid_search = st.checkbox(label='Use Grid Search', value=False)

if use_grid_search:
    col1, col2 = st.columns(2)

    # Left column: RSI window and RSI buy-threshold ranges.
    with col1:
        rsi_window_min = st.number_input(label='RSI Window Min', min_value=2, max_value=30, value=5)
        rsi_window_max = st.number_input(label='RSI Window Max', min_value=2, max_value=30, value=15)
        rsi_window_step = st.number_input(label='RSI Window Step', min_value=1, max_value=5, value=2)

        rsi_buy_min = st.number_input(label='RSI Buy Threshold Min', min_value=0, max_value=100, value=30)
        rsi_buy_max = st.number_input(label='RSI Buy Threshold Max', min_value=0, max_value=100, value=50)
        rsi_buy_step = st.number_input(label='RSI Buy Threshold Step', min_value=1, max_value=10, value=5)

    # Right column: RSI sell-threshold and moving-average window ranges.
    with col2:
        rsi_sell_min = st.number_input(label='RSI Sell Threshold Min', min_value=0, max_value=100, value=50)
        rsi_sell_max = st.number_input(label='RSI Sell Threshold Max', min_value=0, max_value=100, value=70)
        rsi_sell_step = st.number_input(label='RSI Sell Threshold Step', min_value=1, max_value=10, value=5)

        ma_window_min = st.number_input(label='MA Window Min', min_value=5, max_value=50, value=10)
        ma_window_max = st.number_input(label='MA Window Max', min_value=5, max_value=50, value=20)
        ma_window_step = st.number_input(label='MA Window Step', min_value=1, max_value=5, value=2)

    if st.button('Run Grid Search'):
        st.write("Starting grid search...")

        # Each (min, max, step) triple becomes a range whose upper bound is inclusive.
        range_specs = {
            'rsi_window': (rsi_window_min, rsi_window_max, rsi_window_step),
            'rsi_buy_threshold': (rsi_buy_min, rsi_buy_max, rsi_buy_step),
            'rsi_sell_threshold': (rsi_sell_min, rsi_sell_max, rsi_sell_step),
            'ma_window': (ma_window_min, ma_window_max, ma_window_step),
        }
        param_grid = {
            grid_key: range(lowest, highest + 1, stride)
            for grid_key, (lowest, highest, stride) in range_specs.items()
        }

        results_df = grid_search(
            file_path,
            start_date.strftime('%Y-%m-%d'),
            end_date.strftime('%Y-%m-%d'),
            long_only,
            param_grid,
        )

        if results_df is None or results_df.empty:
            st.error("Grid search failed or returned no results.")
        else:
            st.subheader('Final Grid Search Results')
            st.dataframe(results_df)

            # Best five parameter sets by strategy return.
            ranked_results = results_df.sort_values('strategy_return', ascending=False)
            top_results = ranked_results.head(5)
            st.subheader('Top 5 Strategies')
            st.dataframe(top_results)

            # CSV download of the complete result table.
            csv_buffer = io.StringIO()
            results_df.to_csv(csv_buffer, index=False)
            st.download_button(
                label="Download Grid Search Results CSV",
                data=csv_buffer.getvalue(),
                file_name="grid_search_results.csv",
                mime="text/csv"
            )
if __name__ == "__main__":
    # Silence the st.pyplot global-use deprecation notice on a best-effort basis.
    # NOTE(review): this config key appears to have been removed from recent
    # Streamlit releases, where setting an unknown option raises - confirm
    # against the pinned Streamlit version. The guard keeps the page usable
    # either way, since the call sits at the very end of the script.
    try:
        st.set_option('deprecation.showPyplotGlobalUse', False)
    except Exception as exc:
        print(f'Skipping pyplot deprecation option: {exc}')