Category Archives: Pig

Run Pig Script in Nifi

Category : Nifi , Pig

NiFi can interface directly with Hive, HDFS, HBase, Flume and Phoenix. It can also trigger Spark and Flink through Kafka and Site-To-Site. Sometimes I need to run some Pig scripts. Apache Pig is very stable and has a lot of functions and tools that make for some smart processing. You can easily add this piece to a larger pipeline or process.

Pig Setup

I like to use Ambari to install the HDP 2.5 clients on my NiFi box to have access to all the tools I may need.

Then I can just do:

$ yum install pig
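
A quick sanity check that the client landed on the PATH:

$ pig -version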

ExecuteProcess

We call a shell script that wraps the Pig script.

Output of script is stored to HDFS: hdfs dfs -ls /nifi-logs

Shell Script

$ export JAVA_HOME=/opt/jdk1.8.0_101/

$ pig -x local -l /tmp/pig.log -f /opt/demo/pigscripts/test.pig
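
Putting the two lines together, a minimal wrapper script for ExecuteProcess could look like this (the JDK and script paths are the ones from this demo box; adjust for your environment):

#!/bin/bash
# Wrapper invoked by NiFi's ExecuteProcess processor.
export JAVA_HOME=/opt/jdk1.8.0_101/
export PATH=$JAVA_HOME/bin:$PATH
# Run the Pig script in local mode, logging to /tmp/pig.log.
pig -x local -l /tmp/pig.log -f /opt/demo/pigscripts/test.pig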

You can run Pig in different modes like local, mapreduce and tez. You can also pass parameters to the script.
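
For example, the -param option keeps the input path out of the script (the LOGFILE name here is just an illustration):

$ pig -x tez -param LOGFILE=/var/log/nifi/nifi-app.log -f /opt/demo/pigscripts/test.pig

Inside the script you would then load '$LOGFILE' instead of a hard-coded path.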

Pig Script

messages = LOAD '/opt/demo/HDF/centos7/tars/nifi/nifi-1.0.0.2.0.0.0-579/logs/nifi-app.log';

warns = FILTER messages BY $0 MATCHES '.*WARN+.*';

DUMP warns;

STORE warns INTO 'warns.out';

This is a basic example from the internet, with the NiFi 1.0 log used as the source.

As an aside, I run a daily script with the schedule 1 * * * * ? to clean up my logs.

Simply: /bin/rm -rf /opt/demo/HDF/centos7/tars/nifi/nifi-1.0.0.2.0.0.0-579/logs/*2016*

PutHDFS

Hadoop Configuration Resources: /etc/hadoop/conf/core-site.xml

Pick a directory and store away.
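
A minimal PutHDFS configuration for this flow might look like the following (the directory matches the hdfs dfs -ls command above; the conflict strategy is my own choice):

Hadoop Configuration Resources: /etc/hadoop/conf/core-site.xml
Directory: /nifi-logs
Conflict Resolution Strategy: replace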

Result:

HadoopVersion  PigVersion  UserId  StartedAt  FinishedAt  Features

2.7.3.2.5.0.0-1245  0.16.0.2.5.0.0-1245  root  2016-11-03 19:53:57  2016-11-03 19:53:59  FILTER

Success!

Job Stats (time in seconds):

JobId  Maps  Reduces  MaxMapTime  MinMapTime  AvgMapTime  MedianMapTime  MaxReduceTime  MinReduceTime  AvgReduceTime  MedianReducetime  Alias  Feature  Outputs

job_local72884441_0001  1  0  n/a  n/a  n/a  n/a  0  0  0  0  messages,warns  MAP_ONLY  file:/tmp/temp1540654561/tmp-600070101,

Input(s):

Successfully read 30469 records from: "/opt/demo/HDF/centos7/tars/nifi/nifi-1.0.0.2.0.0.0-579/logs/nifi-app.log"

Output(s):

Successfully stored 1347 records in: "file:/tmp/temp1540654561/tmp-600070101"

Counters:

Total records written : 1347

Total bytes written : 0

Spillable Memory Manager spill count : 0

Total bags proactively spilled: 0

Total records proactively spilled: 0

Job DAG:

job_local72884441_0001

 

ref : https://community.hortonworks.com/articles/64844/running-apache-pig-scripts-from-apache-nifi-and-st.html


Process xml file via apache pig

Category : Pig

If you want to work with XML in Pig, the Piggybank library (a user-contributed library of useful Pig code) contains an XMLLoader. It captures all of the content between a given start and end tag and supplies it as a single bytearray field in a Pig tuple.

Pig is a tool used to analyze large amounts of data by representing them as data flows. Using the Pig Latin scripting language, operations like ETL (Extract, Transform and Load), ad-hoc data analysis and iterative processing can be achieved easily. Pig is an abstraction over MapReduce: all Pig scripts are internally converted into Map and Reduce tasks to get the work done. Pig was built to make programming MapReduce applications easier. Before Pig, Java was the only way to process the data stored on HDFS. Pig was first built at Yahoo! and later became a top-level Apache project.
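
You can see this translation for yourself: Pig's EXPLAIN operator prints the logical, physical and MapReduce plans for any alias (a quick sketch in the grunt shell; the file name is arbitrary):

grunt> A = LOAD 'input.txt' AS (line:chararray);
grunt> EXPLAIN A;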

Sample input file: hadoop_books.xml

<CATALOG>
<BOOK>
<TITLE>Hadoop Definitive Guide</TITLE>
<AUTHOR>Tom White</AUTHOR>
<COUNTRY>US</COUNTRY>
<COMPANY>CLOUDERA</COMPANY>
<PRICE>24.90</PRICE>
<YEAR>2012</YEAR>
</BOOK>
<BOOK>
<TITLE>Programming Pig</TITLE>
<AUTHOR>Alan Gates</AUTHOR>
<COUNTRY>USA</COUNTRY>
<COMPANY>Horton Works</COMPANY>
<PRICE>30.90</PRICE>
<YEAR>2013</YEAR>
</BOOK>
</CATALOG>

There are two approaches to parsing an XML file in Pig:

1. Using Regular Expression
2. Using XPath

Let's discuss them one by one.

1. Using Regular Expression: Here we use the XMLLoader() from the Piggybank library to load the XML, so make sure the Piggybank jar is registered. Then a regular expression parses each BOOK element.

REGISTER piggybank.jar;
A = LOAD '/user/test/hadoop_books.xml' USING org.apache.pig.piggybank.storage.XMLLoader('BOOK') AS (x:chararray);
B = FOREACH A GENERATE FLATTEN(REGEX_EXTRACT_ALL(x, '<BOOK>\\s*<TITLE>(.*)</TITLE>\\s*<AUTHOR>(.*)</AUTHOR>\\s*<COUNTRY>(.*)</COUNTRY>\\s*<COMPANY>(.*)</COMPANY>\\s*<PRICE>(.*)</PRICE>\\s*<YEAR>(.*)</YEAR>\\s*</BOOK>'));
DUMP B;
Once you run this Pig script, you will see the following output on your console:
(Hadoop Definitive Guide,Tom White,US,CLOUDERA,24.90,2012)
(Programming Pig,Alan Gates,USA,Horton Works,30.90,2013)
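
Instead of dumping to the console, you can persist the parsed tuples with a STORE statement (the output path here is just an illustration):

STORE B INTO '/user/test/books_parsed' USING PigStorage(',');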

2. Using XPath: The second approach is XPath, a function that allows text extraction from XML. Starting with Pig 0.13, Piggybank ships with XPath support, which eases XML parsing in Pig scripts.

A sample script using XPath is shown below.

REGISTER piggybank.jar;
DEFINE XPath org.apache.pig.piggybank.evaluation.xml.XPath();
A = LOAD '/user/test/hadoop_books.xml' USING org.apache.pig.piggybank.storage.XMLLoader('BOOK') AS (x:chararray);
B = FOREACH A GENERATE XPath(x, 'BOOK/AUTHOR'), XPath(x, 'BOOK/PRICE');
DUMP B;
Output:
(Tom White,24.90)
(Alan Gates,30.90)
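
Since XPath returns chararray values, you can cast them for further processing; a small sketch (the price threshold is arbitrary):

C = FOREACH A GENERATE XPath(x, 'BOOK/TITLE') AS title, (double)XPath(x, 'BOOK/PRICE') AS price;
D = FILTER C BY price > 25.0;
DUMP D;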

Pig script with HCatLoader on Hive ORC table

Category : Pig

Sometimes we have to run Pig commands on Hive ORC tables; this article will help you do that.

Step 1: First create the Hive ORC table:

hive> CREATE TABLE orc_table(col1 BIGINT, col2 STRING) CLUSTERED BY (col1) INTO 10 BUCKETS ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' STORED AS ORC TBLPROPERTIES ('transactional'='true');

Step 2: Now insert data into this table:

hive> insert into orc_table values(122342,'test');

hive> insert into orc_table values(231232,'rest');

hive> select * from orc_table;

OK

122342 test

231232 rest

Time taken: 1.663 seconds, Fetched: 2 row(s)

Step 3: Now create the Pig script:

[user1@server1 ~]$ cat myscript.pig

A = LOAD 'test1.orc_table' USING org.apache.hive.hcatalog.pig.HCatLoader();

DUMP A;
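
Because HCatLoader reads the table schema from the metastore, you can reference columns by name right away; for example (a sketch, the filter value is arbitrary):

B = FILTER A BY col1 > 200000;
DUMP B;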

Step 4: Now run the Pig script with the -useHCatalog flag:

[user1@server1 ~]$ pig -useHCatalog -f myscript.pig

WARNING: Use “yarn jar” to launch YARN applications.

16/09/16 03:31:02 INFO pig.ExecTypeProvider: Trying ExecType : LOCAL

16/09/16 03:31:02 INFO pig.ExecTypeProvider: Trying ExecType : MAPREDUCE

16/09/16 03:31:02 INFO pig.ExecTypeProvider: Picked MAPREDUCE as the ExecType

2016-09-16 03:31:02,440 [main] INFO  org.apache.pig.Main – Apache Pig version 0.15.0.2.3.4.0-3485 (rexported) compiled Dec 16 2015, 04:30:33

2016-09-16 03:31:02,440 [main] INFO  org.apache.pig.Main – Logging error messages to: /home/user1/pig_1474011062438.log

2016-09-16 03:31:03,233 [main] INFO  org.apache.pig.impl.util.Utils – Default bootup file /home/user1/.pigbootup not found

2016-09-16 03:31:03,386 [main] INFO  org.apache.pig.backend.hadoop.executionengine.HExecutionEngine – Connecting to hadoop file system at: hdfs://HDPINFHA

2016-09-16 03:31:04,269 [main] INFO  org.apache.pig.PigServer – Pig Script ID for the session: PIG-myscript.pig-eb253b46-2d2e-495c-9149-ef305ee4e408

2016-09-16 03:31:04,726 [main] INFO  org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl – Timeline service address: http://server2:8188/ws/v1/timeline/

2016-09-16 03:31:04,726 [main] INFO  org.apache.pig.backend.hadoop.ATSService – Created ATS Hook

2016-09-16 03:31:05,618 [main] INFO  hive.metastore – Trying to connect to metastore with URI thrift://server2:9083

2016-09-16 03:31:05,659 [main] INFO  hive.metastore – Connected to metastore.

2016-09-16 03:31:06,209 [main] INFO  org.apache.pig.tools.pigstats.ScriptState – Pig features used in the script: UNKNOWN

2016-09-16 03:31:06,247 [main] INFO  org.apache.pig.data.SchemaTupleBackend – Key [pig.schematuple] was not set… will not generate code.

2016-09-16 03:31:06,284 [main] INFO  org.apache.pig.newplan.logical.optimizer.LogicalPlanOptimizer – {RULES_ENABLED=[AddForEach, ColumnMapKeyPrune, ConstantCalculator, GroupByConstParallelSetter, LimitOptimizer, LoadTypeCastInserter, MergeFilter, MergeForEach, PartitionFilterOptimizer, PredicatePushdownOptimizer, PushDownForEachFlatten, PushUpFilter, SplitFilter, StreamTypeCastInserter]}

2016-09-16 03:31:06,384 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRCompiler – File concatenation threshold: 100 optimistic? false

2016-09-16 03:31:06,409 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MultiQueryOptimizer – MR plan size before optimization: 1

2016-09-16 03:31:06,409 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MultiQueryOptimizer – MR plan size after optimization: 1

2016-09-16 03:31:06,576 [main] INFO  org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl – Timeline service address: http://server2:8188/ws/v1/timeline/

2016-09-16 03:31:06,758 [main] INFO  org.apache.pig.tools.pigstats.mapreduce.MRScriptState – Pig script settings are added to the job

2016-09-16 03:31:06,762 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler – mapred.job.reduce.markreset.buffer.percent is not set, set to default 0.3

2016-09-16 03:31:06,999 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler – This job cannot be converted run in-process

2016-09-16 03:31:07,292 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler – Added jar file:/usr/hdp/2.3.4.0-3485/hive/lib/hive-metastore-1.2.1.2.3.4.0-3485.jar to DistributedCache through /tmp/temp-1473630461/tmp428549735/hive-metastore-1.2.1.2.3.4.0-3485.jar

2016-09-16 03:31:07,329 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler – Added jar file:/usr/hdp/2.3.4.0-3485/hive/lib/libthrift-0.9.2.jar to DistributedCache through /tmp/temp-1473630461/tmp568922300/libthrift-0.9.2.jar

2016-09-16 03:31:07,542 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler – Added jar file:/usr/hdp/2.3.4.0-3485/hive/lib/hive-exec-1.2.1.2.3.4.0-3485.jar to DistributedCache through /tmp/temp-1473630461/tmp-1007595209/hive-exec-1.2.1.2.3.4.0-3485.jar

2016-09-16 03:31:07,577 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler – Added jar file:/usr/hdp/2.3.4.0-3485/hive/lib/libfb303-0.9.2.jar to DistributedCache through /tmp/temp-1473630461/tmp-1039107423/libfb303-0.9.2.jar

2016-09-16 03:31:07,609 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler – Added jar file:/usr/hdp/2.3.4.0-3485/hive/lib/jdo-api-3.0.1.jar to DistributedCache through /tmp/temp-1473630461/tmp-1375931436/jdo-api-3.0.1.jar

2016-09-16 03:31:07,642 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler – Added jar file:/usr/hdp/2.3.4.0-3485/hive/lib/hive-hbase-handler-1.2.1.2.3.4.0-3485.jar to DistributedCache through /tmp/temp-1473630461/tmp-893657730/hive-hbase-handler-1.2.1.2.3.4.0-3485.jar

2016-09-16 03:31:07,674 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler – Added jar file:/usr/hdp/2.3.4.0-3485/hive-hcatalog/share/hcatalog/hive-hcatalog-core-1.2.1.2.3.4.0-3485.jar to DistributedCache through /tmp/temp-1473630461/tmp-1850340790/hive-hcatalog-core-1.2.1.2.3.4.0-3485.jar

2016-09-16 03:31:07,705 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler – Added jar file:/usr/hdp/2.3.4.0-3485/hive-hcatalog/share/hcatalog/hive-hcatalog-pig-adapter-1.2.1.2.3.4.0-3485.jar to DistributedCache through /tmp/temp-1473630461/tmp58999520/hive-hcatalog-pig-adapter-1.2.1.2.3.4.0-3485.jar

2016-09-16 03:31:07,775 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler – Added jar file:/usr/hdp/2.3.4.0-3485/pig/pig-0.15.0.2.3.4.0-3485-core-h2.jar to DistributedCache through /tmp/temp-1473630461/tmp-422634726/pig-0.15.0.2.3.4.0-3485-core-h2.jar

2016-09-16 03:31:07,808 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler – Added jar file:/usr/hdp/2.3.4.0-3485/pig/lib/automaton-1.11-8.jar to DistributedCache through /tmp/temp-1473630461/tmp1167068812/automaton-1.11-8.jar

2016-09-16 03:31:07,840 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler – Added jar file:/usr/hdp/2.3.4.0-3485/pig/lib/antlr-runtime-3.4.jar to DistributedCache through /tmp/temp-1473630461/tmp708151030/antlr-runtime-3.4.jar

2016-09-16 03:31:07,882 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler – Setting up single store job

2016-09-16 03:31:07,932 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher – 1 map-reduce job(s) waiting for submission.

2016-09-16 03:31:08,248 [JobControl] WARN  org.apache.hadoop.mapreduce.JobResourceUploader – No job jar file set.  User classes may not be found. See Job or Job#setJar(String).

2016-09-16 03:31:08,351 [JobControl] INFO  org.apache.hadoop.hive.ql.log.PerfLogger – <PERFLOG method=OrcGetSplits from=org.apache.hadoop.hive.ql.io.orc.ReaderImpl>

2016-09-16 03:31:08,355 [JobControl] INFO  org.apache.hadoop.hive.ql.io.orc.OrcInputFormat – ORC pushdown predicate: null

2016-09-16 03:31:08,416 [JobControl] INFO  org.apache.hadoop.hive.ql.io.orc.OrcInputFormat – FooterCacheHitRatio: 0/0

2016-09-16 03:31:08,416 [JobControl] INFO  org.apache.hadoop.hive.ql.log.PerfLogger – </PERFLOG method=OrcGetSplits start=1474011068351 end=1474011068416 duration=65 from=org.apache.hadoop.hive.ql.io.orc.ReaderImpl>

2016-09-16 03:31:08,421 [JobControl] INFO  org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil – Total input paths (combined) to process : 1

2016-09-16 03:31:08,514 [JobControl] INFO  org.apache.hadoop.mapreduce.JobSubmitter – number of splits:1

2016-09-16 03:31:08,612 [JobControl] INFO  org.apache.hadoop.mapreduce.JobSubmitter – Submitting tokens for job: job_1472564332053_0029

2016-09-16 03:31:08,755 [JobControl] INFO  org.apache.hadoop.mapred.YARNRunner – Job jar is not present. Not adding any jar to the list of resources.

2016-09-16 03:31:08,947 [JobControl] INFO  org.apache.hadoop.yarn.client.api.impl.YarnClientImpl – Submitted application application_1472564332053_0029

2016-09-16 03:31:08,989 [JobControl] INFO  org.apache.hadoop.mapreduce.Job – The url to track the job: http://server2:8088/proxy/application_1472564332053_0029/

2016-09-16 03:31:08,990 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher – HadoopJobId: job_1472564332053_0029

2016-09-16 03:31:08,990 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher – Processing aliases A

2016-09-16 03:31:08,990 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher – detailed locations: M: A[1,4] C:  R:

2016-09-16 03:31:09,007 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher – 0% complete

2016-09-16 03:31:09,007 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher – Running jobs are [job_1472564332053_0029]

2016-09-16 03:31:28,133 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher – 50% complete

2016-09-16 03:31:28,133 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher – Running jobs are [job_1472564332053_0029]

2016-09-16 03:31:29,251 [main] INFO  org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl – Timeline service address: http://server2:8188/ws/v1/timeline/

2016-09-16 03:31:29,258 [main] INFO  org.apache.hadoop.mapred.ClientServiceDelegate – Application state is completed. FinalApplicationStatus=SUCCEEDED. Redirecting to job history server

2016-09-16 03:31:30,186 [main] INFO  org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl – Timeline service address: http://server2:8188/ws/v1/timeline/

HadoopVersion PigVersion UserId StartedAt FinishedAt Features

2.7.1.2.3.4.0-3485 0.15.0.2.3.4.0-3485 s0998dnz 2016-09-16 03:31:06 2016-09-16 03:31:30 UNKNOWN

Success!

Job Stats (time in seconds):

JobId Maps Reduces MaxMapTime MinMapTime AvgMapTime MedianMapTime MaxReduceTime MinReduceTime AvgReduceTime MedianReducetime Alias Feature Outputs

job_1472564332053_0029 1 0 5 5 5 5 0 0 0 0 A MAP_ONLY hdfs://HDPINFHA/tmp/temp-1473630461/tmp1899757076,

Input(s):

Successfully read 2 records (28587 bytes) from: "test1.orc_table"

Output(s):

Successfully stored 2 records (32 bytes) in: "hdfs://HDPINFHA/tmp/temp-1473630461/tmp1899757076"

Counters:

Total records written : 2

Total bytes written : 32

Spillable Memory Manager spill count : 0

Total bags proactively spilled: 0

Total records proactively spilled: 0

Job DAG:

job_1472564332053_0029

2016-09-16 03:31:30,822 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher – Success!

2016-09-16 03:31:30,825 [main] WARN  org.apache.pig.data.SchemaTupleBackend – SchemaTupleBackend has already been initialized

2016-09-16 03:31:30,834 [main] INFO  org.apache.hadoop.mapreduce.lib.input.FileInputFormat – Total input paths to process : 1

2016-09-16 03:31:30,834 [main] INFO  org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil – Total input paths to process : 1

(122342,test)

(231232,rest)

2016-09-16 03:31:30,984 [main] INFO  org.apache.pig.Main – Pig script completed in 28 seconds and 694 milliseconds (28694 ms)
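
To write results back into Hive rather than dumping them, the companion HCatStorer can be used in the same script (a sketch; it assumes a target table test1.orc_table_out already exists with a matching schema):

B = FILTER A BY col1 > 200000;
STORE B INTO 'test1.orc_table_out' USING org.apache.hive.hcatalog.pig.HCatStorer();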