Wednesday 13 September 2017

Reduce Side Join

 
A common situation in many companies is that transaction records are kept separate from the customer data. There is, of course, a relationship between the two; usually a transaction record contains the unique ID of the customer through which the sale was performed.

In the Hadoop world, these would be represented by two types of data files: one containing records of the customer IDs and information for transactions, and the other would contain the full data for each customer.

Frequent tasks require reporting that uses data from both these sources; say, for example, we wanted to see the total number of transactions and total value for customer but do not want to associate it with an anonymous ID number, but rather with a name. This may be valuable
when customer service representatives wish to call the most frequent customers—data from the sales records—but want to be able to refer to the person by name and not just a number.


We can perform the report explained in the previous section using a reduce-side join.



/* Mayank  */

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class ReduceJoin {

    public static class CustsMapper extends
            Mapper<Object, Text, Text, Text> {
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            String record = value.toString();
            String[] parts = record.split(",");
            context.write(new Text(parts[0]), new Text("custs\t" + parts[1]));
        }
    }

    public static class TxnsMapper extends
            Mapper<Object, Text, Text, Text> {
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            String record = value.toString();
            String[] parts = record.split(",");
            context.write(new Text(parts[2]), new Text("txns\t" + parts[3]));
        }
    }

    public static class ReduceJoinReducer extends
            Reducer<Text, Text, Text, Text> {
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            String name = "";
            double total = 0.0;
            int count = 0;
            for (Text t : values) {
                String parts[] = t.toString().split("\t");
                if (parts[0].equals("txns")) {
                    count++;
                    total += Float.parseFloat(parts[1]);
                } else if (parts[0].equals("custs")) {
                    name = parts[1];
                }
            }
            String str = String.format("%d\t%f", count, total);
            context.write(new Text(name), new Text(str));
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf, "Reduce-side join");
        job.setJarByClass(ReduceJoin.class);
        job.setReducerClass(ReduceJoinReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
       
   
        MultipleInputs.addInputPath(job, new Path(args[0]),TextInputFormat.class, CustsMapper.class);
        MultipleInputs.addInputPath(job, new Path(args[1]),TextInputFormat.class, TxnsMapper.class);
        Path outputPath = new Path(args[2]);
       
       
        FileOutputFormat.setOutputPath(job, outputPath);
        outputPath.getFileSystem(conf).delete(outputPath);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}