tables) throws Exception {
+ for (Table table : tables) {
+ LOG.info("Loading data into table: {}", table);
+ loadTable(table);
+ }
+ }
+
+ private void verifyRestoredTableDataIntegrity(TableName restoredTableName,
+ String originalTableChecksum, int expectedRowCount) throws Exception {
+ try (Table restoredTable = TEST_UTIL.getConnection().getTable(restoredTableName);
+ ResultScanner scanner = restoredTable.getScanner(new Scan())) {
+
+ // Verify the checksum for the original table (before it was truncated) matches the checksum
+ // of the restored table.
+ String restoredTableChecksum = TEST_UTIL.checksumRows(restoredTable);
+ assertEquals("The restored table's row checksum did not match the original table's checksum",
+ originalTableChecksum, restoredTableChecksum);
+
+ // Verify the data in the restored table is the same as when it was originally loaded
+ // into the table.
+ int count = 0;
+ for (Result result : scanner) {
+ // The data has a numerical match between its row key and value (such as rowLoad1 and
+ // val1)
+ // We can use this to ensure a row key has the expected value.
+ String rowKey = Bytes.toString(result.getRow());
+ int index = Integer.parseInt(rowKey.replace("rowLoad", ""));
+
+ // Verify the Value
+ byte[] actualValue = result.getValue(famName, qualName);
+ assertNotNull("Value missing for row key: " + rowKey, actualValue);
+ String expectedValue = "val" + index;
+ assertEquals("Value mismatch for row key: " + rowKey, expectedValue,
+ Bytes.toString(actualValue));
+
+ count++;
+ }
+ assertEquals(expectedRowCount, count);
+ }
+ }
}
diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestPointInTimeRestore.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestPointInTimeRestore.java
index 54667752f01b..a1ce9c97a687 100644
--- a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestPointInTimeRestore.java
+++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestPointInTimeRestore.java
@@ -28,7 +28,6 @@
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.testclassification.LargeTests;
-import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.util.ToolRunner;
import org.junit.AfterClass;
@@ -37,21 +36,6 @@
import org.junit.Test;
import org.junit.experimental.categories.Category;
-/**
- * Integration-style tests for Point-in-Time Restore (PITR).
- *
- * These tests exercise the full backup / continuous backup / restore flow: - create backups at
- * multiple historical points in time (via {@code BackupDriver}) - exercise WAL-based
- * replication/continuous backup - validate Point-in-Time Restore behavior (successful restores,
- * failure cases)
- *
- *
- * NOTE: Some tests also create HFiles and perform HBase bulk-loads (HFile -> table) so the restore
- * flow is validated when bulk-loaded storefiles are present in WALs. This ensures the
- * BulkLoadCollector/BulkFilesCollector logic (discovering bulk-loaded store files referenced from
- * WAL bulk-load descriptors) is exercised by the test suite.
- *
- */
@Category(LargeTests.class)
public class TestPointInTimeRestore extends TestBackupBase {
@ClassRule
@@ -83,8 +67,8 @@ private static void setUpBackups() throws Exception {
// Simulate a backup taken 20 days ago
EnvironmentEdgeManager
.injectEdge(() -> System.currentTimeMillis() - 20 * ONE_DAY_IN_MILLISECONDS);
- // Insert initial data into table1
- PITRTestUtil.loadRandomData(TEST_UTIL, table1, famName, 1000);
+ PITRTestUtil.loadRandomData(TEST_UTIL, table1, famName, 1000); // Insert initial data into
+ // table1
// Perform a full backup for table1 with continuous backup enabled
String[] args =
@@ -96,12 +80,6 @@ private static void setUpBackups() throws Exception {
EnvironmentEdgeManager
.injectEdge(() -> System.currentTimeMillis() - 15 * ONE_DAY_IN_MILLISECONDS);
PITRTestUtil.loadRandomData(TEST_UTIL, table1, famName, 1000); // Add more data to table1
-
- Path dir = TEST_UTIL.getDataTestDirOnTestFS("testBulkLoadByFamily");
- PITRTestUtil.generateHFiles(dir, TEST_UTIL.getConfiguration(), Bytes.toString(famName));
- PITRTestUtil.bulkLoadHFiles(table1, dir, TEST_UTIL.getConnection(),
- TEST_UTIL.getConfiguration());
-
PITRTestUtil.loadRandomData(TEST_UTIL, table2, famName, 500); // Insert data into table2
PITRTestUtil.waitForReplication(); // Ensure replication is complete
diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/impl/TestBackupAdminImpl.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/impl/TestBackupAdminImpl.java
index 3d0e3c008583..b78d44c144b5 100644
--- a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/impl/TestBackupAdminImpl.java
+++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/impl/TestBackupAdminImpl.java
@@ -17,7 +17,6 @@
*/
package org.apache.hadoop.hbase.backup.impl;
-import static org.apache.hadoop.hbase.backup.BackupInfo.withRoot;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
@@ -267,8 +266,7 @@ public void testGetAffectedBackupSessions() throws IOException {
BackupInfo b3 = createBackupInfo("backup_003", 3000L, BackupType.INCREMENTAL, table);
BackupInfo b4 = createBackupInfo("backup_004", 4000L, BackupType.INCREMENTAL, table);
- when(mockTable.getBackupHistory(withRoot("/backup/root")))
- .thenReturn(List.of(b4, b3, b2, b1, b0));
+ when(mockTable.getBackupHistory("/backup/root")).thenReturn(List.of(b4, b3, b2, b1, b0));
List result = backupAdminImpl.getAffectedBackupSessions(current, table, mockTable);
@@ -295,7 +293,7 @@ public void testGetAffectedBackupSessions_resetsOnFullBackup() throws IOExceptio
BackupInfo b2 = createBackupInfo("backup_002", 2000L, BackupType.FULL, table);
BackupInfo b3 = createBackupInfo("backup_003", 3000L, BackupType.INCREMENTAL, table);
- when(mockTable.getBackupHistory(withRoot("/backup/root"))).thenReturn(List.of(b3, b2, b1, b0));
+ when(mockTable.getBackupHistory("/backup/root")).thenReturn(List.of(b3, b2, b1, b0));
List result = backupAdminImpl.getAffectedBackupSessions(current, table, mockTable);
@@ -322,8 +320,7 @@ public void testGetAffectedBackupSessions_skipsNonMatchingTable() throws IOExcep
TableName.valueOf("other_table"));
BackupInfo b4 = createBackupInfo("backup_004", 4000L, BackupType.INCREMENTAL, table);
- when(mockTable.getBackupHistory(withRoot("/backup/root")))
- .thenReturn(List.of(b4, b3, b2, b1, b0));
+ when(mockTable.getBackupHistory("/backup/root")).thenReturn(List.of(b4, b3, b2, b1, b0));
List result = backupAdminImpl.getAffectedBackupSessions(current, table, mockTable);
@@ -352,8 +349,7 @@ public void testGetAffectedBackupSessions_ignoresFullBackupOfOtherTable() throws
BackupInfo b3 = createBackupInfo("backup_003", 3000L, BackupType.INCREMENTAL, table);
BackupInfo b4 = createBackupInfo("backup_004", 4000L, BackupType.INCREMENTAL, table);
- when(mockTable.getBackupHistory(withRoot("/backup/root")))
- .thenReturn(List.of(b4, b3, b2, b1, b0));
+ when(mockTable.getBackupHistory("/backup/root")).thenReturn(List.of(b4, b3, b2, b1, b0));
List result = backupAdminImpl.getAffectedBackupSessions(current, table, mockTable);
@@ -637,7 +633,8 @@ public void testCheckIfValidForMerge_validCase() throws IOException {
BackupSystemTable table = mock(BackupSystemTable.class);
when(table.readBackupInfo("b1")).thenReturn(b1);
when(table.readBackupInfo("b2")).thenReturn(b2);
- when(table.getBackupHistory(any())).thenReturn(List.of(b1, b2));
+ when(table.getBackupHistory(eq(-1), any(), any(), any(), any(), any()))
+ .thenReturn(List.of(b1, b2));
new BackupAdminImpl(mock(Connection.class)).checkIfValidForMerge(ids, table);
}
@@ -731,7 +728,8 @@ public void testCheckIfValidForMerge_failsWhenHoleInImages() throws IOException
when(table.readBackupInfo("b2")).thenReturn(b2);
when(table.readBackupInfo("b3")).thenReturn(b3);
- when(table.getBackupHistory(any())).thenReturn(List.of(b1, b2, b3));
+ when(table.getBackupHistory(eq(-1), any(), any(), any(), any(), any()))
+ .thenReturn(List.of(b1, b2, b3));
// Simulate a "hole" by omitting b2 from images
String[] idsWithHole = { "b1", "b3" };
diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/impl/TestBackupCommands.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/impl/TestBackupCommands.java
index fdcb637dbfe2..e5da7754b00d 100644
--- a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/impl/TestBackupCommands.java
+++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/impl/TestBackupCommands.java
@@ -17,13 +17,12 @@
*/
package org.apache.hadoop.hbase.backup.impl;
-import static org.apache.hadoop.hbase.backup.BackupInfo.withState;
import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONF_CONTINUOUS_BACKUP_WAL_DIR;
import static org.apache.hadoop.hbase.backup.TestBackupDeleteWithCleanup.logDirectoryStructure;
import static org.apache.hadoop.hbase.backup.TestBackupDeleteWithCleanup.setupBackupFolders;
+import static org.apache.hadoop.hbase.backup.replication.BackupFileSystemManager.BULKLOAD_FILES_DIR;
+import static org.apache.hadoop.hbase.backup.replication.BackupFileSystemManager.WALS_DIR;
import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.ONE_DAY_IN_MILLISECONDS;
-import static org.apache.hadoop.hbase.backup.util.BackupFileSystemManager.BULKLOAD_FILES_DIR;
-import static org.apache.hadoop.hbase.backup.util.BackupFileSystemManager.WALS_DIR;
import static org.apache.hadoop.hbase.backup.util.BackupUtils.DATE_FORMAT;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -86,7 +85,7 @@ public void testDetermineWALCleanupCutoffTimeOfCleanupCommand() throws IOExcepti
// Ordered as newest to oldest, will be reversed in the method
List backupInfos = List.of(full2, inc, full1);
- when(sysTable.getBackupHistory(withState(BackupInfo.BackupState.COMPLETE)))
+ when(sysTable.getBackupInfos(BackupInfo.BackupState.COMPLETE))
.thenReturn(new ArrayList<>(backupInfos));
// WHEN
@@ -136,7 +135,7 @@ public void testDeleteOldWALFilesOfCleanupCommand() throws IOException {
fs.mkdirs(backupWalDir);
long currentTime = EnvironmentEdgeManager.getDelegate().currentTime();
- setupBackupFolders(fs, backupWalDir, currentTime); // Create 5 days of WAL/bulkload-files folder
+ setupBackupFolders(fs, backupWalDir, currentTime); // Create 5 days of WAL/bulk folders
logDirectoryStructure(fs, backupWalDir, "Before cleanup:");
diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/replication/TestContinuousBackupReplicationEndpoint.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/replication/TestContinuousBackupReplicationEndpoint.java
index 78991a463da1..438543e4a805 100644
--- a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/replication/TestContinuousBackupReplicationEndpoint.java
+++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/replication/TestContinuousBackupReplicationEndpoint.java
@@ -19,6 +19,8 @@
import static org.apache.hadoop.hbase.HConstants.REPLICATION_BULKLOAD_ENABLE_KEY;
import static org.apache.hadoop.hbase.HConstants.REPLICATION_CLUSTER_ID;
+import static org.apache.hadoop.hbase.backup.replication.BackupFileSystemManager.BULKLOAD_FILES_DIR;
+import static org.apache.hadoop.hbase.backup.replication.BackupFileSystemManager.WALS_DIR;
import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.CONF_BACKUP_MAX_WAL_SIZE;
import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.CONF_BACKUP_ROOT_DIR;
import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.CONF_PEER_UUID;
@@ -27,20 +29,12 @@
import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.ONE_DAY_IN_MILLISECONDS;
import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.WAL_FILE_PREFIX;
import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.copyWithCleanup;
-import static org.apache.hadoop.hbase.backup.util.BackupFileSystemManager.BULKLOAD_FILES_DIR;
-import static org.apache.hadoop.hbase.backup.util.BackupFileSystemManager.WALS_DIR;
import static org.apache.hadoop.hbase.backup.util.BackupUtils.DATE_FORMAT;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
-import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.mockStatic;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
import java.io.IOException;
import java.text.SimpleDateFormat;
@@ -55,17 +49,14 @@
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.backup.util.BackupFileSystemManager;
import org.apache.hadoop.hbase.backup.util.BackupUtils;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
@@ -89,7 +80,6 @@
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
-import org.mockito.MockedStatic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -554,12 +544,6 @@ private void deleteTable(TableName tableName) throws IOException {
private void addReplicationPeer(String peerId, Path backupRootDir,
Map> tableMap) throws IOException {
- addReplicationPeer(peerId, backupRootDir, tableMap, replicationEndpoint);
- }
-
- private void addReplicationPeer(String peerId, Path backupRootDir,
- Map> tableMap, String customReplicationEndpointImpl)
- throws IOException {
Map additionalArgs = new HashMap<>();
additionalArgs.put(CONF_PEER_UUID, UUID.randomUUID().toString());
additionalArgs.put(CONF_BACKUP_ROOT_DIR, backupRootDir.toString());
@@ -568,7 +552,7 @@ private void addReplicationPeer(String peerId, Path backupRootDir,
additionalArgs.put(CONF_STAGED_WAL_FLUSH_INTERVAL, "10");
ReplicationPeerConfig peerConfig = ReplicationPeerConfig.newBuilder()
- .setReplicationEndpointImpl(customReplicationEndpointImpl).setReplicateAllUserTables(false)
+ .setReplicationEndpointImpl(replicationEndpoint).setReplicateAllUserTables(false)
.setTableCFsMap(tableMap).putAllConfiguration(additionalArgs).build();
admin.addReplicationPeer(peerId, peerConfig);
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
index 0e81c95677c3..87241d437cd8 100644
--- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
@@ -161,8 +161,9 @@ protected static byte[] combineTableNameSuffix(byte[] tableName, byte[] suffix)
"hbase.bulkload.locality.sensitive.enabled";
private static final boolean DEFAULT_LOCALITY_SENSITIVE = true;
static final String OUTPUT_TABLE_NAME_CONF_KEY = "hbase.mapreduce.hfileoutputformat.table.name";
- static final String MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY =
+ public static final String MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY =
"hbase.mapreduce.use.multi.table.hfileoutputformat";
+ public static final boolean MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_DEFAULT = true;
/**
* ExtendedCell and ExtendedCellSerialization are InterfaceAudience.Private. We expose this config
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java
index 4c0b12ef7333..cf4397d1052f 100644
--- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java
@@ -17,6 +17,9 @@
*/
package org.apache.hadoop.hbase.mapreduce;
+import static org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2.MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_DEFAULT;
+import static org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2.MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY;
+
import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
@@ -378,14 +381,35 @@ public Job createSubmittableJob(String[] args) throws IOException {
Path outputDir = new Path(hfileOutPath);
FileOutputFormat.setOutputPath(job, outputDir);
job.setMapOutputValueClass(MapReduceExtendedCell.class);
- try (Connection conn = ConnectionFactory.createConnection(conf);) {
- List tableInfoList = new ArrayList();
- for (TableName tableName : tableNames) {
+ try (Connection conn = ConnectionFactory.createConnection(conf)) {
+ if (
+ conf.getBoolean(MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY,
+ MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_DEFAULT)
+ ) {
+ // The HFiles will be output to something like this for each table:
+ // .../BULK_OUTPUT_CONF_KEY/namespace/table/columnFamily
+ List tableInfoList = new ArrayList();
+ for (TableName tableName : tableNames) {
+ Table table = conn.getTable(tableName);
+ RegionLocator regionLocator = getRegionLocator(tableName, conf, conn);
+ tableInfoList.add(new TableInfo(table.getDescriptor(), regionLocator));
+ }
+ MultiTableHFileOutputFormat.configureIncrementalLoad(job, tableInfoList);
+ } else {
+ // The HFiles will be output to something like: .../BULK_OUTPUT_CONF_KEY/columnFamily
+ // This is useful for scenarios where we are running the WALPlayer consecutively on just
+ // one table at a time, and BULK_OUTPUT_CONF_KEY is already set to a "namespace/table"
+ // directory path for each table.
+ if (tableNames.size() != 1) {
+ throw new IOException("Expected table names list to have only one table since "
+ + MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY + " is set to false. Got the following "
+ + "list of tables instead: " + tableNames);
+ }
+ TableName tableName = tableNames.get(0);
Table table = conn.getTable(tableName);
RegionLocator regionLocator = getRegionLocator(tableName, conf, conn);
- tableInfoList.add(new TableInfo(table.getDescriptor(), regionLocator));
+ HFileOutputFormat2.configureIncrementalLoad(job, table.getDescriptor(), regionLocator);
}
- MultiTableHFileOutputFormat.configureIncrementalLoad(job, tableInfoList);
}
TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(),
org.apache.hbase.thirdparty.com.google.common.base.Preconditions.class);
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALPlayer.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALPlayer.java
index bbadabab69bf..26a38f5b5367 100644
--- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALPlayer.java
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALPlayer.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase.mapreduce;
+import static org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2.MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.CoreMatchers.nullValue;
@@ -34,6 +35,7 @@
import java.io.File;
import java.io.IOException;
import java.io.PrintStream;
+import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.hadoop.conf.Configuration;
@@ -46,6 +48,7 @@
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.SingleProcessHBaseCluster;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Delete;
@@ -90,13 +93,20 @@ public class TestWALPlayer {
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestWALPlayer.class);
+ private static final byte[] FAMILY = Bytes.toBytes("family");
+ private static final byte[] COLUMN1 = Bytes.toBytes("c1");
+ private static final byte[] COLUMN2 = Bytes.toBytes("c2");
+ private static final byte[] ROW = Bytes.toBytes("row");
+
private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
private static SingleProcessHBaseCluster cluster;
private static Path rootDir;
private static Path walRootDir;
- private static FileSystem fs;
+ private static FileSystem localFs;
private static FileSystem logFs;
private static Configuration conf;
+ private static FileSystem hdfs;
+ private static String bulkLoadOutputDir;
@Rule
public TestName name = new TestName();
@@ -106,15 +116,18 @@ public static void beforeClass() throws Exception {
conf = TEST_UTIL.getConfiguration();
rootDir = TEST_UTIL.createRootDir();
walRootDir = TEST_UTIL.createWALRootDir();
- fs = CommonFSUtils.getRootDirFileSystem(conf);
+ localFs = CommonFSUtils.getRootDirFileSystem(conf);
logFs = CommonFSUtils.getWALFileSystem(conf);
cluster = TEST_UTIL.startMiniCluster();
+ hdfs = TEST_UTIL.getTestFileSystem();
+ bulkLoadOutputDir = new Path(new Path(TEST_UTIL.getConfiguration().get("fs.defaultFS")),
+ Path.SEPARATOR + "bulkLoadOutput").toString();
}
@AfterClass
public static void afterClass() throws Exception {
TEST_UTIL.shutdownMiniCluster();
- fs.delete(rootDir, true);
+ localFs.delete(rootDir, true);
logFs.delete(walRootDir, true);
}
@@ -235,18 +248,11 @@ public void testWALPlayerBulkLoadWithOverriddenTimestamps() throws Exception {
public void testWALPlayer() throws Exception {
final TableName tableName1 = TableName.valueOf(name.getMethodName() + "1");
final TableName tableName2 = TableName.valueOf(name.getMethodName() + "2");
- final byte[] FAMILY = Bytes.toBytes("family");
- final byte[] COLUMN1 = Bytes.toBytes("c1");
- final byte[] COLUMN2 = Bytes.toBytes("c2");
- final byte[] ROW = Bytes.toBytes("row");
Table t1 = TEST_UTIL.createTable(tableName1, FAMILY);
Table t2 = TEST_UTIL.createTable(tableName2, FAMILY);
- // put a row into the first table
- Put p = new Put(ROW);
- p.addColumn(FAMILY, COLUMN1, COLUMN1);
- p.addColumn(FAMILY, COLUMN2, COLUMN2);
- t1.put(p);
+ putRowIntoTable(t1);
+
// delete one column
Delete d = new Delete(ROW);
d.addColumns(FAMILY, COLUMN1);
@@ -411,6 +417,109 @@ public void testFailOnEmptyWALFilesWhenNotIgnored() throws Exception {
assertNotEquals("WALPlayer should fail on empty files when not ignored", 0, exitCode);
}
+ /**
+ * Verifies the HFile output format for WALPlayer has the following directory structure when
+ * hbase.mapreduce.use.multi.table.hfileoutputformat is set to true:
+ * .../BULK_OUTPUT_CONF_KEY/namespace/tableName/columnFamily
+ */
+ @Test
+ public void testWALPlayerMultiTableHFileOutputFormat() throws Exception {
+ String namespace = "ns_" + name.getMethodName();
+ TEST_UTIL.getAdmin().createNamespace(NamespaceDescriptor.create(namespace).build());
+ final TableName tableName1 = TableName.valueOf(name.getMethodName() + "1");
+ final TableName tableName2 = TableName.valueOf(namespace, name.getMethodName() + "2");
+ Table t1 = TEST_UTIL.createTable(tableName1, FAMILY);
+ Table t2 = TEST_UTIL.createTable(tableName2, FAMILY);
+
+ putRowIntoTable(t1);
+ putRowIntoTable(t2);
+
+ Configuration multiTableOutputConf = new Configuration(conf);
+ setConfSimilarToIncrementalBackupWALToHFilesMethod(multiTableOutputConf);
+
+ // We are testing this config variable's effect on HFile output for the WALPlayer
+ multiTableOutputConf.setBoolean(MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY, true);
+
+ WALPlayer player = new WALPlayer(multiTableOutputConf);
+ String walInputDir = new Path(cluster.getMaster().getMasterFileSystem().getWALRootDir(),
+ HConstants.HREGION_LOGDIR_NAME).toString();
+ String tables = tableName1.getNameAsString() + "," + tableName2.getNameAsString();
+
+ ToolRunner.run(multiTableOutputConf, player, new String[] { walInputDir, tables });
+
+ assertMultiTableOutputFormatDirStructure(tableName1, "default");
+ assertMultiTableOutputFormatDirStructure(tableName2, namespace);
+
+ hdfs.delete(new Path(bulkLoadOutputDir), true);
+ }
+
+ /**
+ * Verifies the HFile output format for WALPlayer has the following directory structure when
+ * hbase.mapreduce.use.multi.table.hfileoutputformat is set to false:
+ * .../BULK_OUTPUT_CONF_KEY/columnFamily. Also verifies an exception occurs when the WALPlayer is
+ * run on multiple tables at once while hbase.mapreduce.use.multi.table.hfileoutputformat is set
+ * to false.
+ */
+ @Test
+ public void testWALPlayerSingleTableHFileOutputFormat() throws Exception {
+ String namespace = "ns_" + name.getMethodName();
+ TEST_UTIL.getAdmin().createNamespace(NamespaceDescriptor.create(namespace).build());
+ final TableName tableName1 = TableName.valueOf(name.getMethodName() + "1");
+ final TableName tableName2 = TableName.valueOf(namespace, name.getMethodName() + "2");
+ Table t1 = TEST_UTIL.createTable(tableName1, FAMILY);
+ Table t2 = TEST_UTIL.createTable(tableName2, FAMILY);
+
+ putRowIntoTable(t1);
+ putRowIntoTable(t2);
+
+ String bulkLoadOutputDir = new Path(new Path(TEST_UTIL.getConfiguration().get("fs.defaultFS")),
+ Path.SEPARATOR + "bulkLoadOutput").toString();
+
+ Configuration singleTableOutputConf = new Configuration(conf);
+ setConfSimilarToIncrementalBackupWALToHFilesMethod(singleTableOutputConf);
+
+ // We are testing this config variable's effect on HFile output for the WALPlayer
+ singleTableOutputConf.setBoolean(MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY, false);
+
+ WALPlayer player = new WALPlayer(singleTableOutputConf);
+
+ String walInputDir = new Path(cluster.getMaster().getMasterFileSystem().getWALRootDir(),
+ HConstants.HREGION_LOGDIR_NAME).toString();
+ String tables = tableName1.getNameAsString() + "," + tableName2.getNameAsString();
+
+ // Expecting a failure here since we are running WALPlayer on multiple tables even though the
+ // multi-table HFile output format is disabled
+ try {
+ ToolRunner.run(singleTableOutputConf, player, new String[] { walInputDir, tables });
+ fail("Expected a failure to occur due to using WALPlayer with multiple tables while having "
+ + MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY + " set to false");
+ } catch (IOException e) {
+ String expectedMsg = "Expected table names list to have only one table since "
+ + MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY + " is set to false. Got the following "
+ + "list of tables instead: [testWALPlayerSingleTableHFileOutputFormat1, " + namespace
+ + ":testWALPlayerSingleTableHFileOutputFormat2]";
+ assertTrue(e.getMessage().contains(expectedMsg));
+ }
+
+ // Successfully run WALPlayer on just one table while having multi-table HFile output format
+ // disabled
+ ToolRunner.run(singleTableOutputConf, player,
+ new String[] { walInputDir, tableName1.getNameAsString() });
+
+ Path bulkLoadOutputDirForTable = new Path(bulkLoadOutputDir, "family");
+ assertTrue("Expected path to exist: " + bulkLoadOutputDirForTable,
+ hdfs.exists(bulkLoadOutputDirForTable));
+
+ hdfs.delete(new Path(bulkLoadOutputDir), true);
+ }
+
+ private void putRowIntoTable(Table table) throws IOException {
+ Put p = new Put(ROW);
+ p.addColumn(FAMILY, COLUMN1, COLUMN1);
+ p.addColumn(FAMILY, COLUMN2, COLUMN2);
+ table.put(p);
+ }
+
private Path createEmptyWALFile(String walDir) throws IOException {
FileSystem dfs = TEST_UTIL.getDFSCluster().getFileSystem();
Path inputDir = new Path("/" + walDir);
@@ -422,4 +531,22 @@ private Path createEmptyWALFile(String walDir) throws IOException {
return inputDir;
}
+
+ private void setConfSimilarToIncrementalBackupWALToHFilesMethod(Configuration conf) {
+ conf.set(WALPlayer.BULK_OUTPUT_CONF_KEY, bulkLoadOutputDir);
+ conf.set(WALPlayer.INPUT_FILES_SEPARATOR_KEY, ";");
+ conf.setBoolean(WALPlayer.MULTI_TABLES_SUPPORT, true);
+ conf.set("mapreduce.job.name", name.getMethodName() + "-" + System.currentTimeMillis());
+ conf.setBoolean(HFileOutputFormat2.DISK_BASED_SORTING_ENABLED_KEY, true);
+ }
+
+ private void assertMultiTableOutputFormatDirStructure(TableName tableName, String namespace)
+ throws IOException {
+ Path qualifierAndFamilyDir =
+ new Path(tableName.getQualifierAsString(), new String(FAMILY, StandardCharsets.UTF_8));
+ Path namespaceQualifierFamilyDir = new Path(namespace, qualifierAndFamilyDir);
+ Path bulkLoadOutputDirForTable = new Path(bulkLoadOutputDir, namespaceQualifierFamilyDir);
+ assertTrue("Expected path to exist: " + bulkLoadOutputDirForTable,
+ hdfs.exists(bulkLoadOutputDirForTable));
+ }
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index 75314c48bd28..73421dced454 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -866,10 +866,6 @@ public long getTotalReplicatedEdits() {
return totalReplicatedEdits.get();
}
- long getSleepForRetries() {
- return sleepForRetries;
- }
-
@Override
public void logPositionAndCleanOldLogs(WALEntryBatch entryBatch, ReplicationResult replicated) {
String walName = entryBatch.getLastWalPath().getName();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
index 4c695634517b..a6ecc95d8aeb 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
@@ -162,15 +162,7 @@ void shipEdits(WALEntryBatch entryBatch) {
List entries = entryBatch.getWalEntries();
int sleepMultiplier = 0;
if (entries.isEmpty()) {
- /*
- * Delegate to the endpoint to decide how to treat empty entry batches. In most replication
- * flows, receiving an empty entry batch means that everything so far has been successfully
- * replicated and committed — so it's safe to mark the WAL position as committed (COMMIT).
- * However, some endpoints (e.g., asynchronous S3 backups) may buffer writes and delay actual
- * persistence. In such cases, we must avoid committing the WAL position prematurely.
- */
- final ReplicationResult result = getReplicationResult();
- updateLogPosition(entryBatch, result);
+ updateLogPosition(entryBatch, ReplicationResult.COMMITTED);
return;
}
int currentSize = (int) entryBatch.getHeapSize();
@@ -277,11 +269,7 @@ private void cleanUpHFileRefs(WALEdit edit) throws IOException {
}
}
- @RestrictedApi(
- explanation = "Package-private for test visibility only. Do not use outside tests.",
- link = "",
- allowedOnPath = "(.*/src/test/.*|.*/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java)")
- boolean updateLogPosition(WALEntryBatch batch, ReplicationResult replicated) {
+ private boolean updateLogPosition(WALEntryBatch batch, ReplicationResult replicated) {
boolean updated = false;
// if end of file is true, then the logPositionAndCleanOldLogs method will remove the file
// record on zk, so let's call it. The last wal position maybe zero if end of file is true and