#!/usr/bin/env python3
"""TopCVP_beta CLI
This script packages the current working construction (paper-faithful):
Primitives (all are *string* lambda terms built via plc_suite.terms):
- fetch_{i,n} (paper), with NOT-substitution implemented by swapping CSTT/CSTF
  *only* at the (t==n+1 and j==i) app-slot inside the fetch constructor.
- and'_n (paper)
- OR derived purely by De Morgan using NOT-substitution + and'
Normalization/tagging:
- Default uses /mnt/data/planar_nf.py (fast normalizer).
- Optionally uses an external binary normalizer which accepts the term as argv1
  and prints the NF on stdout.
Examples:
  # tag a boolean term
  python3 topcvp_beta_cli.py tag --term "(\\b.\\k.\\f.(b k f))"
  # run the paper example circuit (prints new-bit tags only)
  python3 topcvp_beta_cli.py paper-example
  # evaluate a CVP gate list (JSON)
  python3 topcvp_beta_cli.py cvp --eqs-json '[["const",1],["const",0],["and",1,2],["not",1],["or",3,4]]'
  # evaluate a DIMACS CNF with a provided assignment
  python3 topcvp_beta_cli.py cnf-eval --dimacs foo.cnf --assign "1,-2,3"
"""
from __future__ import annotations
import argparse
import json
import os
import subprocess
import sys
from dataclasses import dataclass
from pathlib import Path
from typing import Dict, Iterable, List, Optional, Sequence, Tuple, Union
sys.path.insert(0, '/mnt/data')
import plc_suite.terms as T
# -------------------------
# Normalization backends
# -------------------------
class NFEngine:
    def normalize(self, src: str) -> str:
        raise NotImplementedError
class PlanarNFEngine(NFEngine):
    def __init__(self):
        from planar_nf import parse, normalize, to_string  # local file provided by user
        self._parse = parse
        self._normalize = normalize
        self._to_string = to_string
    def normalize(self, src: str) -> str:
        t = self._parse(src)
        nf = self._normalize(t)
        return self._to_string(nf)
class ExternalBinEngine(NFEngine):
    def __init__(self, bin_path: str):
        self.bin_path = bin_path
    def normalize(self, src: str) -> str:
        # External tool contract: argv1 is term, stdout is NF.
        # Note: huge terms may exceed OS argv limits; for that case, wrap your binary.
        p = subprocess.run(
            [self.bin_path, src],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
            check=False,
        )
        if p.returncode != 0:
            raise RuntimeError(
                f"External normalizer failed (rc={p.returncode}).\nSTDERR:\n{p.stderr.strip()}"
            )
        return p.stdout.strip()
@dataclass
class BoolTagger:
    eng: NFEngine
    dec_true_nf: str
    dec_false_nf: str
    cache: Dict[str, str]
    @classmethod
    def build(cls, eng: NFEngine) -> 'BoolTagger':
        dec_true_src = f"({T.DEC} {T.TRUE})"
        dec_false_src = f"({T.DEC} {T.FALSE})"
        dec_true_nf = eng.normalize(dec_true_src)
        dec_false_nf = eng.normalize(dec_false_src)
        return cls(eng=eng, dec_true_nf=dec_true_nf, dec_false_nf=dec_false_nf, cache={})
    def tag(self, term: str) -> str:
        # Fast tag of a boolean term via DEC-normal form equality.
        if term in self.cache:
            return self.cache[term]
        src = f"({T.DEC} {term})"
        nf = self.eng.normalize(src)
        if nf == self.dec_true_nf:
            self.cache[term] = 'T'
            return 'T'
        if nf == self.dec_false_nf:
            self.cache[term] = 'F'
            return 'F'
        self.cache[term] = '?'
        return '?'
def make_engine(args: argparse.Namespace) -> NFEngine:
    if args.normalizer == 'planar_nf':
        return PlanarNFEngine()
    if args.normalizer == 'external':
        if not args.normalizer_bin:
            raise SystemExit('need --normalizer-bin when --normalizer external')
        return ExternalBinEngine(args.normalizer_bin)
    raise SystemExit(f'unknown normalizer: {args.normalizer}')
# -------------------------
# Term construction (paper)
# -------------------------
var, lam, lams, app = T.var, T.lam, T.lams, T.app
apps = getattr(T, 'apps', None)
TRUE, FALSE = T.TRUE, T.FALSE
COMP, AND = T.COMP, T.AND
CSTT, CSTF = T.CSTT, T.CSTF
def proj_of_tuple(t: str, j: int, m: int) -> str:
    xs = [f'x{i}' for i in range(1, m + 1)]
    return app(t, lams(xs, var(f'x{j}')))
def compose(f: str, g: str) -> str:
    return app(app(COMP, f), g)
def append_const_op(n: int, const: str) -> str:
    """Unary op on tuples: <x1..xn> -> <x1..xn,const> (as a tuple, i.e. \k. ...)."""
    xs = [f'x{i}' for i in range(1, n + 1)]
    body = var('k')
    for x in xs:
        body = app(body, var(x))
    body = app(body, const)
    # op expects (v,k): op v k = v (\x1..xn. k x1..xn const)
    return lams(['v', 'k'], app(var('v'), lams(xs, body)))
def setterT(n: int, j: int, i: int, *, which: str, neg: bool = False) -> str:
    """Unary transformer on Church tuples, following the paper.
    which='t' uses CSTT at position j; which='f' uses CSTF.
    For the i=j branch, the extra appended position b_{n+1} uses CSTapp,
    where neg=True swaps CSTT<->CSTF as the NOT-substitution.
    """
    if which not in ('t', 'f'):
        raise ValueError(which)
    CSTj = CSTT if which == 't' else CSTF
    if which == 't':
        CSTapp = CSTT if not neg else CSTF
    else:
        CSTapp = CSTF if not neg else CSTT
    bs = [f'b{t}' for t in range(1, n + 2)]  # b1..b_{n+1}
    args: List[str] = []
    for t in range(1, n + 2):
        if t == j:
            args.append(app(CSTj, var(f'b{t}')))
        elif (t == n + 1) and (j == i):
            args.append(app(CSTapp, var(f'b{t}')))
        else:
            args.append(var(f'b{t}'))
    body = var('k')
    for a in args:
        body = app(body, a)
    # unary transformer on tuples: t |-> (\k. t (\b1..b_{n+1}. k ...))
    return lam('t', lam('k', app(var('t'), lams(bs, body))))
def cj(n: int, j: int, i: int, *, neg: bool = False) -> str:
    S = setterT(n, j, i, which='t', neg=neg)
    return lams(['g', 'h'], compose(var('g'), compose(S, var('h'))))
def fj(n: int, j: int, i: int, *, neg: bool = False) -> str:
    return setterT(n, j, i, which='f', neg=neg)
def fetch_paper(i: int, n: int, *, neg: bool = False) -> str:
    """fetch_{i,n}; if neg=True, NOT-substitution inside fetch."""
    if not (1 <= i <= n):
        raise ValueError('need 1 <= i <= n')
    base = T.tup([FALSE] * (n + 1))
    rest = base
    for j in range(n, 0, -1):
        rest = app(app(app(var(f'x{j}'), cj(n, j, i, neg=neg)), fj(n, j, i, neg=neg)), rest)
    xs = [f'x{j}' for j in range(1, n + 1)]
    return lam('v', app(var('v'), lams(xs, rest)))
def and_prime_paper(n: int) -> str:
    """and'_n: <x1..x_{n+2}> -> <x1..xn, x_{n+1} AND x_{n+2}> (in CPS with k)."""
    xs = [f'x{i}' for i in range(1, n + 3)]
    out = var('k')
    for i in range(1, n + 1):
        out = app(out, var(f'x{i}'))
    out = app(out, app(app(AND, var(f'x{n+1}')), var(f'x{n+2}')))
    return lams(['v', 'k'], app(var('v'), lams(xs, out)))
def and_gate(i: int, j: int, n: int, v: str) -> str:
    """Append (xi AND xj) to a length-n vector v, producing length n+1."""
    A = and_prime_paper(n)
    return app(A, app(fetch_paper(i, n + 1), app(fetch_paper(j, n), v)))
def not_gate(i: int, n: int, v: str) -> str:
    """Append NOT(xi) to a length-n vector v, producing length n+1."""
    return app(fetch_paper(i, n, neg=True), v)
def or_gate_dm(i: int, j: int, n: int, v: str) -> Tuple[str, int, int]:
    """Append (xi OR xj) using De Morgan with only not_gate + and_gate.
    Expands to 4 steps, so vector length increases by 4.
    Returns (v_out, n_out, out_pos).
    """
    # x_{n+1} := ~xi
    v = not_gate(i, n, v)
    n += 1
    t1 = n
    # x_{n+1} := ~xj
    v = not_gate(j, n, v)
    n += 1
    t2 = n
    # x_{n+1} := t1 & t2
    v = and_gate(t1, t2, n, v)
    n += 1
    t3 = n
    # x_{n+1} := ~t3 = xi OR xj
    v = not_gate(t3, n, v)
    n += 1
    out = n
    return v, n, out
# -------------------------
# CVP gate compilation
# -------------------------
Eq = Union[Tuple[str, int], Tuple[str, int, int]]  # ('const',0|1) / ('not',i) / ('and',i,j) / ('or',i,j)
def compile_cvp(eqs: Sequence[Eq]) -> Tuple[str, int, List[int]]:
    """Compile a high-level CVP gate list to a single vector term.
    Returns (vector_term, final_length, wire_pos)
    - wire_pos[k-1] is the actual vector position of logical gate k.
    - OR introduces auxiliaries, but wire_pos always points to the gate output.
    """
    v = T.tup([])
    n = 0
    wire_pos: List[int] = []
    for gate_idx, eq in enumerate(eqs, start=1):
        op = eq[0]
        if op == 'const':
            bit = int(eq[1])
            const = TRUE if bit == 1 else FALSE
            v = app(append_const_op(n, const), v)
            n += 1
            wire_pos.append(n)
        elif op == 'not':
            i = int(eq[1])
            src_pos = wire_pos[i - 1]
            v = not_gate(src_pos, n, v)
            n += 1
            wire_pos.append(n)
        elif op == 'and':
            i, j = int(eq[1]), int(eq[2])
            pi, pj = wire_pos[i - 1], wire_pos[j - 1]
            v = and_gate(pi, pj, n, v)
            n += 1
            wire_pos.append(n)
        elif op == 'or':
            i, j = int(eq[1]), int(eq[2])
            pi, pj = wire_pos[i - 1], wire_pos[j - 1]
            v, n, out_pos = or_gate_dm(pi, pj, n, v)
            wire_pos.append(out_pos)
        else:
            raise ValueError(f'unknown gate op: {op}')
    return v, n, wire_pos
def eval_cvp_output_tag(tagger: BoolTagger, eqs: Sequence[Eq]) -> str:
    v, n, wire_pos = compile_cvp(eqs)
    out_pos = wire_pos[-1]
    out_term = proj_of_tuple(v, out_pos, n)
    return tagger.tag(out_term)
# -------------------------
# CNF helper
# -------------------------
def parse_dimacs(path: str) -> Tuple[int, List[List[int]]]:
    clauses: List[List[int]] = []
    nvars = 0
    with open(path, 'r', encoding='utf-8') as f:
        for line in f:
            line = line.strip()
            if not line or line.startswith('c'):
                continue
            if line.startswith('p'):
                parts = line.split()
                if len(parts) >= 4 and parts[1] == 'cnf':
                    nvars = int(parts[2])
                continue
            lits = [int(x) for x in line.split()]
            if not lits:
                continue
            if lits[-1] == 0:
                lits = lits[:-1]
            if lits:
                clauses.append(lits)
    if nvars <= 0:
        # fall back
        nvars = max((abs(l) for c in clauses for l in c), default=0)
    return nvars, clauses
def compile_cnf_to_cvp(nvars: int, clauses: List[List[int]], assignment: List[int]) -> List[Eq]:
    """Compile CNF under a *fixed* assignment to a CVP gate list.
    assignment is a list of ints like [1,-2,3] meaning x1=T, x2=F, x3=T.
    Missing vars default to F.
    """
    asg: Dict[int, int] = {abs(v): (1 if v > 0 else 0) for v in assignment}
    eqs: List[Eq] = []
    # First, create x1..xn as constants (the assignment).
    for i in range(1, nvars + 1):
        eqs.append(('const', int(asg.get(i, 0))))
    def lit_to_wire(lit: int) -> int:
        vid = abs(lit)
        if lit > 0:
            return vid
        # negated literal: create a NOT gate
        eqs.append(('not', vid))
        return len(eqs)
    clause_wires: List[int] = []
    for clause in clauses:
        if not clause:
            # empty clause => UNSAT
            eqs.append(('const', 0))
            clause_wires.append(len(eqs))
            continue
        cur = lit_to_wire(clause[0])
        for lit in clause[1:]:
            nxt = lit_to_wire(lit)
            eqs.append(('or', cur, nxt))
            cur = len(eqs)
        clause_wires.append(cur)
    # AND all clauses
    if not clause_wires:
        eqs.append(('const', 1))
        return eqs
    cur = clause_wires[0]
    for w in clause_wires[1:]:
        eqs.append(('and', cur, w))
        cur = len(eqs)
    return eqs
# -------------------------
# CLI helpers
# -------------------------
def load_term_arg(term: Optional[str], term_file: Optional[str]) -> str:
    if term is None and term_file is None:
        raise SystemExit('need --term or --term-file')
    if term is not None and term_file is not None:
        raise SystemExit('use only one of --term or --term-file')
    if term_file is not None:
        return Path(term_file).read_text(encoding='utf-8')
    return term  # type: ignore[return-value]
def load_eqs(args: argparse.Namespace) -> List[Eq]:
    if args.eqs_file:
        data = json.loads(Path(args.eqs_file).read_text(encoding='utf-8'))
    elif args.eqs_json:
        data = json.loads(args.eqs_json)
    else:
        raise SystemExit('need --eqs-json or --eqs-file')
    eqs: List[Eq] = []
    for e in data:
        if not isinstance(e, (list, tuple)) or not e:
            raise SystemExit(f'bad eq: {e!r}')
        op = e[0]
        if op == 'const':
            eqs.append(('const', int(e[1])))
        elif op == 'not':
            eqs.append(('not', int(e[1])))
        elif op in ('and', 'or'):
            eqs.append((op, int(e[1]), int(e[2])))
        else:
            raise SystemExit(f'unknown op in eq: {e!r}')
    return eqs
def parse_assign(s: str) -> List[int]:
    # "1,-2,3" or "1 -2 3"
    s = s.replace(',', ' ')
    out: List[int] = []
    for part in s.split():
        out.append(int(part))
    return out
# -------------------------
# Subcommands
# -------------------------
def cmd_norm(args: argparse.Namespace) -> int:
    eng = make_engine(args)
    term = load_term_arg(args.term, args.term_file)
    nf = eng.normalize(term)
    print(nf)
    return 0
def cmd_tag(args: argparse.Namespace) -> int:
    eng = make_engine(args)
    tagger = BoolTagger.build(eng)
    term = load_term_arg(args.term, args.term_file)
    print(tagger.tag(term))
    return 0
def cmd_paper_example(args: argparse.Namespace) -> int:
    eng = make_engine(args)
    tagger = BoolTagger.build(eng)
    # Paper example (as we used):
    # x1:=1; x2:=0; x3:=1; x4=x1&x2; x5=~x1; x6=x5&x3; x7=x4|x6
    # We implement OR via De Morgan macro (4 extra bits). We only check each *new* bit.
    def step(v: str, n: int, expected: str) -> str:
        newbit = tagger.tag(proj_of_tuple(v, n, n))
        ok = 'OK' if newbit == expected else 'BAD'
        print(f" new x{n} = {newbit} (expect {expected}) => {ok}")
        return newbit
    bits: List[str] = []
    v = T.tup([])
    n = 0
    # x1 := 1
    v = app(append_const_op(n, TRUE), v); n += 1
    bits.append(step(v, n, 'T'))
    # x2 := 0
    v = app(append_const_op(n, FALSE), v); n += 1
    bits.append(step(v, n, 'F'))
    # x3 := 1
    v = app(append_const_op(n, TRUE), v); n += 1
    bits.append(step(v, n, 'T'))
    # x4 := x1 & x2
    v = and_gate(1, 2, n, v); n += 1
    exp = 'T' if (bits[0] == 'T' and bits[1] == 'T') else 'F'
    bits.append(step(v, n, exp))
    # x5 := ~x1
    v = not_gate(1, n, v); n += 1
    exp = 'T' if bits[0] == 'F' else 'F'
    bits.append(step(v, n, exp))
    # x6 := x5 & x3
    v = and_gate(5, 3, n, v); n += 1
    exp = 'T' if (bits[4] == 'T' and bits[2] == 'T') else 'F'
    bits.append(step(v, n, exp))
    # OR macro: x7 := ~x4 ; x8 := ~x6 ; x9 := x7 & x8 ; x10 := ~x9
    v = not_gate(4, n, v); n += 1
    exp = 'T' if bits[3] == 'F' else 'F'
    bits.append(step(v, n, exp))
    v = not_gate(6, n, v); n += 1
    exp = 'T' if bits[5] == 'F' else 'F'
    bits.append(step(v, n, exp))
    v = and_gate(7, 8, n, v); n += 1
    exp = 'T' if (bits[6] == 'T' and bits[7] == 'T') else 'F'
    bits.append(step(v, n, exp))
    v = not_gate(9, n, v); n += 1
    exp = 'T' if bits[8] == 'F' else 'F'
    bits.append(step(v, n, exp))
    print(f"\nFinal derived OR bit (x{n}) = {bits[-1]}")
    print("Expected (x4 OR x6) =", 'T' if (bits[3] == 'T' or bits[5] == 'T') else 'F')
    return 0
def cmd_cvp(args: argparse.Namespace) -> int:
    eng = make_engine(args)
    tagger = BoolTagger.build(eng)
    eqs = load_eqs(args)
    v, n, wire_pos = compile_cvp(eqs)
    if args.dump_wires:
        print('wire_pos (logical_gate -> vector_position):')
        for i, p in enumerate(wire_pos, start=1):
            print(f' g{i} -> x{p}')
    if args.dump_gate_tags:
        for gi, p in enumerate(wire_pos, start=1):
            b = tagger.tag(proj_of_tuple(v, p, n))
            print(f'g{gi} (x{p}) = {b}')
    out_pos = wire_pos[-1]
    out = tagger.tag(proj_of_tuple(v, out_pos, n))
    print(out)
    return 0
def cmd_cnf_eval(args: argparse.Namespace) -> int:
    eng = make_engine(args)
    tagger = BoolTagger.build(eng)
    nvars, clauses = parse_dimacs(args.dimacs)
    assignment = parse_assign(args.assign)
    eqs = compile_cnf_to_cvp(nvars, clauses, assignment)
    if args.dump_eqs:
        print(json.dumps(eqs))
    out = eval_cvp_output_tag(tagger, eqs)
    print(out)
    return 0
# -------------------------
# Main
# -------------------------
def build_argparser() -> argparse.ArgumentParser:
    p = argparse.ArgumentParser(prog='topcvp_beta_cli.py')
    p.add_argument(
        '--normalizer',
        choices=['planar_nf', 'external'],
        default='planar_nf',
        help='normalization backend',
    )
    p.add_argument(
        '--normalizer-bin',
        default=None,
        help='path to external normalizer binary (only for --normalizer external)',
    )
    sub = p.add_subparsers(dest='cmd', required=True)
    s_norm = sub.add_parser('norm', help='normalize a term and print NF')
    s_norm.add_argument('--term', default=None)
    s_norm.add_argument('--term-file', default=None)
    s_norm.set_defaults(func=cmd_norm)
    s_tag = sub.add_parser('tag', help='tag a boolean term as T/F/? using DEC')
    s_tag.add_argument('--term', default=None)
    s_tag.add_argument('--term-file', default=None)
    s_tag.set_defaults(func=cmd_tag)
    s_paper = sub.add_parser('paper-example', help="run the paper example circuit (fast tagging)")
    s_paper.set_defaults(func=cmd_paper_example)
    s_cvp = sub.add_parser('cvp', help='evaluate a CVP gate list (JSON) and print final T/F/?')
    s_cvp.add_argument('--eqs-json', default=None, help='JSON list of gates')
    s_cvp.add_argument('--eqs-file', default=None, help='path to JSON file')
    s_cvp.add_argument('--dump-wires', action='store_true', help='print logical->vector mapping')
    s_cvp.add_argument('--dump-gate-tags', action='store_true', help='tag each gate output')
    s_cvp.set_defaults(func=cmd_cvp)
    s_cnf = sub.add_parser('cnf-eval', help='evaluate DIMACS CNF under a fixed assignment')
    s_cnf.add_argument('--dimacs', required=True)
    s_cnf.add_argument('--assign', required=True, help='e.g. "1,-2,3" meaning x1=T,x2=F,x3=T')
    s_cnf.add_argument('--dump-eqs', action='store_true', help='print the compiled gate list')
    s_cnf.set_defaults(func=cmd_cnf_eval)
    return p
def main(argv: Optional[Sequence[str]] = None) -> int:
    ap = build_argparser()
    args = ap.parse_args(argv)
    return int(args.func(args))
if __name__ == '__main__':
    raise SystemExit(main())
Epivalent (Author) commented on Jan 21, 2026:
#!/usr/bin/env python3
"""topcvp_beta.py
Single-file toolkit for the paper-style planar/linear TopCVP construction:
- Implements fetch_{i,n} exactly as in the paper (as a fold of x_j c_j f_j over a base tuple).
- Implements NOT by *substitution inside fetch* (swap CSTT/CSTF only at the i=j appended slot).
- Implements in-place AND (and'n) and derived and{i,j,n} = and'n ∘ fetch{i,n+1} ∘ fetch_{j,n}.
- Implements OR via De Morgan using ONLY (not_subst) + and'.
- Includes an *inlined fast beta-normalizer* (planar_nf) for tagging booleans via DEC.
- Optional external normalizer binary: pass --nf-bin /path/to/bin (argv1=term, stdout=NF term).
Notes:
- The inlined normalizer assumes linear variable use (each binder occurs at most once). This matches the intended
planar/linear terms in this workflow.
- Terms are strings in backslash-lambda syntax: (\\x.BODY), (F A), variables like x1, b5, etc.
CLI examples:
python3 topcvp_beta.py paper-example
python3 topcvp_beta.py truth-table --expr xor
python3 topcvp_beta.py eval-cvp --cvp $'const 1\nconst 0\nand 1 2\nnot 3\n'
python3 topcvp_beta.py eval-dimacs --dimacs path/to.cnf --assign 10110
"""
from __future__ import annotations
import argparse
import itertools
import subprocess
from typing import Dict, Iterator, List, Optional, Sequence, Tuple, Union
# -----------------------------
# Inlined fast beta-normalizer (planar_nf.py)
# -----------------------------
import re
import itertools as _itertools
_binder_ids = _itertools.count(1)
class Node:
__slots__ = ("parent", "pslot")
def __init__(self) -> None:
self.parent: Optional["Node"] = None
self.pslot: Optional[str] = None # which field in parent: "body" / "fn" / "arg"
class Binder:
__slots__ = ("name", "id", "occ")
def __init__(self, name: str) -> None:
self.name = name
self.id = next(_binder_ids)
self.occ: Optional["Var"] = None # planar/linear: 0 or 1 occurrence
class Var(Node):
__slots__ = ("binder", "name")
def __init__(self, name: str, binder: Optional[Binder] = None) -> None:
super().__init__()
self.binder = binder
self.name = name
if binder is not None:
if binder.occ is None:
binder.occ = self
else:
raise ValueError(
f"Non-linear variable use: binder '{binder.name}' appears more than once."
)
class Abs(Node):
__slots__ = ("binder", "body")
def __init__(self, binder: Binder, body: Node) -> None:
super().__init__()
self.binder = binder
self.body = body
body.parent = self
body.pslot = "body"
class App(Node):
__slots__ = ("fn", "arg")
def __init__(self, fn: Node, arg: Node) -> None:
super().__init__()
self.fn = fn
self.arg = arg
fn.parent = self
fn.pslot = "fn"
arg.parent = self
arg.pslot = "arg"
Term = Union[Var, Abs, App]
_ident_re = re.compile(r"[A-Za-z][A-Za-z0-9_]*")
def tokenize(src: str) -> Iterator[str]:
i = 0
n = len(src)
while i < n:
c = src[i]
if c.isspace():
i += 1
continue
if c in ("(", ")", "."):
yield c
i += 1
continue
if c == "\\" or c == "λ":
yield "\\"
i += 1
continue
m = _ident_re.match(src, i)
if not m:
raise SyntaxError(f"Unexpected character at {i}: {src[i:i+20]!r}")
yield m.group(0)
i = m.end()
def parse(src: str) -> Term:
toks = list(tokenize(src))
tstack: List[object] = []
bstack: List[Binder] = []
i = 0
def reduce_lams() -> None:
nonlocal tstack, bstack
while (
len(tstack) >= 2
and isinstance(tstack[-2], tuple)
and tstack[-2][0] == "LAM"
and isinstance(tstack[-1], Node)
):
_, b = tstack[-2]
body = tstack.pop() # type: ignore
tstack.pop()
if not bstack or bstack[-1] is not b:
raise RuntimeError("Binder stack mismatch while reducing lambda.")
bstack.pop()
tstack.append(Abs(b, body)) # type: ignore
while i < len(toks):
tok = toks[i]
i += 1
if tok == "(":
tstack.append(("LPAREN", len(bstack)))
continue
if tok == ")":
items: List[Node] = []
while tstack:
top = tstack.pop()
if isinstance(top, tuple) and top[0] == "LPAREN":
target_len = top[1]
if len(bstack) < target_len:
raise RuntimeError("Binder stack underflow at ')'.")
del bstack[target_len:]
break
if not isinstance(top, Node):
raise SyntaxError("Malformed group inside parentheses.")
items.append(top)
else:
raise SyntaxError("Unmatched ')'.")
items.reverse()
if not items:
raise SyntaxError("Empty parentheses group.")
term: Node = items[0]
for nxt in items[1:]:
term = App(term, nxt)
tstack.append(term)
reduce_lams()
continue
if tok == "\\":
if i >= len(toks):
raise SyntaxError("Unexpected end after lambda.")
name = toks[i]
i += 1
if i >= len(toks) or toks[i] != ".":
raise SyntaxError("Expected '.' after lambda parameter.")
i += 1
b = Binder(name)
bstack.append(b)
tstack.append(("LAM", b))
continue
if tok == ".":
raise SyntaxError("Unexpected '.'.")
bnd: Optional[Binder] = None
for b in reversed(bstack):
if b.name == tok:
bnd = b
break
tstack.append(Var(tok, bnd))
reduce_lams()
if any(isinstance(x, tuple) and x[0] == "LPAREN" for x in tstack):
raise SyntaxError("Unmatched '('.")
terms = [x for x in tstack if isinstance(x, Node)]
if not terms:
raise SyntaxError("No term parsed.")
term: Node = terms[0]
for nxt in terms[1:]:
term = App(term, nxt)
return term # type: ignore
def _replace_child(parent: Node, slot: str, new: Node) -> None:
if isinstance(parent, Abs) and slot == "body":
parent.body = new
elif isinstance(parent, App) and slot == "fn":
parent.fn = new
elif isinstance(parent, App) and slot == "arg":
parent.arg = new
else:
raise RuntimeError("Bad parent/slot in replacement.")
new.parent = parent
new.pslot = slot
def beta(abs_node: Abs, arg: Node) -> Node:
b = abs_node.binder
body = abs_node.body
occ = b.occ
if occ is None:
body.parent = None
body.pslot = None
return body
if occ is body:
arg.parent = None
arg.pslot = None
return arg
p = occ.parent
slot = occ.pslot
if p is None:
arg.parent = None
arg.pslot = None
return arg
assert slot is not None
_replace_child(p, slot, arg)
body.parent = None
body.pslot = None
return body
def whnf(t: Node) -> Node:
while True:
apps: List[App] = []
cur: Node = t
while isinstance(cur, App):
apps.append(cur)
cur = cur.fn
reduced = False
for appn in reversed(apps):
appn.fn = cur
cur.parent = appn
cur.pslot = "fn"
if isinstance(cur, Abs):
cur = beta(cur, appn.arg)
reduced = True
else:
cur = appn
cur.parent = None
cur.pslot = None
t = cur
if not reduced:
return t
def normalize(t: Node) -> Node:
t = whnf(t)
stack: List[Tuple[str, Node]] = []
while True:
if isinstance(t, Abs):
stack.append(("abs", t))
t = whnf(t.body)
continue
if isinstance(t, App):
stack.append(("app_fn", t))
t = whnf(t.fn)
continue
while stack:
tag, node = stack.pop()
if tag == "abs":
absn: Abs = node # type: ignore
absn.body = t
t.parent = absn
t.pslot = "body"
t = absn
continue
if tag == "app_fn":
appn: App = node # type: ignore
appn.fn = t
t.parent = appn
t.pslot = "fn"
stack.append(("app_arg", appn))
t = whnf(appn.arg)
break
if tag == "app_arg":
appn: App = node # type: ignore
appn.arg = t
t.parent = appn
t.pslot = "arg"
t = whnf(appn)
continue
raise RuntimeError("Unknown frame tag.")
else:
t.parent = None
t.pslot = None
return t
def to_string(t: Node) -> str:
env: Dict[int, str] = {}
used: Dict[str, int] = {}
def fresh(base: str) -> str:
k = used.get(base, 0)
used[base] = k + 1
return base if k == 0 else f"{base}{k}"
out: List[str] = []
actions: List[Tuple[str, object]] = [("node", t)]
while actions:
kind, payload = actions.pop()
if kind == "str":
out.append(payload) # type: ignore
continue
if kind == "pop":
bid = payload # type: ignore
env.pop(bid, None)
continue
n = payload # type: ignore
if isinstance(n, Var):
if n.binder is None:
out.append(n.name)
else:
out.append(env.get(n.binder.id, n.binder.name))
continue
if isinstance(n, Abs):
b = n.binder
name = env.get(b.id)
if name is None:
name = fresh(b.name)
env[b.id] = name
actions.append(("pop", b.id))
actions.append(("str", ")"))
actions.append(("node", n.body))
actions.append(("str", "."))
actions.append(("str", name))
actions.append(("str", "\\"))
actions.append(("str", "("))
continue
if isinstance(n, App):
actions.append(("str", ")"))
actions.append(("node", n.arg))
actions.append(("str", " "))
actions.append(("node", n.fn))
actions.append(("str", "("))
continue
raise TypeError("Unknown node type in to_string().")
return "".join(out)
def normalize_to_string(src: str) -> str:
return to_string(normalize(parse(src)))
# -----------------------------
# Term constructors + TopCVP ops (paper)
# -----------------------------
# We rely on your existing, correct primitive definitions from plc_suite.terms
import sys
sys.path.insert(0, '/mnt/data')
import plc_suite.terms as T
var, lam, lams, app = T.var, T.lam, T.lams, T.app
TRUE, FALSE = T.TRUE, T.FALSE
COMP, AND = T.COMP, T.AND
CSTT, CSTF = T.CSTT, T.CSTF
def proj_of_tuple(t: str, j: int, m: int) -> str:
xs = [f'x{i}' for i in range(1, m + 1)]
return app(t, lams(xs, var(f'x{j}')))
def compose(f: str, g: str) -> str:
return app(app(COMP, f), g)
def append_const_op(n: int, const: str) -> str:
"""Unary op on tuples: <x1..xn> -> <x1..xn,const> (CPS with k)."""
xs = [f'x{i}' for i in range(1, n + 1)]
body = var('k')
for x in xs:
body = app(body, var(x))
body = app(body, const)
return lams(['v', 'k'], app(var('v'), lams(xs, body)))
def setterT(n: int, j: int, i: int, *, which: str, neg: bool = False) -> str:
"""Unary transformer on Church tuples.
which='t' => CSTT at position j
which='f' => CSTF at position j
NOT-substitution: when (j == i), swap CSTT<->CSTF only at the appended slot b_{n+1}.
"""
if which not in ('t', 'f'):
raise ValueError(which)
CSTj = CSTT if which == 't' else CSTF
if which == 't':
CSTapp = CSTT if not neg else CSTF
else:
CSTapp = CSTF if not neg else CSTT
bs = [f'b{t}' for t in range(1, n + 2)] # b1..b_{n+1}
args: List[str] = []
for t in range(1, n + 2):
if t == j:
args.append(app(CSTj, var(f'b{t}')))
elif (t == n + 1) and (j == i):
args.append(app(CSTapp, var(f'b{t}')))
else:
args.append(var(f'b{t}'))
body = var('k')
for a in args:
body = app(body, a)
return lam('t', lam('k', app(var('t'), lams(bs, body))))
def cj(n: int, j: int, i: int, *, neg: bool = False) -> str:
S = setterT(n, j, i, which='t', neg=neg)
return lams(['g', 'h'], compose(var('g'), compose(S, var('h'))))
def fj(n: int, j: int, i: int, *, neg: bool = False) -> str:
return setterT(n, j, i, which='f', neg=neg)
def fetch_paper(i: int, n: int, *, neg: bool = False) -> str:
"""fetch_{i,n}; neg=True activates NOT-substitution (swap at appended slot when j=i)."""
if not (1 <= i <= n):
raise ValueError('need 1 <= i <= n')
base = T.tup([FALSE] * (n + 1))
rest = base
for j in range(n, 0, -1):
rest = app(app(app(var(f'x{j}'), cj(n, j, i, neg=neg)), fj(n, j, i, neg=neg)), rest)
xs = [f'x{j}' for j in range(1, n + 1)]
return lam('v', app(var('v'), lams(xs, rest)))
def and_prime_paper(n: int) -> str:
"""and'n: <x1..x{n+2}> -> <x1..xn, x_{n+1} AND x_{n+2}> (CPS with k)."""
if n < 0:
raise ValueError('n must be >= 0')
xs = [f'x{i}' for i in range(1, n + 3)]
out = var('k')
for i in range(1, n + 1):
out = app(out, var(f'x{i}'))
out = app(out, app(app(AND, var(f'x{n+1}')), var(f'x{n+2}')))
return lams(['v', 'k'], app(var('v'), lams(xs, out)))
def and_gate(i: int, j: int, n: int, v: str) -> str:
"""Append x_{n+1} := x_i AND x_j to a length-n vector v."""
A = and_prime_paper(n)
return app(A, app(fetch_paper(i, n + 1), app(fetch_paper(j, n), v)))
def not_gate(i: int, n: int, v: str) -> str:
"""Append x_{n+1} := NOT(x_i) to a length-n vector v via NOT-substitution inside fetch."""
return app(fetch_paper(i, n, neg=True), v)
def or_gate_dm(i: int, j: int, n: int, v: str) -> Tuple[str, int]:
"""Append OR via De Morgan using ONLY not_gate + and_gate.
Starting from length-n vector v, append:
x_{n+1} := ~x_i
x_{n+2} := ~x_j
x_{n+3} := x_{n+1} & x_{n+2}
x_{n+4} := ~x_{n+3} (this equals x_i OR x_j)
Returns (v_new, new_length).
"""
v1 = not_gate(i, n, v)
n1 = n + 1
v2 = not_gate(j, n1, v1)
n2 = n + 2
v3 = and_gate(n1, n2, n2, v2)
n3 = n + 3
v4 = not_gate(n3, n3, v3)
n4 = n + 4
return v4, n4
# -----------------------------
# Boolean tagging (DEC) with either inline normalizer or external binary
# -----------------------------
class Normalizer:
def __init__(self, nf_bin: Optional[str] = None):
self.nf_bin = nf_bin
if nf_bin:
self._dec_true_nf = self._nf_bin(f"({T.DEC} {T.TRUE})")
self._dec_false_nf = self._nf_bin(f"({T.DEC} {T.FALSE})")
else:
self._dec_true_nf = normalize_to_string(f"({T.DEC} {T.TRUE})")
self._dec_false_nf = normalize_to_string(f"({T.DEC} {T.FALSE})")
def _nf_bin(self, term: str) -> str:
out = subprocess.check_output([self.nf_bin, term], text=True)
return out.strip()
def nf(self, term: str) -> str:
if self.nf_bin:
return self._nf_bin(term)
return normalize_to_string(term)
def tag_bool(self, bterm: str) -> str:
src = f"({T.DEC} {bterm})"
nf = self.nf(src)
if nf == self._dec_true_nf:
return 'T'
if nf == self._dec_false_nf:
return 'F'
return '?'
# -----------------------------
# CVP evaluator + CNF compiler
# -----------------------------
Gate = Tuple[str, ...] # ('const','1') | ('not','i') | ('and','i','j') | ('or','i','j')
def parse_cvp(text: str) -> List[Gate]:
gates: List[Gate] = []
for raw in text.splitlines():
line = raw.strip()
if not line or line.startswith('#'):
continue
parts = line.split()
op = parts[0].lower()
if op == 'const':
if len(parts) != 2 or parts[1] not in ('0', '1'):
raise ValueError(f"bad const line: {raw!r}")
gates.append(('const', parts[1]))
elif op == 'not':
if len(parts) != 2:
raise ValueError(f"bad not line: {raw!r}")
gates.append(('not', parts[1]))
elif op in ('and', 'or'):
if len(parts) != 3:
raise ValueError(f"bad {op} line: {raw!r}")
gates.append((op, parts[1], parts[2]))
else:
raise ValueError(f"unknown gate op: {op!r}")
return gates
def eval_cvp(
gates: Sequence[Gate],
*,
init_bits: str = '',
norm: Normalizer,
verbose: bool = True
) -> Tuple[str, int, List[str]]:
"""Evaluate CVP as a sequence of appended bits.
init_bits: bitstring of initial variables to append first (e.g. '101').
Returns (vector_term, length, python_tags_for_all_bits).
"""
bits: List[str] = []
v = T.tup([])
n = 0
# initialize variables
for ch in init_bits.strip():
if ch not in '01':
raise ValueError('init_bits must be a bitstring')
const = TRUE if ch == '1' else FALSE
v = app(append_const_op(n, const), v)
n += 1
bits.append('T' if ch == '1' else 'F')
if verbose:
print(f" init x{n} = {bits[-1]}")
# apply gates
for g in gates:
op = g[0]
if op == 'const':
const = TRUE if g[1] == '1' else FALSE
v = app(append_const_op(n, const), v)
n += 1
expected = 'T' if g[1] == '1' else 'F'
bits.append(expected)
if verbose:
got = norm.tag_bool(proj_of_tuple(v, n, n))
print(f" const -> new x{n} = {got} (expect {expected})")
continue
if op == 'not':
i = int(g[1])
v = not_gate(i, n, v)
n += 1
expected = 'T' if bits[i-1] == 'F' else 'F'
bits.append(expected)
if verbose:
got = norm.tag_bool(proj_of_tuple(v, n, n))
print(f" not {i} -> new x{n} = {got} (expect {expected})")
continue
if op == 'and':
i, j = int(g[1]), int(g[2])
v = and_gate(i, j, n, v)
n += 1
expected = 'T' if (bits[i-1] == 'T' and bits[j-1] == 'T') else 'F'
bits.append(expected)
if verbose:
got = norm.tag_bool(proj_of_tuple(v, n, n))
print(f" and {i} {j} -> new x{n} = {got} (expect {expected})")
continue
if op == 'or':
i, j = int(g[1]), int(g[2])
# De Morgan OR appends 4 bits; we report only the final OR bit.
v, n = or_gate_dm(i, j, n, v)
# update python truth list for each appended bit
not_i = 'T' if bits[i-1] == 'F' else 'F'
not_j = 'T' if bits[j-1] == 'F' else 'F'
bits.append(not_i) # x_{old+1}
bits.append(not_j) # x_{old+2}
bits.append('T' if (not_i == 'T' and not_j == 'T') else 'F') # x_{old+3}
bits.append('T' if bits[-1] == 'F' else 'F') # x_{old+4} = OR
if verbose:
got = norm.tag_bool(proj_of_tuple(v, n, n))
expected = bits[-1]
print(f" or {i} {j} -> new x{n} = {got} (expect {expected})")
continue
raise RuntimeError(f"unhandled gate: {g}")
return v, n, bits
def parse_dimacs(path: str) -> Tuple[int, List[List[int]]]:
nvars = 0
clauses: List[List[int]] = []
with open(path, 'r', encoding='utf-8') as f:
for raw in f:
line = raw.strip()
if not line or line.startswith('c'):
continue
if line.startswith('p'):
parts = line.split()
if len(parts) >= 4 and parts[1] == 'cnf':
nvars = int(parts[2])
continue
lits = [int(x) for x in line.split() if x != '0']
if lits:
clauses.append(lits)
if nvars <= 0:
# fallback: infer from clauses
nvars = max((abs(l) for cl in clauses for l in cl), default=0)
return nvars, clauses
def eval_cnf_dimacs(path: str, assign_bits: str, *, norm: Normalizer, verbose: bool = True) -> str:
"""Evaluate DIMACS CNF under assignment using TopCVP ops.
This compiles the CNF to an appended-bit circuit, then returns the final CNF truth tag.
"""
nvars, clauses = parse_dimacs(path)
if len(assign_bits) != nvars:
raise ValueError(f"assignment length {len(assign_bits)} != nvars {nvars}")
# Start with variables x1..xnvars as given bits.
v = T.tup([])
n = 0
bits: List[str] = []
for ch in assign_bits:
const = TRUE if ch == '1' else FALSE
v = app(append_const_op(n, const), v)
n += 1
bits.append('T' if ch == '1' else 'F')
def lit_value_idx(lit: int) -> int:
nonlocal v, n, bits
var_idx = abs(lit)
if lit > 0:
return var_idx
# negative literal: append NOT(var_idx)
v = not_gate(var_idx, n, v)
n += 1
val = 'T' if bits[var_idx-1] == 'F' else 'F'
bits.append(val)
return n
# For each clause, compute its OR by folding pairwise OR via De Morgan.
clause_idxs: List[int] = []
for cl in clauses:
if not cl:
# empty clause => false
v = app(append_const_op(n, FALSE), v)
n += 1
bits.append('F')
clause_idxs.append(n)
continue
cur = lit_value_idx(cl[0])
for lit in cl[1:]:
nxt = lit_value_idx(lit)
v, n = or_gate_dm(cur, nxt, n, v)
# update python-truth for OR-gate's internal bits
not_cur = 'T' if bits[cur-1] == 'F' else 'F'
not_nxt = 'T' if bits[nxt-1] == 'F' else 'F'
bits.append(not_cur)
bits.append(not_nxt)
bits.append('T' if (not_cur == 'T' and not_nxt == 'T') else 'F')
bits.append('T' if bits[-1] == 'F' else 'F')
cur = n
clause_idxs.append(cur)
# CNF = AND of clause outputs (fold with and_gate)
if not clause_idxs:
# vacuously true
v = app(append_const_op(n, TRUE), v)
n += 1
bits.append('T')
cnf_idx = n
else:
cnf_idx = clause_idxs[0]
for ci in clause_idxs[1:]:
v = and_gate(cnf_idx, ci, n, v)
n += 1
val = 'T' if (bits[cnf_idx-1] == 'T' and bits[ci-1] == 'T') else 'F'
bits.append(val)
cnf_idx = n
if verbose:
got = norm.tag_bool(proj_of_tuple(v, cnf_idx, n))
print(f"CNF value: {got} (python expect {bits[cnf_idx-1]})")
return norm.tag_bool(proj_of_tuple(v, cnf_idx, n))
# -----------------------------
# Built-in demos
# -----------------------------
def paper_example_gates() -> List[Gate]:
# x1:=1; x2:=0; x3:=1; x4:=x1&x2; x5:=~x1; x6:=x5&x3; x7:=x4|x6
return [
('const','1'),
('const','0'),
('const','1'),
('and','1','2'),
('not','1'),
('and','5','3'),
('or','4','6'),
]
def build_xor_gates() -> List[Gate]:
# init x1,x2 are init_bits, then:
# x3 = x1 AND x2
# x4 = NOT x3
# x5 = x1 OR x2 (expands by 4 bits; final OR bit ends up at x8 in this layout)
# x9 = x4 AND x8 => XOR
return [
('and','1','2'),
('not','3'),
('or','1','2'),
('and','4','8'),
]
def truth_table(expr: str, norm: Normalizer) -> None:
if expr == 'nand':
gates = [('and','1','2'), ('not','3')]
outs = [4]
elif expr == 'xor':
gates = build_xor_gates()
outs = [9]
elif expr == 'half-adder':
gates = [('and','1','2'), ('not','3'), ('or','1','2'), ('and','4','8')]
outs = [3,9]
else:
raise ValueError('expr must be nand|xor|half-adder')
for a, b in itertools.product('01', repeat=2):
init = a + b
print(f"in {a}{b}")
v, n, _bits = eval_cvp(gates, init_bits=init, norm=norm, verbose=False)
tags = [norm.tag_bool(proj_of_tuple(v, idx, n)) for idx in outs]
print(" out", tags)
# -----------------------------
# CLI
# -----------------------------
def main() -> None:
ap = argparse.ArgumentParser()
ap.add_argument(
'--nf-bin',
default=None,
help='Optional external normalizer binary: argv1=term, stdout=NF term string'
)
sub = ap.add_subparsers(dest='cmd', required=True)
sub.add_parser('paper-example', help='Run the paper example circuit with step-by-step new-bit tagging')
p_tt = sub.add_parser('truth-table', help='Truth table for small demo circuits')
p_tt.add_argument('--expr', required=True, choices=['nand','xor','half-adder'])
p_tag = sub.add_parser('tag', help='Tag a boolean term via DEC')
p_tag.add_argument('term', help='Lambda term string')
p_cvp = sub.add_parser('eval-cvp', help='Evaluate a CVP gate list (DSL)')
p_cvp.add_argument('--cvp', required=True, help='CVP DSL as a single string; lines like: const 1 | not i | and i j | or i j')
p_cvp.add_argument('--init', default='', help='Initial bitstring (variables) to append before gates, e.g. 101')
p_dim = sub.add_parser('eval-dimacs', help='Evaluate DIMACS CNF under assignment')
p_dim.add_argument('--dimacs', required=True, help='Path to DIMACS CNF file')
p_dim.add_argument('--assign', required=True, help='Bitstring assignment of length nvars')
args = ap.parse_args()
# (also fixed: actually use --nf-bin)
norm = Normalizer(nf_bin=args.nf_bin)
if args.cmd == 'tag':
print(norm.tag_bool(args.term))
return
if args.cmd == 'paper-example':
gates = paper_example_gates()
eval_cvp(gates, init_bits='', norm=norm, verbose=True)
return
if args.cmd == 'truth-table':
truth_table(args.expr, norm)
return
if args.cmd == 'eval-cvp':
gates = parse_cvp(args.cvp)
eval_cvp(gates, init_bits=args.init, norm=norm, verbose=True)
return
if args.cmd == 'eval-dimacs':
eval_cnf_dimacs(args.dimacs, args.assign, norm=norm, verbose=True)
return
raise SystemExit(2)
if __name__ == '__main__':
main()
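A quick plain-Python cross-check (added here for clarity, not in the gist) of the physical-index bookkeeping that build_xor_gates relies on: replaying the appended-bit semantics of eval_cvp with ordinary booleans confirms that the De Morgan OR macro leaves its output at x8 and the final AND (the XOR result) at x9.

# Hypothetical sanity check: replay eval_cvp's appended-bit bookkeeping with
# plain booleans and confirm the wire layout assumed by build_xor_gates.
def replay(gates, init_bits):
    bits = [c == '1' for c in init_bits]
    for g in gates:
        if g[0] == 'const':
            bits.append(g[1] == '1')
        elif g[0] == 'not':
            bits.append(not bits[int(g[1]) - 1])
        elif g[0] == 'and':
            bits.append(bits[int(g[1]) - 1] and bits[int(g[2]) - 1])
        elif g[0] == 'or':
            ni, nj = not bits[int(g[1]) - 1], not bits[int(g[2]) - 1]
            bits += [ni, nj, ni and nj, not (ni and nj)]   # 4 appended bits, OR is last
    return bits

for a in '01':
    for b in '01':
        out = replay([('and','1','2'), ('not','3'), ('or','1','2'), ('and','4','8')], a + b)
        assert out[8] == ((a == '1') != (b == '1'))        # x9 really is XOR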
Epivalent (Author) commented:
#!/bin/bash
set -e
# --- Configuration ---
NORMALIZER="./planar_nf"
# 0. Safety Check
if [ ! -x "$NORMALIZER" ]; then
echo "Error: Normalizer binary '$NORMALIZER' not found."
exit 1
fi
# ==============================================================================
# 1. Create the Driver Script
# ==============================================================================
cat << 'EOF' > driver_67.py
import subprocess
import sys
import re
# --- Configuration ---
NORMALIZER = "./planar_nf"
SCRIPT = "topcvp_beta.py"
# --- Dynamic Circuit Generator ---
def generate_adder_cvp(a_val, b_val, cin_val):
lines = []
# Track physical wire count to predict logical outputs
# Constants take 3 wires
current_physical_top = 3
def emit(s):
lines.append(s)
# 1. Constants (Wires 1, 2, 3)
emit(f"const {a_val}")
w_a = 1 # Known fixed index
emit(f"const {b_val}")
w_b = 2 # Known fixed index
emit(f"const {cin_val}")
w_cin = 3 # Known fixed index
# 2. Logic Manager
# We track the "Physical Top" of the stack because 'topcvp' expands macros.
# or -> 4 wires
# and -> 1 wire
# not -> 1 wire
def do_op(op, arg1, arg2=None):
nonlocal current_physical_top
if op == 'not':
emit(f"not {arg1}")
current_physical_top += 1
return current_physical_top
elif op == 'and':
emit(f"and {arg1} {arg2}")
current_physical_top += 1
return current_physical_top
elif op == 'or':
emit(f"or {arg1} {arg2}")
current_physical_top += 4
return current_physical_top
def XOR(x, y):
# x XOR y = (x | y) & ~(x & y)
w_or = do_op('or', x, y)
w_and = do_op('and', x, y)
w_nand = do_op('not', w_and)
w_res = do_op('and', w_or, w_nand)
return w_res
# 3. Full Adder Logic
# Sum = A XOR B XOR Cin
s1 = XOR(w_a, w_b)
sum_final = XOR(s1, w_cin)
# Cout = (A & B) | (Cin & (A XOR B))
# Recalculate terms to maintain linear safety
ab = do_op('and', w_a, w_b)
# We need (Cin & S1). Note: S1 was (A XOR B)
cin_s1 = do_op('and', w_cin, s1)
cout = do_op('or', ab, cin_s1)
return "\n".join(lines), sum_final, cout
def solve_bit(a_val, b_val, c_val):
# 1. Generate Circuit
cvp_content, sum_idx, cout_idx = generate_adder_cvp(a_val, b_val, c_val)
# 2. Run Simulator
cmd = ["python3", SCRIPT, "eval-cvp", "--cvp", cvp_content]
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode != 0:
print("SIMULATOR CRASHED:", result.stderr)
sys.exit(1)
# 3. Parse Output
map_vals = {}
for line in result.stdout.splitlines():
# strict matching for "new x<NUM> ="
m = re.search(r"new x(\d+) = (\S+)", line)
if m:
map_vals[int(m.group(1))] = m.group(2)
# 4. Extract
s_sym = map_vals.get(sum_idx)
c_sym = map_vals.get(cout_idx)
if s_sym is None or c_sym is None:
print(f"--- PARSING ERROR ---")
print(f"Expected wires x{sum_idx} (Sum) and x{cout_idx} (Cout)")
sys.exit(1)
def to_int(sym):
return 1 if sym == 'T' else 0
return to_int(s_sym), to_int(c_sym)
def main():
# ---------------------------------------------------------
# Computing: 35 + 32 = 67
# ---------------------------------------------------------
# 35 = 32 + 2 + 1
# Binary (LSB first): 1, 1, 0, 0, 0, 1, 0
A_bits = [1, 1, 0, 0, 0, 1, 0]
# 32 = 32
# Binary (LSB first): 0, 0, 0, 0, 0, 1, 0
B_bits = [0, 0, 0, 0, 0, 1, 0]
carry = 0
results = []
print(f"Arithmetic Pipeline: 35 + 32 (Staged)...")
print("-" * 50)
for i in range(len(A_bits)):
a = A_bits[i]
b = B_bits[i]
sum_bit, cout = solve_bit(a, b, carry)
# Visualize the addition at this stage
print(f"Bit {i} (2^{i}): {a} + {b} + Carry({carry}) ==> Sum: {sum_bit}, NewCarry: {cout}")
results.append(sum_bit)
carry = cout
# Reconstruct Decimal
val = 0
for i, bit in enumerate(results):
val += bit * (2**i)
print("-" * 50)
print(f"Final Binary (LSB): {results}")
print(f"Arithmetic Result: {val}")
if __name__ == "__main__":
main()
EOF
# ==============================================================================
# 2. Run
# ==============================================================================
python3 driver_67.py
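Independent of the lambda-calculus machinery, the gate equations baked into generate_adder_cvp can be checked with plain integers. This is a small verification sketch added for clarity, not part of the original driver: for every input combination, the sum/carry pair must reproduce ordinary addition.

# Hypothetical check of the full-adder equations used by generate_adder_cvp:
# sum = a ^ b ^ cin and cout = (a & b) | (cin & (a ^ b)), so a + b + cin == sum + 2*cout.
for a in (0, 1):
    for b in (0, 1):
        for cin in (0, 1):
            s1 = a ^ b
            s, cout = s1 ^ cin, (a & b) | (cin & s1)
            assert a + b + cin == s + 2 * cout
print("full-adder equations OK")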
Epivalent (Author) commented:
r"""
planar_nf.py — fast beta-normalizer for (mostly) planar/linear untyped lambda terms.
Input syntax (matches your examples):
- Abstraction: \x. BODY (also accepts 'λ' instead of '\')
- Application: (F A) (fully parenthesized recommended; N-ary apps allowed: (f a b c))
- Variables: identifiers like x, x1, b5, foo_bar
Key idea for speed on planar/linear terms:
- During parsing, each binder records its *unique* variable occurrence node (if any).
- Beta-reduction then becomes O(1): splice the argument subtree into that occurrence (no copying).
- Strong normalization is implemented iteratively (no Python recursion).
If a binder is used more than once, we raise (that’s outside “planar/linear”).
"""
from __future__ import annotations
from dataclasses import dataclass
from typing import Optional, List, Tuple, Union, Dict, Iterator
import itertools
import re
# -----------------------------
# Core term data structures
# -----------------------------
_binder_ids = itertools.count(1)
class Node:
__slots__ = ("parent", "pslot")
def __init__(self) -> None:
self.parent: Optional[Node] = None
self.pslot: Optional[str] = None # which field in parent: "body" / "fn" / "arg"
class Binder:
__slots__ = ("name", "id", "occ")
def __init__(self, name: str) -> None:
self.name = name
self.id = next(_binder_ids)
self.occ: Optional[Var] = None # planar/linear: 0 or 1 occurrence
class Var(Node):
__slots__ = ("binder", "name")
def __init__(self, name: str, binder: Optional[Binder] = None) -> None:
super().__init__()
self.binder = binder
self.name = name
if binder is not None:
if binder.occ is None:
binder.occ = self
else:
# Non-linear: multiple occurrences
raise ValueError(f"Non-linear variable use: binder '{binder.name}' appears more than once.")
class Abs(Node):
__slots__ = ("binder", "body")
def __init__(self, binder: Binder, body: Node) -> None:
super().__init__()
self.binder = binder
self.body = body
body.parent = self
body.pslot = "body"
class App(Node):
__slots__ = ("fn", "arg")
def __init__(self, fn: Node, arg: Node) -> None:
super().__init__()
self.fn = fn
self.arg = arg
fn.parent = self
fn.pslot = "fn"
arg.parent = self
arg.pslot = "arg"
Term = Union[Var, Abs, App]
# -----------------------------
# Tokenizer
# -----------------------------
_ident_re = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")
def tokenize(src: str) -> Iterator[str]:
i = 0
n = len(src)
while i < n:
c = src[i]
if c.isspace():
i += 1
continue
if c in ("(", ")", "."):
yield c
i += 1
continue
if c == "\\" or c == "λ":
yield "\\"
i += 1
continue
m = _ident_re.match(src, i)
if not m:
raise SyntaxError(f"Unexpected character at {i}: {src[i:i+20]!r}")
yield m.group(0)
i = m.end()
# -----------------------------
# Parser (iterative, stack-based)
# -----------------------------
_LP = ("LPAREN", None) # marker (kind, binder_stack_len)
_LAM = ("LAM", None) # marker (kind, Binder)
def parse(src: str) -> Term:
toks = list(tokenize(src))
tstack: List[object] = []
bstack: List[Binder] = []
i = 0
def reduce_lams() -> None:
# If stack ends with [("LAM", binder), <term>], reduce to Abs(binder, term)
nonlocal tstack, bstack
while len(tstack) >= 2 and isinstance(tstack[-2], tuple) and tstack[-2][0] == "LAM" and isinstance(tstack[-1], Node):
_, b = tstack[-2]
body = tstack.pop() # type: ignore
tstack.pop()
# Pop binder from binder stack (must match)
if not bstack or bstack[-1] is not b:
raise RuntimeError("Binder stack mismatch while reducing lambda.")
bstack.pop()
tstack.append(Abs(b, body)) # type: ignore
while i < len(toks):
tok = toks[i]
i += 1
if tok == "(":
tstack.append(("LPAREN", len(bstack)))
continue
if tok == ")":
items: List[Node] = []
while tstack:
top = tstack.pop()
if isinstance(top, tuple) and top[0] == "LPAREN":
target_len = top[1]
# Restore binder stack to what it was at '('
if len(bstack) < target_len:
raise RuntimeError("Binder stack underflow at ')'.")
del bstack[target_len:]
break
if not isinstance(top, Node):
raise SyntaxError("Malformed group inside parentheses.")
items.append(top)
else:
raise SyntaxError("Unmatched ')'.")
items.reverse()
if not items:
raise SyntaxError("Empty parentheses group.")
# (a b c) => ((a b) c)
term: Node = items[0]
for nxt in items[1:]:
term = App(term, nxt)
tstack.append(term)
reduce_lams()
continue
if tok == "\\":
if i >= len(toks): raise SyntaxError("Unexpected end after lambda.")
name = toks[i]; i += 1
if i >= len(toks) or toks[i] != ".": raise SyntaxError("Expected '.' after lambda parameter.")
i += 1
b = Binder(name)
bstack.append(b)
tstack.append(("LAM", b))
continue
if tok == ".":
raise SyntaxError("Unexpected '.'.")
# identifier: resolve to nearest binder with same name, else free var
bnd: Optional[Binder] = None
for b in reversed(bstack):
if b.name == tok:
bnd = b
break
tstack.append(Var(tok, bnd))
reduce_lams()
# Close any remaining lambdas (top-level)
if any(isinstance(x, tuple) and x[0] == "LPAREN" for x in tstack):
raise SyntaxError("Unmatched '('.")
# Fold any remaining top-level applications (rare; usually one term)
terms = [x for x in tstack if isinstance(x, Node)]
if not terms:
raise SyntaxError("No term parsed.")
term: Node = terms[0]
for nxt in terms[1:]:
term = App(term, nxt)
return term # type: ignore
# -----------------------------
# Beta reduction (O(1) for linear)
# -----------------------------
def _replace_child(parent: Node, slot: str, new: Node) -> None:
if isinstance(parent, Abs) and slot == "body":
parent.body = new
elif isinstance(parent, App) and slot == "fn":
parent.fn = new
elif isinstance(parent, App) and slot == "arg":
parent.arg = new
else:
raise RuntimeError("Bad parent/slot in replacement.")
new.parent = parent
new.pslot = slot
def beta(abs_node: Abs, arg: Node) -> Node:
b = abs_node.binder
body = abs_node.body
occ = b.occ
if occ is None:
# Argument unused
body.parent = None
body.pslot = None
return body
# If the body itself is the occurrence, the result is just the argument
if occ is body:
arg.parent = None
arg.pslot = None
return arg
# Splice arg into occurrence site
p = occ.parent
slot = occ.pslot
if p is None:
# body itself is the variable
arg.parent = None
arg.pslot = None
return arg
assert slot is not None
_replace_child(p, slot, arg)
# Detach body from abs
body.parent = None
body.pslot = None
return body
# -----------------------------
# Weak-head normalization (iterative)
# -----------------------------
def whnf(t: Node) -> Node:
"""
Reduce t by normal-order at the head (left spine) until it is in weak-head normal form.
Implemented with spine reuse to avoid allocations.
"""
while True:
# Unwind left spine collecting App nodes
apps: List[App] = []
cur: Node = t
while isinstance(cur, App):
apps.append(cur)
cur = cur.fn
reduced = False
# Rebuild from inner -> outer, performing beta steps when possible
for app in reversed(apps):
app.fn = cur
cur.parent = app
cur.pslot = "fn"
if isinstance(cur, Abs):
cur = beta(cur, app.arg)
reduced = True
# cur is detached (parent None) here; outer rebuild will attach it
else:
cur = app
cur.parent = None
cur.pslot = None
t = cur
if not reduced:
return t
# -----------------------------
# Full (strong) normalization, iterative
# -----------------------------
def normalize(t: Node) -> Node:
"""
Compute full beta-normal form (strong normalization), normal-order style.
Works best when the term is planar/linear.
"""
t = whnf(t)
# Manual stack to avoid recursion.
# Frames:
# ("abs", abs_node)
# ("app_fn", app_node) - fn done, next normalize arg
# ("app_arg", app_node) - arg done, finalize app and whnf it
stack: List[Tuple[str, Node]] = []
while True:
if isinstance(t, Abs):
stack.append(("abs", t))
t = whnf(t.body)
continue
if isinstance(t, App):
stack.append(("app_fn", t))
t = whnf(t.fn)
continue
# t is Var (or free symbol)
while stack:
tag, node = stack.pop()
if tag == "abs":
absn: Abs = node # type: ignore
absn.body = t
t.parent = absn
t.pslot = "body"
t = absn
continue
if tag == "app_fn":
appn: App = node # type: ignore
appn.fn = t
t.parent = appn
t.pslot = "fn"
stack.append(("app_arg", appn))
t = whnf(appn.arg)
break # go back to outer loop
if tag == "app_arg":
appn: App = node # type: ignore
appn.arg = t
t.parent = appn
t.pslot = "arg"
t = whnf(appn) # may reduce (and returns detached)
continue
raise RuntimeError("Unknown frame tag.")
else:
# Finished rebuilding to root
t.parent = None
t.pslot = None
return t
# -----------------------------
# Utilities
# -----------------------------
def node_count(t: Node) -> int:
seen = set()
stack = [t]
n = 0
while stack:
x = stack.pop()
if id(x) in seen: # shouldn’t happen unless you add sharing manually
continue
seen.add(id(x))
n += 1
if isinstance(x, Abs):
stack.append(x.body)
elif isinstance(x, App):
stack.append(x.arg)
stack.append(x.fn)
return n
def to_string(t: Node) -> str:
"""
Serialize back to fully-parenthesized syntax:
Var => x
Abs => (\\x.BODY)
App => (F A)
Note: printing very large terms is inherently expensive (output size).
"""
env: Dict[int, str] = {}
used: Dict[str, int] = {}
def fresh(base: str) -> str:
k = used.get(base, 0)
used[base] = k + 1
return base if k == 0 else f"{base}{k}"
out: List[str] = []
# action kinds: 'str', 'node', 'pop'
actions: List[Tuple[str, object]] = [("node", t)]
while actions:
kind, payload = actions.pop()
if kind == "str":
out.append(payload) # type: ignore
continue
if kind == "pop":
bid = payload # type: ignore
env.pop(bid, None)
continue
# kind == "node"
n = payload # type: ignore
if isinstance(n, Var):
if n.binder is None:
out.append(n.name)
else:
out.append(env.get(n.binder.id, n.binder.name))
continue
if isinstance(n, Abs):
b = n.binder
name = env.get(b.id)
if name is None:
name = fresh(b.name)
env[b.id] = name
actions.append(("pop", b.id))
actions.append(("str", ")"))
actions.append(("node", n.body))
actions.append(("str", "."))
actions.append(("str", name))
actions.append(("str", "\\"))
actions.append(("str", "("))
continue
if isinstance(n, App):
actions.append(("str", ")"))
actions.append(("node", n.arg))
actions.append(("str", " "))
actions.append(("node", n.fn))
actions.append(("str", "("))
continue
raise TypeError("Unknown node type in to_string().")
return "".join(out)
def parse_and_normalize(src: str) -> Node:
return normalize(parse(src))
if __name__ == "__main__":
import sys, time
if len(sys.argv) < 2:
print("usage: python3 planar_nf.py '<term>'")
raise SystemExit(2)
term_src = sys.argv[1]
t0 = time.perf_counter()
ast = parse(term_src)
t1 = time.perf_counter()
nf = normalize(ast)
t2 = time.perf_counter()
print(f"parsed nodes: {node_count(ast)} parse_s={t1-t0:.4f}")
print(f"nf nodes: {node_count(nf)} norm_s={t2-t1:.4f}")
# Beware: printing giant terms is slow
# print(to_string(nf))
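To make the "unique occurrence" idea concrete, here is a tiny usage sketch (added for illustration, not part of planar_nf.py): a single linear beta step, where the argument is spliced directly into the one place the bound variable occurs, so no subterm is ever copied.

# Illustrative use of planar_nf on a small linear term: (\x.(\y.(x y))) applied
# to the free variable a; beta splices 'a' into x's unique occurrence.
from planar_nf import parse, normalize, to_string

t = parse(r"((\x.(\y.(x y))) a)")
print(to_string(normalize(t)))   # prints: (\y.(a y))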