Last active
May 23, 2019 21:46
-
-
Save macrael/277e5fbdef471c6ae6ccc7c77d3e4f84 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| package postgres | |
| import ( | |
| "fmt" | |
| "testing" | |
| "time" | |
| "github.com/jmoiron/sqlx" | |
| _ "github.com/lib/pq" // required for sqlx + postgres | |
| ) | |
| // So you want to write a function in your db layer that fetches a row, changes something, | |
| // and then updates that row. In the example below, we will try and add two to an integer field. | |
| // You want to make sure that if two of these functions are running at the same time, you | |
| // don't have them trample on each other. You don't ever want two of them to read the same | |
| // initial value, then take turns updating because then you will have lost the work that one of | |
| // them tried to do. Thankfully, postgres row locking with SELECT ... FOR UPDATE is here to save the day. | |
| // To run this file, put it in a directory (probably in a gopath, I haven't worked out modules yet) and run: | |
| // $ dep init | |
| // $ go test -v . | |
| // if you want to edit it and see what happens, I highly recommend using entr: | |
| // $ find . | entr -c go test -v . | |
| // We'll use a very simple table to explore this | |
| const createTableSchema = `CREATE TABLE select_test_person ( | |
| id int, | |
| name text, | |
| age integer | |
| )` | |
| const dropTable = `DROP TABLE select_test_person` | |
| // Here are the three SQL statements we'll use | |
| const plainSelectSQL = `SELECT age FROM select_test_person WHERE id=$1` | |
| const selectForUpdateSQL = `SELECT age FROM select_test_person WHERE id=$1 FOR UPDATE` | |
| const updateSQL = `UPDATE select_test_person SET age=$2 WHERE id=$1` | |
| // increaseAge is the function we are worried about. It selects a person from the table by ID, and | |
| // then increases that age by the given amount. In order for us to synchronize our two troublesome | |
| // runs of this function, it sends a blocking message back to the coordinating function before and after each query | |
| func increaseAge(name string, db *sqlx.DB, sync chan<- string, selectSQL string, id int, ageIncremet int) { | |
| innnerTX := db.MustBegin() // The two queries must be wrapped in a transaction | |
| fmt.Println(name, "Began") | |
| sync <- "select" | |
| age := -1 | |
| innnerTX.Get(&age, selectSQL, id) // Here we select to get the current value | |
| fmt.Println(name, "Selected", age) | |
| sync <- "selected" | |
| sync <- "update" | |
| newAge := age + ageIncremet | |
| innnerTX.MustExec(updateSQL, id, newAge) // Here we update to set the new value | |
| fmt.Println(name, "Updated", newAge) | |
| sync <- "updated" | |
| innnerTX.Commit() | |
| fmt.Println(name, "Committed") | |
| close(sync) | |
| } | |
| // interleaveTransactions runs increaseAge twice, one inside the other. It uses message passing | |
| // to coordinate the two transactions so that their queries happen in this order: | |
| // 1. Outer SELECT | |
| // 2. Inner SELECT | |
| // 3. Outer UPDATE | |
| // 4. Inner UPDATE | |
| // we will see that row locking makes that order impossible. | |
| func interleaveTransactions(t *testing.T, db *sqlx.DB, selectSQL string, id int) { | |
| // setup two channels for communications | |
| outerComms := make(chan string) // this is the channel from the outer transaction | |
| innerComms := make(chan string) // this is the channel from the inner transaction | |
| // The outer transaction will start first, and we'll have it add 2 to the age | |
| go increaseAge("Outer", db, outerComms, selectSQL, id, 2) | |
| // The inner transaction will start second, and we'll have it add 3 to the age so we can tell who wins | |
| go increaseAge("Inner", db, innerComms, selectSQL, id, 3) | |
| // because our channels have no buffering, the goroutines will block at each "sync" message | |
| // so we can control when they continue from here. | |
| outerBeganMsg := <-outerComms | |
| fmt.Println("- outer do select:", outerBeganMsg) // I'm printing this just to make sure I don't get out of step | |
| fmt.Println("- outer did select:", <-outerComms) | |
| fmt.Println("- inner do select:", <-innerComms) | |
| // our goal is for this inner select to be blocked by the fact that the outer select already happened. | |
| // without row locking that won't be the case, but hopefully it will with row locking. | |
| // We'll use a timeout to keep going if the select doesn't happen. | |
| innerSelected := false | |
| select { | |
| case selectedMsg := <-innerComms: | |
| fmt.Println("- inner did select:", selectedMsg) | |
| innerSelected = true | |
| case <-time.After(1 * time.Second): | |
| fmt.Println("- inner did *not* select") | |
| } | |
| // Regardless of whether our inner select is able to happen here, we want simple SELECT queries | |
| // to work fine even in the middle of a SELECT/UPDATE pair | |
| inMediasAge := -1 | |
| db.Get(&inMediasAge, plainSelectSQL, id) | |
| fmt.Println("In medias res, selected", inMediasAge) | |
| // This demonstrates that SELECT ... FOR UPDATE doesn't block plain SELECTs, only other SELECT ... FOR UPDATEs | |
| fmt.Println("- outer do update:", <-outerComms) | |
| fmt.Println("- outer did update:", <-outerComms) | |
| if !innerSelected { | |
| // if the inner select didn't happen before, then it should be able to happen now | |
| fmt.Println("- inner did select:", <-innerComms) | |
| } | |
| fmt.Println("- inner do update:", <-innerComms) | |
| fmt.Println("- inner did update:", <-innerComms) | |
| // wait for both channels to close to make sure the transactions closed. | |
| if _, more := <-outerComms; more { | |
| fmt.Println("We shouldn't have more to receive from OuterTX") | |
| } | |
| if _, more := <-innerComms; more { | |
| fmt.Println("We shouldn't have more to receive from InnerTX") | |
| } | |
| // Now, ideally, since we had two transactions attempt to add 2 and 3 to the age respectively, | |
| // the age we end up with should be 5 more than it started at. Is that the case? | |
| // For the broken case, could you re-order some of the commands above to get it to come out | |
| // at +2 instead of +3? | |
| finalAge := -1 | |
| db.Get(&finalAge, plainSelectSQL, id) | |
| fmt.Println("After Both:", finalAge) | |
| if finalAge != 15 { | |
| t.Log("Adding 2 and 3 to 10 should give 15") | |
| t.Log("it didn't using:", selectSQL) | |
| t.Fail() | |
| } else { | |
| fmt.Println("Yay! even though both functions started interleaved, they each worked as intended") | |
| } | |
| } | |
| func TestSelect(t *testing.T) { | |
| db, connErr := sqlx.Connect("postgres", "postgres://postgres@localhost:5432/postgres") | |
| if connErr != nil { | |
| t.Fatal(connErr) | |
| } | |
| // drop (if it exists) and re-create the table | |
| db.Exec(dropTable) | |
| db.MustExec(createTableSchema) | |
| // seed the table with a couple rows | |
| db.MustExec("INSERT INTO select_test_person (id, name, age) VALUES ($1, $2, $3)", 1, "Ellie Hereward", 10) | |
| db.MustExec("INSERT INTO select_test_person (id, name, age) VALUES ($1, $2, $3)", 2, "Raz Berahthram", 10) | |
| // First, let's break things. We'll run our two transactions with a naïve SELECT with no row locking | |
| fmt.Println("--- Interleaving Using Plain SELECT ---") | |
| interleaveTransactions(t, db, plainSelectSQL, 1) | |
| // Now, let's try with SELECT ... FOR UPDATE | |
| // The FOR UPDATE clause acquires a lock on all the rows returned by the select query. | |
| // The lock makes all UPDATE, DELETE, and SELECT FOR UPDATE queries block until the lock is released. | |
| // That lock is released when the transaction it is in commits. | |
| // Crucially, it does *not* block naked SELECT clauses, so queries that are just fetching data and aren't | |
| // followed up by an update don't get blocked. | |
| fmt.Println("--- Interleaving Using SELECT ... FOR UPDATE ---") | |
| interleaveTransactions(t, db, selectForUpdateSQL, 2) | |
| db.MustExec(dropTable) | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment