Map side join in Hive
Category : Hive
We often have very small tables in Hive, yet queries that join against them still take a long time.
Here I am going to explain the map-side join (mapjoin) and its advantages over a normal join in Hive. Before that, we should first understand the concept of a 'join' and what happens internally when we perform one in Hive.
Join is a clause that combines the records of two tables (or Data-Sets).
Assume that we have two tables A and B. When we perform a join on them, the result contains records that combine the columns of A and B according to the join condition.
Mapjoin is a little-known feature of Hive. It allows a small table to be loaded into memory so that a (very fast) join can be performed entirely within a mapper, without needing a reduce step. If your queries frequently join against small tables (e.g. cities or countries), you might see a very substantial speed-up from using mapjoins.
There are two ways to enable it. The first is a query hint, which looks like /*+ MAPJOIN(aliasname), MAPJOIN(anothertable) */. This C-style comment should be placed immediately after the SELECT keyword. It directs Hive to load aliasname (a table or alias in the query) into memory.
SELECT /*+ MAPJOIN(c) */ * FROM orders o JOIN cities c ON (o.city_id = c.id);
Another (better, in my opinion) way to turn on mapjoins is to let Hive do it automatically. Simply set hive.auto.convert.join to true in your config, and Hive will automatically use mapjoins for any table smaller than hive.mapjoin.smalltable.filesize (default is 25MB).
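For example, with automatic conversion enabled, the earlier cities query needs no hint at all (the threshold shown is the default value; tune it to your table sizes):

```sql
-- Let Hive convert joins against small tables into mapjoins automatically.
set hive.auto.convert.join=true;
-- Tables below this size in bytes are treated as "small" (default 25MB).
set hive.mapjoin.smalltable.filesize=25000000;

-- No hint needed now; Hive picks the mapjoin on its own.
SELECT * FROM orders o JOIN cities c ON (o.city_id = c.id);
```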
Assume we have two tables, one of which is small. When we submit the query, a local MapReduce task is created before the actual join job. This local task reads the small table's data from HDFS, stores it in an in-memory hash table, and then serializes the hash table into a hash table file.
In the next stage, when the join MapReduce job runs, the hash table file is moved into the Hadoop distributed cache, which pushes it to each mapper's local disk. Every mapper can then load the hash table file back into memory and do the join work as before. After this optimization the small table needs to be read just once, and if multiple mappers run on the same machine, the distributed cache pushes only one copy of the hash table file to that machine.
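You can confirm that Hive has chosen a mapjoin by looking at the query plan: for a converted join, EXPLAIN shows a local work stage that builds the hash table, followed by a map-only stage with a Map Join Operator (exact stage names vary by Hive version):

```sql
-- Inspect the plan; a mapjoin appears as a "Map Join Operator"
-- inside a map-only stage, preceded by local hash-table work.
EXPLAIN
SELECT /*+ MAPJOIN(c) */ *
FROM orders o JOIN cities c ON (o.city_id = c.id);
```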
Let's create two sample tables, insert some data into them, and then perform the map join operation.
hive> create table emp(name string,address string, deptid bigint) row format delimited fields terminated by ',';
OK
Time taken: 20.218 seconds
hive> load data local inpath '/root/emp.txt' overwrite into table emp;
Loading data to table test.emp
Table test.emp stats: [numFiles=1, numRows=0, totalSize=56, rawDataSize=0]
OK
Time taken: 0.713 seconds
hive> select * from emp;
OK
Saurabh AA 1
Babu AA 2
Nach BB 2
Jeba CC 1
Abhijit DD 1
Time taken: 20.105 seconds, Fetched: 5 row(s)
hive> create table dept(deptname string, deptid bigint) row format delimited fields terminated by ',';
OK
Time taken: 20.192 seconds
hive> load data local inpath '/root/dept.txt' overwrite into table dept;
Loading data to table test.dept
Table test.dept stats: [numFiles=1, numRows=0, totalSize=13, rawDataSize=0]
OK
Time taken: 20.705 seconds
hive> select * from dept;
OK
IT 1
Infra 2
Time taken: 0.081 seconds, Fetched: 2 row(s)
Without Map join:
hive> select emp.name,dept.deptname from emp join dept on emp.deptid=dept.deptid;
Query ID = root_20161018080320_198dcd9e-7e47-440f-871d-5da56522fced
Total jobs = 1
Launching Job 1 out of 1
Number of reduce tasks not specified. Estimated from input data size: 1
In order to change the average load for a reducer (in bytes):
set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
set mapreduce.job.reduces=<number>
Starting Job = job_1476772397810_0003, Tracking URL = http://m1.hdp22:8088/proxy/application_1476772397810_0003/
Kill Command = /usr/hdp/2.3.0.0-2557/hadoop/bin/hadoop job -kill job_1476772397810_0003
Hadoop job information for Stage-1: number of mappers: 2; number of reducers: 1
2016-10-18 08:04:53,713 Stage-1 map = 0%, reduce = 0%
2016-10-18 08:05:52,017 Stage-1 map = 100%, reduce = 0%, Cumulative CPU 3.08 sec
2016-10-18 08:06:39,906 Stage-1 map = 100%, reduce = 100%, Cumulative CPU 4.37 sec
MapReduce Total cumulative CPU time: 4 seconds 370 msec
Ended Job = job_1476772397810_0003
MapReduce Jobs Launched:
Stage-Stage-1: Map: 2 Reduce: 1 Cumulative CPU: 4.37 sec HDFS Read: 13044 HDFS Write: 52 SUCCESS
Total MapReduce CPU Time Spent: 4 seconds 370 msec
OK
Abhijit IT
Jeba IT
Saurabh IT
Nach Infra
Babu Infra
Time taken: 201.293 seconds, Fetched: 5 row(s)
With Map Join :
hive> select /*+ MAPJOIN(dept) */ emp.name,dept.deptname from emp join dept on emp.deptid=dept.deptid;
Query ID = root_20161018075509_476aa0ce-704a-4e3b-91c5-c2a3444a9fd7
Total jobs = 1
WARNING: Use "yarn jar" to launch YARN applications.
Execution log at: /tmp/root/root_20161018075509_476aa0ce-704a-4e3b-91c5-c2a3444a9fd7.log
2016-10-18 07:55:37 Starting to launch local task to process map join; maximum memory = 1065484288
2016-10-18 07:55:59 Dump the side-table for tag: 1 with group count: 2 into file: file:/tmp/root/f9baf1b2-42f4-4f89-9ac8-a48f5e8b0170/hive_2016-10-18_07-55-09_410_1173820569043720345-1/-local-10003/HashTable-Stage-3/MapJoin-mapfile11--.hashtable
2016-10-18 07:55:59 Uploaded 1 File to: file:/tmp/root/f9baf1b2-42f4-4f89-9ac8-a48f5e8b0170/hive_2016-10-18_07-55-09_410_1173820569043720345-1/-local-10003/HashTable-Stage-3/MapJoin-mapfile11--.hashtable (307 bytes)
2016-10-18 07:55:59 End of local task; Time Taken: 22.621 sec.
Execution completed successfully
MapredLocal task succeeded
Launching Job 1 out of 1
Number of reduce tasks is set to 0 since there's no reduce operator
Starting Job = job_1476772397810_0002, Tracking URL = http://m1.hdp22:8088/proxy/application_1476772397810_0002/
Kill Command = /usr/hdp/2.3.0.0-2557/hadoop/bin/hadoop job -kill job_1476772397810_0002
Hadoop job information for Stage-3: number of mappers: 1; number of reducers: 0
2016-10-18 07:57:09,279 Stage-3 map = 0%, reduce = 0%
2016-10-18 07:57:57,324 Stage-3 map = 100%, reduce = 0%, Cumulative CPU 1.6 sec
MapReduce Total cumulative CPU time: 1 seconds 600 msec
Ended Job = job_1476772397810_0002
MapReduce Jobs Launched:
Stage-Stage-3: Map: 1 Cumulative CPU: 1.6 sec HDFS Read: 6415 HDFS Write: 52 SUCCESS
Total MapReduce CPU Time Spent: 1 seconds 600 msec
OK
Saurabh IT
Babu Infra
Nach Infra
Jeba IT
Abhijit IT
Time taken: 169.01 seconds, Fetched: 5 row(s)
I hope this has helped you understand map joins.
18 Comments
akash
March 12, 2017 at 4:50 pm
thank u for giving gud information
admin
March 13, 2017 at 6:12 am
Thanks for your valuable comment.
Kartiki
April 12, 2017 at 6:06 pm
You have explained map-join very well. Thank you …! 🙂
Pranav
April 16, 2017 at 6:33 am
Nice explanation…Thanks a lot
Kirthika
June 12, 2017 at 8:05 am
Thank you for the insights. But I would like to know – how are partitions and buckets handled here?
Say I have a big table across multiple data nodes which is partitioned and bucketed in each node. As per map side join, the smaller table would be available across all the data nodes. But will the smaller table be available for data in all folders (i.e., partitions) and files (i.e., buckets)?
admin
June 24, 2017 at 3:57 am
Thanks for your valuable comment Kirthika.
Yes, the smaller table is loaded into each mapper's memory, so it is available regardless of how the large table is partitioned or bucketed.
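For bucketed tables specifically, you can also enable a bucket map join, where each mapper loads only the matching bucket of the small table instead of the whole thing. This is a sketch assuming hypothetical tables emp_bucketed and dept_bucketed, both bucketed on the join key deptid:

```sql
-- Bucket map join: each mapper loads only the bucket of the small
-- table that corresponds to its own bucket of the big table.
set hive.optimize.bucketmapjoin=true;

SELECT /*+ MAPJOIN(d) */ e.name, d.deptname
FROM emp_bucketed e JOIN dept_bucketed d ON (e.deptid = d.deptid);
```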
Navneet
June 23, 2017 at 2:37 am
Very well explained… Thanks a lot
admin
June 24, 2017 at 3:26 am
Thanks Navneet. Feel free to give your valuable suggestions or ask any doubts.
Gupta
July 21, 2017 at 11:15 am
Explained very well, Thanks!!!!
admin
July 23, 2017 at 5:53 pm
Thanks. Please feel free to give your valuable feedback and let us know in case you need any topic details.
Chowdary Naresh
August 6, 2017 at 8:06 pm
Lovely Explanation Thanks a lot 🙂
admin
August 10, 2017 at 4:02 pm
Thanks Naresh.
gopi
September 25, 2017 at 7:01 am
Hi,
Suppose I have tables like Student, Department, Subject Score and Percentage. We can join Student and Department using department id, and Student and Subject using student id, in the same query. In this case will MAPJOIN give a performance boost?
admin
September 25, 2017 at 4:23 pm
Hello Gopi,
Yes, it should reduce your execution time. I will replicate it in my env and let you know, but you can try it yourself, and if you see any challenge then let me know. You can also try a bucket join with the help of the following links.
https://www.quora.com/What-is-a-map-join-and-a-bucket-join-in-Hive
https://community.hortonworks.com/questions/7334/bucket-map-join.html
Santosh Singh
February 7, 2018 at 4:56 am
Sat sri akal paaji,
When I use the map join operation in Hive it is using a reducer. I think no reducer should be needed.
select /*+MAPJOIN(country)*/ cricketers.cricketername, country.countryname from cricketers JOIN country on country.countrydialcode = cricketers.countrydialcode;
Launching Job 1 out of 1
Number of reduce tasks not specified. Estimated from input data size: 1
In order to change the average load for a reducer (in bytes):
set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
set mapreduce.job.reduces=<number>
Starting Job = job_1517969427404_0005, Tracking URL = http://quickstart.cloudera:8088/proxy/application_1517969427404_0005/
Kill Command = /usr/lib/hadoop/bin/hadoop job -kill job_1517969427404_0005
Hadoop job information for Stage-1: number of mappers: 2; number of reducers: 1
2018-02-06 20:08:15,616 Stage-1 map = 0%, reduce = 0%
2018-02-06 20:08:42,360 Stage-1 map = 33%, reduce = 0%, Cumulative CPU 2.48 sec
2018-02-06 20:08:43,487 Stage-1 map = 100%, reduce = 0%, Cumulative CPU 5.12 sec
2018-02-06 20:09:06,591 Stage-1 map = 100%, reduce = 100%, Cumulative CPU 7.53 sec
MapReduce Total cumulative CPU time: 7 seconds 530 msec
Ended Job = job_1517969427404_0005
MapReduce Jobs Launched:
Stage-Stage-1: Map: 2 Reduce: 1 Cumulative CPU: 7.53 sec HDFS Read: 13481 HDFS Write: 313 SUCCESS
Total MapReduce CPU Time Spent: 7 seconds 530 msec
OK
Deveilliers SouthAfrica
Steyn SouthAfrica
Steyn SouthAfrica
Cook England
Broad England
Pietersen England
Morgan England
Warne Australia
Hayden Australia
Fleming NewZealand
Mccullum NewZealand
styris NewZealand
Sehwag India
Zaheer India
Raina India
Yuvraj India
Dhoni India
Ganguly India
Dravid India
Sachin India
Time taken: 65.893 seconds, Fetched: 20 row(s)
Can you tell me whether I should set any Hive property here?
admin
February 7, 2018 at 7:54 am
Sat Sri Akal Santosh,
Yes, you are right: a map-side join should not use a reducer. In your case it is using 1 reducer, so it seems the query is somehow not going through the map-side join path. Can you please send me the output of hive -e 'set -v' > env_output.txt so I can check the details?
Santosh Singh
February 7, 2018 at 10:31 am
Thanks for the quick reply.
Appreciated!
Actually I am new to Hive and using Cloudera CDH 5.12, so can you tell me which file or output you need from me?
When I set the following property to 'true', Hive does not perform the join query:
set hive.auto.convert.join=true;
When I set it to 'false', it performs the join query:
set hive.auto.convert.join=false;
Santosh Singh
February 10, 2018 at 12:46 pm
CRUD (create, read, update, and delete) operations on a Hive ORC table were not working for me. They are working now; this might help someone else.
What I did.
1. Set these properties in the /usr/lib/hive/conf/hive-site.xml file
1.1 sudo vi /usr/lib/hive/conf/hive-site.xml
<property>
  <name>hive.support.concurrency</name>
  <value>true</value>
</property>
<property>
  <name>hive.txn.manager</name>
  <value>org.apache.hadoop.hive.ql.lockmgr.DbTxnManager</value>
</property>
<property>
  <name>hive.compactor.initiator.on</name>
  <value>true</value>
</property>
<property>
  <name>hive.compactor.worker.threads</name>
  <value>1</value>
</property>
1.2 Save the file (Esc, then :wq)
2. Restart the cloudera VM
3. update students_orc set city='Bangalore' where id=101;
4. It updated the record.