@VatslavDS
Created August 9, 2016 21:42
import org.apache.avro.file.DataFileReader
import org.apache.avro.generic.{GenericDatumReader, IndexedRecord}
import org.apache.hadoop.fs.Path
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.hadoop.conf.Configuration
import org.apache.spark.sql.SQLContext
import org.apache.avro.mapred.FsInput
import parquet.avro.AvroParquetWriter
import avrotest.avro.Message
import avrotest.UserOperations._
object ParquetAvroSparkExample {

  private var sqc: SQLContext = _

  def main(args: Array[String]) {
    // Create a Spark configuration for local mode
    val conf = new SparkConf(true)
      .setMaster("local")
      .setAppName("ParquetAvroExample")

    // Create a Spark context and wrap it inside a SQLContext
    sqc = new SQLContext(new SparkContext(conf))

    // Avro schema of the generated Message record
    val schema = Message.getClassSchema

    // Open the source Avro file on HDFS for reading
    val hadoopConfig = new Configuration()
    val fsInput = new FsInput(new Path("hdfs://$NAMENODEHOST:$NAMENODEPORT/topics/avro-hdfs/partition=0/avro-hdfs+0+0000000003+0000000005.avro"), hadoopConfig)
    val reader = new GenericDatumReader[IndexedRecord](schema)
    val dataFileReader = DataFileReader.openReader(fsInput, reader)

    // Copy every Avro record into a Parquet file with the same schema
    val parquetWriter = new AvroParquetWriter[IndexedRecord](new Path("hdfs://$NAMENODEHOST:$NAMENODEPORT/topics/avro-hdfs/archivo.parquet"), schema)
    while (dataFileReader.hasNext) {
      parquetWriter.write(dataFileReader.next())
    }

    dataFileReader.close()
    parquetWriter.close()
  }
}
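
For a quick sanity check of the conversion, the Parquet output can be read back through the SQLContext created above. The snippet below is only a sketch, assuming a Spark 1.x SQLContext (where read.parquet and registerTempTable are available) and the same placeholder NameNode host and port used in the writer path.

val messages = sqc.read.parquet("hdfs://$NAMENODEHOST:$NAMENODEPORT/topics/avro-hdfs/archivo.parquet")
messages.registerTempTable("messages")
sqc.sql("SELECT COUNT(*) FROM messages").show()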