Skip to content

Instantly share code, notes, and snippets.

@macrael
Last active May 23, 2019 21:46
Show Gist options
  • Select an option

  • Save macrael/277e5fbdef471c6ae6ccc7c77d3e4f84 to your computer and use it in GitHub Desktop.

Select an option

Save macrael/277e5fbdef471c6ae6ccc7c77d3e4f84 to your computer and use it in GitHub Desktop.
package postgres
import (
"fmt"
"testing"
"time"
"github.com/jmoiron/sqlx"
_ "github.com/lib/pq" // required for sqlx + postgres
)
// So you want to write a function in your db layer that fetches a row, changes something,
// and then updates that row. In the example below, we will try and add two to an integer field.
// You want to make sure that if two of these functions are running at the same time, you
// don't have them trample on each other. You don't ever want two of them to read the same
// initial value, then take turns updating because then you will have lost the work that one of
// them tried to do. Thankfully, postgres row locking with SELECT ... FOR UPDATE is here to save the day.
// To run this file, put it in a directory (probably in a gopath, I haven't worked out modules yet) and run:
// $ dep init
// $ go test -v .
// if you want to edit it and see what happens, I highly recommend using entr:
// $ find . | entr -c go test -v .
// We'll use a very simple table to explore this
const createTableSchema = `CREATE TABLE select_test_person (
id int,
name text,
age integer
)`
const dropTable = `DROP TABLE select_test_person`
// Here are the three SQL statements we'll use
const plainSelectSQL = `SELECT age FROM select_test_person WHERE id=$1`
const selectForUpdateSQL = `SELECT age FROM select_test_person WHERE id=$1 FOR UPDATE`
const updateSQL = `UPDATE select_test_person SET age=$2 WHERE id=$1`
// increaseAge is the function we are worried about. It selects a person from the table by ID, and
// then increases that age by the given amount. In order for us to synchronize our two troublesome
// runs of this function, it sends a blocking message back to the coordinating function before and after each query
func increaseAge(name string, db *sqlx.DB, sync chan<- string, selectSQL string, id int, ageIncremet int) {
innnerTX := db.MustBegin() // The two queries must be wrapped in a transaction
fmt.Println(name, "Began")
sync <- "select"
age := -1
innnerTX.Get(&age, selectSQL, id) // Here we select to get the current value
fmt.Println(name, "Selected", age)
sync <- "selected"
sync <- "update"
newAge := age + ageIncremet
innnerTX.MustExec(updateSQL, id, newAge) // Here we update to set the new value
fmt.Println(name, "Updated", newAge)
sync <- "updated"
innnerTX.Commit()
fmt.Println(name, "Committed")
close(sync)
}
// interleaveTransactions runs increaseAge twice, one inside the other. It uses message passing
// to coordinate the two transactions so that their queries happen in this order:
// 1. Outer SELECT
// 2. Inner SELECT
// 3. Outer UPDATE
// 4. Inner UPDATE
// we will see that row locking makes that order impossible.
func interleaveTransactions(t *testing.T, db *sqlx.DB, selectSQL string, id int) {
// setup two channels for communications
outerComms := make(chan string) // this is the channel from the outer transaction
innerComms := make(chan string) // this is the channel from the inner transaction
// The outer transaction will start first, and we'll have it add 2 to the age
go increaseAge("Outer", db, outerComms, selectSQL, id, 2)
// The inner transaction will start second, and we'll have it add 3 to the age so we can tell who wins
go increaseAge("Inner", db, innerComms, selectSQL, id, 3)
// because our channels have no buffering, the goroutines will block at each "sync" message
// so we can control when they continue from here.
outerBeganMsg := <-outerComms
fmt.Println("- outer do select:", outerBeganMsg) // I'm printing this just to make sure I don't get out of step
fmt.Println("- outer did select:", <-outerComms)
fmt.Println("- inner do select:", <-innerComms)
// our goal is for this inner select to be blocked by the fact that the outer select already happened.
// without row locking that won't be the case, but hopefully it will with row locking.
// We'll use a timeout to keep going if the select doesn't happen.
innerSelected := false
select {
case selectedMsg := <-innerComms:
fmt.Println("- inner did select:", selectedMsg)
innerSelected = true
case <-time.After(1 * time.Second):
fmt.Println("- inner did *not* select")
}
// Regardless of whether our inner select is able to happen here, we want simple SELECT queries
// to work fine even in the middle of a SELECT/UPDATE pair
inMediasAge := -1
db.Get(&inMediasAge, plainSelectSQL, id)
fmt.Println("In medias res, selected", inMediasAge)
// This demonstrates that SELECT ... FOR UPDATE doesn't block plain SELECTs, only other SELECT ... FOR UPDATEs
fmt.Println("- outer do update:", <-outerComms)
fmt.Println("- outer did update:", <-outerComms)
if !innerSelected {
// if the inner select didn't happen before, then it should be able to happen now
fmt.Println("- inner did select:", <-innerComms)
}
fmt.Println("- inner do update:", <-innerComms)
fmt.Println("- inner did update:", <-innerComms)
// wait for both channels to close to make sure the transactions closed.
if _, more := <-outerComms; more {
fmt.Println("We shouldn't have more to receive from OuterTX")
}
if _, more := <-innerComms; more {
fmt.Println("We shouldn't have more to receive from InnerTX")
}
// Now, ideally, since we had two transactions attempt to add 2 and 3 to the age respectively,
// the age we end up with should be 5 more than it started at. Is that the case?
// For the broken case, could you re-order some of the commands above to get it to come out
// at +2 instead of +3?
finalAge := -1
db.Get(&finalAge, plainSelectSQL, id)
fmt.Println("After Both:", finalAge)
if finalAge != 15 {
t.Log("Adding 2 and 3 to 10 should give 15")
t.Log("it didn't using:", selectSQL)
t.Fail()
} else {
fmt.Println("Yay! even though both functions started interleaved, they each worked as intended")
}
}
func TestSelect(t *testing.T) {
db, connErr := sqlx.Connect("postgres", "postgres://postgres@localhost:5432/postgres")
if connErr != nil {
t.Fatal(connErr)
}
// drop (if it exists) and re-create the table
db.Exec(dropTable)
db.MustExec(createTableSchema)
// seed the table with a couple rows
db.MustExec("INSERT INTO select_test_person (id, name, age) VALUES ($1, $2, $3)", 1, "Ellie Hereward", 10)
db.MustExec("INSERT INTO select_test_person (id, name, age) VALUES ($1, $2, $3)", 2, "Raz Berahthram", 10)
// First, let's break things. We'll run our two transactions with a naïve SELECT with no row locking
fmt.Println("--- Interleaving Using Plain SELECT ---")
interleaveTransactions(t, db, plainSelectSQL, 1)
// Now, let's try with SELECT ... FOR UPDATE
// The FOR UPDATE clause acquires a lock on all the rows returned by the select query.
// The lock makes all UPDATE, DELETE, and SELECT FOR UPDATE queries block until the lock is released.
// That lock is released when the transaction it is in commits.
// Crucially, it does *not* block naked SELECT clauses, so queries that are just fetching data and aren't
// followed up by an update don't get blocked.
fmt.Println("--- Interleaving Using SELECT ... FOR UPDATE ---")
interleaveTransactions(t, db, selectForUpdateSQL, 2)
db.MustExec(dropTable)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment