Skip to content

Instantly share code, notes, and snippets.

@Epivalent
Created January 21, 2026 10:12
Show Gist options
  • Select an option

  • Save Epivalent/bef4d22fe4e54170dc84f4004a73193c to your computer and use it in GitHub Desktop.

Select an option

Save Epivalent/bef4d22fe4e54170dc84f4004a73193c to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
"""TopCVP_beta CLI
This script packages the current working construction (paper-faithful):
Primitives (all are *string* lambda terms built via plc_suite.terms):
- fetch_{i,n} (paper), with NOT-substitution implemented by swapping CSTT/CSTF
*only* at the (t==n+1 and j==i) app-slot inside the fetch constructor.
- and'_n (paper)
- OR derived purely by De Morgan using NOT-substitution + and'
Normalization/tagging:
- Default uses /mnt/data/planar_nf.py (fast normalizer).
- Optionally uses an external binary normalizer which accepts the term as argv1
and prints the NF on stdout.
Examples:
# tag a boolean term
python3 topcvp_beta_cli.py tag --term "(\\b.\\k.\\f.(b k f))"
# run the paper example circuit (prints new-bit tags only)
python3 topcvp_beta_cli.py paper-example
# evaluate a CVP gate list (JSON)
python3 topcvp_beta_cli.py cvp --eqs-json '[["const",1],["const",0],["and",1,2],["not",1],["or",3,4]]'
# evaluate a DIMACS CNF with a provided assignment
python3 topcvp_beta_cli.py cnf-eval --dimacs foo.cnf --assign "1,-2,3"
"""
from __future__ import annotations
import argparse
import json
import os
import subprocess
import sys
from dataclasses import dataclass
from pathlib import Path
from typing import Dict, Iterable, List, Optional, Sequence, Tuple, Union
sys.path.insert(0, '/mnt/data')
import plc_suite.terms as T
# -------------------------
# Normalization backends
# -------------------------
class NFEngine:
def normalize(self, src: str) -> str:
raise NotImplementedError
class PlanarNFEngine(NFEngine):
def __init__(self):
from planar_nf import parse, normalize, to_string # local file provided by user
self._parse = parse
self._normalize = normalize
self._to_string = to_string
def normalize(self, src: str) -> str:
t = self._parse(src)
nf = self._normalize(t)
return self._to_string(nf)
class ExternalBinEngine(NFEngine):
def __init__(self, bin_path: str):
self.bin_path = bin_path
def normalize(self, src: str) -> str:
# External tool contract: argv1 is term, stdout is NF.
# Note: huge terms may exceed OS argv limits; for that case, wrap your binary.
p = subprocess.run(
[self.bin_path, src],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
check=False,
)
if p.returncode != 0:
raise RuntimeError(
f"External normalizer failed (rc={p.returncode}).\nSTDERR:\n{p.stderr.strip()}"
)
return p.stdout.strip()
@dataclass
class BoolTagger:
eng: NFEngine
dec_true_nf: str
dec_false_nf: str
cache: Dict[str, str]
@classmethod
def build(cls, eng: NFEngine) -> 'BoolTagger':
dec_true_src = f"({T.DEC} {T.TRUE})"
dec_false_src = f"({T.DEC} {T.FALSE})"
dec_true_nf = eng.normalize(dec_true_src)
dec_false_nf = eng.normalize(dec_false_src)
return cls(eng=eng, dec_true_nf=dec_true_nf, dec_false_nf=dec_false_nf, cache={})
def tag(self, term: str) -> str:
# Fast tag of a boolean term via DEC-normal form equality.
if term in self.cache:
return self.cache[term]
src = f"({T.DEC} {term})"
nf = self.eng.normalize(src)
if nf == self.dec_true_nf:
self.cache[term] = 'T'
return 'T'
if nf == self.dec_false_nf:
self.cache[term] = 'F'
return 'F'
self.cache[term] = '?'
return '?'
def make_engine(args: argparse.Namespace) -> NFEngine:
if args.normalizer == 'planar_nf':
return PlanarNFEngine()
if args.normalizer == 'external':
if not args.normalizer_bin:
raise SystemExit('need --normalizer-bin when --normalizer external')
return ExternalBinEngine(args.normalizer_bin)
raise SystemExit(f'unknown normalizer: {args.normalizer}')
# -------------------------
# Term construction (paper)
# -------------------------
var, lam, lams, app = T.var, T.lam, T.lams, T.app
apps = getattr(T, 'apps', None)
TRUE, FALSE = T.TRUE, T.FALSE
COMP, AND = T.COMP, T.AND
CSTT, CSTF = T.CSTT, T.CSTF
def proj_of_tuple(t: str, j: int, m: int) -> str:
xs = [f'x{i}' for i in range(1, m + 1)]
return app(t, lams(xs, var(f'x{j}')))
def compose(f: str, g: str) -> str:
return app(app(COMP, f), g)
def append_const_op(n: int, const: str) -> str:
"""Unary op on tuples: <x1..xn> -> <x1..xn,const> (as a tuple, i.e. \k. ...)."""
xs = [f'x{i}' for i in range(1, n + 1)]
body = var('k')
for x in xs:
body = app(body, var(x))
body = app(body, const)
# op expects (v,k): op v k = v (\x1..xn. k x1..xn const)
return lams(['v', 'k'], app(var('v'), lams(xs, body)))
def setterT(n: int, j: int, i: int, *, which: str, neg: bool = False) -> str:
"""Unary transformer on Church tuples, following the paper.
which='t' uses CSTT at position j; which='f' uses CSTF.
For the i=j branch, the extra appended position b_{n+1} uses CSTapp,
where neg=True swaps CSTT<->CSTF as the NOT-substitution.
"""
if which not in ('t', 'f'):
raise ValueError(which)
CSTj = CSTT if which == 't' else CSTF
if which == 't':
CSTapp = CSTT if not neg else CSTF
else:
CSTapp = CSTF if not neg else CSTT
bs = [f'b{t}' for t in range(1, n + 2)] # b1..b_{n+1}
args: List[str] = []
for t in range(1, n + 2):
if t == j:
args.append(app(CSTj, var(f'b{t}')))
elif (t == n + 1) and (j == i):
args.append(app(CSTapp, var(f'b{t}')))
else:
args.append(var(f'b{t}'))
body = var('k')
for a in args:
body = app(body, a)
# unary transformer on tuples: t |-> (\k. t (\b1..b_{n+1}. k ...))
return lam('t', lam('k', app(var('t'), lams(bs, body))))
def cj(n: int, j: int, i: int, *, neg: bool = False) -> str:
S = setterT(n, j, i, which='t', neg=neg)
return lams(['g', 'h'], compose(var('g'), compose(S, var('h'))))
def fj(n: int, j: int, i: int, *, neg: bool = False) -> str:
return setterT(n, j, i, which='f', neg=neg)
def fetch_paper(i: int, n: int, *, neg: bool = False) -> str:
"""fetch_{i,n}; if neg=True, NOT-substitution inside fetch."""
if not (1 <= i <= n):
raise ValueError('need 1 <= i <= n')
base = T.tup([FALSE] * (n + 1))
rest = base
for j in range(n, 0, -1):
rest = app(app(app(var(f'x{j}'), cj(n, j, i, neg=neg)), fj(n, j, i, neg=neg)), rest)
xs = [f'x{j}' for j in range(1, n + 1)]
return lam('v', app(var('v'), lams(xs, rest)))
def and_prime_paper(n: int) -> str:
"""and'_n: <x1..x_{n+2}> -> <x1..xn, x_{n+1} AND x_{n+2}> (in CPS with k)."""
xs = [f'x{i}' for i in range(1, n + 3)]
out = var('k')
for i in range(1, n + 1):
out = app(out, var(f'x{i}'))
out = app(out, app(app(AND, var(f'x{n+1}')), var(f'x{n+2}')))
return lams(['v', 'k'], app(var('v'), lams(xs, out)))
def and_gate(i: int, j: int, n: int, v: str) -> str:
"""Append (xi AND xj) to a length-n vector v, producing length n+1."""
A = and_prime_paper(n)
return app(A, app(fetch_paper(i, n + 1), app(fetch_paper(j, n), v)))
def not_gate(i: int, n: int, v: str) -> str:
"""Append NOT(xi) to a length-n vector v, producing length n+1."""
return app(fetch_paper(i, n, neg=True), v)
def or_gate_dm(i: int, j: int, n: int, v: str) -> Tuple[str, int, int]:
"""Append (xi OR xj) using De Morgan with only not_gate + and_gate.
Expands to 4 steps, so vector length increases by 4.
Returns (v_out, n_out, out_pos).
"""
# x_{n+1} := ~xi
v = not_gate(i, n, v)
n += 1
t1 = n
# x_{n+1} := ~xj
v = not_gate(j, n, v)
n += 1
t2 = n
# x_{n+1} := t1 & t2
v = and_gate(t1, t2, n, v)
n += 1
t3 = n
# x_{n+1} := ~t3 = xi OR xj
v = not_gate(t3, n, v)
n += 1
out = n
return v, n, out
# -------------------------
# CVP gate compilation
# -------------------------
Eq = Union[Tuple[str, int], Tuple[str, int, int]] # ('const',0|1) / ('not',i) / ('and',i,j) / ('or',i,j)
def compile_cvp(eqs: Sequence[Eq]) -> Tuple[str, int, List[int]]:
"""Compile a high-level CVP gate list to a single vector term.
Returns (vector_term, final_length, wire_pos)
- wire_pos[k-1] is the actual vector position of logical gate k.
- OR introduces auxiliaries, but wire_pos always points to the gate output.
"""
v = T.tup([])
n = 0
wire_pos: List[int] = []
for gate_idx, eq in enumerate(eqs, start=1):
op = eq[0]
if op == 'const':
bit = int(eq[1])
const = TRUE if bit == 1 else FALSE
v = app(append_const_op(n, const), v)
n += 1
wire_pos.append(n)
elif op == 'not':
i = int(eq[1])
src_pos = wire_pos[i - 1]
v = not_gate(src_pos, n, v)
n += 1
wire_pos.append(n)
elif op == 'and':
i, j = int(eq[1]), int(eq[2])
pi, pj = wire_pos[i - 1], wire_pos[j - 1]
v = and_gate(pi, pj, n, v)
n += 1
wire_pos.append(n)
elif op == 'or':
i, j = int(eq[1]), int(eq[2])
pi, pj = wire_pos[i - 1], wire_pos[j - 1]
v, n, out_pos = or_gate_dm(pi, pj, n, v)
wire_pos.append(out_pos)
else:
raise ValueError(f'unknown gate op: {op}')
return v, n, wire_pos
def eval_cvp_output_tag(tagger: BoolTagger, eqs: Sequence[Eq]) -> str:
v, n, wire_pos = compile_cvp(eqs)
out_pos = wire_pos[-1]
out_term = proj_of_tuple(v, out_pos, n)
return tagger.tag(out_term)
# -------------------------
# CNF helper
# -------------------------
def parse_dimacs(path: str) -> Tuple[int, List[List[int]]]:
clauses: List[List[int]] = []
nvars = 0
with open(path, 'r', encoding='utf-8') as f:
for line in f:
line = line.strip()
if not line or line.startswith('c'):
continue
if line.startswith('p'):
parts = line.split()
if len(parts) >= 4 and parts[1] == 'cnf':
nvars = int(parts[2])
continue
lits = [int(x) for x in line.split()]
if not lits:
continue
if lits[-1] == 0:
lits = lits[:-1]
if lits:
clauses.append(lits)
if nvars <= 0:
# fall back
nvars = max((abs(l) for c in clauses for l in c), default=0)
return nvars, clauses
def compile_cnf_to_cvp(nvars: int, clauses: List[List[int]], assignment: List[int]) -> List[Eq]:
"""Compile CNF under a *fixed* assignment to a CVP gate list.
assignment is a list of ints like [1,-2,3] meaning x1=T, x2=F, x3=T.
Missing vars default to F.
"""
asg: Dict[int, int] = {abs(v): (1 if v > 0 else 0) for v in assignment}
eqs: List[Eq] = []
# First, create x1..xn as constants (the assignment).
for i in range(1, nvars + 1):
eqs.append(('const', int(asg.get(i, 0))))
def lit_to_wire(lit: int) -> int:
vid = abs(lit)
if lit > 0:
return vid
# negated literal: create a NOT gate
eqs.append(('not', vid))
return len(eqs)
clause_wires: List[int] = []
for clause in clauses:
if not clause:
# empty clause => UNSAT
eqs.append(('const', 0))
clause_wires.append(len(eqs))
continue
cur = lit_to_wire(clause[0])
for lit in clause[1:]:
nxt = lit_to_wire(lit)
eqs.append(('or', cur, nxt))
cur = len(eqs)
clause_wires.append(cur)
# AND all clauses
if not clause_wires:
eqs.append(('const', 1))
return eqs
cur = clause_wires[0]
for w in clause_wires[1:]:
eqs.append(('and', cur, w))
cur = len(eqs)
return eqs
# -------------------------
# CLI helpers
# -------------------------
def load_term_arg(term: Optional[str], term_file: Optional[str]) -> str:
if term is None and term_file is None:
raise SystemExit('need --term or --term-file')
if term is not None and term_file is not None:
raise SystemExit('use only one of --term or --term-file')
if term_file is not None:
return Path(term_file).read_text(encoding='utf-8')
return term # type: ignore[return-value]
def load_eqs(args: argparse.Namespace) -> List[Eq]:
if args.eqs_file:
data = json.loads(Path(args.eqs_file).read_text(encoding='utf-8'))
elif args.eqs_json:
data = json.loads(args.eqs_json)
else:
raise SystemExit('need --eqs-json or --eqs-file')
eqs: List[Eq] = []
for e in data:
if not isinstance(e, (list, tuple)) or not e:
raise SystemExit(f'bad eq: {e!r}')
op = e[0]
if op == 'const':
eqs.append(('const', int(e[1])))
elif op == 'not':
eqs.append(('not', int(e[1])))
elif op in ('and', 'or'):
eqs.append((op, int(e[1]), int(e[2])))
else:
raise SystemExit(f'unknown op in eq: {e!r}')
return eqs
def parse_assign(s: str) -> List[int]:
# "1,-2,3" or "1 -2 3"
s = s.replace(',', ' ')
out: List[int] = []
for part in s.split():
out.append(int(part))
return out
# -------------------------
# Subcommands
# -------------------------
def cmd_norm(args: argparse.Namespace) -> int:
eng = make_engine(args)
term = load_term_arg(args.term, args.term_file)
nf = eng.normalize(term)
print(nf)
return 0
def cmd_tag(args: argparse.Namespace) -> int:
eng = make_engine(args)
tagger = BoolTagger.build(eng)
term = load_term_arg(args.term, args.term_file)
print(tagger.tag(term))
return 0
def cmd_paper_example(args: argparse.Namespace) -> int:
eng = make_engine(args)
tagger = BoolTagger.build(eng)
# Paper example (as we used):
# x1:=1; x2:=0; x3:=1; x4=x1&x2; x5=~x1; x6=x5&x3; x7=x4|x6
# We implement OR via De Morgan macro (4 extra bits). We only check each *new* bit.
def step(v: str, n: int, expected: str) -> str:
newbit = tagger.tag(proj_of_tuple(v, n, n))
ok = 'OK' if newbit == expected else 'BAD'
print(f" new x{n} = {newbit} (expect {expected}) => {ok}")
return newbit
bits: List[str] = []
v = T.tup([])
n = 0
# x1 := 1
v = app(append_const_op(n, TRUE), v); n += 1
bits.append(step(v, n, 'T'))
# x2 := 0
v = app(append_const_op(n, FALSE), v); n += 1
bits.append(step(v, n, 'F'))
# x3 := 1
v = app(append_const_op(n, TRUE), v); n += 1
bits.append(step(v, n, 'T'))
# x4 := x1 & x2
v = and_gate(1, 2, n, v); n += 1
exp = 'T' if (bits[0] == 'T' and bits[1] == 'T') else 'F'
bits.append(step(v, n, exp))
# x5 := ~x1
v = not_gate(1, n, v); n += 1
exp = 'T' if bits[0] == 'F' else 'F'
bits.append(step(v, n, exp))
# x6 := x5 & x3
v = and_gate(5, 3, n, v); n += 1
exp = 'T' if (bits[4] == 'T' and bits[2] == 'T') else 'F'
bits.append(step(v, n, exp))
# OR macro: x7 := ~x4 ; x8 := ~x6 ; x9 := x7 & x8 ; x10 := ~x9
v = not_gate(4, n, v); n += 1
exp = 'T' if bits[3] == 'F' else 'F'
bits.append(step(v, n, exp))
v = not_gate(6, n, v); n += 1
exp = 'T' if bits[5] == 'F' else 'F'
bits.append(step(v, n, exp))
v = and_gate(7, 8, n, v); n += 1
exp = 'T' if (bits[6] == 'T' and bits[7] == 'T') else 'F'
bits.append(step(v, n, exp))
v = not_gate(9, n, v); n += 1
exp = 'T' if bits[8] == 'F' else 'F'
bits.append(step(v, n, exp))
print(f"\nFinal derived OR bit (x{n}) = {bits[-1]}")
print("Expected (x4 OR x6) =", 'T' if (bits[3] == 'T' or bits[5] == 'T') else 'F')
return 0
def cmd_cvp(args: argparse.Namespace) -> int:
eng = make_engine(args)
tagger = BoolTagger.build(eng)
eqs = load_eqs(args)
v, n, wire_pos = compile_cvp(eqs)
if args.dump_wires:
print('wire_pos (logical_gate -> vector_position):')
for i, p in enumerate(wire_pos, start=1):
print(f' g{i} -> x{p}')
if args.dump_gate_tags:
for gi, p in enumerate(wire_pos, start=1):
b = tagger.tag(proj_of_tuple(v, p, n))
print(f'g{gi} (x{p}) = {b}')
out_pos = wire_pos[-1]
out = tagger.tag(proj_of_tuple(v, out_pos, n))
print(out)
return 0
def cmd_cnf_eval(args: argparse.Namespace) -> int:
eng = make_engine(args)
tagger = BoolTagger.build(eng)
nvars, clauses = parse_dimacs(args.dimacs)
assignment = parse_assign(args.assign)
eqs = compile_cnf_to_cvp(nvars, clauses, assignment)
if args.dump_eqs:
print(json.dumps(eqs))
out = eval_cvp_output_tag(tagger, eqs)
print(out)
return 0
# -------------------------
# Main
# -------------------------
def build_argparser() -> argparse.ArgumentParser:
p = argparse.ArgumentParser(prog='topcvp_beta_cli.py')
p.add_argument(
'--normalizer',
choices=['planar_nf', 'external'],
default='planar_nf',
help='normalization backend',
)
p.add_argument(
'--normalizer-bin',
default=None,
help='path to external normalizer binary (only for --normalizer external)',
)
sub = p.add_subparsers(dest='cmd', required=True)
s_norm = sub.add_parser('norm', help='normalize a term and print NF')
s_norm.add_argument('--term', default=None)
s_norm.add_argument('--term-file', default=None)
s_norm.set_defaults(func=cmd_norm)
s_tag = sub.add_parser('tag', help='tag a boolean term as T/F/? using DEC')
s_tag.add_argument('--term', default=None)
s_tag.add_argument('--term-file', default=None)
s_tag.set_defaults(func=cmd_tag)
s_paper = sub.add_parser('paper-example', help="run the paper example circuit (fast tagging)")
s_paper.set_defaults(func=cmd_paper_example)
s_cvp = sub.add_parser('cvp', help='evaluate a CVP gate list (JSON) and print final T/F/?')
s_cvp.add_argument('--eqs-json', default=None, help='JSON list of gates')
s_cvp.add_argument('--eqs-file', default=None, help='path to JSON file')
s_cvp.add_argument('--dump-wires', action='store_true', help='print logical->vector mapping')
s_cvp.add_argument('--dump-gate-tags', action='store_true', help='tag each gate output')
s_cvp.set_defaults(func=cmd_cvp)
s_cnf = sub.add_parser('cnf-eval', help='evaluate DIMACS CNF under a fixed assignment')
s_cnf.add_argument('--dimacs', required=True)
s_cnf.add_argument('--assign', required=True, help='e.g. "1,-2,3" meaning x1=T,x2=F,x3=T')
s_cnf.add_argument('--dump-eqs', action='store_true', help='print the compiled gate list')
s_cnf.set_defaults(func=cmd_cnf_eval)
return p
def main(argv: Optional[Sequence[str]] = None) -> int:
ap = build_argparser()
args = ap.parse_args(argv)
return int(args.func(args))
if __name__ == '__main__':
raise SystemExit(main())
@Epivalent
Copy link
Author

