Created
January 27, 2021 09:56
-
-
Save petrosagg/d9693d754002b1fc19d82ded91f8e8ed to your computer and use it in GitHub Desktop.
Postgres: full initial dump followed by an atomic switch to logical replication
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Proof-of-concept crate: initial table dump followed by logical replication streaming.
[package]
name = "postgres-replication-poc"
version = "0.1.0"
edition = "2018"
authors = ["Petros Angelatos <petrosagg@gmail.com>"]
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
futures = "0.3"

[dependencies.tokio]
version = "1"
features = ["full"]

# Local path dependencies — presumably a rust-postgres checkout that provides the
# replication APIs the program uses (TODO: confirm which branch/fork is expected).
[dependencies.tokio-postgres]
path = "../rust-postgres/tokio-postgres"

[dependencies.postgres-protocol]
path = "../rust-postgres/postgres-protocol"
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| use std::collections::HashMap; | |
| use std::time::{Duration, UNIX_EPOCH}; | |
| use futures::StreamExt; | |
| use postgres_protocol::message::backend::{LogicalReplicationMessage, ReplicationMessage}; | |
| use tokio_postgres::binary_copy::BinaryCopyOutStream; | |
| use tokio_postgres::replication_client::SnapshotMode; | |
| use tokio_postgres::types::Type; | |
| use tokio_postgres::{connect_replication, Error, NoTls, ReplicationMode, SimpleQueryMessage}; | |
/// Seconds from the Unix epoch (1970-01-01T00:00:00Z) to 2000-01-01T00:00:00Z, the epoch
/// PostgreSQL's streaming replication protocol uses for its timestamps.
///
/// 10_957 days (30 years including 7 leap days) × 86_400 seconds per day = 946_684_800.
const TIME_SEC_CONVERSION: u64 = 10_957 * 86_400;
| #[tokio::main] | |
| async fn main() -> Result<(), Error> { | |
| let conninfo = "host=localhost user=postgres password=postgres dbname=postgres"; | |
| let publication = "mypublication"; | |
| // form a replication connection | |
| let (mut rclient, rconnection) = | |
| connect_replication(conninfo, NoTls, ReplicationMode::Logical).await?; | |
| // spawn connection to run on its own | |
| tokio::spawn(async move { | |
| if let Err(e) = rconnection.await { | |
| eprintln!("connection error: {}", e); | |
| } | |
| }); | |
| let slot_name = "myslot"; | |
| let plugin = "pgoutput"; | |
| let options = &[("proto_version", "1"), ("publication_names", publication)]; | |
| // Get the tables of the publication | |
| let publication_query = format!( | |
| "SELECT schemaname, tablename FROM pg_publication_tables WHERE pubname = '{}';", | |
| publication | |
| ); | |
| let tables = rclient | |
| .simple_query(&publication_query) | |
| .await? | |
| .into_iter() | |
| .filter_map(|msg| { | |
| if let SimpleQueryMessage::Row(row) = msg { | |
| let schema = row.get(0).unwrap().to_string(); | |
| let table = row.get(1).unwrap().to_string(); | |
| Some((schema, table)) | |
| } else { | |
| None | |
| } | |
| }) | |
| .collect::<Vec<_>>(); | |
| let mut schema = HashMap::new(); | |
| for (namespace, table) in &tables { | |
| // Get the relation id of the table | |
| let rel_id_query = format!( | |
| "SELECT c.oid | |
| FROM pg_catalog.pg_class c | |
| INNER JOIN pg_catalog.pg_namespace n | |
| ON (c.relnamespace = n.oid) | |
| WHERE n.nspname = '{}' | |
| AND c.relname = '{}';", | |
| namespace, table | |
| ); | |
| let rel_id = rclient | |
| .simple_query(&rel_id_query) | |
| .await? | |
| .into_iter() | |
| .filter_map(|msg| { | |
| if let SimpleQueryMessage::Row(row) = msg { | |
| Some(row.get(0).unwrap().parse::<u32>().unwrap()) | |
| } else { | |
| None | |
| } | |
| }) | |
| .next() | |
| .unwrap(); | |
| // Get the column type info | |
| let col_info_query = format!( | |
| "SELECT a.attname, | |
| a.atttypid, | |
| a.atttypmod, | |
| a.attnotnull, | |
| a.attnum = ANY(i.indkey) | |
| FROM pg_catalog.pg_attribute a | |
| LEFT JOIN pg_catalog.pg_index i | |
| ON (i.indexrelid = pg_get_replica_identity_index({})) | |
| WHERE a.attnum > 0::pg_catalog.int2 | |
| AND NOT a.attisdropped | |
| AND a.attrelid = {} | |
| ORDER BY a.attnum", | |
| rel_id, rel_id | |
| ); | |
| let col_types = rclient | |
| .simple_query(&col_info_query) | |
| .await? | |
| .into_iter() | |
| .filter_map(|msg| { | |
| if let SimpleQueryMessage::Row(row) = msg { | |
| let ty = Type::from_oid(row.get(1).unwrap().parse().unwrap()).unwrap(); | |
| let not_null = row.get(3).unwrap() == "t"; | |
| Some((ty, not_null)) | |
| } else { | |
| None | |
| } | |
| }) | |
| .collect::<Vec<_>>(); | |
| schema.insert((namespace, table), col_types); | |
| } | |
| println!("{:#?}", schema); | |
| // Clean up previous runs | |
| rclient | |
| .drop_replication_slot(slot_name, true) | |
| .await | |
| .unwrap_or(()); | |
| // We do this inside transaction so that we can use the snapshot made by the slot to get | |
| // existing data. | |
| rclient | |
| .simple_query("BEGIN READ ONLY ISOLATION LEVEL REPEATABLE READ;") | |
| .await?; | |
| // We set USE SNAPSHOT to make sure the snapshot of the slot is used for the transaction, that | |
| // way the COPY will get data that is consistent with the lsn used by the slot to start | |
| // decoding. | |
| let slot = rclient | |
| .create_logical_replication_slot(slot_name, false, plugin, Some(SnapshotMode::UseSnapshot)) | |
| .await?; | |
| // Initial data copy | |
| for ((namespace, table), types) in &schema { | |
| println!("\n\n###############"); | |
| println!("Dumping {}.{}", namespace, table); | |
| let copy_query = format!( | |
| r#"COPY "{}"."{}" TO STDOUT WITH (FORMAT binary);"#, | |
| namespace, table | |
| ); | |
| let stream = rclient.copy_out(©_query).await?; | |
| let bare_types = types.iter().map(|(ty, _)| ty.clone()).collect::<Vec<_>>(); | |
| let row_stream = BinaryCopyOutStream::new(stream, &bare_types); | |
| let rows = row_stream.collect::<Vec<_>>().await; | |
| for row in rows { | |
| let row = row.unwrap(); | |
| print!(" "); | |
| for (i, ty_info) in types.iter().enumerate() { | |
| match ty_info { | |
| (Type::BOOL, false) => print!("{:?}, ", row.get::<Option<bool>>(i)), | |
| (Type::INT4, false) => print!("{:?}, ", row.get::<Option<i32>>(i)), | |
| (Type::TEXT, false) => print!("{:?}, ", row.get::<Option<String>>(i)), | |
| (Type::BOOL, true) => print!("{:?}, ", row.get::<bool>(i)), | |
| (Type::INT4, true) => print!("{:?}, ", row.get::<i32>(i)), | |
| (Type::TEXT, true) => print!("{:?}, ", row.get::<String>(i)), | |
| _ => print!("<{:?}>, ", ty_info), | |
| } | |
| } | |
| println!(); | |
| } | |
| println!("###############"); | |
| } | |
| rclient.simple_query("COMMIT;").await?; | |
| // We now switch to consuming the stream | |
| let mut stream = rclient | |
| .start_logical_replication(slot_name, slot.consistent_point(), options) | |
| .await?; | |
| let epoch = UNIX_EPOCH + Duration::from_secs(TIME_SEC_CONVERSION); | |
| let mut last_lsn = slot.consistent_point(); | |
| while let Some(replication_message) = stream.next().await { | |
| match replication_message? { | |
| ReplicationMessage::XLogData(xlog_data) => { | |
| let msg = xlog_data.data(); | |
| // let msg = LogicalReplicationMessage::parse(&xlog_data.into_bytes()).unwrap(); | |
| if let LogicalReplicationMessage::Commit(commit) = msg { | |
| last_lsn = (commit.commit_lsn() as u64).into(); | |
| } | |
| eprintln!("text: {:?}", last_lsn); | |
| } | |
| ReplicationMessage::PrimaryKeepAlive(keepalive) => { | |
| if keepalive.reply() == 1 { | |
| println!("sending keepalive reply"); | |
| let ts = epoch.elapsed().unwrap().as_micros() as i64; | |
| stream | |
| .as_mut() | |
| .standby_status_update(last_lsn, last_lsn, last_lsn, ts, 0) | |
| .await?; | |
| } | |
| } | |
| _ => (), | |
| } | |
| } | |
| Ok(()) | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment