SequenceFile processing with Hadoop MapReduce

When we have a large number of small files, for example millions of small XML files, processing each one as a separate input is very inefficient in Hadoop MapReduce. In this post I will show you how to process them using SequenceFileInputFormat: the small XML files are first packed into a single SequenceFile, and the MapReduce job then reads that SequenceFile one record at a time. A minimal packing sketch follows, and after that we build the driver and the mapper.
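The packing step is not part of the job itself, so here is a minimal sketch of it. It assumes each record gets a Text key holding the source file name and a Text value holding that file's full XML content, which matches the Mapper<Text, Text, ...> signature used later. The class name SmallXmlToSeqFile and the local-directory input are illustrative, not part of the original job.

package mrd.training.sample;

import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

public class SmallXmlToSeqFile {

    // args[0] = local directory of small XML files, args[1] = output SequenceFile path
    public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);

        File[] xmlFiles = new File(args[0]).listFiles();
        if (xmlFiles == null) {
            throw new IOException("Not a readable directory: " + args[0]);
        }

        SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf,
                new Path(args[1]), Text.class, Text.class);
        try {
            for (File xml : xmlFiles) {
                // key = file name, value = full XML content of that file
                String content = new String(Files.readAllBytes(xml.toPath()), StandardCharsets.UTF_8);
                writer.append(new Text(xml.getName()), new Text(content));
            }
        } finally {
            writer.close();
        }
    }
}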

1) Create Driver class SeqDriver.java


package mrd.training.sample;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

import mrdp.logging.LogWriter;

public class SeqDriver {

    /**
     * Krishna - processing a large number of XML files using SequenceFileInputFormat.
     * @param args input SequenceFile path and output directory
     */
    public static void main(String[] args) {
        try {
            Configuration conf = new Configuration();
            String[] remainingArgs = new GenericOptionsParser(conf, args).getRemainingArgs();

            Job job = new Job(conf, "Seq Processing");
            job.setJarByClass(SeqDriver.class);
            job.setMapperClass(MySeqMapper.class);

            // Map-only job: no reducers, mapper output goes straight to the output directory.
            job.setNumReduceTasks(0);

            job.setInputFormatClass(SequenceFileInputFormat.class);
            job.setOutputFormatClass(TextOutputFormat.class);

            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(NullWritable.class);

            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(NullWritable.class);

            SequenceFileInputFormat.addInputPath(job, new Path(remainingArgs[0]));
            FileOutputFormat.setOutputPath(job, new Path(remainingArgs[1]));

            job.waitForCompletion(true);

        } catch (Exception e) {
            LogWriter.getInstance().WriteLog("Driver Error: " + e.getMessage());
            System.out.println(e.getMessage());
        }
    }
}
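A note on the driver: job.setNumReduceTasks(0) makes this a map-only job, so there is no shuffle phase at all, and TextOutputFormat writes each mapper's output directly to the output directory. SequenceFileInputFormat hands the mapper one key/value record at a time from the SequenceFile, instead of opening millions of individual files.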

2) Mapper class MySeqMapper.java


package mrd.training.sample;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;

import mrdp.logging.LogWriter;

public class MySeqMapper extends Mapper<Text, Text, Text, NullWritable> {

    private static final Log LOG = LogFactory.getLog(MySeqMapper.class);

    @Override
    public void map(Text key, Text value, Context context) throws IOException, InterruptedException {
        try {
            // Each SequenceFile value holds the full content of one small XML file.
            InputStream is = new ByteArrayInputStream(value.toString().getBytes(StandardCharsets.UTF_8));
            DocumentBuilderFactory dbFactory = DocumentBuilderFactory.newInstance();
            DocumentBuilder dBuilder = dbFactory.newDocumentBuilder();
            Document doc = dBuilder.parse(is);

            doc.getDocumentElement().normalize();

            NodeList nList = doc.getElementsByTagName("employee");

            for (int i = 0; i < nList.getLength(); i++) {
                Node nNode = nList.item(i);

                if (nNode.getNodeType() == Node.ELEMENT_NODE) {
                    Element eElement = (Element) nNode;

                    String id = eElement.getElementsByTagName("id").item(0).getTextContent();
                    String name = eElement.getElementsByTagName("name").item(0).getTextContent();
                    String gender = eElement.getElementsByTagName("gender").item(0).getTextContent();

                    // Emit one CSV line per employee element; NullWritable discards the value.
                    context.write(new Text(id + "," + name + "," + gender), NullWritable.get());
                }
            }
        } catch (Exception e) {
            LogWriter.getInstance().WriteLog(e.getMessage());
        }
    }
}
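For reference, each SequenceFile value is expected to hold an XML document along these lines (a made-up example; the mapper only looks at the employee, id, name, and gender tags):

<employees>
    <employee>
        <id>1</id>
        <name>John</name>
        <gender>male</gender>
    </employee>
</employees>

For this record the mapper would emit the output line 1,John,male.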

Using this code you can process SequenceFiles built from large numbers of small XML files.
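To try it out, compile both classes into a jar and submit the job with something like hadoop jar seqxml.jar mrd.training.sample.SeqDriver /user/hadoop/employees.seq /user/hadoop/seq-output, where the jar name and paths are placeholders for your own.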
