@AutMaple
Last active September 14, 2023 06:19
[Flink+Iceberg+Minio] #flink,#iceberg,#minio,#java
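A minimal Flink streaming job that reads comma-separated records from a Kafka topic and appends them as rows to an Iceberg table on MinIO, using the Iceberg REST catalog and S3FileIO.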
package org.example;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.types.Row;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.DistributionMode;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.flink.CatalogLoader;
import org.apache.iceberg.flink.FlinkSchemaUtil;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.sink.FlinkSink;
import org.apache.iceberg.types.Types;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;

public class Application {

    public static void main(String[] args) throws Exception {
        ParameterTool parameters = ParameterTool.fromArgs(args);
        Configuration hadoopConf = new Configuration();
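
        // Iceberg catalog properties: REST catalog URI, S3FileIO for object storage, the
        // warehouse location, and the MinIO S3 endpoint; each value can be overridden via CLI arguments.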
        Map<String, String> catalogProperties = new HashMap<>();
        catalogProperties.put("uri", parameters.get("uri", "http://iceberg.rest:8181"));
        catalogProperties.put("io-impl", parameters.get("io-impl", "org.apache.iceberg.aws.s3.S3FileIO"));
        catalogProperties.put("warehouse", parameters.get("warehouse", "s3://warehouse/wh/"));
        // Prefer an IP address over a hostname for s3-endpoint; if the hostname form fails, try the IP address.
        catalogProperties.put("s3.endpoint", parameters.get("s3-endpoint", "http://minio:9000"));

        CatalogLoader catalogLoader = CatalogLoader.custom(
                "demo",
                catalogProperties,
                hadoopConf,
                parameters.get("catalog-impl", "org.apache.iceberg.rest.RESTCatalog"));
        Schema schema = new Schema(
                Types.NestedField.required(1, "character", Types.StringType.get()),
                Types.NestedField.required(2, "location", Types.StringType.get()),
                Types.NestedField.required(3, "event_time", Types.TimestampType.withZone()));
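
        // Load the catalog and create the target table (unpartitioned) if it does not exist yet.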
        Catalog catalog = catalogLoader.loadCatalog();
        String databaseName = parameters.get("database", "default");
        String tableName = parameters.getRequired("table");
        TableIdentifier outputTable = TableIdentifier.of(databaseName, tableName);
        if (!catalog.tableExists(outputTable)) {
            catalog.createTable(outputTable, schema, PartitionSpec.unpartitioned());
        }

        // Set up the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Define the Kafka topic and connection settings
        String inputTopic = "Shakespeare";
        String broker = "kafka:9092";
        String groupId = "flink-group";
        String jobName = "Kafka Word Count";
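
        // Kafka source that reads plain string records from the topic, starting from the earliest offset.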
        KafkaSource<String> kafkaConsumer = KafkaSource.<String>builder()
                .setBootstrapServers(broker)
                .setTopics(inputTopic)
                .setGroupId(groupId)
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();
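
        // Each Kafka record is expected to be a comma-separated "character,location,event_time" string;
        // map it into a three-field Row that matches the Iceberg schema above.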
        DataStream<Row> stream = env.fromSource(kafkaConsumer, WatermarkStrategy.noWatermarks(), jobName)
                .name("Producer")
                .map(s -> s.split(","))
                .map(fields -> {
                    Row row = new Row(3);
                    row.setField(0, fields[0]);
                    row.setField(1, fields[1]);
                    // event_time is a timestamptz column, so convert the third field to an Instant
                    // (this assumes the incoming value is ISO-8601, e.g. 2023-09-14T06:19:00Z)
                    row.setField(2, Instant.parse(fields[2]));
                    return row;
                });

        // Configure a row-based append to the Iceberg table
        FlinkSink.forRow(stream, FlinkSchemaUtil.toSchema(schema))
                .tableLoader(TableLoader.fromCatalog(catalogLoader, outputTable))
                .distributionMode(DistributionMode.HASH)
                .writeParallelism(2)
                .append();

        // Execute the Flink app
        env.execute();
    }
}
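
/*
 * Example submission (a sketch: the jar name and parameter values below are illustrative;
 * only --table is required by the code above, the rest fall back to the defaults used in main):
 *
 *   flink run -c org.example.Application flink-iceberg-minio-example.jar \
 *     --table events \
 *     --database default \
 *     --uri http://iceberg.rest:8181 \
 *     --warehouse s3://warehouse/wh/ \
 *     --s3-endpoint http://minio:9000
 */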