[Flink+Iceberg+Minio] #flink,#iceberg,#minio,#java
package org.example;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.types.Row;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.DistributionMode;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.flink.CatalogLoader;
import org.apache.iceberg.flink.FlinkSchemaUtil;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.sink.FlinkSink;
import org.apache.iceberg.types.Types;

import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
public class Application {
    public static void main(String[] args) throws Exception {
        ParameterTool parameters = ParameterTool.fromArgs(args);

        Configuration hadoopConf = new Configuration();

        // Catalog properties for an Iceberg REST catalog whose data files live in MinIO (S3-compatible).
        Map<String, String> catalogProperties = new HashMap<>();
        catalogProperties.put("uri", parameters.get("uri", "http://iceberg.rest:8181"));
        catalogProperties.put("io-impl", parameters.get("io-impl", "org.apache.iceberg.aws.s3.S3FileIO"));
        catalogProperties.put("warehouse", parameters.get("warehouse", "s3://warehouse/wh/"));
        // Prefer an IP address for s3-endpoint; if the hostname form fails, try the IP address instead.
        catalogProperties.put("s3.endpoint", parameters.get("s3-endpoint", "http://minio:9000"));
        // S3FileIO picks up MinIO credentials from the usual AWS sources (e.g. the AWS_ACCESS_KEY_ID /
        // AWS_SECRET_ACCESS_KEY environment variables) unless s3.* credential properties are set here.

        CatalogLoader catalogLoader = CatalogLoader.custom(
                "demo",
                catalogProperties,
                hadoopConf,
                parameters.get("catalog-impl", "org.apache.iceberg.rest.RESTCatalog"));

        Schema schema = new Schema(
                Types.NestedField.required(1, "character", Types.StringType.get()),
                Types.NestedField.required(2, "location", Types.StringType.get()),
                Types.NestedField.required(3, "event_time", Types.TimestampType.withZone()));

        // Create the target table on first run.
        Catalog catalog = catalogLoader.loadCatalog();
        String databaseName = parameters.get("database", "default");
        String tableName = parameters.getRequired("table");
        TableIdentifier outputTable = TableIdentifier.of(databaseName, tableName);
        if (!catalog.tableExists(outputTable)) {
            catalog.createTable(outputTable, schema, PartitionSpec.unpartitioned());
        }
        // Set up the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Kafka source settings
        String inputTopic = "Shakespeare";
        String broker = "kafka:9092";
        String groupId = "flink-group";
        String jobName = "Kafka Word Count";

        KafkaSource<String> kafkaConsumer = KafkaSource.<String>builder()
                .setBootstrapServers(broker)
                .setTopics(inputTopic)
                .setGroupId(groupId)
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        // Each Kafka record is expected to be a comma-separated line: character,location,event_time
        DataStream<Row> stream = env.fromSource(kafkaConsumer, WatermarkStrategy.noWatermarks(), jobName)
                .name("Kafka source")
                .map(s -> s.split(","))
                .map(fields -> {
                    Row row = new Row(3);
                    row.setField(0, fields[0]);
                    row.setField(1, fields[1]);
                    // event_time is a timestamp with time zone, so the sink expects a java.time.Instant
                    // here; this assumes the third column is an ISO-8601 instant such as
                    // 2023-09-14T06:19:00Z.
                    row.setField(2, Instant.parse(fields[2]));
                    return row;
                });
        // Configure row-based append into the Iceberg table
        FlinkSink.forRow(stream, FlinkSchemaUtil.toSchema(schema))
                .tableLoader(TableLoader.fromCatalog(catalogLoader, outputTable))
                .distributionMode(DistributionMode.HASH)
                .writeParallelism(2)
                .append();

        // Execute the Flink app
        env.execute();
    }
}
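For local testing it can help to push a few records into the Shakespeare topic in the exact format the job parses: character,location,event_time with an ISO-8601 timestamp. The snippet below is a minimal sketch using the standard Kafka Java client; the broker address matches the hard-coded kafka:9092 above, while the class name and sample values are illustrative only and not part of the gist.

package org.example;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.time.Instant;
import java.util.Properties;

// Hypothetical helper for pushing test records into the topic read by Application.
public class TestProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "kafka:9092"); // same broker as the Flink job
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // character,location,event_time -- the CSV layout the Flink job expects
            String line = "Hamlet,Elsinore," + Instant.now();
            producer.send(new ProducerRecord<>("Shakespeare", line));
            producer.flush();
        }
    }
}

When submitting the Flink job itself, only --table is required; --database, --uri, --warehouse, --s3-endpoint, --io-impl, and --catalog-impl fall back to the defaults shown in the code.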