package org.apache.hadoop.mapreduce.lib.join;
import java.io.CharArrayReader;
import java.io.IOException;
import java.io.StreamTokenizer;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.Stack;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.StatusReporter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.task.JobContextImpl;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
import org.apache.hadoop.util.ReflectionUtils;
/**
 * Very simple shift-reduce parser for join expressions.
 *
 * This should be sufficient for the user extension permitted now, but ought to
 * be replaced with a parser generator if more complex grammars are supported.
 * In particular, this "shift-reduce" parser has no states. Each set
 * of formals requires a different internal node type, which is responsible for
 * interpreting the list of tokens it receives. This is sufficient for the
 * current grammar, but it has several annoying properties that might inhibit
 * extension. In particular, parentheses are always function calls; an
 * algebraic or filter grammar would not only require a node type, but must
 * also work around the internals of this parser.
 *
 * For most other cases, adding classes to the hierarchy - particularly by
 * extending JoinRecordReader and MultiFilterRecordReader - is fairly
 * straightforward. One need only override the relevant method(s) (usually only
 * {@link CompositeRecordReader#combine}) and include a property to map its
 * value to an identifier in the parser.
 */
public class Parser {
/**
 * Tagged-union type for tokens from the join expression.
 * @see Parser.TType
 */
public static class Token {
private TType type;
Token(TType type) {
this.type = type;
public TType getType() { return type; }
public Node getNode() throws IOException {
throw new IOException("Expected nodetype");
public double getNum() throws IOException {
throw new IOException("Expected numtype");
public String getStr() throws IOException {
throw new IOException("Expected strtype");
// Token subtype that carries a numeric value and overrides getNum() to
// return it instead of throwing.
public static class NumToken extends Token {
// Numeric payload supplied to the constructor.
private double num;
// NOTE(review): Token declares only the Token(TType) constructor in this
// file, so an explicit super(...) call appears to be missing here. Which
// TType constant tags numeric tokens is not visible - confirm and restore.
public NumToken(double num) {
this.num = num;
// Returns the stored number.
public double getNum() { return num; }
public static class NodeToken extends Token {
private Node node;
NodeToken(Node node) {
this.node = node;
public Node getNode() {
return node;
public static class StrToken extends Token {
private String str;
public StrToken(TType type, String str) {
this.str = str;
public String getStr() {
return str;
/**
 * Simple lexer wrapping a StreamTokenizer.
 * This encapsulates the creation of tagged-union Tokens and initializes the
 * StreamTokenizer.
 */
private static class Lexer {
private StreamTokenizer tok;
Lexer(String s) {
tok = new StreamTokenizer(new CharArrayReader(s.toCharArray()));
Token next() throws IOException {
int type = tok.nextToken();
switch (type) {
case StreamTokenizer.TT_EOF:
case StreamTokenizer.TT_EOL:
return null;
case StreamTokenizer.TT_NUMBER:
return new NumToken(tok.nval);
case StreamTokenizer.TT_WORD:
return new StrToken(TType.IDENT, tok.sval);
case '"':
return new StrToken(TType.QUOT, tok.sval);
switch (type) {
case ',':
return new Token(TType.COMMA);
case '(':
return new Token(TType.LPAREN);
case ')':
return new Token(TType.RPAREN);
throw new IOException("Unexpected: " + type);
public abstract static class Node extends ComposableInputFormat {
* Return the node type registered for the particular identifier.
* By default, this is a CNode for any composite node and a WNode
* for "wrapped" nodes. User nodes will likely be composite
* nodes.
* @see #addIdentifier(java.lang.String, java.lang.Class[], java.lang.Class, java.lang.Class)
* @see CompositeInputFormat#setFormat(org.apache.hadoop.mapred.JobConf)
static Node forIdent(String ident) throws IOException {
try {
if (!nodeCstrMap.containsKey(ident)) {
throw new IOException("No nodetype for " + ident);
return nodeCstrMap.get(ident).newInstance(ident);
} catch (IllegalAccessException e) {
throw new IOException(e);
} catch (InstantiationException e) {
throw new IOException(e);
} catch (InvocationTargetException e) {
throw new IOException(e);
private static final Class<?>[] ncstrSig = { String.class };
private static final
Map<String,Constructor<? extends Node>> nodeCstrMap =
new HashMap<String,Constructor<? extends Node>>();
protected static final Map<String,Constructor<? extends
ComposableRecordReader>> rrCstrMap =
new HashMap<String,Constructor<? extends ComposableRecordReader>>();
* For a given identifier, add a mapping to the nodetype for the parse
* tree and to the ComposableRecordReader to be created, including the
* formals required to invoke the constructor.
* The nodetype and constructor signature should be filled in from the
* child node.
protected static void addIdentifier(String ident, Class<?>[] mcstrSig,
Class<? extends Node> nodetype,
Class<? extends ComposableRecordReader> cl)
throws NoSuchMethodException {
Constructor<? extends Node> ncstr =
nodeCstrMap.put(ident, ncstr);
Constructor<? extends ComposableRecordReader> mcstr =
rrCstrMap.put(ident, mcstr);
// inst
protected int id = -1;
protected String ident;
protected Class<? extends WritableComparator> cmpcl;
protected Node(String ident) {
this.ident = ident;
protected void setID(int id) {
this.id = id;
protected void setKeyComparator(
Class<? extends WritableComparator> cmpcl) {
this.cmpcl = cmpcl;
abstract void parse(List<Token> args, Configuration conf)
throws IOException;
/**
 * Nodetype in the parse tree for "wrapped" InputFormats.
 */
static class WNode extends Node {
private static final Class<?>[] cstrSig =
{ Integer.TYPE, RecordReader.class, Class.class };
static void addIdentifier(String ident,
Class<? extends ComposableRecordReader> cl)
throws NoSuchMethodException {
Node.addIdentifier(ident, cstrSig, WNode.class, cl);
/** Input path string handed to FileInputFormat; assigned by parse. */
private String indir;
/** Wrapped InputFormat instance; assigned by parse. */
private InputFormat<?, ?> inf;

/**
 * @param ident identifier this node was created for
 */
public WNode(String ident) {
  // Node declares only Node(String), so the identifier must be passed up
  // explicitly.
  super(ident);
}
/**
 * Let the first actual define the InputFormat and the second define
 * the {@code mapred.input.dir} property.
 */
public void parse(List<Token> ll, Configuration conf) throws IOException {
StringBuilder sb = new StringBuilder();
Iterator<Token> i = ll.iterator();
while (i.hasNext()) {
Token t = i.next();
if (TType.COMMA.equals(t.getType())) {
try {
inf = (InputFormat<?, ?>)ReflectionUtils.newInstance(
conf.getClassByName(sb.toString()), conf);
} catch (ClassNotFoundException e) {
throw new IOException(e);
} catch (IllegalArgumentException e) {
throw new IOException(e);
if (!i.hasNext()) {
throw new IOException("Parse error");
Token t = i.next();
if (!TType.QUOT.equals(t.getType())) {
throw new IOException("Expected quoted string");
indir = t.getStr();
// no check for ll.isEmpty() to permit extension
private Configuration getConf(Configuration jconf) throws IOException {
Job job = Job.getInstance(jconf);
FileInputFormat.setInputPaths(job, indir);
return job.getConfiguration();
// Returns the splits of the wrapped InputFormat, computed against a
// JobContextImpl built from getConf(...) applied to the caller's
// configuration.
// NOTE(review): the JobContextImpl constructor call below is cut off after
// its first argument; the remaining argument(s) and the end of the statement
// are not visible here and must be restored before this compiles.
public List<InputSplit> getSplits(JobContext context)
throws IOException, InterruptedException {
return inf.getSplits(
new JobContextImpl(getConf(context.getConfiguration()),
// Creates the record reader registered for this node's identifier, wrapping
// the reader produced by the wrapped InputFormat for the given split.
// Throws IOException when no reader is registered for ident or when the
// reflective constructor call fails.
public ComposableRecordReader<?, ?> createRecordReader(InputSplit split,
TaskAttemptContext taskContext)
throws IOException, InterruptedException {
try {
if (!rrCstrMap.containsKey(ident)) {
throw new IOException("No RecordReader for " + ident);
// Configuration with this node's input paths applied (see getConf).
Configuration conf = getConf(taskContext.getConfiguration());
// NOTE(review): only two constructor arguments are visible below, while
// TaskAttemptID and MRJobConfig are imported by this file but unused in the
// visible code - presumably a task-attempt-id argument was lost between
// them; confirm against the TaskAttemptContextImpl constructors.
TaskAttemptContext context =
new TaskAttemptContextImpl(conf,
new WrappedStatusReporter(taskContext));
// Arguments are this node's id, the inner reader, and the comparator class.
return rrCstrMap.get(ident).newInstance(id,
inf.createRecordReader(split, context), cmpcl);
} catch (IllegalAccessException e) {
throw new IOException(e);
} catch (InstantiationException e) {
throw new IOException(e);
} catch (InvocationTargetException e) {
throw new IOException(e);
public String toString() {
return ident + "(" + inf.getClass().getName() + ",\"" + indir + "\")";
private static class WrappedStatusReporter extends StatusReporter {
TaskAttemptContext context;
public WrappedStatusReporter(TaskAttemptContext context) {
this.context = context;
public Counter getCounter(Enum<?> name) {
return context.getCounter(name);
public Counter getCounter(String group, String name) {
return context.getCounter(group, name);
public void progress() {
public float getProgress() {
return context.getProgress();
public void setStatus(String status) {
/**
 * Internal nodetype for "composite" InputFormats.
 */
static class CNode extends Node {
private static final Class<?>[] cstrSig =
{ Integer.TYPE, Configuration.class, Integer.TYPE, Class.class };
static void addIdentifier(String ident,
Class<? extends ComposableRecordReader> cl)
throws NoSuchMethodException {
Node.addIdentifier(ident, cstrSig, CNode.class, cl);
// inst
/** Child nodes of this composite node. */
private final List<Node> kids = new ArrayList<Node>();

/**
 * @param ident identifier this node was created for
 */
public CNode(String ident) {
  // Node declares only Node(String), so the identifier must be passed up
  // explicitly.
  super(ident);
}
public void setKeyComparator(Class<? extends WritableComparator> cmpcl) {
for (Node n : kids) {
/**
 * Combine InputSplits from child InputFormats into a
 * {@link CompositeInputSplit}.
 */
// Gathers the split lists of all children, checks that they have the same
// length, and builds one CompositeInputSplit per split index.
public List<InputSplit> getSplits(JobContext job)
throws IOException, InterruptedException {
// splits.get(i) holds the split list of child i.
List<List<InputSplit>> splits =
new ArrayList<List<InputSplit>>(kids.size());
for (int i = 0; i < kids.size(); ++i) {
List<InputSplit> tmp = kids.get(i).getSplits(job);
if (null == tmp) {
throw new IOException("Error gathering splits from child RReader");
// Each child is compared with its predecessor, so all lists end up equal
// in length.
if (i > 0 && splits.get(i-1).size() != tmp.size()) {
throw new IOException("Inconsistent split cardinality from child " +
i + " (" + splits.get(i-1).size() + "/" + tmp.size() + ")");
splits.add(i, tmp);
// NOTE(review): splits.get(0) throws IndexOutOfBoundsException when this
// node has no children - confirm whether that case can occur.
final int size = splits.get(0).size();
List<InputSplit> ret = new ArrayList<InputSplit>();
for (int i = 0; i < size; ++i) {
CompositeInputSplit split = new CompositeInputSplit(splits.size());
// NOTE(review): no statement is visible that fills `split` from the
// children (inner loop over j) or adds it to `ret`; presumably both were
// lost - restore against the CompositeInputSplit API.
for (int j = 0; j < splits.size(); ++j) {
return ret;
// Creates the composite record reader registered for this node's identifier
// and adds one child reader per child node, each built from the matching
// element of the CompositeInputSplit.
@SuppressWarnings("unchecked") // child types unknowable
public ComposableRecordReader
createRecordReader(InputSplit split, TaskAttemptContext taskContext)
throws IOException, InterruptedException {
if (!(split instanceof CompositeInputSplit)) {
// NOTE(review): the message expression below is cut off after the '+';
// the operand that followed is not visible and must be restored.
throw new IOException("Invalid split type:" +
final CompositeInputSplit spl = (CompositeInputSplit)split;
final int capacity = kids.size();
CompositeRecordReader ret = null;
try {
if (!rrCstrMap.containsKey(ident)) {
throw new IOException("No RecordReader for " + ident);
// Arguments are this node's id, the task configuration, the number of
// children, and the comparator class.
ret = (CompositeRecordReader)rrCstrMap.get(ident).
newInstance(id, taskContext.getConfiguration(), capacity, cmpcl);
} catch (IllegalAccessException e) {
throw new IOException(e);
} catch (InstantiationException e) {
throw new IOException(e);
} catch (InvocationTargetException e) {
throw new IOException(e);
// Child i reads element i of the composite split.
for (int i = 0; i < capacity; ++i) {
ret.add(kids.get(i).createRecordReader(spl.get(i), taskContext));
return (ComposableRecordReader)ret;
/**
 * Parse a list of comma-separated nodes.
 */
public void parse(List<Token> args, Configuration conf)
throws IOException {
ListIterator<Token> i = args.listIterator();
while (i.hasNext()) {
Token t = i.next();
t.getNode().setID(i.previousIndex() >> 1);
if (i.hasNext() && !TType.COMMA.equals(i.next().getType())) {
throw new IOException("Expected ','");
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append(ident + "(");
for (Node n : kids) {
sb.append(n.toString() + ",");
sb.setCharAt(sb.length() - 1, ')');
return sb.toString();
private static Token reduce(Stack<Token> st, Configuration conf)
throws IOException {
LinkedList<Token> args = new LinkedList<Token>();
while (!st.isEmpty() && !TType.LPAREN.equals(st.peek().getType())) {
if (st.isEmpty()) {
throw new IOException("Unmatched ')'");
if (st.isEmpty() || !TType.IDENT.equals(st.peek().getType())) {
throw new IOException("Identifier expected");
Node n = Node.forIdent(st.pop().getStr());
n.parse(args, conf);
return new NodeToken(n);
/**
 * Given an expression and an optional comparator, build a tree of
 * InputFormats using the comparator to sort keys.
 */
static Node parse(String expr, Configuration conf) throws IOException {
if (null == expr) {
throw new IOException("Expression is null");
Class<? extends WritableComparator> cmpcl = conf.getClass(
CompositeInputFormat.JOIN_COMPARATOR, null, WritableComparator.class);
Lexer lex = new Lexer(expr);
Stack<Token> st = new Stack<Token>();
Token tok;
while ((tok = lex.next()) != null) {
if (TType.RPAREN.equals(tok.getType())) {
st.push(reduce(st, conf));
} else {
if (st.size() == 1 && TType.CIF.equals(st.peek().getType())) {
Node ret = st.pop().getNode();
if (cmpcl != null) {
return ret;
throw new IOException("Missing ')'");
