Skip to content

Instantly share code, notes, and snippets.

@petrosagg
Created January 27, 2021 09:56
Show Gist options
  • Select an option

  • Save petrosagg/d9693d754002b1fc19d82ded91f8e8ed to your computer and use it in GitHub Desktop.

Select an option

Save petrosagg/d9693d754002b1fc19d82ded91f8e8ed to your computer and use it in GitHub Desktop.
postgres full dump and atomic switch to replication
# Cargo manifest for the replication proof-of-concept binary.
[package]
name = "postgres-replication-poc"
version = "0.1.0"
authors = ["Petros Angelatos <petrosagg@gmail.com>"]
edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
# Both postgres crates are path dependencies on a sibling `../rust-postgres` checkout, not
# crates.io releases, so that checkout must exist next to this project for the build to work.
# NOTE(review): presumably a fork/branch that provides the replication API the program imports
# (`connect_replication`, `replication_client`) — confirm which revision is required.
tokio-postgres = { path = "../rust-postgres/tokio-postgres" }
postgres-protocol = { path = "../rust-postgres/postgres-protocol" }
# `full` enables every tokio feature; the program uses `#[tokio::main]` and `tokio::spawn`.
tokio = { version = "1", features = ["full"] }
# Provides `StreamExt`, used to drive the COPY and replication streams.
futures = "0.3"
use std::collections::HashMap;
use std::time::{Duration, UNIX_EPOCH};
use futures::StreamExt;
use postgres_protocol::message::backend::{LogicalReplicationMessage, ReplicationMessage};
use tokio_postgres::binary_copy::BinaryCopyOutStream;
use tokio_postgres::replication_client::SnapshotMode;
use tokio_postgres::types::Type;
use tokio_postgres::{connect_replication, Error, NoTls, ReplicationMode, SimpleQueryMessage};
// Seconds between the Unix epoch (1970-01-01T00:00:00Z) and 2000-01-01T00:00:00Z.
// `main` adds this to `UNIX_EPOCH` to build the reference instant from which the timestamp
// passed to `standby_status_update` is measured (in microseconds).
const TIME_SEC_CONVERSION: u64 = 946_684_800;
#[tokio::main]
async fn main() -> Result<(), Error> {
let conninfo = "host=localhost user=postgres password=postgres dbname=postgres";
let publication = "mypublication";
// form a replication connection
let (mut rclient, rconnection) =
connect_replication(conninfo, NoTls, ReplicationMode::Logical).await?;
// spawn connection to run on its own
tokio::spawn(async move {
if let Err(e) = rconnection.await {
eprintln!("connection error: {}", e);
}
});
let slot_name = "myslot";
let plugin = "pgoutput";
let options = &[("proto_version", "1"), ("publication_names", publication)];
// Get the tables of the publication
let publication_query = format!(
"SELECT schemaname, tablename FROM pg_publication_tables WHERE pubname = '{}';",
publication
);
let tables = rclient
.simple_query(&publication_query)
.await?
.into_iter()
.filter_map(|msg| {
if let SimpleQueryMessage::Row(row) = msg {
let schema = row.get(0).unwrap().to_string();
let table = row.get(1).unwrap().to_string();
Some((schema, table))
} else {
None
}
})
.collect::<Vec<_>>();
let mut schema = HashMap::new();
for (namespace, table) in &tables {
// Get the relation id of the table
let rel_id_query = format!(
"SELECT c.oid
FROM pg_catalog.pg_class c
INNER JOIN pg_catalog.pg_namespace n
ON (c.relnamespace = n.oid)
WHERE n.nspname = '{}'
AND c.relname = '{}';",
namespace, table
);
let rel_id = rclient
.simple_query(&rel_id_query)
.await?
.into_iter()
.filter_map(|msg| {
if let SimpleQueryMessage::Row(row) = msg {
Some(row.get(0).unwrap().parse::<u32>().unwrap())
} else {
None
}
})
.next()
.unwrap();
// Get the column type info
let col_info_query = format!(
"SELECT a.attname,
a.atttypid,
a.atttypmod,
a.attnotnull,
a.attnum = ANY(i.indkey)
FROM pg_catalog.pg_attribute a
LEFT JOIN pg_catalog.pg_index i
ON (i.indexrelid = pg_get_replica_identity_index({}))
WHERE a.attnum > 0::pg_catalog.int2
AND NOT a.attisdropped
AND a.attrelid = {}
ORDER BY a.attnum",
rel_id, rel_id
);
let col_types = rclient
.simple_query(&col_info_query)
.await?
.into_iter()
.filter_map(|msg| {
if let SimpleQueryMessage::Row(row) = msg {
let ty = Type::from_oid(row.get(1).unwrap().parse().unwrap()).unwrap();
let not_null = row.get(3).unwrap() == "t";
Some((ty, not_null))
} else {
None
}
})
.collect::<Vec<_>>();
schema.insert((namespace, table), col_types);
}
println!("{:#?}", schema);
// Clean up previous runs
rclient
.drop_replication_slot(slot_name, true)
.await
.unwrap_or(());
// We do this inside transaction so that we can use the snapshot made by the slot to get
// existing data.
rclient
.simple_query("BEGIN READ ONLY ISOLATION LEVEL REPEATABLE READ;")
.await?;
// We set USE SNAPSHOT to make sure the snapshot of the slot is used for the transaction, that
// way the COPY will get data that is consistent with the lsn used by the slot to start
// decoding.
let slot = rclient
.create_logical_replication_slot(slot_name, false, plugin, Some(SnapshotMode::UseSnapshot))
.await?;
// Initial data copy
for ((namespace, table), types) in &schema {
println!("\n\n###############");
println!("Dumping {}.{}", namespace, table);
let copy_query = format!(
r#"COPY "{}"."{}" TO STDOUT WITH (FORMAT binary);"#,
namespace, table
);
let stream = rclient.copy_out(&copy_query).await?;
let bare_types = types.iter().map(|(ty, _)| ty.clone()).collect::<Vec<_>>();
let row_stream = BinaryCopyOutStream::new(stream, &bare_types);
let rows = row_stream.collect::<Vec<_>>().await;
for row in rows {
let row = row.unwrap();
print!(" ");
for (i, ty_info) in types.iter().enumerate() {
match ty_info {
(Type::BOOL, false) => print!("{:?}, ", row.get::<Option<bool>>(i)),
(Type::INT4, false) => print!("{:?}, ", row.get::<Option<i32>>(i)),
(Type::TEXT, false) => print!("{:?}, ", row.get::<Option<String>>(i)),
(Type::BOOL, true) => print!("{:?}, ", row.get::<bool>(i)),
(Type::INT4, true) => print!("{:?}, ", row.get::<i32>(i)),
(Type::TEXT, true) => print!("{:?}, ", row.get::<String>(i)),
_ => print!("<{:?}>, ", ty_info),
}
}
println!();
}
println!("###############");
}
rclient.simple_query("COMMIT;").await?;
// We now switch to consuming the stream
let mut stream = rclient
.start_logical_replication(slot_name, slot.consistent_point(), options)
.await?;
let epoch = UNIX_EPOCH + Duration::from_secs(TIME_SEC_CONVERSION);
let mut last_lsn = slot.consistent_point();
while let Some(replication_message) = stream.next().await {
match replication_message? {
ReplicationMessage::XLogData(xlog_data) => {
let msg = xlog_data.data();
// let msg = LogicalReplicationMessage::parse(&xlog_data.into_bytes()).unwrap();
if let LogicalReplicationMessage::Commit(commit) = msg {
last_lsn = (commit.commit_lsn() as u64).into();
}
eprintln!("text: {:?}", last_lsn);
}
ReplicationMessage::PrimaryKeepAlive(keepalive) => {
if keepalive.reply() == 1 {
println!("sending keepalive reply");
let ts = epoch.elapsed().unwrap().as_micros() as i64;
stream
.as_mut()
.standby_status_update(last_lsn, last_lsn, last_lsn, ts, 0)
.await?;
}
}
_ => (),
}
}
Ok(())
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment