diff --git a/.gitignore b/.gitignore index 36b13f1..9e45c54 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,6 @@ + +output/ + # ---> Python # Byte-compiled / optimized / DLL files __pycache__/ diff --git a/Backtest_Report_AAPL.html b/Backtest_Report_AAPL.html deleted file mode 100644 index f9686a8..0000000 --- a/Backtest_Report_AAPL.html +++ /dev/null @@ -1,61 +0,0 @@ - - - - - Backtest_Report_AAPL.html - - - - - -
- - - - - \ No newline at end of file diff --git a/Backtest_Report_TSLA.html b/Backtest_Report_TSLA.html deleted file mode 100644 index c3be272..0000000 --- a/Backtest_Report_TSLA.html +++ /dev/null @@ -1,61 +0,0 @@ - - - - - Backtest_Report_TSLA.html - - - - - -
- - - - - \ No newline at end of file diff --git a/Backtest_Trades_AAPL.xlsx b/Backtest_Trades_AAPL.xlsx deleted file mode 100644 index 45be3f3..0000000 Binary files a/Backtest_Trades_AAPL.xlsx and /dev/null differ diff --git a/Backtest_Trades_TSLA.xlsx b/Backtest_Trades_TSLA.xlsx deleted file mode 100644 index 3b21ccc..0000000 Binary files a/Backtest_Trades_TSLA.xlsx and /dev/null differ diff --git a/backtester.py b/backtester.py index f98b3e1..fa33d00 100644 --- a/backtester.py +++ b/backtester.py @@ -1,4 +1,6 @@ import os +import csv +import argparse import pandas as pd import pandas_ta as ta from dotenv import load_dotenv @@ -18,6 +20,10 @@ load_dotenv() API_KEY = os.getenv('ALPACA_API_KEY') SECRET_KEY = os.getenv('ALPACA_SECRET_KEY') +PATH_TO_OUTPUT = "output/" + +# Ensure output folder exists +os.makedirs(PATH_TO_OUTPUT, exist_ok=True) # --- KLASSE: DATEN-ENGINE --- class DataEngine: @staticmethod @@ -27,7 +33,7 @@ class DataEngine: request = StockBarsRequest( symbol_or_symbols=[symbol], - timeframe=TimeFrame.Day, + timeframe=TimeFrame.Hour, start=start_date ) @@ -43,18 +49,52 @@ class DataEngine: # --- KLASSE: STRATEGIE (RSI) --- class MyRsiStrategy(Strategy): # Parameter - diese können später optimiert werden - rsi_period = 25 - rsi_low = 30 - rsi_high = 70 + rsi_period = 29 + rsi_low = 35 + rsi_high = 75 + # Position sizing: fraction of equity to allocate per new trade (0-1) + position_size_pct = 0.1 + # Fallback fixed minimum shares to buy if computed shares is 0 + min_shares = 1 + # ATR-based stop-loss parameters + atr_period = 18 + # Stop-loss multiplier applied to ATR (e.g. 3 -> stop = entry - 3 * ATR) + stop_loss_atr_multiplier = 4.0 def init(self): # Indikator berechnen (self.I stellt sicher, dass er im Chart erscheint) self.rsi = self.I(ta.rsi, pd.Series(self.data.Close), length=self.rsi_period) + # Average True Range for volatility-based stops + self.atr = self.I( + ta.atr, + pd.Series(self.data.High), + pd.Series(self.data.Low), + pd.Series(self.data.Close), + length=self.atr_period, + ) def next(self): # KAUFEN: Wenn RSI die untere Grenze von unten nach oben kreuzt if crossover(self.rsi, self.rsi_low): - self.buy() + price = float(self.data.Close[-1]) + # Use a conservative fraction of current equity to size the position + max_invest = max(0, self.equity * self.position_size_pct) + shares = int(max_invest // price) + if shares < self.min_shares: + shares = self.min_shares + # Compute ATR-based stop-loss price below entry + atr_value = float(self.atr[-1]) if not pd.isna(self.atr[-1]) else None + if atr_value and atr_value > 0: + stop_price = round(price - (self.stop_loss_atr_multiplier * atr_value), 6) + # Ensure stop is below price + if stop_price >= price: + stop_price = round(price * 0.99, 6) + else: + # Fallback to a small percent-based stop if ATR not ready + stop_price = round(price * 0.98, 6) + + # Place an integer-sized buy order with SL (avoids relative-size cancellation) + self.buy(size=shares, sl=stop_price) # VERKAUFEN: Wenn RSI die obere Grenze von oben nach unten kreuzt elif crossover(self.rsi_high, self.rsi): @@ -62,18 +102,17 @@ class MyRsiStrategy(Strategy): self.position.close() # --- HAUPTPROGRAMM --- -def run_backtest(symbol="AAPL", cash=10000, commission=0.002): +def run_backtest(symbol="AAPL", days=365, cash=1000, commission=0.002): # 1. Daten laden print(f"Lade Daten für {symbol}...") - data = DataEngine.get_alpaca_data(symbol) + data = DataEngine.get_alpaca_data(symbol, days=days) # 2. Backtest initialisieren - # commission=0.002 bedeutet 0.2% Gebühren pro Trade - bt = Backtest(data, MyRsiStrategy, cash=cash, commission=commission) + bt = Backtest(data, MyRsiStrategy, cash=cash, commission=commission, finalize_trades=True) # 3. Backtest ausführen stats = bt.run() - + # --- AUSWERTUNG --- print("\n" + "="*30) print(f"BACKTEST ERGEBNISSE FÜR {symbol}") @@ -86,20 +125,85 @@ def run_backtest(symbol="AAPL", cash=10000, commission=0.002): print(f"Win Rate [%]: {stats['Win Rate [%]']:.2f}%") print("="*30) + result = { + 'symbol': symbol, + 'start_cash': cash, + 'end_equity': stats.get('Equity Final [$]', None), + 'return_pct': stats.get('Return [%]', None), + 'max_drawdown_pct': stats.get('Max. Drawdown [%]', None), + 'n_trades': stats.get('# Trades', None), + 'win_rate_pct': stats.get('Win Rate [%]', None), + } + # 4. EXPORT: Trades nach Excel - trades = stats['_trades'] - if not trades.empty: - # Dauer der Trades berechnen + trades = stats.get('_trades') + if trades is not None and not trades.empty: trades['Duration'] = trades['ExitTime'] - trades['EntryTime'] - excel_name = f"Backtest_Trades_{symbol}.xlsx" + excel_name = os.path.join(PATH_TO_OUTPUT, f"Backtest_Trades_{symbol}.xlsx") trades.to_excel(excel_name) print(f"✅ Excel-Liste gespeichert: {excel_name}") + result['trades_file'] = excel_name # 5. EXPORT: Interaktiver HTML Report - report_name = f"Backtest_Report_{symbol}.html" + report_name = os.path.join(PATH_TO_OUTPUT, f"Backtest_Report_{symbol}.html") bt.plot(filename=report_name, open_browser=False) print(f"✅ Interaktiver Chart gespeichert: {report_name}") + result['report_file'] = report_name + + return result + + +def run_backtests(symbols, days=365, cash=1000, commission=0.002, summary_name=None): + summary = [] + for sym in symbols: + try: + res = run_backtest(symbol=sym, days=days, cash=cash, commission=commission) + summary.append(res) + except Exception as e: + print(f"⚠️ Fehler beim Backtest für {sym}: {e}") + summary.append({'symbol': sym, 'error': str(e)}) + + # Write summary CSV + if summary_name is None: + ts = datetime.now().strftime('%Y%m%d_%H%M%S') + summary_name = os.path.join(PATH_TO_OUTPUT, f"backtest_summary_{ts}.csv") + + keys = set() + for row in summary: + keys.update(row.keys()) + keys = list(keys) + + with open(summary_name, 'w', newline='', encoding='utf-8') as csvfile: + writer = csv.DictWriter(csvfile, fieldnames=keys) + writer.writeheader() + for row in summary: + writer.writerow(row) + + print(f"\n✅ Zusammenfassung gespeichert: {summary_name}") + return summary_name if __name__ == "__main__": - # Starte den Backtest - run_backtest(symbol="AAPL", cash=10000) \ No newline at end of file + parser = argparse.ArgumentParser(description='Batch backtester for multiple stocks') + parser.add_argument('--tickers', type=str, help='Comma-separated list of tickers, e.g. AAPL,MSFT,TSLA') + parser.add_argument('--file', type=str, help='Path to a file containing one ticker per line') + parser.add_argument('--days', type=int, default=365, help='Days of historical data to fetch') + parser.add_argument('--cash', type=float, default=1000, help='Starting cash per backtest') + parser.add_argument('--commission', type=float, default=0.002, help='Commission (fraction), e.g. 0.002') + args = parser.parse_args() + + symbols = [] + if args.tickers: + symbols = [s.strip().upper() for s in args.tickers.split(',') if s.strip()] + elif args.file: + if os.path.exists(args.file): + with open(args.file, 'r', encoding='utf-8') as f: + symbols = [line.strip().upper() for line in f if line.strip()] + else: + print(f"Ticker-Datei nicht gefunden: {args.file}") + raise SystemExit(1) + else: + symbols = ["AAPL"] + + print(f"Starte Batch-Backtest für: {symbols}") + summary_csv = run_backtests(symbols, days=args.days, cash=args.cash, commission=args.commission) + print(f"Fertig. Übersicht: {summary_csv}") \ No newline at end of file diff --git a/batch_optimizer.py b/batch_optimizer.py new file mode 100644 index 0000000..1e7c0f9 --- /dev/null +++ b/batch_optimizer.py @@ -0,0 +1,115 @@ +#!/usr/bin/env python3 +import argparse +import json +import os +import concurrent.futures +from optimizer import run_optimized_backtest +import optimizers + + +def load_symbols_from_file(path): + with open(path, 'r', encoding='utf-8') as f: + return [line.strip().upper() for line in f if line.strip()] + + +def main(): + parser = argparse.ArgumentParser(description='Run optimizer for multiple tickers') + parser.add_argument('--tickers', type=str, help='Comma-separated list of tickers, e.g. AAPL,MSFT') + parser.add_argument('--file', type=str, help='Path to a file containing one ticker per line') + parser.add_argument('--symbols', nargs='*', help='Space-separated tickers') + parser.add_argument('--json', type=str, help='Path to a JSON file with either an array of tickers or an object with a "tickers" key') + parser.add_argument('--workers', type=int, default=1, help='Number of parallel workers (processes) to use') + parser.add_argument('--optimizers', type=str, help='Comma-separated optimizer names from optimizers.py, e.g. RsiOptimizer') + args = parser.parse_args() + + symbols = [] + if args.tickers: + symbols = [s.strip().upper() for s in args.tickers.split(',') if s.strip()] + elif args.file: + if os.path.exists(args.file): + symbols = load_symbols_from_file(args.file) + else: + print(f"Ticker file not found: {args.file}") + raise SystemExit(1) + elif args.json: + if os.path.exists(args.json): + with open(args.json, 'r', encoding='utf-8') as f: + data = json.load(f) + if isinstance(data, list): + symbols = [s.strip().upper() for s in data if isinstance(s, str) and s.strip()] + elif isinstance(data, dict): + # support {'tickers': [...]} or {'symbols': [...]} keys + arr = data.get('tickers') or data.get('symbols') + if isinstance(arr, list): + symbols = [s.strip().upper() for s in arr if isinstance(s, str) and s.strip()] + else: + print(f"JSON file does not contain a list under 'tickers' or 'symbols': {args.json}") + raise SystemExit(1) + else: + print(f"JSON root must be an array or object: {args.json}") + raise SystemExit(1) + else: + print(f"JSON file not found: {args.json}") + raise SystemExit(1) + elif args.symbols: + symbols = [s.upper() for s in args.symbols] + else: + parser.print_help() + raise SystemExit(1) + + # Resolve optimizers + if args.optimizers: + optimizer_names = [s.strip() for s in args.optimizers.split(',') if s.strip()] + else: + optimizer_names = ['RsiOptimizer'] + + # Build job list (symbol, optimizer_name) pairs + jobs = [(sym, opt_name) for sym in symbols for opt_name in optimizer_names] + + workers = max(1, int(args.workers or 1)) + results = [] + + if workers > 1: + print(f"Running optimizations in parallel with {workers} workers...") + with concurrent.futures.ProcessPoolExecutor(max_workers=workers) as executor: + # Use module-level runner to ensure picklable callable + future_to_job = {executor.submit(optimizers.run_optimizer, sym, opt_name): (sym, opt_name) for (sym, opt_name) in jobs} + for fut in concurrent.futures.as_completed(future_to_job): + sym, opt_name = future_to_job[fut] + try: + res = fut.result() + if isinstance(res, dict): + # tag result with optimizer name + res['optimizer'] = opt_name + results.append(res) + except Exception as e: + print(f"Error optimizing {sym} with {opt_name}: {e}") + else: + for sym, opt_name in jobs: + try: + print(f"\n--- Optimizing {sym} using {opt_name} ---") + res = optimizers.run_optimizer(sym, opt_name) + if isinstance(res, dict): + res['optimizer'] = opt_name + results.append(res) + except Exception as e: + print(f"Error optimizing {sym} with {opt_name}: {e}") + + # Save results to Excel + if results: + try: + import pandas as pd + from datetime import datetime + + ts = datetime.now().strftime('%Y%m%d_%H%M%S') + out_name = os.path.join('output', f'optimized_summary_{ts}.xlsx') + df = pd.DataFrame(results) + os.makedirs('output', exist_ok=True) + df.to_excel(out_name, index=False) + print(f"\n✅ Optimized summary saved: {out_name}") + except Exception as e: + print(f"Failed to write Excel summary: {e}") + + +if __name__ == '__main__': + main() diff --git a/checkStocks.py b/checkStocks.py index 4f4564f..3878246 100644 --- a/checkStocks.py +++ b/checkStocks.py @@ -63,7 +63,7 @@ def scan_markets(tickers, rsi_period=25): # Als Excel speichern filename = f"markt_scan_{datetime.now().strftime('%Y%m%d')}.xlsx" - scan_df.to_excel(filename, index=False) + scan_df.to_excel("output/"+filename, index=False) print(f"\n✅ Scan abgeschlossen. Ergebnisse gespeichert in: {filename}") return scan_df diff --git a/markt_scan_20260121.xlsx b/markt_scan_20260121.xlsx deleted file mode 100644 index 21a0693..0000000 Binary files a/markt_scan_20260121.xlsx and /dev/null differ diff --git a/optimizer.py b/optimizer.py index a30a978..4c81ebf 100644 --- a/optimizer.py +++ b/optimizer.py @@ -4,8 +4,8 @@ import pandas_ta as ta from dotenv import load_dotenv from datetime import datetime, timedelta -from backtesting import Backtest, Strategy -from backtesting.lib import crossover +from backtesting import Backtest +from strategies import RsiStrategy from alpaca.data.historical import StockHistoricalDataClient from alpaca.data.requests import StockBarsRequest from alpaca.data.timeframe import TimeFrame @@ -14,60 +14,99 @@ from alpaca.data.timeframe import TimeFrame load_dotenv() API_KEY = os.getenv('ALPACA_API_KEY') SECRET_KEY = os.getenv('ALPACA_SECRET_KEY') +PATH_TO_OUTPUT = "output/" +os.makedirs(PATH_TO_OUTPUT, exist_ok=True) def get_data(symbol, days=365): client = StockHistoricalDataClient(API_KEY, SECRET_KEY) start_date = datetime.now() - timedelta(days=days) - request_params = StockBarsRequest(symbol_or_symbols=[symbol], timeframe=TimeFrame.Day, start=start_date) + request_params = StockBarsRequest(symbol_or_symbols=[symbol], timeframe=TimeFrame.Hour, start=start_date) df = client.get_stock_bars(request_params).df df = df.reset_index(level=0, drop=True) df.columns = [c.capitalize() for c in df.columns] df.index = df.index.tz_localize(None) return df -# --- 2. STRATEGIE MIT OPTIMIERBAREN PARAMETERN --- -class RsiStrategy(Strategy): - # Diese Klassenvariablen werden von der Optimize-Funktion überschrieben - rsi_period = 14 - rsi_lower = 30 - rsi_upper = 70 - - def init(self): - # Wichtig: Wir übergeben die Parameter an pandas_ta - self.rsi = self.I(ta.rsi, pd.Series(self.data.Close), length=self.rsi_period) - - def next(self): - if crossover(self.rsi, self.rsi_lower): - self.buy() - elif crossover(self.rsi_upper, self.rsi): - if self.position: - self.position.close() +# Strategy classes moved to strategies.py # --- 3. OPTIMIERUNGS-ENGINE --- -def run_optimized_backtest(symbol): +def run_optimized_backtest(symbol, strategy_cls=RsiStrategy, optimize_kwargs=None, report_tag=None): data = get_data(symbol) - bt = Backtest(data, RsiStrategy, cash=10000, commission=0.001) + bt = Backtest(data, strategy_cls, cash=10000, commission=0.001, finalize_trades=True) - print(f"--- Starte Optimierung für {symbol} ---") - - # Hier passiert die Magie: - stats = bt.optimize( - rsi_period=range(7, 30, 2), # Teste Perioden von 7 bis 29 in 2er Schritten - rsi_lower=range(20, 40, 5), # Teste Kaufsignale von 20 bis 35 - rsi_upper=range(60, 80, 5), # Teste Verkaufsignale von 60 bis 75 - maximize='Return [%]', # Wir wollen den höchsten Gewinn (oder 'Sharpe Ratio') - constraint=lambda p: p.rsi_upper > p.rsi_lower # Logik-Check - ) + print(f"--- Starte Optimierung für {symbol} using {strategy_cls.__name__} ---") + + # Build common optimization params for strategy if not provided + if optimize_kwargs is None: + optimize_kwargs = dict( + rsi_period=range(7, 30, 2), # Teste Perioden von 7 bis 29 in 2er Schritten + rsi_lower=range(20, 40, 5), # Teste Kaufsignale von 20 bis 35 + rsi_upper=range(60, 80, 5), # Teste Verkaufsignale von 60 bis 75 + maximize='Return [%]', # Wir wollen den höchsten Gewinn (oder 'Sharpe Ratio') + constraint=lambda p: p.rsi_upper > p.rsi_lower, # Logik-Check + ) + # Extend with ATR/stop params if strategy supports them + if hasattr(strategy_cls, 'atr_period'): + optimize_kwargs['atr_period'] = range(10, 20, 2) + if hasattr(strategy_cls, 'stop_loss_atr_multiplier'): + optimize_kwargs['stop_loss_atr_multiplier'] = [2.0, 2.5, 3.0, 3.5, 4.0] + + # Run optimization + stats = bt.optimize(**optimize_kwargs) print("\n--- BESTE PARAMETER GEFUNDEN ---") print(stats) print("\nDetails der besten Strategie:") - print(f"RSI Periode: {stats._strategy.rsi_period}") - print(f"RSI Untergrenze: {stats._strategy.rsi_lower}") - print(f"RSI Obergrenze: {stats._strategy.rsi_upper}") - + # Print only attributes that the strategy actually has + for attr in ('rsi_period', 'rsi_lower', 'rsi_upper', 'short_ema', 'long_ema', 'atr_period', 'stop_loss_atr_multiplier'): + if hasattr(stats._strategy, attr): + print(f"{attr.replace('_', ' ').capitalize()}: {getattr(stats._strategy, attr)}") # Speichere den Chart der besten Strategie - bt.plot(filename=f"optimized_report_{symbol}.html", open_browser=False) + tag = f"_{report_tag}" if report_tag else "" + out_path = os.path.join(PATH_TO_OUTPUT, f"optimized_report_{symbol}{tag}.html") + bt.plot(filename=out_path, open_browser=False) + print(f"Optimized report saved: {out_path}") + + # Build a result dict to return to callers + result = { + 'symbol': symbol, + 'rsi_period': getattr(stats._strategy, 'rsi_period', None), + 'rsi_lower': getattr(stats._strategy, 'rsi_lower', None), + 'rsi_upper': getattr(stats._strategy, 'rsi_upper', None), + 'short_ema': getattr(stats._strategy, 'short_ema', None), + 'long_ema': getattr(stats._strategy, 'long_ema', None), + 'atr_period': getattr(stats._strategy, 'atr_period', None), + 'stop_loss_atr_multiplier': getattr(stats._strategy, 'stop_loss_atr_multiplier', None), + 'return_pct': stats.get('Return [%]') if hasattr(stats, 'get') else None, + 'equity_final_$': stats.get('Equity Final [$]') if hasattr(stats, 'get') else None, + 'max_drawdown_pct': stats.get('Max. Drawdown [%]') if hasattr(stats, 'get') else None, + 'n_trades': stats.get('# Trades') if hasattr(stats, 'get') else None, + 'win_rate_pct': stats.get('Win Rate [%]') if hasattr(stats, 'get') else None, + } + + # Run a final backtest with the best-found parameters to export the full trades list + best_params = {} + for attr in ('rsi_period', 'rsi_lower', 'rsi_upper', 'short_ema', 'long_ema', 'atr_period', 'stop_loss_atr_multiplier'): + if hasattr(stats._strategy, attr): + best_params[attr] = getattr(stats._strategy, attr) + + try: + if best_params: + print(f"Running final backtest for {symbol} with best params: {best_params}") + final_stats = bt.run(**best_params) + trades = final_stats.get('_trades') + if trades is not None and not trades.empty: + # add duration column and export + trades['Duration'] = trades['ExitTime'] - trades['EntryTime'] + tag_name = report_tag if report_tag else strategy_cls.__name__ + trades_file = os.path.join(PATH_TO_OUTPUT, f"Backtest_Trades_{symbol}_{tag_name}.xlsx") + trades.to_excel(trades_file, index=False) + print(f"✅ Trades exported: {trades_file}") + result['trades_file'] = trades_file + except Exception as e: + print(f"Failed to run final backtest for trades export: {e}") + + return result if __name__ == "__main__": - run_optimized_backtest("AAPL") \ No newline at end of file + run_optimized_backtest("GOLD") \ No newline at end of file diff --git a/optimizers.py b/optimizers.py new file mode 100644 index 0000000..d2c5869 --- /dev/null +++ b/optimizers.py @@ -0,0 +1,69 @@ +import os +from typing import Dict, Any +import strategies +from optimizer import run_optimized_backtest + + +class Optimizer: + """Base optimizer wrapper tying a strategy to an optimization grid.""" + name = 'BaseOptimizer' + strategy_cls = None + optimize_kwargs: Dict[str, Any] = None + + @classmethod + def run(cls, symbol): + # pass optimizer name as report_tag so output files are unique per optimizer + return run_optimized_backtest(symbol, strategy_cls=cls.strategy_cls, optimize_kwargs=cls.optimize_kwargs, report_tag=cls.name) + + +class RsiOptimizer(Optimizer): + name = 'RsiOptimizer' + strategy_cls = strategies.RsiStrategy + def constraint(p): + return p.rsi_upper > p.rsi_lower + + optimize_kwargs = dict( + rsi_period=range(7, 30, 2), + rsi_lower=range(20, 40, 5), + rsi_upper=range(60, 80, 5), + maximize='Return [%]', + constraint=constraint, + atr_period=range(10, 20, 2), + stop_loss_atr_multiplier=[2.0, 2.5, 3.0, 3.5, 4.0], + ) + + +class CrossEmaOptimizer(Optimizer): + name = 'CrossEmaOptimizer' + strategy_cls = strategies.CrossEmaStrategy + + def constraint(p): + # ensure long EMA period is greater than short EMA period + return p.long_ema > p.short_ema + + optimize_kwargs = dict( + short_ema=range(5, 20, 3), + long_ema=range(20, 60, 5), + maximize='Return [%]', + constraint=constraint, + atr_period=range(10, 20, 2), + stop_loss_atr_multiplier=[2.0, 2.5, 3.0, 3.5], + finalize_trades=False, # run final backtest with best params + ) + + +def get_optimizer_by_name(name: str): + mapping = {cls.name: cls for cls in (RsiOptimizer, CrossEmaOptimizer)} + # allow passing class name as well + if name in mapping: + return mapping[name] + # try attribute lookup in module + cls = getattr(__import__('optimizers'), name, None) + return cls + + +def run_optimizer(symbol: str, optimizer_name: str): + cls = get_optimizer_by_name(optimizer_name) + if cls is None: + raise ValueError(f"Optimizer '{optimizer_name}' not found") + return cls.run(symbol) diff --git a/strategies.py b/strategies.py new file mode 100644 index 0000000..be5189d --- /dev/null +++ b/strategies.py @@ -0,0 +1,92 @@ +import pandas as pd +import pandas_ta as ta +from backtesting import Strategy +from backtesting.lib import crossover + + +class RsiStrategy(Strategy): + # Default parameters (can be overridden by optimizer) + rsi_period = 14 + rsi_lower = 30 + rsi_upper = 70 + # Position sizing and ATR stop parameters + position_size_pct = 0.1 + min_shares = 1 + atr_period = 14 + stop_loss_atr_multiplier = 3.0 + + def init(self): + self.rsi = self.I(ta.rsi, pd.Series(self.data.Close), length=self.rsi_period) + self.atr = self.I( + ta.atr, + pd.Series(self.data.High), + pd.Series(self.data.Low), + pd.Series(self.data.Close), + length=self.atr_period, + ) + + def next(self): + if crossover(self.rsi, self.rsi_lower): + price = float(self.data.Close[-1]) + max_invest = max(0, self.equity * self.position_size_pct) + shares = int(max_invest // price) + if shares < self.min_shares: + shares = self.min_shares + + atr_value = float(self.atr[-1]) if not pd.isna(self.atr[-1]) else None + if atr_value and atr_value > 0: + stop_price = round(price - (self.stop_loss_atr_multiplier * atr_value), 6) + if stop_price >= price: + stop_price = round(price * 0.99, 6) + else: + stop_price = round(price * 0.98, 6) + + self.buy(size=shares, sl=stop_price) + elif crossover(self.rsi_upper, self.rsi): + if self.position: + self.position.close() + + +class CrossEmaStrategy(Strategy): + """EMA crossover strategy with ATR-based stop-loss.""" + short_ema = 12 + long_ema = 26 + position_size_pct = 0.1 + min_shares = 1 + atr_period = 14 + stop_loss_atr_multiplier = 3.0 + + def init(self): + self.ema_short = self.I(ta.ema, pd.Series(self.data.Close), length=self.short_ema) + self.ema_long = self.I(ta.ema, pd.Series(self.data.Close), length=self.long_ema) + self.atr = self.I( + ta.atr, + pd.Series(self.data.High), + pd.Series(self.data.Low), + pd.Series(self.data.Close), + length=self.atr_period, + ) + + def next(self): + price = float(self.data.Close[-1]) + # Buy when short EMA crosses above long EMA + if crossover(self.ema_short, self.ema_long): + max_invest = max(0, self.equity * self.position_size_pct) + shares = int(max_invest // price) + if shares < self.min_shares: + shares = self.min_shares + + atr_value = float(self.atr[-1]) if not pd.isna(self.atr[-1]) else None + if atr_value and atr_value > 0: + stop_price = round(price - (self.stop_loss_atr_multiplier * atr_value), 6) + if stop_price >= price: + stop_price = round(price * 0.99, 6) + else: + stop_price = round(price * 0.98, 6) + + self.buy(size=shares, sl=stop_price) + + # Sell when short EMA crosses below long EMA + elif crossover(self.ema_long, self.ema_short): + if self.position: + self.position.close() diff --git a/tickers.json b/tickers.json new file mode 100644 index 0000000..c67c417 --- /dev/null +++ b/tickers.json @@ -0,0 +1,6 @@ +[ + "AAPL", + "MSFT", + "TSLA", + "GOOG" +] \ No newline at end of file diff --git a/tickers1.json b/tickers1.json new file mode 100644 index 0000000..159ce24 --- /dev/null +++ b/tickers1.json @@ -0,0 +1,19 @@ +[ + "AAPL", + "MSFT", + "TSLA", + "GOOG", + "AMZN", + "GOLD", + "NFLX", + "META", + "NVDA", + "BRK.B", + "JPM", + "V", + "DIS", + "PYPL", + "ADBE", + "INTC", + "CMCSA" +]