Skip to content

Instantly share code, notes, and snippets.

@bossek
Created February 16, 2021 08:24
Show Gist options
  • Select an option

  • Save bossek/b2efb0d993fa8a01af2c334b83d86f55 to your computer and use it in GitHub Desktop.

Select an option

Save bossek/b2efb0d993fa8a01af2c334b83d86f55 to your computer and use it in GitHub Desktop.
kafka topic config using librdkafka
use rdkafka::admin::{AdminClient, AdminOptions, AlterConfig, ResourceSpecifier};
use rdkafka::client::DefaultClientContext;
use rdkafka::ClientConfig;
use tokio;
#[tokio::main]
async fn main() {
let topic = ResourceSpecifier::Topic("accounts_log_changes");
// kafka-configs.sh --zookeeper zookeeper:2181 --entity-type topics --entity-name accounts_log_changes --describe
// let config = [AlterConfig::new(topic).set("message.timestamp.type", "CreateTime")];
let config = [AlterConfig::new(topic).set("message.timestamp.type", "LogAppendTime")];
let opts = AdminOptions::new();
let admin_client = create_admin_client();
let res = admin_client.alter_configs(&config, &opts).await.unwrap();
println!("{:?}", res);
}
fn create_config() -> ClientConfig {
let mut config = ClientConfig::new();
config.set("bootstrap.servers", "localhost:9092");
config.set("allow.auto.create.topics", "false");
return config;
}
fn create_admin_client() -> AdminClient<DefaultClientContext> {
create_config()
.create()
.expect("admin client creation failed")
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment