{
  "operationName": "GetAddOns",
  "variables": {
    "hostId": "5dd43565611ffd0029bc80b2",
    "listingId": "5e6b8d8ed39f1e00505f0294",
    "currency": "CAD",
    "offset": 0,
    "limit": 15,
    "state": "active"
  },

// Extract the failed rows from the PCollectionTuple.
PCollection<String> bqTableRowsErrors =
    bqTableRowsTuple.get(bqTableRowsFailedTag)
        .setCoder(NullableCoder.of(StringUtf8Coder.of()));
// Log the errors to a text file in Cloud Storage.
bqTableRowsErrors
    .apply(
        "Write Errors",
        TextIO.write().to("gs://beam-tutorial/album_errors.txt")

package com.sandboxws.chinook;

import com.google.api.services.bigquery.model.TableRow;
import com.sandboxws.beam.AppOptions;
import com.sandboxws.beam.coders.TableRowCoder;
import com.sandboxws.chinook.bigquery.schema.AlbumTableSchema;
import java.sql.ResultSet;
import java.util.HashMap;
import java.util.Map;

gradle albums --args="--pgDatabase=chinook_development --pgUsername=root --project=GOOGLE_CLOUD_PROJECT_ID --outputTable=dwh.albums --tempLocation=gs://beam_tutorial/temp --stagingLocation=gs://beam_tutorial/staging"

bqTableRows.apply("Write to BigQuery",
    BigQueryIO.writeTableRows()
        .to(options.getOutputTable()) // Passed as an argument from the command line
        .withSchema(AlbumTableSchema.schema()) // The schema for the BigQuery table
        .ignoreUnknownValues() // Ignore any values passed but not defined on the table schema
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND) // Append to the BigQuery table
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED) // Create the BigQuery table if it doesn't exist
);

PCollection<TableRow> bqTableRows = rows.apply(
    "HashMap to TableRow",
    ParDo.of(new HashMapToTableRowFn())
).setCoder(TableRowJsonCoder.of());

AppOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(AppOptions.class);

PipelineOptionsFactory.register(AppOptions.class);