"""Paper-aligned term constructors.

All terms are returned as fully parenthesised strings for the LamVIZ parser.
"""

from __future__ import annotations

from typing import List, Sequence


def var(x: str) -> str:
    return x


def lam(x: str, body: str) -> str:
    return f"(\\{x}.{body})"


def lams(xs: Sequence[str], body: str) -> str:
    t = body
    for x in reversed(xs):
        t = lam(x, t)
    return t


def app(f: str, a: str) -> str:
    return f"({f} {a})"


def apps(f: str, *args: str) -> str:
    t = f
    for a in args:
        t = app(t, a)
    return t


def tup(bits: Sequence[str]) -> str:
    if not bits:
        return lam('k', var('k'))
    acc = var('k')
    for b in bits:
        acc = app(acc, b)
    return lam('k', acc)


def ktuple(m: int) -> str:
    bs = [f"b{i}" for i in range(1, m + 1)]
    body = var('kk')
    for b in bs:
        body = app(body, var(b))
    return lams(bs, lam('kk', body))


# ---- paper booleans + helpers ----

ID = lam('u', var('u'))
TRUE = lams(['k', 'f'], app(app(var('k'), ID), var('f')))
FALSE = lams(['k', 'f'], app(app(var('k'), var('f')), ID))

DEC = lam(
    'b',
    app(
        app(var('b'), lams(['p', 'q'], app(var('q'), ID))),
        lams(['w', 'x', 'y'], var('x')),
    ),
)

COMP = lams(['f', 'g', 'x'], app(var('f'), app(var('g'), var('x'))))
K = lams(['g', 'h', 'x'], app(var('g'), app(var('h'), var('x'))))

CSTT = lams(
    ['b', 'k', 'f'],
    app(
        app(
            var('b'),
            lams(
                ['g', 'h'],
                app(app(var('k'), ID), app(app(COMP, var('g')), var('h'))),
            ),
        ),
        var('f'),
    ),
)

CSTF = lams(
    ['b', 'k', 'f'],
    app(
        app(
            var('b'),
            lams(
                ['g', 'h'],
                app(app(var('k'), app(app(COMP, var('g')), var('h'))), ID),
            ),
        ),
        var('f'),
    ),
)

NOT = lam(
    'b',
    app(
        app(
            var('b'),
            lams(['g', 'h'], app(var('g'), app(CSTT, app(var('h'), TRUE))))
        ),
        CSTF,
    ),
)

AND = lams(
    ['b1', 'b2', 'k'],
    app(
        var('b1'),
        lam(
            'f1',
            app(
                var('b2'),
                lams(
                    ['f2', 'f3'],
                    app(app(var('k'), lam('x', app(var('f1'), app(var('f2'), var('x'))))), var('f3')),
                ),
            ),
        ),
    ),
)


# ---- vector operations (paper patterns) ----

def last_n(n: int) -> str:
    if n < 1:
        raise ValueError('n must be >= 1')
    if n == 1:
        return lam('v', app(var('v'), lam('x1', var('x1'))))
    inner = var(f"x{n}")
    for j in range(n - 1, 0, -1):
        inner = app(app(app(var(f"x{j}"), K), ID), inner)
    xs = [f"x{i}" for i in range(1, n + 1)]
    return lam('v', app(var('v'), lams(xs, inner)))


def and_prime(n: int) -> str:
    if n < 0:
        raise ValueError('n must be >= 0')
    xs = [f"x{i}" for i in range(1, n + 3)]
    out = var('k')
    for i in range(1, n + 1):
        out = app(out, var(f"x{i}"))
    out = app(out, app(app(AND, var(f"x{n+1}")), var(f"x{n+2}")))
    return lams(['v', 'k'], app(var('v'), lams(xs, out)))


# --- fetch_{i,n} (paper; built structurally) ---

def _setter(n: int, j: int, i: int, *, which: str) -> str:
    if which not in ('t', 'f'):
        raise ValueError(which)
    CST = CSTT if which == 't' else CSTF
    bs = [f"b{t}" for t in range(1, n + 2)]  # b1..b_{n+1}
    args: List[str] = []
    for t in range(1, n + 2):
        if t == j:
            args.append(app(CST, var(f"b{t}")))
        elif (t == n + 1) and (j == i):
            args.append(app(CST, var(f"b{t}")))
        else:
            args.append(var(f"b{t}"))
    body = var('k')
    for a in args:
        body = app(body, a)
    return lam('k', lams(bs, body))


def _cj(n: int, j: int, i: int) -> str:
    S = _setter(n, j, i, which='t')
    return lams(['g', 'h', 'x'], app(var('g'), app(S, app(var('h'), var('x')))))


def _fj(n: int, j: int, i: int) -> str:
    return _setter(n, j, i, which='f')


def fetch(i: int, n: int) -> str:
    if not (1 <= i <= n):
        raise ValueError('need 1 <= i <= n')
    base = tup([FALSE] * (n + 1))
    rest = base
    # fold from x_n ... x_1
    for j in range(n, 0, -1):
        rest = app(app(app(var(f"x{j}"), _cj(n, j, i)), _fj(n, j, i)), rest)
    xs = [f"x{j}" for j in range(1, n + 1)]
    return lam('v', app(var('v'), lams(xs, rest)))


def bool_tag_expr(b: str) -> str:
    return app(DEC, b)

@Epivalent
Copy link
Author

#!/usr/bin/env python3
"""topcvp_beta.py

Single-file toolkit for the paper-style planar/linear TopCVP construction:
- Implements fetch_{i,n} exactly as in the paper (as a fold of x_j c_j f_j over a base tuple).
- Implements NOT by *substitution inside fetch* (swap CSTT/CSTF only at the i=j appended slot).
- Implements in-place AND (and'n) and derived and{i,j,n} = and'n ∘ fetch{i,n+1} ∘ fetch_{j,n}.
- Implements OR via De Morgan using ONLY (not_subst) + and'.
- Includes an *inlined fast beta-normalizer* (planar_nf) for tagging booleans via DEC.
- Optional external normalizer binary: pass --nf-bin /path/to/bin (argv1=term, stdout=NF term).

Notes:
- The inlined normalizer assumes linear variable use (each binder occurs at most once). This matches the intended
  planar/linear terms in this workflow.
- Terms are strings in backslash-lambda syntax: (\\x.BODY), (F A), variables like x1, b5, etc.

CLI examples:
  python3 topcvp_beta.py paper-example
  python3 topcvp_beta.py truth-table --expr xor
  python3 topcvp_beta.py eval-cvp --cvp 'const 1\nconst 0\nand 1 2\nnot 3\n'
  python3 topcvp_beta.py eval-dimacs --dimacs path/to.cnf --assign 10110
"""

from __future__ import annotations

import argparse
import itertools
import subprocess
from typing import Dict, Iterator, List, Optional, Sequence, Tuple, Union

# -----------------------------
# Inlined fast beta-normalizer (planar_nf.py)
# -----------------------------

import re
import itertools as _itertools

_binder_ids = _itertools.count(1)

class Node:
    __slots__ = ("parent", "pslot")
    def __init__(self) -> None:
        self.parent: Optional["Node"] = None
        self.pslot: Optional[str] = None  # which field in parent: "body" / "fn" / "arg"

class Binder:
    __slots__ = ("name", "id", "occ")
    def __init__(self, name: str) -> None:
        self.name = name
        self.id = next(_binder_ids)
        self.occ: Optional["Var"] = None  # planar/linear: 0 or 1 occurrence

class Var(Node):
    __slots__ = ("binder", "name")
    def __init__(self, name: str, binder: Optional[Binder] = None) -> None:
        super().__init__()
        self.binder = binder
        self.name = name
        if binder is not None:
            if binder.occ is None:
                binder.occ = self
            else:
                raise ValueError(
                    f"Non-linear variable use: binder '{binder.name}' appears more than once."
                )

class Abs(Node):
    __slots__ = ("binder", "body")
    def __init__(self, binder: Binder, body: Node) -> None:
        super().__init__()
        self.binder = binder
        self.body = body
        body.parent = self
        body.pslot = "body"

class App(Node):
    __slots__ = ("fn", "arg")
    def __init__(self, fn: Node, arg: Node) -> None:
        super().__init__()
        self.fn = fn
        self.arg = arg
        fn.parent = self
        fn.pslot = "fn"
        arg.parent = self
        arg.pslot = "arg"

Term = Union[Var, Abs, App]

_ident_re = re.compile(r"[A-Za-z][A-Za-z0-9_]*")

def tokenize(src: str) -> Iterator[str]:
    i = 0
    n = len(src)
    while i < n:
        c = src[i]
        if c.isspace():
            i += 1
            continue
        if c in ("(", ")", "."):
            yield c
            i += 1
            continue
        if c == "\\" or c == "λ":
            yield "\\"
            i += 1
            continue
        m = _ident_re.match(src, i)
        if not m:
            raise SyntaxError(f"Unexpected character at {i}: {src[i:i+20]!r}")
        yield m.group(0)
        i = m.end()

def parse(src: str) -> Term:
    toks = list(tokenize(src))
    tstack: List[object] = []
    bstack: List[Binder] = []
    i = 0

    def reduce_lams() -> None:
        nonlocal tstack, bstack
        while (
            len(tstack) >= 2
            and isinstance(tstack[-2], tuple)
            and tstack[-2][0] == "LAM"
            and isinstance(tstack[-1], Node)
        ):
            _, b = tstack[-2]
            body = tstack.pop()  # type: ignore
            tstack.pop()
            if not bstack or bstack[-1] is not b:
                raise RuntimeError("Binder stack mismatch while reducing lambda.")
            bstack.pop()
            tstack.append(Abs(b, body))  # type: ignore

    while i < len(toks):
        tok = toks[i]
        i += 1

        if tok == "(":
            tstack.append(("LPAREN", len(bstack)))
            continue

        if tok == ")":
            items: List[Node] = []
            while tstack:
                top = tstack.pop()
                if isinstance(top, tuple) and top[0] == "LPAREN":
                    target_len = top[1]
                    if len(bstack) < target_len:
                        raise RuntimeError("Binder stack underflow at ')'.")
                    del bstack[target_len:]
                    break
                if not isinstance(top, Node):
                    raise SyntaxError("Malformed group inside parentheses.")
                items.append(top)
            else:
                raise SyntaxError("Unmatched ')'.")
            items.reverse()
            if not items:
                raise SyntaxError("Empty parentheses group.")
            term: Node = items[0]
            for nxt in items[1:]:
                term = App(term, nxt)
            tstack.append(term)
            reduce_lams()
            continue

        if tok == "\\":
            if i >= len(toks):
                raise SyntaxError("Unexpected end after lambda.")
            name = toks[i]
            i += 1
            if i >= len(toks) or toks[i] != ".":
                raise SyntaxError("Expected '.' after lambda parameter.")
            i += 1
            b = Binder(name)
            bstack.append(b)
            tstack.append(("LAM", b))
            continue

        if tok == ".":
            raise SyntaxError("Unexpected '.'.")

        bnd: Optional[Binder] = None
        for b in reversed(bstack):
            if b.name == tok:
                bnd = b
                break
        tstack.append(Var(tok, bnd))
        reduce_lams()

    if any(isinstance(x, tuple) and x[0] == "LPAREN" for x in tstack):
        raise SyntaxError("Unmatched '('.")

    terms = [x for x in tstack if isinstance(x, Node)]
    if not terms:
        raise SyntaxError("No term parsed.")
    term: Node = terms[0]
    for nxt in terms[1:]:
        term = App(term, nxt)
    return term  # type: ignore

def _replace_child(parent: Node, slot: str, new: Node) -> None:
    if isinstance(parent, Abs) and slot == "body":
        parent.body = new
    elif isinstance(parent, App) and slot == "fn":
        parent.fn = new
    elif isinstance(parent, App) and slot == "arg":
        parent.arg = new
    else:
        raise RuntimeError("Bad parent/slot in replacement.")
    new.parent = parent
    new.pslot = slot

def beta(abs_node: Abs, arg: Node) -> Node:
    b = abs_node.binder
    body = abs_node.body

    occ = b.occ
    if occ is None:
        body.parent = None
        body.pslot = None
        return body

    if occ is body:
        arg.parent = None
        arg.pslot = None
        return arg

    p = occ.parent
    slot = occ.pslot
    if p is None:
        arg.parent = None
        arg.pslot = None
        return arg
    assert slot is not None
    _replace_child(p, slot, arg)

    body.parent = None
    body.pslot = None
    return body

def whnf(t: Node) -> Node:
    while True:
        apps: List[App] = []
        cur: Node = t
        while isinstance(cur, App):
            apps.append(cur)
            cur = cur.fn

        reduced = False

        for appn in reversed(apps):
            appn.fn = cur
            cur.parent = appn
            cur.pslot = "fn"

            if isinstance(cur, Abs):
                cur = beta(cur, appn.arg)
                reduced = True
            else:
                cur = appn

        cur.parent = None
        cur.pslot = None
        t = cur

        if not reduced:
            return t

def normalize(t: Node) -> Node:
    t = whnf(t)
    stack: List[Tuple[str, Node]] = []

    while True:
        if isinstance(t, Abs):
            stack.append(("abs", t))
            t = whnf(t.body)
            continue

        if isinstance(t, App):
            stack.append(("app_fn", t))
            t = whnf(t.fn)
            continue

        while stack:
            tag, node = stack.pop()

            if tag == "abs":
                absn: Abs = node  # type: ignore
                absn.body = t
                t.parent = absn
                t.pslot = "body"
                t = absn
                continue

            if tag == "app_fn":
                appn: App = node  # type: ignore
                appn.fn = t
                t.parent = appn
                t.pslot = "fn"
                stack.append(("app_arg", appn))
                t = whnf(appn.arg)
                break

            if tag == "app_arg":
                appn: App = node  # type: ignore
                appn.arg = t
                t.parent = appn
                t.pslot = "arg"
                t = whnf(appn)
                continue

            raise RuntimeError("Unknown frame tag.")

        else:
            t.parent = None
            t.pslot = None
            return t

def to_string(t: Node) -> str:
    env: Dict[int, str] = {}
    used: Dict[str, int] = {}

    def fresh(base: str) -> str:
        k = used.get(base, 0)
        used[base] = k + 1
        return base if k == 0 else f"{base}{k}"

    out: List[str] = []
    actions: List[Tuple[str, object]] = [("node", t)]

    while actions:
        kind, payload = actions.pop()

        if kind == "str":
            out.append(payload)  # type: ignore
            continue

        if kind == "pop":
            bid = payload  # type: ignore
            env.pop(bid, None)
            continue

        n = payload  # type: ignore
        if isinstance(n, Var):
            if n.binder is None:
                out.append(n.name)
            else:
                out.append(env.get(n.binder.id, n.binder.name))
            continue

        if isinstance(n, Abs):
            b = n.binder
            name = env.get(b.id)
            if name is None:
                name = fresh(b.name)
                env[b.id] = name
            actions.append(("pop", b.id))
            actions.append(("str", ")"))
            actions.append(("node", n.body))
            actions.append(("str", "."))
            actions.append(("str", name))
            actions.append(("str", "\\"))
            actions.append(("str", "("))
            continue

        if isinstance(n, App):
            actions.append(("str", ")"))
            actions.append(("node", n.arg))
            actions.append(("str", " "))
            actions.append(("node", n.fn))
            actions.append(("str", "("))
            continue

        raise TypeError("Unknown node type in to_string().")

    return "".join(out)

def normalize_to_string(src: str) -> str:
    return to_string(normalize(parse(src)))

# -----------------------------
# Term constructors + TopCVP ops (paper)
# -----------------------------

# We rely on your existing, correct primitive definitions from plc_suite.terms
import sys
sys.path.insert(0, '/mnt/data')
import plc_suite.terms as T

var, lam, lams, app = T.var, T.lam, T.lams, T.app
TRUE, FALSE = T.TRUE, T.FALSE
COMP, AND = T.COMP, T.AND
CSTT, CSTF = T.CSTT, T.CSTF

def proj_of_tuple(t: str, j: int, m: int) -> str:
    xs = [f'x{i}' for i in range(1, m + 1)]
    return app(t, lams(xs, var(f'x{j}')))

def compose(f: str, g: str) -> str:
    return app(app(COMP, f), g)

def append_const_op(n: int, const: str) -> str:
    """Unary op on tuples: <x1..xn> -> <x1..xn,const> (CPS with k)."""
    xs = [f'x{i}' for i in range(1, n + 1)]
    body = var('k')
    for x in xs:
        body = app(body, var(x))
    body = app(body, const)
    return lams(['v', 'k'], app(var('v'), lams(xs, body)))

def setterT(n: int, j: int, i: int, *, which: str, neg: bool = False) -> str:
    """Unary transformer on Church tuples.

    which='t' => CSTT at position j
    which='f' => CSTF at position j

    NOT-substitution: when (j == i), swap CSTT<->CSTF only at the appended slot b_{n+1}.
    """
    if which not in ('t', 'f'):
        raise ValueError(which)

    CSTj = CSTT if which == 't' else CSTF

    if which == 't':
        CSTapp = CSTT if not neg else CSTF
    else:
        CSTapp = CSTF if not neg else CSTT

    bs = [f'b{t}' for t in range(1, n + 2)]  # b1..b_{n+1}
    args: List[str] = []
    for t in range(1, n + 2):
        if t == j:
            args.append(app(CSTj, var(f'b{t}')))
        elif (t == n + 1) and (j == i):
            args.append(app(CSTapp, var(f'b{t}')))
        else:
            args.append(var(f'b{t}'))

    body = var('k')
    for a in args:
        body = app(body, a)

    return lam('t', lam('k', app(var('t'), lams(bs, body))))

def cj(n: int, j: int, i: int, *, neg: bool = False) -> str:
    S = setterT(n, j, i, which='t', neg=neg)
    return lams(['g', 'h'], compose(var('g'), compose(S, var('h'))))

def fj(n: int, j: int, i: int, *, neg: bool = False) -> str:
    return setterT(n, j, i, which='f', neg=neg)

def fetch_paper(i: int, n: int, *, neg: bool = False) -> str:
    """fetch_{i,n}; neg=True activates NOT-substitution (swap at appended slot when j=i)."""
    if not (1 <= i <= n):
        raise ValueError('need 1 <= i <= n')

    base = T.tup([FALSE] * (n + 1))
    rest = base
    for j in range(n, 0, -1):
        rest = app(app(app(var(f'x{j}'), cj(n, j, i, neg=neg)), fj(n, j, i, neg=neg)), rest)
    xs = [f'x{j}' for j in range(1, n + 1)]
    return lam('v', app(var('v'), lams(xs, rest)))

def and_prime_paper(n: int) -> str:
    """and'n: <x1..x{n+2}> -> <x1..xn, x_{n+1} AND x_{n+2}> (CPS with k)."""
    if n < 0:
        raise ValueError('n must be >= 0')
    xs = [f'x{i}' for i in range(1, n + 3)]
    out = var('k')
    for i in range(1, n + 1):
        out = app(out, var(f'x{i}'))
    out = app(out, app(app(AND, var(f'x{n+1}')), var(f'x{n+2}')))
    return lams(['v', 'k'], app(var('v'), lams(xs, out)))

def and_gate(i: int, j: int, n: int, v: str) -> str:
    """Append x_{n+1} := x_i AND x_j to a length-n vector v."""
    A = and_prime_paper(n)
    return app(A, app(fetch_paper(i, n + 1), app(fetch_paper(j, n), v)))

def not_gate(i: int, n: int, v: str) -> str:
    """Append x_{n+1} := NOT(x_i) to a length-n vector v via NOT-substitution inside fetch."""
    return app(fetch_paper(i, n, neg=True), v)

def or_gate_dm(i: int, j: int, n: int, v: str) -> Tuple[str, int]:
    """Append OR via De Morgan using ONLY not_gate + and_gate.

    Starting from length-n vector v, append:
      x_{n+1} := ~x_i
      x_{n+2} := ~x_j
      x_{n+3} := x_{n+1} & x_{n+2}
      x_{n+4} := ~x_{n+3}   (this equals x_i OR x_j)

    Returns (v_new, new_length).
    """
    v1 = not_gate(i, n, v)
    n1 = n + 1
    v2 = not_gate(j, n1, v1)
    n2 = n + 2
    v3 = and_gate(n1, n2, n2, v2)
    n3 = n + 3
    v4 = not_gate(n3, n3, v3)
    n4 = n + 4
    return v4, n4

# -----------------------------
# Boolean tagging (DEC) with either inline normalizer or external binary
# -----------------------------

class Normalizer:
    def __init__(self, nf_bin: Optional[str] = None):
        self.nf_bin = nf_bin
        if nf_bin:
            self._dec_true_nf = self._nf_bin(f"({T.DEC} {T.TRUE})")
            self._dec_false_nf = self._nf_bin(f"({T.DEC} {T.FALSE})")
        else:
            self._dec_true_nf = normalize_to_string(f"({T.DEC} {T.TRUE})")
            self._dec_false_nf = normalize_to_string(f"({T.DEC} {T.FALSE})")

    def _nf_bin(self, term: str) -> str:
        out = subprocess.check_output([self.nf_bin, term], text=True)
        return out.strip()

    def nf(self, term: str) -> str:
        if self.nf_bin:
            return self._nf_bin(term)
        return normalize_to_string(term)

    def tag_bool(self, bterm: str) -> str:
        src = f"({T.DEC} {bterm})"
        nf = self.nf(src)
        if nf == self._dec_true_nf:
            return 'T'
        if nf == self._dec_false_nf:
            return 'F'
        return '?'

# -----------------------------
# CVP evaluator + CNF compiler
# -----------------------------

Gate = Tuple[str, ...]  # ('const','1') | ('not','i') | ('and','i','j') | ('or','i','j')

def parse_cvp(text: str) -> List[Gate]:
    gates: List[Gate] = []
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith('#'):
            continue
        parts = line.split()
        op = parts[0].lower()
        if op == 'const':
            if len(parts) != 2 or parts[1] not in ('0', '1'):
                raise ValueError(f"bad const line: {raw!r}")
            gates.append(('const', parts[1]))
        elif op == 'not':
            if len(parts) != 2:
                raise ValueError(f"bad not line: {raw!r}")
            gates.append(('not', parts[1]))
        elif op in ('and', 'or'):
            if len(parts) != 3:
                raise ValueError(f"bad {op} line: {raw!r}")
            gates.append((op, parts[1], parts[2]))
        else:
            raise ValueError(f"unknown gate op: {op!r}")
    return gates

def eval_cvp(
    gates: Sequence[Gate],
    *,
    init_bits: str = '',
    norm: Normalizer,
    verbose: bool = True
) -> Tuple[str, int, List[str]]:
    """Evaluate CVP as a sequence of appended bits.

    init_bits: bitstring of initial variables to append first (e.g. '101').

    Returns (vector_term, length, python_tags_for_all_bits).
    """
    bits: List[str] = []
    v = T.tup([])
    n = 0

    # initialize variables
    for ch in init_bits.strip():
        if ch not in '01':
            raise ValueError('init_bits must be a bitstring')
        const = TRUE if ch == '1' else FALSE
        v = app(append_const_op(n, const), v)
        n += 1
        bits.append('T' if ch == '1' else 'F')
        if verbose:
            print(f"  init x{n} = {bits[-1]}")

    # apply gates
    for g in gates:
        op = g[0]
        if op == 'const':
            const = TRUE if g[1] == '1' else FALSE
            v = app(append_const_op(n, const), v)
            n += 1
            expected = 'T' if g[1] == '1' else 'F'
            bits.append(expected)
            if verbose:
                got = norm.tag_bool(proj_of_tuple(v, n, n))
                print(f"  const -> new x{n} = {got} (expect {expected})")
            continue

        if op == 'not':
            i = int(g[1])
            v = not_gate(i, n, v)
            n += 1
            expected = 'T' if bits[i-1] == 'F' else 'F'
            bits.append(expected)
            if verbose:
                got = norm.tag_bool(proj_of_tuple(v, n, n))
                print(f"  not {i} -> new x{n} = {got} (expect {expected})")
            continue

        if op == 'and':
            i, j = int(g[1]), int(g[2])
            v = and_gate(i, j, n, v)
            n += 1
            expected = 'T' if (bits[i-1] == 'T' and bits[j-1] == 'T') else 'F'
            bits.append(expected)
            if verbose:
                got = norm.tag_bool(proj_of_tuple(v, n, n))
                print(f"  and {i} {j} -> new x{n} = {got} (expect {expected})")
            continue

        if op == 'or':
            i, j = int(g[1]), int(g[2])
            # De Morgan OR appends 4 bits; we report only the final OR bit.
            v, n = or_gate_dm(i, j, n, v)
            # update python truth list for each appended bit
            not_i = 'T' if bits[i-1] == 'F' else 'F'
            not_j = 'T' if bits[j-1] == 'F' else 'F'
            bits.append(not_i)  # x_{old+1}
            bits.append(not_j)  # x_{old+2}
            bits.append('T' if (not_i == 'T' and not_j == 'T') else 'F')  # x_{old+3}
            bits.append('T' if bits[-1] == 'F' else 'F')  # x_{old+4} = OR
            if verbose:
                got = norm.tag_bool(proj_of_tuple(v, n, n))
                expected = bits[-1]
                print(f"  or {i} {j} -> new x{n} = {got} (expect {expected})")
            continue

        raise RuntimeError(f"unhandled gate: {g}")

    return v, n, bits

def parse_dimacs(path: str) -> Tuple[int, List[List[int]]]:
    nvars = 0
    clauses: List[List[int]] = []
    with open(path, 'r', encoding='utf-8') as f:
        for raw in f:
            line = raw.strip()
            if not line or line.startswith('c'):
                continue
            if line.startswith('p'):
                parts = line.split()
                if len(parts) >= 4 and parts[1] == 'cnf':
                    nvars = int(parts[2])
                continue
            lits = [int(x) for x in line.split() if x != '0']
            if lits:
                clauses.append(lits)
    if nvars <= 0:
        # fallback: infer from clauses
        nvars = max((abs(l) for cl in clauses for l in cl), default=0)
    return nvars, clauses

def eval_cnf_dimacs(path: str, assign_bits: str, *, norm: Normalizer, verbose: bool = True) -> str:
    """Evaluate DIMACS CNF under assignment using TopCVP ops.

    This compiles the CNF to an appended-bit circuit, then returns the final CNF truth tag.
    """
    nvars, clauses = parse_dimacs(path)
    if len(assign_bits) != nvars:
        raise ValueError(f"assignment length {len(assign_bits)} != nvars {nvars}")

    # Start with variables x1..xnvars as given bits.
    v = T.tup([])
    n = 0
    bits: List[str] = []
    for ch in assign_bits:
        const = TRUE if ch == '1' else FALSE
        v = app(append_const_op(n, const), v)
        n += 1
        bits.append('T' if ch == '1' else 'F')

    def lit_value_idx(lit: int) -> int:
        nonlocal v, n, bits
        var_idx = abs(lit)
        if lit > 0:
            return var_idx
        # negative literal: append NOT(var_idx)
        v = not_gate(var_idx, n, v)
        n += 1
        val = 'T' if bits[var_idx-1] == 'F' else 'F'
        bits.append(val)
        return n

    # For each clause, compute its OR by folding pairwise OR via De Morgan.
    clause_idxs: List[int] = []
    for cl in clauses:
        if not cl:
            # empty clause => false
            v = app(append_const_op(n, FALSE), v)
            n += 1
            bits.append('F')
            clause_idxs.append(n)
            continue
        cur = lit_value_idx(cl[0])
        for lit in cl[1:]:
            nxt = lit_value_idx(lit)
            v, n = or_gate_dm(cur, nxt, n, v)
            # update python-truth for OR-gate's internal bits
            not_cur = 'T' if bits[cur-1] == 'F' else 'F'
            not_nxt = 'T' if bits[nxt-1] == 'F' else 'F'
            bits.append(not_cur)
            bits.append(not_nxt)
            bits.append('T' if (not_cur == 'T' and not_nxt == 'T') else 'F')
            bits.append('T' if bits[-1] == 'F' else 'F')
            cur = n
        clause_idxs.append(cur)

    # CNF = AND of clause outputs (fold with and_gate)
    if not clause_idxs:
        # vacuously true
        v = app(append_const_op(n, TRUE), v)
        n += 1
        bits.append('T')
        cnf_idx = n
    else:
        cnf_idx = clause_idxs[0]
        for ci in clause_idxs[1:]:
            v = and_gate(cnf_idx, ci, n, v)
            n += 1
            val = 'T' if (bits[cnf_idx-1] == 'T' and bits[ci-1] == 'T') else 'F'
            bits.append(val)
            cnf_idx = n

    if verbose:
        got = norm.tag_bool(proj_of_tuple(v, cnf_idx, n))
        print(f"CNF value: {got} (python expect {bits[cnf_idx-1]})")

    return norm.tag_bool(proj_of_tuple(v, cnf_idx, n))

# -----------------------------
# Built-in demos
# -----------------------------

def paper_example_gates() -> List[Gate]:
    # x1:=1; x2:=0; x3:=1; x4:=x1&x2; x5:=~x1; x6:=x5&x3; x7:=x4|x6
    return [
        ('const','1'),
        ('const','0'),
        ('const','1'),
        ('and','1','2'),
        ('not','1'),
        ('and','5','3'),
        ('or','4','6'),
    ]

def build_xor_gates() -> List[Gate]:
    # init x1,x2 are init_bits, then:
    # x3 = x1 AND x2
    # x4 = NOT x3
    # x5 = x1 OR x2  (expands by 4 bits; final OR bit ends up at x8 in this layout)
    # x9 = x4 AND x8  => XOR
    return [
        ('and','1','2'),
        ('not','3'),
        ('or','1','2'),
        ('and','4','8'),
    ]

def truth_table(expr: str, norm: Normalizer) -> None:
    if expr == 'nand':
        gates = [('and','1','2'), ('not','3')]
        outs = [4]
    elif expr == 'xor':
        gates = build_xor_gates()
        outs = [9]
    elif expr == 'half-adder':
        gates = [('and','1','2'), ('not','3'), ('or','1','2'), ('and','4','8')]
        outs = [3,9]
    else:
        raise ValueError('expr must be nand|xor|half-adder')

    for a, b in itertools.product('01', repeat=2):
        init = a + b
        print(f"in {a}{b}")
        v, n, _bits = eval_cvp(gates, init_bits=init, norm=norm, verbose=False)
        tags = [norm.tag_bool(proj_of_tuple(v, idx, n)) for idx in outs]
        print("  out", tags)

# -----------------------------
# CLI
# -----------------------------

def main() -> None:
    ap = argparse.ArgumentParser()
    ap.add_argument(
        '--nf-bin',
        default=None,
        help='Optional external normalizer binary: argv1=term, stdout=NF term string'
    )

    sub = ap.add_subparsers(dest='cmd', required=True)

    sub.add_parser('paper-example', help='Run the paper example circuit with step-by-step new-bit tagging')

    p_tt = sub.add_parser('truth-table', help='Truth table for small demo circuits')
    p_tt.add_argument('--expr', required=True, choices=['nand','xor','half-adder'])

    p_tag = sub.add_parser('tag', help='Tag a boolean term via DEC')
    p_tag.add_argument('term', help='Lambda term string')

    p_cvp = sub.add_parser('eval-cvp', help='Evaluate a CVP gate list (DSL)')
    p_cvp.add_argument('--cvp', required=True, help='CVP DSL as a single string; lines like: const 1 | not i | and i j | or i j')
    p_cvp.add_argument('--init', default='', help='Initial bitstring (variables) to append before gates, e.g. 101')

    p_dim = sub.add_parser('eval-dimacs', help='Evaluate DIMACS CNF under assignment')
    p_dim.add_argument('--dimacs', required=True, help='Path to DIMACS CNF file')
    p_dim.add_argument('--assign', required=True, help='Bitstring assignment of length nvars')

    args = ap.parse_args()

    # (also fixed: actually use --nf-bin)
    norm = Normalizer(nf_bin=args.nf_bin)

    if args.cmd == 'tag':
        print(norm.tag_bool(args.term))
        return

    if args.cmd == 'paper-example':
        gates = paper_example_gates()
        eval_cvp(gates, init_bits='', norm=norm, verbose=True)
        return

    if args.cmd == 'truth-table':
        truth_table(args.expr, norm)
        return

    if args.cmd == 'eval-cvp':
        gates = parse_cvp(args.cvp)
        eval_cvp(gates, init_bits=args.init, norm=norm, verbose=True)
        return

    if args.cmd == 'eval-dimacs':
        eval_cnf_dimacs(args.dimacs, args.assign, norm=norm, verbose=True)
        return

    raise SystemExit(2)

if __name__ == '__main__':
    main()

@Epivalent
Copy link
Author

#!/bin/bash
set -e

# --- Configuration ---
NORMALIZER="./planar_nf"

# 0. Safety Check
if [ ! -x "$NORMALIZER" ]; then
    echo "Error: Normalizer binary '$NORMALIZER' not found."
    exit 1
fi

# ==============================================================================
# 1. Create the Driver Script
# ==============================================================================
cat << 'EOF' > driver_67.py
import subprocess
import sys
import re

# --- Configuration ---
NORMALIZER = "./planar_nf"
SCRIPT = "topcvp_beta.py"

# --- Dynamic Circuit Generator ---
def generate_adder_cvp(a_val, b_val, cin_val):
    lines = []
    
    # Track physical wire count to predict logical outputs
    # Constants take 3 wires
    current_physical_top = 3 
    
    def emit(s):
        lines.append(s)

    # 1. Constants (Wires 1, 2, 3)
    emit(f"const {a_val}") 
    w_a = 1 # Known fixed index
    emit(f"const {b_val}") 
    w_b = 2 # Known fixed index
    emit(f"const {cin_val}") 
    w_cin = 3 # Known fixed index

    # 2. Logic Manager
    # We track the "Physical Top" of the stack because 'topcvp' expands macros.
    # or -> 4 wires
    # and -> 1 wire
    # not -> 1 wire
    
    def do_op(op, arg1, arg2=None):
        nonlocal current_physical_top
        if op == 'not':
            emit(f"not {arg1}")
            current_physical_top += 1
            return current_physical_top
        elif op == 'and':
            emit(f"and {arg1} {arg2}")
            current_physical_top += 1
            return current_physical_top
        elif op == 'or':
            emit(f"or {arg1} {arg2}")
            current_physical_top += 4
            return current_physical_top
        
    def XOR(x, y):
        # x XOR y = (x | y) & ~(x & y)
        w_or = do_op('or', x, y)
        w_and = do_op('and', x, y)
        w_nand = do_op('not', w_and)
        w_res = do_op('and', w_or, w_nand)
        return w_res

    # 3. Full Adder Logic
    # Sum = A XOR B XOR Cin
    s1 = XOR(w_a, w_b)
    sum_final = XOR(s1, w_cin)

    # Cout = (A & B) | (Cin & (A XOR B))
    # Recalculate terms to maintain linear safety
    ab = do_op('and', w_a, w_b)
    
    # We need (Cin & S1). Note: S1 was (A XOR B)
    cin_s1 = do_op('and', w_cin, s1)
    
    cout = do_op('or', ab, cin_s1)

    return "\n".join(lines), sum_final, cout

def solve_bit(a_val, b_val, c_val):
    # 1. Generate Circuit
    cvp_content, sum_idx, cout_idx = generate_adder_cvp(a_val, b_val, c_val)
    
    # 2. Run Simulator
    cmd = ["python3", SCRIPT, "eval-cvp", "--cvp", cvp_content]
    result = subprocess.run(cmd, capture_output=True, text=True)

    if result.returncode != 0:
        print("SIMULATOR CRASHED:", result.stderr)
        sys.exit(1)

    # 3. Parse Output
    map_vals = {}
    for line in result.stdout.splitlines():
        # strict matching for "new x<NUM> ="
        m = re.search(r"new x(\d+) = (\S+)", line)
        if m:
            map_vals[int(m.group(1))] = m.group(2)

    # 4. Extract
    s_sym = map_vals.get(sum_idx)
    c_sym = map_vals.get(cout_idx)

    if s_sym is None or c_sym is None:
        print(f"--- PARSING ERROR ---")
        print(f"Expected wires x{sum_idx} (Sum) and x{cout_idx} (Cout)")
        sys.exit(1)

    def to_int(sym):
        return 1 if sym == 'T' else 0

    return to_int(s_sym), to_int(c_sym)

def main():
    # ---------------------------------------------------------
    # Computing: 35 + 32 = 67
    # ---------------------------------------------------------
    # 35 = 32 + 2 + 1
    # Binary (LSB first): 1, 1, 0, 0, 0, 1, 0
    A_bits = [1, 1, 0, 0, 0, 1, 0]
    
    # 32 = 32
    # Binary (LSB first): 0, 0, 0, 0, 0, 1, 0
    B_bits = [0, 0, 0, 0, 0, 1, 0]
    
    carry = 0
    results = []

    print(f"Arithmetic Pipeline: 35 + 32 (Staged)...")
    print("-" * 50)

    for i in range(len(A_bits)):
        a = A_bits[i]
        b = B_bits[i]
        
        sum_bit, cout = solve_bit(a, b, carry)
        
        # Visualize the addition at this stage
        print(f"Bit {i} (2^{i}): {a} + {b} + Carry({carry})  ==>  Sum: {sum_bit}, NewCarry: {cout}")
        
        results.append(sum_bit)
        carry = cout

    # Reconstruct Decimal
    val = 0
    for i, bit in enumerate(results):
        val += bit * (2**i)
        
    print("-" * 50)
    print(f"Final Binary (LSB): {results}")
    print(f"Arithmetic Result: {val}")

if __name__ == "__main__":
    main()
EOF

# ==============================================================================
# 2. Run
# ==============================================================================
python3 driver_67.py

@Epivalent
Copy link
Author

r"""
planar_nf.py — fast beta-normalizer for (mostly) planar/linear untyped lambda terms.

Input syntax (matches your examples):
  - Abstraction: \\\x. BODY        (also accepts 'λ' instead of '\')
  - Application: (F A)          (fully parenthesized recommended; N-ary apps allowed: (f a b c))
  - Variables:   identifiers like x, x1, b5, foo_bar

Key idea for speed on planar/linear terms:
  - During parsing, each binder records its *unique* variable occurrence node (if any).
  - Beta-reduction then becomes O(1): splice the argument subtree into that occurrence (no copying).
  - Strong normalization is implemented iteratively (no Python recursion).

If a binder is used more than once, we raise (that’s outside “planar/linear”).
"""

from __future__ import annotations
from dataclasses import dataclass
from typing import Optional, List, Tuple, Union, Dict, Iterator
import itertools
import re

# -----------------------------
# Core term data structures
# -----------------------------

_binder_ids = itertools.count(1)

class Node:
    __slots__ = ("parent", "pslot")
    def __init__(self) -> None:
        self.parent: Optional[Node] = None
        self.pslot: Optional[str] = None  # which field in parent: "body" / "fn" / "arg"

class Binder:
    __slots__ = ("name", "id", "occ")
    def __init__(self, name: str) -> None:
        self.name = name
        self.id = next(_binder_ids)
        self.occ: Optional[Var] = None  # planar/linear: 0 or 1 occurrence

class Var(Node):
    __slots__ = ("binder", "name")
    def __init__(self, name: str, binder: Optional[Binder] = None) -> None:
        super().__init__()
        self.binder = binder
        self.name = name
        if binder is not None:
            if binder.occ is None:
                binder.occ = self
            else:
                # Non-linear: multiple occurrences
                raise ValueError(f"Non-linear variable use: binder '{binder.name}' appears more than once.")

class Abs(Node):
    __slots__ = ("binder", "body")
    def __init__(self, binder: Binder, body: Node) -> None:
        super().__init__()
        self.binder = binder
        self.body = body
        body.parent = self
        body.pslot = "body"

class App(Node):
    __slots__ = ("fn", "arg")
    def __init__(self, fn: Node, arg: Node) -> None:
        super().__init__()
        self.fn = fn
        self.arg = arg
        fn.parent = self
        fn.pslot = "fn"
        arg.parent = self
        arg.pslot = "arg"

Term = Union[Var, Abs, App]

# -----------------------------
# Tokenizer
# -----------------------------

_ident_re = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")

def tokenize(src: str) -> Iterator[str]:
    i = 0
    n = len(src)
    while i < n:
        c = src[i]
        if c.isspace():
            i += 1
            continue
        if c in ("(", ")", "."):
            yield c
            i += 1
            continue
        if c == "\\" or c == "λ":
            yield "\\"
            i += 1
            continue
        m = _ident_re.match(src, i)
        if not m:
            raise SyntaxError(f"Unexpected character at {i}: {src[i:i+20]!r}")
        yield m.group(0)
        i = m.end()

# -----------------------------
# Parser (iterative, stack-based)
# -----------------------------

_LP = ("LPAREN", None)  # marker (kind, binder_stack_len)
_LAM = ("LAM", None)    # marker (kind, Binder)

def parse(src: str) -> Term:
    toks = list(tokenize(src))
    tstack: List[object] = []
    bstack: List[Binder] = []
    i = 0

    def reduce_lams() -> None:
        # If stack ends with [("LAM", binder), <term>], reduce to Abs(binder, term)
        nonlocal tstack, bstack
        while len(tstack) >= 2 and isinstance(tstack[-2], tuple) and tstack[-2][0] == "LAM" and isinstance(tstack[-1], Node):
            _, b = tstack[-2]
            body = tstack.pop()  # type: ignore
            tstack.pop()
            # Pop binder from binder stack (must match)
            if not bstack or bstack[-1] is not b:
                raise RuntimeError("Binder stack mismatch while reducing lambda.")
            bstack.pop()
            tstack.append(Abs(b, body))  # type: ignore

    while i < len(toks):
        tok = toks[i]
        i += 1

        if tok == "(":
            tstack.append(("LPAREN", len(bstack)))
            continue

        if tok == ")":
            items: List[Node] = []
            while tstack:
                top = tstack.pop()
                if isinstance(top, tuple) and top[0] == "LPAREN":
                    target_len = top[1]
                    # Restore binder stack to what it was at '('
                    if len(bstack) < target_len:
                        raise RuntimeError("Binder stack underflow at ')'.")
                    del bstack[target_len:]
                    break
                if not isinstance(top, Node):
                    raise SyntaxError("Malformed group inside parentheses.")
                items.append(top)
            else:
                raise SyntaxError("Unmatched ')'.")
            items.reverse()
            if not items:
                raise SyntaxError("Empty parentheses group.")
            # (a b c) => ((a b) c)
            term: Node = items[0]
            for nxt in items[1:]:
                term = App(term, nxt)
            tstack.append(term)
            reduce_lams()
            continue

        if tok == "\\":
            if i >= len(toks): raise SyntaxError("Unexpected end after lambda.")
            name = toks[i]; i += 1
            if i >= len(toks) or toks[i] != ".": raise SyntaxError("Expected '.' after lambda parameter.")
            i += 1
            b = Binder(name)
            bstack.append(b)
            tstack.append(("LAM", b))
            continue

        if tok == ".":
            raise SyntaxError("Unexpected '.'.")

        # identifier: resolve to nearest binder with same name, else free var
        bnd: Optional[Binder] = None
        for b in reversed(bstack):
            if b.name == tok:
                bnd = b
                break
        tstack.append(Var(tok, bnd))
        reduce_lams()

    # Close any remaining lambdas (top-level)
    if any(isinstance(x, tuple) and x[0] == "LPAREN" for x in tstack):
        raise SyntaxError("Unmatched '('.")

    # Fold any remaining top-level applications (rare; usually one term)
    terms = [x for x in tstack if isinstance(x, Node)]
    if not terms:
        raise SyntaxError("No term parsed.")
    term: Node = terms[0]
    for nxt in terms[1:]:
        term = App(term, nxt)
    return term  # type: ignore

# -----------------------------
# Beta reduction (O(1) for linear)
# -----------------------------

def _replace_child(parent: Node, slot: str, new: Node) -> None:
    if isinstance(parent, Abs) and slot == "body":
        parent.body = new
    elif isinstance(parent, App) and slot == "fn":
        parent.fn = new
    elif isinstance(parent, App) and slot == "arg":
        parent.arg = new
    else:
        raise RuntimeError("Bad parent/slot in replacement.")
    new.parent = parent
    new.pslot = slot

def beta(abs_node: Abs, arg: Node) -> Node:
    b = abs_node.binder
    body = abs_node.body

    occ = b.occ
    if occ is None:
        # Argument unused
        body.parent = None
        body.pslot = None
        return body

    # If the body itself is the occurrence, the result is just the argument
    if occ is body:
        arg.parent = None
        arg.pslot = None
        return arg

    # Splice arg into occurrence site
    p = occ.parent
    slot = occ.pslot
    if p is None:
        # body itself is the variable
        arg.parent = None
        arg.pslot = None
        return arg
    assert slot is not None
    _replace_child(p, slot, arg)

    # Detach body from abs
    body.parent = None
    body.pslot = None
    return body

# -----------------------------
# Weak-head normalization (iterative)
# -----------------------------

def whnf(t: Node) -> Node:
    """
    Reduce t by normal-order at the head (left spine) until it is in weak-head normal form.
    Implemented with spine reuse to avoid allocations.
    """
    while True:
        # Unwind left spine collecting App nodes
        apps: List[App] = []
        cur: Node = t
        while isinstance(cur, App):
            apps.append(cur)
            cur = cur.fn

        reduced = False

        # Rebuild from inner -> outer, performing beta steps when possible
        for app in reversed(apps):
            app.fn = cur
            cur.parent = app
            cur.pslot = "fn"

            if isinstance(cur, Abs):
                cur = beta(cur, app.arg)
                reduced = True
                # cur is detached (parent None) here; outer rebuild will attach it
            else:
                cur = app

        cur.parent = None
        cur.pslot = None
        t = cur

        if not reduced:
            return t

# -----------------------------
# Full (strong) normalization, iterative
# -----------------------------

def normalize(t: Node) -> Node:
    """
    Compute full beta-normal form (strong normalization), normal-order style.
    Works best when the term is planar/linear.
    """
    t = whnf(t)

    # Manual stack to avoid recursion.
    # Frames:
    #   ("abs", abs_node)
    #   ("app_fn", app_node)  - fn done, next normalize arg
    #   ("app_arg", app_node) - arg done, finalize app and whnf it
    stack: List[Tuple[str, Node]] = []

    while True:
        if isinstance(t, Abs):
            stack.append(("abs", t))
            t = whnf(t.body)
            continue

        if isinstance(t, App):
            stack.append(("app_fn", t))
            t = whnf(t.fn)
            continue

        # t is Var (or free symbol)
        while stack:
            tag, node = stack.pop()

            if tag == "abs":
                absn: Abs = node  # type: ignore
                absn.body = t
                t.parent = absn
                t.pslot = "body"
                t = absn
                continue

            if tag == "app_fn":
                appn: App = node  # type: ignore
                appn.fn = t
                t.parent = appn
                t.pslot = "fn"
                stack.append(("app_arg", appn))
                t = whnf(appn.arg)
                break  # go back to outer loop

            if tag == "app_arg":
                appn: App = node  # type: ignore
                appn.arg = t
                t.parent = appn
                t.pslot = "arg"
                t = whnf(appn)  # may reduce (and returns detached)
                continue

            raise RuntimeError("Unknown frame tag.")

        else:
            # Finished rebuilding to root
            t.parent = None
            t.pslot = None
            return t

# -----------------------------
# Utilities
# -----------------------------

def node_count(t: Node) -> int:
    seen = set()
    stack = [t]
    n = 0
    while stack:
        x = stack.pop()
        if id(x) in seen:  # shouldn’t happen unless you add sharing manually
            continue
        seen.add(id(x))
        n += 1
        if isinstance(x, Abs):
            stack.append(x.body)
        elif isinstance(x, App):
            stack.append(x.arg)
            stack.append(x.fn)
    return n

def to_string(t: Node) -> str:
    """
    Serialize back to fully-parenthesized syntax:
      Var          => x
      Abs          => (\\x.BODY)
      App          => (F A)

    Note: printing very large terms is inherently expensive (output size).
    """
    env: Dict[int, str] = {}
    used: Dict[str, int] = {}

    def fresh(base: str) -> str:
        k = used.get(base, 0)
        used[base] = k + 1
        return base if k == 0 else f"{base}{k}"

    out: List[str] = []
    # action kinds: 'str', 'node', 'pop'
    actions: List[Tuple[str, object]] = [("node", t)]

    while actions:
        kind, payload = actions.pop()

        if kind == "str":
            out.append(payload)  # type: ignore
            continue

        if kind == "pop":
            bid = payload  # type: ignore
            env.pop(bid, None)
            continue

        # kind == "node"
        n = payload  # type: ignore
        if isinstance(n, Var):
            if n.binder is None:
                out.append(n.name)
            else:
                out.append(env.get(n.binder.id, n.binder.name))
            continue

        if isinstance(n, Abs):
            b = n.binder
            name = env.get(b.id)
            if name is None:
                name = fresh(b.name)
                env[b.id] = name
            actions.append(("pop", b.id))
            actions.append(("str", ")"))
            actions.append(("node", n.body))
            actions.append(("str", "."))
            actions.append(("str", name))
            actions.append(("str", "\\"))
            actions.append(("str", "("))
            continue

        if isinstance(n, App):
            actions.append(("str", ")"))
            actions.append(("node", n.arg))
            actions.append(("str", " "))
            actions.append(("node", n.fn))
            actions.append(("str", "("))
            continue

        raise TypeError("Unknown node type in to_string().")

    return "".join(out)

def parse_and_normalize(src: str) -> Node:
    return normalize(parse(src))

if __name__ == "__main__":
    import sys, time
    if len(sys.argv) < 2:
        print("usage: python3 planar_nf.py '<term>'")
        raise SystemExit(2)
    term_src = sys.argv[1]
    t0 = time.perf_counter()
    ast = parse(term_src)
    t1 = time.perf_counter()
    nf = normalize(ast)
    t2 = time.perf_counter()
    print(f"parsed nodes: {node_count(ast)}  parse_s={t1-t0:.4f}")
    print(f"nf nodes:     {node_count(nf)}   norm_s={t2-t1:.4f}")
    # Beware: printing giant terms is slow
    # print(to_string(nf))
    ```

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment