Created
November 12, 2025 05:44
-
-
Save wzyy2/270318d28b71c53849cc039a766db17a to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| def process_trade(trade: Dict[str, Any], user_address: str = None): | |
| """处理单个交易""" | |
| try: | |
| transaction_hash = trade.get('transactionHash', '') | |
| if transaction_hash in processed_trades: | |
| return | |
| processed_trades.add(transaction_hash) | |
| slug = trade.get('slug', '') | |
| user_name = trade.get('name', '') | |
| # 优先使用传入的用户地址,如果没有则从trade数据中获取 | |
| if user_address is None: | |
| user_address = trade.get('user', '') | |
| size = float(trade.get('size', 0)) | |
| price = float(trade.get('price', 0)) | |
| side = trade.get('side', '') | |
| outcome_index = trade.get('outcomeIndex', 0) | |
| # 获取该用户的订单大小配置 | |
| user_config = COPY_TRADE_CONFIG['users'].get(user_address) | |
| if not user_config: | |
| logger.warning(f"未找到用户配置: {user_address}") | |
| return | |
| max_order_size = user_config.get('order_size', 10) * random.uniform(1, 2.5) | |
| keyword = user_config.get('keyword', None) | |
| keyword_blacklist = user_config.get('keyword_blacklist', None) | |
| # 检查keyword过滤 | |
| if keyword: | |
| if keyword.lower() not in slug.lower(): | |
| return | |
| logger.info(f"用户 {user_name} keyword匹配: {keyword}") | |
| # 检查keyword_blacklist过滤 | |
| if keyword_blacklist: | |
| if keyword_blacklist.lower() in slug.lower(): | |
| logger.info(f"用户 {user_name} 黑名单匹配,跳过: {keyword_blacklist}") | |
| return | |
| logger.info( | |
| f"检测到用户 {user_name} 交易: {slug}, 大小={size}, 价格={price}, 方向={side}, outcome={outcome_index}") | |
| pass | |
| def check_user_activity(user_address: str, user_name: str, user_config: Dict[str, Any], | |
| last_timestamp: int, first_trade_processed: bool) -> tuple[int, bool]: | |
| """ | |
| 检查单个用户的活动 | |
| 返回: (新的last_timestamp, 新的first_trade_processed状态) | |
| """ | |
| try: | |
| # 获取用户的 API 类型,默认为 activity | |
| api_type = user_config.get('api_type', 'activity') | |
| # 获取用户最新活动 | |
| activities = PolymarketAPI.get().get_user_activity( | |
| user_address, api_type, limit=1) | |
| if activities: | |
| # 按时间戳排序,获取最新的交易 | |
| activities.sort(key=lambda x: x.get('timestamp', 0), reverse=True) | |
| latest_activity = activities[0] | |
| current_timestamp = latest_activity.get('timestamp', 0) | |
| # 如果是新的交易,处理它 | |
| if current_timestamp > last_timestamp: | |
| # 如果是第一次交易,跳过处理 | |
| if not first_trade_processed: | |
| logger.info(f"用户 {user_name} 第一次交易,跳过处理") | |
| # 将第一次交易标记为已处理,避免重复处理 | |
| transaction_hash = latest_activity.get('transactionHash', '') | |
| if transaction_hash: | |
| processed_trades.add(transaction_hash) | |
| return current_timestamp, True | |
| logger.info(f"检测到用户 {user_name} 新交易 (API: {api_type})") | |
| process_trade(latest_activity, user_address) | |
| return current_timestamp, first_trade_processed | |
| except Exception as e: | |
| logger.error(f"监控用户 {user_name} 异常: {str(e)}") | |
| return last_timestamp, first_trade_processed | |
| def get_user_trades(self, user_address: str, limit: int = 1) -> List[Dict[str, Any]]: | |
| """通过 trades API 获取用户交易活动 | |
| Args: | |
| user_address: 用户地址 | |
| limit: 返回记录数量限制 | |
| Returns: | |
| 交易记录列表 | |
| """ | |
| try: | |
| url = "https://data-api.polymarket.com/trades" | |
| params = { | |
| "user": user_address, | |
| "limit": limit, | |
| "takerOnly": True # 只获取taker订单 | |
| } | |
| resp = requests.get(url, params=params, timeout=10) | |
| if resp.status_code == 200: | |
| trades = resp.json() | |
| # 新API直接返回交易数据,转换为统一格式 | |
| converted_trades = [] | |
| for trade in trades: | |
| # 将新API格式转换为旧API格式,保持向后兼容 | |
| converted_trade = { | |
| 'proxyWallet': trade.get('proxyWallet', ''), | |
| 'timestamp': trade.get('timestamp', 0), | |
| 'conditionId': trade.get('conditionId', ''), | |
| 'type': 'TRADE', # 新API都是交易类型 | |
| 'size': trade.get('size', 0), | |
| 'usdcSize': trade.get('size', 0) * trade.get('price', 0), # 根据size和price计算 | |
| 'transactionHash': trade.get('transactionHash', ''), | |
| 'price': trade.get('price', 0), | |
| 'asset': trade.get('asset', ''), | |
| 'side': trade.get('side', ''), | |
| 'outcomeIndex': trade.get('outcomeIndex', 0), | |
| 'title': trade.get('title', ''), | |
| 'slug': trade.get('slug', ''), | |
| 'icon': trade.get('icon', ''), | |
| 'eventSlug': trade.get('eventSlug', ''), | |
| 'outcome': trade.get('outcome', ''), | |
| 'name': trade.get('name', ''), | |
| 'pseudonym': trade.get('pseudonym', ''), | |
| 'bio': trade.get('bio', ''), | |
| 'profileImage': trade.get('profileImage', ''), | |
| 'profileImageOptimized': trade.get('profileImageOptimized', '') | |
| } | |
| converted_trades.append(converted_trade) | |
| return converted_trades | |
| else: | |
| logger.warning(f"获取用户交易失败: {resp.status_code}") | |
| return [] | |
| except Exception as e: | |
| logger.error(f"获取用户交易异常: {str(e)}") | |
| return [] | |
| def get_user_activity_old(self, user_address: str, limit: int = 1) -> List[Dict[str, Any]]: | |
| """通过 activity API 获取用户活动 | |
| Args: | |
| user_address: 用户地址 | |
| limit: 返回记录数量限制 | |
| Returns: | |
| 活动记录列表 | |
| """ | |
| try: | |
| url = "https://data-api.polymarket.com/activity" | |
| params = {"user": user_address, "limit": limit} | |
| resp = requests.get(url, params=params, timeout=10) | |
| if resp.status_code == 200: | |
| activities = resp.json() | |
| # 只返回TRADE类型的活动 | |
| trades = [activity for activity in activities if activity.get('type') == 'TRADE'] | |
| return trades | |
| else: | |
| logger.warning(f"获取用户活动失败: {resp.status_code}") | |
| return [] | |
| except Exception as e: | |
| logger.error(f"获取用户活动异常: {str(e)}") | |
| return [] | |
| def get_user_activity(self, user_address: str, api_type: str = "trades", limit: int = 1) -> List[Dict[str, Any]]: | |
| """获取用户活动 - 支持 trades 和 activity 两种 API | |
| Args: | |
| user_address: 用户地址 | |
| api_type: API类型,"trades" 或 "activity" | |
| limit: 返回记录数量限制 | |
| Returns: | |
| 活动记录列表 | |
| """ | |
| if api_type == "activity": | |
| return self.get_user_activity_old(user_address, limit) | |
| else: | |
| # 默认使用 trades API | |
| return self.get_user_trades(user_address, limit) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment