Skip to content

Instantly share code, notes, and snippets.

@rollendxavier
Last active January 10, 2026 07:47
Show Gist options
  • Select an option

  • Save rollendxavier/b3aaf73f1a8215ece6e6ea3d9bacaf79 to your computer and use it in GitHub Desktop.

Select an option

Save rollendxavier/b3aaf73f1a8215ece6e6ea3d9bacaf79 to your computer and use it in GitHub Desktop.
Agentic Trading CG
import os
import json
import time
import asyncio
import requests
import pandas as pd
from datetime import datetime, timedelta
from dotenv import load_dotenv
# Load environment variables
load_dotenv()
# =============================================================================
# Configuration
# =============================================================================
CG_API_KEY = os.getenv("CG_PRO_API_KEY") or os.getenv("CG_DEMO_API_KEY")
CG_BASE_URL = "https://pro-api.coingecko.com/api/v3/onchain" # Pro API for paid features
CG_DEMO_URL = "https://api.coingecko.com/api/v3/onchain" # Demo API for free endpoints
# Use Pro API if Pro key available, otherwise Demo
if os.getenv("CG_PRO_API_KEY"):
BASE_URL = CG_BASE_URL
API_KEY = os.getenv("CG_PRO_API_KEY")
print("✓ Using CoinGecko Pro API")
else:
BASE_URL = CG_DEMO_URL
API_KEY = os.getenv("CG_DEMO_API_KEY")
print("✓ Using CoinGecko Demo API")
# Blockscout API for transaction verification
BLOCKSCOUT_BASE_URL = "https://eth.blockscout.com/api/v2"
# Headers for CoinGecko API
HEADERS = {
"accept": "application/json",
"x-cg-pro-api-key": API_KEY
}
# =============================================================================
# Helper Functions
# =============================================================================
def cg_request(endpoint: str, params: dict = None) -> dict:
"""Make a request to CoinGecko API with error handling and rate limiting."""
url = f"{BASE_URL}{endpoint}"
for attempt in range(3):
try:
response = requests.get(url, headers=HEADERS, params=params, timeout=30)
if response.status_code == 429:
print(f"⚠ Rate limited. Waiting {2 ** attempt} seconds...")
time.sleep(2 ** attempt)
continue
response.raise_for_status()
return response.json()
except requests.exceptions.RequestException as e:
if attempt == 2:
print(f"✗ API request failed: {e}")
raise
time.sleep(1)
return {}
def get_trending_pools(network: str = "eth", limit: int = 10) -> pd.DataFrame:
    Fetch trending pools on a specific network.
    Args:
        network: Network ID (e.g. 'base', 'eth')
        limit: Maximum number of pools to return
    Returns:
        DataFrame with trending pool data
    print(f"Fetching trending pools on {network}...")
    endpoint = f"/networks/{network}/trending_pools"
    params = {"include": "base_token,quote_token"}
    data = cg_request(endpoint, params)
    pools = data.get("data", [])[:limit]   
    pool_data = []
    for pool in pools:
        attr = pool.get("attributes", {})
        relationships = pool.get("relationships", {})     
        pool_data.append({
            "pool_address": attr.get("address", ""),
            "pool_name": attr.get("name", "Unknown"),
            "dex": relationships.get("dex", {}).get("data", {}).get("id", "unknown"),
            "price_usd": float(attr.get("base_token_price_usd", 0) or 0),
            "volume_24h": float(attr.get("volume_usd", {}).get("h24", 0) or 0),
            "liquidity_usd": float(attr.get("reserve_in_usd", 0) or 0),
            "price_change_24h": float(attr.get("price_change_percentage", {}).get("h24", 0) or 0),
        })
    return pd.DataFrame(pool_data)
def get_pools_megafilter(
    network: str = "eth",
    min_volume_24h: float = 100000,
    max_liquidity: float = 1000000,
    min_transactions: int = 100,
    limit: int = 10
) -> pd.DataFrame:
    endpoint = "/pools/megafilter"
    params = {
        "networks": network,
        "volume_24h_usd_min": min_volume_24h,
        "reserve_usd_max": max_liquidity,
        "tx_24h_count_min": min_transactions,
        "order": "volume_usd_h24_desc",
        "page": 1,
        "limit": limit
    }
    data = cg_request(endpoint, params)
    pools = data.get("data", [])
    # Process and return pool data...
    return pd.DataFrame(pool_data)
def get_pool_trades(network: str, pool_address: str, limit: int = 300) -> pd.DataFrame:
    Fetch recent trades from a specific pool.
    Args:
        network: Network ID
        pool_address: Pool contract address
        limit: Number of trades to fetch (max 300 per call)
    Returns:
        DataFrame with trade data
    endpoint = f"/networks/{network}/pools/{pool_address}/trades"
    params = {"limit": min(limit, 300)}
    data = cg_request(endpoint, params)
    trades = data.get("data", [])
    trade_data = []
    for trade in trades:
        attr = trade.get("attributes", {})  
        trade_data.append({
            "tx_hash": attr.get("tx_hash", ""),
            "block_timestamp": attr.get("block_timestamp", ""),
            "tx_from_address": attr.get("tx_from_address", ""),
            "kind": attr.get("kind", ""),  # 'buy' or 'sell'
            "volume_usd": float(attr.get("volume_in_usd", 0) or 0),
            "from_token_amount": float(attr.get("from_token_amount", 0) or 0),
            "to_token_amount": float(attr.get("to_token_amount", 0) or 0),
            "price_from_in_usd": float(attr.get("price_from_in_usd", 0) or 0),
            "price_to_in_usd": float(attr.get("price_to_in_usd", 0) or 0),
        })
    return pd.DataFrame(trade_data)
def analyze_wallet_profitability(trades_df: pd.DataFrame, min_trades: int = 3) -> pd.DataFrame:
    Analyze wallet profitability based on trade history.
    Calculates realized PnL for each wallet.
     if trades_df.empty:
        return pd.DataFrame()
    wallet_stats = []
    for wallet, group in trades_df.groupby("tx_from_address"):
        if len(group) < min_trades:
            continue
        buys = group[group["kind"] == "buy"]
        sells = group[group["kind"] == "sell"]      
        total_bought = buys["volume_usd"].sum()
        total_sold = sells["volume_usd"].sum()        
        # Simple realized PnL calculation
        realized_pnl = total_sold - total_bought        
        wallet_stats.append({
            "wallet_address": wallet,
            "total_trades": len(group),
            "buys": len(buys),
            "sells": len(sells),
            "total_bought_usd": total_bought,
            "total_sold_usd": total_sold,
            "realized_pnl_usd": realized_pnl,
            "avg_trade_size_usd": group["volume_usd"].mean(),
        })
    df = pd.DataFrame(wallet_stats)
    return df.sort_values("realized_pnl_usd", ascending=False)
import websockets
async def subscribe_to_pool_trades(
    pool_address: str,
    network: str = "eth",
    callback=None,
    duration_seconds: int = 60
):
    # Subscribe to real-time trades for a pool using CoinGecko WebSocket. #
    Args:
        pool_address: Pool address to monitor
        network: Network ID
        callback: Function to call when trade is detected
        duration_seconds: How long to listen
    ws_url = f"wss://ws-onchain.coingecko.com/onchain/v1/subscribe/trades"
    print(f"Connecting to CoinGecko WebSocket...")
    print(f"Monitoring pool: {pool_address[:10]}...")
    async with websockets.connect(ws_url) as websocket:
        # Subscribe message
        subscribe_msg = {
            "action": "subscribe",
            "params": {
                "pool_addresses": [pool_address],
                "network": network
            }
            "api_key": API_KEY
        }    
        await websocket.send(json.dumps(subscribe_msg))
        print("Subscribed to trade stream")
        
        # Listen for trades
        start_time = time.time()
        while time.time() - start_time < duration_seconds:
            try:
                message = await asyncio.wait_for(
                    websocket.recv(),
                    timeout=5.0
                )           
                data = json.loads(message)       
                if data.get("type") == "trade":
                    print(f"TRADE DETECTED!")
                    print(f"  TX Hash: {data.get('tx_hash', 'N/A')}")
                    print(f"  Type: {data.get('kind', 'N/A')}")
                    print(f"  Volume: ${data.get('volume_usd', 0):,.2f}")              
                    if callback:
                        await callback(data)            
            except asyncio.TimeoutError:
                pass  # No message, continue listening
BLOCKSCOUT_BASE_URL = "https://eth.blockscout.com/api/v2"
def verify_transaction(tx_hash: str) -> dict:
    """
    Verify transaction details using Blockscout API.
    """
    url = f"{BLOCKSCOUT_BASE_URL}/transactions/{tx_hash}"
    try:
        response = requests.get(url, timeout=10)
        response.raise_for_status()
        data = response.json()        
        return {
            "tx_hash": tx_hash,
            "from_address": data.get("from", {}).get("hash", ""),
            "to_address": data.get("to", {}).get("hash", ""),
            "value": data.get("value", "0"),
            "status": data.get("status", ""),
        }        
    except Exception as e:
        print(f"Error verifying transaction: {e}")
        return {}
class PaperTradingEngine:
    """
    Simulated trading engine for copy trading.
    Implements proportional allocation for position sizing.
    """
    def __init__(self, initial_equity: float = 10000.0, max_position_pct: float = 0.1):
        self.equity = initial_equity
        self.max_position_pct = max_position_pct
        self.positions = {}
        self.trades = []
        self.target_wallets = set()
    def add_target_wallet(self, wallet_address: str):
       
"""Add a wallet to the copy trade target list."""
        self.target_wallets.add(wallet_address.lower())
    def calculate_position_size(self, leader_trade_usd: float) -> float:
        """
        Calculate proportional position size.
        Instead of mirroring exact amounts, calculate based on equity.
        """
        max_size = self.equity * self.max_position_pct
        position_size = min(max_size, leader_trade_usd * 0.1)
        return round(position_size, 2)
    def check_slippage(
        self,
        current_price: float,
        entry_price: float,
        max_slippage_pct: float = 2.0
    ) -> bool:
        """
        Check if current price is within acceptable slippage.
        Prevents buying at peaks if the target already pumped the price
        """
        if entry_price == 0:
            return False
        slippage = abs(current_price - entry_price) / entry_price * 100
        return slippage <= max_slippage_pct
    async def execute_copy_trade(self, trade_data: dict):
        """
        Execute a copy trade based on detected trade.
        """
        tx_hash = trade_data.get("tx_hash", "")
        if not tx_hash:
            return
        # Verify transaction
        tx_info = verify_transaction(tx_hash)
        if not tx_info:
            return
        from_address = tx_info.get("from_address", "").lower()   
        # Check if this is from a target wallet
        if from_address not in self.target_wallets:
            return  # Not a target wallet
        print(f"TARGET WALLET TRADE DETECTED!")
        # Calculate position size
        leader_volume = float(trade_data.get("volume_usd", 0))
        position_size = self.calculate_position_size(leader_volume)
        trade_kind = trade_data.get("kind", "unknown")
        token_symbol = trade_data.get("base_token_symbol", "TOKEN")
        current_price = float(trade_data.get("price_usd", 0))
        # Execute paper trade
        trade_record = {
            "timestamp": datetime.now().isoformat(),
            "tx_hash": tx_hash,
            "copied_wallet": from_address,
            "action": trade_kind,
            "token": token_symbol,
            "amount_usd": position_size,
            "price": current_price,
            "status": "EXECUTED (PAPER)"
        }
        self.trades.append(trade_record)
        print(f"PAPER TRADE EXECUTED:")
        print(f"  Action: {trade_kind.upper()}")
        print(f"  Token: {token_symbol}")
        print(f"  Amount: ${position_size:,.2f}")
async def run_copy_trading_bot():
    # Step 1: Find trending pools
    trending_pools = get_trending_pools(network="eth", limit=10)
    target_pool = trending_pools.iloc[0]["pool_address"
    # Step 2: Find profitable wallets
    trades_df = get_pool_trades("eth", target_pool)
    wallets_df = analyze_wallet_profitability(trades_df)
    profitable_wallets = wallets_df[wallets_df["realized_pnl_usd"] >= 100]["wallet_address"].tolist()
    # Step 3: Initialize paper trading engine
    trader = PaperTradingEngine(initial_equity=10000.0, max_position_pct=0.05)
    for wallet in profitable_wallets[:3]:
        trader.add_target_wallet(wallet)
    # Step 4: Start real-time monitoring
    await subscribe_to_pool_trades(
        pool_address=target_pool,
        network="eth",
        callback=trader.execute_copy_trade,
        duration_seconds=300
    )
    # Print summary
    trader.get_summary()
# Run the bot
asyncio.run(run_copy_trading_bot())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment