Skip to content

Instantly share code, notes, and snippets.

@zackiles
Created February 27, 2026 06:37
Show Gist options
  • Select an option

  • Save zackiles/bac106a4898a735a733832ed9686720b to your computer and use it in GitHub Desktop.

Select an option

Save zackiles/bac106a4898a735a733832ed9686720b to your computer and use it in GitHub Desktop.

Core design extracted from US 8,805,897

  • Append-only file: every add/update/delete is handled by appending new data to the end of a file (no in-place overwrite / no delete of the file). (Justia Patents)
  • store process: seek EOF → write object → define object identifier tied to write location → move EOF. (Justia Patents)
  • commit process: append a root marker that points at the chosen “current root” object (a pointer switch), then move EOF. (Justia Patents)
  • root process: scan backwards from EOF to find the latest root marker; that implicitly makes older roots “inactive” without deleting anything. (Justia Patents)
  • read process: read an object given an object identifier (which can be a location/offset or an indirection variable). (Justia Patents)
  • Higher-level APIs + transparent plug-ins: B-tree + skip-list interfaces built on top of store/read/commit/root, with transparent plug-ins (cache/cluster/partition/sequence/monitor/remote). (Justia Patents)
/**
 * Reference implementation (Node.js) of the patent’s fundamental components:
 * - store 100, read 150, commit 200, root 250
 * - append-only file format with backward scanning for last root marker
 * - “interfaces 300” examples: B-tree interface 304, skip-list interface 308
 * - “plug-ins 400” examples: caching 402, clustering 404, partitioning 408,
 *   sequencing 412, monitoring 416, remoting 420 (stub)
 *
 * This is a minimal, readable skeleton: production concerns (concurrency control,
 * compaction, checksums-at-scale, WAL fsync strategy, corruption recovery, etc.)
 * are intentionally simplified.
 */

'use strict';

const fs = require('fs');
const path = require('path');
const crypto = require('crypto');
const http = require('http');
const https = require('https');

/* ------------------------------------------------------------------------------------------
 * Component: OID encoding/decoding
 * Implements: “object identifier associated with said write location” (store 100)
 * ------------------------------------------------------------------------------------------ */
const Oid = {
  /**
   * Build a durable object identifier from its parts.
   * Format: oid:v1:b<backend>:s<segment>:o<offset> (offset is the decimal byte
   * position of the record start — the "write location" of store 100).
   * OIDs survive process restarts and span multiple backends/segments.
   */
  encode({ backend = 0, segment = 0, offset }) {
    const off = typeof offset === 'bigint' ? offset : BigInt(offset);
    return `oid:v1:b${backend}:s${segment}:o${off}`;
  },

  /**
   * Parse an OID string back into { backend, segment, offset }.
   * @throws {Error} if the string does not match the v1 OID format
   */
  decode(oidStr) {
    const match = /^oid:v1:b(\d+):s(\d+):o(\d+)$/.exec(oidStr);
    if (match === null) throw new Error(`Invalid OID: ${oidStr}`);
    const [, backendStr, segmentStr, offsetStr] = match;
    return { backend: Number(backendStr), segment: Number(segmentStr), offset: BigInt(offsetStr) };
  },
};

/* ------------------------------------------------------------------------------------------
 * Component: CRC32 (lightweight integrity check for records)
 * Implements: basic record integrity support (not explicitly required, but practical)
 * ------------------------------------------------------------------------------------------ */
/**
 * Compute the CRC-32 checksum (reflected, polynomial 0xEDB88320 — the common
 * zlib/IEEE variant) of a byte buffer. The 256-entry lookup table is built
 * lazily on first use and memoized on the function object.
 * @param {Buffer|Uint8Array} buf - bytes to checksum
 * @returns {number} unsigned 32-bit checksum
 */
function crc32(buf) {
  let table = crc32._table;
  if (!table) {
    table = new Uint32Array(256);
    for (let n = 0; n < 256; n++) {
      let entry = n;
      for (let bit = 0; bit < 8; bit++) {
        entry = (entry & 1) ? (0xEDB88320 ^ (entry >>> 1)) : (entry >>> 1);
      }
      table[n] = entry >>> 0;
    }
    crc32._table = table;
  }

  let crc = 0xFFFFFFFF;
  for (const byte of buf) {
    crc = table[(crc ^ byte) & 0xFF] ^ (crc >>> 8);
  }
  return (crc ^ 0xFFFFFFFF) >>> 0;
}

/* ------------------------------------------------------------------------------------------
 * Component: RecordCodec (append-only record format with backward traversal)
 * Implements: “continually appends data to the end of a data file” + root backward scan
 * ------------------------------------------------------------------------------------------ */
// Record type tags written into every record header (see RecordCodec layout).
// Frozen so plug-ins cannot accidentally redefine the on-disk tag values.
const RecordType = Object.freeze({
  OBJECT: 1,      // a stored object payload (store 100)
  ROOT_MARKER: 2, // commit root marker "RM" appended by commit 200
  EOF: 3,         // explicit end-of-file marker, mirroring the patent's examples
  TOMBSTONE: 4,   // delete-by-append marker referencing a prior object
});

const RecordCodec = {
  MAGIC: Buffer.from('STOR', 'ascii'),
  VERSION: 1,
  // Smallest legal record: MAGIC(4)+VER(1)+TYPE(1)+PAYLOAD_LEN(4) header,
  // a zero-byte payload, CRC32(4), TOTAL_LEN(4) trailer => 18 bytes.
  MIN_RECORD_LEN: 18,

  /**
   * Encode one record for appending.
   * Layout: [MAGIC 4][VER 1][TYPE 1][PAYLOAD_LEN 4][PAYLOAD N][CRC32 4][TOTAL_LEN 4]
   * The trailing TOTAL_LEN lets the root process walk records backwards from EOF.
   * @param {{type:number, payloadObj:object}} rec - record type tag and JSON-serializable payload
   * @returns {Buffer} the encoded record
   */
  encode({ type, payloadObj }) {
    const payload = Buffer.from(JSON.stringify(payloadObj), 'utf8');

    const header = Buffer.alloc(4 + 1 + 1 + 4);
    this.MAGIC.copy(header, 0);
    header.writeUInt8(this.VERSION, 4);
    header.writeUInt8(type, 5);
    header.writeUInt32BE(payload.length, 6);

    const crc = Buffer.alloc(4);
    crc.writeUInt32BE(crc32(payload), 0);

    const totalLen = header.length + payload.length + crc.length + 4;
    const trailer = Buffer.alloc(4);
    trailer.writeUInt32BE(totalLen, 0);

    return Buffer.concat([header, payload, crc, trailer]);
  },

  /**
   * Decode and validate one complete record buffer.
   * BUG FIX: the minimum-length guard previously checked `< 14`, but the
   * smallest valid record (empty payload) is 18 bytes — buffers of 14–17
   * bytes slipped past the guard and failed later with misleading errors.
   * @param {Buffer} buffer - exactly one encoded record
   * @returns {{type:number, payloadObj:object, totalLen:number}}
   * @throws {Error} on short input, bad magic, version mismatch, length
   *         mismatch, trailer mismatch, or CRC mismatch
   */
  decode(buffer) {
    if (buffer.length < this.MIN_RECORD_LEN) throw new Error('Record too short');
    if (!buffer.subarray(0, 4).equals(this.MAGIC)) throw new Error('Bad magic');
    const ver = buffer.readUInt8(4);
    if (ver !== this.VERSION) throw new Error(`Unsupported version: ${ver}`);
    const type = buffer.readUInt8(5);
    const payloadLen = buffer.readUInt32BE(6);
    const payloadStart = 10;
    const payloadEnd = payloadStart + payloadLen;
    const crcStart = payloadEnd;
    const trailerStart = crcStart + 4;

    // Validate overall length before any out-of-range reads.
    if (buffer.length !== trailerStart + 4) throw new Error('Bad record length');
    const wantTotal = buffer.readUInt32BE(trailerStart);
    if (wantTotal !== buffer.length) throw new Error('Trailer length mismatch');

    const payload = buffer.subarray(payloadStart, payloadEnd);
    if (buffer.readUInt32BE(crcStart) !== crc32(payload)) throw new Error('CRC mismatch');

    return { type, payloadObj: JSON.parse(payload.toString('utf8')), totalLen: wantTotal };
  },
};

/* ------------------------------------------------------------------------------------------
 * Component: LocalFileDevice
 * Implements: “persistent storage device 70 maintaining a file 300”
 * ------------------------------------------------------------------------------------------ */
class LocalFileDevice {
  /**
   * File-backed persistent storage device ("persistent storage device 70
   * maintaining a file 300"). Creates parent directories as needed and opens
   * the file in append+read mode, creating it if missing.
   * @param {string} filePath - path to the backing log file
   */
  constructor(filePath) {
    this.filePath = filePath;
    fs.mkdirSync(path.dirname(filePath), { recursive: true });
    this.fd = fs.openSync(filePath, 'a+');
  }

  /** Current file size in bytes, as a BigInt. */
  size() {
    const { size } = fs.fstatSync(this.fd);
    return BigInt(size);
  }

  /**
   * Append a buffer and return the byte offset at which it begins — this
   * offset becomes the OID write location. fsyncs after every write for
   * simple explicit durability; production code would batch syncs.
   * NOTE(review): with the 'a+' flag the position argument to writeSync may
   * be ignored by the OS in append mode; writes still land at EOF, which is
   * exactly the offset computed here — confirm on target platforms.
   * @param {Buffer} buf - bytes to append
   * @returns {bigint} offset of the first appended byte
   */
  append(buf) {
    const startOffset = this.size();
    fs.writeSync(this.fd, buf, 0, buf.length, Number(startOffset));
    fs.fsyncSync(this.fd);
    return startOffset;
  }

  /**
   * Read `length` bytes starting at `offset` (Number or BigInt).
   * @returns {Buffer} the bytes read
   */
  readAt(offset, length) {
    const out = Buffer.alloc(length);
    fs.readSync(this.fd, out, 0, length, Number(offset));
    return out;
  }

  /** Release the underlying file descriptor. */
  close() {
    fs.closeSync(this.fd);
  }
}

/* ------------------------------------------------------------------------------------------
 * Component: AppendOnlyLogBackend
 * Implements: store 100 + read 150 + commit 200 + root 250 on a single append-only file
 * ------------------------------------------------------------------------------------------ */
class AppendOnlyLogBackend {
  /**
   * The core backend: implements store 100 / read 150 plumbing plus the
   * backward root scan of root 250 over a single append-only device.
   * @param {{device:LocalFileDevice, backendId?:number, segmentId?:number}} opts
   */
  constructor({ device, backendId = 0, segmentId = 0 }) {
    this.device = device;
    this.backendId = backendId;
    this.segmentId = segmentId;
  }

  /**
   * Encode one record and append it to the device.
   * @returns {string} OID encoding this backend/segment and the record's start offset
   */
  appendRecord(type, payloadObj) {
    const encoded = RecordCodec.encode({ type, payloadObj });
    const startOffset = this.device.append(encoded);
    return Oid.encode({ backend: this.backendId, segment: this.segmentId, offset: startOffset });
  }

  /**
   * Read and decode the record whose start offset is encoded in the OID.
   * The fixed 10-byte header is read first to learn the payload length, then
   * the full record is re-read so CRC and trailer can be validated.
   */
  readRecord(oidStr) {
    const { offset } = Oid.decode(oidStr);
    const header = this.device.readAt(offset, 10);
    if (!header.subarray(0, 4).equals(RecordCodec.MAGIC)) throw new Error('Bad magic at OID');
    const payloadLen = header.readUInt32BE(6);
    const totalLen = 10 + payloadLen + 4 + 4; // header + payload + crc + trailer
    return RecordCodec.decode(this.device.readAt(offset, totalLen));
  }

  /**
   * Root 250: seek to EOF and walk records backwards — each record's trailing
   * TOTAL_LEN field tells us where it starts — until a record of the requested
   * type is found. Older records of that type remain on disk but become
   * implicitly inactive.
   * @returns {{oid:string, record:object}|null} latest match, or null if none
   */
  scanBackwardForRecordType(type) {
    let cursor = this.device.size();
    while (cursor >= 4n) {
      const trailer = this.device.readAt(cursor - 4n, 4);
      const totalLen = BigInt(trailer.readUInt32BE(0));
      const recordStart = cursor - totalLen;
      if (recordStart < 0n) throw new Error('Corrupt log (negative start)');

      const record = RecordCodec.decode(this.device.readAt(recordStart, Number(totalLen)));
      if (record.type === type) {
        const oid = Oid.encode({ backend: this.backendId, segment: this.segmentId, offset: recordStart });
        return { oid, record };
      }
      cursor = recordStart;
    }
    return null;
  }

  /** Close the underlying device. */
  close() {
    this.device.close();
  }
}

/* ------------------------------------------------------------------------------------------
 * Component: Backend interface helper (multi-backend OID routing)
 * Implements: enabling partitioning/clustering/sequencing across devices
 * ------------------------------------------------------------------------------------------ */
class MultiBackendRouter {
  /**
   * Routes calls across multiple backends so plug-ins can partition,
   * replicate, or sequence data. OID-based reads are routed by the backend
   * id embedded in the OID; appends and scans are routed by an explicit hint.
   * @param {Array} backends - backends indexed by their backendId
   */
  constructor(backends) {
    this.backends = backends; // array position == backendId
  }

  /** Resolve the backend that owns the given OID (via its encoded backend id). */
  backendForOid(oidStr) {
    const { backend } = Oid.decode(oidStr);
    const owner = this.backends[backend];
    if (!owner) throw new Error(`No backend for id=${backend}`);
    return owner;
  }

  /** Append to the backend selected by `backendHint` (defaults to 0). */
  appendRecord(type, payloadObj, { backendHint = 0 } = {}) {
    const target = this.backends[backendHint];
    if (!target) throw new Error(`No backend for hint id=${backendHint}`);
    return target.appendRecord(type, payloadObj);
  }

  /** Read from whichever backend the OID names. */
  readRecord(oidStr) {
    return this.backendForOid(oidStr).readRecord(oidStr);
  }

  /** Backward-scan the backend selected by `backendHint` (defaults to 0). */
  scanBackwardForRecordType(type, { backendHint = 0 } = {}) {
    const target = this.backends[backendHint];
    if (!target) throw new Error(`No backend for hint id=${backendHint}`);
    return target.scanBackwardForRecordType(type);
  }

  /** Close every routed backend. */
  closeAll() {
    this.backends.forEach((backend) => backend.close());
  }
}

/* ------------------------------------------------------------------------------------------
 * Component: Monitoring plug-in 416 (wrapper)
 * Implements: “assess utilization ... operations moderated” (basic metrics)
 * ------------------------------------------------------------------------------------------ */
class MonitoringPluginBackend {
  /**
   * Monitoring plug-in 416: a transparent wrapper that counts operations and
   * bytes appended, then delegates everything to the wrapped backend.
   * @param {object} inner - the backend being observed
   */
  constructor(inner) {
    this.inner = inner;
    this.metrics = {
      appendCalls: 0,
      readCalls: 0,
      scanCalls: 0,
      bytesAppended: 0,
    };
  }

  appendRecord(type, payloadObj, opts) {
    // Re-encode purely to measure the on-disk size; the inner backend still
    // performs the real encode/append so its OID scheme is preserved.
    const encoded = RecordCodec.encode({ type, payloadObj });
    this.metrics.appendCalls += 1;
    this.metrics.bytesAppended += encoded.length;
    return this.inner.appendRecord(type, payloadObj, opts);
  }

  readRecord(oidStr) {
    this.metrics.readCalls += 1;
    return this.inner.readRecord(oidStr);
  }

  scanBackwardForRecordType(type, opts) {
    this.metrics.scanCalls += 1;
    return this.inner.scanBackwardForRecordType(type, opts);
  }

  /** Snapshot of the counters (flat numbers, so a shallow copy suffices). */
  getMetrics() {
    return { ...this.metrics };
  }

  closeAll() {
    return this.inner.closeAll();
  }
}

/* ------------------------------------------------------------------------------------------
 * Component: Partitioning plug-in 408 (wrapper)
 * Implements: “portions of file 300 are stored across a multiple number of persistent devices”
 * ------------------------------------------------------------------------------------------ */
class PartitioningPluginBackend {
  /**
   * Partitioning plug-in 408: routes object records to shards by key hash,
   * while root markers / EOF markers always land on a designated metadata
   * backend so the backward root scan stays within a single file.
   * @param {MultiBackendRouter} router - routes appends/reads to shards
   * @param {{shardCount:number, metadataBackend?:number}} opts
   */
  constructor(router, { shardCount, metadataBackend = 0 }) {
    this.router = router;
    this.shardCount = shardCount;
    this.metadataBackend = metadataBackend; // where ROOT_MARKER/EOF records go
  }

  /** Stable shard assignment: first 4 bytes of SHA-256(key), mod shardCount. */
  shardForKey(key) {
    const digest = crypto.createHash('sha256').update(String(key)).digest();
    return digest.readUInt32BE(0) % this.shardCount;
  }

  appendRecord(type, payloadObj, opts = {}) {
    // Metadata records bypass sharding so root recovery needs only one scan.
    if (type === RecordType.ROOT_MARKER || type === RecordType.EOF) {
      return this.router.appendRecord(type, payloadObj, { backendHint: this.metadataBackend });
    }

    // Partition key preference: explicit option, then payload key/id, else random.
    const partitionKey = opts.partitionKey ?? payloadObj?.key ?? payloadObj?.id ?? crypto.randomUUID();
    const backendHint = this.shardForKey(partitionKey);
    return this.router.appendRecord(type, payloadObj, { backendHint });
  }

  readRecord(oidStr) {
    return this.router.readRecord(oidStr);
  }

  scanBackwardForRecordType(type) {
    return this.router.scanBackwardForRecordType(type, { backendHint: this.metadataBackend });
  }

  closeAll() {
    return this.router.closeAll();
  }
}

/* ------------------------------------------------------------------------------------------
 * Component: Clustering plug-in 404 (wrapper)
 * Implements: “stores all data to all persistent storage devices” (replication)
 * ------------------------------------------------------------------------------------------ */
class ClusteringPluginBackend {
  /**
   * Clustering plug-in 404: replicates every appended record to all backends
   * ("stores all data to all persistent storage devices") and returns the OID
   * minted by the designated primary backend.
   * @param {MultiBackendRouter} router - must expose `backends` and routed ops
   * @param {{primaryBackend?:number}} opts
   */
  constructor(router, { primaryBackend = 0 } = {}) {
    this.router = router;
    this.primaryBackend = primaryBackend;
  }

  /**
   * Append to every backend; return the primary's OID.
   * BUG FIX: the replication target must override any caller-supplied hint,
   * so `backendHint: i` is applied *after* spreading opts. Previously
   * `{ backendHint: i, ...opts }` let opts.backendHint clobber the loop
   * index, sending every "replica" to the same backend (no replication).
   */
  appendRecord(type, payloadObj, opts = {}) {
    let primaryOid = null;
    for (let i = 0; i < this.router.backends.length; i++) {
      const oid = this.router.appendRecord(type, payloadObj, { ...opts, backendHint: i });
      if (i === this.primaryBackend) primaryOid = oid;
    }
    return primaryOid;
  }

  /** Read from the backend encoded in the OID (no failover in this skeleton). */
  readRecord(oidStr) {
    return this.router.readRecord(oidStr);
  }

  /** Scan the primary replica only; cross-replica reconciliation is out of scope. */
  scanBackwardForRecordType(type) {
    return this.router.scanBackwardForRecordType(type, { backendHint: this.primaryBackend });
  }

  closeAll() {
    return this.router.closeAll();
  }
}

/* ------------------------------------------------------------------------------------------
 * Component: Sequencing plug-in 412 (simplified stub)
 * Implements: “when one device is full the file is continued on the next device”
 * ------------------------------------------------------------------------------------------ */
class SequencingPluginBackend {
  /**
   * Sequencing plug-in 412: when the current segment would exceed its byte
   * budget, close it and continue the logical file on the next segment
   * ("when one device is full the file is continued on the next device").
   * @param {{makeBackend:function(number):object, maxBytesPerSegment?:number}} opts
   *        makeBackend - factory producing the backend for a given segment index
   */
  constructor({ makeBackend, maxBytesPerSegment = 64 * 1024 * 1024 }) {
    this.makeBackend = makeBackend;
    this.maxBytesPerSegment = BigInt(maxBytesPerSegment);
    this.current = makeBackend(0);
    this.segmentIndex = 0;
  }

  /** Roll to the next segment if appending `estimatedBytes` would exceed the cap. */
  ensureSpace(estimatedBytes) {
    const projected = this.current.device.size() + BigInt(estimatedBytes);
    if (projected > this.maxBytesPerSegment) {
      this.current.close();
      this.segmentIndex += 1;
      this.current = this.makeBackend(this.segmentIndex);
    }
  }

  /**
   * Append to the current segment, rolling over first if needed. The record
   * is encoded here only to estimate its size; the inner backend performs the
   * real encode/append and preserves the segment-aware OID scheme.
   */
  appendRecord(type, payloadObj) {
    const encoded = RecordCodec.encode({ type, payloadObj });
    this.ensureSpace(encoded.length);
    return this.current.appendRecord(type, payloadObj);
  }

  /**
   * Read from the segment named in the OID. Older segments are reopened
   * on demand and closed again (a full implementation would keep prior
   * segment handles open or cache them).
   */
  readRecord(oidStr) {
    const { segment } = Oid.decode(oidStr);
    if (segment === this.segmentIndex) {
      return this.current.readRecord(oidStr);
    }

    const older = this.makeBackend(segment);
    const record = older.readRecord(oidStr);
    older.close();
    return record;
  }

  /** Roots live in the latest segment, so only the current one is scanned. */
  scanBackwardForRecordType(type) {
    return this.current.scanBackwardForRecordType(type);
  }

  closeAll() {
    this.current.close();
  }
}

/* ------------------------------------------------------------------------------------------
 * Component: Caching plug-in 402 (write-behind cache with logical OIDs)
 * Implements: “temporarily stored in volatile storage ... flushed when criteria satisfied”
 * ------------------------------------------------------------------------------------------ */
class CachingPluginBackend {
  /**
   * Caching plug-in 402: write-behind cache. Object appends are buffered in
   * volatile memory under a process-local logical OID ("loid:...") and
   * flushed to the inner backend once the buffered-byte threshold is crossed
   * (or before any scan / root-marker write / close).
   * @param {object} inner - the durable backend being wrapped
   * @param {{maxBufferedBytes?:number}} opts - flush threshold in approximate bytes
   */
  constructor(inner, { maxBufferedBytes = 1_000_000 } = {}) {
    this.inner = inner;
    this.maxBufferedBytes = maxBufferedBytes;
    this.bufferedBytes = 0;
    this.queue = []; // pending items: { logicalOid, type, payloadObj, opts }
    this.logicalToPhysical = new Map(); // logicalOid -> physicalOid (volatile!)
  }

  /** Mint a process-local logical OID for a not-yet-flushed record. */
  makeLogicalOid() {
    return `loid:v1:${crypto.randomUUID()}`;
  }

  flushIfNeeded() {
    if (this.bufferedBytes < this.maxBufferedBytes) return;
    this.flushAll();
  }

  /** Drain the queue into the inner backend, recording logical->physical mappings. */
  flushAll() {
    for (const item of this.queue) {
      const physicalOid = this.inner.appendRecord(item.type, item.payloadObj, item.opts);
      this.logicalToPhysical.set(item.logicalOid, physicalOid);
    }
    this.queue = [];
    this.bufferedBytes = 0;
  }

  /**
   * Buffer object appends; write root/EOF markers through immediately.
   * BUG FIX: a ROOT_MARKER whose payload references a logical OID is now
   * rewritten to the physical OID before persisting. The logical->physical
   * map lives only in memory, so a durable "loid:" root pointer would be
   * unresolvable after a process restart.
   */
  appendRecord(type, payloadObj, opts = {}) {
    if (type === RecordType.ROOT_MARKER || type === RecordType.EOF) {
      // Ensure everything the marker may reference is durable first.
      this.flushAll();
      let durablePayload = payloadObj;
      if (type === RecordType.ROOT_MARKER
          && typeof payloadObj?.rootOid === 'string'
          && payloadObj.rootOid.startsWith('loid:')) {
        durablePayload = { ...payloadObj, rootOid: this.resolveOid(payloadObj.rootOid) };
      }
      return this.inner.appendRecord(type, durablePayload, opts);
    }

    const logicalOid = this.makeLogicalOid();
    const approxBytes = JSON.stringify(payloadObj).length + 64; // rough on-disk estimate
    this.queue.push({ logicalOid, type, payloadObj, opts });
    this.bufferedBytes += approxBytes;
    this.flushIfNeeded();
    return logicalOid;
  }

  /**
   * Translate a logical OID into its physical OID; physical OIDs pass through.
   * @throws {Error} if the logical OID has not been flushed yet
   */
  resolveOid(oidStr) {
    if (oidStr.startsWith('loid:')) {
      const phys = this.logicalToPhysical.get(oidStr);
      if (!phys) throw new Error(`Logical OID not flushed yet: ${oidStr}`);
      return phys;
    }
    return oidStr;
  }

