Process xml file via apache pig

Category : Pig

If you want to work with XML in Pig, the Piggybank library (a user-contributed library of useful Pig code) contains an XMLLoader. It captures all of the content between a start and end tag and supplies it as a single bytearray field in a Pig tuple.

Pig is a tool used to analyze large amounts of data by representing them as data flows. Using the Pig Latin scripting language, operations like ETL (Extract, Transform and Load), ad hoc data analysis and iterative processing can be achieved easily. Pig is an abstraction over MapReduce; in other words, all Pig scripts are internally converted into Map and Reduce tasks. Pig was built to make programming MapReduce applications easier. Before Pig, Java was the only way to process data stored on HDFS. Pig was first built at Yahoo! and later became a top-level Apache project.
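
As a quick illustration of such a data flow, here is a minimal Pig Latin sketch that loads a delimited file, filters it and prints the result. The file path, schema and filter condition below are made up purely for illustration and are not part of the XML example that follows.

-- Minimal Pig Latin data flow (the path, schema and filter are hypothetical)
books = LOAD '/user/test/books.csv' USING PigStorage(',')
        AS (title:chararray, author:chararray, price:double);
cheap = FILTER books BY price < 30.0;
DUMP cheap;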

Sample input file: hadoop_books.xml

<CATALOG>
<BOOK>
<TITLE>Hadoop Definitive Guide</TITLE>
<AUTHOR>Tom White</AUTHOR>
<COUNTRY>US</COUNTRY>
<COMPANY>CLOUDERA</COMPANY>
<PRICE>24.90</PRICE>
<YEAR>2012</YEAR>
</BOOK>
<BOOK>
<TITLE>Programming Pig</TITLE>
<AUTHOR>Alan Gates</AUTHOR>
<COUNTRY>USA</COUNTRY>
<COMPANY>Horton Works</COMPANY>
<PRICE>30.90</PRICE>
<YEAR>2013</YEAR>
</BOOK>
</CATALOG>
There are two approaches to parse an XML file in Pig:
1. Using Regular Expression
2. Using XPath

Let's discuss them one by one.

1. Using Regular Expression: Here we use XMLLoader() from the Piggybank UDF library to load the XML, so ensure that the Piggybank jar is registered. Then we use a regular expression to parse the XML.

REGISTER piggybank.jar;
A = LOAD '/user/test/hadoop_books.xml' USING org.apache.pig.piggybank.storage.XMLLoader('BOOK') AS (x:chararray);
B = FOREACH A GENERATE FLATTEN(REGEX_EXTRACT_ALL(x,'<BOOK>\\s*<TITLE>(.*)</TITLE>\\s*<AUTHOR>(.*)</AUTHOR>\\s*<COUNTRY>(.*)</COUNTRY>\\s*<COMPANY>(.*)</COMPANY>\\s*<PRICE>(.*)</PRICE>\\s*<YEAR>(.*)</YEAR>\\s*</BOOK>'));
DUMP B;
Once you run this Pig script, you will see the following output on the console.
(Hadoop Definitive Guide,Tom White,US,CLOUDERA,24.90,2012)
(Programming Pig,Alan Gates,USA,Horton Works,30.90,2013)
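
From here the extracted fields can be named, typed and written back to HDFS. The continuation below is only a sketch: it assumes relation B from the script above, and the field names and output path are illustrative.

-- Possible follow-up, assuming relation B from the script above
-- (field names and the output path are illustrative)
C = FOREACH B GENERATE $0 AS title:chararray, $1 AS author:chararray,
    $2 AS country:chararray, $3 AS company:chararray,
    (double)$4 AS price, (int)$5 AS year;
STORE C INTO '/user/test/books_out' USING PigStorage(',');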

2. Using XPath: This is the second approach to solving the XML parsing problem in Pig. XPath is a function that allows text extraction from XML. Starting with Pig 0.13, the Piggybank library comes with XPath support, which eases XML parsing in Pig scripts.

A sample script using XPath is shown below.

REGISTER piggybank.jar;
DEFINE XPath org.apache.pig.piggybank.evaluation.xml.XPath();
A = LOAD '/user/test/hadoop_books.xml' USING org.apache.pig.piggybank.storage.XMLLoader('BOOK') AS (x:chararray);
B = FOREACH A GENERATE XPath(x, 'BOOK/AUTHOR'), XPath(x, 'BOOK/PRICE');
DUMP B;
Output:
(Tom White,24.90)
(Alan Gates,30.90)


Process xml file via mapreduce

Category : YARN

When you have a requirement to process data with Hadoop that does not come in a default input format, this article will help you. Hadoop provides default input formats like TextInputFormat, NLineInputFormat, KeyValueTextInputFormat, etc. When you get a different type of file for processing, you have to create your own custom input format for your MapReduce jobs. Here I am going to show you how to process XML files in a MapReduce job by creating a custom XmlInputFormat.

For example, if you have the following XML input file and you want to process it, you can do so with the help of the following steps.

<CATALOG>
<BOOK>
<TITLE>Hadoop Definitive Guide</TITLE>
<AUTHOR>Tom White</AUTHOR>
<COUNTRY>US</COUNTRY>
<COMPANY>CLOUDERA</COMPANY>
<PRICE>24.90</PRICE>
<YEAR>2012</YEAR>
</BOOK>
<BOOK>
<TITLE>Programming Pig</TITLE>
<AUTHOR>Alan Gates</AUTHOR>
<COUNTRY>USA</COUNTRY>
<COMPANY>Horton Works</COMPANY>
<PRICE>30.90</PRICE>
<YEAR>2013</YEAR>
</BOOK>
</CATALOG>

 

Step 1: Create XmlInputFormat.java

package xmlparsing.demo;

import java.io.IOException;
import java.util.List;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

public class XmlInputFormat extends TextInputFormat {
// Job configuration property names for the start and end tags of one record
public static final String START_TAG_KEY = "START_TAG_KEY";
public static final String END_TAG_KEY = "END_TAG_KEY";

@Override
public RecordReader<LongWritable, Text> createRecordReader(
InputSplit split, TaskAttemptContext context) {
return new XmlRecordReader();
}

public static class XmlRecordReader extends
RecordReader<LongWritable, Text> {
private byte[] startTag;
private byte[] endTag;
private long start;
private long end;
private FSDataInputStream fsin;
private DataOutputBuffer buffer = new DataOutputBuffer();
private LongWritable key = new LongWritable();
private Text value = new Text();

@Override
public void initialize(InputSplit is, TaskAttemptContext tac)
throws IOException, InterruptedException {
FileSplit fileSplit = (FileSplit) is;
// Read the record delimiters from the job configuration set in the driver
startTag = tac.getConfiguration().get(START_TAG_KEY, "<BOOK>").getBytes("UTF-8");
endTag = tac.getConfiguration().get(END_TAG_KEY, "</BOOK>").getBytes("UTF-8");

start = fileSplit.getStart();
end = start + fileSplit.getLength();
Path file = fileSplit.getPath();

FileSystem fs = file.getFileSystem(tac.getConfiguration());
fsin = fs.open(file);
fsin.seek(start);
}

@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
if (fsin.getPos() < end) {
if (readUntilMatch(startTag, false)) {
try {
buffer.write(startTag);
if (readUntilMatch(endTag, true)) {

value.set(buffer.getData(), 0, buffer.getLength());
key.set(fsin.getPos());
return true;
}
} finally {
buffer.reset();
}
}
}
return false;
}

@Override
public LongWritable getCurrentKey() throws IOException,
InterruptedException {
return key;
}

@Override
public Text getCurrentValue() throws IOException, InterruptedException {
return value;

}

@Override
public float getProgress() throws IOException, InterruptedException {
return (fsin.getPos() - start) / (float) (end - start);
}

@Override
public void close() throws IOException {
fsin.close();
}

private boolean readUntilMatch(byte[] match, boolean withinBlock)
throws IOException {
int i = 0;
while (true) {
int b = fsin.read();

if (b == -1)
return false;

if (withinBlock)
buffer.write(b);

if (b == match[i]) {
i++;
if (i >= match.length)
return true;
} else
i = 0;

if (!withinBlock && i == 0 && fsin.getPos() >= end)
return false;
}
}

}

}

Step 2: Create driver XMLDriver.java

package xmlparsing.demo;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class XMLDriver {
public static void main(String[] args) {
try {
Configuration conf = new Configuration();
String[] arg = new GenericOptionsParser(conf, args).getRemainingArgs();
// Tell XmlInputFormat which tags delimit a single record
conf.set(XmlInputFormat.START_TAG_KEY, "<BOOK>");
conf.set(XmlInputFormat.END_TAG_KEY, "</BOOK>");
Job job = new Job(conf, "XML Processing");
job.setJarByClass(XMLDriver.class);
job.setMapperClass(MyMapper.class);
job.setNumReduceTasks(0); // map-only job
job.setInputFormatClass(XmlInputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
FileInputFormat.addInputPath(job, new Path(arg[0]));
FileOutputFormat.setOutputPath(job, new Path(arg[1]));
job.waitForCompletion(true);
} catch (Exception e) {
System.err.println("Driver error: " + e.getMessage());
}
}
}

Step 3: Create MyMapper.java

package xmlparsing.demo;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
//import mrdp.logging.LogWriter;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;

public class MyMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

private static final Log LOG = LogFactory.getLog(MyMapper.class);
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

try {

InputStream is = new ByteArrayInputStream(value.toString().getBytes());
DocumentBuilderFactory dbFactory = DocumentBuilderFactory.newInstance();
DocumentBuilder dBuilder = dbFactory.newDocumentBuilder();
Document doc = dBuilder.parse(is);

doc.getDocumentElement().normalize();

NodeList nList = doc.getElementsByTagName("BOOK");

for (int temp = 0; temp < nList.getLength(); temp++) {

Node nNode = nList.item(temp);

if (nNode.getNodeType() == Node.ELEMENT_NODE) {

Element eElement = (Element) nNode;

String title = eElement.getElementsByTagName("TITLE").item(0).getTextContent();
String author = eElement.getElementsByTagName("AUTHOR").item(0).getTextContent();
String price = eElement.getElementsByTagName("PRICE").item(0).getTextContent();

context.write(new Text(title + "," + author + "," + price), NullWritable.get());

}
}
} catch (Exception e) {
LOG.error("Error parsing XML record: " + e.getMessage(), e);
}

}

}

ref: thinkbigdataanalytics.com