Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Select an option

  • Save pishguy/9cfadce875bcc2a9bee6f12f3a73fbf2 to your computer and use it in GitHub Desktop.

Select an option

Save pishguy/9cfadce875bcc2a9bee6f12f3a73fbf2 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import json
import logging
import mimetypes
import os
import re
import requests
import sqlite3
import threading
import time
import urllib.parse
from collections import deque
from contextlib import contextmanager
from datetime import datetime, timedelta
from http.server import HTTPServer, BaseHTTPRequestHandler
import webbrowser
from typing import Dict, List, Optional, Any, Set
import asyncio
import socket
import struct
import hashlib
import base64
import select
import pandas as pd
import pandas_ta as ta
import websocket
from parameter_optimizer import ParameterOptimizer
from smart_filters import SmartFilter
from trade_journal import TradeJournal
from logging.handlers import RotatingFileHandler
try:
from playwright.async_api import async_playwright, Page, BrowserContext
except ImportError:
print("⚠️ لطفاً playwright را نصب کنید: pip install playwright")
print(" سپس: playwright install chromium")
raise
# logging.basicConfig(
# level=logging.INFO,
# format='%(asctime)s - %(levelname)s - %(message)s',
# handlers=[
# logging.FileHandler('trading_bot.log'),
# logging.StreamHandler()
# ]
# )
logger = logging.getLogger(__name__)
# noinspection PyPep8Naming
class HivaGoldManager:
def __init__(self):
"""
مقداردهی اولیه ربات سیگنال‌دهی مظنه
نسخه ساده‌شده - فقط سیگنال‌دهی
"""
self.debug_logger = logging.getLogger("mazaneh_debug")
self.debug_logger.setLevel(logging.INFO)
# جلوگیری از چندبار اضافه شدن handler در اجرای مجدد
if not self.debug_logger.handlers:
handler = RotatingFileHandler(
"mazaneh_debug.log",
maxBytes=5_000_000,
backupCount=5,
encoding="utf-8"
)
formatter = logging.Formatter("%(asctime)s | %(levelname)s | %(message)s")
handler.setFormatter(formatter)
self.debug_logger.addHandler(handler)
self.SIGNAL_ONLY_MODE = True
self.M3_TABLE = "candles_m3"
# ════════════ دریافت داده های تاریخی از سایت فراز برای هیواگلد ═══════════
# 📥 تنظیمات دانلود داده از فرازگلد
# ═══════════════════════════════════════════════════════════
self.FARAZ_API_URL = "https://farazgold.com/room/api/get-bars/"
self.FARAZ_COOKIES = {
"csrftoken": "8IbnI080U1moAHjWLvP2KUcsDEmU2Tcc",
"sessionid": "axei1d3aelm4s7dp3lu2sni2buyhaeht"
}
self.FARAZ_HEADERS = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
"Referer": "https://farazgold.com/"
}
self.HISTORY_DAYS = 365 # تعداد روز داده تاریخی
# ═══════════════════════════════════════════════════════════
# 🗄️ تنظیمات دیتابیس
# ═══════════════════════════════════════════════════════════
self.DB_PATH = 'mazaneh_data.db'
self.market = 'mazaneh'
# ═══════════════════════════════════════════════════════════
# 📝 شناسه معامله فعلی (برای ردیابی)
# ═══════════════════════════════════════════════════════════
self.current_internal_trade_id = None
# ═══════════════════════════════════════════════════════════
# ❌ خطاها
# ═══════════════════════════════════════════════════════════
self.errors: List[Dict] = []
# ═══════════════════════════════════════════════════════════
# 💰 تنظیمات مالی و پورتفولیو
# ═══════════════════════════════════════════════════════════
self.REQUIRED_MINUTES = 1
self.PORTFOLIO_SIZE = 5_000_000
self.TOTAL_BALANCE = 100_000_000
self.LINE_VALUE_TOMAN = 23_000
self.DOLLAR_VALUE_TOMAN = 80_000
self.POSITION_SIZE = 1
# ═══════════════════════════════════════════════════════════
# 🎯 تنظیمات استراتژی
# ═══════════════════════════════════════════════════════════
self.STRATEGY_MODE = 'auto'
self.current_trade_strategy = None
self.ENABLE_COUNTER_TREND = True
# ═══════════════════════════════════════════════════════════
# 📊 پارامترهای اسکالپ
# ═══════════════════════════════════════════════════════════
self.SCALP_ADX_THRESHOLD = 18
self.SCALP_MIN_PROFIT_LINES = 12
self.SCALP_MAX_TRADE_MINUTES = 15
self.SCALP_RISK_PERCENTAGE = 0.8
self.SCALP_ATR_MULTIPLIER = 0.8
self.SCALP_RR_RATIO = 1.5
self.SCALP_STOP_LOSS_LINES = 8
# ═══════════════════════════════════════════════════════════
# 📈 پارامترهای سوئینگ
# ═══════════════════════════════════════════════════════════
self.SWING_ADX_THRESHOLD = 25
self.SWING_MIN_PROFIT_LINES = 5
self.SWING_MAX_TRADE_HOURS = 8
self.SWING_RISK_PERCENTAGE = 0.5
self.SWING_ATR_MULTIPLIER = 0.3
self.SWING_RR_RATIO = 2.0
self.SWING_STOP_LOSS_LINES = 8
# ═══════════════════════════════════════════════════════════
# 🛡️ پارامترهای مدیریت ریسک
# ═══════════════════════════════════════════════════════════
self.ADX_THRESHOLD = 15
self.TRAILING_STOP_LINES = 3
self.TRAILING_STEP_LINES = 2
self.MIN_PROFIT_LINES = 5
self.RISK_PERCENTAGE = 0.5
self.MIN_CANDLES_BETWEEN_TRADES = 1
self.ATR_MULTIPLIER = 0.5
self.DEFAULT_STOP_LOSS_LINES = 8
self.BREAKEVEN_LINES = 3
# ═══════════════════════════════════════════════════════════
# 🔧 محدوده‌های حد ضرر
# ═══════════════════════════════════════════════════════════
self.SCALP_MIN_STOP = 3
self.SCALP_MAX_STOP = 8
self.SWING_MIN_STOP = 5
self.SWING_MAX_STOP = 10
# ═══════════════════════════════════════════════════════════
# ⏱️ تنظیمات تایم‌فریم
# ═══════════════════════════════════════════════════════════
self.M3_TIMEFRAME = '3min'
self.DAILY_TIMEFRAME = 'D'
self.STATUS_DISPLAY_INTERVAL = 60
# ═══════════════════════════════════════════════════════════
# 🌐 تنظیمات اتصال به سرور قیمت (WebSocket)
# ═══════════════════════════════════════════════════════════
self.MARKET_WSS_URL = "wss://hivagold.com/mazaneh/ws/mazaneh/price/"
self.CONNECTION_STATUS = "disconnected"
self.MARKET_PRICE = None
self.LAST_CANDLE_TIME = None
self.LAST_KNOWN_PRICE = None
self.COOKIES = {
"csrftoken": "JSrBoDZKsy9FlDV8BkX2cSszwlZROqQD",
"sessionid": "5ro2sgb4a3xwxefkur81i5smnsinzk3b"
}
# ═══════════════════════════════════════════════════════════
# 📡 تنظیمات WebSocket Server داخلی (جایگزین اتصال به سرور خارجی)
# ═══════════════════════════════════════════════════════════
self.WS_SERVER_HOST = "0.0.0.0"
self.WS_SERVER_PORT = 8765
self.ws_server_socket = None
self.ws_server_thread = None
self.ws_server_running = False
self.connected_clients: Set[socket.socket] = set()
self.clients_lock = threading.Lock()
self.COMMAND_SERVER_STATUS = "stopped"
# ═══════════════════════════════════════════════════════════
# 💼 متغیرهای وضعیت مالی (برای آمار)
# ═══════════════════════════════════════════════════════════
self.balance = self.TOTAL_BALANCE
self.portfolio = self.PORTFOLIO_SIZE
self.trades = 0
self.winning_trades = 0
self.losing_trades = 0
self.total_profit = 0
self.total_loss = 0
self.max_drawdown = 0
self.peak_balance = self.TOTAL_BALANCE
# ═══════════════════════════════════════════════════════════
# 📊 متغیرهای معامله فعلی (برای trailing stop)
# ═══════════════════════════════════════════════════════════
self.in_trade = False
self.trade_direction = None
self.entry_price = 0
self.stop_loss_price = 0
self.take_profit_price = 0
self.last_trade_exit_time = None
self.trade_start_time = None
self.trade_history = []
self.open_trades = []
# ═══════════════════════════════════════════════════════════
# 📈 متغیرهای ردیابی قیمت در معامله (ضروری برای trailing)
# ═══════════════════════════════════════════════════════════
self.highest_price_in_trade = 0
self.lowest_price_in_trade = float('inf')
# ═══════════════════════════════════════════════════════════
# 📊 ذخیره‌سازی داده‌های کندل
# ═══════════════════════════════════════════════════════════
self.m3_data = deque(maxlen=500)
self.daily_data = deque(maxlen=400)
self.m3 = pd.DataFrame()
self.daily = pd.DataFrame()
# ═══════════════════════════════════════════════════════════
# 🎯 تنظیمات Trailing Stop
# ═══════════════════════════════════════════════════════════
self.TRAILING_ENABLED = True
self.BREAKEVEN_ENABLED = True
self.init_database()
# ═══════════════════════════════════════════════════════════
# تنظمیات مربوط به اوردربلاک
# ═══════════════════════════════════════════════════════════
self.order_blocks = []
self.active_order_blocks = []
self.order_block_expiry_hours = 24
self.ob_lookback_period = 100
self.min_body_ratio = 1.5
self.max_ob_distance = 2
self.pin_bar_min_ratio = 0.7
self.engulfing_min_ratio = 1.1
self.ORDER_BLOCK_SETTINGS = {
'enabled': True,
'lookback_period': 100,
'min_body_ratio': 1.5,
'max_ob_distance': 2,
'expiry_hours': 24,
'max_entry_attempts': 3,
'require_confirmation': True,
'confirmation_patterns': ['pin_bar', 'engulfing', 'strong_candle'],
'min_adx': 20,
'rsi_min': 30,
'rsi_max': 70
}
# ═══════════════════════════════════════════════════════════
# 🎯 سیستم اولویت‌بندی استراتژی‌ها
# ═══════════════════════════════════════════════════════════
self.STRATEGY_PRIORITY = {
'order_block': {
'priority': 1,
'conditions': [
'valid_order_block_exists',
'candle_confirmation',
'price_in_ob_zone'
]
},
'scalp': {
'priority': 2,
'conditions': [
'low_volatility',
'weak_trend',
'short_term_signal'
]
},
'swing': {
'priority': 3,
'conditions': [
'high_volatility',
'strong_trend',
'confirmed_signal'
]
}
}
# ═══════════════════════════════════════════════════════════
# 🔥 تنظیمات Warm-up (گرم شدن ربات)
# ═══════════════════════════════════════════════════════════
self.WARMUP_ENABLED = True
self.WARMUP_CANDLES = 2
self.WARMUP_MINUTES = 1
self.warmup_complete = False
self.warmup_start_time = None
self.warmup_candles_seen = 0
self.first_realtime_candle_time = None
# ═══════════════════════════════════════════════════════════
# ✅ اصلاح مهم: جزئیات رژیم و جلوگیری از تغییر تجمعی پارامترها
# ═══════════════════════════════════════════════════════════
self.last_regime_details: Dict = {}
self._last_params_regime = None # آخرین رژیمی که روی پارامترها اعمال شده
# مقدار پایه برای برگشت (تا _adjust_* تجمعی نشود)
self._base_params = {
'SCALP_ADX_THRESHOLD': float(self.SCALP_ADX_THRESHOLD),
'SWING_ADX_THRESHOLD': float(self.SWING_ADX_THRESHOLD),
'SCALP_STOP_LOSS_LINES': int(self.SCALP_STOP_LOSS_LINES),
'SWING_STOP_LOSS_LINES': int(self.SWING_STOP_LOSS_LINES),
'SCALP_RR_RATIO': float(self.SCALP_RR_RATIO),
'SWING_RR_RATIO': float(self.SWING_RR_RATIO),
}
# ═══════════════════════════════════════════════════════════
# 📊 سیستم ژورنال‌نویسی پیشرفته
# ═══════════════════════════════════════════════════════════
try:
self.journal = TradeJournal(self)
self.smart_filter = SmartFilter(self.journal)
self.param_optimizer = ParameterOptimizer(self.journal, self)
print("✅ سیستم ژورنال‌نویسی پیشرفته فعال شد")
except Exception as e:
print(f"⚠️ خطا در راه‌اندازی سیستم ژورنال: {e}")
self.journal = None
self.smart_filter = None
self.param_optimizer = None
self.market_regime = "unknown"
self.regime_confidence = 0
self.regime_history = deque(maxlen=20)
# تنظیمات تشخیص بازار
self.REGIME_SETTINGS = {
'lookback_candles': 30,
'min_confidence': 60,
'update_interval': 5 # هر 5 کندل آپدیت شود
}
print("✅ در حال آماده‌سازی هسته سیگنال‌دهی مظنه...")
print(f" 📊 حالت استراتژی: {self.STRATEGY_MODE}")
print(f" 💰 پورتفو: {self.PORTFOLIO_SIZE:,} تومان")
print(f" 💵 ارزش هر خط: {self.LINE_VALUE_TOMAN:,} تومان")
print(f" 🔄 معاملات خلاف روند: {'فعال' if self.ENABLE_COUNTER_TREND else 'غیرفعال'}")
print(f" 🛡️ تریلینگ استاپ: فعال‌سازی بعد از {self.TRAILING_STOP_LINES} خط سود")
print(f" 📏 گام تریلینگ: {self.TRAILING_STEP_LINES} خط")
print(f" 📡 WebSocket Server داخلی: ws://{self.WS_SERVER_HOST}:{self.WS_SERVER_PORT}")
def _infer_atr_unit(self, atr_value: float, current_price: float | None = None) -> str:
"""
حدس واحد ATR با اتکا به دیتا:
- اگر last_regime_details از atr_absolute_regime آمده باشد معمولاً 'lines' است (طبق معماری شما)
- اگر به high/low نزدیک باشد احتمالاً 'price'
این فقط حدس است؛ لاگ‌ها برای تصمیم‌گیری نهایی استفاده می‌شوند.
"""
try:
if atr_value is None:
return "unknown"
# اگر خیلی کوچک است، اغلب "خط" است
# (این فقط heuristic است)
if current_price and current_price > 1000 and atr_value <= 50:
return "lines"
# اگر رنج کندل‌های m3 را داریم، مقایسه کنیم
if hasattr(self, "m3") and isinstance(self.m3, pd.DataFrame) and len(self.m3) >= 30:
if 'high' in self.m3.columns and 'low' in self.m3.columns:
ranges = (self.m3['high'] - self.m3['low']).tail(30)
ranges = ranges.dropna()
if not ranges.empty:
median_range = float(ranges.median())
# اگر ATR خیلی نزدیک به رنج‌هاست احتمالاً واحد price است
# اگر خیلی کوچکتر/بزرگتر باشد احتمال mismatch وجود دارد
if median_range > 0:
ratio = atr_value / median_range
if 0.2 <= ratio <= 2.5:
return "price_or_lines_unclear"
if ratio < 0.2:
return "likely_lines_or_scaled"
if ratio > 2.5:
return "likely_scaled_or_wrong"
return "unknown"
except Exception:
return "unknown"
def _log_regime_and_atr_snapshot(self, stage: str, current_price: float | None = None) -> None:
"""
لاگ جامع برای بررسی اینکه atr_absolute داخل last_regime_details دقیقاً چیست.
"""
try:
details = getattr(self, "last_regime_details", {}) or {}
atr_regime = details.get("atr_absolute", None)
atr_regime_f = None
try:
atr_regime_f = float(atr_regime) if atr_regime is not None else None
except Exception:
atr_regime_f = None
# ATR داخل m3 (اگر وجود دارد)
atr_m3 = None
if hasattr(self, "m3") and isinstance(self.m3, pd.DataFrame) and 'atr' in self.m3.columns and not self.m3.empty:
v = self.m3['atr'].iloc[-1]
if pd.notna(v):
try:
atr_m3 = float(v)
except Exception:
atr_m3 = None
# رنج کندل‌های اخیر
median_range = None
last_range = None
if hasattr(self, "m3") and isinstance(self.m3, pd.DataFrame) and len(self.m3) >= 5:
if 'high' in self.m3.columns and 'low' in self.m3.columns:
ranges = (self.m3['high'] - self.m3['low']).tail(30).dropna()
if not ranges.empty:
median_range = float(ranges.median())
last_r = (self.m3['high'].iloc[-1] - self.m3['low'].iloc[-1])
if pd.notna(last_r):
last_range = float(last_r)
unit_guess = None
if atr_regime_f is not None:
unit_guess = self._infer_atr_unit(atr_regime_f, current_price=current_price)
msg = {
"stage": stage,
"market_regime": getattr(self, "market_regime", "unknown"),
"regime_confidence": getattr(self, "regime_confidence", None),
"current_price": current_price,
"atr_regime": atr_regime,
"atr_regime_float": atr_regime_f,
"atr_m3": atr_m3,
"median_m3_range": median_range,
"last_m3_range": last_range,
"unit_guess": unit_guess,
"last_regime_details_keys": list(details.keys())[:50], # سقف برای طول لاگ
}
# اگر نسبت‌ها قابل محاسبه‌اند، اضافه کن
if atr_regime_f is not None and median_range and median_range > 0:
msg["atr_to_median_range_ratio"] = atr_regime_f / median_range
if atr_regime_f is not None and current_price and current_price > 0:
msg["atr_to_price_percent"] = (atr_regime_f / current_price) * 100.0
self.debug_logger.info(str(msg))
except Exception as e:
try:
self.debug_logger.error(f"failed_to_log_snapshot stage={stage} err={e}")
except Exception:
pass
def _signal_cooldown_ok(self, candle_time: pd.Timestamp, direction: str) -> bool:
"""
جلوگیری از اسپم سیگنال:
- حداقل فاصله زمانی بین سیگنال‌ها (بر اساس کندل)
- جلوگیری از تکرار سیگنال هم‌جهت پشت سر هم
"""
try:
if self.SIGNAL_COOLDOWN_CANDLES is None or int(self.SIGNAL_COOLDOWN_CANDLES) <= 0:
return True
if self._last_signal_candle_time is None:
return True
last_t = pd.Timestamp(self._last_signal_candle_time)
now_t = pd.Timestamp(candle_time)
# تعداد کندل فاصله (با فرض TF ثابت)
tf_minutes = self._timeframe_minutes()
diff_min = (now_t - last_t).total_seconds() / 60.0
diff_candles = diff_min / max(1, tf_minutes)
if diff_candles < float(self.SIGNAL_COOLDOWN_CANDLES):
return False
# جلوگیری از تکرار سیگنال هم جهت پشت سر هم
if self._last_signal_direction == direction:
return False
return True
except Exception:
return True
def _to_tehran_time(self, ts: pd.Timestamp, tz_offset_minutes: int = 0) -> pd.Timestamp:
"""
تبدیل زمان دیتابیس به زمان قابل فیلتر.
اگر دیتات UTC است: tz_offset_minutes=210 بده (3:30).
اگر دیتات همین لوکال است: tz_offset_minutes=0
"""
ts = pd.Timestamp(ts)
return ts + pd.Timedelta(minutes=int(tz_offset_minutes))
def _in_session(self, ts: pd.Timestamp, start_hour: int = 11, end_hour: int = 19, tz_offset_minutes: int = 0) -> bool:
"""
فیلتر سشن بازار مظنه: از start_hour تا end_hour (end خارج از بازه)
"""
local_ts = self._to_tehran_time(ts, tz_offset_minutes=tz_offset_minutes)
h = int(local_ts.hour)
return (h >= int(start_hour)) and (h < int(end_hour))
def _signal_timeframe(self) -> str:
"""
تایم‌فریم واقعی کندل‌های سیگنال‌دهی/اندیکاتورها.
این پروژه باید روی 3min باشد.
"""
tf = getattr(self, "M3_TIMEFRAME", None)
if isinstance(tf, str) and tf.strip():
return tf.strip()
return "3min"
def _timeframe_minutes(self) -> int:
"""تبدیل تایم‌فریم pandas (مثل '3min') به دقیقه."""
try:
td = pd.Timedelta(self._signal_timeframe())
minutes = int(td.total_seconds() // 60)
return max(1, minutes)
except Exception:
return 3
def start_websocket_server(self):
"""
راه‌اندازی WebSocket Server داخلی
کلاینت‌ها مستقیماً به این سرور وصل می‌شوند
"""
try:
self.ws_server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.ws_server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.ws_server_socket.bind((self.WS_SERVER_HOST, self.WS_SERVER_PORT))
self.ws_server_socket.listen(10)
self.ws_server_socket.setblocking(False)
self.ws_server_running = True
self.COMMAND_SERVER_STATUS = "running"
self.ws_server_thread = threading.Thread(target=self._ws_server_loop, daemon=True)
self.ws_server_thread.start()
print(f"\n{'═' * 60}")
print(f"📡 WebSocket Server داخلی راه‌اندازی شد!")
print(f"{'═' * 60}")
print(f" 🌐 آدرس: ws://{self.WS_SERVER_HOST}:{self.WS_SERVER_PORT}")
print(f" 📱 کلاینت‌ها می‌توانند به این آدرس متصل شوند")
print(f"{'═' * 60}\n")
return True
except Exception as e:
print(f"❌ خطا در راه‌اندازی WebSocket Server: {e}")
import traceback
traceback.print_exc()
return False
def stop_websocket_server(self):
"""
توقف WebSocket Server
"""
self.ws_server_running = False
self.COMMAND_SERVER_STATUS = "stopped"
with self.clients_lock:
for client in list(self.connected_clients):
try:
client.close()
except:
pass
self.connected_clients.clear()
if self.ws_server_socket:
try:
self.ws_server_socket.close()
except:
pass
print("🛑 WebSocket Server متوقف شد")
def download_historical_from_faraz(self):
"""
دانلود داده‌های تاریخی از فرازگلد
این متد:
1. داده‌های یک سال اخیر را از API فرازگلد دریافت می‌کند
2. داده‌ها را پاک‌سازی می‌کند
3. در دیتابیس ذخیره می‌کند
Returns:
bool: آیا دانلود موفق بود
"""
print(f"\n{'═' * 60}")
print(f"📥 دانلود داده‌های تاریخی از فرازگلد")
print(f"{'═' * 60}")
try:
# محاسبه بازه زمانی
to_ts = int(time.time())
from_ts = to_ts - (self.HISTORY_DAYS * 24 * 60 * 60)
from_date = datetime.fromtimestamp(from_ts).strftime('%Y-%m-%d %H:%M')
to_date = datetime.fromtimestamp(to_ts).strftime('%Y-%m-%d %H:%M')
print(f" 📅 بازه زمانی: {from_date} تا {to_date}")
print(f" 📊 تعداد روز: {self.HISTORY_DAYS}")
# تنظیم پارامترها
params = {
"symbol": "mazane",
"from": from_ts,
"to": to_ts,
"resolution": "1"
}
print(f" 🌐 ارسال درخواست به فرازگلد...")
# ارسال درخواست
response = requests.get(
self.FARAZ_API_URL,
params=params,
cookies=self.FARAZ_COOKIES,
headers=self.FARAZ_HEADERS,
timeout=120
)
if response.status_code == 200:
data = response.json()
count = len(data)
print(f" ✅ دریافت موفق! تعداد رکورد: {count:,}")
if count > 0:
# ذخیره و پاک‌سازی
success = self.save_and_clean_faraz_data(data)
if success:
print(f"{'═' * 60}")
print(f"✅ داده‌های تاریخی با موفقیت ذخیره شد")
print(f"{'═' * 60}\n")
return True
else:
print(f"❌ خطا در ذخیره داده‌ها")
return False
else:
print(f" ⚠️ داده‌ای دریافت نشد")
return False
elif response.status_code == 403:
print(f" ❌ خطای 403: کوکی‌ها منقضی شده‌اند")
print(f" 💡 لطفاً کوکی‌های جدید را از فرازگلد دریافت کنید")
return False
else:
print(f" ❌ خطا: کد وضعیت {response.status_code}")
print(f" 📄 پاسخ: {response.text[:200]}")
return False
except requests.exceptions.Timeout:
print(f" ❌ خطای Timeout: سرور پاسخ نداد")
return False
except requests.exceptions.ConnectionError:
print(f" ❌ خطای اتصال: عدم دسترسی به سرور")
return False
except requests.exceptions.RequestException as e:
print(f" ❌ خطای شبکه: {e}")
return False
except json.JSONDecodeError as e:
print(f" ❌ خطای JSON: پاسخ سرور قابل تفسیر نیست")
return False
except Exception as e:
print(f" ❌ خطای غیرمنتظره: {e}")
import traceback
traceback.print_exc()
return False
def _prepare_signal_data(self, direction: str, strategy: str, current_price: float) -> Dict:
"""آماده‌سازی داده‌های سیگنال برای ثبت در ژورنال"""
try:
signal_data = {
'direction': direction,
'strategy': strategy,
'current_price': current_price,
'adx_entry': self.m3['adx'].iloc[-1] if 'adx' in self.m3.columns else 0,
'rsi_entry': self.m3['rsi14'].iloc[-1] if 'rsi14' in self.m3.columns else 50,
'atr_entry': self.m3['atr'].iloc[-1] if 'atr' in self.m3.columns else 0,
'ema9_entry': self.m3['ema9'].iloc[-1] if 'ema9' in self.m3.columns else current_price,
'ema21_entry': self.m3['ema21'].iloc[-1] if 'ema21' in self.m3.columns else current_price,
'daily_trend_score': self.calculate_daily_trend_score(),
'stoch_k': self.m3['stoch_k'].iloc[-1] if 'stoch_k' in self.m3.columns else 50,
'stoch_d': self.m3['stoch_d'].iloc[-1] if 'stoch_d' in self.m3.columns else 50,
'bb_position': self._calculate_bb_position(),
'market_volatility': self.m3['atr'].iloc[-1] if 'atr' in self.m3.columns else 0,
'session_hour': datetime.now().hour
}
return signal_data
except Exception as e:
print(f"⚠️ خطا در آماده‌سازی داده سیگنال: {e}")
return {}
def _calculate_bb_position(self) -> float:
"""محاسبه موقعیت قیمت در بولینگر باند"""
try:
if 'bb_lower' in self.m3.columns and 'bb_upper' in self.m3.columns:
current = self.m3['close'].iloc[-1]
bb_lower = self.m3['bb_lower'].iloc[-1]
bb_upper = self.m3['bb_upper'].iloc[-1]
if bb_upper > bb_lower:
position = ((current - bb_lower) / (bb_upper - bb_lower)) * 100
return max(0, min(100, position))
return 50.0
except Exception:
return 50.0
def clear_database(self):
"""
پاک‌سازی کامل داده‌های دیتابیس
"""
print(f"\n🧹 پاک‌سازی دیتابیس...")
try:
with self._get_db_connection() as conn:
cursor = conn.cursor()
cursor.execute(f'DELETE FROM {self.M3_TABLE}')
m3_deleted = cursor.rowcount
cursor.execute('DELETE FROM candles')
daily_deleted = cursor.rowcount
cursor.execute('DROP TABLE IF EXISTS candles_raw')
conn.commit()
conn.execute("VACUUM")
print(f" ✅ {m3_deleted} کندل {self.M3_TIMEFRAME} حذف شد")
print(f" ✅ {daily_deleted} کندل روزانه حذف شد")
print(f" ✅ دیتابیس فشرده‌سازی شد")
return True
except Exception as e:
print(f" ❌ خطا در پاک‌سازی: {e}")
return False
def save_and_clean_faraz_data(self, data):
"""
ذخیره و پاک‌سازی داده‌های دریافتی از فرازگلد
✅ ساخت کندل‌ها با تایم‌فریم واقعی سیگنال‌دهی (3min) و ذخیره در candles_m3
"""
print(f"\n 💾 ذخیره و پاک‌سازی داده‌های فرازگلد...")
try:
with self._get_db_connection() as conn:
cursor = conn.cursor()
cursor.execute('DROP TABLE IF EXISTS candles_raw')
cursor.execute(f'DROP TABLE IF EXISTS {self.M3_TABLE}')
cursor.execute('DROP TABLE IF EXISTS candles')
cursor.execute(f'''
CREATE TABLE {self.M3_TABLE} (
id INTEGER PRIMARY KEY AUTOINCREMENT,
time TEXT UNIQUE NOT NULL,
open REAL NOT NULL,
high REAL NOT NULL,
low REAL NOT NULL,
close REAL NOT NULL,
created_at TEXT DEFAULT CURRENT_TIMESTAMP
)
''')
cursor.execute('''
CREATE TABLE candles (
id INTEGER PRIMARY KEY AUTOINCREMENT,
time TEXT UNIQUE NOT NULL,
open REAL NOT NULL,
high REAL NOT NULL,
low REAL NOT NULL,
close REAL NOT NULL,
created_at TEXT DEFAULT CURRENT_TIMESTAMP
)
''')
cursor.execute('''
CREATE TABLE candles_raw (
time INTEGER PRIMARY KEY,
open REAL,
high REAL,
low REAL,
close REAL
)
''')
records = [(i['time'], i['open'], i['high'], i['low'], i['close']) for i in data]
cursor.executemany('''
INSERT OR REPLACE INTO candles_raw (time, open, high, low, close)
VALUES (?, ?, ?, ?, ?)
''', records)
print(f" 📊 {len(records):,} رکورد خام ذخیره شد")
print(f"\n 🧹 پاک‌سازی داده‌ها...")
cursor.execute('''
DELETE FROM candles_raw
WHERE open <= 0 OR high <= 0 OR low <= 0 OR close <= 0
''')
deleted_zeros = cursor.rowcount
cursor.execute('DELETE FROM candles_raw WHERE high < low')
deleted_logic = cursor.rowcount
cursor.execute('''
DELETE FROM candles_raw
WHERE open = high AND high = low AND low = close
''')
deleted_flat = cursor.rowcount
total_deleted = deleted_zeros + deleted_logic + deleted_flat
print(f" 📊 مجموع حذف شده: {total_deleted}")
print(f"\n 📊 ایجاد کندل‌های {self._signal_timeframe()}...")
cursor.execute("SELECT time, open, high, low, close FROM candles_raw ORDER BY time ASC")
rows = cursor.fetchall()
if not rows:
print(f" ⚠️ داده‌ای برای پردازش وجود ندارد")
return False
df_data = [{
'time': pd.Timestamp.fromtimestamp(r[0]),
'open': r[1],
'high': r[2],
'low': r[3],
'close': r[4]
} for r in rows]
df = pd.DataFrame(df_data).set_index('time')
m3_data = df.resample(self._signal_timeframe()).agg({
'open': 'first',
'high': 'max',
'low': 'min',
'close': 'last'
}).dropna()
print(f" 📊 {len(m3_data)} کندل {self._signal_timeframe()} ایجاد شد")
for index, row in m3_data.iterrows():
cursor.execute(f'''
INSERT OR REPLACE INTO {self.M3_TABLE} (time, open, high, low, close)
VALUES (?, ?, ?, ?, ?)
''', (index.isoformat(), row['open'], row['high'], row['low'], row['close']))
print(f"\n 📊 ایجاد کندل‌های روزانه...")
daily_data = df.resample(self.DAILY_TIMEFRAME).agg({
'open': 'first',
'high': 'max',
'low': 'min',
'close': 'last'
}).dropna()
print(f" 📊 {len(daily_data)} کندل روزانه ایجاد شد")
for index, row in daily_data.iterrows():
cursor.execute('''
INSERT OR REPLACE INTO candles (time, open, high, low, close)
VALUES (?, ?, ?, ?, ?)
''', (index.isoformat(), row['open'], row['high'], row['low'], row['close']))
cursor.execute('DROP TABLE IF EXISTS candles_raw')
conn.commit()
conn.execute("VACUUM")
return True
except Exception as e:
print(f" ❌ خطا در save_and_clean_faraz_data: {e}")
import traceback
traceback.print_exc()
return False
def _ws_server_loop(self):
"""
حلقه اصلی سرور برای پذیرش اتصالات جدید
"""
print("🔄 حلقه سرور WebSocket شروع شد...")
while self.ws_server_running:
try:
readable, _, _ = select.select([self.ws_server_socket], [], [], 0.5)
if readable:
client_socket, client_address = self.ws_server_socket.accept()
print(f"📥 اتصال جدید از: {client_address}")
client_thread = threading.Thread(
target=self._handle_client,
args=(client_socket, client_address),
daemon=True
)
client_thread.start()
except Exception as e:
if self.ws_server_running:
print(f"⚠️ خطا در حلقه سرور: {e}")
time.sleep(0.1)
def _handle_client(self, client_socket: socket.socket, client_address):
"""
مدیریت هر کلاینت به صورت جداگانه - نسخه پایدار
"""
import time
try:
if not self._perform_websocket_handshake(client_socket):
print(f"❌ Handshake ناموفق با {client_address}")
client_socket.close()
return
with self.clients_lock:
self.connected_clients.add(client_socket)
client_count = len(self.connected_clients)
print(f"✅ کلاینت {client_address} متصل شد (تعداد کل: {client_count})")
# ارسال پیام خوش‌آمد
welcome_message = {
"type": "welcome",
"message": "به سرور سیگنال مظنه خوش آمدید",
"timestamp": datetime.now().isoformat(),
"server_status": {
"in_trade": self.in_trade,
"trade_direction": self.trade_direction,
"market_price": self.MARKET_PRICE,
"warmup_complete": self.warmup_complete
}
}
self._send_to_client(client_socket, welcome_message)
self._send_candles_to_client(client_socket)
# ═══════════════════════════════════════════════════════════════
# ✅ تنظیمات سوکت برای پایداری
# ═══════════════════════════════════════════════════════════════
client_socket.setblocking(False)
# فعال کردن TCP keepalive
client_socket.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
# تنظیمات keepalive (اگر سیستم‌عامل پشتیبانی کند)
try:
# شروع keepalive بعد از 60 ثانیه بی‌فعالیت
client_socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 60)
# ارسال keepalive هر 30 ثانیه
client_socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 30)
# تعداد تلاش‌های keepalive قبل از قطع
client_socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, 5)
except (AttributeError, OSError):
pass # بعضی سیستم‌ها این گزینه‌ها را ندارند
# ═══════════════════════════════════════════════════════════════
# ✅ متغیرهای مدیریت اتصال
# ═══════════════════════════════════════════════════════════════
last_ping_time = time.time()
last_activity_time = time.time()
PING_INTERVAL = 1 # ارسال ping هر 25 ثانیه
ACTIVITY_TIMEOUT = 3600 * 24
consecutive_errors = 0
MAX_CONSECUTIVE_ERRORS = 10
while self.ws_server_running:
try:
# ═══════════════════════════════════════════════════════
# select با timeout کوتاه
# ═══════════════════════════════════════════════════════
readable, _, exceptional = select.select(
[client_socket], [], [client_socket], 0.5
)
current_time = time.time()
# چک exceptional
if exceptional:
print(f"⚠️ خطای سوکت برای {client_address}")
break
# ═══════════════════════════════════════════════════════
# ✅ ارسال ping برای نگه داشتن کانکشن
# ═══════════════════════════════════════════════════════
if current_time - last_ping_time >= PING_INTERVAL:
try:
# ارسال WebSocket ping frame
ping_frame = bytes([0x89, 0x00])
client_socket.send(ping_frame)
last_ping_time = current_time
consecutive_errors = 0 # ریست شمارنده خطا
except (BrokenPipeError, ConnectionResetError, OSError):
print(f"🔌 خطا در ارسال ping به {client_address}")
break
# ═══════════════════════════════════════════════════════
# ✅ چک timeout بی‌فعالیت
# ═══════════════════════════════════════════════════════
if current_time - last_activity_time >= ACTIVITY_TIMEOUT:
print(f"⏰ Timeout بی‌فعالیت برای {client_address}")
break
# ═══════════════════════════════════════════════════════
# پردازش داده‌های دریافتی
# ═══════════════════════════════════════════════════════
if readable:
data = self._receive_websocket_frame(client_socket)
if data is None:
# ✅ کانکشن واقعاً قطع شده
print(f"🔌 کانکشن {client_address} بسته شد")
break
if data == "":
# ✅ داده خالی یا فریم کنترلی - ادامه بده
consecutive_errors = 0
continue
if data:
# ✅ داده معتبر دریافت شد
last_activity_time = current_time
consecutive_errors = 0
self._process_client_message(client_socket, data, client_address)
except BlockingIOError:
# ✅ عادیه در حالت non-blocking
continue
except (ConnectionResetError, BrokenPipeError, ConnectionAbortedError):
print(f"🔌 کانکشن {client_address} ریست شد")
break
except OSError as e:
if e.errno == 9: # Bad file descriptor
break
consecutive_errors += 1
if consecutive_errors >= MAX_CONSECUTIVE_ERRORS:
print(f"⚠️ تعداد خطاهای متوالی زیاد برای {client_address}")
break
except Exception as e:
consecutive_errors += 1
if consecutive_errors >= MAX_CONSECUTIVE_ERRORS:
print(f"⚠️ خطاهای متوالی زیاد: {e}")
break
if self.ws_server_running:
time.sleep(0.1) # کمی صبر کن
except Exception as e:
print(f"⚠️ خطا در مدیریت کلاینت {client_address}: {e}")
finally:
# ═══════════════════════════════════════════════════════════════
# ✅ پاکسازی امن
# ═══════════════════════════════════════════════════════════════
with self.clients_lock:
self.connected_clients.discard(client_socket)
try:
# ارسال close frame قبل از بستن
close_frame = bytes([0x88, 0x00])
client_socket.send(close_frame)
except:
pass
try:
client_socket.shutdown(socket.SHUT_RDWR)
except:
pass
try:
client_socket.close()
except:
pass
client_count = len(self.connected_clients)
print(f"📤 کلاینت {client_address} قطع شد (تعداد باقی‌مانده: {client_count})")
def _perform_websocket_handshake(self, client_socket: socket.socket) -> bool:
"""
انجام WebSocket Handshake (RFC 6455)
"""
try:
client_socket.setblocking(True)
client_socket.settimeout(10)
request = client_socket.recv(4096).decode('utf-8')
if 'Upgrade: websocket' not in request and 'upgrade: websocket' not in request.lower():
print("⚠️ درخواست WebSocket نیست")
return False
key = None
for line in request.split('\r\n'):
if line.lower().startswith('sec-websocket-key:'):
key = line.split(':', 1)[1].strip()
break
if not key:
print("⚠️ کلید WebSocket یافت نشد")
return False
magic_string = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"
accept_key = base64.b64encode(
hashlib.sha1((key + magic_string).encode()).digest()
).decode()
response = (
"HTTP/1.1 101 Switching Protocols\r\n"
"Upgrade: websocket\r\n"
"Connection: Upgrade\r\n"
f"Sec-WebSocket-Accept: {accept_key}\r\n"
"\r\n"
)
client_socket.send(response.encode())
return True
except Exception as e:
print(f"⚠️ خطا در handshake: {e}")
return False
def _receive_websocket_frame(self, client_socket: socket.socket) -> Optional[str]:
"""
دریافت و decode کردن فریم WebSocket
Returns:
str: داده دریافتی
"": داده خالی یا پیامی نبود (ادامه بده)
None: کانکشن واقعاً قطع شده (break کن)
"""
try:
# ═══════════════════════════════════════════════════════════════
# ✅ اصلاح: مدیریت بهتر خواندن header
# ═══════════════════════════════════════════════════════════════
try:
header = client_socket.recv(2, socket.MSG_PEEK) # اول peek کن
except BlockingIOError:
return "" # داده‌ای نیست، ادامه بده
except (ConnectionResetError, BrokenPipeError, ConnectionAbortedError):
return None # واقعاً قطع شده
if not header:
return None # کانکشن بسته شده
if len(header) < 2:
return "" # هنوز کامل نرسیده
# حالا واقعی بخوان
header = client_socket.recv(2)
if len(header) < 2:
return None
opcode = header[0] & 0x0F
fin = (header[0] & 0x80) != 0
# ═══════════════════════════════════════════════════════════════
# ✅ مدیریت فریم‌های کنترلی
# ═══════════════════════════════════════════════════════════════
# Close frame
if opcode == 0x8:
# ارسال Close frame در پاسخ
try:
close_frame = bytes([0x88, 0x00])
client_socket.send(close_frame)
except:
pass
return None
# Ping frame
if opcode == 0x9:
# خواندن payload پینگ و ارسال pong
masked = (header[1] & 0x80) != 0
payload_length = header[1] & 0x7F
ping_payload = b""
if payload_length > 0:
if masked:
mask_key = client_socket.recv(4)
ping_payload = client_socket.recv(payload_length)
if masked and mask_key:
ping_payload = bytes([ping_payload[i] ^ mask_key[i % 4] for i in range(len(ping_payload))])
# ارسال Pong با همان payload
pong_header = bytes([0x8A, len(ping_payload)])
try:
client_socket.send(pong_header + ping_payload)
except:
pass
return ""
# Pong frame
if opcode == 0xA:
# خواندن و نادیده گرفتن
masked = (header[1] & 0x80) != 0
payload_length = header[1] & 0x7F
if masked:
client_socket.recv(4)
if payload_length > 0:
client_socket.recv(payload_length)
return ""
# ═══════════════════════════════════════════════════════════════
# پردازش فریم داده
# ═══════════════════════════════════════════════════════════════
masked = (header[1] & 0x80) != 0
payload_length = header[1] & 0x7F
if payload_length == 126:
extended = client_socket.recv(2)
if len(extended) < 2:
return ""
payload_length = struct.unpack(">H", extended)[0]
elif payload_length == 127:
extended = client_socket.recv(8)
if len(extended) < 8:
return ""
payload_length = struct.unpack(">Q", extended)[0]
mask_key = None
if masked:
mask_key = client_socket.recv(4)
if len(mask_key) < 4:
return ""
if payload_length == 0:
return ""
# خواندن payload با timeout
payload = b""
while len(payload) < payload_length:
try:
remaining = payload_length - len(payload)
chunk = client_socket.recv(min(remaining, 4096))
if not chunk:
return None
payload += chunk
except BlockingIOError:
# صبر کن تا بقیه برسه
import time
time.sleep(0.01)
continue
if masked and mask_key:
decoded = bytearray(payload_length)
for i in range(payload_length):
decoded[i] = payload[i] ^ mask_key[i % 4]
payload = bytes(decoded)
try:
return payload.decode('utf-8')
except UnicodeDecodeError:
return ""
except BlockingIOError:
return "" # ✅ داده‌ای نیست، ادامه بده
except socket.timeout:
return "" # ✅ timeout، ادامه بده
except (ConnectionResetError, BrokenPipeError, ConnectionAbortedError, OSError):
return None # ✅ واقعاً قطع شده
except Exception as e:
# لاگ خطا ولی break نکن
print(f"⚠️ خطا در دریافت فریم: {type(e).__name__}: {e}")
return None
def _send_websocket_frame(self, client_socket: socket.socket, message: str) -> bool:
"""
ارسال پیام به صورت فریم WebSocket
"""
try:
payload = message.encode('utf-8')
length = len(payload)
if length <= 125:
header = bytes([0x81, length])
elif length <= 65535:
header = bytes([0x81, 126]) + struct.pack(">H", length)
else:
header = bytes([0x81, 127]) + struct.pack(">Q", length)
client_socket.send(header + payload)
return True
except Exception as e:
return False
def _send_to_client(self, client_socket: socket.socket, data: dict) -> bool:
"""
ارسال داده به یک کلاینت خاص
"""
try:
message = json.dumps(data, ensure_ascii=False)
return self._send_websocket_frame(client_socket, message)
except Exception as e:
return False
def _send_candles_to_client(self, client_socket: socket.socket):
"""
ارسال کندل‌های تاریخی به کلاینت جدید
"""
try:
candles_data = self.load_candles_from_db('candles')
data = []
for candle in candles_data:
data.append({
'time': candle['time'] if isinstance(candle, dict) else candle[0],
'open': float(candle['open']) if isinstance(candle, dict) else float(candle[1]),
'high': float(candle['high']) if isinstance(candle, dict) else float(candle[2]),
'low': float(candle['low']) if isinstance(candle, dict) else float(candle[3]),
'close': float(candle['close']) if isinstance(candle, dict) else float(candle[4])
})
command = {
"type": "last_candles",
"data": data,
"timestamp": datetime.now().isoformat()
}
self._send_to_client(client_socket, command)
except Exception as e:
print(f"⚠️ خطا در ارسال کندل‌ها به کلاینت: {e}")
def _process_client_message(self, client_socket: socket.socket, message: str, client_address):
"""
پردازش پیام دریافتی از کلاینت
"""
try:
if not message:
return
data = json.loads(message)
msg_type = data.get("type", "unknown")
if msg_type == "ping":
response = {
"type": "pong",
"timestamp": datetime.now().isoformat()
}
self._send_to_client(client_socket, response)
elif msg_type == "get_status":
response = {
"type": "status",
"data": {
"in_trade": self.in_trade,
"trade_direction": self.trade_direction,
"entry_price": self.entry_price,
"stop_loss": self.stop_loss_price,
"take_profit": self.take_profit_price,
"market_price": self.MARKET_PRICE,
"balance": self.balance,
"trades_count": self.trades,
"warmup_complete": self.warmup_complete
},
"timestamp": datetime.now().isoformat()
}
self._send_to_client(client_socket, response)
elif msg_type == "get_candles":
self._send_candles_to_client(client_socket)
else:
print(f"📩 پیام از {client_address}: {data}")
except json.JSONDecodeError:
print(f"⚠️ پیام نامعتبر از {client_address}: {message[:100]}")
except Exception as e:
print(f"⚠️ خطا در پردازش پیام: {e}")
def broadcast_to_clients(self, data: dict) -> int:
"""
ارسال پیام به همه کلاینت‌های متصل
Returns:
تعداد کلاینت‌هایی که با موفقیت پیام دریافت کردند
"""
if not self.connected_clients:
return 0
message = json.dumps(data, ensure_ascii=False)
success_count = 0
failed_clients = []
with self.clients_lock:
for client in list(self.connected_clients):
try:
if self._send_websocket_frame(client, message):
success_count += 1
else:
failed_clients.append(client)
except:
failed_clients.append(client)
for client in failed_clients:
self.connected_clients.discard(client)
try:
client.close()
except:
pass
return success_count
def get_connected_clients_count(self) -> int:
"""
تعداد کلاینت‌های متصل
"""
return len(self.connected_clients)
@contextmanager
def _get_db_connection(self):
"""Context manager برای اتصال به دیتابیس"""
conn = sqlite3.connect(self.DB_PATH)
conn.row_factory = sqlite3.Row # برای دسترسی با نام ستون
try:
yield conn
finally:
conn.close()
def init_database(self):
"""ایجاد جداول دیتابیس"""
with self._get_db_connection() as conn:
cursor = conn.cursor()
# جدول کندل‌های تایم‌فریم سیگنال (۳ دقیقه‌ای)
cursor.execute(f'''
CREATE TABLE IF NOT EXISTS {self.M3_TABLE} (
id INTEGER PRIMARY KEY AUTOINCREMENT,
time TEXT UNIQUE NOT NULL,
open REAL NOT NULL,
high REAL NOT NULL,
low REAL NOT NULL,
close REAL NOT NULL,
created_at TEXT DEFAULT CURRENT_TIMESTAMP
)
''')
# جدول کندل‌های روزانه
cursor.execute('''
CREATE TABLE IF NOT EXISTS candles (
id INTEGER PRIMARY KEY AUTOINCREMENT,
time TEXT UNIQUE NOT NULL,
open REAL NOT NULL,
high REAL NOT NULL,
low REAL NOT NULL,
close REAL NOT NULL,
created_at TEXT DEFAULT CURRENT_TIMESTAMP
)
''')
# ایندکس برای سرعت بیشتر
cursor.execute(f'''
CREATE INDEX IF NOT EXISTS idx_m3_time
ON {self.M3_TABLE}(time)
''')
cursor.execute('''
CREATE INDEX IF NOT EXISTS idx_daily_time
ON candles(time)
''')
conn.commit()
print("✅ جداول دیتابیس ایجاد/بررسی شد")
def check_warmup_status(self):
"""
بررسی وضعیت گرم شدن ربات
ربات باید:
1. حداقل WARMUP_CANDLES کندل real-time ببیند
2. یا حداقل WARMUP_MINUTES دقیقه از شروع گذشته باشد
Returns:
bool: آیا warm-up تمام شده و می‌توان معامله کرد
"""
# اگر warm-up غیرفعال است
if not self.WARMUP_ENABLED:
return True
# اگر قبلاً تایید شده
if self.warmup_complete:
return True
# بررسی تعداد کندل‌ها
candles_ok = self.warmup_candles_seen >= self.WARMUP_CANDLES
# بررسی زمان
time_ok = False
if self.warmup_start_time:
elapsed_minutes = (datetime.now() - self.warmup_start_time).total_seconds() / 60
time_ok = elapsed_minutes >= self.WARMUP_MINUTES
# هر دو شرط یا یکی از آن‌ها
if candles_ok or time_ok:
self.warmup_complete = True
print(f"\n{'═' * 50}")
print(f"🔥 دوره گرم شدن ربات تمام شد!")
print(f"{'═' * 50}")
print(f" 📊 کندل‌های مشاهده شده: {self.warmup_candles_seen}")
if self.warmup_start_time:
elapsed = (datetime.now() - self.warmup_start_time).total_seconds() / 60
print(f" ⏱️ زمان سپری شده: {elapsed:.1f} دقیقه")
print(f" ✅ ربات آماده صدور سیگنال است!")
print(f"{'═' * 50}\n")
return True
# هنوز در حال گرم شدن
return False
def start_warmup(self):
"""شروع دوره warm-up"""
if self.WARMUP_ENABLED and not self.warmup_start_time:
self.warmup_start_time = datetime.now()
self.warmup_candles_seen = 0
self.warmup_complete = False
print(f"\n{'═' * 50}")
print(f"🔥 شروع دوره گرم شدن ربات")
print(f"{'═' * 50}")
print(f" 📊 تعداد کندل مورد نیاز: {self.WARMUP_CANDLES}")
print(f" ⏱️ یا حداقل: {self.WARMUP_MINUTES} دقیقه")
print(f" ⚠️ در این مدت سیگنالی صادر نمی‌شود")
print(f"{'═' * 50}\n")
def save_candle_to_db(self, table, datetime_str, open_price, high, low, close):
"""ذخیره یک کندل در دیتابیس"""
with self._get_db_connection() as conn:
cursor = conn.cursor()
# INSERT OR REPLACE: اگر وجود داشت آپدیت کن، نداشت اضافه کن
cursor.execute(f'''
INSERT OR REPLACE INTO {table}
(time, open, high, low, close)
VALUES (?, ?, ?, ?, ?)
''', (datetime_str, open_price, high, low, close))
conn.commit()
def load_candles_from_db(self, table, limit=500):
"""خواندن کندل‌ها از دیتابیس"""
with self._get_db_connection() as conn:
cursor = conn.cursor()
cursor.execute(f'''
SELECT time, open, high, low, close
FROM {table}
ORDER BY time DESC
LIMIT ?
''', (limit,))
rows = cursor.fetchall()
# برگرداندن به ترتیب صحیح (قدیمی به جدید)
return list(reversed(rows))
def send_last_candles(self) -> bool:
"""
ارسال کندل‌های تاریخی به همه کلاینت‌ها
"""
try:
candles_data = self.load_candles_from_db('candles')
data = []
for candle in candles_data:
data.append({
'time': candle['time'] if isinstance(candle, dict) else candle[0],
'open': float(candle['open']) if isinstance(candle, dict) else float(candle[1]),
'high': float(candle['high']) if isinstance(candle, dict) else float(candle[2]),
'low': float(candle['low']) if isinstance(candle, dict) else float(candle[3]),
'close': float(candle['close']) if isinstance(candle, dict) else float(candle[4])
})
command = {
"type": "last_candles",
"data": data,
"timestamp": datetime.now().isoformat()
}
return self.send_command(command)
except Exception as e:
print(f"⚠️ خطا در ارسال کندل‌ها: {e}")
import traceback
traceback.print_exc()
return False
def log_error(self, username: Optional[str], operation: str, error: str, details: Dict = None) -> None:
"""ذخیره خطا در لیست خطاها"""
error_info = {
'username': username,
'operation': operation,
'error': error,
'details': details or {},
'timestamp': datetime.now().isoformat()
}
self.errors.append(error_info)
print(f"❌ خطا {'برای ' + username if username else ''}: [{operation}] {error}")
def get_errors(self) -> List[Dict]:
"""دریافت لیست خطاها"""
return self.errors.copy()
def get_errors_for_user(self, username: str) -> List[Dict]:
"""دریافت خطاهای یک کاربر خاص"""
return [e for e in self.errors if e.get('username') == username]
def clear_errors(self) -> None:
"""پاک کردن لیست خطاها"""
self.errors = []
print("🗑️ لیست خطاها پاک شد")
def get_candle_count(self, table):
"""تعداد کندل‌های موجود در دیتابیس"""
with self._get_db_connection() as conn:
cursor = conn.cursor()
cursor.execute(f'SELECT COUNT(*) FROM {table}')
return cursor.fetchone()[0]
def load_historical_data(self, filepath='mazaneh_data.json'):
print(f"\n{'═' * 60}")
print(f"📂 بارگذاری داده‌های تاریخی")
print(f"{'═' * 60}")
try:
self.clear_database()
if not self.download_historical_from_faraz():
print(f"\n❌ خطا در دانلود داده‌ها از فرازگلد")
return False
m3_count = self.get_candle_count(self.M3_TABLE)
daily_count = self.get_candle_count('candles')
print(f"\n📊 کندل‌های ذخیره شده: M3={m3_count}, Daily={daily_count}")
if m3_count < 21 or daily_count < 3:
print(f"❌ داده کافی دانلود نشد")
return False
print(f"\n📥 بارگذاری کندل‌های M3...")
m3_rows = self.load_candles_from_db(self.M3_TABLE, limit=500)
self.m3_data.clear()
for row in m3_rows:
# sqlite3.Row است؛ else مسیر index هم جواب می‌دهد
time_value = row['time'] if isinstance(row, dict) else row[0]
open_value = row['open'] if isinstance(row, dict) else row[1]
high_value = row['high'] if isinstance(row, dict) else row[2]
low_value = row['low'] if isinstance(row, dict) else row[3]
close_value = row['close'] if isinstance(row, dict) else row[4]
self.m3_data.append({
'time': pd.Timestamp(time_value),
'open': float(open_value),
'high': float(high_value),
'low': float(low_value),
'close': float(close_value)
})
print(f" ✅ {len(self.m3_data)} کندل M3 بارگذاری شد")
print(f"\n📥 بارگذاری کندل‌های روزانه...")
daily_rows = self.load_candles_from_db('candles', limit=400)
self.daily_data.clear()
for row in daily_rows:
time_value = row['time'] if isinstance(row, dict) else row[0]
open_value = row['open'] if isinstance(row, dict) else row[1]
high_value = row['high'] if isinstance(row, dict) else row[2]
low_value = row['low'] if isinstance(row, dict) else row[3]
close_value = row['close'] if isinstance(row, dict) else row[4]
self.daily_data.append({
'time': pd.Timestamp(time_value),
'open': float(open_value),
'high': float(high_value),
'low': float(low_value),
'close': float(close_value)
})
print(f" ✅ {len(self.daily_data)} کندل روزانه بارگذاری شد")
self.m3 = pd.DataFrame(list(self.m3_data))
if not self.m3.empty:
self.m3.set_index('time', inplace=True)
self.daily = pd.DataFrame(list(self.daily_data))
if not self.daily.empty:
self.daily.set_index('time', inplace=True)
print(f"\n📊 DataFrame ایجاد شد:")
print(f" M3: {len(self.m3)} ردیف")
print(f" Daily: {len(self.daily)} ردیف")
print(f"\n📈 محاسبه اندیکاتورها...")
self.calculate_initial_indicators()
if self.COMMAND_SERVER_STATUS == "running" and self.get_connected_clients_count() > 0:
print(f"\n📤 ارسال کندل‌ها به کلاینت‌های متصل...")
self.send_last_candles()
self.warmup_complete = False
self.warmup_start_time = None
self.warmup_candles_seen = 0
print(f"\n{'═' * 60}")
print(f"✅ بارگذاری داده‌های تاریخی کامل شد")
print(f"{'═' * 60}")
print(f" 📊 M3: {len(self.m3)} کندل")
print(f" 📊 Daily: {len(self.daily)} کندل")
print(f" 🔥 منتظر داده‌های real-time برای warm-up...")
print(f"{'═' * 60}\n")
return True
except Exception as e:
print(f"\n❌ خطا در بارگذاری داده‌ها: {e}")
import traceback
traceback.print_exc()
return False
def load_from_database(self):
"""بارگذاری داده‌ها از دیتابیس"""
try:
m3_rows = self.load_candles_from_db(self.M3_TABLE, limit=500)
for row in m3_rows:
self.m3_data.append({
'time': pd.Timestamp(row['time']),
'open': row['open'],
'high': row['high'],
'low': row['low'],
'close': row['close']
})
daily_rows = self.load_candles_from_db('candles', limit=400)
for row in daily_rows:
self.daily_data.append({
'time': pd.Timestamp(row['time']),
'open': row['open'],
'high': row['high'],
'low': row['low'],
'close': row['close']
})
self.m3 = pd.DataFrame(list(self.m3_data))
if not self.m3.empty:
self.m3.set_index('time', inplace=True)
self.daily = pd.DataFrame(list(self.daily_data))
if not self.daily.empty:
self.daily.set_index('time', inplace=True)
print(f"✅ از دیتابیس بارگذاری شد: M3={len(self.m3)}, Daily={len(self.daily)}")
self.calculate_initial_indicators()
self.warmup_complete = False
self.warmup_start_time = None
self.warmup_candles_seen = 0
return True
except Exception as e:
print(f"❌ خطا در بارگذاری از دیتابیس: {e}")
import traceback
traceback.print_exc()
return False
def _reset_dynamic_params_to_base(self):
"""بازگردانی پارامترهای داینامیک به مقدار پایه (برای جلوگیری از تغییر تجمعی)."""
self.SCALP_ADX_THRESHOLD = float(self._base_params['SCALP_ADX_THRESHOLD'])
self.SWING_ADX_THRESHOLD = float(self._base_params['SWING_ADX_THRESHOLD'])
self.SCALP_STOP_LOSS_LINES = int(self._base_params['SCALP_STOP_LOSS_LINES'])
self.SWING_STOP_LOSS_LINES = int(self._base_params['SWING_STOP_LOSS_LINES'])
self.SCALP_RR_RATIO = float(self._base_params['SCALP_RR_RATIO'])
self.SWING_RR_RATIO = float(self._base_params['SWING_RR_RATIO'])
def determine_strategy_mode(self):
"""
تشخیص خودکار استراتژی مناسب (اسکالپ یا سوئینگ)
✅ اصلاحات:
- استفاده از کندل بسته‌شده برای جلوگیری از range_ratio=0
- در رژیم ranging با confidence کافی، اسکالپ/رنج را ترجیح می‌دهد
- ارزیابی ATR به صورت درصدی هم انجام می‌شود تا مقیاس‌پذیر باشد
"""
# حالت دستی
if hasattr(self, 'STRATEGY_MODE') and self.STRATEGY_MODE != 'auto':
print(f"🎯 استفاده از استراتژی دستی: {self.STRATEGY_MODE.upper()}")
return self.STRATEGY_MODE
if not isinstance(self.m3, pd.DataFrame) or len(self.m3) < 50:
print("⚠️ داده M3 کافی نیست، استفاده از سوئینگ به عنوان پیش‌فرض")
return 'swing'
# ✅ اگر بازار رنج با اطمینان کافی است، اسکالپ/رنج مناسب‌تر است
try:
if str(getattr(self, "market_regime", "unknown")) == "ranging" and float(getattr(self, "regime_confidence", 0) or 0) >= 60:
return 'scalp'
except Exception:
pass
try:
df = self.m3.copy()
try:
df = df.sort_index()
except Exception:
pass
# ✅ کندل مبنا: آخرین کندل بسته‌شده (نه کندل جاری)
if len(df) >= 2:
base = df.iloc[-2]
else:
base = df.iloc[-1]
base_close = float(base.get('close', df['close'].iloc[-1]))
# ATR
current_atr = None
if 'atr' in df.columns:
v = base.get('atr', None)
if pd.notna(v):
try:
current_atr = float(v)
except Exception:
current_atr = None
if current_atr is None or current_atr <= 0:
current_atr = float(df['atr'].dropna().iloc[-1]) if 'atr' in df.columns and not df['atr'].dropna().empty else 1.0
# ADX
current_adx = None
if 'adx' in df.columns:
v = base.get('adx', None)
if pd.notna(v):
try:
current_adx = float(v)
except Exception:
current_adx = None
if current_adx is None:
current_adx = 25.0
# range ratio روی 20 کندل بسته‌شده اخیر
recent = df.iloc[-21:-1] if len(df) >= 21 else df.iloc[:-1]
if len(recent) < 5:
recent = df.tail(20)
range_ratio = 1.0
if 'high' in recent.columns and 'low' in recent.columns and len(recent) >= 5:
candle_ranges = (recent['high'] - recent['low']).dropna()
if len(candle_ranges) >= 5:
avg_range = float(candle_ranges.mean())
current_range = float(candle_ranges.iloc[-1])
range_ratio = (current_range / avg_range) if avg_range > 0 else 1.0
atr_pct = (current_atr / base_close) * 100.0 if base_close > 0 else 0.0
# سیستم امتیازدهی جدید
score = 0
# ATR (درصدی)
# (حدودها قابل تنظیم‌اند، اما نسبت به قیمت مقیاس‌پذیر است)
if atr_pct < 0.06:
score += 35
atr_status = "کم"
elif atr_pct < 0.14:
score += 10
atr_status = "متوسط"
else:
score -= 25
atr_status = "زیاد"
# ADX
if current_adx < 18:
score += 35
adx_status = "ضعیف (مناسب اسکالپ)"
elif current_adx < 25:
score += 10
adx_status = "متوسط"
else:
score -= 20
adx_status = "قوی (مناسب سوئینگ)"
# رنج نسبی
if range_ratio < 0.7:
score += 20
range_status = "کم نسبت به میانگین"
elif range_ratio > 1.3:
score -= 20
range_status = "زیاد نسبت به میانگین"
else:
range_status = "نرمال"
if score >= 35:
strategy = 'scalp'
confidence = "متوسط/بالا"
else:
strategy = 'swing'
confidence = "متوسط/بالا"
print(f"\n{'─' * 50}")
print(f"🎯 تشخیص استراتژی خودکار (M3) [اصلاح‌شده]")
print(f"{'─' * 50}")
print(f" 📊 ATR: {current_atr:.2f} | ATR%: {atr_pct:.3f}% - {atr_status}")
print(f" 📈 ADX: {current_adx:.1f} - {adx_status}")
print(f" 📉 نوسان نسبی: {range_ratio:.2f}x - {range_status}")
print(f" 🔢 امتیاز نهایی: {score:+d}")
print(f" ✅ استراتژی: {strategy.upper()} (اطمینان: {confidence})")
print(f"{'─' * 50}")
return strategy
except Exception as e:
print(f"⚠️ خطا در تشخیص استراتژی: {e}")
import traceback
traceback.print_exc()
return 'swing'
def calculate_daily_trend_score(self):
"""
محاسبه امتیاز روند روزانه (100- تا 100+)
این تابع روند روزانه را با سیستم امتیازدهی تحلیل می‌کند
به جای شرط باینری (آری/خیر) که در کد قبلی بود.
امتیاز مثبت → روند صعودی (مناسب LONG)
امتیاز منفی → روند نزولی (مناسب SHORT)
امتیاز نزدیک به صفر → بازار خنثی
معیارها و وزن‌ها:
─────────────────────────────────────────
معیار | وزن
─────────────────────────────────────────
1. قیمت نسبت به EMA55 | ±40 امتیاز
2. فاصله EMA55 از EMA200 | ±30 امتیاز
3. RSI روزانه | ±30 امتیاز
─────────────────────────────────────────
بازگشت:
- امتیاز بین 100- تا 100+
"""
# بررسی داده کافی
if len(self.daily) < 3:
print("⚠️ داده روزانه کافی نیست برای محاسبه امتیاز روند")
return 0
try:
last = self.daily.iloc[-1]
score = 0
# دریافت مقادیر پایه
daily_close = last['close']
# ═══════════════════════════════════════════════════════════
# 📊 1. موقعیت قیمت نسبت به EMA55 (±40 امتیاز)
# این مهم‌ترین معیار است
# ═══════════════════════════════════════════════════════════
daily_ema55 = last.get('ema55', daily_close)
if pd.isna(daily_ema55):
daily_ema55 = daily_close
price_vs_ema55 = daily_close - daily_ema55
if price_vs_ema55 > 15: # قیمت بسیار بالاتر از EMA55
score += 40
ema55_status = f"بسیار بالاتر (+{price_vs_ema55:.1f}$)"
elif price_vs_ema55 > 8: # 8-15 دلار بالاتر
score += 30
ema55_status = f"بالاتر (+{price_vs_ema55:.1f}$)"
elif price_vs_ema55 > 0: # 0-8 دلار بالاتر
score += 20
ema55_status = f"کمی بالاتر (+{price_vs_ema55:.1f}$)"
elif price_vs_ema55 > -8: # 0 تا 8- دلار پایین‌تر
score += 0
ema55_status = f"نزدیک ({price_vs_ema55:+.1f}$)"
elif price_vs_ema55 > -15: # 8-15 دلار پایین‌تر
score -= 20
ema55_status = f"پایین‌تر ({price_vs_ema55:+.1f}$)"
else: # بیشتر از 15 دلار پایین‌تر
score -= 40
ema55_status = f"بسیار پایین‌تر ({price_vs_ema55:+.1f}$)"
# ═══════════════════════════════════════════════════════════
# 📊 2. فاصله EMA55 از EMA200 (±30 امتیاز)
# نشان‌دهنده قدرت روند بلندمدت
# ═══════════════════════════════════════════════════════════
daily_ema200 = last.get('ema200', daily_close)
if pd.isna(daily_ema200):
daily_ema200 = daily_close
ema55_vs_ema200 = daily_ema55 - daily_ema200
if ema55_vs_ema200 > 25: # فاصله زیاد = روند صعودی قوی
score += 30
ema_cross_status = f"صعودی قوی (+{ema55_vs_ema200:.1f}$)"
elif ema55_vs_ema200 > 12: # فاصله متوسط
score += 20
ema_cross_status = f"صعودی (+{ema55_vs_ema200:.1f}$)"
elif ema55_vs_ema200 > 0: # فاصله کم اما مثبت
score += 10
ema_cross_status = f"صعودی ضعیف (+{ema55_vs_ema200:.1f}$)"
elif ema55_vs_ema200 > -12: # فاصله کم اما منفی
score -= 10
ema_cross_status = f"نزولی ضعیف ({ema55_vs_ema200:+.1f}$)"
elif ema55_vs_ema200 > -25: # فاصله متوسط منفی
score -= 20
ema_cross_status = f"نزولی ({ema55_vs_ema200:+.1f}$)"
else: # فاصله زیاد منفی = روند نزولی قوی
score -= 30
ema_cross_status = f"نزولی قوی ({ema55_vs_ema200:+.1f}$)"
# ═══════════════════════════════════════════════════════════
# 📊 3. RSI روزانه (±30 امتیاز)
# به عنوان تأیید، نه فیلتر سخت
# ═══════════════════════════════════════════════════════════
daily_rsi = last.get('rsi14', 50)
if pd.isna(daily_rsi):
daily_rsi = 50
if daily_rsi > 75: # اشباع خرید شدید
score -= 15 # هشدار برای LONG جدید
rsi_status = f"اشباع خرید ({daily_rsi:.1f})"
elif daily_rsi > 65: # نزدیک به اشباع خرید
score += 5
rsi_status = f"نزدیک اشباع خرید ({daily_rsi:.1f})"
elif daily_rsi > 45: # منطقه طبیعی صعودی
score += 15 # بهترین منطقه برای LONG
rsi_status = f"طبیعی صعودی ({daily_rsi:.1f})"
elif daily_rsi > 35: # منطقه طبیعی نزولی
score += 10
rsi_status = f"طبیعی نزولی ({daily_rsi:.1f})"
elif daily_rsi > 25: # نزدیک به اشباع فروش
score += 5
rsi_status = f"نزدیک اشباع فروش ({daily_rsi:.1f})"
else: # اشباع فروش شدید
score -= 15 # هشدار برای SHORT جدید
rsi_status = f"اشباع فروش ({daily_rsi:.1f})"
# ═══════════════════════════════════════════════════════════
# 📊 تفسیر امتیاز نهایی
# ═══════════════════════════════════════════════════════════
if score >= 60:
trend_interpretation = "روند صعودی قوی 📈📈"
elif score >= 30:
trend_interpretation = "روند صعودی متوسط 📈"
elif score >= 10:
trend_interpretation = "روند صعودی ضعیف 📊"
elif score <= -60:
trend_interpretation = "روند نزولی قوی 📉📉"
elif score <= -30:
trend_interpretation = "روند نزولی متوسط 📉"
elif score <= -10:
trend_interpretation = "روند نزولی ضعیف 📊"
else:
trend_interpretation = "بازار خنثی ➡️"
# ═══════════════════════════════════════════════════════════
# 📊 نمایش نتایج
# ═══════════════════════════════════════════════════════════
print(f"\n{'─' * 50}")
print(f"📊 تحلیل روند روزانه")
print(f"{'─' * 50}")
print(f" 💰 قیمت: {daily_close:.2f}")
print(f" 📈 قیمت vs EMA55: {ema55_status}")
print(f" 📊 EMA55 vs EMA200: {ema_cross_status}")
print(f" 📉 RSI روزانه: {rsi_status}")
print(f" 🔢 امتیاز نهایی: {score:+d}")
print(f" 🎯 تفسیر: {trend_interpretation}")
print(f"{'─' * 50}")
return score
except Exception as e:
print(f"⚠️ خطا در محاسبه امتیاز روند روزانه: {e}")
import traceback
traceback.print_exc()
return 0
def calculate_current_profit_lines(self, current_price):
"""
محاسبه سود/ضرر فعلی بر حسب خط
بازگشت:
- تعداد خطوط سود (مثبت) یا ضرر (منفی)
"""
if not self.in_trade or not self.entry_price:
return 0
current_price = int(current_price)
entry_price = int(self.entry_price)
if self.trade_direction == 'long':
return current_price - entry_price
elif self.trade_direction == 'short':
return entry_price - current_price
else:
return 0
def is_market_ranging_simple(self):
"""یک تشخیص ساده بازار رنج (اصلاح‌شده: بدون استفاده از متغیر تعریف‌نشده)"""
if len(self.m3) < 20:
return False
current_adx = self.m3['adx'].iloc[-1] if 'adx' in self.m3.columns else 25
if pd.notna(current_adx) and float(current_adx) > 25:
return False
# اگر بولینگر نداریم، فقط با ADX تصمیم می‌گیریم
if not ('bb_upper' in self.m3.columns and 'bb_lower' in self.m3.columns and 'bb_middle' in self.m3.columns):
return True
current_price = self.m3['close'].iloc[-1]
bb_upper = self.m3['bb_upper'].iloc[-1]
bb_lower = self.m3['bb_lower'].iloc[-1]
bb_middle = self.m3['bb_middle'].iloc[-1]
if pd.isna(current_price) or pd.isna(bb_upper) or pd.isna(bb_lower) or pd.isna(bb_middle):
return False
if not (bb_lower < current_price < bb_upper):
return False
bb_width = bb_upper - bb_lower
if bb_middle == 0 or pd.isna(bb_width):
return False
if (bb_width / bb_middle) > 0.03:
return False
return True
def send_realtime_price(self, current_price) -> bool:
"""
ارسال قیمت لحظه‌ای به همه کلاینت‌ها
"""
current_price = int(current_price)
unrealized_pnl = None
if self.in_trade:
unrealized_pnl = self.calculate_unrealized_pnl(current_price)
command = {
"type": "realtime_price",
"price": current_price,
"unrealized_pnl": unrealized_pnl,
"timestamp": datetime.now().isoformat()
}
clients_count = self.get_connected_clients_count()
if clients_count > 0:
self.broadcast_to_clients(command)
return True
return False
def send_heartbeat(self) -> bool:
"""
ارسال پیام heartbeat به همه کلاینت‌ها
"""
command = {
"type": "heartbeat",
"timestamp": datetime.now().isoformat(),
"status": {
"connected": self.CONNECTION_STATUS == "connected",
"in_trade": self.in_trade,
"trade_direction": self.trade_direction,
"balance": round(self.balance, 0),
"trades_count": self.trades,
"market_price": round(self.MARKET_PRICE, 2) if self.MARKET_PRICE else None,
"connected_clients": self.get_connected_clients_count(),
"warmup_complete": self.warmup_complete
}
}
clients_count = self.get_connected_clients_count()
if clients_count > 0:
self.broadcast_to_clients(command)
return True
return False
def send_command(self, command_data: dict) -> bool:
"""
ارسال دستور به همه کلاینت‌های متصل
این متد جایگزین متد قبلی است که به سرور خارجی وصل می‌شد
حالا مستقیماً به کلاینت‌های متصل broadcast می‌کند
"""
try:
if "command_id" not in command_data:
command_data["command_id"] = f"cmd_{datetime.now().strftime('%Y%m%d%H%M%S%f')}"
if "timestamp" not in command_data:
command_data["timestamp"] = datetime.now().isoformat()
clients_count = self.get_connected_clients_count()
if clients_count == 0:
print(f"⚠️ هیچ کلاینتی متصل نیست. دستور: {command_data.get('type')}")
return False
success_count = self.broadcast_to_clients(command_data)
if success_count > 0:
print(f"📤 دستور به {success_count}/{clients_count} کلاینت ارسال شد: {command_data.get('type')}")
return True
else:
print(f"❌ ارسال دستور ناموفق: {command_data.get('type')}")
return False
except Exception as e:
print(f"❌ خطا در ارسال دستور: {e}")
import traceback
traceback.print_exc()
return False
def send_enter_trade_command(self, direction, entry_price, stop_loss, take_profit):
"""
ارسال سیگنال ورود به معامله به کلاینت‌ها
Args:
direction: جهت معامله - 'long' یا 'short'
entry_price: قیمت ورود
stop_loss: قیمت حد ضرر
take_profit: قیمت حد سود
position_size: حجم معامله (پیش‌فرض 1)
Returns:
bool: آیا ارسال سیگنال موفق بود
"""
hiva_direction = ''
if direction == 'long':
hiva_direction = 'buy'
else:
hiva_direction = 'sell'
print(f"Hiva Direction: {hiva_direction}")
strategy = self.current_trade_strategy if self.current_trade_strategy else 'unknown'
entry_price = int(entry_price)
stop_loss = int(stop_loss)
take_profit = int(take_profit)
stop_distance_lines = abs(entry_price - stop_loss)
profit_distance_lines = abs(take_profit - entry_price)
risk_toman = stop_distance_lines * self.LINE_VALUE_TOMAN
potential_profit_toman = profit_distance_lines * self.LINE_VALUE_TOMAN
trade_id = f"trade_{datetime.now().strftime('%Y%m%d%H%M%S%f')}"
self.current_internal_trade_id = trade_id
command = {
"type": "enter_trade",
"direction": hiva_direction,
"symbol": "MAZANEH",
"entry_price": entry_price,
"stop_loss": stop_loss,
"take_profit": take_profit,
"strategy": strategy,
"stop_distance_lines": stop_distance_lines,
"profit_distance_lines": profit_distance_lines,
"risk_toman": round(risk_toman, 0),
"potential_profit_toman": round(potential_profit_toman, 0),
"risk_reward_ratio": round(profit_distance_lines / stop_distance_lines, 2) if stop_distance_lines > 0 else 0,
"line_value_toman": self.LINE_VALUE_TOMAN,
"risk_percent": self.RISK_PERCENTAGE * 100,
"trade_id": trade_id,
"timestamp": datetime.now().isoformat()
}
print(f"\n{'═' * 60}")
print(f"📤 سیگنال ورود به معامله {direction.upper()} ({strategy.upper()})")
print(f"{'═' * 60}")
print(f" 💰 قیمت ورود: {entry_price}")
print(f" 🛡️ حد ضرر: {stop_loss} ({stop_distance_lines} خط)")
print(f" 🎯 حد سود: {take_profit} ({profit_distance_lines} خط)")
print(f" 💸 ریسک: {risk_toman:,.0f} تومان")
print(f" 💰 سود بالقوه: {potential_profit_toman:,.0f} تومان")
print(f" 🔑 شناسه: {trade_id}")
print(f"{'═' * 60}")
return self.send_command(command)
def send_exit_trade_command(self, exit_price, reason, pnl_toman, pnl_percent):
"""
ارسال سیگنال خروج از معامله به کلاینت‌ها
Args:
exit_price: قیمت خروج
reason: دلیل خروج
pnl_toman: سود/ضرر به تومان
pnl_percent: سود/ضرر به درصد
Returns:
bool: آیا ارسال سیگنال موفق بود
"""
exit_price = int(exit_price)
entry_price = int(self.entry_price)
if self.trade_direction == 'long':
pnl_lines = exit_price - entry_price
else:
pnl_lines = entry_price - exit_price
hiva_direction = ''
if self.trade_direction == 'long':
hiva_direction = 'buy'
else:
hiva_direction = 'sell'
command = {
"type": "exit_trade",
"symbol": "MAZANEH",
"exit_price": exit_price,
"reason": reason,
"direction": hiva_direction,
"entry_price": entry_price,
"pnl_lines": pnl_lines,
"pnl_toman": round(pnl_toman, 0),
"pnl_percent": round(pnl_percent, 2),
"line_value_toman": self.LINE_VALUE_TOMAN,
"trade_duration": str(datetime.now() - self.trade_start_time) if self.trade_start_time else "0:00:00",
"strategy": self.current_trade_strategy,
"trade_id": self.current_internal_trade_id
}
emoji = "💰" if pnl_toman >= 0 else "💸"
print(f"\n{'═' * 60}")
print(f"📤 سیگنال خروج از معامله")
print(f"{'═' * 60}")
print(f" {emoji} سود/ضرر: {pnl_lines:+} خط ({pnl_toman:+,.0f} تومان) ({pnl_percent:+.1f}%)")
print(f" 📍 قیمت خروج: {exit_price}")
print(f" 📋 دلیل: {reason}")
print(f" 🔑 شناسه: {self.current_internal_trade_id}")
print(f"{'═' * 60}")
return self.send_command(command)
def send_update_stop_loss_command(self, new_stop_loss, reason):
"""
ارسال سیگنال تغییر حد ضرر به کلاینت‌ها
Args:
new_stop_loss: قیمت حد ضرر جدید
reason: دلیل تغییر
Returns:
bool: آیا ارسال سیگنال موفق بود
"""
new_stop_loss = int(new_stop_loss)
old_stop_loss = int(self.stop_loss_price)
entry_price = int(self.entry_price)
current_price = int(self.MARKET_PRICE) if self.MARKET_PRICE else 0
take_profit = int(self.take_profit_price)
if self.trade_direction == 'long':
locked_profit_lines = new_stop_loss - entry_price
else:
locked_profit_lines = entry_price - new_stop_loss
locked_profit_toman = locked_profit_lines * self.LINE_VALUE_TOMAN
command = {
"type": "update_stop_loss",
"symbol": "MAZANEH",
"direction": self.trade_direction,
"old_stop_loss": old_stop_loss,
"new_stop_loss": new_stop_loss,
"entry_price": entry_price,
"current_price": current_price,
"take_profit": take_profit,
"reason": reason,
"locked_profit_lines": locked_profit_lines,
"locked_profit_toman": locked_profit_toman,
"line_value_toman": self.LINE_VALUE_TOMAN,
"trade_id": self.current_internal_trade_id
}
print(f"\n{'═' * 60}")
print(f"📤 سیگنال تغییر حد ضرر")
print(f"{'═' * 60}")
print(f" 🛡️ حد ضرر: {old_stop_loss} → {new_stop_loss}")
print(f" 💰 سود قفل شده: {locked_profit_lines} خط ({locked_profit_toman:,.0f} تومان)")
print(f" 📋 دلیل: {reason}")
print(f" 🔑 شناسه: {self.current_internal_trade_id}")
print(f"{'═' * 60}")
return self.send_command(command)
def send_update_take_profit_command(self, new_take_profit, reason):
"""
ارسال سیگنال تغییر حد سود به کلاینت‌ها
Args:
new_take_profit: قیمت حد سود جدید
reason: دلیل تغییر
Returns:
bool: آیا ارسال سیگنال موفق بود
"""
new_take_profit = int(new_take_profit)
old_take_profit = int(self.take_profit_price)
entry_price = int(self.entry_price)
current_price = int(self.MARKET_PRICE) if self.MARKET_PRICE else 0
stop_loss = int(self.stop_loss_price)
if self.trade_direction == 'long':
tp_distance_lines = new_take_profit - entry_price
else:
tp_distance_lines = entry_price - new_take_profit
potential_profit_toman = tp_distance_lines * self.LINE_VALUE_TOMAN
command = {
"type": "update_take_profit",
"symbol": "MAZANEH",
"direction": self.trade_direction,
"old_take_profit": old_take_profit,
"new_take_profit": new_take_profit,
"entry_price": entry_price,
"current_price": current_price,
"stop_loss": stop_loss,
"reason": reason,
"tp_distance_lines": tp_distance_lines,
"potential_profit_toman": potential_profit_toman,
"line_value_toman": self.LINE_VALUE_TOMAN,
"trade_id": self.current_internal_trade_id
}
print(f"\n{'═' * 60}")
print(f"📤 سیگنال تغییر حد سود")
print(f"{'═' * 60}")
print(f" 🎯 حد سود: {old_take_profit} → {new_take_profit}")
print(f" 📏 فاصله از ورود: {tp_distance_lines} خط ({potential_profit_toman:,.0f} تومان)")
print(f" 📋 دلیل: {reason}")
print(f" 🔑 شناسه: {self.current_internal_trade_id}")
print(f"{'═' * 60}")
return self.send_command(command)
def calculate_unrealized_pnl(self, current_price):
"""
محاسبه سود/ضرر شناور (برای معامله باز)
🔄 تغییر از تیک به خط
بازگشت:
- dict با کلیدهای: toman, percent, lines
"""
if not self.in_trade or not self.entry_price:
return {"toman": 0, "percent": 0, "lines": 0}
try:
# 🔄 تبدیل به عدد صحیح
current_price = int(current_price)
entry_price = int(self.entry_price)
# محاسبه تعداد خطوط
if self.trade_direction == 'long':
lines = current_price - entry_price
elif self.trade_direction == 'short':
lines = entry_price - current_price
else:
return {"toman": 0, "percent": 0, "lines": 0}
# 🔄 محاسبه به تومان
pnl_toman = lines * self.LINE_VALUE_TOMAN
pnl_percent = (pnl_toman / self.PORTFOLIO_SIZE) * 100
return {
"toman": round(pnl_toman, 0),
"percent": round(pnl_percent, 2),
"lines": int(lines)
}
except Exception as e:
print(f"⚠️ خطا در محاسبه سود شناور: {e}")
return {"toman": 0, "percent": 0, "lines": 0}
def calculate_initial_indicators(self):
try:
# --- Daily indicators ---
if len(self.daily) >= 200:
ema55_series = ta.ema(self.daily['close'], length=55)
if ema55_series is not None:
self.daily['ema55'] = ema55_series
ema200_series = ta.ema(self.daily['close'], length=200)
if ema200_series is not None:
self.daily['ema200'] = ema200_series
rsi_series = ta.rsi(self.daily['close'], length=14)
if rsi_series is not None:
self.daily['rsi14'] = rsi_series
print("✅ اندیکاتورهای روزانه محاسبه شدند")
else:
print(f"⚠️ داده روزانه کافی نیست برای EMA200 (نیاز: 200، موجود: {len(self.daily)})")
# --- M3 indicators ---
if len(self.m3) >= 50:
self.update_m3_indicators()
print("✅ اندیکاتورهای M3 (3 دقیقه‌ای) محاسبه شدند")
else:
print(f"⚠️ داده M3 کافی نیست برای اندیکاتورها (نیاز: 50، موجود: {len(self.m3)})")
# ✅ مهم: ست کردن رژیم بازار از همینجا
try:
# اگر از قبل چیزی ست نشده، یا هنوز unknown/0 است، یک بار آپدیت کن
cur_regime = str(getattr(self, "market_regime", "unknown") or "unknown")
cur_conf = float(getattr(self, "regime_confidence", 0) or 0)
if cur_regime == "unknown" or cur_conf == 0.0:
self._update_market_regime()
print(f"✅ رژیم بازار اولیه ست شد: {self.market_regime} ({self.regime_confidence}%)")
except Exception as re:
print(f"⚠️ خطا در مقداردهی اولیه رژیم بازار: {re}")
except Exception as e:
print(f"خطا در محاسبه اندیکاتورها: {e}")
import traceback
traceback.print_exc()
def update_m3_indicators(self):
"""محاسبه اندیکاتورهای M3 (3 دقیقه‌ای) - نسخه پایدارتر"""
try:
if not isinstance(self.m3, pd.DataFrame) or self.m3.empty:
return
if len(self.m3) < 50:
return
required_cols = {'open', 'high', 'low', 'close'}
if not required_cols.issubset(set(self.m3.columns)):
missing = required_cols - set(self.m3.columns)
print(f"⚠️ ستون‌های OHLC ناقص است: {missing}")
return
df = self.m3.copy()
try:
df = df.sort_index()
except Exception:
pass
# EMA
df['ema9'] = ta.ema(df['close'], length=9)
df['ema21'] = ta.ema(df['close'], length=21)
df['ema50'] = ta.ema(df['close'], length=50)
# RSI
df['rsi14'] = ta.rsi(df['close'], length=14)
# ADX
try:
adx_result = ta.adx(df['high'], df['low'], df['close'], length=14)
if adx_result is not None and hasattr(adx_result, 'columns'):
adx_col = None
for col in adx_result.columns:
if 'ADX' in str(col).upper():
adx_col = col
break
df['adx'] = adx_result[adx_col] if adx_col else 25.0
else:
df['adx'] = 25.0
except Exception as e:
print(f"⚠️ خطا در محاسبه ADX: {e}")
df['adx'] = 25.0
# MACD
try:
macd_result = ta.macd(df['close'])
if macd_result is not None and not macd_result.empty:
df['macd'] = macd_result.iloc[:, 0] if macd_result.shape[1] > 0 else 0
df['macd_signal'] = macd_result.iloc[:, 1] if macd_result.shape[1] > 1 else 0
df['macd_histogram'] = macd_result.iloc[:, 2] if macd_result.shape[1] > 2 else 0
else:
df['macd'] = 0
df['macd_signal'] = 0
df['macd_histogram'] = 0
except Exception as e:
print(f"⚠️ خطا در محاسبه MACD: {e}")
df['macd'] = 0
df['macd_signal'] = 0
df['macd_histogram'] = 0
# ATR
try:
df['atr'] = ta.atr(df['high'], df['low'], df['close'], length=14)
except Exception as e:
print(f"⚠️ خطا در محاسبه ATR: {e}")
df['atr'] = 20
# Bollinger
try:
bb = ta.bbands(df['close'], length=20, std=2)
if bb is None or not hasattr(bb, 'columns') or len(bb.columns) == 0:
raise ValueError("بولینگر باند معتبر نیست")
upper_col = middle_col = lower_col = None
for col in bb.columns:
c = str(col).upper()
if 'BBU' in c:
upper_col = col
elif 'BBM' in c:
middle_col = col
elif 'BBL' in c:
lower_col = col
if upper_col and middle_col and lower_col:
df['bb_upper'] = bb[upper_col]
df['bb_middle'] = bb[middle_col]
df['bb_lower'] = bb[lower_col]
else:
raise ValueError(f"ستون‌های بولینگر پیدا نشد: {bb.columns.tolist()}")
except Exception as e:
print(f"⚠️ خطا در بولینگر باند: {e}")
window = 20
sma = df['close'].rolling(window=window).mean()
std = df['close'].rolling(window=window).std()
df['bb_upper'] = sma + (std * 2)
df['bb_middle'] = sma
df['bb_lower'] = sma - (std * 2)
# Stochastic
try:
stoch_result = ta.stoch(df['high'], df['low'], df['close'])
if stoch_result is not None and hasattr(stoch_result, 'columns'):
k_col = d_col = None
for col in stoch_result.columns:
c = str(col).upper()
if 'STOCHK' in c:
k_col = col
elif 'STOCHD' in c:
d_col = col
df['stoch_k'] = stoch_result[k_col] if k_col else 50
df['stoch_d'] = stoch_result[d_col] if d_col else 50
else:
df['stoch_k'] = 50
df['stoch_d'] = 50
except Exception as e:
print(f"⚠️ خطا در محاسبه Stochastic: {e}")
df['stoch_k'] = 50
df['stoch_d'] = 50
# price_vs_bb
if 'bb_lower' in df.columns and 'bb_upper' in df.columns:
try:
bb_range = df['bb_upper'] - df['bb_lower']
mask = bb_range != 0
df.loc[mask, 'price_vs_bb'] = (df.loc[mask, 'close'] - df.loc[mask, 'bb_lower']) / bb_range[mask] * 100
df.loc[~mask, 'price_vs_bb'] = 50
except Exception:
df['price_vs_bb'] = 50
else:
df['price_vs_bb'] = 50
self.m3 = df
print("✅ اندیکاتورهای M3 به‌روز شدند")
try:
print(f" 📊 نمونه: ADX={df['adx'].iloc[-1]:.1f}, RSI={df['rsi14'].iloc[-1]:.1f}, ATR={df['atr'].iloc[-1]:.1f}")
print(f" 📈 EMA: 9={df['ema9'].iloc[-1]:.0f}, 21={df['ema21'].iloc[-1]:.0f}, 50={df['ema50'].iloc[-1]:.0f}")
except Exception:
pass
except Exception as e:
print(f"⚠️ خطای کلی در update_m3_indicators: {e}")
import traceback
traceback.print_exc()
def calculate_dynamic_stop_loss(self, current_price: float, direction: str) -> int:
"""محاسبه حد ضرر پویا - اصلاح‌شده: شرط بولینگر کامل"""
strategy = self.determine_strategy_mode()
if strategy == 'scalp':
base_stop = self.SCALP_STOP_LOSS_LINES
min_stop = self.SCALP_MIN_STOP
max_stop = self.SCALP_MAX_STOP
atr_mult = self.SCALP_ATR_MULTIPLIER
risk_percent = self.SCALP_RISK_PERCENTAGE
strategy_name = "اسکالپ"
else:
base_stop = self.SWING_STOP_LOSS_LINES
min_stop = self.SWING_MIN_STOP
max_stop = self.SWING_MAX_STOP
atr_mult = self.SWING_ATR_MULTIPLIER
risk_percent = self.SWING_RISK_PERCENTAGE
strategy_name = "سوئینگ"
atr_lines = 0
if len(self.m3) >= 14 and 'atr' in self.m3.columns:
atr = self.m3['atr'].iloc[-1]
if pd.notna(atr) and float(atr) > 0:
atr_lines = int(float(atr))
atr_based_stop = int(atr_lines * atr_mult)
bb_based_stop = 0
if 'bb_upper' in self.m3.columns and 'bb_lower' in self.m3.columns:
bb_upper = self.m3['bb_upper'].iloc[-1]
bb_lower = self.m3['bb_lower'].iloc[-1]
if pd.notna(bb_upper) and pd.notna(bb_lower):
bb_width = float(bb_upper) - float(bb_lower)
if bb_width > 0:
bb_based_stop = int(bb_width * 0.3)
stop_loss = max(base_stop, atr_based_stop, bb_based_stop)
stop_loss = max(min_stop, min(max_stop, stop_loss))
risk_amount = int(self.portfolio * risk_percent)
risk_in_lines = risk_amount // self.LINE_VALUE_TOMAN
if risk_in_lines > 0 and risk_in_lines < stop_loss:
stop_loss = max(min_stop, risk_in_lines)
print(f" ⚠️ محدودیت ریسک مالی: حد ضرر به {stop_loss} خط کاهش یافت")
current_price_int = int(current_price)
if direction == 'long':
stop_price = current_price_int - stop_loss
else:
stop_price = current_price_int + stop_loss
print(f"\n{'─' * 50}")
print(f"🛡️ محاسبه حد ضرر پویا - {strategy_name}")
print(f"{'─' * 50}")
print(f" 📊 استراتژی: {strategy_name}")
print(f" 📏 حد ضرر پایه: {base_stop} خط")
print(f" 📈 ATR: {atr_lines} خط → {atr_based_stop} خط")
print(f" 📊 Bollinger: {bb_based_stop} خط")
print(f" 🔢 بزرگترین: {stop_loss} خط")
print(f" 📍 قیمت ورود: {current_price_int}")
print(f" 🛡️ قیمت حد ضرر: {stop_price}")
print(f" 📏 فاصله: {stop_loss} خط")
print(f" 💰 ریسک مالی: {risk_amount:,.0f} تومان = {risk_in_lines} خط")
print(f"{'─' * 50}")
return stop_loss
def analyze_market_momentum(self):
"""تحلیل مومنتوم بازار برای فیلتر سیگنال‌ها"""
if len(self.m3) < 50:
return {'momentum': 'neutral', 'strength': 0}
df = self.m3
momentum_score = 0
reasons = []
# 1. بررسی MACD
if 'macd' in df.columns and 'macd_signal' in df.columns:
macd_value = df['macd'].iloc[-1]
macd_signal = df['macd_signal'].iloc[-1]
if macd_value > macd_signal:
momentum_score += 25
reasons.append("MACD صعودی")
else:
momentum_score -= 25
reasons.append("MACD نزولی")
# 2. بررسی Stochastic
if 'stoch_k' in df.columns and 'stoch_d' in df.columns:
stoch_k = df['stoch_k'].iloc[-1]
stoch_d = df['stoch_d'].iloc[-1]
if stoch_k > 80 and stoch_d > 80:
momentum_score -= 15 # اشباع خرید
reasons.append("Stochastic اشباع خرید")
elif stoch_k < 20 and stoch_d < 20:
momentum_score += 15 # اشباع فروش
reasons.append("Stochastic اشباع فروش")
elif stoch_k > stoch_d:
momentum_score += 10
reasons.append("Stochastic صعودی")
else:
momentum_score -= 10
reasons.append("Stochastic نزولی")
# 3. بررسی موقعیت قیمت در Bollinger Bands
if 'bb_upper' in df.columns and 'bb_lower' in df.columns:
current_price = df['close'].iloc[-1]
bb_upper = df['bb_upper'].iloc[-1]
bb_lower = df['bb_lower'].iloc[-1]
bb_middle = df['bb_middle'].iloc[-1]
if current_price > bb_upper:
momentum_score -= 20 # خارج از باند بالا - اصلاح احتمالی
reasons.append("خارج از باند بالا")
elif current_price < bb_lower:
momentum_score += 20 # خارج از باند پایین - بازگشت احتمالی
reasons.append("خارج از باند پایین")
elif current_price > bb_middle:
momentum_score += 10
reasons.append("بالای وسط باند")
else:
momentum_score -= 10
reasons.append("زیر وسط باند")
# تفسیر نتیجه
if momentum_score >= 30:
momentum = 'strong_bullish'
elif momentum_score >= 10:
momentum = 'bullish'
elif momentum_score <= -30:
momentum = 'strong_bearish'
elif momentum_score <= -10:
momentum = 'bearish'
else:
momentum = 'neutral'
return {
'momentum': momentum,
'strength': momentum_score,
'reasons': reasons
}
def smart_trailing_stop(self, current_price):
"""
تریلینگ استاپ آویزان - نسخه اصلاح شده نهایی
فقط وقتی قیمت به سمت سود حرکت می‌کند حدها تغییر می‌کنند
"""
if not self.TRAILING_ENABLED:
return False
if not self.in_trade:
return False
if self.trade_direction is None:
return False
current_price = int(current_price)
entry_price = int(self.entry_price)
stop_loss = int(self.stop_loss_price)
take_profit = int(self.take_profit_price)
direction = self.trade_direction
sl_distance = self.TRAILING_STEP_LINES
if direction == 'long':
# ✅ اصلاح ۱: همیشه old_highest را ذخیره کن
old_highest = self.highest_price_in_trade
# ✅ اصلاح ۲: همیشه highest را آپدیت کن (با ماکسیمم)
self.highest_price_in_trade = max(self.highest_price_in_trade, current_price)
# ✅ اصلاح ۳: بررسی کن آیا قیمت واقعاً بالا رفته؟
price_moved_up = current_price > old_highest
# محاسبه سود نسبت به بالاترین قیمت
current_profit_lines = self.highest_price_in_trade - entry_price
# ✅ اصلاح ۴: شرط فعال‌سازی تریلینگ
if current_profit_lines < self.TRAILING_STOP_LINES:
if price_moved_up:
print(f"📈 بالاترین قیمت آپدیت شد: {current_price} (سود: {current_profit_lines} خط - منتظر {self.TRAILING_STOP_LINES} خط برای تریلینگ)")
current_profit = current_price - entry_price
print(f"📊 LONG - قیمت: {current_price} | بالاترین: {self.highest_price_in_trade} | سود: {current_profit} خط | SL: {stop_loss} | TP: {take_profit}")
return False
# ✅ اصلاح ۵: فقط وقتی قیمت بالا رفته تریلینگ فعال کن
if price_moved_up:
move_amount = current_price - old_highest
# محاسبه حد ضرر جدید
new_stop_loss = current_price - sl_distance
# ✅ اصلاح ۶: شرط باید new_stop_loss > stop_loss باشد
if new_stop_loss > stop_loss:
old_sl = stop_loss
self.stop_loss_price = new_stop_loss
# محاسبه حد سود جدید (با نسبت ریسک به ریوارد)
new_take_profit = take_profit + move_amount
# ✅ اصلاح ۷: حد سود جدید باید حداقل ۱.۵ برابر حد ضرر باشد
min_profit = entry_price + int((new_stop_loss - entry_price) * 1.5)
new_take_profit = max(new_take_profit, min_profit)
old_tp = take_profit
self.take_profit_price = new_take_profit
locked_profit = self.stop_loss_price - entry_price
locked_profit_toman = locked_profit * self.LINE_VALUE_TOMAN
print(f"\n🎯 تریلینگ آویزان LONG آپدیت شد!")
print(f" 📈 قیمت: {old_highest} → {current_price} (+{move_amount} خط)")
print(f" 🛡️ SL: {old_sl} → {self.stop_loss_price}")
print(f" 🎯 TP: {old_tp} → {self.take_profit_price}")
print(f" 💰 سود قفل شده: {locked_profit} خط ({locked_profit_toman:,.0f} تومان)")
self.send_update_stop_loss_command(
new_stop_loss=self.stop_loss_price,
reason=f"تریلینگ آویزان - سود قفل: {locked_profit} خط"
)
self.send_update_take_profit_command(
new_take_profit=self.take_profit_price,
reason=f"تریلینگ آویزان - TP جدید"
)
return True
# نمایش وضعیت فعلی
current_profit = current_price - entry_price
print(f"📊 LONG - قیمت: {current_price} | بالاترین: {self.highest_price_in_trade} | سود: {current_profit} خط | SL: {stop_loss} | TP: {take_profit}")
elif direction == 'short':
# ✅ اصلاح ۱: همیشه old_lowest را ذخیره کن
if self.lowest_price_in_trade == float('inf'):
self.lowest_price_in_trade = current_price
old_lowest = current_price
else:
old_lowest = self.lowest_price_in_trade
# ✅ اصلاح ۲: همیشه lowest را آپدیت کن (با مینیمم)
self.lowest_price_in_trade = min(self.lowest_price_in_trade, current_price)
# ✅ اصلاح ۳: بررسی کن آیا قیمت واقعاً پایین رفته؟
price_moved_down = current_price < old_lowest
# محاسبه سود نسبت به پایین‌ترین قیمت
if self.lowest_price_in_trade != float('inf'):
current_profit_lines = entry_price - self.lowest_price_in_trade
else:
current_profit_lines = 0
# ✅ اصلاح ۴: شرط فعال‌سازی تریلینگ
if current_profit_lines < self.TRAILING_STOP_LINES:
if price_moved_down:
print(f"📉 پایین‌ترین قیمت آپدیت شد: {current_price} (سود: {current_profit_lines} خط - منتظر {self.TRAILING_STOP_LINES} خط برای تریلینگ)")
current_profit = entry_price - current_price
lowest_display = self.lowest_price_in_trade if self.lowest_price_in_trade != float('inf') else entry_price
print(f"📊 SHORT - قیمت: {current_price} | پایین‌ترین: {lowest_display} | سود: {current_profit} خط | SL: {stop_loss} | TP: {take_profit}")
return False
# ✅ اصلاح ۵: فقط وقتی قیمت پایین رفته تریلینگ فعال کن
if price_moved_down:
move_amount = old_lowest - current_price
# محاسبه حد ضرر جدید
new_stop_loss = current_price + sl_distance
# ✅ اصلاح ۶: شرط باید new_stop_loss < stop_loss باشد
if new_stop_loss < stop_loss:
old_sl = stop_loss
self.stop_loss_price = new_stop_loss
# محاسبه حد سود جدید (با نسبت ریسک به ریوارد)
new_take_profit = take_profit - move_amount
# ✅ اصلاح ۷: حد سود جدید باید حداقل ۱.۵ برابر حد ضرر باشد
min_profit = entry_price - int((entry_price - new_stop_loss) * 1.5)
new_take_profit = min(new_take_profit, min_profit)
old_tp = take_profit
self.take_profit_price = new_take_profit
locked_profit = entry_price - self.stop_loss_price
locked_profit_toman = locked_profit * self.LINE_VALUE_TOMAN
print(f"\n🎯 تریلینگ آویزان SHORT آپدیت شد!")
print(f" 📉 قیمت: {old_lowest} → {current_price} (+{move_amount} خط سود)")
print(f" 🛡️ SL: {old_sl} → {self.stop_loss_price}")
print(f" 🎯 TP: {old_tp} → {self.take_profit_price}")
print(f" 💰 سود قفل شده: {locked_profit} خط ({locked_profit_toman:,.0f} تومان)")
self.send_update_stop_loss_command(
new_stop_loss=self.stop_loss_price,
reason=f"تریلینگ آویزان - سود قفل: {locked_profit} خط"
)
self.send_update_take_profit_command(
new_take_profit=self.take_profit_price,
reason=f"تریلینگ آویزان - TP جدید"
)
return True
# نمایش وضعیت فعلی
current_profit = entry_price - current_price
lowest_display = self.lowest_price_in_trade if self.lowest_price_in_trade != float('inf') else entry_price
print(f"📊 SHORT - قیمت: {current_price} | پایین‌ترین: {lowest_display} | سود: {current_profit} خط | SL: {stop_loss} | TP: {take_profit}")
return False
def calculate_pnl(self, entry_price, exit_price, direction):
"""
محاسبه سود/ضرر نهایی معامله بر حسب تومان
🔄 تغییر از تیک به خط:
- قبلاً: تیک × حجم × ارزش_تیک
- الان: خط × ارزش_خط (حجم همیشه 1)
پارامترها:
- entry_price: قیمت ورود (عدد صحیح)
- exit_price: قیمت خروج (عدد صحیح)
- direction: جهت معامله ('long' یا 'short')
بازگشت:
- سود/ضرر به تومان
مثال:
- LONG: ورود 53200، خروج 53220 → 20 خط × 23000 = 460,000 تومان سود
- SHORT: ورود 53200، خروج 53180 → 20 خط × 23000 = 460,000 تومان سود
"""
if entry_price <= 0:
print(f"⚠️ قیمت ورود نامعتبر: {entry_price}")
return 0
if exit_price <= 0:
print(f"⚠️ قیمت خروج نامعتبر: {exit_price}")
return 0
if direction is None:
print(f"⚠️ جهت معامله نامعتبر: {direction}")
return 0
try:
# 🔄 تبدیل به عدد صحیح
entry_price = int(entry_price)
exit_price = int(exit_price)
# محاسبه تعداد خطوط سود/ضرر
if direction == 'long':
lines = exit_price - entry_price
print(f"🔍 PnL LONG: خروج({exit_price}) - ورود({entry_price}) = {lines} خط")
elif direction == 'short':
lines = entry_price - exit_price
print(f"🔍 PnL SHORT: ورود({entry_price}) - خروج({exit_price}) = {lines} خط")
else:
print(f"⚠️ جهت معامله نامعتبر: {direction}")
return 0
# 🔄 محاسبه سود/ضرر به تومان
# فرمول جدید: خط × ارزش_هر_خط
pnl_toman = lines * self.LINE_VALUE_TOMAN
# بررسی منطقی بودن
max_reasonable_lines = 1000 # حداکثر 1000 خط تغییر منطقی
if abs(lines) > max_reasonable_lines:
print(f"⚠️ تغییر قیمت غیرمنطقی: {lines} خط، محدود شد به {max_reasonable_lines}")
if lines > 0:
lines = max_reasonable_lines
else:
lines = -max_reasonable_lines
pnl_toman = lines * self.LINE_VALUE_TOMAN
# محدودیت سود/ضرر
max_reasonable_pnl = self.PORTFOLIO_SIZE * 2
if abs(pnl_toman) > max_reasonable_pnl:
print(f"⚠️ سود/ضرر غیرمنطقی: {pnl_toman:,.0f} تومان، محدود شد")
if pnl_toman > 0:
pnl_toman = max_reasonable_pnl
else:
pnl_toman = -max_reasonable_pnl
print(f"📊 PnL نهایی: {pnl_toman:+,.0f} تومان ({lines:+} خط × {self.LINE_VALUE_TOMAN:,} تومان)")
return pnl_toman
except Exception as e:
print(f"⚠️ خطا در محاسبه PnL: {e}")
return 0
def get_valid_exit_price(self, preferred_price):
if preferred_price is not None and preferred_price > 0:
if self.entry_price > 0:
lower_bound = self.entry_price * 0.5
upper_bound = self.entry_price * 2
if preferred_price >= lower_bound and preferred_price <= upper_bound:
return preferred_price
else:
print(f"⚠️ قیمت ترجیحی غیرمنطقی: {preferred_price:.2f} (محدوده مجاز: {lower_bound:.2f} - {upper_bound:.2f})")
else:
return preferred_price
if self.MARKET_PRICE is not None and self.MARKET_PRICE > 0:
print(f"📊 استفاده از قیمت بازار: {self.MARKET_PRICE:.2f}")
return self.MARKET_PRICE
if self.LAST_KNOWN_PRICE is not None and self.LAST_KNOWN_PRICE > 0:
print(f"⚠️ استفاده از آخرین قیمت شناخته شده: {self.LAST_KNOWN_PRICE:.2f}")
return self.LAST_KNOWN_PRICE
if self.entry_price > 0:
print(f"⚠️ استفاده از قیمت ورود به عنوان قیمت خروج: {self.entry_price:.2f}")
return self.entry_price
print("❌ هیچ قیمت معتبری موجود نیست!")
return None
def exit_trade(self, exit_price, reason="دستی"):
try:
if not self.in_trade:
print("⚠️ معامله‌ای باز نیست")
return False
exit_price = int(exit_price)
entry_price = int(self.entry_price)
if self.trade_direction == 'long':
pnl_lines = exit_price - entry_price
else:
pnl_lines = entry_price - exit_price
pnl_toman = pnl_lines * self.LINE_VALUE_TOMAN
pnl_percent = (pnl_toman / self.portfolio) * 100
# ثبت خروج در ژورنال
if hasattr(self, 'journal') and self.journal:
exit_data = {
'trade_id': self.current_internal_trade_id,
'exit_price': exit_price,
'reason': reason,
'pnl_lines': pnl_lines,
'pnl_toman': pnl_toman,
'pnl_percent': pnl_percent
}
self.journal.log_trade_exit(exit_data)
# یادگیری در SmartFilter
if hasattr(self, 'smart_filter') and self.smart_filter:
trade_record = {
'trade_id': self.current_internal_trade_id,
'pnl_lines': pnl_lines,
'direction': self.trade_direction,
'strategy': self.current_trade_strategy,
'entry_price': entry_price,
'exit_price': exit_price,
'reason': reason
}
self.smart_filter.learn_from_trade(trade_record)
self.send_exit_trade_command(exit_price, reason, pnl_toman, pnl_percent)
self.trades += 1
if pnl_toman >= 0:
self.winning_trades += 1
self.total_profit += pnl_toman
else:
self.losing_trades += 1
self.total_loss += abs(pnl_toman)
self.balance += pnl_toman
if self.balance > self.peak_balance:
self.peak_balance = self.balance
current_drawdown = ((self.peak_balance - self.balance) / self.peak_balance) * 100
if current_drawdown > self.max_drawdown:
self.max_drawdown = current_drawdown
trade_record = {
'entry_time': self.trade_start_time.isoformat() if self.trade_start_time else None,
'exit_time': datetime.now().isoformat(),
'direction': self.trade_direction,
'entry_price': entry_price,
'exit_price': exit_price,
'stop_loss': int(self.stop_loss_price),
'take_profit': int(self.take_profit_price),
'pnl_lines': pnl_lines,
'pnl_toman': pnl_toman,
'pnl_percent': round(pnl_percent, 2),
'duration': str(datetime.now() - self.trade_start_time) if self.trade_start_time else "N/A",
'reason': reason,
'balance_after': self.balance,
'highest_price': self.highest_price_in_trade,
'lowest_price': self.lowest_price_in_trade if self.lowest_price_in_trade != float('inf') else 0,
'strategy': self.current_trade_strategy,
'trade_id': self.current_internal_trade_id
}
self.trade_history.append(trade_record)
print(f"\n{'═' * 50}")
if pnl_toman >= 0:
print(f"✅ معامله با سود بسته شد!")
else:
print(f"❌ معامله با ضرر بسته شد!")
print(f"{'═' * 50}")
print(f" 📈 جهت: {self.trade_direction.upper()}")
print(f" 📍 ورود: {entry_price} → خروج: {exit_price}")
print(f" {'💰' if pnl_toman >= 0 else '💸'} سود/ضرر: {pnl_toman:+,.0f} تومان ({pnl_lines:+} خط)")
print(f" 📊 درصد: {pnl_percent:+.2f}%")
print(f" 💼 موجودی: {self.balance:,.0f} تومان")
print(f" 📋 دلیل: {reason}")
print(f"{'═' * 50}")
self.last_trade_exit_time = datetime.now()
self.reset_trade_variables()
return True
except Exception as e:
print(f"❌ خطا در خروج از معامله: {e}")
import traceback
traceback.print_exc()
return False
def clear_open_trades(self):
"""
پاک کردن لیست معاملات باز
این تابع جداگانه برای استفاده در جاهای مختلف
"""
if hasattr(self, 'open_trades') and self.open_trades:
trade_count = len(self.open_trades)
self.open_trades.clear()
print(f"🧹 {trade_count} معامله از لیست باز پاک شد")
return True
else:
print("📭 لیست معاملات باز خالی است")
return False
def update_m3_data(self, current_time, current_price):
"""
✅ آپدیت واقعی کندل‌های M3 با هر تیک (OHLC واقعی)
خروجی:
(is_new_candle, closed_candle_to_save, current_candle_time_floor)
- is_new_candle: آیا کندل جدید شروع شد؟
- closed_candle_to_save: دیکشنری کندل بسته‌شده قبلی برای ذخیره در DB (اگر وجود داشته باشد)
- current_candle_time_floor: زمان floor شده کندل جاری
"""
try:
tf = self._signal_timeframe()
# تبدیل ورودی‌ها
current_time_ts = pd.Timestamp(current_time)
current_price_f = float(current_price)
current_time_floor = current_time_ts.floor(tf)
# اگر دیتافریم m3 آماده نیست
if not isinstance(getattr(self, "m3", None), pd.DataFrame):
self.m3 = pd.DataFrame()
# اگر m3 خالی است → اولین کندل را بساز
if self.m3.empty or len(self.m3) == 0:
new_row = pd.DataFrame([{
'open': current_price_f,
'high': current_price_f,
'low': current_price_f,
'close': current_price_f
}], index=[current_time_floor])
self.m3 = new_row
return True, None, current_time_floor
# مرتب‌سازی ایندکس برای اطمینان
try:
self.m3 = self.m3.sort_index()
except Exception:
pass
last_index = self.m3.index[-1]
# اگر کندل جدید شروع شده
if current_time_floor > last_index:
# کندل قبلی بسته شد → برای ذخیره آماده کن
closed_candle_to_save = {
'datetime': pd.Timestamp(last_index).isoformat(),
'open': float(self.m3.loc[last_index, 'open']),
'high': float(self.m3.loc[last_index, 'high']),
'low': float(self.m3.loc[last_index, 'low']),
'close': float(self.m3.loc[last_index, 'close']),
}
# کندل جدید را باز کن
new_row = pd.DataFrame([{
'open': current_price_f,
'high': current_price_f,
'low': current_price_f,
'close': current_price_f
}], index=[current_time_floor])
self.m3 = pd.concat([self.m3, new_row])
# محدودیت طول
if len(self.m3) > 500:
self.m3 = self.m3.iloc[-500:]
return True, closed_candle_to_save, current_time_floor
# هنوز در همان کندل هستیم → OHLC را آپدیت کن
self.m3.loc[last_index, 'high'] = max(float(self.m3.loc[last_index, 'high']), current_price_f)
self.m3.loc[last_index, 'low'] = min(float(self.m3.loc[last_index, 'low']), current_price_f)
self.m3.loc[last_index, 'close'] = current_price_f
return False, None, current_time_floor
except Exception as e:
print(f"⚠️ خطا در update_m3_data ({self._signal_timeframe()}): {e}")
import traceback
traceback.print_exc()
return False, None, None
def check_trade_management(self, current_price):
"""مدیریت معامله باز - نسخه اصلاح شده"""
if not self.in_trade:
return
if self.trade_direction is None:
return
current_price = int(current_price)
entry_price = int(self.entry_price)
stop_loss = int(self.stop_loss_price)
take_profit = int(self.take_profit_price)
# ═══════════════════════════════════════════════════════════════
# 🛡️ اول تریلینگ استاپ را اجرا کن (این خودش highest/lowest را آپدیت می‌کند)
# ═══════════════════════════════════════════════════════════════
try:
self.smart_trailing_stop(current_price)
except Exception as e:
print(f"⚠️ خطا در trailing stop: {e}")
import traceback
traceback.print_exc()
# ═══════════════════════════════════════════════════════════════
# 🔴 بعد چک کن آیا به حد ضرر رسیده
# ═══════════════════════════════════════════════════════════════
# مقادیر جدید را بگیر (ممکن است trailing stop تغییرشان داده باشد)
stop_loss = int(self.stop_loss_price)
take_profit = int(self.take_profit_price)
stop_loss_hit = False
if self.trade_direction == 'long' and current_price <= stop_loss:
stop_loss_hit = True
print(f"🛡️ حد ضرر LONG فعال شد! قیمت: {current_price} <= SL: {stop_loss}")
elif self.trade_direction == 'short' and current_price >= stop_loss:
stop_loss_hit = True
print(f"🛡️ حد ضرر SHORT فعال شد! قیمت: {current_price} >= SL: {stop_loss}")
if stop_loss_hit:
self.exit_trade(stop_loss, "حد ضرر")
return
# ═══════════════════════════════════════════════════════════════
# 🎯 چک کن آیا به حد سود رسیده
# ═══════════════════════════════════════════════════════════════
take_profit_hit = False
if self.trade_direction == 'long' and current_price >= take_profit:
take_profit_hit = True
print(f"🎯 حد سود LONG فعال شد! قیمت: {current_price} >= TP: {take_profit}")
elif self.trade_direction == 'short' and current_price <= take_profit:
take_profit_hit = True
print(f"🎯 حد سود SHORT فعال شد! قیمت: {current_price} <= TP: {take_profit}")
if take_profit_hit:
self.exit_trade(take_profit, "حد سود")
return
def check_time_based_exit(self):
if not self.in_trade:
return False
if not self.trade_start_time:
return False
current_price = self.MARKET_PRICE if self.MARKET_PRICE else self.LAST_KNOWN_PRICE
if not current_price:
return False
current_price = int(current_price)
duration = datetime.now() - self.trade_start_time
minutes = duration.total_seconds() / 60
hours = minutes / 60
# ✅ اصلاح: determine_strategy_mode پارامتر نمی‌گیرد
strategy = self.current_trade_strategy if self.current_trade_strategy else self.determine_strategy_mode()
current_profit_lines = self.calculate_current_profit_lines(current_price)
current_profit_toman = current_profit_lines * self.LINE_VALUE_TOMAN
if strategy == 'scalp':
max_minutes = self.SCALP_MAX_TRADE_MINUTES
if minutes >= max_minutes:
self.exit_trade(current_price, f"حداکثر زمان اسکالپ ({max_minutes} دقیقه)")
return True
if minutes >= 45 and current_profit_lines < -2:
self.exit_trade(current_price, "ضرر مداوم اسکالپ (45 دقیقه)")
return True
if minutes >= 60 and 0 <= current_profit_lines < 2:
self.exit_trade(current_price, "سود ناچیز اسکالپ (60 دقیقه)")
return True
if minutes >= 30 and current_profit_lines < -1:
current_adx = 25.0
if 'adx' in self.m3.columns and not self.m3['adx'].empty:
adx_val = self.m3['adx'].iloc[-1]
if pd.notna(adx_val):
current_adx = float(adx_val)
if current_adx > 30:
self.exit_trade(current_price, "ضرر در روند قوی اسکالپ")
return True
else:
max_hours = self.SWING_MAX_TRADE_HOURS
if hours >= max_hours:
self.exit_trade(current_price, f"حداکثر زمان سوئینگ ({max_hours} ساعت)")
return True
if hours >= 2 and current_profit_lines < -5:
self.exit_trade(current_price, "ضرر مداوم سوئینگ (2 ساعت)")
return True
if hours >= 4 and 0 <= current_profit_lines < 3:
self.exit_trade(current_price, "سود ناچیز سوئینگ (4 ساعت)")
return True
if hours >= 6 and 3 <= current_profit_lines < 8:
print(f"\n📊 اطلاع‌رسانی سوئینگ - 6 ساعت گذشت")
print(f" ⏱️ مدت معامله: {hours:.1f} ساعت")
print(f" 💰 سود: {current_profit_lines:+} خط ({current_profit_toman:+,.0f} تومان)")
return False
def analyze_market_regime(self, lookback_candles=50):
"""
تحلیل حالت بازار (اصلاح‌شده در بخش ATR برای جلوگیری از ranging دائمی)
"""
if len(self.m3) < lookback_candles:
return "unknown", 50, {"error": "داده کافی نیست"}
try:
recent = self.m3.iloc[-lookback_candles:]
scores = {
'trending_up': 0,
'trending_down': 0,
'ranging': 0,
'volatile': 0
}
details = {}
# معیار 1: ADX
if 'adx' in recent.columns:
adx_values = recent['adx'].dropna()
if len(adx_values) > 0:
avg_adx = float(adx_values.mean())
adx_std = float(adx_values.std()) if len(adx_values) > 1 else 0.0
details['adx_avg'] = round(avg_adx, 1)
details['adx_std'] = round(adx_std, 1)
if avg_adx < 18:
scores['ranging'] += 35
details['adx_signal'] = 'low_trend_strength'
elif avg_adx > 25:
if 'ema9' in recent.columns and 'ema21' in recent.columns:
ema9_current = recent['ema9'].iloc[-1]
ema21_current = recent['ema21'].iloc[-1]
if pd.notna(ema9_current) and pd.notna(ema21_current) and float(ema9_current) > float(ema21_current):
scores['trending_up'] += 40
details['adx_signal'] = 'strong_uptrend'
else:
scores['trending_down'] += 40
details['adx_signal'] = 'strong_downtrend'
else:
scores['ranging'] += 20
details['adx_signal'] = 'medium_trend'
# معیار 2: Bollinger
if all(col in recent.columns for col in ['bb_upper', 'bb_lower', 'bb_middle']):
in_band_count = 0
bb_positions = []
for i in range(len(recent)):
price = recent['close'].iloc[i]
bb_upper = recent['bb_upper'].iloc[i]
bb_lower = recent['bb_lower'].iloc[i]
if pd.isna(price) or pd.isna(bb_upper) or pd.isna(bb_lower):
continue
if float(bb_upper) > float(bb_lower):
position = ((float(price) - float(bb_lower)) / (float(bb_upper) - float(bb_lower))) * 100
position = max(0, min(100, position))
bb_positions.append(position)
if float(bb_lower) <= float(price) <= float(bb_upper):
in_band_count += 1
valid_len = max(1, len(recent))
time_in_bands_pct = (in_band_count / valid_len) * 100
bb_widths = (recent['bb_upper'] - recent['bb_lower']).dropna()
avg_bb_width = float(bb_widths.mean()) if not bb_widths.empty else 0.0
avg_price = float(recent['close'].mean()) if pd.notna(recent['close'].mean()) else 0.0
bb_width_pct = (avg_bb_width / avg_price) * 100 if avg_price > 0 else 0.0
details['time_in_bands'] = round(time_in_bands_pct, 1)
details['bb_width_pct'] = round(bb_width_pct, 2)
if time_in_bands_pct > 80 and bb_width_pct < 3.0:
scores['ranging'] += 30
details['bb_signal'] = 'tight_range'
elif time_in_bands_pct > 60:
scores['ranging'] += 20
details['bb_signal'] = 'in_range'
else:
current_position = bb_positions[-1] if bb_positions else 50
if current_position > 80:
scores['trending_up'] += 15
details['bb_signal'] = 'above_band'
elif current_position < 20:
scores['trending_down'] += 15
details['bb_signal'] = 'below_band'
# معیار 3: EMA crosses
if all(col in recent.columns for col in ['ema9', 'ema21']):
crosses = 0
for i in range(1, len(recent)):
prev_ema9 = recent['ema9'].iloc[i - 1]
prev_ema21 = recent['ema21'].iloc[i - 1]
curr_ema9 = recent['ema9'].iloc[i]
curr_ema21 = recent['ema21'].iloc[i]
if pd.isna(prev_ema9) or pd.isna(prev_ema21) or pd.isna(curr_ema9) or pd.isna(curr_ema21):
continue
prev_bullish = float(prev_ema9) > float(prev_ema21)
curr_bullish = float(curr_ema9) > float(curr_ema21)
if prev_bullish != curr_bullish:
crosses += 1
cross_frequency = (crosses / max(1, len(recent))) * 100
details['ema_crosses'] = crosses
details['cross_frequency'] = round(cross_frequency, 1)
if cross_frequency > 40:
scores['ranging'] += 25
details['ema_signal'] = 'frequent_crosses'
elif cross_frequency < 15:
current_bullish = float(recent['ema9'].iloc[-1]) > float(recent['ema21'].iloc[-1])
if current_bullish:
scores['trending_up'] += 25
details['ema_signal'] = 'stable_uptrend'
else:
scores['trending_down'] += 25
details['ema_signal'] = 'stable_downtrend'
else:
scores['ranging'] += 15
details['ema_signal'] = 'mixed_signals'
# معیار 4: RSI
if 'rsi14' in recent.columns:
rsi_values = recent['rsi14'].dropna()
if len(rsi_values) > 0:
avg_rsi = float(rsi_values.mean())
rsi_std = float(rsi_values.std()) if len(rsi_values) > 1 else 0.0
rsi_range = float(rsi_values.max() - rsi_values.min())
details['rsi_avg'] = round(avg_rsi, 1)
details['rsi_std'] = round(rsi_std, 1)
details['rsi_range'] = round(rsi_range, 1)
if 40 <= avg_rsi <= 60 and rsi_std < 10 and rsi_range < 25:
scores['ranging'] += 20
details['rsi_signal'] = 'neutral_range'
elif avg_rsi > 60 and rsi_std < 8:
scores['trending_up'] += 20
details['rsi_signal'] = 'bullish_stable'
elif avg_rsi < 40 and rsi_std < 8:
scores['trending_down'] += 20
details['rsi_signal'] = 'bearish_stable'
else:
scores['volatile'] += 15
details['rsi_signal'] = 'volatile'
# ✅ معیار 5: ATR (اصلاح‌شده)
if 'atr' in recent.columns and 'high' in recent.columns and 'low' in recent.columns:
atr_values = recent['atr'].dropna()
ranges = (recent['high'] - recent['low']).dropna()
if len(atr_values) > 5 and len(ranges) > 5:
avg_atr = float(atr_values.mean())
q25 = float(atr_values.quantile(0.25))
q75 = float(atr_values.quantile(0.75))
current_atr = float(atr_values.iloc[-1])
median_range = float(ranges.median())
atr_to_range = (avg_atr / median_range) if median_range > 0 else 0.0
details['atr_absolute'] = round(avg_atr, 2)
details['atr_current'] = round(current_atr, 2)
details['atr_q25'] = round(q25, 2)
details['atr_q75'] = round(q75, 2)
details['range_median'] = round(median_range, 2)
details['atr_to_range_ratio'] = round(atr_to_range, 3)
# واحد در این دیتاست عملاً "خط" است
details['atr_unit'] = 'lines'
# امتیازدهی:
# - اگر ATR نسبت به توزیع خودش بالا باشد + نسبت به range بزرگ باشد => volatile
# - اگر پایین باشد => ranging
if current_atr >= q75 and atr_to_range >= 1.3:
scores['volatile'] += 25
details['atr_signal'] = 'high_volatility'
elif current_atr <= q25 and atr_to_range <= 0.9:
scores['ranging'] += 15
details['atr_signal'] = 'low_volatility'
else:
scores['ranging'] += 8
details['atr_signal'] = 'medium_volatility'
# معیار 6: روند روزانه
daily_trend_score = self.calculate_daily_trend_score()
details['daily_trend_score'] = daily_trend_score
if abs(daily_trend_score) < 30:
scores['ranging'] += 10
details['daily_signal'] = 'weak_daily_trend'
elif daily_trend_score > 50:
scores['trending_up'] += 15
details['daily_signal'] = 'strong_daily_uptrend'
elif daily_trend_score < -50:
scores['trending_down'] += 15
details['daily_signal'] = 'strong_daily_downtrend'
total_scores = sum(scores.values())
if total_scores > 0:
confidence_scores = {k: (v / total_scores) * 100 for k, v in scores.items()}
dominant_regime = max(confidence_scores, key=confidence_scores.get)
confidence = float(confidence_scores[dominant_regime])
if confidence < 40:
dominant_regime = "transitional"
confidence = 100 - confidence
if scores['volatile'] > 30:
dominant_regime = "volatile"
confidence = max(confidence, 70)
details['final_scores'] = scores
details['confidence_breakdown'] = confidence_scores
return dominant_regime, round(confidence, 1), details
return "unknown", 50, details
except Exception as e:
print(f"⚠️ خطا در تحلیل رژیم بازار: {e}")
import traceback
traceback.print_exc()
return "unknown", 0, {"error": str(e)}
def _update_market_regime(self):
"""به‌روزرسانی رژیم بازار (اصلاح‌شده: نرمال‌سازی details + لاگ کامل برای ATR/واحد + مقایسه درست confidence)"""
try:
# وضعیت قبلی برای مقایسه و تشخیص تغییر
prev_regime = getattr(self, "market_regime", "unknown")
prev_conf = float(getattr(self, "regime_confidence", 0) or 0)
# قیمت فعلی برای لاگ/تاریخچه
current_price = self.MARKET_PRICE if self.MARKET_PRICE is not None else self.LAST_KNOWN_PRICE
regime, confidence, details = self.analyze_market_regime(
lookback_candles=self.REGIME_SETTINGS['lookback_candles']
)
# نرمال‌سازی confidence
try:
confidence = float(confidence)
except Exception:
confidence = 0.0
# نرمال‌سازی details (حتماً dict باشد)
if details is None:
details = {}
elif not isinstance(details, dict):
details = {"raw_details": details}
# ---- استخراج ATR از details (برای لاگ و تشخیص واحد) ----
# کلید اصلی شما طبق صحبت‌ها: atr_absolute
atr_regime_raw = details.get("atr_absolute", None)
atr_regime = None
if atr_regime_raw is not None:
try:
atr_regime = float(atr_regime_raw)
except Exception:
atr_regime = None
# ATR از m3 (اگر وجود دارد)
atr_m3 = None
if isinstance(getattr(self, "m3", None), pd.DataFrame) and not self.m3.empty and 'atr' in self.m3.columns:
v = self.m3['atr'].iloc[-1]
if pd.notna(v):
try:
atr_m3 = float(v)
except Exception:
atr_m3 = None
# رنج کندل‌های اخیر m3 برای مقایسه واحد/اسکیل
median_m3_range = None
last_m3_range = None
if isinstance(getattr(self, "m3", None), pd.DataFrame) and len(self.m3) >= 5:
if 'high' in self.m3.columns and 'low' in self.m3.columns:
ranges = (self.m3['high'] - self.m3['low']).tail(30).dropna()
if not ranges.empty:
try:
median_m3_range = float(ranges.median())
except Exception:
median_m3_range = None
try:
lr = float(self.m3['high'].iloc[-1] - self.m3['low'].iloc[-1])
if lr == lr: # not NaN
last_m3_range = lr
except Exception:
last_m3_range = None
# ---- تشخیص/ثبت واحد ATR (حدس + ذخیره داخل details برای downstream) ----
# اگر analyze_market_regime خودش atr_unit می‌دهد، همان را نگه می‌داریم.
atr_unit = details.get("atr_unit", None)
if atr_unit is not None:
atr_unit = str(atr_unit).lower().strip()
else:
# حدس واحد (heuristic)
atr_unit = "unknown"
if atr_regime is not None:
# اگر قیمت بزرگ و ATR کوچک باشد اغلب "خط" است (مثل 3..15)
if current_price and float(current_price) >= 1000 and atr_regime <= 50:
atr_unit = "lines"
# اگر رنج کندل‌ها موجود است، نسبت را هم بررسی می‌کنیم
if median_m3_range and median_m3_range > 0 and atr_regime is not None:
ratio = atr_regime / median_m3_range
# اگر خیلی نزدیک به رنج است ممکن است واحد قیمت باشد یا هر دو هم‌مقیاس باشند
if 0.2 <= ratio <= 2.5 and atr_unit == "unknown":
atr_unit = "price_or_lines_unclear"
elif ratio < 0.2 and atr_unit == "unknown":
atr_unit = "likely_lines_or_scaled"
elif ratio > 2.5 and atr_unit == "unknown":
atr_unit = "likely_scaled_or_wrong"
details["atr_unit"] = atr_unit # ✅ حالا downstream لازم نیست فقط حدس بزند
# ---- لاگ کامل (اگر debug_logger ست شده باشد) ----
# (بدون وابستگی اجباری: اگر logger نباشد فقط پرینت نمی‌کنیم)
dbg = getattr(self, "debug_logger", None)
if dbg is not None:
log_payload = {
"stage": "regime_update",
"prev_regime": prev_regime,
"prev_conf": prev_conf,
"new_regime": regime,
"new_conf": confidence,
"market_price": current_price,
"atr_regime_raw": atr_regime_raw,
"atr_regime": atr_regime,
"atr_unit": atr_unit,
"atr_m3": atr_m3,
"median_m3_range": median_m3_range,
"last_m3_range": last_m3_range,
"details_keys": list(details.keys())[:50],
}
if atr_regime is not None and median_m3_range and median_m3_range > 0:
log_payload["atr_to_median_range_ratio"] = atr_regime / median_m3_range
if atr_regime is not None and current_price and float(current_price) > 0:
log_payload["atr_to_price_percent"] = (atr_regime / float(current_price)) * 100.0
dbg.info(str(log_payload))
# ---- اعمال رژیم جدید ----
self.market_regime = regime
self.regime_confidence = confidence
self.last_regime_details = details
# ذخیره در تاریخچه (با ATR برای ردیابی دقیق‌تر)
self.regime_history.append({
'time': datetime.now(),
'regime': regime,
'confidence': confidence,
'price': current_price,
'atr_absolute': atr_regime_raw,
'atr_unit': atr_unit,
})
# نمایش در صورت تغییر قابل توجه
if len(self.regime_history) >= 2:
last_regime = self.regime_history[-2]['regime']
last_confidence = float(self.regime_history[-2]['confidence'] or 0)
min_conf = float(self.REGIME_SETTINGS.get('min_confidence', 60) or 60)
# ✅ اصلاح: >= به جای >
if (regime != last_regime and confidence >= min_conf and last_confidence >= min_conf):
print(f"\n🔄 تغییر رژیم بازار: {last_regime} → {regime}")
print(f" اطمینان: {last_confidence:.0f}% → {confidence:.0f}%")
# اطلاع‌رسانی به کلاینت‌ها
try:
self._broadcast_regime_change(last_regime, regime)
except Exception as be:
print(f"⚠️ خطا در broadcast تغییر رژیم: {be}")
except Exception as e:
print(f"⚠️ خطا در آپدیت رژیم بازار: {e}")
import traceback
traceback.print_exc()
def get_market_simple_regime(self):
"""
نسخه ساده‌شده برای استفاده در لاگ‌ها
"""
regime, confidence, details = self.analyze_market_regime(lookback_candles=30)
emoji = "📊"
if regime == "trending_up":
emoji = "📈"
elif regime == "trending_down":
emoji = "📉"
elif regime == "ranging":
emoji = "➡️"
elif regime == "volatile":
emoji = "🌊"
elif regime == "transitional":
emoji = "🔄"
return f"{emoji} {regime} ({confidence}%)"
def check_entry_signal(self, current_price):
self._log_regime_and_atr_snapshot(stage="before_signal_filter", current_price=float(current_price))
try:
if not isinstance(self.m3, pd.DataFrame) or len(self.m3) < 3:
return None
# ensure market regime state
try:
if (str(getattr(self, "market_regime", "unknown") or "unknown") == "unknown"
or float(getattr(self, "regime_confidence", 0) or 0) == 0.0):
self._update_market_regime()
except Exception:
pass
min_conf = float(self.REGIME_SETTINGS.get('min_confidence', 60) or 60)
# ----------------------------
# Bars: prev closed, closed
# ----------------------------
closed_bar = self.m3.iloc[-2]
prev_bar = self.m3.iloc[-3]
closed_time = None
try:
closed_time = pd.Timestamp(self.m3.index[-2])
except Exception:
closed_time = None
signal_close = float(closed_bar.get('close', float(current_price)))
prev_close = float(prev_bar.get('close', signal_close))
# ----------------------------
# Indicators from closed_bar
# ----------------------------
current_adx = 25.0
if 'adx' in self.m3.columns and pd.notna(closed_bar.get('adx', None)):
current_adx = float(closed_bar.get('adx'))
current_rsi = 50.0
if 'rsi14' in self.m3.columns and pd.notna(closed_bar.get('rsi14', None)):
current_rsi = float(closed_bar.get('rsi14'))
ema9 = signal_close
if 'ema9' in self.m3.columns and pd.notna(closed_bar.get('ema9', None)):
ema9 = float(closed_bar.get('ema9'))
ema21 = signal_close
if 'ema21' in self.m3.columns and pd.notna(closed_bar.get('ema21', None)):
ema21 = float(closed_bar.get('ema21'))
prev_ema9 = prev_close
if 'ema9' in self.m3.columns and pd.notna(prev_bar.get('ema9', None)):
prev_ema9 = float(prev_bar.get('ema9'))
prev_ema21 = prev_close
if 'ema21' in self.m3.columns and pd.notna(prev_bar.get('ema21', None)):
prev_ema21 = float(prev_bar.get('ema21'))
# Bollinger position (closed + prev)
bb_position = 50.0
prev_bb_position = 50.0
if all(c in self.m3.columns for c in ['bb_lower', 'bb_upper']):
bbl = closed_bar.get('bb_lower', None)
bbu = closed_bar.get('bb_upper', None)
pbbl = prev_bar.get('bb_lower', None)
pbbu = prev_bar.get('bb_upper', None)
if pd.notna(bbl) and pd.notna(bbu):
bbl = float(bbl);
bbu = float(bbu)
if bbu > bbl:
bb_position = ((signal_close - bbl) / (bbu - bbl)) * 100.0
bb_position = max(0.0, min(100.0, bb_position))
if pd.notna(pbbl) and pd.notna(pbbu):
pbbl = float(pbbl);
pbbu = float(pbbu)
if pbbu > pbbl:
prev_bb_position = ((prev_close - pbbl) / (pbbu - pbbl)) * 100.0
prev_bb_position = max(0.0, min(100.0, prev_bb_position))
# ATR (برای SmartFilter)
atr_entry = None
if 'atr' in self.m3.columns and pd.notna(closed_bar.get('atr', None)):
try:
atr_entry = float(closed_bar.get('atr'))
except Exception:
atr_entry = None
# Daily trend score (as-of)
daily_trend_score = 0
try:
daily_trend_score = self.calculate_daily_trend_score(asof_time=closed_time)
except TypeError:
daily_trend_score = self.calculate_daily_trend_score()
except Exception:
daily_trend_score = 0
atr_unit = "price"
market_regime = str(getattr(self, "market_regime", "unknown"))
regime_conf = float(getattr(self, "regime_confidence", 0) or 0)
strategy = self.determine_strategy_mode()
# چاپ در بک‌تست را کنترل کن
allow_print = True
if getattr(self, "BACKTEST_MODE", False) and not getattr(self, "BACKTEST_VERBOSE", False):
allow_print = False
if allow_print:
print(f"\n{'═' * 50}")
print(f"🔍 بررسی سیگنال ورود - استراتژی: {strategy.upper()}")
print(f"{'═' * 50}")
if closed_time is not None:
print(f" 🕯️ کندل مبنا (بسته‌شده): {closed_time.isoformat()}")
print(f" 💰 close: {signal_close:.2f} | current: {float(current_price):.2f}")
print(f" 📊 ADX: {current_adx:.1f} | RSI: {current_rsi:.1f} | BBpos: {bb_position:.1f} (prev={prev_bb_position:.1f})")
print(f" 📈 EMA9: {ema9:.2f} | EMA21: {ema21:.2f} (prev9={prev_ema9:.2f}, prev21={prev_ema21:.2f})")
print(f" 🎯 daily_trend_score: {int(daily_trend_score):+d}")
print(f" 📊 market_regime: {market_regime} ({regime_conf:.1f}%)")
# ✅ فیلتر خلاف رژیم (وقتی رژیم با اطمینان کافی تعیین شده)
strong_regime = (regime_conf >= min_conf and market_regime in ("trending_up", "trending_down"))
long_allowed_by_regime = not (strong_regime and market_regime == "trending_down")
short_allowed_by_regime = not (strong_regime and market_regime == "trending_up")
# ------------------------------------------------------------
# 1) RANGE / mean-reversion (Cross-based)
# ------------------------------------------------------------
ranging_mode = (market_regime == "ranging" and regime_conf >= min_conf)
if ranging_mode:
if self._last_params_regime != "ranging":
self._adjust_for_range_market()
self._last_params_regime = "ranging"
adx_ok_range = (current_adx <= 22.0)
LONG_ENTER_POS = 35.0
LONG_RESET_POS = 40.0
SHORT_ENTER_POS = 65.0
SHORT_RESET_POS = 60.0
long_cross = (prev_bb_position > LONG_RESET_POS) and (bb_position <= LONG_ENTER_POS)
short_cross = (prev_bb_position < SHORT_RESET_POS) and (bb_position >= SHORT_ENTER_POS)
long_rsi_ok = (current_rsi <= 50.0)
short_rsi_ok = (current_rsi >= 50.0)
# بایاس روند روزانه برای رنج
if daily_trend_score >= 60:
short_rsi_ok = (current_rsi >= 58.0) and (bb_position >= 75.0)
if daily_trend_score <= -60:
long_rsi_ok = (current_rsi <= 42.0) and (bb_position <= 25.0)
# ✅ فیلتر خلاف رژیم روی رنج هم
if not long_allowed_by_regime:
long_cross = False
if not short_allowed_by_regime:
short_cross = False
# RANGE LONG
if adx_ok_range and long_cross and long_rsi_ok:
if closed_time is not None and hasattr(self, "_signal_cooldown_ok"):
if not self._signal_cooldown_ok(closed_time, "long"):
return None
self._last_signal_candle_time = closed_time
self._last_signal_direction = "long"
if hasattr(self, 'smart_filter') and self.smart_filter:
signal_data = {
'direction': 'long',
'strategy': 'counter_trend',
'current_price': float(current_price),
'signal_close_price': float(signal_close),
'adx_entry': float(current_adx),
'rsi_entry': float(current_rsi),
'ema9_entry': float(ema9),
'ema21_entry': float(ema21),
'bb_position': float(bb_position),
'daily_trend_score': int(daily_trend_score),
'market_regime': market_regime,
'regime_confidence': float(regime_conf),
'atr_unit': atr_unit,
}
if atr_entry is not None and atr_entry > 0:
signal_data['atr_entry'] = float(atr_entry)
allowed, _reason = self.smart_filter.should_enter_trade(signal_data)
if not allowed:
return None
return 'long'
# RANGE SHORT
if adx_ok_range and short_cross and short_rsi_ok:
if closed_time is not None and hasattr(self, "_signal_cooldown_ok"):
if not self._signal_cooldown_ok(closed_time, "short"):
return None
self._last_signal_candle_time = closed_time
self._last_signal_direction = "short"
if hasattr(self, 'smart_filter') and self.smart_filter:
signal_data = {
'direction': 'short',
'strategy': 'counter_trend',
'current_price': float(current_price),
'signal_close_price': float(signal_close),
'adx_entry': float(current_adx),
'rsi_entry': float(current_rsi),
'ema9_entry': float(ema9),
'ema21_entry': float(ema21),
'bb_position': float(bb_position),
'daily_trend_score': int(daily_trend_score),
'market_regime': market_regime,
'regime_confidence': float(regime_conf),
'atr_unit': atr_unit,
}
if atr_entry is not None and atr_entry > 0:
signal_data['atr_entry'] = float(atr_entry)
allowed, _reason = self.smart_filter.should_enter_trade(signal_data)
if not allowed:
return None
return 'short'
return None
# ------------------------------------------------------------
# 2) TREND-FOLLOW (Cross-based EMA + فیلترهای قبلی)
# ------------------------------------------------------------
if regime_conf < min_conf and self._last_params_regime is not None:
self._reset_dynamic_params_to_base()
self._last_params_regime = None
if strategy == 'scalp':
adx_threshold = float(self.SCALP_ADX_THRESHOLD)
min_daily_score_long = 20
min_daily_score_short = -20
rsi_long_min = 40.0
rsi_short_max = 60.0
else:
adx_threshold = float(self.SWING_ADX_THRESHOLD)
min_daily_score_long = 40
min_daily_score_short = -40
rsi_long_min = 45.0
rsi_short_max = 55.0
# EMA Cross event
long_cross = (prev_ema9 <= prev_ema21) and (ema9 > ema21)
short_cross = (prev_ema9 >= prev_ema21) and (ema9 < ema21)
# ✅ فیلتر خلاف رژیم روی trend-follow
if not long_allowed_by_regime:
long_cross = False
if not short_allowed_by_regime:
short_cross = False
long_conditions = (
long_cross
and current_rsi >= rsi_long_min
and current_adx >= adx_threshold
and daily_trend_score >= min_daily_score_long
)
short_conditions = (
short_cross
and current_rsi <= rsi_short_max
and current_adx >= adx_threshold
and daily_trend_score <= min_daily_score_short
)
if long_conditions:
if closed_time is not None and hasattr(self, "_signal_cooldown_ok"):
if not self._signal_cooldown_ok(closed_time, "long"):
return None
self._last_signal_candle_time = closed_time
self._last_signal_direction = "long"
if hasattr(self, 'smart_filter') and self.smart_filter:
signal_data = {
'direction': 'long',
'strategy': strategy,
'current_price': float(current_price),
'signal_close_price': float(signal_close),
'adx_entry': float(current_adx),
'rsi_entry': float(current_rsi),
'ema9_entry': float(ema9),
'ema21_entry': float(ema21),
'bb_position': float(bb_position),
'daily_trend_score': int(daily_trend_score),
'market_regime': market_regime,
'regime_confidence': float(regime_conf),
'atr_unit': atr_unit,
}
if atr_entry is not None and atr_entry > 0:
signal_data['atr_entry'] = float(atr_entry)
allowed, _reason = self.smart_filter.should_enter_trade(signal_data)
if not allowed:
return None
return 'long'
if short_conditions:
if closed_time is not None and hasattr(self, "_signal_cooldown_ok"):
if not self._signal_cooldown_ok(closed_time, "short"):
return None
self._last_signal_candle_time = closed_time
self._last_signal_direction = "short"
if hasattr(self, 'smart_filter') and self.smart_filter:
signal_data = {
'direction': 'short',
'strategy': strategy,
'current_price': float(current_price),
'signal_close_price': float(signal_close),
'adx_entry': float(current_adx),
'rsi_entry': float(current_rsi),
'ema9_entry': float(ema9),
'ema21_entry': float(ema21),
'bb_position': float(bb_position),
'daily_trend_score': int(daily_trend_score),
'market_regime': market_regime,
'regime_confidence': float(regime_conf),
'atr_unit': atr_unit,
}
if atr_entry is not None and atr_entry > 0:
signal_data['atr_entry'] = float(atr_entry)
allowed, _reason = self.smart_filter.should_enter_trade(signal_data)
if not allowed:
return None
return 'short'
return None
except Exception as e:
print(f"⚠️ خطا در بررسی سیگنال ورود: {e}")
import traceback
traceback.print_exc()
return None
def evaluate_signals_barrier(
self,
signals: list[dict],
candles: list[dict],
tp_lines: int = 10,
sl_lines: int = 8,
horizon_candles: int = 10,
intrabar_mode: str = "worst", # "worst" | "best" | "neutral"
) -> dict:
"""
ارزیابی سیگنال‌ها با Triple Barrier + جزئیات
- intrabar_mode:
worst: اگر داخل یک کندل هم TP هم SL لمس شد => loss
best: اگر داخل یک کندل هم TP هم SL لمس شد => win
neutral: اگر هر دو لمس شد => neutral
"""
import pandas as pd
if not signals or not candles:
return {"error": "signals/candles empty"}
df = pd.DataFrame(candles).copy()
df["time"] = pd.to_datetime(df["time"])
df = df.sort_values("time").set_index("time")
wins = losses = neutrals = 0
mfe_list, mae_list = [], []
details = []
for s in signals:
t = pd.Timestamp(s["time"])
direction = str(s.get("signal", "")).strip().lower()
entry = float(s["entry_price"])
# اگر دقیق match نبود: نزدیک‌ترین کندل بعدی
if t not in df.index:
pos = df.index.searchsorted(t, side="left")
if pos >= len(df.index):
continue
t = df.index[pos]
start_pos = df.index.get_loc(t)
end_pos = min(len(df) - 1, start_pos + int(horizon_candles))
future = df.iloc[start_pos:end_pos + 1]
if direction == "long":
tp_price = entry + float(tp_lines)
sl_price = entry - float(sl_lines)
mfe = float(future["high"].max() - entry)
mae = float(future["low"].min() - entry) # منفی
elif direction == "short":
tp_price = entry - float(tp_lines)
sl_price = entry + float(sl_lines)
mfe = float(entry - future["low"].min())
mae = float(entry - future["high"].max()) # منفی
else:
continue
mfe_list.append(mfe)
mae_list.append(mae)
outcome = "neutral"
hit_time = None
hit_reason = None
for idx, row in future.iterrows():
hi = float(row["high"])
lo = float(row["low"])
if direction == "long":
hit_tp = hi >= tp_price
hit_sl = lo <= sl_price
else:
hit_tp = lo <= tp_price
hit_sl = hi >= sl_price
if hit_tp and hit_sl:
hit_time = idx
hit_reason = "both_hit_same_candle"
if intrabar_mode == "worst":
outcome = "loss"
elif intrabar_mode == "best":
outcome = "win"
else:
outcome = "neutral"
break
if hit_tp:
outcome = "win"
hit_time = idx
hit_reason = "tp_hit"
break
if hit_sl:
outcome = "loss"
hit_time = idx
hit_reason = "sl_hit"
break
if outcome == "win":
wins += 1
elif outcome == "loss":
losses += 1
else:
neutrals += 1
details.append({
"time": pd.Timestamp(s["time"]).isoformat(),
"direction": direction,
"entry": entry,
"tp_lines": int(tp_lines),
"sl_lines": int(sl_lines),
"horizon_candles": int(horizon_candles),
"outcome": outcome,
"hit_time": hit_time.isoformat() if hit_time is not None else None,
"hit_reason": hit_reason,
"mfe": round(mfe, 2),
"mae": round(mae, 2),
"market_regime": s.get("market_regime"),
"regime_confidence": s.get("regime_confidence"),
})
total = wins + losses + neutrals
winrate = (wins / (wins + losses)) * 100 if (wins + losses) > 0 else 0.0
return {
"total": total,
"wins": wins,
"losses": losses,
"neutrals": neutrals,
"winrate_ex_neutral": round(winrate, 2),
"avg_mfe": round(sum(mfe_list) / max(1, len(mfe_list)), 2),
"avg_mae": round(sum(mae_list) / max(1, len(mae_list)), 2),
"tp_lines": int(tp_lines),
"sl_lines": int(sl_lines),
"horizon_candles": int(horizon_candles),
"intrabar_mode": intrabar_mode,
"details": details,
}
def check_and_save_daily_candle(self, candle_time):
"""
بررسی و ذخیره کندل روزانه
وقتی روز عوض می‌شود، کندل روزانه قبلی را ذخیره می‌کند
"""
try:
current_date = candle_time.floor('D')
# اگر daily خالی است، برگرد
if len(self.daily) == 0:
return
last_daily_index = self.daily.index[-1]
last_daily_date = last_daily_index.floor('D')
# اگر روز عوض شده
if current_date > last_daily_date:
# ذخیره کندل روزانه قبلی
self.save_candle_to_db(
'candles',
last_daily_index.isoformat(),
self.daily.loc[last_daily_index, 'open'],
self.daily.loc[last_daily_index, 'high'],
self.daily.loc[last_daily_index, 'low'],
self.daily.loc[last_daily_index, 'close']
)
print(f"💾 کندل روزانه ذخیره شد: {last_daily_date.date()}")
except Exception as e:
print(f"⚠️ خطا در ذخیره کندل روزانه: {e}")
def _adjust_for_range_market(self):
"""
تنظیم پارامترها برای بازار رنج
✅ اصلاح مهم:
- در رنج، بالا بردن ADX threshold باعث مرگ سیگنال‌ها می‌شود (چون ADX طبیعی رنج پایین است)
- اینجا thresholds را منطقی‌تر و سازگار با رنج می‌کنیم
"""
self._reset_dynamic_params_to_base()
# در رنج، بهتر است اسکالپ/mean-reversion فعال باشد
# پس threshold اسکالپ را سخت‌تر نکن؛ حتی کمی پایین‌تر بیاور
self.SCALP_ADX_THRESHOLD = max(12, min(22, float(self.SCALP_ADX_THRESHOLD) - 2))
self.SWING_ADX_THRESHOLD = max(18, min(30, float(self.SWING_ADX_THRESHOLD))) # سوئینگ در رنج کمتر ترجیح داده می‌شود
# SLها را مثل قبل دست نزن (طبق اصلاحات قبلی خودت)
self.SCALP_STOP_LOSS_LINES = max(6, min(self.SCALP_MAX_STOP, self.SCALP_STOP_LOSS_LINES))
self.SWING_STOP_LOSS_LINES = max(7, min(self.SWING_MAX_STOP, self.SWING_STOP_LOSS_LINES))
print(
f" ⚙️ تنظیمات برای بازار رنج (غیرتجمعی/سازگار با سیگنال): "
f"SCALP_ADX>={self.SCALP_ADX_THRESHOLD:.0f}, "
f"SWING_ADX>={self.SWING_ADX_THRESHOLD:.0f}"
)
def _adjust_for_trend_market(self):
"""تنظیم پارامترها برای بازار رونددار (اصلاح‌شده: غیرتجمعی)."""
# ✅ جلوگیری از تغییر تجمعی
self._reset_dynamic_params_to_base()
# کمی سخت‌گیرتر برای تایید روند
self.SCALP_ADX_THRESHOLD = min(30, self.SCALP_ADX_THRESHOLD + 2)
self.SWING_ADX_THRESHOLD = min(35, self.SWING_ADX_THRESHOLD + 3)
# افزایش RR به شکل غیرتجمعی
self.SCALP_RR_RATIO = min(2.5, self.SCALP_RR_RATIO + 0.3)
self.SWING_RR_RATIO = min(3.0, self.SWING_RR_RATIO + 0.5)
print(
f" ⚙️ تنظیمات برای بازار رونددار (غیرتجمعی): "
f"SCALP_ADX>{self.SCALP_ADX_THRESHOLD:.0f}, SCALP_RR={self.SCALP_RR_RATIO:.1f}, "
f"SWING_ADX>{self.SWING_ADX_THRESHOLD:.0f}, SWING_RR={self.SWING_RR_RATIO:.1f}"
)
def process_new_candle(self, current_time, current_price):
"""
✅ پردازش ریل‌تایم با هر تیک:
- آپدیت کندل جاری (OHLC واقعی)
- وقتی کندل جدید شروع شد:
* ذخیره کندل بسته‌شده قبلی در DB
* آپدیت اندیکاتورها
* آپدیت رژیم بازار با شمارنده واقعی کندل (نه len=500)
* تولید سیگنال فقط بر اساس کندل بسته‌شده
"""
try:
# قیمت و زمان
self.LAST_KNOWN_PRICE = float(current_price)
tf = self._signal_timeframe()
tf_minutes = self._timeframe_minutes()
# شمارنده کندل جدید (برای رژیم بازار)
if not hasattr(self, "_new_candle_count"):
self._new_candle_count = 0
is_new_candle, closed_candle_to_save, candle_time_floor = self.update_m3_data(current_time, current_price)
# اگر هنوز کندل جدید شروع نشده، فقط اگر in_trade داری (اختیاری) مدیریت کن و خارج شو
if not is_new_candle:
if self.in_trade and self.MARKET_PRICE:
try:
self.check_trade_management(self.MARKET_PRICE)
except Exception as e:
print(f"⚠️ خطا در check_trade_management: {e}")
return
# ✅ از اینجا به بعد: کندل جدید شروع شده (پس کندل قبلی بسته شده)
self._new_candle_count += 1
self.LAST_CANDLE_TIME = candle_time_floor
print(f"\n📊 کندل جدید ({tf}) شروع شد: {candle_time_floor} | price={float(current_price):.2f}")
# Warm-up: فقط روی کندل جدید حساب شود
if self.WARMUP_ENABLED and not self.warmup_complete:
self.warmup_candles_seen += 1
remaining = self.WARMUP_CANDLES - self.warmup_candles_seen
if remaining > 0:
print(f"🔥 Warm-up: کندل {self.warmup_candles_seen}/{self.WARMUP_CANDLES} - {remaining} کندل دیگر مانده")
# ذخیره کندل بسته‌شده قبلی (اگر وجود دارد)
if closed_candle_to_save:
try:
self.save_candle_to_db(
self.M3_TABLE,
closed_candle_to_save['datetime'],
closed_candle_to_save['open'],
closed_candle_to_save['high'],
closed_candle_to_save['low'],
closed_candle_to_save['close']
)
# ذخیره/چک کندل روزانه بر اساس زمان کندل بسته‌شده
self.check_and_save_daily_candle(pd.Timestamp(closed_candle_to_save['datetime']))
except Exception as e:
print(f"⚠️ خطا در ذخیره کندل بسته‌شده: {e}")
# آپدیت رژیم بازار هر N کندل واقعی (نه بر اساس len(self.m3))
try:
update_interval = int(self.REGIME_SETTINGS.get('update_interval', 5) or 5)
if update_interval < 1:
update_interval = 5
if self._new_candle_count % update_interval == 0:
self._update_market_regime()
except Exception as e:
print(f"⚠️ خطا در _update_market_regime: {e}")
# آپدیت اندیکاتورها فقط وقتی دیتا کافی است
if isinstance(self.m3, pd.DataFrame) and len(self.m3) >= 50:
try:
self.update_m3_indicators()
except Exception as e:
print(f"⚠️ خطا در update_m3_indicators: {e}")
# اگر در معامله‌ای هستی، اینجا مدیریت/خروج (برای حالت معامله داخلی)
# اگر فقط سیگنال می‌خواهی، پیشنهاد می‌کنم SIGNAL_ONLY_MODE را فعال کنی (پایین توضیح می‌دهم)
signal_only = bool(getattr(self, "SIGNAL_ONLY_MODE", False))
if self.in_trade and not signal_only:
try:
self.check_trade_management(float(current_price))
except Exception as e:
print(f"⚠️ خطا در check_trade_management: {e}")
try:
self.check_time_based_exit()
except Exception as e:
print(f"⚠️ خطا در check_time_based_exit: {e}")
return
# Warmup
if not self.check_warmup_status():
return
# فاصله بین سیگنال‌ها (بر اساس کندل)
if self.last_trade_exit_time:
minutes_since = (datetime.now() - self.last_trade_exit_time).total_seconds() / 60
required_minutes = self.MIN_CANDLES_BETWEEN_TRADES * tf_minutes
if minutes_since < required_minutes:
remaining = required_minutes - minutes_since
print(f"⏳ زمان انتظار بین سیگنال‌ها: {remaining:.0f} دقیقه مانده (TF={tf})")
return
# چک دیتا کافی
if len(self.m3) < 50 or len(self.daily) < 55:
return
# ✅ مهم: سیگنال فقط روی کندل بسته‌شده ساخته می‌شود (داخل check_entry_signal انجام می‌دهیم)
signal = self.check_entry_signal(float(current_price))
if signal:
print(f"\n🎯 سیگنال {signal.upper()} شناسایی شد!")
self.execute_entry(signal, float(current_price))
else:
print(f"⚪ سیگنال ورودی یافت نشد")
except Exception as e:
print(f"⚠️ خطا در process_new_candle: {e}")
import traceback
traceback.print_exc()
def execute_entry(self, direction, current_price):
"""
✅ نسخه سیگنال-محور:
- فقط سیگنال ورود را ارسال می‌کند
- وارد معامله داخلی نمی‌شود (اگر SIGNAL_ONLY_MODE=True باشد)
- last_trade_exit_time را ست می‌کند تا cooldown حفظ شود
"""
try:
print(f"\n{'═' * 60}")
print(f"🎯 شروع فرآیند سیگنال {direction.upper()}")
print(f"{'═' * 60}")
current_price = int(float(current_price))
signal_only = bool(getattr(self, "SIGNAL_ONLY_MODE", False))
# استراتژی برای برچسب‌گذاری سیگنال
self.current_trade_strategy = self.determine_strategy_mode()
strategy = self.current_trade_strategy
# برای سازگاری با کلاینت‌ها، SL/TP را هم محاسبه می‌کنیم (ولی هیچ فیلتری روی ارسال سیگنال اعمال نمی‌کنیم)
stop_distance_lines = self.calculate_dynamic_stop_loss(current_price, direction)
if stop_distance_lines < 1:
stop_distance_lines = 1
if direction == 'long':
stop_loss_price = current_price - stop_distance_lines
take_profit_price = current_price + int(stop_distance_lines * (self.SCALP_RR_RATIO if strategy == 'scalp' else self.SWING_RR_RATIO))
else:
stop_loss_price = current_price + stop_distance_lines
take_profit_price = current_price - int(stop_distance_lines * (self.SCALP_RR_RATIO if strategy == 'scalp' else self.SWING_RR_RATIO))
self.current_internal_trade_id = f"sig_{direction}_{datetime.now().strftime('%Y%m%d%H%M%S%f')}"
# ژورنال entry (اختیاری)
if hasattr(self, 'journal') and self.journal:
try:
signal_data = self._prepare_signal_data(direction, strategy, current_price)
signal_data.update({
'trade_id': self.current_internal_trade_id,
'entry_price': current_price,
'stop_loss': int(stop_loss_price),
'take_profit': int(take_profit_price),
'symbol': 'MAZANEH'
})
self.journal.log_trade_entry(signal_data)
except Exception as je:
print(f"⚠️ خطا در ثبت ژورنال ورود: {je}")
# ارسال سیگنال
self.send_enter_trade_command(
direction=direction,
entry_price=current_price,
stop_loss=int(stop_loss_price),
take_profit=int(take_profit_price)
)
# ✅ اگر فقط سیگنال می‌خواهی، اینجا معامله داخلی را فعال نکن
if signal_only:
print(f"✅ SIGNAL_ONLY_MODE فعال است → معامله داخلی باز نمی‌شود.")
# برای cooldown
self.last_trade_exit_time = datetime.now()
return True
# اگر سیگنال-اونلی نیست، رفتار قبلی (ورود داخلی) را می‌توانی اینجا نگه داری
# اما چون گفتی مهم نیست، عمداً انجام نمی‌دهم.
self.last_trade_exit_time = datetime.now()
return True
except Exception as e:
print(f"\n❌ خطا در execute_entry: {e}")
import traceback
traceback.print_exc()
return False
def reset_trade_variables(self):
self.in_trade = False
self.trade_direction = None
self.entry_price = 0
self.stop_loss_price = 0
self.take_profit_price = 0
self.current_trade_strategy = None
self.trade_start_time = None
self.highest_price_in_trade = 0
self.lowest_price_in_trade = float('inf')
self.current_internal_trade_id = None
# ✅ اضافه شد: پاک کردن لیست معاملات باز
if hasattr(self, 'open_trades') and self.open_trades:
self.open_trades.clear()
def on_market_message(self, ws, message):
"""✅ پردازش پیام‌های دریافتی از WebSocket قیمت (با آپدیت واقعی OHLC در هر تیک)"""
try:
data = json.loads(message)
if "price" not in data:
return
self.MARKET_PRICE = float(data["price"])
self.LAST_KNOWN_PRICE = self.MARKET_PRICE
current_time = pd.Timestamp.now()
# Warmup start
if self.WARMUP_ENABLED and self.warmup_start_time is None:
self.start_warmup()
self.first_realtime_candle_time = current_time
# ارسال قیمت لحظه‌ای (اختیاری)
if not hasattr(self, '_last_price_send_time'):
self._last_price_send_time = datetime.now()
time_since_last_send = (datetime.now() - self._last_price_send_time).total_seconds()
PRICE_SEND_INTERVAL = 1
if time_since_last_send >= PRICE_SEND_INTERVAL:
self.send_realtime_price(self.MARKET_PRICE)
self._last_price_send_time = datetime.now()
# ✅ مهم: هر تیک را بده به process_new_candle تا:
# - OHLC کندل جاری آپدیت شود
# - و فقط در شروع کندل جدید، اندیکاتورها/سیگنال اجرا شود
self.process_new_candle(current_time, self.MARKET_PRICE)
except json.JSONDecodeError as e:
print(f"⚠️ خطا در پارس JSON: {e}")
except Exception as e:
print(f"⚠️ خطا در پردازش پیام: {e}")
import traceback
traceback.print_exc()
def on_market_open(self, ws):
self.CONNECTION_STATUS = "connected"
print("✅ اتصال به WebSocket قیمت زنده MAZANEH برقرار شد.")
def on_market_close(self, ws, close_status_code, close_msg):
self.CONNECTION_STATUS = "disconnected"
print(f"❌ اتصال WebSocket بسته شد. کد: {close_status_code}, پیام: {close_msg}")
def on_market_error(self, ws, error):
print(f"⚠️ خطای WebSocket: {error}")
def connect_to_price_socket(self):
"""
اتصال به WebSocket قیمت با ارسال کوکی‌های احراز هویت
"""
# ═══════════════════════════════════════════════════════════════
# 🍪 ساخت رشته کوکی برای ارسال در هدر
# ═══════════════════════════════════════════════════════════════
cookie_string = "; ".join([f"{key}={value}" for key, value in self.COOKIES.items()])
# ═══════════════════════════════════════════════════════════════
# 📋 تنظیم هدرها به صورت لیست (فرمت صحیح برای websocket-client)
# ⚠️ نکته: Origin را حذف کردیم چون کتابخانه خودش اضافه می‌کند
# ═══════════════════════════════════════════════════════════════
headers = [
f"Cookie: {cookie_string}",
"User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
]
print(f"🔗 اتصال به WebSocket: {self.MARKET_WSS_URL}")
ws = websocket.WebSocketApp(
self.MARKET_WSS_URL,
on_message=self.on_market_message,
on_open=self.on_market_open,
on_close=self.on_market_close,
on_error=self.on_market_error,
header=headers # هدرها به صورت لیست
)
# ═══════════════════════════════════════════════════════════════
# 🔌 اتصال با تنظیم origin صحیح
# ═══════════════════════════════════════════════════════════════
ws.run_forever(
ping_interval=30,
ping_timeout=10,
reconnect=5,
origin="https://hivagold.com" # Origin را اینجا تنظیم می‌کنیم
)
def wait_for_connection(self, timeout=60):
start_time = time.time()
elapsed = 0
print(f"⏳ منتظر اتصال... (تا {timeout} ثانیه)")
while self.CONNECTION_STATUS != "connected":
elapsed = time.time() - start_time
if elapsed > timeout:
print(f"❌ زمان انتظار برای اتصال به پایان رسید ({timeout} ثانیه)")
return False
if int(elapsed) % 10 == 0 and int(elapsed) > 0:
print(f"⏳ در انتظار اتصال... ({int(elapsed)}/{timeout} ثانیه)")
time.sleep(1)
print(f"✅ اتصال پس از {elapsed:.1f} ثانیه برقرار شد")
return True
def show_status(self):
"""
نمایش وضعیت فعلی ربات
🔄 تغییرات:
- نمایش تعداد کلاینت‌های متصل به WebSocket Server داخلی
- تغییر از تیک به خط
"""
portfolio_pnl = self.balance - self.TOTAL_BALANCE
portfolio_percent = 0
if self.PORTFOLIO_SIZE > 0:
portfolio_percent = (portfolio_pnl / self.PORTFOLIO_SIZE) * 100
print("\n" + "=" * 70)
print("📊 وضعیت ربات معامله‌گر مظنه")
print("=" * 70)
if self.CONNECTION_STATUS == 'connected':
print(f"🔗 اتصال به سرور قیمت: ✅ متصل")
else:
print(f"🔗 اتصال به سرور قیمت: ❌ قطع")
if self.COMMAND_SERVER_STATUS == 'running':
client_count = self.get_connected_clients_count()
print(f"📡 WebSocket Server: ✅ فعال (پورت {self.WS_SERVER_PORT})")
print(f"📱 کلاینت‌های متصل: {client_count}")
else:
print(f"📡 WebSocket Server: ❌ غیرفعال")
print(f"💰 کل موجودی: {self.balance:,.0f} تومان")
print(f"💼 سایز پورتفو: {self.PORTFOLIO_SIZE:,.0f} تومان")
print(f"📈 سود/ضرر پورتفو: {portfolio_pnl:+,.0f} تومان ({portfolio_percent:+.1f}%)")
print(f"📊 تغییر از ابتدا: {(self.balance - self.TOTAL_BALANCE):+,.0f} تومان")
print(f"💵 ارزش هر خط: {self.LINE_VALUE_TOMAN:,.0f} تومان")
print(f"📊 رژیم بازار: {self.get_market_simple_regime()}")
if self.market_regime == "ranging" and self.regime_confidence > 70:
print(" 💡 بازار در رنج با کیفیت - مناسب استراتژی رنج")
elif self.market_regime == "trending_up":
print(" 💡 بازار صعودی - مناسب استراتژی دنبال‌کننده روند")
elif self.market_regime == "volatile":
print(" ⚠️ بازار پرنوسان - ریسک بالا")
if self.warmup_complete:
print(f"🔥 Warm-up: ✅ تمام شد")
else:
if self.warmup_start_time:
elapsed = (datetime.now() - self.warmup_start_time).total_seconds() / 60
print(f"🔥 Warm-up: در حال گرم شدن ({self.warmup_candles_seen}/{self.WARMUP_CANDLES} کندل | {elapsed:.1f}/{self.WARMUP_MINUTES} دقیقه)")
else:
print(f"🔥 Warm-up: منتظر شروع...")
if self.trades > 0:
win_rate = (self.winning_trades / self.trades) * 100
avg_profit = 0
if self.winning_trades > 0:
avg_profit = self.total_profit / self.winning_trades
avg_loss = 0
if self.losing_trades > 0:
avg_loss = self.total_loss / self.losing_trades
profit_factor = float('inf')
if self.total_loss > 0:
profit_factor = self.total_profit / self.total_loss
total_pnl = self.total_profit - self.total_loss
print("-" * 70)
print(f"📊 تعداد معاملات: {self.trades}")
print(f"✅ معاملات برنده: {self.winning_trades}")
print(f"❌ معاملات بازنده: {self.losing_trades}")
print(f"📈 نرخ برد: {win_rate:.1f}%")
print(f"💰 میانگین سود: {avg_profit:,.0f} تومان")
print(f"💸 میانگین ضرر: {avg_loss:,.0f} تومان")
print(f"⚖️ فاکتور سود: {profit_factor:.2f}")
print(f"📊 سود خالص: {total_pnl:+,.0f} تومان")
print(f"📉 حداکثر افت سرمایه: {self.max_drawdown:.2f}%")
if self.in_trade and self.trade_direction:
print("-" * 70)
print(f"🔥 معامله باز: {self.trade_direction.upper()}")
if self.current_trade_strategy:
print(f"🎯 استراتژی: {self.current_trade_strategy.upper()}")
print(f"📈 قیمت ورود: {int(self.entry_price)}")
current_display_price = "نامشخص"
if self.MARKET_PRICE:
current_display_price = f"{int(self.MARKET_PRICE)}"
elif self.LAST_KNOWN_PRICE:
current_display_price = f"{int(self.LAST_KNOWN_PRICE)} (آخرین)"
print(f"📊 قیمت فعلی: {current_display_price}")
print(f"🛡️ حد ضرر: {int(self.stop_loss_price)}")
print(f"🎯 حد سود: {int(self.take_profit_price)}")
if self.trade_direction == 'long':
print(f"📈 بالاترین قیمت: {int(self.highest_price_in_trade)}")
else:
lowest = self.lowest_price_in_trade if self.lowest_price_in_trade != float('inf') else 0
print(f"📉 پایین‌ترین قیمت: {int(lowest)}")
current_price_for_calc = self.MARKET_PRICE if self.MARKET_PRICE else self.LAST_KNOWN_PRICE
if current_price_for_calc:
current_price_for_calc = int(current_price_for_calc)
entry_price = int(self.entry_price)
if self.trade_direction == 'long':
unrealized_lines = current_price_for_calc - entry_price
else:
unrealized_lines = entry_price - current_price_for_calc
unrealized_pnl = unrealized_lines * self.LINE_VALUE_TOMAN
pnl_percent = 0
if self.portfolio > 0:
pnl_percent = (unrealized_pnl / self.portfolio) * 100
print(f"📊 سود/ضرر شناور: {unrealized_pnl:+,.0f} تومان ({pnl_percent:+.1f}%) [{unrealized_lines:+} خط]")
try:
take_profit = int(self.take_profit_price)
if self.trade_direction == 'long':
if take_profit > entry_price:
total_distance = take_profit - entry_price
current_progress = current_price_for_calc - entry_price
progress_percent = (current_progress / total_distance) * 100
progress_percent = max(0, min(100, progress_percent))
print(f"📈 پیشرفت به حد سود: {progress_percent:.1f}%")
else:
if take_profit < entry_price:
total_distance = entry_price - take_profit
current_progress = entry_price - current_price_for_calc
progress_percent = (current_progress / total_distance) * 100
progress_percent = max(0, min(100, progress_percent))
print(f"📉 پیشرفت به حد سود: {progress_percent:.1f}%")
except ZeroDivisionError:
pass
if self.trade_start_time:
duration = datetime.now() - self.trade_start_time
hours = int(duration.seconds // 3600)
minutes = int((duration.seconds % 3600) // 60)
seconds = int(duration.seconds % 60)
print(f"⏱️ مدت معامله: {hours:02d}:{minutes:02d}:{seconds:02d}")
now = datetime.now()
current_minute = now.minute
next_candle_minute = ((current_minute // 5) + 1) * 5
if next_candle_minute >= 60:
next_candle_time = now.replace(hour=now.hour + 1, minute=0, second=0, microsecond=0)
else:
next_candle_time = now.replace(minute=next_candle_minute, second=0, microsecond=0)
time_to_next = next_candle_time - now
if time_to_next.total_seconds() > 0:
mins = int(time_to_next.total_seconds() // 60)
secs = int(time_to_next.total_seconds() % 60)
print(f"🕒 زمان تا کندل بعدی: {mins}:{secs:02d}")
else:
print("-" * 70)
print("⏳ در حال انتظار برای سیگنال مناسب...")
if self.MARKET_PRICE:
print(f"📊 آخرین قیمت: {int(self.MARKET_PRICE)}")
elif self.LAST_KNOWN_PRICE:
print(f"📊 آخرین قیمت شناخته شده: {int(self.LAST_KNOWN_PRICE)}")
else:
print("📊 آخرین قیمت: در انتظار داده")
try:
if len(self.m3) >= 21:
if 'adx' in self.m3.columns and 'rsi14' in self.m3.columns:
adx_value = self.m3['adx'].iloc[-1]
rsi_value = self.m3['rsi14'].iloc[-1]
if pd.notna(adx_value) and pd.notna(rsi_value):
print(f"📊 ADX: {adx_value:.1f} | RSI: {rsi_value:.1f}")
if 'ema9' in self.m3.columns and 'ema21' in self.m3.columns:
ema9_value = self.m3['ema9'].iloc[-1]
ema21_value = self.m3['ema21'].iloc[-1]
if pd.notna(ema9_value) and pd.notna(ema21_value):
print(f"📈 EMA9: {ema9_value:.0f} | EMA21: {ema21_value:.0f}")
if ema9_value > ema21_value:
print("🎯 سیگنال EMA: صعودی (EMA9 > EMA21)")
else:
print("🎯 سیگنال EMA: نزولی (EMA9 < EMA21)")
except Exception as e:
print(f"📊 اندیکاتورها: در حال محاسبه")
if self.last_trade_exit_time:
time_since_last = datetime.now() - self.last_trade_exit_time
minutes_since = time_since_last.total_seconds() / 60
required_minutes = self.MIN_CANDLES_BETWEEN_TRADES * self.REQUIRED_MINUTES
remaining = required_minutes - minutes_since
if remaining > 0:
print(f"⏳ زمان انتظار بین معاملات: {remaining:.0f} دقیقه باقی‌مانده")
print("=" * 70)
def save_state(self):
"""
ذخیره وضعیت ربات در فایل
🔄 تغییر: حذف position_size، اضافه کردن pnl_lines
"""
try:
trade_start_time_str = None
if self.trade_start_time:
trade_start_time_str = self.trade_start_time.isoformat()
last_trade_exit_time_str = None
if self.last_trade_exit_time:
last_trade_exit_time_str = self.last_trade_exit_time.isoformat()
last_candle_time_str = None
if self.LAST_CANDLE_TIME is not None:
if isinstance(self.LAST_CANDLE_TIME, pd.Timestamp):
last_candle_time_str = self.LAST_CANDLE_TIME.isoformat()
# تبدیل تاریخچه معاملات
serializable_trade_history = []
for trade in self.trade_history[-20:]:
trade_copy = trade.copy()
if 'entry_time' in trade_copy and trade_copy['entry_time'] is not None:
if isinstance(trade_copy['entry_time'], datetime):
trade_copy['entry_time'] = trade_copy['entry_time'].isoformat()
if 'exit_time' in trade_copy and trade_copy['exit_time'] is not None:
if isinstance(trade_copy['exit_time'], datetime):
trade_copy['exit_time'] = trade_copy['exit_time'].isoformat()
serializable_trade_history.append(trade_copy)
state = {
'timestamp': datetime.now().isoformat(),
'balance': self.balance,
'portfolio': self.portfolio,
'warmup_complete': self.warmup_complete,
'warmup_candles_seen': self.warmup_candles_seen,
'ws_server_port': self.WS_SERVER_PORT,
'trades': self.trades,
'winning_trades': self.winning_trades,
'losing_trades': self.losing_trades,
'total_profit': self.total_profit,
'total_loss': self.total_loss,
'max_drawdown': self.max_drawdown,
'peak_balance': self.peak_balance,
'in_trade': self.in_trade,
'trade_direction': self.trade_direction,
'current_trade_strategy': self.current_trade_strategy,
'entry_price': int(self.entry_price) if self.entry_price else 0,
'stop_loss_price': int(self.stop_loss_price) if self.stop_loss_price else 0,
'take_profit_price': int(self.take_profit_price) if self.take_profit_price else 0,
'trade_start_time': trade_start_time_str,
'last_trade_exit_time': last_trade_exit_time_str,
'trade_history': serializable_trade_history,
'market_price': int(self.MARKET_PRICE) if self.MARKET_PRICE else None,
'last_known_price': int(self.LAST_KNOWN_PRICE) if self.LAST_KNOWN_PRICE else None,
'last_candle_time': last_candle_time_str,
'connection_status': self.CONNECTION_STATUS,
'm5_data_length': len(self.m3),
'daily_data_length': len(self.daily),
'highest_price_in_trade': int(self.highest_price_in_trade) if self.highest_price_in_trade else 0,
'lowest_price_in_trade': int(self.lowest_price_in_trade) if self.lowest_price_in_trade != float('inf') else 0,
'line_value_toman': self.LINE_VALUE_TOMAN
}
# with open('bot_state.json', 'w', encoding='utf-8') as f:
# json.dump(state, f, indent=4, default=str, ensure_ascii=False)
#
# print(f"💾 وضعیت ربات ذخیره شد (معاملات: {self.trades}, موجودی: {self.balance:,.0f} تومان)")
return True
except Exception as e:
print(f"⚠️ خطا در ذخیره وضعیت: {e}")
import traceback
traceback.print_exc()
return False
def load_state(self):
"""
بارگذاری وضعیت ربات از فایل
🔄 تغییر: حذف position_size
"""
try:
with open('bot_state.json', 'r', encoding='utf-8') as f:
state = json.load(f)
print(f"📂 بارگذاری وضعیت از {state.get('timestamp', 'نامشخص')}")
# بارگذاری موجودی
loaded_balance = state.get('balance', self.TOTAL_BALANCE)
if loaded_balance < 0 or loaded_balance > self.TOTAL_BALANCE * 10:
print(f"⚠️ موجودی بارگذاری شده غیرمنطقی: {loaded_balance:,.0f} تومان")
print(f" استفاده از موجودی اولیه: {self.TOTAL_BALANCE:,.0f} تومان")
self.balance = self.TOTAL_BALANCE
else:
self.balance = loaded_balance
self.warmup_complete = state.get('warmup_complete', False)
self.warmup_candles_seen = state.get('warmup_candles_seen', 0)
self.portfolio = state.get('portfolio', self.PORTFOLIO_SIZE)
self.trades = state.get('trades', 0)
self.winning_trades = state.get('winning_trades', 0)
self.losing_trades = state.get('losing_trades', 0)
self.total_profit = state.get('total_profit', 0)
self.total_loss = state.get('total_loss', 0)
self.max_drawdown = state.get('max_drawdown', 0)
self.peak_balance = state.get('peak_balance', self.TOTAL_BALANCE)
self.in_trade = state.get('in_trade', False)
self.trade_direction = state.get('trade_direction')
self.current_trade_strategy = state.get('current_trade_strategy')
self.entry_price = state.get('entry_price', 0)
self.stop_loss_price = state.get('stop_loss_price', 0)
self.take_profit_price = state.get('take_profit_price', 0)
self.highest_price_in_trade = state.get('highest_price_in_trade', 0)
loaded_lowest = state.get('lowest_price_in_trade', 0)
if loaded_lowest == 0:
self.lowest_price_in_trade = float('inf')
else:
self.lowest_price_in_trade = loaded_lowest
# بارگذاری زمان‌ها
trade_start_time_str = state.get('trade_start_time')
if trade_start_time_str:
self.trade_start_time = datetime.fromisoformat(trade_start_time_str)
else:
self.trade_start_time = None
last_trade_exit_time_str = state.get('last_trade_exit_time')
if last_trade_exit_time_str:
self.last_trade_exit_time = datetime.fromisoformat(last_trade_exit_time_str)
else:
self.last_trade_exit_time = None
# بارگذاری تاریخچه
self.trade_history = state.get('trade_history', [])
for trade in self.trade_history:
if isinstance(trade.get('entry_time'), str):
trade['entry_time'] = datetime.fromisoformat(trade['entry_time'])
if isinstance(trade.get('exit_time'), str):
trade['exit_time'] = datetime.fromisoformat(trade['exit_time'])
self.MARKET_PRICE = state.get('market_price')
self.LAST_KNOWN_PRICE = state.get('last_known_price')
last_candle_time_str = state.get('last_candle_time')
if last_candle_time_str:
self.LAST_CANDLE_TIME = pd.Timestamp.fromisoformat(last_candle_time_str)
else:
self.LAST_CANDLE_TIME = None
self.CONNECTION_STATUS = state.get('connection_status', 'disconnected')
print(f"✅ وضعیت بارگذاری شد: {self.trades} معامله، موجودی: {self.balance:,.0f} تومان")
if self.in_trade:
print(f"⚠️ معامله باز یافت شد: {self.trade_direction} در قیمت {self.entry_price}")
if self.entry_price <= 0:
print(f"⚠️ اطلاعات معامله باز غیرمنطقی است. بستن معامله...")
self.in_trade = False
self.trade_direction = None
self.entry_price = 0
self.stop_loss_price = 0
self.take_profit_price = 0
self.current_trade_strategy = None
return True
except FileNotFoundError:
print("ℹ️ فایل وضعیت یافت نشد. شروع با تنظیمات اولیه")
return False
except Exception as e:
print(f"⚠️ خطا در بارگذاری وضعیت: {e}")
import traceback
traceback.print_exc()
return False
def reset_state(self):
"""
ریست کامل وضعیت ربات به حالت اولیه
🔄 تغییر: حذف position_size، اضافه کردن warmup variables
"""
print("\n🔄 ریست کامل وضعیت ربات به حالت اولیه...")
self.balance = self.TOTAL_BALANCE
self.portfolio = self.PORTFOLIO_SIZE
self.trades = 0
self.winning_trades = 0
self.losing_trades = 0
self.total_profit = 0
self.total_loss = 0
self.max_drawdown = 0
self.peak_balance = self.TOTAL_BALANCE
self.in_trade = False
self.trade_direction = None
self.entry_price = 0
self.stop_loss_price = 0
self.take_profit_price = 0
self.current_trade_strategy = None
self.trade_start_time = None
self.last_trade_exit_time = None
self.trade_history = []
self.open_trades = []
self.current_internal_trade_id = None
self.highest_price_in_trade = 0
self.lowest_price_in_trade = float('inf')
self.MARKET_PRICE = None
self.LAST_KNOWN_PRICE = None
self.LAST_CANDLE_TIME = None
self.CONNECTION_STATUS = "disconnected"
self.warmup_complete = False
self.warmup_start_time = None
self.warmup_candles_seen = 0
self.first_realtime_candle_time = None
files_to_remove = ['bot_state.json', 'performance_report.json', 'trading_bot.log']
for filepath in files_to_remove:
try:
if os.path.exists(filepath):
os.remove(filepath)
print(f"✅ فایل {filepath} پاک شد")
except Exception as e:
print(f"⚠️ خطا در پاک کردن فایل {filepath}: {e}")
print("✅ وضعیت ربات با موفقیت ریست شد")
print(f"💰 موجودی جدید: {self.balance:,.0f} تومان")
def generate_performance_report(self):
"""
تولید گزارش عملکرد ربات
🔄 تغییر از تیک به خط
"""
print("\n" + "📊" * 25)
print("📊 گزارش کامل عملکرد ربات معامله‌گر مظنه 📊")
print("📊" * 25)
total_trades = self.trades
win_rate = 0
if total_trades > 0:
win_rate = (self.winning_trades / total_trades) * 100
total_pnl = self.total_profit - self.total_loss
total_return_percent = 0
if self.TOTAL_BALANCE > 0:
total_return_percent = ((self.balance - self.TOTAL_BALANCE) / self.TOTAL_BALANCE) * 100
profit_factor = float('inf')
if self.total_loss > 0:
profit_factor = self.total_profit / self.total_loss
avg_win = 0
if self.winning_trades > 0:
avg_win = self.total_profit / self.winning_trades
avg_loss = 0
if self.losing_trades > 0:
avg_loss = self.total_loss / self.losing_trades
avg_pnl_per_trade = 0
if total_trades > 0:
avg_pnl_per_trade = total_pnl / total_trades
print(f"\n📈 آمار کلی:")
print(f" • کل موجودی: {self.balance:,.0f} تومان")
print(f" • تغییرات کل: {(self.balance - self.TOTAL_BALANCE):+,.0f} تومان")
print(f" • درصد تغییر: {total_return_percent:+.2f}%")
print(f" • حداکثر موجودی: {self.peak_balance:,.0f} تومان")
print(f" • حداکثر افت سرمایه: {self.max_drawdown:.2f}%")
print(f" • ارزش پورتفو: {self.portfolio:,.0f} تومان")
print(f" • ارزش هر خط: {self.LINE_VALUE_TOMAN:,.0f} تومان")
print(f"\n📊 آمار معاملات:")
print(f" • تعداد کل معاملات: {total_trades}")
print(f" • معاملات برنده: {self.winning_trades}")
print(f" • معاملات بازنده: {self.losing_trades}")
print(f" • نرخ برد: {win_rate:.1f}%")
print(f" • مجموع سود: {self.total_profit:,.0f} تومان")
print(f" • مجموع ضرر: {self.total_loss:,.0f} تومان")
print(f" • سود خالص: {total_pnl:+,.0f} تومان")
print(f" • میانگین سود هر معامله برنده: {avg_win:,.0f} تومان")
print(f" • میانگین ضرر هر معامله بازنده: {avg_loss:,.0f} تومان")
print(f" • میانگین سود/ضرر هر معامله: {avg_pnl_per_trade:+,.0f} تومان")
print(f" • فاکتور سود: {profit_factor:.2f}")
print(f"\n⚙️ پارامترهای معاملاتی:")
print(f" • ریسک هر معامله: {self.RISK_PERCENTAGE * 100}% از پورتفو")
print(f" • حد ضرر متحرک اولیه: {self.TRAILING_STOP_LINES} خط")
print(f" • گام حد ضرر: {self.TRAILING_STEP_LINES} خط")
print(f" • آستانه ADX: {self.ADX_THRESHOLD}")
print(f" • حداقل فاصله بین معاملات: {self.MIN_CANDLES_BETWEEN_TRADES} کندل")
# نمایش تاریخچه معاملات
if self.trade_history:
num_trades_to_show = min(5, len(self.trade_history))
print(f"\n📋 تاریخچه {num_trades_to_show} معامله آخر:")
print("-" * 80)
for i, trade in enumerate(self.trade_history[-num_trades_to_show:], 1):
pnl = trade.get('pnl_toman', 0)
pnl_lines = trade.get('pnl_lines', 0)
direction = trade.get('direction', 'unknown')
reason = trade.get('reason', 'نامشخص')
strategy = trade.get('strategy', 'نامشخص')
if pnl > 0:
emoji = '🟢'
else:
emoji = '🔴'
print(f"\n {i}. {emoji} {direction.upper()} ({strategy}) | {reason}")
entry_time = trade.get('entry_time')
if entry_time:
if isinstance(entry_time, datetime):
print(f" 🕒 ورود: {entry_time.strftime('%Y-%m-%d %H:%M')}")
else:
print(f" 🕒 ورود: {entry_time}")
exit_time = trade.get('exit_time')
if exit_time:
if isinstance(exit_time, datetime):
print(f" 🕒 خروج: {exit_time.strftime('%Y-%m-%d %H:%M')}")
else:
print(f" 🕒 خروج: {exit_time}")
print(f" 📈 قیمت ورود: {trade.get('entry_price', 0)}")
print(f" 📉 قیمت خروج: {trade.get('exit_price', 0)}")
print(f" 📊 تغییر: {pnl_lines:+} خط")
print(f" 💰 سود/ضرر: {pnl:+,.0f} تومان ({trade.get('pnl_percent', 0):+.1f}%)")
print(f" ⏱️ مدت: {trade.get('duration', 'نامشخص')}")
print(f" 📊 موجودی بعد: {trade.get('balance_after', 0):,.0f} تومان")
print("-" * 80)
# ذخیره گزارش
try:
report_data = {
'timestamp': datetime.now().isoformat(),
'balance': self.balance,
'total_balance_initial': self.TOTAL_BALANCE,
'total_trades': total_trades,
'winning_trades': self.winning_trades,
'losing_trades': self.losing_trades,
'win_rate': win_rate,
'total_profit': self.total_profit,
'total_loss': self.total_loss,
'total_pnl': total_pnl,
'total_return_percent': total_return_percent,
'max_drawdown': self.max_drawdown,
'peak_balance': self.peak_balance,
'profit_factor': profit_factor if profit_factor != float('inf') else 999999,
'avg_win': avg_win,
'avg_loss': avg_loss,
'avg_pnl_per_trade': avg_pnl_per_trade,
'line_value_toman': self.LINE_VALUE_TOMAN,
'parameters': {
'risk_percentage': self.RISK_PERCENTAGE,
'trailing_stop_lines': self.TRAILING_STOP_LINES,
'trailing_step_lines': self.TRAILING_STEP_LINES,
'adx_threshold': self.ADX_THRESHOLD,
'min_candles_between_trades': self.MIN_CANDLES_BETWEEN_TRADES
}
}
serializable_history = []
for trade in self.trade_history[-10:]:
trade_copy = trade.copy()
if 'entry_time' in trade_copy and trade_copy['entry_time'] is not None:
if isinstance(trade_copy['entry_time'], datetime):
trade_copy['entry_time'] = trade_copy['entry_time'].isoformat()
if 'exit_time' in trade_copy and trade_copy['exit_time'] is not None:
if isinstance(trade_copy['exit_time'], datetime):
trade_copy['exit_time'] = trade_copy['exit_time'].isoformat()
serializable_history.append(trade_copy)
report_data['trade_history'] = serializable_history
# with open('performance_report.json', 'w', encoding='utf-8') as f:
# json.dump(report_data, f, indent=4, default=str, ensure_ascii=False)
#
# print(f"\n💾 گزارش در فایل 'performance_report.json' ذخیره شد.")
except Exception as e:
print(f"⚠️ خطا در ذخیره گزارش: {e}")
print("📊" * 25)
print("📊 پایان گزارش 📊")
print("📊" * 25)
def save_settings(self):
"""
ذخیره تنظیمات در فایل جداگانه
"""
try:
settings = {
'trailing_enabled': self.TRAILING_ENABLED,
'breakeven_enabled': self.BREAKEVEN_ENABLED,
'trailing_stop_lines': self.TRAILING_STOP_LINES,
'trailing_step_lines': self.TRAILING_STEP_LINES,
'breakeven_lines': self.BREAKEVEN_LINES,
'strategy_mode': self.STRATEGY_MODE,
'scalp_risk_percentage': self.SCALP_RISK_PERCENTAGE,
'swing_risk_percentage': self.SWING_RISK_PERCENTAGE,
'min_profit_lines': self.MIN_PROFIT_LINES,
'enable_counter_trend': self.ENABLE_COUNTER_TREND,
'updated_at': datetime.now().isoformat()
}
with open('bot_settings.json', 'w', encoding='utf-8') as f:
json.dump(settings, f, indent=4, ensure_ascii=False)
print(f"💾 تنظیمات ذخیره شد")
return True
except Exception as e:
print(f"⚠️ خطا در ذخیره تنظیمات: {e}")
return False
def load_settings(self):
"""
بارگذاری تنظیمات از فایل
"""
try:
if os.path.exists('bot_settings.json'):
with open('bot_settings.json', 'r', encoding='utf-8') as f:
settings = json.load(f)
self.TRAILING_ENABLED = settings.get('trailing_enabled', True)
self.BREAKEVEN_ENABLED = settings.get('breakeven_enabled', True)
self.TRAILING_STOP_LINES = settings.get('trailing_stop_lines', self.TRAILING_STOP_LINES)
self.TRAILING_STEP_LINES = settings.get('trailing_step_lines', self.TRAILING_STEP_LINES)
self.BREAKEVEN_LINES = settings.get('breakeven_lines', self.BREAKEVEN_LINES)
self.STRATEGY_MODE = settings.get('strategy_mode', 'auto')
self.SCALP_RISK_PERCENTAGE = settings.get('scalp_risk_percentage', 0.10)
self.SWING_RISK_PERCENTAGE = settings.get('swing_risk_percentage', 0.10)
self.MIN_PROFIT_LINES = settings.get('min_profit_lines', 5)
self.ENABLE_COUNTER_TREND = settings.get('enable_counter_trend', True)
print(f"📂 تنظیمات بارگذاری شد")
print(f" 🎯 Trailing: {'فعال' if self.TRAILING_ENABLED else 'غیرفعال'}")
print(f" 🛡️ Breakeven: {'فعال' if self.BREAKEVEN_ENABLED else 'غیرفعال'}")
print(f" 📊 Strategy: {self.STRATEGY_MODE}")
return True
except Exception as e:
print(f"⚠️ خطا در بارگذاری تنظیمات: {e}")
return False
def run(self):
print("\n" + "🎯" * 30)
print("🎯 ربات معامله‌گر مظنه آبشده هیواگلد 🎯")
print("🎯" * 30)
print(f"💰 کل موجودی حساب: {self.TOTAL_BALANCE:,.0f} تومان")
print(f"💼 سایز پورتفو برای هر معامله: {self.PORTFOLIO_SIZE:,.0f} تومان")
print(f"💵 ارزش هر خط: {self.LINE_VALUE_TOMAN:,.0f} تومان")
print(f"⚖️ ریسک هر معامله اسکالپ: {self.SCALP_RISK_PERCENTAGE * 100}% از پورتفو")
print(f"⚖️ ریسک هر معامله سوئینگ: {self.SWING_RISK_PERCENTAGE * 100}% از پورتفو")
print(f"🛡️ حد ضرر متحرک اولیه: {self.TRAILING_STOP_LINES} خط")
print(f"📊 گام حد ضرر: {self.TRAILING_STEP_LINES} خط")
print(f"🛡️ حد ضرر اسکالپ: {self.SCALP_STOP_LOSS_LINES} خط | سوئینگ: {self.SWING_STOP_LOSS_LINES} خط")
print(f"🎯 آستانه ADX اسکالپ: {self.SCALP_ADX_THRESHOLD} | سوئینگ: {self.SWING_ADX_THRESHOLD}")
print(f"⏱️ حداقل فاصله بین معاملات: {self.MIN_CANDLES_BETWEEN_TRADES} کندل ({self.MIN_CANDLES_BETWEEN_TRADES * 3} دقیقه)")
print(f"📡 WebSocket Server داخلی: ws://{self.WS_SERVER_HOST}:{self.WS_SERVER_PORT}")
print(f"🎯 حالت استراتژی: {self.STRATEGY_MODE}")
print("🎯" * 30)
self.load_settings()
if not self.start_websocket_server():
print("❌ خطا در راه‌اندازی WebSocket Server. ربات متوقف شد.")
return
if not self.load_historical_data():
print("❌ خطا در بارگذاری داده‌های تاریخی. ربات متوقف شد.")
self.stop_websocket_server()
return
if os.path.exists('bot_state.json'):
try:
with open('bot_state.json', 'r') as f:
state = json.load(f)
current_balance = state.get('balance', self.TOTAL_BALANCE)
if current_balance < 0 or current_balance > self.TOTAL_BALANCE * 10:
print(f"\n⚠️ وضعیت خراب تشخیص داده شد!")
print(f" موجودی در فایل: {current_balance:,.0f} تومان")
print(f" موجودی منطقی: {self.TOTAL_BALANCE:,.0f} تومان")
choice = input("آیا می‌خواهید وضعیت را ریست کنید؟ (y/n): ").strip().lower()
if choice == 'y':
self.reset_state()
else:
self.load_state()
else:
self.load_state()
except Exception as e:
print(f"⚠️ خطا در بررسی وضعیت: {e}")
self.reset_state()
ws_thread = threading.Thread(target=self.connect_to_price_socket, daemon=True)
ws_thread.start()
print("📡 در حال اتصال به سرور قیمت‌های زنده مظنه...")
if not self.wait_for_connection(timeout=60):
print("❌ اتصال به سرور قیمت برقرار نشد. ربات متوقف می‌شود.")
self.stop_websocket_server()
return
print("✅ اتصال به سرور قیمت برقرار شد. ربات فعال است.")
time.sleep(2)
try:
LOOP_INTERVAL = 1
STATUS_INTERVAL = 60
HEARTBEAT_INTERVAL = 30
SAVE_INTERVAL = 60
REPORT_INTERVAL = 600
OPTIMIZATION_INTERVAL = 3600
status_counter = 0
heartbeat_counter = 0
save_counter = 0
report_counter = 0
optimization_counter = 0
print(f"\n⏱️ تنظیمات زمانی:")
print(f" نمایش وضعیت: هر {STATUS_INTERVAL} ثانیه")
print(f" ارسال heartbeat: هر {HEARTBEAT_INTERVAL} ثانیه")
print(f" ذخیره وضعیت: هر {SAVE_INTERVAL} ثانیه")
print(f" گزارش عملکرد: هر {REPORT_INTERVAL} ثانیه")
print(f" بهینه‌سازی پارامترها: هر {OPTIMIZATION_INTERVAL} ثانیه")
print(f" 📡 کلاینت‌ها می‌توانند به ws://localhost:{self.WS_SERVER_PORT} متصل شوند")
while True:
status_counter += LOOP_INTERVAL
heartbeat_counter += LOOP_INTERVAL
save_counter += LOOP_INTERVAL
report_counter += LOOP_INTERVAL
optimization_counter += LOOP_INTERVAL
if status_counter >= STATUS_INTERVAL:
self.show_status()
status_counter = 0
if heartbeat_counter >= HEARTBEAT_INTERVAL:
self.send_heartbeat()
heartbeat_counter = 0
if save_counter >= SAVE_INTERVAL:
self.save_state()
save_counter = 0
if report_counter >= REPORT_INTERVAL:
self.generate_performance_report()
report_counter = 0
if optimization_counter >= OPTIMIZATION_INTERVAL:
if hasattr(self, 'param_optimizer') and self.param_optimizer:
self.param_optimizer.optimize_parameters()
optimization_counter = 0
time.sleep(LOOP_INTERVAL)
except KeyboardInterrupt:
print("\n\n⚠️ ربات توسط کاربر متوقف شد.")
if self.in_trade:
print("\n⚠️ هشدار: یک معامله باز وجود دارد!")
current_price = self.MARKET_PRICE if self.MARKET_PRICE else self.LAST_KNOWN_PRICE
if current_price:
current_price = int(current_price)
print(f" قیمت فعلی: {current_price}")
print(f" قیمت ورود: {int(self.entry_price)}")
print(f" جهت: {self.trade_direction}")
print(f" استراتژی: {self.current_trade_strategy}")
current_pnl = self.calculate_current_profit_lines(current_price)
current_pnl_toman = current_pnl * self.LINE_VALUE_TOMAN
print(f" سود/ضرر فعلی: {current_pnl:+} خط ({current_pnl_toman:+,.0f} تومان)")
action = input("آیا می‌خواهید معامله را با قیمت بازار ببندید؟ (y/n): ").strip().lower()
if action == 'y':
self.exit_trade(current_price, "خروج دستی توسط کاربر")
print("✅ معامله با موفقیت بسته شد.")
else:
print("⚠️ معامله باز باقی ماند. توجه: ربات متوقف شده است.")
else:
print("❌ قیمت بازار در دسترس نیست. امکان بستن معامله وجود ندارد.")
self.save_state()
self.generate_performance_report()
shutdown_command = {
"type": "bot_shutdown",
"reason": "user_requested",
"final_balance": self.balance,
"total_trades": self.trades,
"total_profit": self.total_profit,
"total_loss": self.total_loss,
"connected_clients": self.get_connected_clients_count(),
"timestamp": datetime.now().isoformat()
}
self.send_command(shutdown_command)
self.stop_websocket_server()
print("\n✅ ربات با موفقیت متوقف شد.")
def _db_fetchone_value(self, query: str, params: tuple = ()):
"""کمک‌تابع: اجرای یک query و برگرداندن اولین مقدار"""
with self._get_db_connection() as conn:
cur = conn.cursor()
cur.execute(query, params)
row = cur.fetchone()
if not row:
return None
# sqlite3.Row یا tuple
try:
return row[0]
except Exception:
return None
def get_latest_m3_time_from_db(self) -> pd.Timestamp | None:
"""آخرین زمان کندل M3 در دیتابیس"""
try:
max_time = self._db_fetchone_value(f"SELECT MAX(time) FROM {self.M3_TABLE}")
if not max_time:
return None
return pd.Timestamp(str(max_time))
except Exception as e:
print(f"⚠️ خطا در get_latest_m3_time_from_db: {e}")
return None
def load_m3_candles_range_from_db(self, start_time: pd.Timestamp, end_time: pd.Timestamp) -> list[dict]:
"""
خواندن کندل‌های M3 بین دو زمان از دیتابیس.
خروجی: لیست دیکشنری با keys: time, open, high, low, close
"""
try:
start_s = pd.Timestamp(start_time).isoformat()
end_s = pd.Timestamp(end_time).isoformat()
with self._get_db_connection() as conn:
cur = conn.cursor()
cur.execute(
f"""
SELECT time, open, high, low, close
FROM {self.M3_TABLE}
WHERE time >= ? AND time <= ?
ORDER BY time ASC
""",
(start_s, end_s)
)
rows = cur.fetchall()
candles = []
for r in rows:
# sqlite3.Row یا tuple
t = r["time"] if isinstance(r, sqlite3.Row) else r[0]
o = r["open"] if isinstance(r, sqlite3.Row) else r[1]
h = r["high"] if isinstance(r, sqlite3.Row) else r[2]
l = r["low"] if isinstance(r, sqlite3.Row) else r[3]
c = r["close"] if isinstance(r, sqlite3.Row) else r[4]
candles.append({
"time": pd.Timestamp(str(t)),
"open": float(o),
"high": float(h),
"low": float(l),
"close": float(c),
})
return candles
except Exception as e:
print(f"⚠️ خطا در load_m3_candles_range_from_db: {e}")
import traceback
traceback.print_exc()
return []
def load_last_m3_candles_by_limit(self, limit: int) -> list[dict]:
"""
fallback: اگر فیلتر زمان مشکل داشت، با LIMIT آخرین کندل‌ها را می‌گیریم.
"""
try:
if limit <= 0:
return []
with self._get_db_connection() as conn:
cur = conn.cursor()
cur.execute(
f"""
SELECT time, open, high, low, close
FROM {self.M3_TABLE}
ORDER BY time DESC
LIMIT ?
""",
(int(limit),)
)
rows = cur.fetchall()
rows = list(reversed(rows)) # قدیمی به جدید
candles = []
for r in rows:
t = r["time"] if isinstance(r, sqlite3.Row) else r[0]
o = r["open"] if isinstance(r, sqlite3.Row) else r[1]
h = r["high"] if isinstance(r, sqlite3.Row) else r[2]
l = r["low"] if isinstance(r, sqlite3.Row) else r[3]
c = r["close"] if isinstance(r, sqlite3.Row) else r[4]
candles.append({
"time": pd.Timestamp(str(t)),
"open": float(o),
"high": float(h),
"low": float(l),
"close": float(c),
})
return candles
except Exception as e:
print(f"⚠️ خطا در load_last_m3_candles_by_limit: {e}")
import traceback
traceback.print_exc()
return []
def _reset_for_backtest(self):
"""ریست وضعیت‌های ربات برای بک‌تست (بدون حذف دیتابیس)"""
# معامله داخلی را خاموش می‌کنیم چون شما سیگنال‌محور هستی
self.SIGNAL_ONLY_MODE = True
# warmup بر اساس زمان واقعی در بک‌تست معنی ندارد
self.WARMUP_ENABLED = False
self.warmup_complete = True
self.warmup_start_time = None
self.warmup_candles_seen = 0
# cooldown زمانی مبتنی بر datetime.now در بک‌تست نمی‌خواهیم؛
# ولی می‌توانیم با زمان شبیه‌سازی مدیریت کنیم (اینجا ساده‌تر: خاموش)
self.MIN_CANDLES_BETWEEN_TRADES = 0
self.last_trade_exit_time = None
# وضعیت معاملات داخلی
self.reset_trade_variables()
self.trade_history = []
self.open_trades = []
# شمارنده کندل
self._new_candle_count = 0
# لیست سیگنال‌های بک‌تست
self.backtest_signals = []
def run_backtest_from_db(
self,
last_days: int = 30,
seed_days: int = 5,
session_only: bool = True,
session_start: int = 11,
session_end: int = 19,
tz_offset_minutes: int = 0,
tp_lines: int = 10,
sl_lines: int = 8,
horizon_candles: int = 10,
print_signal_logs: bool = False, # ✅ فقط برای سازگاری
) -> dict:
"""
بک‌تست از دیتابیس M3 با ارزیابی کیفیت سیگنال‌ها
- session_only: اگر True، فقط کندل‌های داخل سشن بررسی می‌شوند
- session_start/session_end: ساعت‌های سشن (مثلاً 11 تا 19)
- tz_offset_minutes: اگر دیتات UTC است، 210 بده
- tp_lines/sl_lines/horizon_candles: برای ارزیابی win/loss
"""
print(f"\n{'=' * 70}")
print("🧪 BACKTEST MODE (DB)")
print(f"{'=' * 70}")
self._reset_for_backtest()
loaded = self.load_from_database()
if not loaded:
raise RuntimeError("لود دیتابیس ناموفق بود.")
end_time = self.get_latest_m3_time_from_db()
if end_time is None:
raise RuntimeError("هیچ کندل M3 در دیتابیس نیست.")
start_bt = end_time - pd.Timedelta(days=int(last_days))
start_seed = start_bt - pd.Timedelta(days=int(seed_days))
candles = self.load_m3_candles_range_from_db(start_seed, end_time)
if not candles:
raise RuntimeError("هیچ کندلی برای بک‌تست پیدا نشد.")
print(f"TF={self._signal_timeframe()} | backtest_days={last_days} | seed_days={seed_days}")
print(f"candles loaded: {len(candles):,}")
print(f"range: {candles[0]['time']} -> {candles[-1]['time']}")
print(f"session_only={session_only} | session={session_start}-{session_end} | tz_offset_minutes={tz_offset_minutes}")
# ✅ لیست کندل‌های واقعاً استفاده‌شده (داخل سشن)
candles_used = []
# m3 را از اول بسازیم
self.m3 = pd.DataFrame()
self._new_candle_count = 0
signals_total = 0
long_count = 0
short_count = 0
for candle in candles:
t0 = pd.Timestamp(candle["time"])
o = float(candle["open"])
h = float(candle["high"])
l = float(candle["low"])
c = float(candle["close"])
# فقط سشن (اختیاری)
if session_only:
if not self._in_session(t0, start_hour=session_start, end_hour=session_end, tz_offset_minutes=tz_offset_minutes):
continue
# ✅ این کندل واقعاً در بک‌تست استفاده شد
candles_used.append(candle)
# کندل را وارد m3 کن
if self.m3 is None or not isinstance(self.m3, pd.DataFrame) or self.m3.empty:
self.m3 = pd.DataFrame([{"open": o, "high": h, "low": l, "close": c}], index=[t0])
else:
if t0 in self.m3.index:
continue
new_row = pd.DataFrame([{"open": o, "high": h, "low": l, "close": c}], index=[t0])
self.m3 = pd.concat([self.m3, new_row])
if len(self.m3) > 500:
self.m3 = self.m3.iloc[-500:]
self._new_candle_count += 1
self.MARKET_PRICE = o
self.LAST_KNOWN_PRICE = o
self.LAST_CANDLE_TIME = t0
self._backtest_asof_time = t0 # برای daily_trend_score(asof)
# اندیکاتورها
if len(self.m3) >= 50:
self.update_m3_indicators()
# رژیم بازار
try:
update_interval = int(self.REGIME_SETTINGS.get("update_interval", 5) or 5)
if update_interval < 1:
update_interval = 5
if self._new_candle_count % update_interval == 0:
self._update_market_regime()
except Exception:
pass
# فقط در بازه بک‌تست سیگنال بگیر
if t0 < start_bt:
continue
signal = self.check_entry_signal(o)
if signal in ("long", "short"):
signals_total += 1
if signal == "long":
long_count += 1
else:
short_count += 1
self.backtest_signals.append({
"time": t0.isoformat(),
"signal": signal,
"entry_price": float(o),
"close_prev": float(self.m3.iloc[-2]["close"]) if len(self.m3) >= 2 else None,
"market_regime": str(getattr(self, "market_regime", "unknown")),
"regime_confidence": float(getattr(self, "regime_confidence", 0) or 0),
})
if print_signal_logs:
print(f"✅ SIGNAL: {t0.isoformat()} {signal.upper()} @ {o}")
# ✅ مهم: ارزیابی را روی کندل‌های استفاده‌شده انجام بده
self.backtest_candles = candles_used
eval_report = self.evaluate_signals_barrier(
signals=self.backtest_signals,
candles=self.backtest_candles,
tp_lines=tp_lines,
sl_lines=sl_lines,
horizon_candles=horizon_candles
)
report = {
"last_days": int(last_days),
"seed_days": int(seed_days),
# ✅ تعداد کندل‌های لود شده از DB
"candles_loaded": len(candles),
# ✅ تعداد کندل‌هایی که واقعاً در بک‌تست استفاده شدند (داخل سشن)
"candles_used_in_session": len(candles_used) if session_only else len(candles),
"signals_total": int(signals_total),
"signals_long": int(long_count),
"signals_short": int(short_count),
"final_regime": str(getattr(self, "market_regime", "unknown")),
"final_regime_confidence": float(getattr(self, "regime_confidence", 0) or 0),
"session_only": bool(session_only),
"session_start": int(session_start),
"session_end": int(session_end),
"tz_offset_minutes": int(tz_offset_minutes),
"evaluation": eval_report,
}
out = {"report": report, "signals": self.backtest_signals}
with open("backtest_signals.json", "w", encoding="utf-8") as f:
json.dump(out, f, ensure_ascii=False, indent=2)
print(f"\n{'=' * 70}")
print("✅ BACKTEST SUMMARY")
print(f"{'=' * 70}")
print(json.dumps(report, ensure_ascii=False, indent=2))
print(f"{'=' * 70}\n")
return out
# --- اجرای اصلی ---
if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser(description="HivaGold MAZANEH Signal Bot")
parser.add_argument("--backtest", action="store_true", help="Run backtest using DB candles instead of realtime websocket")
parser.add_argument("--backtest-days", type=int, default=30, help="How many last days to backtest (default: 30)")
parser.add_argument("--seed-days", type=int, default=5, help="How many days before backtest start to warm indicators (default: 5)")
parser.add_argument("--no-ws-server", action="store_true", help="Do not start internal WS server")
parser.add_argument("--reload-data", action="store_true", help="Redownload and rebuild DB before running")
args = parser.parse_args()
try:
print("\n🚀 راه‌اندازی ربات معامله‌گر...")
print("=" * 50)
print("📡 این ربات یک WebSocket Server داخلی راه‌اندازی می‌کند")
print(" کلاینت‌ها می‌توانند مستقیماً به آن متصل شوند")
print("=" * 50)
bot = HivaGoldManager()
# اگر لازم داری همیشه سیگنال‌محور باشد:
bot.SIGNAL_ONLY_MODE = True
# WS Server داخلی (اختیاری)
if not args.no_ws_server:
if not bot.start_websocket_server():
print("❌ خطا در راه‌اندازی WebSocket Server داخلی")
raise SystemExit(1)
# داده‌ها
if args.reload_data:
# دانلود + ساخت DB
if not bot.load_historical_data():
print("❌ خطا در بارگذاری/دانلود داده‌های تاریخی")
raise SystemExit(1)
else:
# فقط از DB موجود بخوان
loaded = bot.load_from_database()
if not loaded:
# اگر DB خراب/خالی بود، دانلود کن
if not bot.load_historical_data():
print("❌ خطا در بارگذاری/دانلود داده‌های تاریخی")
raise SystemExit(1)
# ✅ حالت بک‌تست
if args.backtest:
bot.run_backtest_from_db(
last_days=args.backtest_days,
seed_days=args.seed_days,
session_only=True, # فقط داخل سشن بازار
session_start=11,
session_end=19,
tz_offset_minutes=0, # اگر زمان دیتات UTC بود، بگذار 210
tp_lines=10, # معیار ارزیابی درست/غلط
sl_lines=8,
horizon_candles=10
)
else:
bot.run()
except KeyboardInterrupt:
print("\n\n👋 ربات توسط کاربر متوقف شد.")
except Exception as e:
print(f"\n❌ خطای غیرمنتظره: {e}")
import traceback
traceback.print_exc()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment