Skip to content

Instantly share code, notes, and snippets.

@allenhark
Created June 30, 2025 12:53
Show Gist options
  • Select an option

  • Save allenhark/1b63d50ddc870f52c29f848ed54fce48 to your computer and use it in GitHub Desktop.

Select an option

Save allenhark/1b63d50ddc870f52c29f848ed54fce48 to your computer and use it in GitHub Desktop.
import chalk from "chalk";
import { extractPumpFunTokenDetails } from "./ExtractToken2";
import PQueue from 'p-queue';
// Import required modules
const { Kafka } = require('kafkajs');
const bs58 = require('bs58');
const { loadProto } = require('bitquery-protobuf-schema');
const { CompressionTypes, CompressionCodecs } = require('kafkajs');
const LZ4 = require('kafkajs-lz4');
const { v4: uuidv4 } = require('uuid');
//const config = require('./config.json'); //credentials imported from JSON file
// Enable LZ4 compression
CompressionCodecs[CompressionTypes.LZ4] = new LZ4().codec;
// Configuration
const username = 'xx'; //credentials imported from JSON file
const password = 'xx';
const topic = 'solana.transactions.proto';
const pumpFunProgram = '6EF8rrecthR5Dkzon8Nwu78hRvfCKubJ14M5uBEwF6P';
const id = uuidv4();
const queue = new PQueue({ concurrency: 4 });
// Initialize Kafka Client (Non-SSL)
const kafka = new Kafka({
clientId: username,
brokers: ['rpk0.bitquery.io:9092', 'rpk1.bitquery.io:9092', 'rpk2.bitquery.io:9092'],
sasl: {
mechanism: 'scram-sha-512',
username: username,
password: password,
},
});
// Function to convert bytes to base58
const convertBytes = (buffer, encoding = 'base58') => {
if (encoding === 'base58') {
return bs58.default.encode(buffer);
}
return buffer.toString('hex');
}
// Recursive function to convert Protobuf messages to JSON
const protobufToJson = (msg, encoding = 'base58') => {
const result = {};
for (const [key, value] of Object.entries(msg)) {
if (Array.isArray(value)) {
result[key] = value.map((item, idx) => {
if (typeof item === 'object' && item !== null) {
return protobufToJson(item, encoding);
} else {
return item;
}
});
} else if (value && typeof value === 'object' && Buffer.isBuffer(value)) {
result[key] = convertBytes(value, encoding);
} else if (value && typeof value === 'object') {
result[key] = protobufToJson(value, encoding);
} else {
result[key] = value;
}
}
return result;
}
// Initialize consumer with more aggressive settings
const consumer = kafka.consumer({
groupId: `${username}-${id}`,
// More aggressive timeouts
sessionTimeout: 60000, // 60 seconds
heartbeatInterval: 5000, // 5 seconds
allowAutoTopicCreation: false,
// More aggressive fetch settings
maxWaitTimeInMs: 50, // Reduce wait time further
maxBytes: 2097152, // 2MB max message size
// Increase fetch size for better throughput
fetchMaxBytes: 1048576, // 1MB fetch size
fetchMinBytes: 1,
});
// Run the consumer
const run = async () => {
try {
const ParsedIdlBlockMessage = await loadProto(topic);
await consumer.connect();
await consumer.subscribe({
topic,
fromBeginning: false
});
await consumer.run({
// More aggressive auto-commit
autoCommit: true,
autoCommitInterval: 500, // Commit every 500ms
autoCommitThreshold: 50, // Or commit after 50 messages
// Increase concurrency
partitionsConsumedConcurrently: 5, // Process more partitions concurrently
eachMessage: async ({ partition, message }) => {
// Process immediately without setImmediate for better performance
// processToken(ParsedIdlBlockMessage, message);
queue.add(() => processToken(ParsedIdlBlockMessage, message));
},
});
} catch (error) {
console.error('Error running consumer:', error);
}
}
async function processToken(ParsedIdlBlockMessage, message: any) {
try {
const buffer = message.value;
const decoded = ParsedIdlBlockMessage.decode(buffer);
const msgObj = ParsedIdlBlockMessage.toObject(decoded, { bytes: Buffer });
const jsonOutput = protobufToJson(msgObj) as any;
const transactions = (jsonOutput as any).Transactions || [];
// Only process if there are transactions
if (transactions.length === 0) return;
// Log timing only occasionally to reduce overhead
if (Math.random() < 0.01) { // Log 1% of messages
if (jsonOutput?.Header?.Timestamp) {
const now = Date.now();
const creationTimeMs = jsonOutput.Header.Timestamp.low * 1000;
const creationDiff = now - creationTimeMs;
console.log(`🕒 Lag: ${creationDiff}ms`);
// Alert if lag is too high
if (creationDiff > 30000) { // 30 seconds
console.warn(`⚠️ High lag detected: ${creationDiff}ms`);
}
}
}
// Early return if no transactions to process
if (transactions.length === 0) return;
// Process transactions more efficiently
const filteredTxs = transactions.filter(tx =>
Array.isArray(tx.ParsedIdlInstructions) &&
tx.TotalBalanceUpdates.length > 0 &&
tx.Status?.Success === true
);
// Process pump transactions
for (const tx of filteredTxs) {
const hasPumpProgram = tx?.Header?.Accounts?.some(account =>
account.Address === pumpFunProgram
);
if (!hasPumpProgram) continue;
const pumpCreateInstructions = tx.ParsedIdlInstructions.filter(inst =>
inst?.Program?.Name === 'pump' &&
inst?.Program?.Method === 'create'
);
if (pumpCreateInstructions.length > 0) {
console.log(chalk.cyan('🚀 Pump.fun token creation detected'));
const meta = extractPumpFunTokenDetails(tx);
console.log(meta);
//console.log(JSON.stringify(tx))
}
}
} catch (err) {
console.error('Error processing message:', err);
}
}
// Start the consumer
run().catch(console.error);
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment