Processing a delimited file with MapReduce in Hadoop

In this post, I am going to show how to process transaction data stored in CSV format using a MapReduce job in Hadoop.

1) Load the CSV file into HDFS using:

hadoop fs -copyFromLocal [filepath] [destination HDFS path]
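
For example, if the transaction file sits at /home/user/data/transactions.csv on the local file system (the local and HDFS paths here are only placeholders, adjust them to your setup):

hadoop fs -copyFromLocal /home/user/data/transactions.csv /user/hadoop/input/transactions.csv

The parameter (lookup) file used for the join should be copied into HDFS in the same way.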

2) Create Transaction.java, which contains the Mapper, Reducer and Driver classes.


package mrd.training.sample;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;

import mrdp.logging.LogWriter;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class Transaction {

    /*Krishna - Processing a delimited file using MapReduce jobs and using the Distributed Cache for joins*/
    public enum test {
        error, completed
    }

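    /* Mapper: joins each transaction record against the parameter file loaded from the
       Distributed Cache and emits the matched parameter field with the transaction amount */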
    public static class MyMapper extends Mapper<LongWritable, Text, Text, DoubleWritable> {
        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            // Close the parameter file reader if it was opened in setup()
            if (bufferparamter != null) {
                bufferparamter.close();
            }
        }

        String parameterpath = "";
        BufferedReader bufferparamter;
        String paramline = null;
        String paramData = "";

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            try {

                // Try to locate the parameter file in the Distributed Cache;
                // if it is not there, fall back to the path passed in the job configuration.
                Path[] files = null;
                try {
                    files = DistributedCache.getLocalCacheFiles(context.getConfiguration());
                } catch (Exception e) {
                    LogWriter.getInstance().WriteLog(e.getMessage());
                }

                if (files != null) {
                    parameterpath = files[0].toString();
                } else {
                    parameterpath = context.getConfiguration().get("ParameterPath");
                }

                // Read the entire parameter file into memory so each map() call can join against it
                bufferparamter = new BufferedReader(new FileReader(parameterpath));
                while ((paramline = bufferparamter.readLine()) != null) {
                    paramData = paramData + paramline + "\n";
                }

            } catch (Exception e) {

                LogWriter.getInstance().WriteLog(e.getMessage());

            }
        }

        private Text word = new Text();

        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

            try {

                // Each transaction record is comma-delimited; array[2] holds the join key
                // (customer) and array[3] the transaction amount.
                String line = value.toString();
                String[] array = line.split(",");
                String[] paramDataarr = paramData.split("\n");

                // Replicated join: look up the join key in the cached parameter data
                boolean found = false;
                for (String paramline : paramDataarr) {
                    String[] custarray = paramline.split(",");
                    if (array[2].equals(custarray[0])) {
                        word.set(custarray[4]);
                        found = true;
                        break;
                    }
                }
                if (!found) {
                    word.set("Default");
                }

                // A minimum-amount filter could be read from the job configuration and
                // applied here before emitting, if required.
                context.write(word, new DoubleWritable(Double.parseDouble(array[3])));
            } catch (Exception e) {
                LogWriter.getInstance().WriteLog(e.getMessage());
            }
        }
    }

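    /* Reducer: sums the transaction amounts for each key */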
    public static class MyReducer extends Reducer<Text, DoubleWritable, Text, DoubleWritable> {
        public void reduce(Text key, Iterable<DoubleWritable> values, Context context) throws IOException, InterruptedException {

            // Use a double accumulator so the decimal part of the amounts is not truncated
            double sum = 0;

            for (DoubleWritable val : values) {
                sum += val.get();
            }

            context.write(key, new DoubleWritable(sum));
        }
    }

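    /* Driver: registers the parameter file in the Distributed Cache and configures the job */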
    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();

        // Register the parameter file (first argument) in the Distributed Cache
        // so every mapper can read a local copy of it in setup()
        DistributedCache.addCacheFile(new URI(otherArgs[0]), conf);

        Job job = new Job(conf, "Transaction");

        job.setJarByClass(Transaction.class);
        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(DoubleWritable.class);

        // job.setNumReduceTasks(2);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(DoubleWritable.class);

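        // otherArgs[1] is the input path and otherArgs[2] is the output path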
        FileInputFormat.addInputPath(job, new Path(otherArgs[1]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[2]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
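
3) Package the class into a jar and run the job. The driver expects three arguments: the parameter (lookup) file in HDFS, the input path and the output path. Assuming the jar is named transaction.jar (the jar name and paths below are only placeholders), the command looks like this:

hadoop jar transaction.jar mrd.training.sample.Transaction /user/hadoop/cache/parameters.csv /user/hadoop/input/transactions.csv /user/hadoop/output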



Using this approach, any kind of delimited text file can be processed in the same way.
