Skip to content

Instantly share code, notes, and snippets.

@wzyy2
Created November 12, 2025 05:44
Show Gist options
  • Select an option

  • Save wzyy2/270318d28b71c53849cc039a766db17a to your computer and use it in GitHub Desktop.

Select an option

Save wzyy2/270318d28b71c53849cc039a766db17a to your computer and use it in GitHub Desktop.
def process_trade(trade: Dict[str, Any], user_address: str = None):
    """Inspect one trade of a followed user and log it if it passes the filters.

    The trade is dropped silently when its transaction hash was already
    seen or when the market slug does not contain the user's ``keyword``.
    It is dropped with a log line when the user has no entry in
    ``COPY_TRADE_CONFIG['users']`` or when the slug contains the user's
    ``keyword_blacklist``.

    Args:
        trade: Trade record. The keys read are ``transactionHash``,
            ``slug``, ``name``, ``user``, ``size``, ``price``, ``side``
            and ``outcomeIndex``.
        user_address: Address of the followed user. When ``None`` the
            address is taken from ``trade['user']``.

    Returns:
        None. Any exception is logged instead of being raised, matching the
        error handling of the other monitoring functions.
    """
    try:
        transaction_hash = trade.get('transactionHash', '')
        # Deduplicate only when a hash is present: registering the empty
        # string would make every later hash-less trade look like a duplicate.
        if transaction_hash:
            if transaction_hash in processed_trades:
                return
            processed_trades.add(transaction_hash)

        slug = trade.get('slug', '')
        user_name = trade.get('name', '')
        # Prefer the address supplied by the caller; fall back to the trade data.
        if user_address is None:
            user_address = trade.get('user', '')
        size = float(trade.get('size', 0))
        price = float(trade.get('price', 0))
        side = trade.get('side', '')
        outcome_index = trade.get('outcomeIndex', 0)

        # Per-user copy-trading settings (order size and slug filters).
        user_config = COPY_TRADE_CONFIG['users'].get(user_address)
        if not user_config:
            logger.warning(f"未找到用户配置: {user_address}")
            return

        # Configured order size scaled by a random factor in [1, 2.5].
        # NOTE(review): not used in the visible code; presumably consumed by
        # order-placement logic that is not part of this excerpt - confirm.
        max_order_size = user_config.get('order_size', 10) * random.uniform(1, 2.5)
        keyword = user_config.get('keyword', None)
        keyword_blacklist = user_config.get('keyword_blacklist', None)

        # Keyword filter: the slug must contain the keyword (case-insensitive).
        if keyword:
            if keyword.lower() not in slug.lower():
                return
            logger.info(f"用户 {user_name} keyword匹配: {keyword}")

        # Blacklist filter: the slug must not contain the blacklisted keyword.
        if keyword_blacklist:
            if keyword_blacklist.lower() in slug.lower():
                logger.info(f"用户 {user_name} 黑名单匹配,跳过: {keyword_blacklist}")
                return

        logger.info(
            f"检测到用户 {user_name} 交易: {slug}, 大小={size}, 价格={price}, 方向={side}, outcome={outcome_index}")
    except Exception as e:
        # The original ``try`` had no handler at all, which does not compile.
        logger.error(f"处理交易异常: {str(e)}")
def check_user_activity(user_address: str, user_name: str, user_config: Dict[str, Any],
                        last_timestamp: int, first_trade_processed: bool) -> tuple[int, bool]:
    """Poll the newest activity of one followed user and process it if new.

    The API type is read from ``user_config['api_type']`` (default
    ``'activity'``) and a single record is requested. A record counts as new
    when its ``timestamp`` is greater than ``last_timestamp``. The first new
    record seen for a user is not processed; only its transaction hash is
    added to ``processed_trades``.

    Args:
        user_address: Address of the followed user.
        user_name: Name used in log messages.
        user_config: Per-user settings.
        last_timestamp: Timestamp of the last record already handled.
        first_trade_processed: Whether the first record was already skipped.

    Returns:
        ``(last_timestamp, first_trade_processed)`` updated for the next
        poll; the inputs are returned unchanged when nothing new was found
        or an exception occurred.
    """
    try:
        # Which API to query for this user; defaults to "activity".
        api_type = user_config.get('api_type', 'activity')
        activities = PolymarketAPI.get().get_user_activity(
            user_address, api_type, limit=1)
        if not activities:
            return last_timestamp, first_trade_processed

        # Newest record first.
        activities.sort(key=lambda item: item.get('timestamp', 0), reverse=True)
        newest = activities[0]
        newest_ts = newest.get('timestamp', 0)
        if not newest_ts > last_timestamp:
            return last_timestamp, first_trade_processed

        if first_trade_processed:
            logger.info(f"检测到用户 {user_name} 新交易 (API: {api_type})")
            process_trade(newest, user_address)
            return newest_ts, first_trade_processed

        # First record seen for this user: skip it, but remember its hash so
        # it is never processed later.
        logger.info(f"用户 {user_name} 第一次交易,跳过处理")
        tx_hash = newest.get('transactionHash', '')
        if tx_hash:
            processed_trades.add(tx_hash)
        return newest_ts, True
    except Exception as e:
        logger.error(f"监控用户 {user_name} 异常: {str(e)}")
        return last_timestamp, first_trade_processed
def get_user_trades(self, user_address: str, limit: int = 1) -> List[Dict[str, Any]]:
    """Fetch a user's trades from the Polymarket data-api ``/trades`` endpoint.

    Every record is converted to the field layout used for ``/activity``
    records (with ``type`` fixed to ``'TRADE'`` and ``usdcSize`` computed as
    ``size * price``) so callers can treat both sources alike.

    Args:
        user_address: Address of the user whose trades are requested.
        limit: Maximum number of records to request.

    Returns:
        The converted trade records. An empty list is returned on a non-200
        response or when any exception occurs (both are logged).
    """
    try:
        url = "https://data-api.polymarket.com/trades"
        params = {
            "user": user_address,
            "limit": limit,
            # Taker orders only. Sent as the lowercase literal because a
            # Python bool would be serialized into the query as "True".
            "takerOnly": "true",
        }
        resp = requests.get(url, params=params, timeout=10)
        if resp.status_code != 200:
            logger.warning(f"获取用户交易失败: {resp.status_code}")
            return []

        converted_trades = []
        for trade in resp.json():
            size = trade.get('size', 0)
            price = trade.get('price', 0)
            converted_trades.append({
                'proxyWallet': trade.get('proxyWallet', ''),
                'timestamp': trade.get('timestamp', 0),
                'conditionId': trade.get('conditionId', ''),
                'type': 'TRADE',  # the trades API only returns trades
                'size': size,
                # Coerce before multiplying so a null or string-encoded
                # number does not raise and discard the whole batch.
                'usdcSize': float(size or 0) * float(price or 0),
                'transactionHash': trade.get('transactionHash', ''),
                'price': price,
                'asset': trade.get('asset', ''),
                'side': trade.get('side', ''),
                'outcomeIndex': trade.get('outcomeIndex', 0),
                'title': trade.get('title', ''),
                'slug': trade.get('slug', ''),
                'icon': trade.get('icon', ''),
                'eventSlug': trade.get('eventSlug', ''),
                'outcome': trade.get('outcome', ''),
                'name': trade.get('name', ''),
                'pseudonym': trade.get('pseudonym', ''),
                'bio': trade.get('bio', ''),
                'profileImage': trade.get('profileImage', ''),
                'profileImageOptimized': trade.get('profileImageOptimized', ''),
            })
        return converted_trades
    except Exception as e:
        logger.error(f"获取用户交易异常: {str(e)}")
        return []
def get_user_activity_old(self, user_address: str, limit: int = 1) -> List[Dict[str, Any]]:
    """Fetch a user's activity from the Polymarket data-api ``/activity`` endpoint.

    Args:
        user_address: Address of the user whose activity is requested.
        limit: Maximum number of records to request.

    Returns:
        The records whose ``type`` is ``'TRADE'``. An empty list is returned
        on a non-200 response or when any exception occurs (both are logged).
    """
    try:
        endpoint = "https://data-api.polymarket.com/activity"
        query = {"user": user_address, "limit": limit}
        response = requests.get(endpoint, params=query, timeout=10)
        if response.status_code != 200:
            logger.warning(f"获取用户活动失败: {response.status_code}")
            return []
        # Keep only trade records; other activity types are dropped.
        return list(filter(lambda entry: entry.get('type') == 'TRADE', response.json()))
    except Exception as e:
        logger.error(f"获取用户活动异常: {str(e)}")
        return []
def get_user_activity(self, user_address: str, api_type: str = "trades", limit: int = 1) -> List[Dict[str, Any]]:
    """Fetch a user's activity through either the trades or the activity API.

    Args:
        user_address: Address of the user whose activity is requested.
        api_type: ``"activity"`` selects the activity API; any other value
            selects the trades API.
        limit: Maximum number of records to request.

    Returns:
        The records returned by the selected fetch method.
    """
    # Only the exact value "activity" selects the activity API.
    fetch = self.get_user_activity_old if api_type == "activity" else self.get_user_trades
    return fetch(user_address, limit)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment