Hive Cross-Cluster Replication
In this post I explain cross-cluster replication with a Falcon Feed entity. This is a simple way to enforce disaster recovery policies or to aggregate data from multiple clusters into a single cluster for enterprise reporting. To illustrate Apache Falcon's capabilities, we will use an HCatalog/Hive table as the Feed entity.
Step 1: First create databases/tables on source and target clusters:
-- Run on primary cluster
create database landing_db;
use landing_db;
CREATE TABLE summary_table(id int, value string) PARTITIONED BY (ds string);
ALTER TABLE summary_table ADD PARTITION (ds = '2014-01');
ALTER TABLE summary_table ADD PARTITION (ds = '2014-02');
ALTER TABLE summary_table ADD PARTITION (ds = '2014-03');
insert into summary_table PARTITION(ds) values (1,'abc1','2014-01');
insert into summary_table PARTITION(ds) values (2,'abc2','2014-02');
insert into summary_table PARTITION(ds) values (3,'abc3','2014-03');
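Note that the INSERT statements above rely on dynamic partitioning. If they fail with a strict-mode error, enable dynamic partitioning for the session before running them (standard Hive settings; defaults vary by Hive version):
-- Allow dynamic-partition inserts in this session
SET hive.exec.dynamic.partition=true;
SET hive.exec.dynamic.partition.mode=nonstrict;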
-- Run on secondary cluster
create database archive_db;
use archive_db;
CREATE TABLE summary_archive_table(id int, value string) PARTITIONED BY (ds string);
Step 2: Now create the Falcon staging and working directories on both clusters:
hadoop fs -mkdir /apps/falcon/staging
hadoop fs -mkdir /apps/falcon/working
hadoop fs -chown falcon /apps/falcon/staging
hadoop fs -chown falcon /apps/falcon/working
hadoop fs -chmod 777 /apps/falcon/staging
hadoop fs -chmod 755 /apps/falcon/working
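A quick listing confirms the ownership and permissions before moving on:
hadoop fs -ls /apps/falcon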
Step 3: Configure your source and target clusters for DistCp with NameNode High Availability.
For more detail, see: http://www.hadoopadmin.co.in/bigdata/distcp-between-high-availability-enabled-cluster/
In order to run DistCp between two HDFS HA clusters (for example, cluster A and cluster B), modify the following in hdfs-site.xml on both clusters. In this example, the nameservices for cluster A and cluster B are CLUSTERAHA and CLUSTERBHA respectively.
– Add both nameservices to dfs.nameservices on both clusters
dfs.nameservices = CLUSTERAHA,CLUSTERBHA
– Add property dfs.internal.nameservices
In cluster A:
dfs.internal.nameservices = CLUSTERAHA
In cluster B:
dfs.internal.nameservices = CLUSTERBHA
– Add dfs.ha.namenodes.<nameservice> (each cluster declares the NameNodes of the remote cluster's nameservice)
In cluster A:
dfs.ha.namenodes.CLUSTERBHA = nn1,nn2
In cluster B:
dfs.ha.namenodes.CLUSTERAHA = nn1,nn2
– Add property dfs.namenode.rpc-address.<nameservice>.<nn>, pointing at the remote cluster's NameNode hosts
In cluster A:
dfs.namenode.rpc-address.CLUSTERBHA.nn1 = server1:8020
dfs.namenode.rpc-address.CLUSTERBHA.nn2 = server2:8020
In cluster B:
dfs.namenode.rpc-address.CLUSTERAHA.nn1 = server1:8020
dfs.namenode.rpc-address.CLUSTERAHA.nn2 = server2:8020
– Add property dfs.client.failover.proxy.provider.<nameservice> (i.e. CLUSTERAHA or CLUSTERBHA)
In cluster A:
dfs.client.failover.proxy.provider.CLUSTERBHA = org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider
In cluster B:
dfs.client.failover.proxy.provider.CLUSTERAHA = org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider
– Restart the HDFS service on both clusters.
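Put together, the cross-cluster additions to cluster A's hdfs-site.xml would look roughly like the sketch below (host names and ports are the same placeholders used above; cluster B mirrors it with the CLUSTERAHA entries):
<!-- Sketch only: cross-cluster HA client settings added on cluster A -->
<property>
  <name>dfs.nameservices</name>
  <value>CLUSTERAHA,CLUSTERBHA</value>
</property>
<property>
  <name>dfs.internal.nameservices</name>
  <value>CLUSTERAHA</value>
</property>
<property>
  <name>dfs.ha.namenodes.CLUSTERBHA</name>
  <value>nn1,nn2</value>
</property>
<property>
  <name>dfs.namenode.rpc-address.CLUSTERBHA.nn1</name>
  <value>server1:8020</value>
</property>
<property>
  <name>dfs.namenode.rpc-address.CLUSTERBHA.nn2</name>
  <value>server2:8020</value>
</property>
<property>
  <name>dfs.client.failover.proxy.provider.CLUSTERBHA</name>
  <value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value>
</property>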
Once complete, you will be able to run the distcp command using the nameservices, similar to:
hadoop distcp hdfs://CLUSTERAHA/tmp/testDistcp hdfs://CLUSTERBHA/tmp/
hadoop distcp hdfs://CLUSTERAHA/user/s0998dnz/input.txt hdfs://CLUSTERBHA/tmp/
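If the copy succeeds, a quick listing against the remote nameservice confirms that HA resolution works from the local cluster (the path is just the example target used above):
hadoop fs -ls hdfs://CLUSTERBHA/tmp/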
Step 4: Now create the cluster entities and submit them, using the sample cluster definitions below for the source and target clusters.
[s0998dnz@server1 hiveReplication]$ ll
total 24
-rw-r--r-- 1 s0998dnz hdpadm 1031 Jun 15 06:43 cluster1.xml
-rw-r--r-- 1 s0998dnz hdpadm 1030 Jun 15 05:11 cluster2.xml
-rw-r--r-- 1 s0998dnz hdpadm 1141 Jun  1 05:44 destinationCluster.xml
-rw-r--r-- 1 s0998dnz hdpadm  794 Jun 15 05:05 feed.xml
-rw-r--r-- 1 s0998dnz hdpadm 1114 Jun  1 06:36 replication-feed.xml
-rw-r--r-- 1 s0998dnz hdpadm 1080 Jun 15 05:07 sourceCluster.xml
[s0998dnz@server1 hiveReplication]$ cat cluster1.xml
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<cluster name="source" description="primary" colo="primary" xmlns="uri:falcon:cluster:0.1">
    <tags>EntityType=Cluster</tags>
    <interfaces>
        <interface type="readonly" endpoint="hdfs://CLUSTERAHA" version="2.2.0"/>
        <interface type="write" endpoint="hdfs://CLUSTERAHA" version="2.2.0"/>
        <interface type="execute" endpoint="server2:8050" version="2.2.0"/>
        <interface type="workflow" endpoint="http://server1:11000/oozie/" version="4.0.0"/>
        <interface type="messaging" endpoint="tcp://server2:61616?daemon=true" version="5.1.6"/>
        <interface type="registry" endpoint="thrift://server2:9083" version="1.2.1"/>
    </interfaces>
    <locations>
        <location name="staging" path="/apps/falcon/staging"/>
        <location name="temp" path="/tmp"/>
        <location name="working" path="/apps/falcon/working"/>
    </locations>
</cluster>
[s0998dnz@server2 hiveReplication]$ cat cluster2.xml
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<cluster name="target" description="target" colo="backup" xmlns="uri:falcon:cluster:0.1">
    <tags>EntityType=Cluster</tags>
    <interfaces>
        <interface type="readonly" endpoint="hdfs://CLUSTERBHA" version="2.2.0"/>
        <interface type="write" endpoint="hdfs://CLUSTERBHA" version="2.2.0"/>
        <interface type="execute" endpoint="server2:8050" version="2.2.0"/>
        <interface type="workflow" endpoint="http://server2:11000/oozie/" version="4.0.0"/>
        <interface type="messaging" endpoint="tcp://server2:61616?daemon=true" version="5.1.6"/>
        <interface type="registry" endpoint="thrift://server2:9083" version="1.2.1"/>
    </interfaces>
    <locations>
        <location name="staging" path="/apps/falcon/staging"/>
        <location name="temp" path="/tmp"/>
        <location name="working" path="/apps/falcon/working"/>
    </locations>
</cluster>
falcon entity -type cluster -submit -file cluster1.xml
falcon entity -type cluster -submit -file cluster2.xml
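After submitting both entities, you can confirm that they are registered with Falcon (assuming the falcon CLI is configured against your Falcon server):
falcon entity -type cluster -list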
Step 5: Copy the updated configuration files (/etc/hadoop/conf/*) from the source cluster to the target cluster's Oozie server.
zip -r sourceClusterConf1.zip /etc/hadoop/conf/
scp sourceClusterConf1.zip s0998dnz@server1:/home/s0998dnz/
Step 6: On the target Oozie server, run the following commands.
mkdir -p /hdptmp/hadoop_primary/conf
chmod 777 /hdptmp/hadoop_primary/conf
unzip sourceClusterConf1.zip
cp -r etc/hadoop/conf/* /hdptmp/hadoop_primary/conf/
Step 7: Once the configuration has been copied, modify the property below in the target cluster's Oozie configuration (typically oozie-site.xml).
<property>
    <name>oozie.service.HadoopAccessorService.hadoop.configurations</name>
    <value>*={{hadoop_conf_dir}},server2:8050=/hdptmp/hadoop_primary/conf,server1:8050=/hdptmp/hadoop_primary/conf,server1:8020=/hdptmp/hadoop_primary/conf,server2:8020=/hdptmp/hadoop_primary/conf</value>
</property>
Note: You can change /hdptmp/hadoop_primary/conf to a directory of your choice; however, Oozie must have access to that path.
Step 8: Finally, submit and schedule the feed definition using the feed.xml file below.
[s0998dnz@server1 hiveReplication]$ cat feed.xml
<?xml version="1.0" encoding="UTF-8"?>
<feed description="Monthly Analytics Summary" name="replication-feed" xmlns="uri:falcon:feed:0.1">
    <tags>EntityType=Feed</tags>
    <frequency>months(1)</frequency>
    <clusters>
        <cluster name="source" type="source">
            <validity start="2014-01-01T00:00Z" end="2015-03-31T00:00Z"/>
            <retention limit="months(36)" action="delete"/>
        </cluster>
        <cluster name="target" type="target">
            <validity start="2014-01-01T00:00Z" end="2016-03-31T00:00Z"/>
            <retention limit="months(180)" action="delete"/>
            <table uri="catalog:archive_db:summary_archive_table#ds=${YEAR}-${MONTH}"/>
        </cluster>
    </clusters>
    <table uri="catalog:landing_db:summary_table#ds=${YEAR}-${MONTH}"/>
    <ACL owner="falcon"/>
    <schema location="hcat" provider="hcat"/>
</feed>
falcon entity -type feed -submit -file feed.xml
falcon entity -type feed -schedule -name replication-feed
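Once the feed is scheduled, you can check the status of its instances from the Falcon CLI (the time window below is just an example range within the feed's validity):
falcon instance -type feed -name replication-feed -status -start 2014-01-01T00:00Z -end 2014-04-01T00:00Z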
This feed is scheduled from 2014-01, so insert the sample values shown in Step 1 into your source table (if you have not done so already) so that the scheduled instances have data to replicate.
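Once the replication instances have run, a quick check on the secondary cluster should show the replicated partitions and rows (standard Hive commands, run in the archive database):
-- Run on secondary cluster
use archive_db;
show partitions summary_archive_table;
select * from summary_archive_table;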