Skip to content

Instantly share code, notes, and snippets.

@allenhark
Last active June 27, 2025 23:02
Show Gist options
  • Select an option

  • Save allenhark/510d3c8e8448af0bb1e952fde3475fa0 to your computer and use it in GitHub Desktop.

Select an option

Save allenhark/510d3c8e8448af0bb1e952fde3475fa0 to your computer and use it in GitHub Desktop.
// Import required modules
import { Kafka, CompressionTypes, CompressionCodecs } from 'kafkajs';
import bs58 from 'bs58';
//@ts-ignore
import { loadProto } from 'bitquery-protobuf-schema';
import LZ4 from 'kafkajs-lz4';
import { v4 as uuidv4 } from 'uuid';
import chalk from 'chalk';
import { extractPumpFunTokenDetails } from './ExtractToken'
import { Connection, PublicKey, Finality, Keypair } from '@solana/web3.js';
import dotenv from 'dotenv';
import { PumpFunSDK } from "./pumpv3/pumpfunv2";
import NodeWallet from "@coral-xyz/anchor/dist/cjs/nodewallet";
import { AnchorProvider } from "@coral-xyz/anchor";
import connectionPool from './connectionpool';
// Types
import { Buffer } from 'buffer';
dotenv.config();

// Register the LZ4 codec so kafkajs can decompress Bitquery's LZ4 messages.
CompressionCodecs[CompressionTypes.LZ4] = new LZ4().codec;

// Kafka credentials: prefer environment variables (dotenv is loaded above for
// exactly this purpose) and only fall back to the hard-coded placeholders.
const username = process.env.KAFKA_USERNAME ?? 'xx';
const password = process.env.KAFKA_PASSWORD ?? 'xx';
const topic = 'solana.tokens.proto';
// Pump.fun on-chain program address used to filter token-creation instructions.
const pumpFunProgram = '6EF8rrecthR5Dkzon8Nwu78hRvfCKubJ14M5uBEwF6P';
// Unique suffix so every process gets its own Kafka consumer group.
const id = uuidv4();

// Throwaway wallet: a fresh random Keypair each run.
// NOTE(review): it holds no funds and cannot sign meaningful transactions —
// confirm the provider is only used for reads.
const wallet = new NodeWallet(new Keypair());
//const connectionPool = new ConnectionPool(RPC_ENDPOINTS, rpcWsUrl);
let isProcessing = false;

// Anchor provider backed by a pooled RPC connection.
const provider = new AnchorProvider(connectionPool.getConnection(), wallet, {
  commitment: "processed",
});

// Pump.fun SDK instance.
const sdk = new PumpFunSDK(provider);

const kafka = new Kafka({
  clientId: username,
  brokers: ['rpk0.bitquery.io:9092', 'rpk1.bitquery.io:9092', 'rpk2.bitquery.io:9092'],
  sasl: {
    mechanism: 'scram-sha-512',
    username,
    password,
  },
});
// Render a binary buffer as text: base58 (default) or hex.
const convertBytes = (buffer: Buffer, encoding: 'base58' | 'hex' = 'base58'): string => {
  if (encoding === 'hex') {
    return buffer.toString('hex');
  }
  return bs58.encode(buffer);
};
// Recursively walk a decoded protobuf object, replacing every Buffer leaf
// with its textual form (base58 by default, or hex). Arrays and plain
// objects are rebuilt; all other values pass through unchanged.
const convertProtobufToJson = (msg: any, encoding: 'base58' | 'hex' = 'base58'): any => {
  if (Buffer.isBuffer(msg)) {
    return convertBytes(msg, encoding);
  }
  if (Array.isArray(msg)) {
    return msg.map(entry => convertProtobufToJson(entry, encoding));
  }
  if (msg !== null && msg !== undefined && typeof msg === 'object') {
    const converted: Record<string, any> = {};
    for (const [field, nested] of Object.entries(msg)) {
      converted[field] = convertProtobufToJson(nested, encoding);
    }
    return converted;
  }
  return msg;
};
// One consumer group per process (uuid suffix), so each run reads the
// stream independently rather than sharing offsets with previous runs.
const consumer = kafka.consumer({ groupId: `${username}-${id}` });
// Load the protobuf schema, connect the consumer, subscribe to the token
// stream, and hand every non-empty message off to processMessage without
// blocking the Kafka fetch loop.
const run = async () => {
  try {
    const ParsedIdlBlockMessage = await loadProto(topic);

    await consumer.connect();
    await consumer.subscribe({ topic, fromBeginning: false });

    // Defer decoding to a microtask so the fetch loop returns immediately.
    const handleMessage = async ({ message }: any) => {
      if (!message.value) return;
      queueMicrotask(() => {
        processMessage(ParsedIdlBlockMessage, message)
      })
    };

    await consumer.run({
      autoCommit: false,
      eachMessage: handleMessage,
    });
  } catch (err) {
    console.error('Kafka consumer startup failed:', err);
  }
};
/**
 * Decode a raw Kafka message with the Bitquery protobuf schema, keep only
 * successful transactions that carry instruction balance updates, and kick
 * off non-blocking background processing for every pump.fun `create`
 * instruction found.
 *
 * @param ParsedIdlBlockMessage protobuf message type returned by loadProto()
 * @param message raw kafkajs message; message.value is the protobuf payload
 */
async function processMessage(ParsedIdlBlockMessage: any, message: any) {
  try {
    const decoded = ParsedIdlBlockMessage.decode(message.value);
    // Keep byte fields as Buffers so convertProtobufToJson can base58 them.
    const msgObj = ParsedIdlBlockMessage.toObject(decoded, { bytes: Buffer });
    const jsonOutput = convertProtobufToJson(msgObj);

    // Guard against a missing or non-array Transactions field: the original
    // `|| []` only covered falsy values, not a truthy non-array.
    const transactions: any[] = Array.isArray(jsonOutput?.Transactions)
      ? jsonOutput.Transactions
      : [];

    const filteredTxs = transactions.filter((tx: any) =>
      Array.isArray(tx.InstructionBalanceUpdates) &&
      tx.InstructionBalanceUpdates.length > 0 &&
      tx.Status?.Success === true
    );

    for (const tx of filteredTxs) {
      // A pump.fun token creation is a `create` call into the pump program.
      const isPumpTx = tx.InstructionBalanceUpdates.some((inst: any) =>
        inst.Instruction?.Program?.Address === pumpFunProgram &&
        inst.Instruction?.Program?.Method === 'create' &&
        inst.Instruction?.Program?.Name === 'pump'
      );
      if (!isPumpTx) continue;

      console.log(chalk.cyan('๐Ÿš€ Pump.fun token creation detected'));
      const meta = extractPumpFunTokenDetails(tx);

      // Fire-and-forget background task (non-blocking); errors are logged
      // so they never surface as unhandled rejections.
      setImmediate(() => {
        processToken(meta).catch((err) =>
          console.error('Error in processToken:', err)
        );
      });
    }
  } catch (err) {
    console.error('Error decoding or processing Kafka message:', err);
  }
}
/**
 * Fetch the parsed transaction for a newly created pump.fun token and log
 * how far behind the chain we are. Retries the RPC fetch once after 100 ms
 * because very fresh transactions may not be indexed yet.
 *
 * @param meta token details extracted from the Kafka stream; must carry the
 *             creating transaction's signature in `meta.txSignature`.
 */
async function processToken(meta: any) {
  console.log(meta)

  // Single place for the RPC call — the original duplicated it verbatim.
  const fetchParsedTx = () =>
    connectionPool.getConnection().getParsedTransaction(meta.txSignature, {
      commitment: "confirmed" as Finality,
      maxSupportedTransactionVersion: 0
    });

  let parsedTx = await fetchParsedTx();
  if (!parsedTx?.meta) {
    // Brief back-off, then one retry.
    await new Promise(resolve => setTimeout(resolve, 100));
    parsedTx = await fetchParsedTx();
  }
  if (!parsedTx?.meta) {
    console.log('Meta missing')
    return;
  }

  console.time('Calculate time difference');
  // Calculate time difference
  // Fall back to "now" when the RPC did not return a block time.
  const blockTime = parsedTx.blockTime ? parsedTx.blockTime * 1000 : Date.now();
  const timeDiff = Date.now() - blockTime;
  console.timeEnd('Calculate time difference');
  console.log(chalk.red('Time Diff: ', timeDiff, 'ms'))
}
run().catch(console.error);

/**
 * Disconnect the Kafka consumer cleanly, then exit. Shared by the SIGINT
 * and SIGTERM handlers, whose bodies were previously byte-for-byte
 * duplicates; per-signal log output is unchanged.
 */
const shutdown = async (signal: string) => {
  console.log(`\nCaught ${signal}, disconnecting Kafka consumer...`);
  try {
    await consumer.disconnect();
    console.log('Kafka consumer disconnected.');
  } catch (err) {
    console.error('Error disconnecting consumer:', err);
  } finally {
    // Always exit, even if the disconnect failed.
    process.exit(0);
  }
};

process.on('SIGINT', () => shutdown('SIGINT'));
process.on('SIGTERM', () => shutdown('SIGTERM'));
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment