ADD: added optimizer workflow

This commit is contained in:
2026-01-23 18:22:56 +01:00
parent da68106666
commit fc54074a59
14 changed files with 503 additions and 178 deletions

3
.gitignore vendored
View File

@@ -1,3 +1,6 @@
output/
# ---> Python # ---> Python
# Byte-compiled / optimized / DLL files # Byte-compiled / optimized / DLL files
__pycache__/ __pycache__/

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

Binary file not shown.

Binary file not shown.

View File

@@ -1,4 +1,6 @@
import os import os
import csv
import argparse
import pandas as pd import pandas as pd
import pandas_ta as ta import pandas_ta as ta
from dotenv import load_dotenv from dotenv import load_dotenv
@@ -18,6 +20,10 @@ load_dotenv()
API_KEY = os.getenv('ALPACA_API_KEY') API_KEY = os.getenv('ALPACA_API_KEY')
SECRET_KEY = os.getenv('ALPACA_SECRET_KEY') SECRET_KEY = os.getenv('ALPACA_SECRET_KEY')
PATH_TO_OUTPUT = "output/"
# Ensure output folder exists
os.makedirs(PATH_TO_OUTPUT, exist_ok=True)
# --- KLASSE: DATEN-ENGINE --- # --- KLASSE: DATEN-ENGINE ---
class DataEngine: class DataEngine:
@staticmethod @staticmethod
@@ -27,7 +33,7 @@ class DataEngine:
request = StockBarsRequest( request = StockBarsRequest(
symbol_or_symbols=[symbol], symbol_or_symbols=[symbol],
timeframe=TimeFrame.Day, timeframe=TimeFrame.Hour,
start=start_date start=start_date
) )
@@ -43,18 +49,52 @@ class DataEngine:
# --- KLASSE: STRATEGIE (RSI) --- # --- KLASSE: STRATEGIE (RSI) ---
class MyRsiStrategy(Strategy): class MyRsiStrategy(Strategy):
# Parameter - diese können später optimiert werden # Parameter - diese können später optimiert werden
rsi_period = 25 rsi_period = 29
rsi_low = 30 rsi_low = 35
rsi_high = 70 rsi_high = 75
# Position sizing: fraction of equity to allocate per new trade (0-1)
position_size_pct = 0.1
# Fallback fixed minimum shares to buy if computed shares is 0
min_shares = 1
# ATR-based stop-loss parameters
atr_period = 18
# Stop-loss multiplier applied to ATR (e.g. 3 -> stop = entry - 3 * ATR)
stop_loss_atr_multiplier = 4.0
def init(self): def init(self):
# Indikator berechnen (self.I stellt sicher, dass er im Chart erscheint) # Indikator berechnen (self.I stellt sicher, dass er im Chart erscheint)
self.rsi = self.I(ta.rsi, pd.Series(self.data.Close), length=self.rsi_period) self.rsi = self.I(ta.rsi, pd.Series(self.data.Close), length=self.rsi_period)
# Average True Range for volatility-based stops
self.atr = self.I(
ta.atr,
pd.Series(self.data.High),
pd.Series(self.data.Low),
pd.Series(self.data.Close),
length=self.atr_period,
)
def next(self): def next(self):
# KAUFEN: Wenn RSI die untere Grenze von unten nach oben kreuzt # KAUFEN: Wenn RSI die untere Grenze von unten nach oben kreuzt
if crossover(self.rsi, self.rsi_low): if crossover(self.rsi, self.rsi_low):
self.buy() price = float(self.data.Close[-1])
# Use a conservative fraction of current equity to size the position
max_invest = max(0, self.equity * self.position_size_pct)
shares = int(max_invest // price)
if shares < self.min_shares:
shares = self.min_shares
# Compute ATR-based stop-loss price below entry
atr_value = float(self.atr[-1]) if not pd.isna(self.atr[-1]) else None
if atr_value and atr_value > 0:
stop_price = round(price - (self.stop_loss_atr_multiplier * atr_value), 6)
# Ensure stop is below price
if stop_price >= price:
stop_price = round(price * 0.99, 6)
else:
# Fallback to a small percent-based stop if ATR not ready
stop_price = round(price * 0.98, 6)
# Place an integer-sized buy order with SL (avoids relative-size cancellation)
self.buy(size=shares, sl=stop_price)
# VERKAUFEN: Wenn RSI die obere Grenze von oben nach unten kreuzt # VERKAUFEN: Wenn RSI die obere Grenze von oben nach unten kreuzt
elif crossover(self.rsi_high, self.rsi): elif crossover(self.rsi_high, self.rsi):
@@ -62,14 +102,13 @@ class MyRsiStrategy(Strategy):
self.position.close() self.position.close()
# --- HAUPTPROGRAMM --- # --- HAUPTPROGRAMM ---
def run_backtest(symbol="AAPL", cash=10000, commission=0.002): def run_backtest(symbol="AAPL", days=365, cash=1000, commission=0.002):
# 1. Daten laden # 1. Daten laden
print(f"Lade Daten für {symbol}...") print(f"Lade Daten für {symbol}...")
data = DataEngine.get_alpaca_data(symbol) data = DataEngine.get_alpaca_data(symbol, days=days)
# 2. Backtest initialisieren # 2. Backtest initialisieren
# commission=0.002 bedeutet 0.2% Gebühren pro Trade bt = Backtest(data, MyRsiStrategy, cash=cash, commission=commission, finalize_trades=True)
bt = Backtest(data, MyRsiStrategy, cash=cash, commission=commission)
# 3. Backtest ausführen # 3. Backtest ausführen
stats = bt.run() stats = bt.run()
@@ -86,20 +125,85 @@ def run_backtest(symbol="AAPL", cash=10000, commission=0.002):
print(f"Win Rate [%]: {stats['Win Rate [%]']:.2f}%") print(f"Win Rate [%]: {stats['Win Rate [%]']:.2f}%")
print("="*30) print("="*30)
result = {
'symbol': symbol,
'start_cash': cash,
'end_equity': stats.get('Equity Final [$]', None),
'return_pct': stats.get('Return [%]', None),
'max_drawdown_pct': stats.get('Max. Drawdown [%]', None),
'n_trades': stats.get('# Trades', None),
'win_rate_pct': stats.get('Win Rate [%]', None),
}
# 4. EXPORT: Trades nach Excel # 4. EXPORT: Trades nach Excel
trades = stats['_trades'] trades = stats.get('_trades')
if not trades.empty: if trades is not None and not trades.empty:
# Dauer der Trades berechnen
trades['Duration'] = trades['ExitTime'] - trades['EntryTime'] trades['Duration'] = trades['ExitTime'] - trades['EntryTime']
excel_name = f"Backtest_Trades_{symbol}.xlsx" excel_name = os.path.join(PATH_TO_OUTPUT, f"Backtest_Trades_{symbol}.xlsx")
trades.to_excel(excel_name) trades.to_excel(excel_name)
print(f"✅ Excel-Liste gespeichert: {excel_name}") print(f"✅ Excel-Liste gespeichert: {excel_name}")
result['trades_file'] = excel_name
# 5. EXPORT: Interaktiver HTML Report # 5. EXPORT: Interaktiver HTML Report
report_name = f"Backtest_Report_{symbol}.html" report_name = os.path.join(PATH_TO_OUTPUT, f"Backtest_Report_{symbol}.html")
bt.plot(filename=report_name, open_browser=False) bt.plot(filename=report_name, open_browser=False)
print(f"✅ Interaktiver Chart gespeichert: {report_name}") print(f"✅ Interaktiver Chart gespeichert: {report_name}")
result['report_file'] = report_name
return result
def run_backtests(symbols, days=365, cash=1000, commission=0.002, summary_name=None):
summary = []
for sym in symbols:
try:
res = run_backtest(symbol=sym, days=days, cash=cash, commission=commission)
summary.append(res)
except Exception as e:
print(f"⚠️ Fehler beim Backtest für {sym}: {e}")
summary.append({'symbol': sym, 'error': str(e)})
# Write summary CSV
if summary_name is None:
ts = datetime.now().strftime('%Y%m%d_%H%M%S')
summary_name = os.path.join(PATH_TO_OUTPUT, f"backtest_summary_{ts}.csv")
keys = set()
for row in summary:
keys.update(row.keys())
keys = list(keys)
with open(summary_name, 'w', newline='', encoding='utf-8') as csvfile:
writer = csv.DictWriter(csvfile, fieldnames=keys)
writer.writeheader()
for row in summary:
writer.writerow(row)
print(f"\n✅ Zusammenfassung gespeichert: {summary_name}")
return summary_name
if __name__ == "__main__": if __name__ == "__main__":
# Starte den Backtest parser = argparse.ArgumentParser(description='Batch backtester for multiple stocks')
run_backtest(symbol="AAPL", cash=10000) parser.add_argument('--tickers', type=str, help='Comma-separated list of tickers, e.g. AAPL,MSFT,TSLA')
parser.add_argument('--file', type=str, help='Path to a file containing one ticker per line')
parser.add_argument('--days', type=int, default=365, help='Days of historical data to fetch')
parser.add_argument('--cash', type=float, default=1000, help='Starting cash per backtest')
parser.add_argument('--commission', type=float, default=0.002, help='Commission (fraction), e.g. 0.002')
args = parser.parse_args()
symbols = []
if args.tickers:
symbols = [s.strip().upper() for s in args.tickers.split(',') if s.strip()]
elif args.file:
if os.path.exists(args.file):
with open(args.file, 'r', encoding='utf-8') as f:
symbols = [line.strip().upper() for line in f if line.strip()]
else:
print(f"Ticker-Datei nicht gefunden: {args.file}")
raise SystemExit(1)
else:
symbols = ["AAPL"]
print(f"Starte Batch-Backtest für: {symbols}")
summary_csv = run_backtests(symbols, days=args.days, cash=args.cash, commission=args.commission)
print(f"Fertig. Übersicht: {summary_csv}")

115
batch_optimizer.py Normal file
View File

@@ -0,0 +1,115 @@
#!/usr/bin/env python3
import argparse
import json
import os
import concurrent.futures
from optimizer import run_optimized_backtest
import optimizers
def load_symbols_from_file(path):
with open(path, 'r', encoding='utf-8') as f:
return [line.strip().upper() for line in f if line.strip()]
def main():
parser = argparse.ArgumentParser(description='Run optimizer for multiple tickers')
parser.add_argument('--tickers', type=str, help='Comma-separated list of tickers, e.g. AAPL,MSFT')
parser.add_argument('--file', type=str, help='Path to a file containing one ticker per line')
parser.add_argument('--symbols', nargs='*', help='Space-separated tickers')
parser.add_argument('--json', type=str, help='Path to a JSON file with either an array of tickers or an object with a "tickers" key')
parser.add_argument('--workers', type=int, default=1, help='Number of parallel workers (processes) to use')
parser.add_argument('--optimizers', type=str, help='Comma-separated optimizer names from optimizers.py, e.g. RsiOptimizer')
args = parser.parse_args()
symbols = []
if args.tickers:
symbols = [s.strip().upper() for s in args.tickers.split(',') if s.strip()]
elif args.file:
if os.path.exists(args.file):
symbols = load_symbols_from_file(args.file)
else:
print(f"Ticker file not found: {args.file}")
raise SystemExit(1)
elif args.json:
if os.path.exists(args.json):
with open(args.json, 'r', encoding='utf-8') as f:
data = json.load(f)
if isinstance(data, list):
symbols = [s.strip().upper() for s in data if isinstance(s, str) and s.strip()]
elif isinstance(data, dict):
# support {'tickers': [...]} or {'symbols': [...]} keys
arr = data.get('tickers') or data.get('symbols')
if isinstance(arr, list):
symbols = [s.strip().upper() for s in arr if isinstance(s, str) and s.strip()]
else:
print(f"JSON file does not contain a list under 'tickers' or 'symbols': {args.json}")
raise SystemExit(1)
else:
print(f"JSON root must be an array or object: {args.json}")
raise SystemExit(1)
else:
print(f"JSON file not found: {args.json}")
raise SystemExit(1)
elif args.symbols:
symbols = [s.upper() for s in args.symbols]
else:
parser.print_help()
raise SystemExit(1)
# Resolve optimizers
if args.optimizers:
optimizer_names = [s.strip() for s in args.optimizers.split(',') if s.strip()]
else:
optimizer_names = ['RsiOptimizer']
# Build job list (symbol, optimizer_name) pairs
jobs = [(sym, opt_name) for sym in symbols for opt_name in optimizer_names]
workers = max(1, int(args.workers or 1))
results = []
if workers > 1:
print(f"Running optimizations in parallel with {workers} workers...")
with concurrent.futures.ProcessPoolExecutor(max_workers=workers) as executor:
# Use module-level runner to ensure picklable callable
future_to_job = {executor.submit(optimizers.run_optimizer, sym, opt_name): (sym, opt_name) for (sym, opt_name) in jobs}
for fut in concurrent.futures.as_completed(future_to_job):
sym, opt_name = future_to_job[fut]
try:
res = fut.result()
if isinstance(res, dict):
# tag result with optimizer name
res['optimizer'] = opt_name
results.append(res)
except Exception as e:
print(f"Error optimizing {sym} with {opt_name}: {e}")
else:
for sym, opt_name in jobs:
try:
print(f"\n--- Optimizing {sym} using {opt_name} ---")
res = optimizers.run_optimizer(sym, opt_name)
if isinstance(res, dict):
res['optimizer'] = opt_name
results.append(res)
except Exception as e:
print(f"Error optimizing {sym} with {opt_name}: {e}")
# Save results to Excel
if results:
try:
import pandas as pd
from datetime import datetime
ts = datetime.now().strftime('%Y%m%d_%H%M%S')
out_name = os.path.join('output', f'optimized_summary_{ts}.xlsx')
df = pd.DataFrame(results)
os.makedirs('output', exist_ok=True)
df.to_excel(out_name, index=False)
print(f"\n✅ Optimized summary saved: {out_name}")
except Exception as e:
print(f"Failed to write Excel summary: {e}")
if __name__ == '__main__':
main()

View File

@@ -63,7 +63,7 @@ def scan_markets(tickers, rsi_period=25):
# Als Excel speichern # Als Excel speichern
filename = f"markt_scan_{datetime.now().strftime('%Y%m%d')}.xlsx" filename = f"markt_scan_{datetime.now().strftime('%Y%m%d')}.xlsx"
scan_df.to_excel(filename, index=False) scan_df.to_excel("output/"+filename, index=False)
print(f"\n✅ Scan abgeschlossen. Ergebnisse gespeichert in: {filename}") print(f"\n✅ Scan abgeschlossen. Ergebnisse gespeichert in: {filename}")
return scan_df return scan_df

Binary file not shown.

View File

@@ -4,8 +4,8 @@ import pandas_ta as ta
from dotenv import load_dotenv from dotenv import load_dotenv
from datetime import datetime, timedelta from datetime import datetime, timedelta
from backtesting import Backtest, Strategy from backtesting import Backtest
from backtesting.lib import crossover from strategies import RsiStrategy
from alpaca.data.historical import StockHistoricalDataClient from alpaca.data.historical import StockHistoricalDataClient
from alpaca.data.requests import StockBarsRequest from alpaca.data.requests import StockBarsRequest
from alpaca.data.timeframe import TimeFrame from alpaca.data.timeframe import TimeFrame
@@ -14,60 +14,99 @@ from alpaca.data.timeframe import TimeFrame
load_dotenv() load_dotenv()
API_KEY = os.getenv('ALPACA_API_KEY') API_KEY = os.getenv('ALPACA_API_KEY')
SECRET_KEY = os.getenv('ALPACA_SECRET_KEY') SECRET_KEY = os.getenv('ALPACA_SECRET_KEY')
PATH_TO_OUTPUT = "output/"
os.makedirs(PATH_TO_OUTPUT, exist_ok=True)
def get_data(symbol, days=365): def get_data(symbol, days=365):
client = StockHistoricalDataClient(API_KEY, SECRET_KEY) client = StockHistoricalDataClient(API_KEY, SECRET_KEY)
start_date = datetime.now() - timedelta(days=days) start_date = datetime.now() - timedelta(days=days)
request_params = StockBarsRequest(symbol_or_symbols=[symbol], timeframe=TimeFrame.Day, start=start_date) request_params = StockBarsRequest(symbol_or_symbols=[symbol], timeframe=TimeFrame.Hour, start=start_date)
df = client.get_stock_bars(request_params).df df = client.get_stock_bars(request_params).df
df = df.reset_index(level=0, drop=True) df = df.reset_index(level=0, drop=True)
df.columns = [c.capitalize() for c in df.columns] df.columns = [c.capitalize() for c in df.columns]
df.index = df.index.tz_localize(None) df.index = df.index.tz_localize(None)
return df return df
# --- 2. STRATEGIE MIT OPTIMIERBAREN PARAMETERN --- # Strategy classes moved to strategies.py
class RsiStrategy(Strategy):
# Diese Klassenvariablen werden von der Optimize-Funktion überschrieben
rsi_period = 14
rsi_lower = 30
rsi_upper = 70
def init(self):
# Wichtig: Wir übergeben die Parameter an pandas_ta
self.rsi = self.I(ta.rsi, pd.Series(self.data.Close), length=self.rsi_period)
def next(self):
if crossover(self.rsi, self.rsi_lower):
self.buy()
elif crossover(self.rsi_upper, self.rsi):
if self.position:
self.position.close()
# --- 3. OPTIMIERUNGS-ENGINE --- # --- 3. OPTIMIERUNGS-ENGINE ---
def run_optimized_backtest(symbol): def run_optimized_backtest(symbol, strategy_cls=RsiStrategy, optimize_kwargs=None, report_tag=None):
data = get_data(symbol) data = get_data(symbol)
bt = Backtest(data, RsiStrategy, cash=10000, commission=0.001) bt = Backtest(data, strategy_cls, cash=10000, commission=0.001, finalize_trades=True)
print(f"--- Starte Optimierung für {symbol} ---") print(f"--- Starte Optimierung für {symbol} using {strategy_cls.__name__} ---")
# Hier passiert die Magie: # Build common optimization params for strategy if not provided
stats = bt.optimize( if optimize_kwargs is None:
rsi_period=range(7, 30, 2), # Teste Perioden von 7 bis 29 in 2er Schritten optimize_kwargs = dict(
rsi_lower=range(20, 40, 5), # Teste Kaufsignale von 20 bis 35 rsi_period=range(7, 30, 2), # Teste Perioden von 7 bis 29 in 2er Schritten
rsi_upper=range(60, 80, 5), # Teste Verkaufsignale von 60 bis 75 rsi_lower=range(20, 40, 5), # Teste Kaufsignale von 20 bis 35
maximize='Return [%]', # Wir wollen den höchsten Gewinn (oder 'Sharpe Ratio') rsi_upper=range(60, 80, 5), # Teste Verkaufsignale von 60 bis 75
constraint=lambda p: p.rsi_upper > p.rsi_lower # Logik-Check maximize='Return [%]', # Wir wollen den höchsten Gewinn (oder 'Sharpe Ratio')
) constraint=lambda p: p.rsi_upper > p.rsi_lower, # Logik-Check
)
# Extend with ATR/stop params if strategy supports them
if hasattr(strategy_cls, 'atr_period'):
optimize_kwargs['atr_period'] = range(10, 20, 2)
if hasattr(strategy_cls, 'stop_loss_atr_multiplier'):
optimize_kwargs['stop_loss_atr_multiplier'] = [2.0, 2.5, 3.0, 3.5, 4.0]
# Run optimization
stats = bt.optimize(**optimize_kwargs)
print("\n--- BESTE PARAMETER GEFUNDEN ---") print("\n--- BESTE PARAMETER GEFUNDEN ---")
print(stats) print(stats)
print("\nDetails der besten Strategie:") print("\nDetails der besten Strategie:")
print(f"RSI Periode: {stats._strategy.rsi_period}") # Print only attributes that the strategy actually has
print(f"RSI Untergrenze: {stats._strategy.rsi_lower}") for attr in ('rsi_period', 'rsi_lower', 'rsi_upper', 'short_ema', 'long_ema', 'atr_period', 'stop_loss_atr_multiplier'):
print(f"RSI Obergrenze: {stats._strategy.rsi_upper}") if hasattr(stats._strategy, attr):
print(f"{attr.replace('_', ' ').capitalize()}: {getattr(stats._strategy, attr)}")
# Speichere den Chart der besten Strategie # Speichere den Chart der besten Strategie
bt.plot(filename=f"optimized_report_{symbol}.html", open_browser=False) tag = f"_{report_tag}" if report_tag else ""
out_path = os.path.join(PATH_TO_OUTPUT, f"optimized_report_{symbol}{tag}.html")
bt.plot(filename=out_path, open_browser=False)
print(f"Optimized report saved: {out_path}")
# Build a result dict to return to callers
result = {
'symbol': symbol,
'rsi_period': getattr(stats._strategy, 'rsi_period', None),
'rsi_lower': getattr(stats._strategy, 'rsi_lower', None),
'rsi_upper': getattr(stats._strategy, 'rsi_upper', None),
'short_ema': getattr(stats._strategy, 'short_ema', None),
'long_ema': getattr(stats._strategy, 'long_ema', None),
'atr_period': getattr(stats._strategy, 'atr_period', None),
'stop_loss_atr_multiplier': getattr(stats._strategy, 'stop_loss_atr_multiplier', None),
'return_pct': stats.get('Return [%]') if hasattr(stats, 'get') else None,
'equity_final_$': stats.get('Equity Final [$]') if hasattr(stats, 'get') else None,
'max_drawdown_pct': stats.get('Max. Drawdown [%]') if hasattr(stats, 'get') else None,
'n_trades': stats.get('# Trades') if hasattr(stats, 'get') else None,
'win_rate_pct': stats.get('Win Rate [%]') if hasattr(stats, 'get') else None,
}
# Run a final backtest with the best-found parameters to export the full trades list
best_params = {}
for attr in ('rsi_period', 'rsi_lower', 'rsi_upper', 'short_ema', 'long_ema', 'atr_period', 'stop_loss_atr_multiplier'):
if hasattr(stats._strategy, attr):
best_params[attr] = getattr(stats._strategy, attr)
try:
if best_params:
print(f"Running final backtest for {symbol} with best params: {best_params}")
final_stats = bt.run(**best_params)
trades = final_stats.get('_trades')
if trades is not None and not trades.empty:
# add duration column and export
trades['Duration'] = trades['ExitTime'] - trades['EntryTime']
tag_name = report_tag if report_tag else strategy_cls.__name__
trades_file = os.path.join(PATH_TO_OUTPUT, f"Backtest_Trades_{symbol}_{tag_name}.xlsx")
trades.to_excel(trades_file, index=False)
print(f"✅ Trades exported: {trades_file}")
result['trades_file'] = trades_file
except Exception as e:
print(f"Failed to run final backtest for trades export: {e}")
return result
if __name__ == "__main__": if __name__ == "__main__":
run_optimized_backtest("AAPL") run_optimized_backtest("GOLD")

69
optimizers.py Normal file
View File

@@ -0,0 +1,69 @@
import os
from typing import Dict, Any
import strategies
from optimizer import run_optimized_backtest
class Optimizer:
"""Base optimizer wrapper tying a strategy to an optimization grid."""
name = 'BaseOptimizer'
strategy_cls = None
optimize_kwargs: Dict[str, Any] = None
@classmethod
def run(cls, symbol):
# pass optimizer name as report_tag so output files are unique per optimizer
return run_optimized_backtest(symbol, strategy_cls=cls.strategy_cls, optimize_kwargs=cls.optimize_kwargs, report_tag=cls.name)
class RsiOptimizer(Optimizer):
name = 'RsiOptimizer'
strategy_cls = strategies.RsiStrategy
def constraint(p):
return p.rsi_upper > p.rsi_lower
optimize_kwargs = dict(
rsi_period=range(7, 30, 2),
rsi_lower=range(20, 40, 5),
rsi_upper=range(60, 80, 5),
maximize='Return [%]',
constraint=constraint,
atr_period=range(10, 20, 2),
stop_loss_atr_multiplier=[2.0, 2.5, 3.0, 3.5, 4.0],
)
class CrossEmaOptimizer(Optimizer):
name = 'CrossEmaOptimizer'
strategy_cls = strategies.CrossEmaStrategy
def constraint(p):
# ensure long EMA period is greater than short EMA period
return p.long_ema > p.short_ema
optimize_kwargs = dict(
short_ema=range(5, 20, 3),
long_ema=range(20, 60, 5),
maximize='Return [%]',
constraint=constraint,
atr_period=range(10, 20, 2),
stop_loss_atr_multiplier=[2.0, 2.5, 3.0, 3.5],
finalize_trades=False, # run final backtest with best params
)
def get_optimizer_by_name(name: str):
mapping = {cls.name: cls for cls in (RsiOptimizer, CrossEmaOptimizer)}
# allow passing class name as well
if name in mapping:
return mapping[name]
# try attribute lookup in module
cls = getattr(__import__('optimizers'), name, None)
return cls
def run_optimizer(symbol: str, optimizer_name: str):
cls = get_optimizer_by_name(optimizer_name)
if cls is None:
raise ValueError(f"Optimizer '{optimizer_name}' not found")
return cls.run(symbol)

92
strategies.py Normal file
View File

@@ -0,0 +1,92 @@
import pandas as pd
import pandas_ta as ta
from backtesting import Strategy
from backtesting.lib import crossover
class RsiStrategy(Strategy):
# Default parameters (can be overridden by optimizer)
rsi_period = 14
rsi_lower = 30
rsi_upper = 70
# Position sizing and ATR stop parameters
position_size_pct = 0.1
min_shares = 1
atr_period = 14
stop_loss_atr_multiplier = 3.0
def init(self):
self.rsi = self.I(ta.rsi, pd.Series(self.data.Close), length=self.rsi_period)
self.atr = self.I(
ta.atr,
pd.Series(self.data.High),
pd.Series(self.data.Low),
pd.Series(self.data.Close),
length=self.atr_period,
)
def next(self):
if crossover(self.rsi, self.rsi_lower):
price = float(self.data.Close[-1])
max_invest = max(0, self.equity * self.position_size_pct)
shares = int(max_invest // price)
if shares < self.min_shares:
shares = self.min_shares
atr_value = float(self.atr[-1]) if not pd.isna(self.atr[-1]) else None
if atr_value and atr_value > 0:
stop_price = round(price - (self.stop_loss_atr_multiplier * atr_value), 6)
if stop_price >= price:
stop_price = round(price * 0.99, 6)
else:
stop_price = round(price * 0.98, 6)
self.buy(size=shares, sl=stop_price)
elif crossover(self.rsi_upper, self.rsi):
if self.position:
self.position.close()
class CrossEmaStrategy(Strategy):
"""EMA crossover strategy with ATR-based stop-loss."""
short_ema = 12
long_ema = 26
position_size_pct = 0.1
min_shares = 1
atr_period = 14
stop_loss_atr_multiplier = 3.0
def init(self):
self.ema_short = self.I(ta.ema, pd.Series(self.data.Close), length=self.short_ema)
self.ema_long = self.I(ta.ema, pd.Series(self.data.Close), length=self.long_ema)
self.atr = self.I(
ta.atr,
pd.Series(self.data.High),
pd.Series(self.data.Low),
pd.Series(self.data.Close),
length=self.atr_period,
)
def next(self):
price = float(self.data.Close[-1])
# Buy when short EMA crosses above long EMA
if crossover(self.ema_short, self.ema_long):
max_invest = max(0, self.equity * self.position_size_pct)
shares = int(max_invest // price)
if shares < self.min_shares:
shares = self.min_shares
atr_value = float(self.atr[-1]) if not pd.isna(self.atr[-1]) else None
if atr_value and atr_value > 0:
stop_price = round(price - (self.stop_loss_atr_multiplier * atr_value), 6)
if stop_price >= price:
stop_price = round(price * 0.99, 6)
else:
stop_price = round(price * 0.98, 6)
self.buy(size=shares, sl=stop_price)
# Sell when short EMA crosses below long EMA
elif crossover(self.ema_long, self.ema_short):
if self.position:
self.position.close()

6
tickers.json Normal file
View File

@@ -0,0 +1,6 @@
[
"AAPL",
"MSFT",
"TSLA",
"GOOG"
]

19
tickers1.json Normal file
View File

@@ -0,0 +1,19 @@
[
"AAPL",
"MSFT",
"TSLA",
"GOOG",
"AMZN",
"GOLD",
"NFLX",
"META",
"NVDA",
"BRK.B",
"JPM",
"V",
"DIS",
"PYPL",
"ADBE",
"INTC",
"CMCSA"
]