Created
June 30, 2025 12:53
-
-
Save allenhark/1b63d50ddc870f52c29f848ed54fce48 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import chalk from "chalk"; | |
| import { extractPumpFunTokenDetails } from "./ExtractToken2"; | |
| import PQueue from 'p-queue'; | |
| // Import required modules | |
| const { Kafka } = require('kafkajs'); | |
| const bs58 = require('bs58'); | |
| const { loadProto } = require('bitquery-protobuf-schema'); | |
| const { CompressionTypes, CompressionCodecs } = require('kafkajs'); | |
| const LZ4 = require('kafkajs-lz4'); | |
| const { v4: uuidv4 } = require('uuid'); | |
| //const config = require('./config.json'); //credentials imported from JSON file | |
// Enable LZ4 compression so kafkajs can decode LZ4-compressed batches from the brokers.
CompressionCodecs[CompressionTypes.LZ4] = new LZ4().codec;

// Configuration
// NOTE(review): credentials are hardcoded placeholders ('xx') — restore loading
// from config.json (commented-out import above) or environment variables before deploying.
const username = 'xx'; // credentials should come from JSON config, not source
const password = 'xx';
// Bitquery protobuf topic carrying Solana transactions.
const topic = 'solana.transactions.proto';
// pump.fun program address on Solana mainnet; used to filter transactions below.
const pumpFunProgram = '6EF8rrecthR5Dkzon8Nwu78hRvfCKubJ14M5uBEwF6P';
// Fresh UUID per process run — combined with username for a unique consumer group id.
const id = uuidv4();
// Bounds concurrent message processing to 4 in-flight tasks.
const queue = new PQueue({ concurrency: 4 });

// Initialize Kafka Client (Non-SSL) — SASL/SCRAM-SHA-512 auth over plaintext brokers.
const kafka = new Kafka({
  clientId: username,
  brokers: ['rpk0.bitquery.io:9092', 'rpk1.bitquery.io:9092', 'rpk2.bitquery.io:9092'],
  sasl: {
    mechanism: 'scram-sha-512',
    username: username,
    password: password,
  },
});
| // Function to convert bytes to base58 | |
| const convertBytes = (buffer, encoding = 'base58') => { | |
| if (encoding === 'base58') { | |
| return bs58.default.encode(buffer); | |
| } | |
| return buffer.toString('hex'); | |
| } | |
| // Recursive function to convert Protobuf messages to JSON | |
| const protobufToJson = (msg, encoding = 'base58') => { | |
| const result = {}; | |
| for (const [key, value] of Object.entries(msg)) { | |
| if (Array.isArray(value)) { | |
| result[key] = value.map((item, idx) => { | |
| if (typeof item === 'object' && item !== null) { | |
| return protobufToJson(item, encoding); | |
| } else { | |
| return item; | |
| } | |
| }); | |
| } else if (value && typeof value === 'object' && Buffer.isBuffer(value)) { | |
| result[key] = convertBytes(value, encoding); | |
| } else if (value && typeof value === 'object') { | |
| result[key] = protobufToJson(value, encoding); | |
| } else { | |
| result[key] = value; | |
| } | |
| } | |
| return result; | |
| } | |
// Initialize consumer with more aggressive settings.
// NOTE(review): maxWaitTimeInMs / maxBytes / fetchMaxBytes / fetchMinBytes are
// passed here as consumer() options — confirm against the kafkajs API that they
// are honored at this level and not silently ignored (some are run()/broker-level).
const consumer = kafka.consumer({
  // Unique group per process (username + fresh uuid): no offset sharing between runs.
  groupId: `${username}-${id}`,
  // More aggressive timeouts
  sessionTimeout: 60000, // 60 seconds
  heartbeatInterval: 5000, // 5 seconds
  allowAutoTopicCreation: false,
  // More aggressive fetch settings
  maxWaitTimeInMs: 50, // Reduce wait time further
  maxBytes: 2097152, // 2MB max message size
  // Increase fetch size for better throughput
  fetchMaxBytes: 1048576, // 1MB fetch size
  fetchMinBytes: 1,
});
| // Run the consumer | |
| const run = async () => { | |
| try { | |
| const ParsedIdlBlockMessage = await loadProto(topic); | |
| await consumer.connect(); | |
| await consumer.subscribe({ | |
| topic, | |
| fromBeginning: false | |
| }); | |
| await consumer.run({ | |
| // More aggressive auto-commit | |
| autoCommit: true, | |
| autoCommitInterval: 500, // Commit every 500ms | |
| autoCommitThreshold: 50, // Or commit after 50 messages | |
| // Increase concurrency | |
| partitionsConsumedConcurrently: 5, // Process more partitions concurrently | |
| eachMessage: async ({ partition, message }) => { | |
| // Process immediately without setImmediate for better performance | |
| // processToken(ParsedIdlBlockMessage, message); | |
| queue.add(() => processToken(ParsedIdlBlockMessage, message)); | |
| }, | |
| }); | |
| } catch (error) { | |
| console.error('Error running consumer:', error); | |
| } | |
| } | |
| async function processToken(ParsedIdlBlockMessage, message: any) { | |
| try { | |
| const buffer = message.value; | |
| const decoded = ParsedIdlBlockMessage.decode(buffer); | |
| const msgObj = ParsedIdlBlockMessage.toObject(decoded, { bytes: Buffer }); | |
| const jsonOutput = protobufToJson(msgObj) as any; | |
| const transactions = (jsonOutput as any).Transactions || []; | |
| // Only process if there are transactions | |
| if (transactions.length === 0) return; | |
| // Log timing only occasionally to reduce overhead | |
| if (Math.random() < 0.01) { // Log 1% of messages | |
| if (jsonOutput?.Header?.Timestamp) { | |
| const now = Date.now(); | |
| const creationTimeMs = jsonOutput.Header.Timestamp.low * 1000; | |
| const creationDiff = now - creationTimeMs; | |
| console.log(`🕒 Lag: ${creationDiff}ms`); | |
| // Alert if lag is too high | |
| if (creationDiff > 30000) { // 30 seconds | |
| console.warn(`⚠️ High lag detected: ${creationDiff}ms`); | |
| } | |
| } | |
| } | |
| // Early return if no transactions to process | |
| if (transactions.length === 0) return; | |
| // Process transactions more efficiently | |
| const filteredTxs = transactions.filter(tx => | |
| Array.isArray(tx.ParsedIdlInstructions) && | |
| tx.TotalBalanceUpdates.length > 0 && | |
| tx.Status?.Success === true | |
| ); | |
| // Process pump transactions | |
| for (const tx of filteredTxs) { | |
| const hasPumpProgram = tx?.Header?.Accounts?.some(account => | |
| account.Address === pumpFunProgram | |
| ); | |
| if (!hasPumpProgram) continue; | |
| const pumpCreateInstructions = tx.ParsedIdlInstructions.filter(inst => | |
| inst?.Program?.Name === 'pump' && | |
| inst?.Program?.Method === 'create' | |
| ); | |
| if (pumpCreateInstructions.length > 0) { | |
| console.log(chalk.cyan('🚀 Pump.fun token creation detected')); | |
| const meta = extractPumpFunTokenDetails(tx); | |
| console.log(meta); | |
| //console.log(JSON.stringify(tx)) | |
| } | |
| } | |
| } catch (err) { | |
| console.error('Error processing message:', err); | |
| } | |
| } | |
// Start the consumer; log any startup rejection that escapes run()'s own try/catch.
run().catch(console.error);
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment