Process XML File via MapReduce


Category: YARN

When you need to process data with Hadoop that does not match one of the default input formats, this article will help you. Hadoop ships with input formats such as TextInputFormat, NLineInputFormat, and KeyValueTextInputFormat; when you receive a different type of file, you have to create your own custom input format for your MapReduce job. Here I am going to show you how to process XML files in a MapReduce job by creating a custom XmlInputFormat.
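
For comparison, here is a minimal sketch of how a job picks one of those built-in formats (the class name BuiltInFormatDemo is just for illustration); the rest of this article swaps in a custom XmlInputFormat instead.

package xmlparsing.demo;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

// Illustrative sketch: choosing a built-in input format for a job.
// NLineInputFormat and KeyValueTextInputFormat are configured the same way.
public class BuiltInFormatDemo {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "built-in input format demo");
        job.setInputFormatClass(TextInputFormat.class); // the default: one line of text per record
    }
}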

For example, if you have the following XML file of employee records and you want to process it, you can do so with the help of the following steps.

<employees>
<employee>
<id>1</id>
<name>Tom White</name>
<gender>male</gender>
</employee>
<employee>
<id>2</id>
<name>Alan Gates</name>
<gender>male</gender>
</employee>
</employees>

 

Step 1: Create XmlInputFormat.java
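
XmlInputFormat extends Hadoop's TextInputFormat and supplies a RecordReader that treats everything between a start tag and the matching end tag as a single record.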

package xmlparsing.demo;

import java.io.IOException;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

public class XmlInputFormat extends TextInputFormat {

    // Configuration property names for the tags that delimit one record;
    // the driver sets them, e.g. conf.set(XmlInputFormat.START_TAG_KEY, "<employee>").
    public static final String START_TAG_KEY = "START_TAG_KEY";
    public static final String END_TAG_KEY = "END_TAG_KEY";

    @Override
    public RecordReader<LongWritable, Text> createRecordReader(
            InputSplit split, TaskAttemptContext context) {
        return new XmlRecordReader();
    }

    public static class XmlRecordReader extends
            RecordReader<LongWritable, Text> {
        private byte[] startTag;
        private byte[] endTag;
        private long start;
        private long end;
        private FSDataInputStream fsin;
        private DataOutputBuffer buffer = new DataOutputBuffer();
        private LongWritable key = new LongWritable();
        private Text value = new Text();

        @Override
        public void initialize(InputSplit is, TaskAttemptContext tac)
                throws IOException, InterruptedException {
            FileSplit fileSplit = (FileSplit) is;

            // Read the record-delimiting tags from the job configuration,
            // falling back to <employee> ... </employee> if they are not set.
            startTag = tac.getConfiguration().get(START_TAG_KEY, "<employee>").getBytes("UTF-8");
            endTag = tac.getConfiguration().get(END_TAG_KEY, "</employee>").getBytes("UTF-8");

            start = fileSplit.getStart();
            end = start + fileSplit.getLength();
            Path file = fileSplit.getPath();

            FileSystem fs = file.getFileSystem(tac.getConfiguration());
            fsin = fs.open(file);
            fsin.seek(start);
        }

        @Override
        public boolean nextKeyValue() throws IOException, InterruptedException {
            if (fsin.getPos() < end) {
                if (readUntilMatch(startTag, false)) {
                    try {
                        buffer.write(startTag);
                        if (readUntilMatch(endTag, true)) {
                            value.set(buffer.getData(), 0, buffer.getLength());
                            key.set(fsin.getPos());
                            return true;
                        }
                    } finally {
                        buffer.reset();
                    }
                }
            }
            return false;
        }

        @Override
        public LongWritable getCurrentKey() throws IOException,
                InterruptedException {
            return key;
        }

        @Override
        public Text getCurrentValue() throws IOException, InterruptedException {
            return value;
        }

        @Override
        public float getProgress() throws IOException, InterruptedException {
            return (fsin.getPos() - start) / (float) (end - start);
        }

        @Override
        public void close() throws IOException {
            fsin.close();
        }

        // Scan forward byte by byte until the byte sequence `match` is found.
        // When withinBlock is true, every byte read is also copied into the buffer,
        // so the record (including the end tag) is captured for the mapper.
        private boolean readUntilMatch(byte[] match, boolean withinBlock)
                throws IOException {
            int i = 0;
            while (true) {
                int b = fsin.read();

                // End of file: no further record.
                if (b == -1)
                    return false;

                // Save bytes when we are inside a record.
                if (withinBlock)
                    buffer.write(b);

                // Check how much of the tag has been matched so far.
                if (b == match[i]) {
                    i++;
                    if (i >= match.length)
                        return true;
                } else
                    i = 0;

                // Outside a record and past the end of the split: stop scanning.
                if (!withinBlock && i == 0 && fsin.getPos() >= end)
                    return false;
            }
        }

    }

}
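
In short, XmlRecordReader scans its input split for the configured start tag, copies every byte into a buffer until it sees the matching end tag, and hands the whole <employee> ... </employee> block to the mapper as one Text value, keyed by the current byte position in the file. Because a split may end in the middle of a record, readUntilMatch keeps reading past the split boundary once a start tag has been found, and stops looking for new start tags once the split is exhausted.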

Step 2: Create the driver XMLDriver.java
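
The driver wires the pieces together: it stores the start and end tags in the job configuration, registers XmlInputFormat as the input format, and runs a map-only job (zero reducers), so whatever the mapper writes goes straight to the output files.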
package xmlparsing.demo;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class XMLDriver {

    public static void main(String[] args) {
        try {
            Configuration conf = new Configuration();
            String[] arg = new GenericOptionsParser(conf, args).getRemainingArgs();

            // Tell XmlInputFormat which tags delimit one record.
            conf.set(XmlInputFormat.START_TAG_KEY, "<employee>");
            conf.set(XmlInputFormat.END_TAG_KEY, "</employee>");

            Job job = Job.getInstance(conf, "XML Processing");
            job.setJarByClass(XMLDriver.class);
            job.setMapperClass(MyMapper.class);
            job.setNumReduceTasks(0); // map-only job
            job.setInputFormatClass(XmlInputFormat.class);

            // MyMapper emits Text keys and NullWritable values.
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(NullWritable.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(NullWritable.class);

            FileInputFormat.addInputPath(job, new Path(arg[0]));
            FileOutputFormat.setOutputPath(job, new Path(arg[1]));

            job.waitForCompletion(true);
        } catch (Exception e) {
            System.err.println("Driver Error: " + e.getMessage());
            e.printStackTrace();
        }
    }
}

Step 3: Create MyMapper.java

package xmlparsing.demo;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;

public class MyMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

    private static final Log LOG = LogFactory.getLog(MyMapper.class);

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        try {
            // The value is one complete <employee>...</employee> block; parse it as XML.
            InputStream is = new ByteArrayInputStream(value.toString().getBytes());
            DocumentBuilderFactory dbFactory = DocumentBuilderFactory.newInstance();
            DocumentBuilder dBuilder = dbFactory.newDocumentBuilder();
            Document doc = dBuilder.parse(is);

            doc.getDocumentElement().normalize();

            NodeList nList = doc.getElementsByTagName("employee");

            for (int temp = 0; temp < nList.getLength(); temp++) {

                Node nNode = nList.item(temp);

                if (nNode.getNodeType() == Node.ELEMENT_NODE) {

                    Element eElement = (Element) nNode;

                    String id = eElement.getElementsByTagName("id").item(0).getTextContent();
                    String name = eElement.getElementsByTagName("name").item(0).getTextContent();
                    String gender = eElement.getElementsByTagName("gender").item(0).getTextContent();

                    // Emit one CSV line per employee record.
                    context.write(new Text(id + "," + name + "," + gender), NullWritable.get());
                }
            }
        } catch (Exception e) {
            LOG.error("Mapper Error: " + e.getMessage(), e);
        }
    }
}
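
To try it out, compile the three classes and package them into a jar (the jar name and HDFS paths below are only examples), then run the job against the sample file:

hadoop jar xmlparsing-demo.jar xmlparsing.demo.XMLDriver /user/hadoop/input/employees.xml /user/hadoop/output/xmlparsing

Since the job runs with zero reducers and NullWritable values, each part file in the output directory simply contains the mapper's keys, one CSV line per employee:

1,Tom White,male
2,Alan Gates,male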

ref: thinkbigdataanalytics.com