/**
* Reference implementation (Node.js) of the patent’s fundamental components:
* - store 100, read 150, commit 200, root 250
* - append-only file format with backward scanning for last root marker
* - “interfaces 300” examples: B-tree interface 304, skip-list interface 308
* - “plug-ins 400” examples: caching 402, clustering 404, partitioning 408,
* sequencing 412, monitoring 416, remoting 420 (stub)
*
* This is a minimal, readable skeleton: production concerns (concurrency control,
* compaction, checksums-at-scale, WAL fsync strategy, corruption recovery, etc.)
* are intentionally simplified.
*/
'use strict';
const fs = require('fs');
const path = require('path');
const crypto = require('crypto');
const http = require('http');
const https = require('https');
/* ------------------------------------------------------------------------------------------
* Component: OID encoding/decoding
* Implements: “object identifier associated with said write location” (store 100)
* ------------------------------------------------------------------------------------------ */
const Oid = {
  // OIDs must survive process restarts and span multiple backends/segments.
  // Format: oid:v1:b<backend>:s<segment>:o<offset>, e.g. oid:v1:b0:s0:o12345.
  // Offsets are BigInt so files beyond Number.MAX_SAFE_INTEGER stay addressable.
  encode({ backend = 0, segment = 0, offset }) {
    const off = typeof offset === 'bigint' ? offset : BigInt(offset);
    return ['oid:v1', `b${backend}`, `s${segment}`, `o${off.toString(10)}`].join(':');
  },
  decode(oidStr) {
    const match = /^oid:v1:b(\d+):s(\d+):o(\d+)$/.exec(oidStr);
    if (match === null) throw new Error(`Invalid OID: ${oidStr}`);
    const [, backend, segment, offset] = match;
    return { backend: Number(backend), segment: Number(segment), offset: BigInt(offset) };
  },
};
/* ------------------------------------------------------------------------------------------
* Component: CRC32 (lightweight integrity check for records)
* Implements: basic record integrity support (not explicitly required, but practical)
* ------------------------------------------------------------------------------------------ */
/**
 * CRC-32 (reflected polynomial 0xEDB88320) over a byte buffer.
 * The 256-entry lookup table is built on first use and memoized on the
 * function object itself, so subsequent calls pay only the per-byte loop.
 * @param {Buffer|Uint8Array} buf - bytes to checksum
 * @returns {number} unsigned 32-bit checksum
 */
function crc32(buf) {
  if (!crc32._table) {
    const table = new Uint32Array(256);
    for (let n = 0; n < 256; n++) {
      let entry = n;
      for (let bit = 0; bit < 8; bit++) {
        entry = (entry & 1) ? (0xEDB88320 ^ (entry >>> 1)) : (entry >>> 1);
      }
      table[n] = entry >>> 0;
    }
    crc32._table = table;
  }
  let crc = 0xFFFFFFFF;
  for (const byte of buf) {
    crc = crc32._table[(crc ^ byte) & 0xFF] ^ (crc >>> 8);
  }
  return (crc ^ 0xFFFFFFFF) >>> 0;
}
/* ------------------------------------------------------------------------------------------
* Component: RecordCodec (append-only record format with backward traversal)
* Implements: “continually appends data to the end of a data file” + root backward scan
* ------------------------------------------------------------------------------------------ */
// Record type tags written into each record's TYPE byte (see RecordCodec).
const RecordType = Object.freeze({
  OBJECT: 1, // stored object payload (user objects, data-structure nodes)
  ROOT_MARKER: 2, // commit root marker RM — target of the backward root scan
  EOF: 3, // explicit end-of-file marker EOF (like the patent examples)
  TOMBSTONE: 4, // optional: delete marker (deletion-by-append)
});
const RecordCodec = {
  MAGIC: Buffer.from('STOR', 'ascii'),
  VERSION: 1,
  // Record framing, all integers big-endian:
  //   [MAGIC 4][VER 1][TYPE 1][PAYLOAD_LEN 4][PAYLOAD N][CRC32 4][TOTAL_LEN 4]
  // The TOTAL_LEN trailer is what makes the backward scan (root 250) possible.
  HEADER_SIZE: 10,
  MIN_RECORD_SIZE: 18, // header (10) + empty payload (0) + crc (4) + trailer (4)
  /**
   * Serialize a record. The payload is JSON; CRC32 covers payload bytes only.
   * @param {{type:number, payloadObj:any}} rec
   * @returns {Buffer} the complete framed record
   */
  encode({ type, payloadObj }) {
    const payload = Buffer.from(JSON.stringify(payloadObj), 'utf8');
    const header = Buffer.alloc(this.HEADER_SIZE);
    this.MAGIC.copy(header, 0);
    header.writeUInt8(this.VERSION, 4);
    header.writeUInt8(type, 5);
    header.writeUInt32BE(payload.length, 6);
    const crc = Buffer.alloc(4);
    crc.writeUInt32BE(crc32(payload), 0);
    // TOTAL_LEN counts every byte of the record, trailer included.
    const totalLen = header.length + payload.length + crc.length + 4;
    const trailer = Buffer.alloc(4);
    trailer.writeUInt32BE(totalLen, 0);
    return Buffer.concat([header, payload, crc, trailer]);
  },
  /**
   * Parse and validate one complete record buffer.
   * @throws on truncated input, bad magic/version, length or CRC mismatch
   * @returns {{type:number, payloadObj:any, totalLen:number}}
   */
  decode(buffer) {
    // BUGFIX: the minimum valid record is 18 bytes (10 header + 4 CRC +
    // 4 trailer), not 14 — a 14..17-byte buffer can never be a valid record.
    if (buffer.length < this.MIN_RECORD_SIZE) throw new Error('Record too short');
    if (!buffer.subarray(0, 4).equals(this.MAGIC)) throw new Error('Bad magic');
    const ver = buffer.readUInt8(4);
    if (ver !== this.VERSION) throw new Error(`Unsupported version: ${ver}`);
    const type = buffer.readUInt8(5);
    const payloadLen = buffer.readUInt32BE(6);
    const payloadStart = this.HEADER_SIZE;
    const payloadEnd = payloadStart + payloadLen;
    const crcStart = payloadEnd;
    const trailerStart = crcStart + 4;
    if (buffer.length !== trailerStart + 4) throw new Error('Bad record length');
    const wantTotal = buffer.readUInt32BE(trailerStart);
    if (wantTotal !== buffer.length) throw new Error('Trailer length mismatch');
    const payload = buffer.subarray(payloadStart, payloadEnd);
    if (buffer.readUInt32BE(crcStart) !== crc32(payload)) throw new Error('CRC mismatch');
    return { type, payloadObj: JSON.parse(payload.toString('utf8')), totalLen: wantTotal };
  },
};
/* ------------------------------------------------------------------------------------------
* Component: LocalFileDevice
* Implements: “persistent storage device 70 maintaining a file 300”
* ------------------------------------------------------------------------------------------ */
class LocalFileDevice {
  /**
   * Persistent storage device 70 backed by one local file.
   * Single-writer model: `append` derives the write offset from the current
   * file size, so concurrent writers would corrupt offsets (out of scope).
   */
  constructor(filePath) {
    this.filePath = filePath;
    fs.mkdirSync(path.dirname(filePath), { recursive: true });
    this.fd = fs.openSync(filePath, 'a+'); // read/write, create if missing
  }
  /** Current file size as BigInt (offsets are BigInt everywhere). */
  size() {
    return BigInt(fs.fstatSync(this.fd).size);
  }
  /**
   * Append a buffer and return the offset where the record begins
   * (the OID write location).
   * NOTE(review): the fd is opened with 'a+', and POSIX append-mode writes
   * ignore the position argument — that coincides with `offset` here because
   * offset is always the current EOF; confirm if multi-writer is ever added.
   */
  append(buf) {
    const offset = this.size();
    // BUGFIX: loop until all bytes are written instead of assuming a single
    // writeSync call writes the whole buffer.
    let written = 0;
    while (written < buf.length) {
      written += fs.writeSync(this.fd, buf, written, buf.length - written, Number(offset) + written);
    }
    // For durability semantics you'd tune fsync frequency; explicit and simple here.
    fs.fsyncSync(this.fd);
    return offset;
  }
  /**
   * Read exactly `length` bytes starting at `offset`.
   * BUGFIX: a read past EOF used to return a silently zero-filled buffer;
   * now a short read raises instead of propagating garbage.
   */
  readAt(offset, length) {
    const buf = Buffer.alloc(length);
    const bytesRead = fs.readSync(this.fd, buf, 0, length, Number(offset));
    if (bytesRead !== length) {
      throw new Error(`Short read at ${offset}: wanted ${length}, got ${bytesRead}`);
    }
    return buf;
  }
  close() {
    fs.closeSync(this.fd);
  }
}
/* ------------------------------------------------------------------------------------------
* Component: AppendOnlyLogBackend
* Implements: store 100 + read 150 + commit 200 + root 250 on a single append-only file
* ------------------------------------------------------------------------------------------ */
class AppendOnlyLogBackend {
  /**
   * Append-only log over a single device: the store 100 / read 150 /
   * commit 200 / root 250 primitives, addressed by OIDs that encode the
   * record's start offset in the file.
   */
  constructor({ device, backendId = 0, segmentId = 0 }) {
    this.device = device;
    this.backendId = backendId;
    this.segmentId = segmentId;
  }
  /** Append one record; return its OID (backend/segment/start-offset). */
  appendRecord(type, payloadObj) {
    const encoded = RecordCodec.encode({ type, payloadObj });
    const writeLocation = this.device.append(encoded);
    return Oid.encode({ backend: this.backendId, segment: this.segmentId, offset: writeLocation });
  }
  /** Read the complete record that begins at the OID's offset. */
  readRecord(oidStr) {
    const { offset } = Oid.decode(oidStr);
    // The OID points at the record start, so the fixed 10-byte header is
    // enough to learn the payload length and hence the record's total size.
    const header = this.device.readAt(offset, 10);
    if (!header.subarray(0, 4).equals(RecordCodec.MAGIC)) throw new Error('Bad magic at OID');
    const payloadLen = header.readUInt32BE(6);
    const totalLen = 10 + payloadLen + 4 + 4; // header + payload + crc + trailer
    return RecordCodec.decode(this.device.readAt(offset, totalLen));
  }
  /**
   * root 250: seek to EOF, then walk records backwards via each record's
   * TOTAL_LEN trailer until one of the requested type is found.
   * @returns {{oid:string, record:object}|null} null if no such record exists
   */
  scanBackwardForRecordType(type) {
    let cursor = this.device.size();
    while (cursor >= 4n) {
      const trailer = this.device.readAt(cursor - 4n, 4);
      const totalLen = BigInt(trailer.readUInt32BE(0));
      const start = cursor - totalLen;
      if (start < 0n) throw new Error('Corrupt log (negative start)');
      const record = RecordCodec.decode(this.device.readAt(start, Number(totalLen)));
      if (record.type === type) {
        const oid = Oid.encode({ backend: this.backendId, segment: this.segmentId, offset: start });
        return { oid, record };
      }
      cursor = start;
    }
    return null;
  }
  close() {
    this.device.close();
  }
}
/* ------------------------------------------------------------------------------------------
* Component: Backend interface helper (multi-backend OID routing)
* Implements: enabling partitioning/clustering/sequencing across devices
* ------------------------------------------------------------------------------------------ */
class MultiBackendRouter {
  /**
   * Routes record operations across several backends: reads go to the backend
   * id embedded in the OID; appends/scans go to an explicit hint (default 0).
   * @param {Array} backends - backends indexed by their backendId
   */
  constructor(backends) {
    this.backends = backends;
  }
  /** Look up a backend by hint index, failing loudly on unknown ids. */
  resolveHint(backendHint) {
    const target = this.backends[backendHint];
    if (!target) throw new Error(`No backend for hint id=${backendHint}`);
    return target;
  }
  /** Resolve the backend that owns the given OID. */
  backendForOid(oidStr) {
    const { backend } = Oid.decode(oidStr);
    const target = this.backends[backend];
    if (!target) throw new Error(`No backend for id=${backend}`);
    return target;
  }
  appendRecord(type, payloadObj, { backendHint = 0 } = {}) {
    return this.resolveHint(backendHint).appendRecord(type, payloadObj);
  }
  readRecord(oidStr) {
    return this.backendForOid(oidStr).readRecord(oidStr);
  }
  scanBackwardForRecordType(type, { backendHint = 0 } = {}) {
    return this.resolveHint(backendHint).scanBackwardForRecordType(type);
  }
  closeAll() {
    for (const backend of this.backends) backend.close();
  }
}
/* ------------------------------------------------------------------------------------------
* Component: Monitoring plug-in 416 (wrapper)
* Implements: “assess utilization ... operations moderated” (basic metrics)
* ------------------------------------------------------------------------------------------ */
class MonitoringPluginBackend {
  /**
   * Monitoring plug-in 416: transparent wrapper that counts operations and
   * appended bytes so backend utilization can be assessed.
   */
  constructor(inner) {
    this.inner = inner;
    this.metrics = { appendCalls: 0, readCalls: 0, scanCalls: 0, bytesAppended: 0 };
  }
  appendRecord(type, payloadObj, opts) {
    // Encode here purely to measure the on-disk size; the inner backend does
    // its own encode on append, so its OID scheme is untouched.
    const measured = RecordCodec.encode({ type, payloadObj });
    this.metrics.appendCalls += 1;
    this.metrics.bytesAppended += measured.length;
    return this.inner.appendRecord(type, payloadObj, opts);
  }
  readRecord(oidStr) {
    this.metrics.readCalls += 1;
    return this.inner.readRecord(oidStr);
  }
  scanBackwardForRecordType(type, opts) {
    this.metrics.scanCalls += 1;
    return this.inner.scanBackwardForRecordType(type, opts);
  }
  /** Detached snapshot of the counters — safe for callers to mutate. */
  getMetrics() {
    return { ...this.metrics };
  }
  closeAll() {
    return this.inner.closeAll();
  }
}
/* ------------------------------------------------------------------------------------------
* Component: Partitioning plug-in 408 (wrapper)
* Implements: “portions of file 300 are stored across a multiple number of persistent devices”
* ------------------------------------------------------------------------------------------ */
class PartitioningPluginBackend {
  /**
   * Partitioning plug-in 408: spreads object records across shards by hashing
   * a partition key. Root/EOF markers always land on a single metadata
   * backend so the backward root scan stays within one file.
   */
  constructor(router, { shardCount, metadataBackend = 0 }) {
    this.router = router;
    this.shardCount = shardCount;
    this.metadataBackend = metadataBackend; // where ROOT_MARKER/EOF goes
  }
  /** Stable shard index: first 4 bytes of SHA-256(key) modulo shardCount. */
  shardForKey(key) {
    const digest = crypto.createHash('sha256').update(String(key)).digest();
    return digest.readUInt32BE(0) % this.shardCount;
  }
  appendRecord(type, payloadObj, opts = {}) {
    if (type === RecordType.ROOT_MARKER || type === RecordType.EOF) {
      return this.router.appendRecord(type, payloadObj, { backendHint: this.metadataBackend });
    }
    // Prefer an explicit partition key, fall back to common id fields, and as
    // a last resort scatter the record to a random shard.
    const partitionKey = opts.partitionKey ?? payloadObj?.key ?? payloadObj?.id ?? crypto.randomUUID();
    return this.router.appendRecord(type, payloadObj, { backendHint: this.shardForKey(partitionKey) });
  }
  readRecord(oidStr) {
    return this.router.readRecord(oidStr);
  }
  scanBackwardForRecordType(type) {
    return this.router.scanBackwardForRecordType(type, { backendHint: this.metadataBackend });
  }
  closeAll() {
    return this.router.closeAll();
  }
}
/* ------------------------------------------------------------------------------------------
* Component: Clustering plug-in 404 (wrapper)
* Implements: “stores all data to all persistent storage devices” (replication)
* ------------------------------------------------------------------------------------------ */
class ClusteringPluginBackend {
  /**
   * Clustering plug-in 404: replicates every append to all backends
   * ("stores all data to all persistent storage devices") and serves
   * reads/scans from the primary.
   */
  constructor(router, { primaryBackend = 0 } = {}) {
    this.router = router;
    this.primaryBackend = primaryBackend;
  }
  /**
   * Append to every backend; return the OID issued by the primary replica.
   * BUGFIX: the replica index must override any caller-supplied backendHint.
   * The original spread `{ backendHint: i, ...opts }` let `opts.backendHint`
   * win, sending every replica write to a single backend (no replication).
   */
  appendRecord(type, payloadObj, opts = {}) {
    let primaryOid = null;
    for (let i = 0; i < this.router.backends.length; i++) {
      const oid = this.router.appendRecord(type, payloadObj, { ...opts, backendHint: i });
      if (i === this.primaryBackend) primaryOid = oid;
    }
    return primaryOid;
  }
  readRecord(oidStr) {
    // Read from the backend encoded in OID (or you could failover to others).
    return this.router.readRecord(oidStr);
  }
  scanBackwardForRecordType(type) {
    // Scan primary (simple); advanced versions could reconcile across replicas.
    return this.router.scanBackwardForRecordType(type, { backendHint: this.primaryBackend });
  }
  closeAll() {
    return this.router.closeAll();
  }
}
/* ------------------------------------------------------------------------------------------
* Component: Sequencing plug-in 412 (simplified stub)
* Implements: “when one device is full the file is continued on the next device”
* ------------------------------------------------------------------------------------------ */
class SequencingPluginBackend {
  /**
   * Sequencing plug-in 412: when the active segment would exceed its size
   * budget, the file is continued on a fresh segment ("next device").
   * @param {function(number):object} makeBackend - factory: segment index -> backend
   * @param {number} maxBytesPerSegment - soft size cap per segment
   */
  constructor({ makeBackend, maxBytesPerSegment = 64 * 1024 * 1024 }) {
    this.makeBackend = makeBackend;
    this.maxBytesPerSegment = BigInt(maxBytesPerSegment);
    this.current = makeBackend(0);
    this.segmentIndex = 0;
  }
  /** Roll over to the next segment if the pending write would overflow this one. */
  ensureSpace(estimatedBytes) {
    const used = this.current.device.size();
    if (used + BigInt(estimatedBytes) > this.maxBytesPerSegment) {
      this.current.close();
      this.segmentIndex += 1;
      this.current = this.makeBackend(this.segmentIndex);
    }
  }
  appendRecord(type, payloadObj) {
    // Encode first so the size estimate is exact; the segment backend encodes
    // again on append, preserving its own OID scheme (segmentId included).
    const encoded = RecordCodec.encode({ type, payloadObj });
    this.ensureSpace(encoded.length);
    return this.current.appendRecord(type, payloadObj);
  }
  readRecord(oidStr) {
    const { segment } = Oid.decode(oidStr);
    if (segment === this.segmentIndex) return this.current.readRecord(oidStr);
    // Older segments are reopened on demand; a production version would keep
    // an LRU of open segment handles instead.
    const reopened = this.makeBackend(segment);
    // BUGFIX: close the reopened segment even if the read throws — the
    // original leaked the file descriptor on any read error.
    try {
      return reopened.readRecord(oidStr);
    } finally {
      reopened.close();
    }
  }
  scanBackwardForRecordType(type) {
    // Only the newest segment is scanned — commits always land at the tail.
    return this.current.scanBackwardForRecordType(type);
  }
  closeAll() {
    this.current.close();
  }
}
/* ------------------------------------------------------------------------------------------
* Component: Caching plug-in 402 (write-behind cache with logical OIDs)
* Implements: “temporarily stored in volatile storage ... flushed when criteria satisfied”
* ------------------------------------------------------------------------------------------ */
class CachingPluginBackend {
  /**
   * Caching plug-in 402: write-behind buffer. Appends are queued in volatile
   * memory under a logical OID ("loid:...") and flushed to the inner backend
   * once the buffered-byte criterion is met, or when a commit/scan/close
   * forces durability.
   */
  constructor(inner, { maxBufferedBytes = 1_000_000 } = {}) {
    this.inner = inner;
    this.maxBufferedBytes = maxBufferedBytes;
    this.bufferedBytes = 0;
    this.queue = []; // pending writes: { logicalOid, type, payloadObj, opts }
    this.logicalToPhysical = new Map(); // logicalOid -> physicalOid, filled on flush
  }
  makeLogicalOid() {
    return `loid:v1:${crypto.randomUUID()}`;
  }
  flushIfNeeded() {
    if (this.bufferedBytes < this.maxBufferedBytes) return;
    this.flushAll();
  }
  /** Drain the queue to the inner backend, recording logical->physical OIDs. */
  flushAll() {
    for (const item of this.queue) {
      const physicalOid = this.inner.appendRecord(item.type, item.payloadObj, item.opts);
      this.logicalToPhysical.set(item.logicalOid, physicalOid);
    }
    this.queue = [];
    this.bufferedBytes = 0;
  }
  appendRecord(type, payloadObj, opts = {}) {
    // Root markers must be durable and ordered after all buffered data:
    // flush first, then append RM/EOF directly to the inner backend.
    if (type === RecordType.ROOT_MARKER || type === RecordType.EOF) {
      this.flushAll();
      return this.inner.appendRecord(type, payloadObj, opts);
    }
    const logicalOid = this.makeLogicalOid();
    const approxBytes = JSON.stringify(payloadObj).length + 64; // payload + framing estimate
    this.queue.push({ logicalOid, type, payloadObj, opts });
    this.bufferedBytes += approxBytes;
    this.flushIfNeeded();
    return logicalOid;
  }
  /**
   * Map a logical OID to its physical OID; physical OIDs pass through.
   * BUGFIX: a logical OID still sitting in the write-behind queue used to
   * throw ("not flushed yet"), so the cache could not read its own pending
   * writes. Now the queue is flushed and the lookup retried before giving up.
   */
  resolveOid(oidStr) {
    if (oidStr.startsWith('loid:')) {
      let phys = this.logicalToPhysical.get(oidStr);
      if (!phys) {
        this.flushAll();
        phys = this.logicalToPhysical.get(oidStr);
      }
      if (!phys) throw new Error(`Logical OID not flushed yet: ${oidStr}`);
      return phys;
    }
    return oidStr;
  }
  readRecord(oidStr) {
    return this.inner.readRecord(this.resolveOid(oidStr));
  }
  scanBackwardForRecordType(type, opts) {
    // A backward scan must observe everything appended so far.
    this.flushAll();
    return this.inner.scanBackwardForRecordType(type, opts);
  }
  closeAll() {
    this.flushAll();
    return this.inner.closeAll();
  }
}
/* ------------------------------------------------------------------------------------------
* Component: Remoting plug-in 420 (stub)
* Implements: “utilize remotely connected persistent storage devices”
* ------------------------------------------------------------------------------------------ */
class RemoteBackendStub {
  /**
   * Remoting plug-in 420 (stub): forwards backend operations to an HTTP(S)
   * service. Unlike the local backends, every method returns a Promise.
   */
  constructor({ baseUrl }) {
    this.baseUrl = baseUrl;
  }
  /** Send a JSON request to `route` and resolve with the parsed JSON reply. */
  requestJson(method, route, bodyObj) {
    const url = new URL(route, this.baseUrl);
    const transport = url.protocol === 'https:' ? https : http;
    const body = bodyObj ? Buffer.from(JSON.stringify(bodyObj), 'utf8') : null;
    const headers = { 'content-type': 'application/json' };
    if (body) headers['content-length'] = body.length;
    // new Promise is the right tool here: adapting Node's callback HTTP API.
    return new Promise((resolve, reject) => {
      const req = transport.request(url, { method, headers }, (res) => {
        const chunks = [];
        res.on('data', (chunk) => chunks.push(chunk));
        res.on('end', () => {
          const txt = Buffer.concat(chunks).toString('utf8');
          if (res.statusCode >= 200 && res.statusCode < 300) {
            resolve(txt ? JSON.parse(txt) : null);
          } else {
            reject(new Error(`HTTP ${res.statusCode}: ${txt}`));
          }
        });
      });
      req.on('error', reject);
      if (body) req.write(body);
      req.end();
    });
  }
  appendRecord(type, payloadObj) {
    // The server is expected to answer { oid }.
    return this.requestJson('POST', '/append', { type, payloadObj }).then((r) => r.oid);
  }
  readRecord(oidStr) {
    return this.requestJson('POST', '/read', { oid: oidStr });
  }
  scanBackwardForRecordType(type) {
    return this.requestJson('POST', '/scanBackwardFor', { type });
  }
  closeAll() { /* no-op: nothing held open between requests */ }
}
/* ------------------------------------------------------------------------------------------
* Component: StoreProcess (store 100)
* Implements: seek EOF, define write location, write object, define OID, move EOF
* ------------------------------------------------------------------------------------------ */
class StoreProcess {
  /**
   * store 100: append an object (or tombstone) record followed by an explicit
   * EOF marker — the patent's "write object, define OID, move EOF" sequence.
   */
  constructor(backend) {
    this.backend = backend;
  }
  storeObject(payloadObj, opts = {}) {
    const oid = this.backend.appendRecord(RecordType.OBJECT, payloadObj, opts);
    this.backend.appendRecord(RecordType.EOF, { ts: Date.now() }, opts);
    return oid;
  }
  deleteObject(targetOid, opts = {}) {
    // Append-only delete: a tombstone supersedes the target; higher-level
    // structures "remove" the object by no longer pointing at it.
    const tombstoneOid = this.backend.appendRecord(RecordType.TOMBSTONE, { targetOid, ts: Date.now() }, opts);
    this.backend.appendRecord(RecordType.EOF, { ts: Date.now() }, opts);
    return tombstoneOid;
  }
}
/* ------------------------------------------------------------------------------------------
* Component: CommitProcess (commit 200)
* Implements: append root marker RM pointing to selected root OID, then move EOF
* ------------------------------------------------------------------------------------------ */
class CommitProcess {
  /**
   * commit 200: append a root marker RM pointing at the selected root OID,
   * then move EOF. Returns the RM's own OID.
   */
  constructor(backend) {
    this.backend = backend;
  }
  commitRoot(rootOid, meta = {}) {
    const payload = { rootOid, ts: Date.now(), ...meta };
    const rmOid = this.backend.appendRecord(RecordType.ROOT_MARKER, payload);
    this.backend.appendRecord(RecordType.EOF, { ts: Date.now() }, {});
    return rmOid;
  }
}
/* ------------------------------------------------------------------------------------------
* Component: RootProcess (root 250)
* Implements: seek EOF, backward scan until RM found, return RM->root pointer
* ------------------------------------------------------------------------------------------ */
class RootProcess {
  /**
   * root 250: seek to EOF and scan backwards until the latest root marker is
   * found; return the root OID it points at, or null for an empty store.
   */
  constructor(backend) {
    this.backend = backend;
  }
  getLatestRootOid() {
    const found = this.backend.scanBackwardForRecordType(RecordType.ROOT_MARKER);
    return found ? found.record.payloadObj.rootOid : null;
  }
}
/* ------------------------------------------------------------------------------------------
* Component: ReadProcess (read 150)
* Implements: read object content given OID (location/offset or indirection)
* ------------------------------------------------------------------------------------------ */
class ReadProcess {
  /**
   * read 150: fetch the record stored at an OID — possibly via plug-in
   * indirection (e.g. logical OIDs resolved by the caching layer).
   */
  constructor(backend) {
    this.backend = backend;
  }
  read(oidStr) {
    return this.backend.readRecord(oidStr);
  }
}
/* ------------------------------------------------------------------------------------------
* Component: B-tree interface 304 (minimal persistent B-tree via copy-on-write)
* Implements: “B-tree interface issues instructions to store/commit transparently”
* ------------------------------------------------------------------------------------------ */
class PersistentBTree {
  /**
   * B-tree interface 304: a copy-on-write persistent B-tree. Every mutation
   * stores fresh node objects (store 100) along the changed path and commits
   * a new root (commit 200); reads walk from the latest committed root (root 250).
   *
   * BUGFIX: put() of an existing key now updates the value (copy-on-write).
   * Previously a duplicate key was inserted and get() — which stops at the
   * leftmost match — kept returning the stale value forever.
   */
  constructor({ store, read, root, commit, minDegree = 2 }) {
    this.store = store;
    this.read = read;
    this.rootProc = root;
    this.commit = commit;
    this.t = minDegree; // minimum degree: non-root nodes hold t-1 .. 2t-1 keys
  }
  // Node format stored as OBJECT:
  // { kind:'btree_node', leaf:boolean, keys:[...], values:[...], children:[oid...] }
  emptyNode(leaf = true) {
    return { kind: 'btree_node', leaf, keys: [], values: [], children: [] };
  }
  loadNode(oid) {
    const rec = this.read.read(oid);
    if (rec.type !== RecordType.OBJECT) throw new Error('Not an object record');
    if (rec.payloadObj.kind !== 'btree_node') throw new Error('Not a btree node');
    return rec.payloadObj;
  }
  storeNode(node) {
    return this.store.storeObject(node);
  }
  currentRootOid() {
    return this.rootProc.getLatestRootOid();
  }
  /** Insert or update key -> value; commits and returns the new root OID. */
  put(key, value) {
    let rootOid = this.currentRootOid();
    if (!rootOid) {
      rootOid = this.storeNode(this.emptyNode(true));
      this.commit.commitRoot(rootOid, { structure: 'btree' });
    }
    const rootNode = this.loadNode(rootOid);
    const maxKeys = 2 * this.t - 1;
    // If root is full, split it under a fresh root (tree grows in height).
    if (rootNode.keys.length === maxKeys) {
      const newRoot = this.emptyNode(false);
      newRoot.children = [rootOid];
      const { parentOid } = this.splitChildAndStore(newRoot, 0);
      const finalRootOid = this.insertNonFull(parentOid, key, value);
      this.commit.commitRoot(finalRootOid, { structure: 'btree' });
      return finalRootOid;
    }
    const newRootOid = this.insertNonFull(rootOid, key, value);
    this.commit.commitRoot(newRootOid, { structure: 'btree' });
    return newRootOid;
  }
  // Splits child at parent.children[i] (copy-on-write). The median key/value
  // moves up into the parent. Returns the stored parent/left/right OIDs.
  splitChildAndStore(parentNode, i) {
    const childOid = parentNode.children[i];
    const child = this.loadNode(childOid);
    const t = this.t;
    const right = this.emptyNode(child.leaf);
    // Median key moves up; it lives only in the parent afterwards.
    const medianKey = child.keys[t - 1];
    const medianVal = child.values[t - 1];
    // Left keeps first t-1 keys; right gets last t-1 keys.
    const left = {
      ...child,
      keys: child.keys.slice(0, t - 1),
      values: child.values.slice(0, t - 1),
      children: child.leaf ? [] : child.children.slice(0, t),
    };
    right.keys = child.keys.slice(t);
    right.values = child.values.slice(t);
    right.children = child.leaf ? [] : child.children.slice(t);
    const leftOid = this.storeNode(left);
    const rightOid = this.storeNode(right);
    const newParent = {
      ...parentNode,
      keys: parentNode.keys.slice(),
      values: parentNode.values.slice(),
      children: parentNode.children.slice(),
    };
    newParent.keys.splice(i, 0, medianKey);
    newParent.values.splice(i, 0, medianVal);
    newParent.children[i] = leftOid;
    newParent.children.splice(i + 1, 0, rightOid);
    const parentOid = this.storeNode(newParent);
    return { parentOid, leftOid, rightOid };
  }
  insertNonFull(nodeOid, key, value) {
    const node = this.loadNode(nodeOid);
    // Copy-on-write: build and store a modified clone for every changed node.
    const newNode = {
      ...node,
      keys: node.keys.slice(),
      values: node.values.slice(),
      children: node.children.slice(),
    };
    // BUGFIX: if the key already lives in this node (leaf or internal —
    // internal nodes carry values too), update it instead of inserting a
    // duplicate further down.
    const existing = newNode.keys.indexOf(key);
    if (existing !== -1) {
      newNode.values[existing] = value;
      return this.storeNode(newNode);
    }
    let i = newNode.keys.length - 1;
    if (newNode.leaf) {
      // Insert into sorted keys.
      while (i >= 0 && key < newNode.keys[i]) i--;
      newNode.keys.splice(i + 1, 0, key);
      newNode.values.splice(i + 1, 0, value);
      return this.storeNode(newNode);
    }
    while (i >= 0 && key < newNode.keys[i]) i--;
    i++;
    const childOid = newNode.children[i];
    const child = this.loadNode(childOid);
    const maxKeys = 2 * this.t - 1;
    if (child.keys.length === maxKeys) {
      // Split child; parent changes (copy-on-write).
      const { parentOid } = this.splitChildAndStore(newNode, i);
      const parentAfterSplit = this.loadNode(parentOid); // reload stored parent snapshot
      // BUGFIX: the split may have promoted the key itself into this node —
      // update it here rather than descending and duplicating it.
      if (key === parentAfterSplit.keys[i]) {
        const updated = { ...parentAfterSplit, values: parentAfterSplit.values.slice() };
        updated.values[i] = value;
        return this.storeNode(updated);
      }
      // Determine which of the two halves we descend into.
      const nextIndex = (key > parentAfterSplit.keys[i]) ? i + 1 : i;
      const newChildOid = this.insertNonFull(parentAfterSplit.children[nextIndex], key, value);
      const parentFinal = {
        ...parentAfterSplit,
        children: parentAfterSplit.children.slice(),
      };
      parentFinal.children[nextIndex] = newChildOid;
      return this.storeNode(parentFinal);
    } else {
      const newChildOid = this.insertNonFull(childOid, key, value);
      newNode.children[i] = newChildOid;
      return this.storeNode(newNode);
    }
  }
  /** Look up a key by walking down from the current committed root. */
  get(key) {
    const rootOid = this.currentRootOid();
    if (!rootOid) return null;
    let oid = rootOid;
    while (true) {
      const node = this.loadNode(oid);
      let i = 0;
      while (i < node.keys.length && key > node.keys[i]) i++;
      if (i < node.keys.length && key === node.keys[i]) return node.values[i];
      if (node.leaf) return null;
      oid = node.children[i];
    }
  }
}
/* ------------------------------------------------------------------------------------------
* Component: Skip-list interface 308 (journal-ish “root indexes every Nth node”)
* Implements: store only changed nodes + rewrite root pointer(s) as needed
* ------------------------------------------------------------------------------------------ */
class JournalSkipList {
  /**
   * Skip-list interface 308 ("journal-ish"): a singly linked, key-ordered list
   * whose root object additionally indexes every `stride`-th node, all stored
   * copy-on-write through store 100 / commit 200 / root 250.
   *
   * NOTE(review): put() into the middle of the list rebuilds the entire chain
   * (O(n) appended nodes per write) — acknowledged below as a conservative
   * simplification, not the intended production strategy.
   */
  constructor({ store, read, root, commit, stride = 2 }) {
    this.store = store;
    this.read = read;
    this.rootProc = root;
    this.commit = commit;
    this.stride = stride; // “every other one” style indexing (patent example uses every other)
  }
  // Root format stored as OBJECT:
  // { kind:'skip_root', headOid: oid|null, index:[oid,...], stride }
  emptyRoot() {
    return { kind: 'skip_root', headOid: null, index: [], stride: this.stride };
  }
  // Node format stored as OBJECT:
  // { kind:'skip_node', key, value, nextOid: oid|null }
  makeNode(key, value, nextOid) {
    return { kind: 'skip_node', key, value, nextOid: nextOid ?? null };
  }
  // Load any stored OBJECT payload (root or node) by OID.
  loadObject(oid) {
    const rec = this.read.read(oid);
    if (rec.type !== RecordType.OBJECT) throw new Error('Not an object record');
    return rec.payloadObj;
  }
  currentRootOid() {
    return this.rootProc.getLatestRootOid();
  }
  // Lazily create and commit an empty root the first time the list is used.
  ensureRoot() {
    let rootOid = this.currentRootOid();
    if (!rootOid) {
      rootOid = this.store.storeObject(this.emptyRoot());
      this.commit.commitRoot(rootOid, { structure: 'skiplist' });
    }
    return rootOid;
  }
  rebuildIndex(rootObj) {
    // Simple “root points to every Nth node” index.
    // Walks the full chain, so each rebuild costs O(n) object loads.
    const index = [];
    let cur = rootObj.headOid;
    let i = 0;
    while (cur) {
      if (i % rootObj.stride === 0) index.push(cur);
      const n = this.loadObject(cur);
      cur = n.nextOid;
      i++;
    }
    return index;
  }
  /**
   * Insert or update key -> value; commits and returns the new root OID.
   * Walks the chain linearly to find the slot (the index is not consulted —
   * kept simple on purpose).
   */
  put(key, value) {
    const rootOid = this.ensureRoot();
    const rootObj = this.loadObject(rootOid);
    if (rootObj.kind !== 'skip_root') throw new Error('Root is not skip_root');
    // Find insertion point by walking list (could be accelerated via index; keep simple)
    let prevOid = null;
    let curOid = rootObj.headOid;
    while (curOid) {
      const cur = this.loadObject(curOid);
      if (cur.kind !== 'skip_node') throw new Error('Corrupt skip list');
      if (cur.key === key) {
        // Update-by-append: rewrite this node with new value (copy-on-write)
        const newCurOid = this.store.storeObject(this.makeNode(key, value, cur.nextOid));
        // Rewrite predecessor (or root head) to point at newCurOid
        const newRootOid = this.rewriteLink(rootObj, prevOid, curOid, newCurOid);
        this.commit.commitRoot(newRootOid, { structure: 'skiplist' });
        return newRootOid;
      }
      if (cur.key > key) break; // passed the sorted position: insert here
      prevOid = curOid;
      curOid = cur.nextOid;
    }
    // Insert new node between prevOid and curOid
    const newNodeOid = this.store.storeObject(this.makeNode(key, value, curOid));
    const newRootOid = this.rewriteLink(rootObj, prevOid, null, newNodeOid);
    this.commit.commitRoot(newRootOid, { structure: 'skiplist' });
    return newRootOid;
  }
  /**
   * Re-link the chain so that prevOid (or the head) reaches newCurOid, then
   * store a new root with a rebuilt index. Returns the new root's OID.
   * When prevOid is non-null this falls back to rewriting the ENTIRE chain
   * (see comments below) — correct but O(n) appends per call.
   */
  rewriteLink(rootObj, prevOid, /* oldCurOid unused */ _oldCurOid, newCurOid) {
    // Rewrite either predecessor node, or root head pointer, and then rewrite root to update index.
    let newHeadOid = rootObj.headOid;
    if (!prevOid) {
      // Head change (empty list, insert-before-head, or head update):
      // newCurOid already carries the correct nextOid, so only the root moves.
      newHeadOid = newCurOid;
    } else {
      const prev = this.loadObject(prevOid);
      const newPrevOid = this.store.storeObject(this.makeNode(prev.key, prev.value, newCurOid));
      // Now we must also patch the chain from root head to ensure it reaches newPrevOid.
      // Minimal approach: rebuild the entire list head by head-rewrite is expensive; instead:
      // we rely on the fact that prevOid remains reachable and newPrevOid becomes reachable only
      // if something points to it. For correctness, we also rewrite the node *before* prev if needed;
      // but we didn’t track it. So for this minimal skeleton, we do a conservative rebuild:
      // rebuild the list by re-inserting nodes in order (not ideal but demonstrates the model).
      //
      // Practical implementation would track predecessor-of-predecessor or use an index jump table.
      // Conservative rebuild:
      const items = [];
      let curOid = rootObj.headOid;
      while (curOid) {
        const cur = this.loadObject(curOid);
        if (curOid === prevOid) {
          // Splice point: emit prev, then the new/updated node, then resume
          // after the superseded position (newCurOid.nextOid skips old linkage).
          items.push({ key: cur.key, value: cur.value }); // prev
          items.push({ key: this.loadObject(newCurOid).key, value: this.loadObject(newCurOid).value }); // inserted/updated
          curOid = this.loadObject(newCurOid).nextOid; // skip old linkage
        } else {
          items.push({ key: cur.key, value: cur.value });
          curOid = cur.nextOid;
        }
      }
      // If we inserted before head, ensure items reflect it:
      // NOTE(review): this branch looks unreachable — a non-null prevOid was
      // found by walking from headOid, so headOid cannot be null here.
      // Confirm before removing.
      if (!rootObj.headOid) {
        items.push({ key: this.loadObject(newCurOid).key, value: this.loadObject(newCurOid).value });
      }
      // Rebuild chain by append-only writes (demonstrates partial rewrite principle).
      // Built tail-to-head so each node can point at the one stored just before it.
      let head = null;
      for (let i = items.length - 1; i >= 0; i--) {
        head = this.store.storeObject(this.makeNode(items[i].key, items[i].value, head));
      }
      newHeadOid = head;
      // newPrevOid is unused after conservative rebuild
      void newPrevOid;
    }
    const newRoot = { ...rootObj, headOid: newHeadOid };
    newRoot.index = this.rebuildIndex(newRoot);
    return this.store.storeObject(newRoot);
  }
  /** Linear lookup; returns the value or null (list is key-sorted). */
  get(key) {
    const rootOid = this.currentRootOid();
    if (!rootOid) return null;
    const rootObj = this.loadObject(rootOid);
    if (rootObj.kind !== 'skip_root') return null;
    let curOid = rootObj.headOid;
    while (curOid) {
      const cur = this.loadObject(curOid);
      if (cur.key === key) return cur.value;
      if (cur.key > key) return null; // sorted order: key cannot appear later
      curOid = cur.nextOid;
    }
    return null;
  }
}
/* ------------------------------------------------------------------------------------------
* Example wiring: a single-file store, plus optional plug-ins
* ------------------------------------------------------------------------------------------ */
/** Simplest wiring: one device, one append-only log, no plug-ins. */
function openSingleFileEngine({ filePath }) {
  const device = new LocalFileDevice(filePath);
  return new AppendOnlyLogBackend({ device, backendId: 0, segmentId: 0 });
}
/**
 * Sharded wiring: one log file per backend, routed by partition key.
 * Shard 0 doubles as the metadata backend receiving root markers and EOFs.
 */
function openPartitionedEngine({ dirPath, shardCount = 3 }) {
  const backends = Array.from({ length: shardCount }, (_, i) => {
    const device = new LocalFileDevice(path.join(dirPath, `shard-${i}.storlog`));
    return new AppendOnlyLogBackend({ device, backendId: i, segmentId: 0 });
  });
  const router = new MultiBackendRouter(backends);
  return new PartitioningPluginBackend(router, { shardCount, metadataBackend: 0 });
}
/* ------------------------------------------------------------------------------------------
* Demo (run directly): node append_only_store.js
* ------------------------------------------------------------------------------------------ */
if (require.main === module) {
  // Demo: wire a partitioned engine with monitoring and write-behind caching,
  // then exercise both data-structure interfaces (304, 308).
  const baseDir = path.join(process.cwd(), '.stor-demo');
  // Choose one:
  // const engine = openSingleFileEngine({ filePath: path.join(baseDir, 'file300.storlog') });
  let engine = openPartitionedEngine({ dirPath: baseDir, shardCount: 3 });
  // Wrap with monitoring, optional caching (outermost wrapper is the cache):
  engine = new MonitoringPluginBackend(engine);
  engine = new CachingPluginBackend(engine, { maxBufferedBytes: 50_000 });
  // The four fundamental processes all share the same wrapped engine.
  const store = new StoreProcess(engine);
  const read = new ReadProcess(engine);
  const commit = new CommitProcess(engine);
  const root = new RootProcess(engine);
  // B-tree interface 304
  const btree = new PersistentBTree({ store, read, root, commit, minDegree: 2 });
  btree.put('alice', { balance: 10 });
  btree.put('bob', { balance: 20 });
  btree.put('carol', { balance: 30 });
  // Skip-list interface 308
  const skip = new JournalSkipList({ store, read, root, commit, stride: 2 });
  skip.put(10, 'ten');
  skip.put(5, 'five');
  skip.put(20, 'twenty');
  // Read current root pointer (root 250)
  const latestRoot = root.getLatestRootOid();
  console.log('Latest root OID:', latestRoot);
  console.log('B-tree get bob:', btree.get('bob'));
  console.log('Skip get 20:', skip.get(20));
  // Monitoring metrics: engine is Caching(Monitoring(...)), so the monitoring
  // wrapper sits one level down at engine.inner.
  if (engine.inner?.getMetrics) {
    console.log('Metrics:', engine.inner.getMetrics());
  } else if (engine.getMetrics) {
    console.log('Metrics:', engine.getMetrics());
  }
  engine.closeAll?.();
}