srinivasanHadoop/7245106: a Hadoop MapReduce example that uses Apache Tika to extract text from input files, built from a custom InputFormat, RecordReader, OutputFormat, and RecordWriter.
TikaFileInputFormat.java:

package com.srini.tikacustom;

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class TikaFileInputFormat extends FileInputFormat<Text, Text> {

    @Override
    public RecordReader<Text, Text> createRecordReader(InputSplit split,
            TaskAttemptContext context) throws IOException, InterruptedException {
        return new TikaRecordReader();
    }

    @Override
    protected boolean isSplitable(JobContext context, Path file) {
        // TikaRecordReader parses the whole file as a single record,
        // so the file must never be split.
        return false;
    }
}
TikaMapreduce.java:

package com.srini.tikacustom;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class TikaMapreduce extends Configured implements Tool {

    // Identity mapper: the record reader already emits (file path, extracted text).
    public static class TikaMapper extends Mapper<Text, Text, Text, Text> {
        @Override
        public void map(Text key, Text value, Context context)
                throws IOException, InterruptedException {
            context.write(key, value);
        }
    }

    public static void main(String[] args) throws Exception {
        int exit = ToolRunner.run(new Configuration(), new TikaMapreduce(), args);
        System.exit(exit);
    }

    @Override
    public int run(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("Usage: TikaMapreduce <input path> <output path>");
            return 2;
        }
        // Use getConf() so options parsed by ToolRunner are not lost.
        Job job = new Job(getConf(), "TikaMapreduce");
        job.setJarByClass(getClass());
        job.setInputFormatClass(TikaFileInputFormat.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        job.setMapperClass(TikaMapper.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setOutputFormatClass(TikaOutPutFormt.class);
        // Append a timestamp so each run writes to a fresh output directory.
        FileOutputFormat.setOutputPath(job,
                new Path(args[1] + System.currentTimeMillis()));
        return job.waitForCompletion(true) ? 0 : 1;
    }
}
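For reference, a usage sketch (the jar name and path placeholders are assumptions, not part of the gist): package these classes together with the Tika jars into a job jar, then run the driver with an input directory and an output-path prefix:

hadoop jar tikacustom.jar com.srini.tikacustom.TikaMapreduce <input-dir> <output-prefix>

Because run() appends System.currentTimeMillis() to the second argument, each run writes to a fresh output directory.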
TikaOutPutFormt.java:

package com.srini.tikacustom;

import java.io.IOException;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class TikaOutPutFormt extends FileOutputFormat<Text, Text> {

    @Override
    public RecordWriter<Text, Text> getRecordWriter(TaskAttemptContext context)
            throws IOException, InterruptedException {
        // Write all records to a single fixed file inside the output directory.
        // Note: with more than one reduce task, every task would try to create
        // the same file; this only works with a single writer.
        Path outputDir = FileOutputFormat.getOutputPath(context);
        Path fullPath = new Path(outputDir, "Srini.txt");
        FileSystem fs = outputDir.getFileSystem(context.getConfiguration());
        FSDataOutputStream output = fs.create(fullPath, context);
        return new TikaRecordWrite(output);
    }
}
TikaRecordReader.java:

package com.srini.tikacustom;

import java.io.IOException;
import java.net.URL;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.tika.Tika;
import org.apache.tika.exception.TikaException;

// Reads each input file as a single (path, extracted text) record.
public class TikaRecordReader extends RecordReader<Text, Text> {

    private Text key = new Text();
    private Text value = new Text();
    private FileSplit fileSplit;
    private Configuration conf;
    private boolean processed = false;

    @Override
    public void close() throws IOException {
    }

    @Override
    public Text getCurrentKey() throws IOException, InterruptedException {
        return key;
    }

    @Override
    public Text getCurrentValue() throws IOException, InterruptedException {
        return value;
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
        return processed ? 1.0f : 0.0f;
    }

    @Override
    public void initialize(InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        this.fileSplit = (FileSplit) split;
        this.conf = context.getConfiguration();
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        if (!processed) {
            Path path = fileSplit.getPath();
            key.set(path.toString());
            FileSystem fs = path.getFileSystem(conf);
            // fs and fin are unused here; the stream-based fix discussed in
            // the comments below makes use of them.
            FSDataInputStream fin = null;
            try {
                // NOTE: new URL("hdfs://...") fails with "unknown protocol: hdfs"
                // unless an HDFS URL stream handler has been registered; see the
                // two fixes discussed in the comments below.
                String con = new Tika().parseToString(new URL(path.toString()));
                // Strip punctuation, collapse whitespace, and lowercase the text.
                String cleaned = con.replaceAll("[$%&+,:;=?#|']", " ").replaceAll("\\s+", " ");
                value.set(cleaned.toLowerCase());
            } catch (TikaException e) {
                throw new IOException("Tika failed to parse " + path, e);
            }
            processed = true;
            return true;
        }
        return false;
    }
}
TikaRecordWrite.java:

package com.srini.tikacustom;

import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

// Writes each record as "key,value" on its own line, after a "result:" header.
public class TikaRecordWrite extends RecordWriter<Text, Text> {

    private DataOutputStream out;

    public TikaRecordWrite(DataOutputStream output) throws IOException {
        out = output;
        out.writeBytes("result:\r\n");
    }

    @Override
    public void close(TaskAttemptContext context) throws IOException,
            InterruptedException {
        out.close();
    }

    @Override
    public void write(Text key, Text value) throws IOException,
            InterruptedException {
        // Text holds UTF-8 bytes, so write them directly; writeBytes() on the
        // String would drop the high byte of every character.
        out.write(key.getBytes(), 0, key.getLength());
        out.writeBytes(",");
        out.write(value.getBytes(), 0, value.getLength());
        out.writeBytes("\r\n");
    }
}
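Taken together, these five classes emit one record per input file into a single Srini.txt inside the timestamped output directory. A hypothetical sample of the output, with the file name and text invented for illustration:

result:
hdfs://localhost:9000/user/srini/input/sample.pdf,this is the extracted text of the sample pdf

Each line is the full HDFS path of an input file, a comma, and the cleaned, lowercased text that Tika extracted from it.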
There are two ways to solve the "java.net.MalformedURLException: unknown protocol: hdfs" error.

One is to add URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory()); in TikaRecordReader.java (FsUrlStreamHandlerFactory lives in org.apache.hadoop.fs; note that the JVM allows this factory to be set only once, so a static initializer is a safer place for it than nextKeyValue()):

...
FSDataInputStream fin = null;
URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory());
try {
String con = new Tika().parseToString(new URL(path.toString()));
...

The other is to change this code in TikaRecordReader.java:

...
FSDataInputStream fin = null;
try {
String con = new Tika().parseToString(new URL(path.toString()));
...

to

...
FSDataInputStream fin = fs.open(path);
try {
String con = new Tika().parseToString(fin);
...

Hope it helps.
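For completeness, a minimal sketch of nextKeyValue() with the second, stream-based fix applied, assuming the surrounding fields of TikaRecordReader above; the cleanup regexes are the ones from the original code, and closing the stream in a finally block is an addition of this sketch:

@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
    if (processed) {
        return false;
    }
    Path path = fileSplit.getPath();
    key.set(path.toString());
    FileSystem fs = path.getFileSystem(conf);
    FSDataInputStream fin = fs.open(path);
    try {
        // Tika detects the file type and extracts its text from the open
        // stream, so no java.net.URL (and no hdfs protocol handler) is needed.
        String con = new Tika().parseToString(fin);
        String cleaned = con.replaceAll("[$%&+,:;=?#|']", " ").replaceAll("\\s+", " ");
        value.set(cleaned.toLowerCase());
    } catch (TikaException e) {
        throw new IOException("Tika failed to parse " + path, e);
    } finally {
        fin.close(); // release the HDFS stream
    }
    processed = true;
    return true;
}

The stream-based variant sidesteps the protocol problem entirely, because the file is opened through the Hadoop FileSystem API instead of through java.net.URL.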
Hi, I am getting an error. Which jar files do I need to add for Tika? Can you share them, if any?
Hi, I am trying to use your MapReduce program to parse TIF images. Do I need to change any input or output format?
I am also facing the same error: java.net.MalformedURLException: unknown protocol: hdfs. Why did I get this error? Can anyone help me?