Loading HBase Table Using MapReduce Job

In this blog post, "Loading HBase Table Using MapReduce Job", I am going to show you how to load an HBase table using a MapReduce job.

1) First, create the HBase table the job will write to. Table and column-family names are up to you (for example, create 'student', {NAME => 'details'}, {NAME => 'education'}, {NAME => 'hobbies'}), but the MapReduce code below expects a table named 'hbasemr' with a single column family 'cf', so create that from the HBase shell:


create 'hbasemr', {NAME => 'cf'}
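
The job in step 2 reads pipe-delimited text from the HDFS path input/hbasedata and uses the first three fields of each line. As a minimal, hypothetical sample, a file students.txt could look like:

1|John|Physics
2|Jane|Chemistry

Stage it into HDFS with:

hdfs dfs -mkdir -p input/hbasedata
hdfs dfs -put students.txt input/hbasedata/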

2) Create the MapReduce Java class HbaseMapReduce.java:


package Hbase;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.client.Put;

public class HbaseMapReduce {
     public static class Map extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {
        @Override
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String messageStr = value.toString();

            // Input records are pipe-delimited; skip lines that are too short.
            String[] logRecvArr = messageStr.split("\\|");
            if (logRecvArr.length < 3) {
                return;
            }

            // Use the first field as the row key.
            Put put = new Put(Bytes.toBytes(logRecvArr[0]));
            put.add(Bytes.toBytes("cf"), Bytes.toBytes("col1"),
                    Bytes.toBytes(logRecvArr[0]));
            put.add(Bytes.toBytes("cf"), Bytes.toBytes("col2"),
                    Bytes.toBytes(logRecvArr[1]));
            put.add(Bytes.toBytes("cf"), Bytes.toBytes("col3"),
                    Bytes.toBytes(logRecvArr[2]));

            // Emit the row key (not a constant) so each row is keyed correctly.
            context.write(new ImmutableBytesWritable(Bytes.toBytes(logRecvArr[0])), put);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "localhost");
        conf.set("hbase.zookeeper.property.clientPort", "2181");
        Job job = new Job(conf, "Hbasemr");
        job.setJarByClass(HbaseMapReduce.class);
        job.setMapperClass(Map.class);
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(Put.class);
        job.setInputFormatClass(TextInputFormat.class);
        FileInputFormat.setInputPaths(job, new Path("input/hbasedata"));
        TableMapReduceUtil.initTableReducerJob(
                "hbasemr", // output table
                null,      // no reducer class; this is a map-only load
                job);
        job.setNumReduceTasks(0); // map-only: Puts go straight to the table
        job.waitForCompletion(true);
    }
}
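
To compile this class, the Hadoop and HBase jars must be on the classpath. A minimal sketch, assuming the hadoop and hbase command-line tools are installed (the jar name hbasemr.jar is just an example):

javac -cp "$(hbase classpath)" -d classes Hbase/HbaseMapReduce.java
jar cf hbasemr.jar -C classes .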


3) Package the class into a JAR and run it with:

hadoop jar [location of jar] [packagename.classname]
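
For the class above, the fully qualified class name is Hbase.HbaseMapReduce, so assuming the jar built earlier was named hbasemr.jar:

hadoop jar hbasemr.jar Hbase.HbaseMapReduce

After the job completes, you can verify that the rows landed in the table from the HBase shell:

scan 'hbasemr'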

Thanks, have a great day! Enjoy.
