A common situation in many companies is that transaction records are kept separate from the customer data. There is, of course, a relationship between the two; usually, a transaction record contains the unique ID of the customer who made the purchase. In the Hadoop world, these would be represented by two types of data files: one containing the transaction records, each carrying the ID of the customer involved, and the other containing the full data for each customer.
Reporting tasks frequently need data from both of these sources; say, for example, we want to see the total number of transactions and the total value for each customer, but we want to associate those figures with a customer name rather than an anonymous ID number. This may be valuable when customer service representatives wish to call the most frequent customers (information that comes from the sales records) but want to refer to each person by name and not just a number.
We can produce this report using a reduce-side join: each input file is handled by its own mapper, both mappers emit the customer ID as the key and tag each value with its source, and a single reducer then receives every record for a given customer together and combines them.
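The mappers in the listing below assume a particular comma-separated layout; this layout is purely illustrative and not fixed by the technique, so the field indices would need to be adjusted to match your own files. Here, the customer file carries the customer ID in its first field and the name in its second, while the transaction file carries the customer ID in its third field and the amount in its fourth, for example:

4000001,Alice Smith,alice@example.com
4000002,Bob Jones,bob@example.com

T1001,2018-03-01,4000001,34.99
T1002,2018-03-02,4000002,12.50
T1003,2018-03-04,4000001,99.00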
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class ReduceJoin {
  // Mapper for the customer file: emits (customer ID, "custs" + name) so the
  // reducer can tell customer records apart from transaction records.
  public static class CustsMapper extends
      Mapper<Object, Text, Text, Text> {
    @Override
    public void map(Object key, Text value, Context context)
        throws IOException, InterruptedException {
      String record = value.toString();
      String[] parts = record.split(",");
      // parts[0] is the customer ID, parts[1] is the customer name
      context.write(new Text(parts[0]), new Text("custs\t" + parts[1]));
    }
  }
  // Mapper for the transaction file: emits (customer ID, "txns" + amount).
  public static class TxnsMapper extends
      Mapper<Object, Text, Text, Text> {
    @Override
    public void map(Object key, Text value, Context context)
        throws IOException, InterruptedException {
      String record = value.toString();
      String[] parts = record.split(",");
      // parts[2] is the customer ID, parts[3] is the transaction amount
      context.write(new Text(parts[2]), new Text("txns\t" + parts[3]));
    }
  }
  // Reducer: for each customer ID, picks the name out of the customer record
  // and accumulates the count and total value of the transaction records.
  public static class ReduceJoinReducer extends
      Reducer<Text, Text, Text, Text> {
    @Override
    public void reduce(Text key, Iterable<Text> values, Context context)
        throws IOException, InterruptedException {
      String name = "";
      double total = 0.0;
      int count = 0;
      for (Text t : values) {
        String[] parts = t.toString().split("\t");
        if (parts[0].equals("txns")) {
          count++;
          total += Double.parseDouble(parts[1]);
        } else if (parts[0].equals("custs")) {
          name = parts[1];
        }
      }
      // Output: customer name -> transaction count and total value
      String str = String.format("%d\t%f", count, total);
      context.write(new Text(name), new Text(str));
    }
  }
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "Reduce-side join");
    job.setJarByClass(ReduceJoin.class);
    job.setReducerClass(ReduceJoinReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);
    // Each input path gets its own mapper via MultipleInputs:
    // args[0] is the customer file, args[1] is the transaction file.
    MultipleInputs.addInputPath(job, new Path(args[0]),
        TextInputFormat.class, CustsMapper.class);
    MultipleInputs.addInputPath(job, new Path(args[1]),
        TextInputFormat.class, TxnsMapper.class);
    Path outputPath = new Path(args[2]);
    FileOutputFormat.setOutputPath(job, outputPath);
    // Remove any existing output directory so the job can be rerun.
    outputPath.getFileSystem(conf).delete(outputPath, true);
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}
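Assuming the class is packaged into a JAR called reducejoin.jar (the JAR name and paths here are only placeholders), the job could be submitted with something along the lines of hadoop jar reducejoin.jar ReduceJoin /input/customers /input/transactions /output, where the first two arguments are the customer and transaction files and the third is the output directory. Each line of the result would then hold a customer name followed by that customer's transaction count and total value, separated by tabs, matching the format string used in the reducer.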