  readRecord(oidStr) {
    return this.inner.readRecord(this.resolveOid(oidStr));
  }

  /** Scans must observe a consistent on-disk state, so flush first. */
  scanBackwardForRecordType(type, opts) {
    this.flushAll();
    return this.inner.scanBackwardForRecordType(type, opts);
  }

  closeAll() {
    this.flushAll();
    return this.inner.closeAll();
  }
}

/* ------------------------------------------------------------------------------------------
 * Component: Remoting plug-in 420 (stub)
 * Implements: “utilize remotely connected persistent storage devices”
 * ------------------------------------------------------------------------------------------ */
class RemoteBackendStub {
  constructor({ baseUrl }) {
    this.baseUrl = baseUrl;
  }

  requestJson(method, route, bodyObj) {
    const url = new URL(route, this.baseUrl);
    const lib = url.protocol === 'https:' ? https : http;

    const body = bodyObj ? Buffer.from(JSON.stringify(bodyObj), 'utf8') : null;

    return new Promise((resolve, reject) => {
      const req = lib.request(url, {
        method,
        headers: {
          'content-type': 'application/json',
          ...(body ? { 'content-length': body.length } : {}),
        },
      }, (res) => {
        const chunks = [];
        res.on('data', (d) => chunks.push(d));
        res.on('end', () => {
          const txt = Buffer.concat(chunks).toString('utf8');
          if (res.statusCode >= 200 && res.statusCode < 300) resolve(txt ? JSON.parse(txt) : null);
          else reject(new Error(`HTTP ${res.statusCode}: ${txt}`));
        });
      });
      req.on('error', reject);
      if (body) req.write(body);
      req.end();
    });
  }

  appendRecord(type, payloadObj) {
    // Expect server returns { oid }
    return this.requestJson('POST', '/append', { type, payloadObj }).then(r => r.oid);
  }

  readRecord(oidStr) {
    return this.requestJson('POST', '/read', { oid: oidStr });
  }

  scanBackwardForRecordType(type) {
    return this.requestJson('POST', '/scanBackwardFor', { type });
  }

  closeAll() { /* no-op */ }
}

/* ------------------------------------------------------------------------------------------
 * Component: StoreProcess (store 100)
 * Implements: seek EOF, define write location, write object, define OID, move EOF
 * ------------------------------------------------------------------------------------------ */
class StoreProcess {
  /**
   * store 100: seek EOF → write object → define OID at the write location →
   * move EOF. Expressed here as two appends: the object record itself,
   * followed by an explicit EOF marker record.
   */
  constructor(backend) {
    this.backend = backend;
  }

  /**
   * Persist one object.
   * @returns {string} OID of the stored object record
   */
  storeObject(payloadObj, opts = {}) {
    const objectOid = this.backend.appendRecord(RecordType.OBJECT, payloadObj, opts);
    this.backend.appendRecord(RecordType.EOF, { ts: Date.now() }, opts);
    return objectOid;
  }

  /**
   * Delete-by-append: write a TOMBSTONE referencing the target OID. Nothing
   * is overwritten or removed; higher-level structures drop the object by
   * updating pointers.
   * @returns {string} OID of the tombstone record
   */
  deleteObject(targetOid, opts = {}) {
    const tombstoneOid = this.backend.appendRecord(RecordType.TOMBSTONE, { targetOid, ts: Date.now() }, opts);
    this.backend.appendRecord(RecordType.EOF, { ts: Date.now() }, opts);
    return tombstoneOid;
  }
}

/* ------------------------------------------------------------------------------------------
 * Component: CommitProcess (commit 200)
 * Implements: append root marker RM pointing to selected root OID, then move EOF
 * ------------------------------------------------------------------------------------------ */
class CommitProcess {
  /**
   * commit 200: switch the "current root" by appending a root marker that
   * points at the chosen root object, then move EOF. Older root markers are
   * implicitly inactivated — nothing is ever deleted.
   */
  constructor(backend) {
    this.backend = backend;
  }

  /**
   * Append a ROOT_MARKER pointing at `rootOid` (with optional metadata merged
   * into its payload), followed by an EOF marker record.
   * @returns {string} OID of the root marker record
   */
  commitRoot(rootOid, meta = {}) {
    const markerPayload = { rootOid, ts: Date.now(), ...meta };
    const markerOid = this.backend.appendRecord(RecordType.ROOT_MARKER, markerPayload);
    this.backend.appendRecord(RecordType.EOF, { ts: Date.now() }, {});
    return markerOid;
  }
}

/* ------------------------------------------------------------------------------------------
 * Component: RootProcess (root 250)
 * Implements: seek EOF, backward scan until RM found, return RM->root pointer
 * ------------------------------------------------------------------------------------------ */
class RootProcess {
  /**
   * root 250: recover the current root by seeking EOF and scanning backwards
   * until the most recent ROOT_MARKER is found.
   */
  constructor(backend) {
    this.backend = backend;
  }

  /**
   * @returns {string|null} rootOid from the latest ROOT_MARKER, or null when
   *          nothing has ever been committed
   */
  getLatestRootOid() {
    const latest = this.backend.scanBackwardForRecordType(RecordType.ROOT_MARKER);
    return latest ? latest.record.payloadObj.rootOid : null;
  }
}

/* ------------------------------------------------------------------------------------------
 * Component: ReadProcess (read 150)
 * Implements: read object content given OID (location/offset or indirection)
 * ------------------------------------------------------------------------------------------ */
class ReadProcess {
  /**
   * read 150: fetch an object's record given its identifier (a direct
   * location/offset OID or an indirection variable such as a logical OID,
   * resolved by whichever backend wraps this one).
   */
  constructor(backend) {
    this.backend = backend;
  }

  /** @returns {object} the decoded record for `oidStr` */
  read(oidStr) {
    return this.backend.readRecord(oidStr);
  }
}

/* ------------------------------------------------------------------------------------------
 * Component: B-tree interface 304 (minimal persistent B-tree via copy-on-write)
 * Implements: “B-tree interface issues instructions to store/commit transparently”
 * ------------------------------------------------------------------------------------------ */
/**
 * B-tree interface 304: a copy-on-write persistent B-tree built purely on the
 * store/read/commit/root primitives. Every mutation stores fresh node objects
 * (never modifying stored ones) and finishes by committing a new root OID, so
 * each put() produces a new immutable snapshot of the tree.
 *
 * NOTE(review): put() never checks whether the key already exists — the leaf
 * insertion path inserts unconditionally, so putting the same key twice stores
 * duplicate entries and get() returns whichever copy its descent reaches
 * first. Confirm whether update-in-place semantics were intended.
 */
class PersistentBTree {
  /**
   * @param {StoreProcess} store - persists nodes as OBJECT records
   * @param {ReadProcess} read - loads nodes by OID
   * @param {RootProcess} root - supplies the latest committed root OID
   * @param {CommitProcess} commit - commits new root OIDs
   * @param {number} minDegree - B-tree minimum degree t (max keys per node = 2t-1)
   */
  constructor({ store, read, root, commit, minDegree = 2 }) {
    this.store = store;
    this.read = read;
    this.rootProc = root;
    this.commit = commit;
    this.t = minDegree; // B-tree minimum degree
  }

  // Node format stored as OBJECT:
  // { kind:'btree_node', leaf:boolean, keys:[...], values:[...], children:[oid...] }
  emptyNode(leaf = true) {
    return { kind: 'btree_node', leaf, keys: [], values: [], children: [] };
  }

  /** Load a node by OID, verifying it is an OBJECT record holding a btree node. */
  loadNode(oid) {
    const rec = this.read.read(oid);
    if (rec.type !== RecordType.OBJECT) throw new Error('Not an object record');
    if (rec.payloadObj.kind !== 'btree_node') throw new Error('Not a btree node');
    return rec.payloadObj;
  }

  /** Persist a node object; returns its freshly minted OID. */
  storeNode(node) {
    return this.store.storeObject(node);
  }

  /** OID of the latest committed root, or null if nothing was ever committed. */
  currentRootOid() {
    return this.rootProc.getLatestRootOid();
  }

  // Public API: put(key,value) => commits new root
  /**
   * Insert key/value and commit the resulting new root OID.
   * If no root exists yet, an empty leaf root is stored and committed first.
   * A full root is split before descending, growing the tree's height by one.
   * @returns {string} the newly committed root OID
   */
  put(key, value) {
    let rootOid = this.currentRootOid();
    if (!rootOid) {
      rootOid = this.storeNode(this.emptyNode(true));
      this.commit.commitRoot(rootOid, { structure: 'btree' });
    }

    let rootNode = this.loadNode(rootOid);
    const maxKeys = 2 * this.t - 1;

    // If root is full, split and grow tree height (copy-on-write: create a new root)
    if (rootNode.keys.length === maxKeys) {
      const newRoot = this.emptyNode(false);
      newRoot.children = [rootOid];

      const { parentOid } = this.splitChildAndStore(newRoot, 0);
      const newRootOid = parentOid;
      const finalRootOid = this.insertNonFull(newRootOid, key, value);
      this.commit.commitRoot(finalRootOid, { structure: 'btree' });
      return finalRootOid;
    }

    const newRootOid = this.insertNonFull(rootOid, key, value);
    this.commit.commitRoot(newRootOid, { structure: 'btree' });
    return newRootOid;
  }

  // Splits child at parent.children[i]. Returns new stored parent OID.
  /**
   * Classic B-tree child split, done copy-on-write: the median key/value of a
   * full child moves up into (a clone of) the parent; fresh left/right halves
   * are stored as new nodes. The original child node stays on disk untouched.
   * @param {object} parentNode - in-memory parent (not yet stored)
   * @param {number} i - index of the full child within parentNode.children
   * @returns {{parentOid:string, leftOid:string, rightOid:string}}
   */
  splitChildAndStore(parentNode, i) {
    const childOid = parentNode.children[i];
    const child = this.loadNode(childOid);

    const t = this.t;
    const right = this.emptyNode(child.leaf);

    // Median key moves up
    const medianKey = child.keys[t - 1];
    const medianVal = child.values[t - 1];

    // Left keeps first t-1 keys; right gets last t-1 keys
    const left = {
      ...child,
      keys: child.keys.slice(0, t - 1),
      values: child.values.slice(0, t - 1),
      children: child.leaf ? [] : child.children.slice(0, t),
    };

    right.keys = child.keys.slice(t);
    right.values = child.values.slice(t);
    right.children = child.leaf ? [] : child.children.slice(t);

    const leftOid = this.storeNode(left);
    const rightOid = this.storeNode(right);

    // Clone the parent before mutating it (copy-on-write discipline).
    const newParent = {
      ...parentNode,
      keys: parentNode.keys.slice(),
      values: parentNode.values.slice(),
      children: parentNode.children.slice(),
    };

    newParent.keys.splice(i, 0, medianKey);
    newParent.values.splice(i, 0, medianVal);
    newParent.children[i] = leftOid;
    newParent.children.splice(i + 1, 0, rightOid);

    const parentOid = this.storeNode(newParent);
    return { parentOid, leftOid, rightOid };
  }

  /**
   * Insert into the subtree rooted at `nodeOid`, which must not be full.
   * Returns the OID of the stored, modified clone of that subtree's root —
   * every node along the descent path is re-stored (path copying).
   */
  insertNonFull(nodeOid, key, value) {
    const node = this.loadNode(nodeOid);

    // Copy-on-write: we build and store a modified clone for every changed node.
    const newNode = {
      ...node,
      keys: node.keys.slice(),
      values: node.values.slice(),
      children: node.children.slice(),
    };

    let i = newNode.keys.length - 1;

    if (newNode.leaf) {
      // Insert into sorted keys
      while (i >= 0 && key < newNode.keys[i]) i--;
      newNode.keys.splice(i + 1, 0, key);
      newNode.values.splice(i + 1, 0, value);
      return this.storeNode(newNode);
    }

    // Internal node: find the child slot whose range contains `key`.
    while (i >= 0 && key < newNode.keys[i]) i--;
    i++;

    const childOid = newNode.children[i];
    const child = this.loadNode(childOid);
    const maxKeys = 2 * this.t - 1;

    if (child.keys.length === maxKeys) {
      // Split child; parent changes (copy-on-write)
      const { parentOid } = this.splitChildAndStore(newNode, i);
      const parentAfterSplit = this.loadNode(parentOid); // reload stored parent snapshot

      // Determine which of the two children we descend into
      const nextIndex = (key > parentAfterSplit.keys[i]) ? i + 1 : i;
      const newChildOid = this.insertNonFull(parentAfterSplit.children[nextIndex], key, value);

      // Re-store the parent once more so it points at the rewritten child.
      const parentFinal = {
        ...parentAfterSplit,
        children: parentAfterSplit.children.slice(),
      };
      parentFinal.children[nextIndex] = newChildOid;
      return this.storeNode(parentFinal);
    } else {
      const newChildOid = this.insertNonFull(childOid, key, value);
      newNode.children[i] = newChildOid;
      return this.storeNode(newNode);
    }
  }

  // Public API: get(key) by walking current root
  /**
   * Look up `key` starting from the latest committed root.
   * @returns {*} the associated value, or null when absent (or no root exists)
   */
  get(key) {
    const rootOid = this.currentRootOid();
    if (!rootOid) return null;

    let oid = rootOid;
    while (true) {
      const node = this.loadNode(oid);
      let i = 0;
      // Advance past keys smaller than `key`; stop at the first >= key.
      while (i < node.keys.length && key > node.keys[i]) i++;
      if (i < node.keys.length && key === node.keys[i]) return node.values[i];
      if (node.leaf) return null;
      oid = node.children[i];
    }
  }
}

/* ------------------------------------------------------------------------------------------
 * Component: Skip-list interface 308 (journal-ish “root indexes every Nth node”)
 * Implements: store only changed nodes + rewrite root pointer(s) as needed
 * ------------------------------------------------------------------------------------------ */
/**
 * Skip-list interface 308: a persistent singly linked list whose root object
 * keeps an index of every Nth node (the patent's "every other one" example
 * corresponds to stride 2). All mutations are append-only: changed nodes and
 * a new root object are stored, then the new root OID is committed.
 *
 * NOTE(review): rewriteLink's "conservative rebuild" path re-stores a fresh
 * copy of EVERY node in the list on each mutation past the head — O(n)
 * appends per put(). The comments below acknowledge this is a deliberate
 * simplification, not the intended production design.
 */
class JournalSkipList {
  /**
   * @param {StoreProcess} store - persists nodes/roots as OBJECT records
   * @param {ReadProcess} read - loads objects by OID
   * @param {RootProcess} root - supplies the latest committed root OID
   * @param {CommitProcess} commit - commits new root OIDs
   * @param {number} stride - index every `stride`-th node in the root
   */
  constructor({ store, read, root, commit, stride = 2 }) {
    this.store = store;
    this.read = read;
    this.rootProc = root;
    this.commit = commit;
    this.stride = stride; // “every other one” style indexing (patent example uses every other)
  }

  // Root format stored as OBJECT:
  // { kind:'skip_root', headOid: oid|null, index:[oid,...], stride }
  emptyRoot() {
    return { kind: 'skip_root', headOid: null, index: [], stride: this.stride };
  }

  // Node format stored as OBJECT:
  // { kind:'skip_node', key, value, nextOid: oid|null }
  makeNode(key, value, nextOid) {
    return { kind: 'skip_node', key, value, nextOid: nextOid ?? null };
  }

  /** Load any OBJECT record by OID (root or node); throws on non-object records. */
  loadObject(oid) {
    const rec = this.read.read(oid);
    if (rec.type !== RecordType.OBJECT) throw new Error('Not an object record');
    return rec.payloadObj;
  }

  /** OID of the latest committed root, or null if never committed. */
  currentRootOid() {
    return this.rootProc.getLatestRootOid();
  }

  /** Return the current root OID, creating and committing an empty root if needed. */
  ensureRoot() {
    let rootOid = this.currentRootOid();
    if (!rootOid) {
      rootOid = this.store.storeObject(this.emptyRoot());
      this.commit.commitRoot(rootOid, { structure: 'skiplist' });
    }
    return rootOid;
  }

  /**
   * Walk the whole chain from headOid and collect the OID of every
   * `stride`-th node. O(n) loads; called after each mutation.
   */
  rebuildIndex(rootObj) {
    // Simple “root points to every Nth node” index.
    const index = [];
    let cur = rootObj.headOid;
    let i = 0;
    while (cur) {
      if (i % rootObj.stride === 0) index.push(cur);
      const n = this.loadObject(cur);
      cur = n.nextOid;
      i++;
    }
    return index;
  }

  /**
   * Insert or update `key` (list is kept sorted ascending by key) and commit
   * the new root. Updates are copy-on-write: a replacement node is stored and
   * the chain is re-linked via rewriteLink.
   * @returns {string} the newly committed root OID
   */
  put(key, value) {
    const rootOid = this.ensureRoot();
    const rootObj = this.loadObject(rootOid);
    if (rootObj.kind !== 'skip_root') throw new Error('Root is not skip_root');

    // Find insertion point by walking list (could be accelerated via index; keep simple)
    let prevOid = null;
    let curOid = rootObj.headOid;

    while (curOid) {
      const cur = this.loadObject(curOid);
      if (cur.kind !== 'skip_node') throw new Error('Corrupt skip list');
      if (cur.key === key) {
        // Update-by-append: rewrite this node with new value (copy-on-write)
        const newCurOid = this.store.storeObject(this.makeNode(key, value, cur.nextOid));
        // Rewrite predecessor (or root head) to point at newCurOid
        const newRootOid = this.rewriteLink(rootObj, prevOid, curOid, newCurOid);
        this.commit.commitRoot(newRootOid, { structure: 'skiplist' });
        return newRootOid;
      }
      if (cur.key > key) break;
      prevOid = curOid;
      curOid = cur.nextOid;
    }

    // Insert new node between prevOid and curOid
    const newNodeOid = this.store.storeObject(this.makeNode(key, value, curOid));
    const newRootOid = this.rewriteLink(rootObj, prevOid, null, newNodeOid);
    this.commit.commitRoot(newRootOid, { structure: 'skiplist' });
    return newRootOid;
  }

  /**
   * Re-link the chain so that `newCurOid` replaces (or is spliced in at) the
   * position after `prevOid`, then store a new root with a rebuilt index.
   * Head replacement (prevOid == null) is a cheap pointer swap; any deeper
   * position triggers the conservative full-list rebuild described inline.
   * @returns {string} OID of the freshly stored root object
   */
  rewriteLink(rootObj, prevOid, /* oldCurOid unused */ _oldCurOid, newCurOid) {
    // Rewrite either predecessor node, or root head pointer, and then rewrite root to update index.
    let newHeadOid = rootObj.headOid;

    if (!prevOid) {
      newHeadOid = newCurOid;
    } else {
      const prev = this.loadObject(prevOid);
      const newPrevOid = this.store.storeObject(this.makeNode(prev.key, prev.value, newCurOid));
      // Now we must also patch the chain from root head to ensure it reaches newPrevOid.
      // Minimal approach: rebuild the entire list head by head-rewrite is expensive; instead:
      // we rely on the fact that prevOid remains reachable and newPrevOid becomes reachable only
      // if something points to it. For correctness, we also rewrite the node *before* prev if needed;
      // but we didn’t track it. So for this minimal skeleton, we do a conservative rebuild:
      // rebuild the list by re-inserting nodes in order (not ideal but demonstrates the model).
      //
      // Practical implementation would track predecessor-of-predecessor or use an index jump table.

      // Conservative rebuild:
      const items = [];
      let curOid = rootObj.headOid;
      while (curOid) {
        const cur = this.loadObject(curOid);
        if (curOid === prevOid) {
          items.push({ key: cur.key, value: cur.value }); // prev
          items.push({ key: this.loadObject(newCurOid).key, value: this.loadObject(newCurOid).value }); // inserted/updated
          curOid = this.loadObject(newCurOid).nextOid; // skip old linkage
        } else {
          items.push({ key: cur.key, value: cur.value });
          curOid = cur.nextOid;
        }
      }
      // If we inserted before head, ensure items reflect it:
      // NOTE(review): this branch appears unreachable — a non-null prevOid is
      // only produced by walking from a non-null headOid in put(); confirm
      // before removing.
      if (!rootObj.headOid) {
        items.push({ key: this.loadObject(newCurOid).key, value: this.loadObject(newCurOid).value });
      }

      // Rebuild chain by append-only writes (demonstrates partial rewrite principle)
      // Built back-to-front so each node can point at the one after it.
      let head = null;
      for (let i = items.length - 1; i >= 0; i--) {
        head = this.store.storeObject(this.makeNode(items[i].key, items[i].value, head));
      }
      newHeadOid = head;

      // newPrevOid is unused after conservative rebuild
      void newPrevOid;
    }

    const newRoot = { ...rootObj, headOid: newHeadOid };
    newRoot.index = this.rebuildIndex(newRoot);
    return this.store.storeObject(newRoot);
  }

  /**
   * Look up `key` by linear walk from the head (the index is not consulted
   * in this skeleton). Early-exits once keys exceed `key` since the list is
   * sorted ascending.
   * @returns {*} the value, or null when absent / no root exists
   */
  get(key) {
    const rootOid = this.currentRootOid();
    if (!rootOid) return null;
    const rootObj = this.loadObject(rootOid);
    if (rootObj.kind !== 'skip_root') return null;

    let curOid = rootObj.headOid;
    while (curOid) {
      const cur = this.loadObject(curOid);
      if (cur.key === key) return cur.value;
      if (cur.key > key) return null;
      curOid = cur.nextOid;
    }
    return null;
  }
}

/* ------------------------------------------------------------------------------------------
 * Example wiring: a single-file store, plus optional plug-ins
 * ------------------------------------------------------------------------------------------ */
/**
 * Open a store engine backed by one append-only log file on local disk.
 * @param {{ filePath: string }} options - path of the backing log file.
 * @returns {AppendOnlyLogBackend} backend 0 / segment 0 over a local file device.
 */
function openSingleFileEngine({ filePath }) {
  return new AppendOnlyLogBackend({
    device: new LocalFileDevice(filePath),
    backendId: 0,
    segmentId: 0,
  });
}

/**
 * Open a partitioned engine: one append-only log backend per shard file,
 * routed through a multi-backend router and wrapped by the partitioning
 * plug-in (shard 0 also holds partition metadata).
 * @param {{ dirPath: string, shardCount?: number }} options - directory for
 *   shard files and number of shards (default 3).
 * @returns {PartitioningPluginBackend} partitioned engine over the shards.
 */
function openPartitionedEngine({ dirPath, shardCount = 3 }) {
  const shards = Array.from({ length: shardCount }, (_, shard) =>
    new AppendOnlyLogBackend({
      device: new LocalFileDevice(path.join(dirPath, `shard-${shard}.storlog`)),
      backendId: shard,
      segmentId: 0,
    })
  );
  return new PartitioningPluginBackend(new MultiBackendRouter(shards), {
    shardCount,
    metadataBackend: 0,
  });
}

/* ------------------------------------------------------------------------------------------
 * Demo (run directly): node append_only_store.js
 * ------------------------------------------------------------------------------------------ */
if (require.main === module) {
  const baseDir = path.join(process.cwd(), '.stor-demo');

  // Choose one:
  // const engine = openSingleFileEngine({ filePath: path.join(baseDir, 'file300.storlog') });
  let engine = openPartitionedEngine({ dirPath: baseDir, shardCount: 3 });

  // Layer transparent plug-ins over the raw engine (monitoring innermost,
  // caching outermost).
  engine = new MonitoringPluginBackend(engine);
  engine = new CachingPluginBackend(engine, { maxBufferedBytes: 50_000 });

  // Fundamental processes: store 100, read 150, commit 200, root 250.
  const storeProc = new StoreProcess(engine);
  const readProc = new ReadProcess(engine);
  const commitProc = new CommitProcess(engine);
  const rootProc = new RootProcess(engine);

  // B-tree interface 304
  const btree = new PersistentBTree({
    store: storeProc,
    read: readProc,
    root: rootProc,
    commit: commitProc,
    minDegree: 2,
  });
  for (const [name, balance] of [['alice', 10], ['bob', 20], ['carol', 30]]) {
    btree.put(name, { balance });
  }

  // Skip-list interface 308
  const skip = new JournalSkipList({
    store: storeProc,
    read: readProc,
    root: rootProc,
    commit: commitProc,
    stride: 2,
  });
  skip.put(10, 'ten');
  skip.put(5, 'five');
  skip.put(20, 'twenty');

  // Resolve the most recent committed root marker (root 250).
  console.log('Latest root OID:', rootProc.getLatestRootOid());

  console.log('B-tree get bob:', btree.get('bob'));
  console.log('Skip get 20:', skip.get(20));

  // Metrics live on the monitoring layer, which the caching wrapper hides
  // behind `.inner`; fall back to the engine itself when it is unwrapped.
  const metricsSource = engine.inner?.getMetrics ? engine.inner : engine;
  if (metricsSource.getMetrics) {
    console.log('Metrics:', metricsSource.getMetrics());
  }

  engine.closeAll?.();
}