|
// Reproducer for PostgreSQL bug: |
|
// recovery_target_action=pause is silently ignored when a RUNNING_XACTS WAL |
|
// record with subxid_overflow=true is replayed during standby recovery. |
|
// |
|
// Root cause (three-step chain): |
|
// 1. RUNNING_XACTS with subxid_overflow=true sets standbyState = STANDBY_SNAPSHOT_PENDING |
|
// instead of STANDBY_SNAPSHOT_READY (procarray.c). |
|
// 2. STANDBY_SNAPSHOT_PENDING prevents LocalHotStandbyActive from ever becoming true |
|
// (xlogrecovery.c). |
|
// 3. recoveryPausesHere() returns early when LocalHotStandbyActive=false, silently |
|
// skipping the pause and falling through to PROMOTE (xlogrecovery.c). |
|
// |
|
// Effect: standby promotes directly to primary instead of pausing. |
|
// Symptom: pg_get_wal_replay_pause_state() throws ERROR: recovery is not in progress |
|
// (SQLSTATE 55000). |
|
// |
|
// Affected versions confirmed: PostgreSQL 17.x |
|
// See: https://github.com/postgres/postgres/blob/REL_17_8/src/backend/storage/ipc/procarray.c#L1261 |
|
// https://github.com/postgres/postgres/blob/REL_17_8/src/backend/access/transam/xlogrecovery.c#L2935 |
|
package main |
|
|
|
import ( |
|
"context" |
|
"errors" |
|
"fmt" |
|
"os" |
|
"os/exec" |
|
"path/filepath" |
|
"strings" |
|
"time" |
|
|
|
"github.com/charmbracelet/log" |
|
"github.com/jackc/pgx/v5" |
|
"github.com/jackc/pgx/v5/pgconn" |
|
) |
|
|
|
const (
	// Non-default localhost ports so the test clusters cannot collide with a
	// locally installed PostgreSQL listening on 5432.
	primaryPort = 15432
	standbyPort = 15433

	// PGPROC_MAX_CACHED_SUBXIDS = 64 in PostgreSQL source.
	// We need strictly more than 64 active subtransactions to trigger the overflow.
	numSavepoints = 100
)
|
|
|
// ──────────────────────────────────────────────────────────────────────────── |
|
// Entrypoint |
|
// ──────────────────────────────────────────────────────────────────────────── |
|
|
|
func main() { |
|
log.SetReportTimestamp(false) |
|
|
|
binDir, err := findPgBinDir() |
|
if err != nil { |
|
log.Fatal("PostgreSQL not found", "err", err, "hint", "install PostgreSQL 14+ and ensure pg_ctl is in your PATH") |
|
} |
|
|
|
ver, _ := pgServerVersion(binDir) |
|
log.Info("PostgreSQL", "binaries", binDir, "version", ver) |
|
|
|
tmpDir, err := os.MkdirTemp("", "pg-subxid-overflow-*") |
|
if err != nil { |
|
log.Fatal("MkdirTemp", "err", err) |
|
} |
|
log.Info("working directory", "path", tmpDir) |
|
|
|
r := &repro{ |
|
binDir: binDir, |
|
tmpDir: tmpDir, |
|
primaryDir: filepath.Join(tmpDir, "primary"), |
|
standbyDir: filepath.Join(tmpDir, "standby"), |
|
archiveDir: filepath.Join(tmpDir, "wal-archive"), |
|
} |
|
defer r.cleanup() |
|
|
|
if err := r.run(); err != nil { |
|
log.Fatal("reproducer failed", "err", err) |
|
} |
|
} |
|
|
|
// ──────────────────────────────────────────────────────────────────────────── |
|
// Reproducer |
|
// ──────────────────────────────────────────────────────────────────────────── |
|
|
|
// repro holds the binary location and on-disk layout for one reproduction
// run. All data directories live under tmpDir and are removed by cleanup().
type repro struct {
	binDir     string // directory containing pg_ctl, initdb, pg_basebackup, postgres
	tmpDir     string // scratch root created by os.MkdirTemp in main
	primaryDir string // primary cluster data directory
	standbyDir string // standby cluster data directory (populated by pg_basebackup)
	archiveDir string // WAL archive shared by archive_command (primary) and restore_command (standby)
}
|
|
|
func (r *repro) run() error { |
|
step("1/6", "Initialize and start primary PostgreSQL") |
|
if err := r.initPrimary(); err != nil { |
|
return fmt.Errorf("initPrimary: %w", err) |
|
} |
|
if err := r.startPrimary(); err != nil { |
|
return fmt.Errorf("startPrimary: %w", err) |
|
} |
|
|
|
ctx := context.Background() |
|
|
|
conn, err := r.connectWithRetry(ctx, primaryPort) |
|
if err != nil { |
|
return fmt.Errorf("connect to primary: %w", err) |
|
} |
|
defer conn.Close(ctx) |
|
|
|
step("2/6", fmt.Sprintf("Open transaction with %d SAVEPOINTs to overflow subxid cache (limit=64)", numSavepoints)) |
|
if err := r.createSubxidOverflow(ctx, conn); err != nil { |
|
return fmt.Errorf("createSubxidOverflow: %w", err) |
|
} |
|
|
|
// The big transaction is still OPEN here. When pg_basebackup forces a |
|
// checkpoint (--checkpoint=fast), PostgreSQL emits a RUNNING_XACTS WAL |
|
// record that will have subxid_overflow=true because our transaction has |
|
// 100 active subtransaction XIDs — more than PGPROC_MAX_CACHED_SUBXIDS=64. |
|
step("3/6", "Take base backup (--checkpoint=fast emits RUNNING_XACTS with subxid_overflow=true)") |
|
if err := r.takeBaseBackup(); err != nil { |
|
return fmt.Errorf("takeBaseBackup: %w", err) |
|
} |
|
|
|
// We can now commit/rollback the big transaction — backup is done. |
|
if _, err := conn.Exec(ctx, "ROLLBACK"); err != nil { |
|
log.Warn("ROLLBACK failed", "err", err) |
|
} |
|
|
|
step("4/6", "Configure standby: recovery_target=immediate, recovery_target_action=pause") |
|
if err := r.configureStandby(); err != nil { |
|
return fmt.Errorf("configureStandby: %w", err) |
|
} |
|
|
|
step("5/6", "Start standby and replay WAL (should pause; will it?)") |
|
log.Info("expected behavior", "result", "standby pauses at recovery target") |
|
log.Warn("buggy behavior", "result", "standby silently promotes (recoveryPausesHere returns early)") |
|
fmt.Println() |
|
if err := r.startStandby(); err != nil { |
|
return fmt.Errorf("startStandby: %w", err) |
|
} |
|
|
|
step("6/6", "Query pg_get_wal_replay_pause_state() on standby") |
|
return r.checkBug(ctx) |
|
} |
|
|
|
// ──────────────────────────────────────────────────────────────────────────── |
|
// Primary setup |
|
// ──────────────────────────────────────────────────────────────────────────── |
|
|
|
func (r *repro) initPrimary() error { |
|
if err := os.MkdirAll(r.archiveDir, 0o755); err != nil { |
|
return err |
|
} |
|
|
|
if err := r.pgCmd("initdb", |
|
"-D", r.primaryDir, |
|
"--no-locale", |
|
"--encoding=UTF8", |
|
"-U", "postgres", |
|
); err != nil { |
|
return fmt.Errorf("initdb: %w", err) |
|
} |
|
|
|
// Append to postgresql.conf |
|
pgconf := filepath.Join(r.primaryDir, "postgresql.conf") |
|
f, err := os.OpenFile(pgconf, os.O_APPEND|os.O_WRONLY, 0o644) |
|
if err != nil { |
|
return err |
|
} |
|
defer f.Close() |
|
|
|
archiveCmd := fmt.Sprintf("cp %%p %s/%%f", r.archiveDir) |
|
_, err = fmt.Fprintf(f, ` |
|
|
|
# ── subxid-overflow-repro ────────────────────────── |
|
listen_addresses = 'localhost' |
|
port = %d |
|
wal_level = replica |
|
archive_mode = on |
|
archive_command = '%s' |
|
max_wal_senders = 5 |
|
wal_keep_size = 64 |
|
`, primaryPort, archiveCmd) |
|
if err != nil { |
|
return err |
|
} |
|
|
|
// Allow trust auth for localhost (test cluster only) |
|
hba := filepath.Join(r.primaryDir, "pg_hba.conf") |
|
return os.WriteFile(hba, []byte( |
|
"local all all trust\n"+ |
|
"host all all 127.0.0.1/32 trust\n"+ |
|
"host all all ::1/128 trust\n"+ |
|
"host replication all 127.0.0.1/32 trust\n"+ |
|
"host replication all ::1/128 trust\n", |
|
), 0o644) |
|
} |
|
|
|
func (r *repro) startPrimary() error { |
|
return r.pgCmd("pg_ctl", "start", "-D", r.primaryDir, "-w") |
|
} |
|
|
|
// ──────────────────────────────────────────────────────────────────────────── |
|
// Subxid overflow trigger |
|
// ──────────────────────────────────────────────────────────────────────────── |
|
|
|
func (r *repro) createSubxidOverflow(ctx context.Context, conn *pgx.Conn) error { |
|
// Create the helper table first (outside the big transaction so it persists) |
|
if _, err := conn.Exec(ctx, |
|
"CREATE TABLE IF NOT EXISTS subxid_test (id int)", |
|
); err != nil { |
|
return fmt.Errorf("CREATE TABLE: %w", err) |
|
} |
|
|
|
if _, err := conn.Exec(ctx, "BEGIN"); err != nil { |
|
return fmt.Errorf("BEGIN: %w", err) |
|
} |
|
|
|
// Create numSavepoints SAVEPOINTs, each with a write. Each SAVEPOINT |
|
// produces a subtransaction XID. After PGPROC_MAX_CACHED_SUBXIDS=64 |
|
// subtransaction XIDs, the per-backend subxid cache overflows and the |
|
// `overflowed` flag is set in the proc array. |
|
for i := 1; i <= numSavepoints; i++ { |
|
if _, err := conn.Exec(ctx, fmt.Sprintf("SAVEPOINT s%d", i)); err != nil { |
|
return fmt.Errorf("SAVEPOINT s%d: %w", i, err) |
|
} |
|
if _, err := conn.Exec(ctx, fmt.Sprintf("INSERT INTO subxid_test VALUES (%d)", i)); err != nil { |
|
return fmt.Errorf("INSERT at s%d: %w", i, err) |
|
} |
|
} |
|
|
|
log.Info("subxid cache overflowed", "active_subtransactions", numSavepoints, "overflow_at", 64) |
|
log.Info("transaction held open — visible during the upcoming checkpoint") |
|
return nil |
|
} |
|
|
|
// ──────────────────────────────────────────────────────────────────────────── |
|
// Base backup |
|
// ──────────────────────────────────────────────────────────────────────────── |
|
|
|
func (r *repro) takeBaseBackup() error { |
|
// --checkpoint=fast forces an immediate checkpoint on the primary. |
|
// Because our big transaction is still open, the checkpoint's RUNNING_XACTS |
|
// WAL record will have subxid_overflow=true. |
|
// --wal-method=stream captures all WAL generated during the backup so the |
|
// standby's pg_wal directory contains everything needed for recovery. |
|
if err := r.pgCmd("pg_basebackup", |
|
"-h", "localhost", |
|
"-p", fmt.Sprintf("%d", primaryPort), |
|
"-U", "postgres", |
|
"-D", r.standbyDir, |
|
"--wal-method=stream", |
|
"--checkpoint=fast", |
|
"-P", |
|
); err != nil { |
|
return fmt.Errorf("pg_basebackup: %w", err) |
|
} |
|
log.Info("base backup complete", "dir", r.standbyDir) |
|
return nil |
|
} |
|
|
|
// ──────────────────────────────────────────────────────────────────────────── |
|
// Standby setup |
|
// ──────────────────────────────────────────────────────────────────────────── |
|
|
|
func (r *repro) configureStandby() error { |
|
// Create standby.signal so PostgreSQL enters recovery mode. |
|
if err := os.WriteFile( |
|
filepath.Join(r.standbyDir, "standby.signal"), []byte{}, 0o644, |
|
); err != nil { |
|
return err |
|
} |
|
|
|
// pg_basebackup may have created postgresql.auto.conf with a primary_conninfo |
|
// for streaming replication. Remove it: we want pure archive recovery so that |
|
// recovery is controlled solely by the WAL on disk — matching the pgbackup |
|
// restore-from-S3 scenario exactly. |
|
os.Remove(filepath.Join(r.standbyDir, "postgresql.auto.conf")) |
|
|
|
pgconf := filepath.Join(r.standbyDir, "postgresql.conf") |
|
f, err := os.OpenFile(pgconf, os.O_APPEND|os.O_WRONLY, 0o644) |
|
if err != nil { |
|
return err |
|
} |
|
defer f.Close() |
|
|
|
restoreCmd := fmt.Sprintf("cp %s/%%f %%p", r.archiveDir) |
|
_, err = fmt.Fprintf(f, ` |
|
|
|
# ── subxid-overflow-repro: standby recovery ─────── |
|
port = %d |
|
restore_command = '%s' |
|
recovery_target = 'immediate' |
|
recovery_target_action = 'pause' # BUG: this is silently ignored when |
|
# RUNNING_XACTS has subxid_overflow=true |
|
hot_standby = on |
|
`, standbyPort, restoreCmd) |
|
return err |
|
} |
|
|
|
func (r *repro) startStandby() error { |
|
logFile := filepath.Join(r.standbyDir, "postgresql.log") |
|
pgctl := filepath.Join(r.binDir, "pg_ctl") |
|
|
|
cmd := exec.Command(pgctl, |
|
"start", "-D", r.standbyDir, |
|
"-w", "-t", "60", |
|
"-l", logFile, |
|
) |
|
cmd.Stdout = os.Stdout |
|
cmd.Stderr = os.Stderr |
|
|
|
if err := cmd.Run(); err != nil { |
|
// pg_ctl -w can fail when recovery ends in an unexpected state (e.g. it |
|
// promoted instead of pausing). Log it and continue — checkBug() will |
|
// determine the actual outcome. |
|
log.Warn("pg_ctl start returned error (may be expected)", "err", err) |
|
} |
|
|
|
// Show relevant lines from the standby log. |
|
if data, _ := os.ReadFile(logFile); len(data) > 0 { |
|
fmt.Println() |
|
fmt.Println(" ┌─ Standby log (recovery-related lines) ──────────────────────") |
|
for _, line := range strings.Split(string(data), "\n") { |
|
lower := strings.ToLower(line) |
|
if strings.Contains(lower, "running_xacts") || |
|
strings.Contains(lower, "subxid") || |
|
strings.Contains(lower, "promot") || |
|
strings.Contains(lower, "archive recovery complete") || |
|
strings.Contains(lower, "recovery is") || |
|
strings.Contains(lower, "paused") || |
|
strings.Contains(lower, "consistent") || |
|
strings.Contains(lower, "recovery target") { |
|
fmt.Printf(" │ %s\n", line) |
|
} |
|
} |
|
fmt.Println(" └─────────────────────────────────────────────────────────────") |
|
fmt.Println() |
|
} |
|
|
|
return nil |
|
} |
|
|
|
// ──────────────────────────────────────────────────────────────────────────── |
|
// Bug check |
|
// ──────────────────────────────────────────────────────────────────────────── |
|
|
|
func (r *repro) checkBug(ctx context.Context) error { |
|
conn, err := r.connectWithRetry(ctx, standbyPort) |
|
if err != nil { |
|
return fmt.Errorf("connect to standby on port %d: %w", standbyPort, err) |
|
} |
|
defer conn.Close(ctx) |
|
|
|
// Also check pg_is_in_recovery() for extra clarity. |
|
var inRecovery bool |
|
_ = conn.QueryRow(ctx, "SELECT pg_is_in_recovery()").Scan(&inRecovery) |
|
|
|
var pauseState string |
|
queryErr := conn.QueryRow(ctx, "SELECT pg_get_wal_replay_pause_state()").Scan(&pauseState) |
|
|
|
printResult(inRecovery, pauseState, queryErr) |
|
return nil |
|
} |
|
|
|
func printResult(inRecovery bool, pauseState string, queryErr error) { |
|
fmt.Println() |
|
fmt.Println("══════════════════════════════════════════════════════════════════") |
|
|
|
if queryErr != nil { |
|
var pgErr *pgconn.PgError |
|
if errors.As(queryErr, &pgErr) && |
|
pgErr.Code == "55000" && |
|
strings.Contains(pgErr.Message, "recovery is not in progress") { |
|
|
|
log.Error("BUG REPRODUCED") |
|
fmt.Println() |
|
log.Info("pg_is_in_recovery()", "value", inRecovery) |
|
fmt.Println() |
|
log.Info("SELECT pg_get_wal_replay_pause_state()") |
|
log.Info(" expected", "value", "paused (recovery_target_action = pause)") |
|
log.Error(" actual", "value", fmt.Sprintf("ERROR: %s (SQLSTATE %s)", pgErr.Message, pgErr.Code)) |
|
fmt.Println() |
|
log.Info("root cause — step 1", "file", "procarray.c:1261", |
|
"detail", "RUNNING_XACTS subxid_overflow=true sets standbyState=STANDBY_SNAPSHOT_PENDING instead of READY") |
|
log.Info("root cause — step 2", "file", "xlogrecovery.c:2266", |
|
"detail", "standbyState != STANDBY_SNAPSHOT_READY so LocalHotStandbyActive is never set to true") |
|
log.Info("root cause — step 3", "file", "xlogrecovery.c:2935", |
|
"detail", "recoveryPausesHere() returns early: `if (!LocalHotStandbyActive) return;` → promotes") |
|
fmt.Println() |
|
log.Warn("affected deployments", |
|
"condition", "recovery_target_action=pause + RUNNING_XACTS with subxid_overflow=true in replayed WAL", |
|
"trigger", "live transaction with >PGPROC_MAX_CACHED_SUBXIDS (64) active subtransaction XIDs at checkpoint time") |
|
} else { |
|
log.Error("unexpected error", "err", queryErr) |
|
} |
|
} else { |
|
if pauseState == "paused" { |
|
log.Info("NOT REPRODUCED — behavior is correct (or bug is already fixed)") |
|
fmt.Println() |
|
log.Info("pg_is_in_recovery()", "value", inRecovery) |
|
log.Info("SELECT pg_get_wal_replay_pause_state()", "expected", "paused", "actual", "paused ✓") |
|
fmt.Println() |
|
log.Info("possible reasons", |
|
"1", "this PostgreSQL version has the fix applied", |
|
"2", "RUNNING_XACTS overflow record was not in the replayed WAL range", |
|
"hint", "try with PostgreSQL 17.x to confirm") |
|
} else { |
|
log.Warn("unexpected pause state", |
|
"pg_get_wal_replay_pause_state", pauseState, |
|
"pg_is_in_recovery", inRecovery) |
|
} |
|
} |
|
|
|
fmt.Println("══════════════════════════════════════════════════════════════════") |
|
fmt.Println() |
|
} |
|
|
|
// ──────────────────────────────────────────────────────────────────────────── |
|
// Helpers |
|
// ──────────────────────────────────────────────────────────────────────────── |
|
|
|
func (r *repro) connectWithRetry(ctx context.Context, port int) (*pgx.Conn, error) { |
|
dsn := fmt.Sprintf( |
|
"host=localhost port=%d user=postgres dbname=postgres sslmode=disable", |
|
port, |
|
) |
|
var last error |
|
for i := 0; i < 40; i++ { |
|
conn, err := pgx.Connect(ctx, dsn) |
|
if err == nil { |
|
return conn, nil |
|
} |
|
last = err |
|
time.Sleep(500 * time.Millisecond) |
|
} |
|
return nil, fmt.Errorf("could not connect after 20s: %w", last) |
|
} |
|
|
|
func (r *repro) pgCmd(name string, args ...string) error { |
|
cmd := exec.Command(filepath.Join(r.binDir, name), args...) |
|
cmd.Stdout = os.Stdout |
|
cmd.Stderr = os.Stderr |
|
return cmd.Run() |
|
} |
|
|
|
func (r *repro) cleanup() { |
|
fmt.Println() |
|
log.Info("cleanup") |
|
pgctl := filepath.Join(r.binDir, "pg_ctl") |
|
for _, dir := range []string{r.standbyDir, r.primaryDir} { |
|
if _, err := os.Stat(dir); err == nil { |
|
exec.Command(pgctl, "stop", "-D", dir, "-m", "immediate", "-w").Run() |
|
} |
|
} |
|
if err := os.RemoveAll(r.tmpDir); err != nil { |
|
log.Warn("cleanup failed", "path", r.tmpDir, "err", err) |
|
return |
|
} |
|
log.Info("removed temp dir", "path", r.tmpDir) |
|
} |
|
|
|
func step(n, msg string) { |
|
fmt.Println() |
|
log.Infof("step %s: %s", n, msg) |
|
fmt.Println() |
|
} |
|
|
|
// findPgBinDir locates the directory containing the PostgreSQL binaries.
// It prefers pg_ctl from PATH, then probes common Homebrew and Debian/Ubuntu
// install locations (newest major version first within each family).
//
// Fix over the previous version: the terminal error used fmt.Errorf with no
// format verbs (staticcheck S1039); errors.New is the idiomatic form.
func findPgBinDir() (string, error) {
	if path, err := exec.LookPath("pg_ctl"); err == nil {
		return filepath.Dir(path), nil
	}

	// Fallback: well-known install prefixes when pg_ctl is not on PATH.
	for _, dir := range []string{
		"/opt/homebrew/bin",
		"/usr/local/bin",
		"/opt/homebrew/opt/postgresql@17/bin",
		"/opt/homebrew/opt/postgresql@16/bin",
		"/opt/homebrew/opt/postgresql@15/bin",
		"/usr/local/opt/postgresql@17/bin",
		"/usr/local/opt/postgresql@16/bin",
		"/usr/lib/postgresql/17/bin",
		"/usr/lib/postgresql/16/bin",
		"/usr/lib/postgresql/15/bin",
	} {
		if _, err := os.Stat(filepath.Join(dir, "pg_ctl")); err == nil {
			return dir, nil
		}
	}
	return "", errors.New("pg_ctl not found in PATH or common locations")
}
|
|
|
// pgServerVersion reports the trimmed `postgres --version` output for the
// binaries in binDir. On failure it returns "unknown" together with the
// execution error, so callers can log something readable either way.
func pgServerVersion(binDir string) (string, error) {
	postgres := filepath.Join(binDir, "postgres")
	out, err := exec.Command(postgres, "--version").Output()
	if err != nil {
		return "unknown", err
	}
	return strings.TrimSpace(string(out)), nil
}