Run Pig Script in Nifi

Category : Nifi , Pig

NiFi can interface directly with Hive, HDFS, HBase, Flume, and Phoenix, and I can also trigger Spark and Flink through Kafka and Site-to-Site. Sometimes I need to run some Pig scripts. Apache Pig is very stable and has a lot of functions and tools that make for some smart processing. You can easily add this piece to a larger pipeline or process.

Pig Setup

I like to use Ambari to install the HDP 2.5 clients on my NiFi box to have access to all the tools I may need.

Then I can just do:

$ yum install pig


ExecuteProcess

We call a shell script that wraps the Pig script.

The output of the script is stored in HDFS: hdfs dfs -ls /nifi-logs

Shell Script

$ export JAVA_HOME=/opt/jdk1.8.0_101/

$ pig -x local -l /tmp/pig.log -f /opt/demo/pigscripts/test.pig

You can run Pig in different execution modes such as local, mapreduce, and tez. You can also pass parameters to the script, as shown below.
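For example, a hedged sketch (the paths and parameter names are placeholders) of running the script in Tez mode with parameters:

$ pig -x tez -l /tmp/pig.log \
      -param INPUT=/opt/demo/logs/nifi-app.log \
      -param OUTPUT=/tmp/warns.out \
      -f /opt/demo/pigscripts/test.pig

Inside the script the values are referenced as $INPUT and $OUTPUT.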

Pig Script

messages = LOAD '/opt/demo/HDF/centos7/tars/nifi/nifi-1.0.0.2.0.0.0-579/logs/nifi-app.log';

warns = FILTER messages BY $0 MATCHES '.*WARN+.*';

DUMP warns;

STORE warns INTO 'warns.out';

This is a basic example from the internet, with the NiFi 1.0 log used as the source.

As an aside, I run a daily script with the schedule 1 * * * * ? to clean up my logs.

Simply: /bin/rm -rf /opt/demo/HDF/centos7/tars/nifi/nifi-1.0.0.2.0.0.0-579/logs/*2016*

PutHDFS

Hadoop Configuration: /etc/hadoop/conf/core-site.xml

Pick a directory and store away.

Result:

HadoopVersion PigVersion UserId StartedAt FinishedAt Features

2.7.3.2.5.0.0-1245 0.16.0.2.5.0.0-1245 root 2016-11-03 19:53:57 2016-11-03 19:53:59 FILTER

Success!

Job Stats (time in seconds):

JobId Maps Reduces MaxMapTime MinMapTime AvgMapTime MedianMapTime MaxReduceTime MinReduceTime AvgReduceTime MedianReducetime Alias Feature Outputs

job_local72884441_0001 1 0 n/a n/a n/a n/a 0 0 0 0 messages,warns MAP_ONLY file:/tmp/temp1540654561/tmp-600070101,

Input(s):

Successfully read 30469 records from: "/opt/demo/HDF/centos7/tars/nifi/nifi-1.0.0.2.0.0.0-579/logs/nifi-app.log"

Output(s):

Successfully stored 1347 records in: "file:/tmp/temp1540654561/tmp-600070101"

Counters:

Total records written : 1347

Total bytes written : 0

Spillable Memory Manager spill count : 0

Total bags proactively spilled: 0

Total records proactively spilled: 0

Job DAG:

job_local72884441_0001

 

ref : https://community.hortonworks.com/articles/64844/running-apache-pig-scripts-from-apache-nifi-and-st.html



Top Hadoop Interview Questions

1. What are the Side Data Distribution Techniques?

Side data refers to the extra, small, static data required by a MapReduce job. The main challenge is making the side data available on the node where a map task will run. Hadoop provides two side-data distribution techniques.

Using the Job Configuration

An arbitrary key-value pair can be set in the job configuration and read back inside the tasks; the second technique is the distributed cache, covered in question 4. A sketch of the first technique follows.
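A minimal sketch (the JAR, class, and property names are placeholders), assuming the driver uses ToolRunner/GenericOptionsParser so that -D properties land in the job configuration:

$ hadoop jar my-analytics.jar com.example.MyDriver \
      -D lookup.currency=USD \
      /data/input /data/output

Inside a mapper or reducer the value is read back with context.getConfiguration().get("lookup.currency").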

2. What is shuffling in MapReduce?

Once map tasks start to complete, the reducers begin fetching their output: each map's output is sent to the reducers that will process it, while the data nodes may still be running other tasks. This transfer of the mappers' output to the reducers is known as shuffling.

3. What is partitioning?

Partitioning is the process of identifying which reducer instance will receive a given piece of mapper output. Before a mapper emits a (key, value) pair, it determines the reducer that will receive it; every value for a given key, no matter which mapper generated it, must end up at the same reducer.

4. What is Distributed Cache in mapreduce framework?

The distributed cache is an important feature provided by the MapReduce framework. It can cache text files, archives, and JARs, which an application can use to improve performance. The application registers the files to cache with the JobConf object, and the framework copies the cached files to the local disk of each node before any task runs there. A hedged sketch follows.
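A hedged sketch (the JAR, class, and file names are placeholders), assuming the driver uses ToolRunner/GenericOptionsParser, whose -files, -archives, and -libjars options place the listed files in the distributed cache:

$ hadoop jar my-analytics.jar com.example.JoinDriver \
      -files /local/path/lookup.txt \
      -archives /local/path/dictionaries.zip \
      /data/input /data/output

Tasks can then open lookup.txt from their working directory.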

5. What is a job tracker?

The JobTracker is a background service running on the master node that is responsible for submitting and tracking jobs; a job in Hadoop terminology means a MapReduce job. The JobTracker breaks a job into tasks, which are deployed to the data nodes holding the required data. In a Hadoop cluster the JobTracker is the master and the TaskTrackers act as its workers, executing tasks and reporting progress back to the JobTracker through heartbeats.

6. How to set which framework would be used to run map reduce program?

The mapreduce.framework.name property controls this. It can be set to:

  1. local
  2. classic
  3. yarn

7. What is replication factor for Job’s JAR?

The job's JAR is one of the most heavily used resources during task execution, so it is copied with a higher replication factor; the default is 10 (controlled by the mapred.submit.replication property).

8. mapred.job.tracker property is used for?

The mapred.job.tracker property is used by the job runner to determine the JobTracker mode. If it is set to local, the runner submits the job to an in-process job tracker running in a single JVM; otherwise the job is sent to the JobTracker at the address given in the property.

9. Difference between Job.submit() or waitForCompletion() ?

Job.submit() internally creates a submitter instance and submits the job, returning immediately, while waitForCompletion() submits the job and then polls its progress at a regular interval of one second. If the job executes successfully, it displays a success message on the console; otherwise it displays a relevant error message.

 

10. What are the types of tables in Hive?

There are two types of tables.

  1. Managed tables.
  2. External tables.

Only the behaviour of the DROP TABLE command differentiates managed and external tables: dropping a managed table removes its data as well as its metadata, while dropping an external table removes only the metadata. Otherwise, both types of table are very similar. A hedged example follows.
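A hedged example (the table and path names are placeholders):

$ hive -e "CREATE TABLE managed_logs (id BIGINT, msg STRING);"

$ hive -e "CREATE EXTERNAL TABLE ext_logs (id BIGINT, msg STRING) LOCATION '/data/ext_logs';"

DROP TABLE removes both data and metadata for managed_logs, but only the metadata for ext_logs; the files under /data/ext_logs are left in place.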

11. Does Hive support record level Insert, delete or update?

Hive does not provide record-level update, insert, or delete, and hence does not provide transactions either. However, users can use CASE statements and Hive's built-in functions to approximate these DML operations, so a complex update query that is simple in an RDBMS may need many lines of code in Hive.

12. What kind of datawarehouse application is suitable for Hive?

Hive is not a full database. The design constraints and limitations of Hadoop and HDFS impose limits on what Hive can do.

Hive is most suited for data warehouse applications, where

1) Relatively static data is analyzed,

2) Fast response times are not required, and

3) When the data is not changing rapidly.

Hive doesn't provide the crucial features required for OLTP (Online Transaction Processing); it is closer to an OLAP (Online Analytical Processing) tool. So Hive is best suited for data warehouse applications, where a large data set is maintained and mined for insights, reports, etc.

13. How can the columns of a table in hive be written to a file?

By using awk command in shell, the output from HiveQL (Describe) can be written to a file.

hive -S -e "describe table_name;" | awk -F" " '{print $1}' > ~/output

14. CONCAT function in Hive with Example?

CONCAT function will concat the input strings. You can specify any number of strings separated by comma.

Example:

CONCAT ('Hive','-','performs','-','good','-','in','-','Hadoop');

Output:

Hive-performs-good-in-Hadoop

So every time you have to delimit the strings with '-'. If the delimiter is common to all the strings, Hive provides another function, CONCAT_WS, where you specify the delimiter first.

CONCAT_WS ('-','Hive','performs','good','in','Hadoop');

Output: Hive-performs-good-in-Hadoop

15. REPEAT function in Hive with example?

REPEAT function will repeat the input string n times specified in the command.

Example:

REPEAT('Hadoop',3);

Output:

HadoopHadoopHadoop.

Note: You can add a space with the input string also.

16. How Pig integrate with Mapreduce to process data?

Pig makes these jobs easier to write. A programmer writes a Pig Latin script to analyze the data sets, and the Pig compiler converts the script into MapReduce jobs. The Pig engine submits these MR jobs, MapReduce processes the data and generates the output, and the results are stored directly in HDFS rather than being returned to Pig.

17. What is the difference between logical and physical plan?

Pig undergoes some steps when a Pig Latin Script is converted into MapReduce jobs. After performing the basic parsing and semantic checking, it produces a logical plan. The logical plan describes the logical operators that have to be executed by Pig during execution. After this, Pig produces a physical plan. The physical plan describes the physical operators that are needed to execute the script.
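A hedged sketch (the path and alias names are placeholders): the EXPLAIN operator prints the logical, physical, and MapReduce plans that Pig builds for an alias.

$ cat > explain_demo.pig <<'EOF'
logs  = LOAD '/data/nifi-app.log';
warns = FILTER logs BY $0 MATCHES '.*WARN+.*';
EXPLAIN warns;
EOF

$ pig -x local -f explain_demo.pig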

18. How many ways we can run Pig programs?

Pig programs or commands can be executed in three ways.

  • Script – Batch Method
  • Grunt Shell – Interactive Method
  • Embedded mode

All these ways can be applied to both the local and MapReduce modes of execution; hedged examples follow.
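Hedged examples (the script name is a placeholder):

$ pig -x mapreduce wordcount.pig      # script (batch) method

$ pig -x local                        # opens the interactive Grunt shell

Embedded mode means invoking Pig from a Java application through the PigServer API.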

19. What is Grunt in Pig?

Grunt is an Interactive Shell in Pig, and below are its major features:

  • The Ctrl-E key combination moves the cursor to the end of the line.
  • Grunt remembers command history and can recall lines in the history buffer using the up and down cursor keys.
  • Grunt supports an auto-completion mechanism that tries to complete Pig Latin keywords and functions when you press the Tab key.

20. What are the modes of Pig Execution?

Local Mode:

Local execution runs in a single JVM; all files are read from and written to the local host's file system.

Mapreduce Mode:

Distributed execution on a Hadoop cluster; it is the default mode.

21. What are the main difference between local mode and MapReduce mode?

Local mode:

There is no need to start or install Hadoop; the Pig scripts run on the local system, and by default Pig stores data on the local file system. The commands are identical in local and MapReduce modes, so nothing needs to change.

MapReduce Mode:

It is mandatory to start Hadoop; Pig scripts run against data stored in HDFS. In both modes, Java and Pig installations are mandatory.

22. Can we process vast amount of data in local mode? Why?

No. A single system has a limited, fixed amount of storage, whereas Hadoop can handle vast amounts of data. So pig -x mapreduce mode is the best choice for processing vast amounts of data.

23. Does Pig support multi-line commands?

Yes

24. Hive doesn’t support multi-line commands, what about Pig?

Pig supports both single-line and multi-line comments.

Single-line comments:

DUMP B; -- it executes and displays the data, but does not store it in the file system.

Multi-line comments:

STORE B INTO '/output'; /* it stores/persists the data in HDFS or the local file system. At production level the STORE command is used most often. */

25. Difference Between Pig and SQL ?

  • Pig is procedural; SQL is declarative.
  • Pig uses a nested relational data model; SQL uses a flat relational model.
  • In Pig the schema is optional; in SQL a schema is required.
  • Pig is geared toward OLAP-style workloads; SQL supports both OLAP and OLTP.
  • Pig offers limited query optimization; SQL engines have significant opportunity for query optimization.

 



Pig script with HCatLoader on Hive ORC table

Category : Pig

Sometimes we have to run Pig commands on Hive ORC tables; this article will help you do that.

Step 1: First create hive orc table:

hive> CREATE TABLE ORC_Table(COL1 BIGINT, COL2 STRING) CLUSTERED BY (COL1) INTO 10 BUCKETS ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' STORED AS ORC TBLPROPERTIES ('transactional'='true');

Step 2: Now insert data to this table:

hive> insert into orc_table values(122342,'test');

hive> insert into orc_table values(231232,'rest');

hive> select * from orc_table;

OK

122342 test

231232 rest

Time taken: 1.663 seconds, Fetched: 2 row(s)

Step 3: Now create a Pig script:

[user1@server1 ~]$ cat  myscript.pig

A = LOAD 'test1.orc_table' USING org.apache.hive.hcatalog.pig.HCatLoader();

Dump A;

Step 4: Now you have to run your pig script:

[user1@server1 ~]$ pig -useHCatalog -f myscript.pig

WARNING: Use “yarn jar” to launch YARN applications.

16/09/16 03:31:02 INFO pig.ExecTypeProvider: Trying ExecType : LOCAL

16/09/16 03:31:02 INFO pig.ExecTypeProvider: Trying ExecType : MAPREDUCE

16/09/16 03:31:02 INFO pig.ExecTypeProvider: Picked MAPREDUCE as the ExecType

2016-09-16 03:31:02,440 [main] INFO  org.apache.pig.Main – Apache Pig version 0.15.0.2.3.4.0-3485 (rexported) compiled Dec 16 2015, 04:30:33

2016-09-16 03:31:02,440 [main] INFO  org.apache.pig.Main – Logging error messages to: /home/user1/pig_1474011062438.log

2016-09-16 03:31:03,233 [main] INFO  org.apache.pig.impl.util.Utils – Default bootup file /home/user1/.pigbootup not found

2016-09-16 03:31:03,386 [main] INFO  org.apache.pig.backend.hadoop.executionengine.HExecutionEngine – Connecting to hadoop file system at: hdfs://HDPINFHA

2016-09-16 03:31:04,269 [main] INFO  org.apache.pig.PigServer – Pig Script ID for the session: PIG-myscript.pig-eb253b46-2d2e-495c-9149-ef305ee4e408

2016-09-16 03:31:04,726 [main] INFO  org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl – Timeline service address: http://server2:8188/ws/v1/timeline/

2016-09-16 03:31:04,726 [main] INFO  org.apache.pig.backend.hadoop.ATSService – Created ATS Hook

2016-09-16 03:31:05,618 [main] INFO  hive.metastore – Trying to connect to metastore with URI thrift://server2:9083

2016-09-16 03:31:05,659 [main] INFO  hive.metastore – Connected to metastore.

2016-09-16 03:31:06,209 [main] INFO  org.apache.pig.tools.pigstats.ScriptState – Pig features used in the script: UNKNOWN

2016-09-16 03:31:06,247 [main] INFO  org.apache.pig.data.SchemaTupleBackend – Key [pig.schematuple] was not set… will not generate code.

2016-09-16 03:31:06,284 [main] INFO  org.apache.pig.newplan.logical.optimizer.LogicalPlanOptimizer – {RULES_ENABLED=[AddForEach, ColumnMapKeyPrune, ConstantCalculator, GroupByConstParallelSetter, LimitOptimizer, LoadTypeCastInserter, MergeFilter, MergeForEach, PartitionFilterOptimizer, PredicatePushdownOptimizer, PushDownForEachFlatten, PushUpFilter, SplitFilter, StreamTypeCastInserter]}

2016-09-16 03:31:06,384 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRCompiler – File concatenation threshold: 100 optimistic? false

2016-09-16 03:31:06,409 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MultiQueryOptimizer – MR plan size before optimization: 1

2016-09-16 03:31:06,409 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MultiQueryOptimizer – MR plan size after optimization: 1

2016-09-16 03:31:06,576 [main] INFO  org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl – Timeline service address: http://server2:8188/ws/v1/timeline/

2016-09-16 03:31:06,758 [main] INFO  org.apache.pig.tools.pigstats.mapreduce.MRScriptState – Pig script settings are added to the job

2016-09-16 03:31:06,762 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler – mapred.job.reduce.markreset.buffer.percent is not set, set to default 0.3

2016-09-16 03:31:06,999 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler – This job cannot be converted run in-process

2016-09-16 03:31:07,292 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler – Added jar file:/usr/hdp/2.3.4.0-3485/hive/lib/hive-metastore-1.2.1.2.3.4.0-3485.jar to DistributedCache through /tmp/temp-1473630461/tmp428549735/hive-metastore-1.2.1.2.3.4.0-3485.jar

2016-09-16 03:31:07,329 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler – Added jar file:/usr/hdp/2.3.4.0-3485/hive/lib/libthrift-0.9.2.jar to DistributedCache through /tmp/temp-1473630461/tmp568922300/libthrift-0.9.2.jar

2016-09-16 03:31:07,542 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler – Added jar file:/usr/hdp/2.3.4.0-3485/hive/lib/hive-exec-1.2.1.2.3.4.0-3485.jar to DistributedCache through /tmp/temp-1473630461/tmp-1007595209/hive-exec-1.2.1.2.3.4.0-3485.jar

2016-09-16 03:31:07,577 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler – Added jar file:/usr/hdp/2.3.4.0-3485/hive/lib/libfb303-0.9.2.jar to DistributedCache through /tmp/temp-1473630461/tmp-1039107423/libfb303-0.9.2.jar

2016-09-16 03:31:07,609 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler – Added jar file:/usr/hdp/2.3.4.0-3485/hive/lib/jdo-api-3.0.1.jar to DistributedCache through /tmp/temp-1473630461/tmp-1375931436/jdo-api-3.0.1.jar

2016-09-16 03:31:07,642 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler – Added jar file:/usr/hdp/2.3.4.0-3485/hive/lib/hive-hbase-handler-1.2.1.2.3.4.0-3485.jar to DistributedCache through /tmp/temp-1473630461/tmp-893657730/hive-hbase-handler-1.2.1.2.3.4.0-3485.jar

2016-09-16 03:31:07,674 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler – Added jar file:/usr/hdp/2.3.4.0-3485/hive-hcatalog/share/hcatalog/hive-hcatalog-core-1.2.1.2.3.4.0-3485.jar to DistributedCache through /tmp/temp-1473630461/tmp-1850340790/hive-hcatalog-core-1.2.1.2.3.4.0-3485.jar

2016-09-16 03:31:07,705 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler – Added jar file:/usr/hdp/2.3.4.0-3485/hive-hcatalog/share/hcatalog/hive-hcatalog-pig-adapter-1.2.1.2.3.4.0-3485.jar to DistributedCache through /tmp/temp-1473630461/tmp58999520/hive-hcatalog-pig-adapter-1.2.1.2.3.4.0-3485.jar

2016-09-16 03:31:07,775 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler – Added jar file:/usr/hdp/2.3.4.0-3485/pig/pig-0.15.0.2.3.4.0-3485-core-h2.jar to DistributedCache through /tmp/temp-1473630461/tmp-422634726/pig-0.15.0.2.3.4.0-3485-core-h2.jar

2016-09-16 03:31:07,808 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler – Added jar file:/usr/hdp/2.3.4.0-3485/pig/lib/automaton-1.11-8.jar to DistributedCache through /tmp/temp-1473630461/tmp1167068812/automaton-1.11-8.jar

2016-09-16 03:31:07,840 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler – Added jar file:/usr/hdp/2.3.4.0-3485/pig/lib/antlr-runtime-3.4.jar to DistributedCache through /tmp/temp-1473630461/tmp708151030/antlr-runtime-3.4.jar

2016-09-16 03:31:07,882 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler – Setting up single store job

2016-09-16 03:31:07,932 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher – 1 map-reduce job(s) waiting for submission.

2016-09-16 03:31:08,248 [JobControl] WARN  org.apache.hadoop.mapreduce.JobResourceUploader – No job jar file set.  User classes may not be found. See Job or Job#setJar(String).

2016-09-16 03:31:08,351 [JobControl] INFO  org.apache.hadoop.hive.ql.log.PerfLogger – <PERFLOG method=OrcGetSplits from=org.apache.hadoop.hive.ql.io.orc.ReaderImpl>

2016-09-16 03:31:08,355 [JobControl] INFO  org.apache.hadoop.hive.ql.io.orc.OrcInputFormat – ORC pushdown predicate: null

2016-09-16 03:31:08,416 [JobControl] INFO  org.apache.hadoop.hive.ql.io.orc.OrcInputFormat – FooterCacheHitRatio: 0/0

2016-09-16 03:31:08,416 [JobControl] INFO  org.apache.hadoop.hive.ql.log.PerfLogger – </PERFLOG method=OrcGetSplits start=1474011068351 end=1474011068416 duration=65 from=org.apache.hadoop.hive.ql.io.orc.ReaderImpl>

2016-09-16 03:31:08,421 [JobControl] INFO  org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil – Total input paths (combined) to process : 1

2016-09-16 03:31:08,514 [JobControl] INFO  org.apache.hadoop.mapreduce.JobSubmitter – number of splits:1

2016-09-16 03:31:08,612 [JobControl] INFO  org.apache.hadoop.mapreduce.JobSubmitter – Submitting tokens for job: job_1472564332053_0029

2016-09-16 03:31:08,755 [JobControl] INFO  org.apache.hadoop.mapred.YARNRunner – Job jar is not present. Not adding any jar to the list of resources.

2016-09-16 03:31:08,947 [JobControl] INFO  org.apache.hadoop.yarn.client.api.impl.YarnClientImpl – Submitted application application_1472564332053_0029

2016-09-16 03:31:08,989 [JobControl] INFO  org.apache.hadoop.mapreduce.Job – The url to track the job: http://server2:8088/proxy/application_1472564332053_0029/

2016-09-16 03:31:08,990 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher – HadoopJobId: job_1472564332053_0029

2016-09-16 03:31:08,990 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher – Processing aliases A

2016-09-16 03:31:08,990 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher – detailed locations: M: A[1,4] C:  R:

2016-09-16 03:31:09,007 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher – 0% complete

2016-09-16 03:31:09,007 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher – Running jobs are [job_1472564332053_0029]

2016-09-16 03:31:28,133 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher – 50% complete

2016-09-16 03:31:28,133 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher – Running jobs are [job_1472564332053_0029]

2016-09-16 03:31:29,251 [main] INFO  org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl – Timeline service address: http://server2:8188/ws/v1/timeline/

2016-09-16 03:31:29,258 [main] INFO  org.apache.hadoop.mapred.ClientServiceDelegate – Application state is completed. FinalApplicationStatus=SUCCEEDED. Redirecting to job history server

2016-09-16 03:31:30,186 [main] INFO  org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl – Timeline service address: http://server2:8188/ws/v1/timeline/

HadoopVersion PigVersion UserId StartedAt FinishedAt Features

2.7.1.2.3.4.0-3485 0.15.0.2.3.4.0-3485 s0998dnz 2016-09-16 03:31:06 2016-09-16 03:31:30 UNKNOWN

Success!

Job Stats (time in seconds):

JobId Maps Reduces MaxMapTime MinMapTime AvgMapTime MedianMapTime MaxReduceTime MinReduceTime AvgReduceTime MedianReducetime Alias Feature Outputs

job_1472564332053_0029 1 0 5 5 5 5 0 0 0 0 A MAP_ONLY hdfs://HDPINFHA/tmp/temp-1473630461/tmp1899757076,

Input(s):

Successfully read 2 records (28587 bytes) from: "test1.orc_table"

Output(s):

Successfully stored 2 records (32 bytes) in: "hdfs://HDPINFHA/tmp/temp-1473630461/tmp1899757076"

Counters:

Total records written : 2

Total bytes written : 32

Spillable Memory Manager spill count : 0

Total bags proactively spilled: 0

Total records proactively spilled: 0

Job DAG:

job_1472564332053_0029

2016-09-16 03:31:30,822 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher – Success!

2016-09-16 03:31:30,825 [main] WARN  org.apache.pig.data.SchemaTupleBackend – SchemaTupleBackend has already been initialized

2016-09-16 03:31:30,834 [main] INFO  org.apache.hadoop.mapreduce.lib.input.FileInputFormat – Total input paths to process : 1

2016-09-16 03:31:30,834 [main] INFO  org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil – Total input paths to process : 1

(122342,test)

(231232,rest)

2016-09-16 03:31:30,984 [main] INFO  org.apache.pig.Main – Pig script completed in 28 seconds and 694 milliseconds (28694 ms)
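If you also need to write results back into a Hive table, HCatStorer (the companion to HCatLoader) can be used. A hedged sketch (the target table name is a placeholder and must already exist with a matching schema):

$ cat > store_copy.pig <<'EOF'
A = LOAD 'test1.orc_table' USING org.apache.hive.hcatalog.pig.HCatLoader();
B = FILTER A BY col1 > 200000;
STORE B INTO 'test1.orc_table_copy' USING org.apache.hive.hcatalog.pig.HCatStorer();
EOF

$ pig -useHCatalog -f store_copy.pig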



Run Pig script through Oozie

Category : Bigdata

If you have a requirement to read some file through Pig and you want to schedule your Pig script via Oozie, then this article will help you do the job.

Step 1: First create a directory inside HDFS (under your home directory):

$ hadoop fs -mkdir -p /user/<user_id>/oozie-scripts/PigTest

Step 2: Create your workflow.xml and job.properties:

$ vi job.properties

#*************************************************

#  job.properties

#*************************************************

nameNode=hdfs://HDPTSTHA

jobTracker=<RM_Server>:8050

queueName=default

oozie.libpath=${nameNode}/user/oozie/share/lib

oozie.use.system.libpath=true

oozie.wf.rerun.failnodes=true

examplesRoot=oozie-scripts

examplesRootDir=/user/${user.name}/${examplesRoot}

appPath=${examplesRootDir}/PigTest

oozie.wf.application.path=${appPath}

$ vi workflow.xml

<!--******************************************-->

<!--workflow.xml                              -->

<!--******************************************-->

<workflow-app name="WorkFlowForPigAction" xmlns="uri:oozie:workflow:0.1">

<start to="pigAction"/>

<action name="pigAction">

        <pig>

            <job-tracker>${jobTracker}</job-tracker>

            <name-node>${nameNode}</name-node>

            <prepare>

                <delete path="${nameNode}/${examplesRootDir}/PigTest/temp"/>

            </prepare>

            <configuration>

                <property>

                    <name>mapred.job.queue.name</name>

                    <value>${queueName}</value>

                </property>

                <property>

                    <name>mapred.compress.map.output</name>

                    <value>true</value>

                </property>

            </configuration>

            <script>pig_script_file.pig</script>

          </pig>

        <ok to="end"/>

        <error to="end"/>

    </action>

<end name="end"/>

</workflow-app>

Step 3: Create your pig script :

$ cat pig_script_file.pig

lines = LOAD '/user/demouser/file.txt' AS (line:chararray);

words = FOREACH lines GENERATE FLATTEN(TOKENIZE(line)) as word;

grouped = GROUP words BY word;

wordcount = FOREACH grouped GENERATE group, COUNT(words);

store wordcount into '/user/demouser/pigOut1';

-- DUMP wordcount;

Step 4: Now copy your workflow.xml and pig script to your hdfs location:

$ hadoop fs -put pig_script_file.pig /user/demouser/oozie-scripts/PigTest/

$ hadoop fs -put workflow.xml /user/demouser/oozie-scripts/PigTest/

Step 5: Now you can schedule or submit the Oozie job:

$ oozie job -oozie http://<Oozie_server>:11000/oozie -config job.properties -run
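The submit command prints a workflow ID; a hedged example of checking its status and logs with the Oozie CLI (the workflow ID is a placeholder):

$ oozie job -oozie http://<Oozie_server>:11000/oozie -info <workflow_id>

$ oozie job -oozie http://<Oozie_server>:11000/oozie -log <workflow_id>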

Now you can see your output in hdfs :

[demouser@<nameNode_server> pig_oozie_demo]$ hadoop fs -ls /user/demouser/pigOut1

Found 2 items

-rw-r--r--   demouser hdfs          0 2016-05-03 05:36 /user/demouser/pigOut1/_SUCCESS

-rw-r--r--   demouser hdfs         30 2016-05-03 05:36 /user/demouser/pigOut1/part-r-00000

[demouser@<nameNode_server> pig_oozie_demo]$ hadoop fs -cat /user/demouser/pigOut1/part-r-00000

pig 4

test 1

oozie 1

sample 1

Conclusion: I hope this article helps you. If you have any feedback or questions, please feel free to write to me in the comments.