Skip to content

Instantly share code, notes, and snippets.

@rg443a
Last active February 24, 2026 17:10
Show Gist options
  • Select an option

  • Save rg443a/9f40ee42d3cbdf9500e04c54f36f3b17 to your computer and use it in GitHub Desktop.

Select an option

Save rg443a/9f40ee42d3cbdf9500e04c54f36f3b17 to your computer and use it in GitHub Desktop.
Counting IPv4 addresses in a log file
import fs from 'node:fs';
import path from 'node:path';
import cp from 'child_process';
import { Worker, isMainThread, parentPort } from 'node:worker_threads';
import { performance } from 'node:perf_hooks';
import os from 'node:os';
import { fileURLToPath } from 'node:url';
// Absolute path of this module (ES modules have no built-in __filename).
const __filename = fileURLToPath(import.meta.url);
// Size of the thread pool: the parallelism the OS reports, capped at 16.
// os.cpus() may legitimately return an empty array on some platforms, so the
// value is floored at 1 to guarantee that at least one worker is started.
const numWorkers = Math.max(1, Math.min(16, os.availableParallelism?.() ?? os.cpus().length));
// Number of file bytes handed to a worker per request (64 MiB).
const CHUNK_SIZE = 64 * 1024 * 1024;
if (isMainThread) {
// The file to scan is the first command-line argument. A missing argument
// prints the usage text; a path that does not exist prints a two-line error.
// Either problem ends the process with exit status 1.
let inputFile = process.argv[2];
{
  const complaints = [];
  if (!inputFile) {
    complaints.push(`Usage: node ${path.basename(process.argv[1])} <logfile>\n counting IPv4 from logfile, check ip_counts.txt for results.`);
  } else if (!fs.existsSync(inputFile)) {
    complaints.push(`\n❌ Error: File not found: "${inputFile}"`);
    complaints.push(`Please check the path and try again.`);
  }
  if (complaints.length > 0) {
    for (const line of complaints) {
      console.error(line);
    }
    process.exit(1);
  }
}
// Archives are expanded into a temporary file first, because the scan reads
// the input by byte offset. `isTemp` records that `inputFile` now points at
// that temporary file; `t1` marks the moment extraction began.
let isTemp = false;
const t1 = new Date();
if (/\.(gz|zst|bz2|zip|br|7z|rar|tar)$/i.test(inputFile)) {
  // The process id keeps two concurrent runs on same-named archives from
  // writing to the same temporary file.
  const decompressedPath = path.join(os.tmpdir(), `${path.basename(inputFile)}.${process.pid}.tmp`);
  console.log(`🚀 Extracting ${path.basename(inputFile)}...`);
  try {
    const outFd = fs.openSync(decompressedPath, 'w');
    let result;
    try {
      // `7z e … -so` streams the extracted data to stdout, which is wired
      // straight into the temporary file.
      result = cp.spawnSync('7z', ['e', inputFile, '-so', '-y'], {
        stdio: ['ignore', outFd, 'inherit']
      });
    } finally {
      fs.closeSync(outFd);
    }
    // `error` is set when the child could not be started at all (for example
    // when 7z is not installed); `status` is null in that case.
    if (result.error) {
      throw new Error(`could not run 7z: ${result.error.message}`, { cause: result.error });
    }
    if (result.status !== 0) {
      throw new Error(result.signal
        ? `7z was terminated by ${result.signal}`
        : `7z exited with code ${result.status}`);
    }
    inputFile = decompressedPath;
    isTemp = true;
  } catch (err) {
    console.error("❌ Decompression failed:", err.message);
    // Do not leave a partial extraction behind.
    if (fs.existsSync(decompressedPath)) fs.unlinkSync(decompressedPath);
    process.exit(1);
  }
}
// Shared state of the scan: size and descriptor of the input, the start time
// for throughput figures, the offset of the next unread byte, the number of
// live workers, and the merged address -> hit-count table.
const stats = fs.statSync(inputFile);
const fd = fs.openSync(inputFile, 'r');
const startTime = performance.now();
let filePos = 0;
let activeWorkers = 0;
const finalCounts = new Map();
if (isTemp) {
  // Report extraction speed. Bytes per millisecond divided by 1000 is
  // (decimal) megabytes per second. Elapsed time is sampled once and floored
  // at 1 ms so the division cannot produce Infinity.
  const extractMs = Math.max(1, new Date() - t1);
  console.log(' ', Math.floor(stats.size / extractMs / 1000), "MB/s, " + (extractMs * 0.001).toFixed(1) + " sec");
}
console.log(`\n📊 COUNTING IPAddr | ${numWorkers} Threads`);
/**
 * Builds the 'message' listener for one worker.
 *
 * Every message from the worker means "I am idle": a 'result' message first
 * has its counts merged into `finalCounts`, then the worker is handed the
 * next piece of the file, or terminated once the file is exhausted. The last
 * worker to be terminated triggers `finalize()`.
 *
 * Chunks are cut so that no address is split between two workers: unless the
 * read reached the end of the file, the chunk is shortened to end right after
 * the last byte that is not a digit or a dot, and the remainder is re-read as
 * the start of the next chunk. A newline byte is appended after the data so
 * that an address sitting at the very end of the file is still followed by a
 * delimiter.
 *
 * @param {Worker} worker - the worker this listener belongs to
 * @returns {(msg: {type?: string, counts?: Map<number, number>}) => void}
 */
const handleMessage = (worker) => {
  // One buffer per worker, created on first use and then reused. This is safe
  // because a worker only gets new data after it has reported its result.
  // The extra byte leaves room for the trailing newline.
  let sab = null;
  let view = null;
  const isIpByte = (b) => (b >= 48 && b <= 57) || b === 46; // '0'-'9' or '.'
  const retire = () => {
    worker.terminate();
    if (--activeWorkers === 0) {
      finalize().catch((err) => {
        console.error(err);
        process.exit(1);
      });
    }
  };
  return (msg) => {
    if (msg.type === 'result') {
      // Merge worker counts into master map
      for (const [ip, count] of msg.counts) {
        finalCounts.set(ip, (finalCounts.get(ip) ?? 0) + count);
      }
    }
    if (filePos >= stats.size) {
      retire();
      return;
    }
    if (sab === null) {
      sab = new SharedArrayBuffer(CHUNK_SIZE + 1);
      view = new Uint8Array(sab);
    }
    const bytesRead = fs.readSync(fd, view, 0, CHUNK_SIZE, filePos);
    if (bytesRead === 0) {
      // The file is shorter than its recorded size (e.g. truncated while
      // scanning); treat this as end of input instead of re-reading forever.
      retire();
      return;
    }
    let usable = bytesRead;
    if (filePos + bytesRead < stats.size) {
      let cut = bytesRead;
      while (cut > 0 && isIpByte(view[cut - 1])) cut--;
      // cut === 0 means the whole chunk is one run of digits/dots; there is
      // no delimiter to cut at, so it is passed on unshortened.
      if (cut > 0) usable = cut;
    }
    view[usable] = 10; // '\n'
    filePos += usable;
    worker.postMessage({ sab, length: usable + 1 });
  };
};
// Bring up the thread pool. Each thread runs this same module, is counted as
// active, gets its own message listener, and is sent an initial 'start'
// message.
for (let remaining = numWorkers; remaining > 0; remaining -= 1) {
  const thread = new Worker(__filename);
  activeWorkers += 1;
  thread.on('message', handleMessage(thread));
  thread.postMessage({ type: 'start' });
}
/**
 * Ends the run: prints the summary, writes the results when at least one
 * address was found, removes the temporary extraction file, and exits.
 *
 * The temporary file is removed even when writing the results fails; in that
 * case the error is reported and the exit status is 1 instead of 0.
 *
 * @returns {Promise<void>} never observed in practice, the process exits
 */
async function finalize() {
  const duration = (performance.now() - startTime) / 1000;
  console.log(`\n\n✅ SCAN COMPLETE | Total Unique: ${finalCounts.size.toLocaleString()}`);
  console.log(`🏎️ Throughput: ${(stats.size / 1024 / 1024 / duration).toFixed(2)} MB/s`);
  let exitCode = 0;
  try {
    if (finalCounts.size > 0) await saveResults();
  } catch (err) {
    console.error("❌ Failed to write results:", err.message);
    exitCode = 1;
  } finally {
    if (isTemp && fs.existsSync(inputFile)) fs.unlinkSync(inputFile);
  }
  process.exit(exitCode);
}
/**
 * Writes the merged counts to `ip_counts.txt` in the current directory, one
 * `a.b.c.d<TAB>count` line per address, ordered by numeric address value.
 *
 * @returns {Promise<void>} resolves once the stream has emitted 'finish';
 *   rejects with the stream's error if the file cannot be opened or written.
 */
async function saveResults() {
  // Keys are unsigned 32-bit integers, so a Uint32Array holds them compactly
  // and its sort() orders them numerically (no comparator needed).
  const sortedIps = new Uint32Array(finalCounts.keys());
  sortedIps.sort();
  console.log("💾 Writing to ip_counts.txt");
  const writeStream = fs.createWriteStream('ip_counts.txt');
  // Settles when the stream finishes or fails. Registering the 'error'
  // listener up front turns stream failures into a rejection of this
  // function instead of an uncaught 'error' event.
  const finished = new Promise((resolve, reject) => {
    writeStream.on('error', reject);
    writeStream.once('finish', resolve);
  });
  let chunk = "";
  for (const ip of sortedIps) {
    chunk += `${ip >>> 24}.${(ip >>> 16) & 255}.${(ip >>> 8) & 255}.${ip & 255}\t${finalCounts.get(ip)}\n`;
    // Hand the text to the stream in pieces of roughly 64K characters.
    if (chunk.length > 65536) {
      if (!writeStream.write(chunk)) {
        // Back-pressure: wait for 'drain', but stop waiting if the stream fails.
        await Promise.race([
          new Promise((resolve) => writeStream.once('drain', resolve)),
          finished,
        ]);
      }
      chunk = "";
    }
  }
  if (chunk.length > 0) {
    writeStream.end(chunk);
  } else {
    writeStream.end();
  }
  await finished;
  console.log("✅ Done.");
}
// Twice a second, redraw a single status line (the leading '\r' returns the
// cursor to the start of the line) with the read speed and the share of the
// file handed out so far.
setInterval(() => {
  const elapsed = (performance.now() - startTime) / 1000;
  const speed = (filePos / 1024 / 1024 / elapsed).toFixed(1);
  // An empty file has nothing left to read; avoid 0/0 showing up as "NaN%".
  const percent = stats.size > 0 ? Math.min((filePos / stats.size) * 100, 100) : 100;
  process.stdout.write(`\r⚡ Speed: ${speed} MB/s | Progress: ${percent.toFixed(1)}%`);
}, 500);
} else {
/**
 * Parses the bytes in [from, to) as a dotted-quad IPv4 address.
 * The range must contain only ASCII digits and dots.
 *
 * @param {Uint8Array} buffer - bytes being scanned
 * @param {number} from - index of the first byte of the candidate
 * @param {number} to - index one past the last byte of the candidate
 * @returns {number} the address as an unsigned 32-bit integer, or -1 when the
 *   text is not four dot-separated numbers in 0..255 (an octet with no digits,
 *   as in "10.20.30." or "1..2.3", is rejected)
 */
const parseDottedQuad = (buffer, from, to) => {
  let dots = 0;
  let packed = 0;
  let octet = 0;
  let digits = 0;
  for (let j = from; j < to; j++) {
    const b = buffer[j];
    if (b === 46) {
      dots++;
      if (digits === 0 || octet > 255 || dots > 3) return -1;
      packed = (packed << 8) | octet;
      octet = 0;
      digits = 0;
    } else {
      octet = octet * 10 + (b - 48);
      digits++;
    }
  }
  if (dots !== 3 || digits === 0 || octet > 255) return -1;
  // `>>> 0` reinterprets the signed 32-bit result as unsigned.
  return ((packed << 8) | octet) >>> 0;
};

/**
 * Counts the IPv4 addresses found in the first `length` bytes of `buffer`.
 *
 * A candidate is a maximal run of digits and dots that is 7 to 15 bytes long
 * and is followed by some other byte. A run still open when the data ends is
 * not counted.
 *
 * @param {Uint8Array} buffer - bytes to scan
 * @param {number} length - number of leading bytes of `buffer` to scan
 * @returns {Map<number, number>} address (unsigned 32-bit) -> occurrences
 */
const countIPv4 = (buffer, length) => {
  const counts = new Map();
  let start = -1;
  for (let i = 0; i < length; i++) {
    const byte = buffer[i];
    if ((byte >= 48 && byte <= 57) || byte === 46) {
      if (start === -1) start = i;
      continue;
    }
    if (start === -1) continue;
    const len = i - start;
    // "0.0.0.0" is 7 bytes, "255.255.255.255" is 15.
    if (len >= 7 && len <= 15) {
      const ip = parseDottedQuad(buffer, start, i);
      if (ip !== -1) counts.set(ip, (counts.get(ip) ?? 0) + 1);
    }
    start = -1;
  }
  return counts;
};

// Every incoming message is answered with exactly one 'result' message. A
// message that carries no buffer (such as the initial { type: 'start' })
// yields an empty result. The optional call only matters when this code is
// loaded outside a worker thread, where parentPort is null.
parentPort?.on('message', (msg) => {
  const { sab, length } = msg;
  const counts = sab ? countIPv4(new Uint8Array(sab), length) : new Map();
  parentPort.postMessage({ type: 'result', counts });
});
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment