Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,23 +38,26 @@ public final class ContainerScanHelper {
private final ContainerController controller;
private final AbstractContainerScannerMetrics metrics;
private final long minScanGap;
private final int maxRetries;

/**
 * Creates a helper whose minimum scan gap argument is the literal {@code 0}.
 *
 * NOTE(review): this span appears to come from a diff view and lists both the removed and the added
 * signature/return lines. The added form also takes {@code conf} and passes
 * {@code conf.getDataScanMaxRetries()} to the constructor as the retry limit.
 *
 * @param log logger handed to the new helper
 * @param controller container controller handed to the new helper
 * @param metrics scanner metrics handed to the new helper
 * @param conf scanner configuration; only {@code getDataScanMaxRetries()} is read here
 * @return a new {@code ContainerScanHelper}
 */
public static ContainerScanHelper withoutScanGap(Logger log, ContainerController controller,
AbstractContainerScannerMetrics metrics) {
return new ContainerScanHelper(log, controller, metrics, 0);
AbstractContainerScannerMetrics metrics, ContainerScannerConfiguration conf) {
return new ContainerScanHelper(log, controller, metrics, 0, conf.getDataScanMaxRetries());
}

/**
 * Creates a helper whose minimum scan gap is taken from {@code conf.getContainerScanMinGap()}.
 *
 * NOTE(review): this span appears to come from a diff view and lists both the removed and the added
 * return statement. The added form additionally passes {@code conf.getDataScanMaxRetries()} to the
 * constructor as the retry limit.
 *
 * @param log logger handed to the new helper
 * @param controller container controller handed to the new helper
 * @param metrics scanner metrics handed to the new helper
 * @param conf scanner configuration supplying the minimum scan gap and the retry limit
 * @return a new {@code ContainerScanHelper}
 */
public static ContainerScanHelper withScanGap(Logger log, ContainerController controller,
AbstractContainerScannerMetrics metrics, ContainerScannerConfiguration conf) {
return new ContainerScanHelper(log, controller, metrics, conf.getContainerScanMinGap());
return new ContainerScanHelper(log, controller, metrics, conf.getContainerScanMinGap(),
conf.getDataScanMaxRetries());
}

/**
 * Stores the given collaborators and scan settings in the corresponding fields. The constructor is private;
 * the visible callers are the static factories {@code withScanGap} and {@code withoutScanGap}.
 *
 * NOTE(review): this span appears to come from a diff view and lists both the removed and the added
 * parameter line; the added form carries the extra {@code maxRetries} parameter.
 *
 * @param log logger kept in the {@code log} field
 * @param controller container controller kept in the {@code controller} field
 * @param metrics scanner metrics kept in the {@code metrics} field
 * @param minScanGap value kept in the {@code minScanGap} field
 * @param maxRetries value kept in the {@code maxRetries} field
 */
private ContainerScanHelper(Logger log, ContainerController controller,
AbstractContainerScannerMetrics metrics, long minScanGap) {
AbstractContainerScannerMetrics metrics, long minScanGap, int maxRetries) {
this.log = log;
this.controller = controller;
this.metrics = metrics;
this.minScanGap = minScanGap;
this.maxRetries = maxRetries;
}

public void scanData(Container<?> container, DataTransferThrottler throttler, Canceler canceler)
Expand All @@ -65,7 +68,29 @@ public void scanData(Container<?> container, DataTransferThrottler throttler, Ca
ContainerData containerData = container.getContainerData();
long containerId = containerData.getContainerID();
logScanStart(containerData, "data");
DataScanResult result = container.scanData(throttler, canceler);

DataScanResult result = null;
int retryCount = 0;
boolean scanCompleted = false;

while (retryCount <= maxRetries && !scanCompleted) {
long initialChecksum = containerData.getDataChecksum();

result = container.scanData(throttler, canceler);

if (result.isDeleted() || initialChecksum == containerData.getDataChecksum()) {
scanCompleted = true;
} else {
if (retryCount >= maxRetries) {
log.warn("Container [{}] data checksum changed during the scan. Maximum retries ({}) exceeded. " +
"Skipping this scan.", containerId, maxRetries);
return;
}
log.info("Container [{}] data checksum changed during the scan. Rescanning. Retry count: {}/{}",
containerId, retryCount + 1, maxRetries);
retryCount++;
}
}

if (result.isDeleted()) {
log.debug("Container [{}] has been deleted during the data scan.", containerId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,12 @@ public class ContainerScannerConfiguration {
"hdds.container.scrub.on.demand.volume.bytes.per.second";
public static final String CONTAINER_SCAN_MIN_GAP =
"hdds.container.scrub.min.gap";
public static final String DATA_SCAN_MAX_RETRIES_KEY =
"hdds.container.scrub.max.retries";

static final long CONTAINER_SCAN_MIN_GAP_DEFAULT =
Duration.ofMinutes(15).toMillis();
public static final int DATA_SCAN_MAX_RETRIES_DEFAULT = 2;

public static final long METADATA_SCAN_INTERVAL_DEFAULT =
Duration.ofHours(3).toMillis();
Expand Down Expand Up @@ -127,6 +130,16 @@ public class ContainerScannerConfiguration {
private long onDemandBandwidthPerVolume
= ON_DEMAND_BANDWIDTH_PER_VOLUME_DEFAULT;

@Config(key = "hdds.container.scrub.max.retries",
defaultValue = "2",
type = ConfigType.INT,
tags = { DATANODE },
description = "The maximum number of times to retry scanning a container"
+ " if its data checksum changes during the scan (for example, due"
+ " to concurrent reconciliation)."
)
private int dataScanMaxRetries = DATA_SCAN_MAX_RETRIES_DEFAULT;

@Config(key = "hdds.container.scrub.min.gap",
defaultValue = "15m",
type = ConfigType.TIME,
Expand Down Expand Up @@ -172,6 +185,13 @@ public void validate() {
onDemandBandwidthPerVolume, ON_DEMAND_BANDWIDTH_PER_VOLUME_DEFAULT);
onDemandBandwidthPerVolume = ON_DEMAND_BANDWIDTH_PER_VOLUME_DEFAULT;
}

if (dataScanMaxRetries < 0) {
LOG.warn("{} must be >= 0 and was set to {}. Defaulting to {}",
DATA_SCAN_MAX_RETRIES_KEY, dataScanMaxRetries,
DATA_SCAN_MAX_RETRIES_DEFAULT);
dataScanMaxRetries = DATA_SCAN_MAX_RETRIES_DEFAULT;
}
}

public void setEnabled(boolean enabled) {
Expand Down Expand Up @@ -221,4 +241,12 @@ public long getContainerScanMinGap() {
public void setContainerScanMinGap(long scanGap) {
containerScanMinGap = scanGap;
}

/**
 * Returns the maximum number of data scan retries currently held by this configuration.
 *
 * @return the value of the {@code dataScanMaxRetries} field
 */
public int getDataScanMaxRetries() {
return dataScanMaxRetries;
}

/**
 * Sets the maximum number of data scan retries. The value is stored as given; this setter itself performs
 * no range check.
 *
 * @param dataScanMaxRetries new value for the {@code dataScanMaxRetries} field
 */
public void setDataScanMaxRetries(int dataScanMaxRetries) {
this.dataScanMaxRetries = dataScanMaxRetries;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public OnDemandContainerScanner(
scanExecutor = Executors.newSingleThreadExecutor();
containerRescheduleCheckSet = ConcurrentHashMap.newKeySet();
this.scannerHelper = ContainerScanHelper.withScanGap(LOG, controller, metrics, conf);
this.scannerHelperWithoutGap = ContainerScanHelper.withoutScanGap(LOG, controller, metrics);
this.scannerHelperWithoutGap = ContainerScanHelper.withoutScanGap(LOG, controller, metrics, conf);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,60 @@ public void iteratorIsOrderedByScanTime(ContainerLayoutVersion layout)
assertEquals(containerCount, containersToBeScanned);
}

/**
 * Checks the per-volume container iterator when one container's scan is treated as aborted: the current
 * iterator still moves on to the remaining containers, and a freshly created iterator returns the container
 * whose scan time was never refreshed before the ones whose scan time was refreshed.
 */
@ContainerLayoutTestInfo.ContainerTest
public void testIteratorOrderWhenScanAborted(ContainerLayoutVersion layout)
    throws StorageContainerException {
  setLayoutVersion(layout);
  HddsVolume volume = mockHddsVolume("uuid-1");
  ContainerSet containers = newContainerSet();
  long maxSize = (long) StorageUnit.GB.toBytes(5);

  // Container 1: CLOSED, with the oldest data scan time of the three.
  KeyValueContainerData oldestData = new KeyValueContainerData(1, layout, maxSize,
      UUID.randomUUID().toString(), UUID.randomUUID().toString());
  oldestData.setVolume(volume);
  oldestData.setState(ContainerProtos.ContainerDataProto.State.CLOSED);
  oldestData.updateDataScanTime(Instant.now().minusMillis(10_000));
  Container<?> oldest = new KeyValueContainer(oldestData, new OzoneConfiguration());
  containers.addContainer(oldest);

  // Container 2: CLOSED, scanned after container 1 but before container 3.
  KeyValueContainerData middleData = new KeyValueContainerData(2, layout, maxSize,
      UUID.randomUUID().toString(), UUID.randomUUID().toString());
  middleData.setVolume(volume);
  middleData.setState(ContainerProtos.ContainerDataProto.State.CLOSED);
  middleData.updateDataScanTime(Instant.now().minusMillis(5_000));
  Container<?> middle = new KeyValueContainer(middleData, new OzoneConfiguration());
  containers.addContainer(middle);

  // Container 3: CLOSED, with the most recent data scan time.
  KeyValueContainerData newestData = new KeyValueContainerData(3, layout, maxSize,
      UUID.randomUUID().toString(), UUID.randomUUID().toString());
  newestData.setVolume(volume);
  newestData.setState(ContainerProtos.ContainerDataProto.State.CLOSED);
  newestData.updateDataScanTime(Instant.now().minusMillis(1_000));
  Container<?> newest = new KeyValueContainer(newestData, new OzoneConfiguration());
  containers.addContainer(newest);

  // First pass: containers come back ordered by last scan time, i.e. 1, 2, 3.
  Iterator<Container<?>> firstPass = containers.getContainerIterator(volume);

  long firstId = firstPass.next().getContainerData().getContainerID();
  assertEquals(1, firstId);
  oldestData.updateDataScanTime(Instant.now());

  // Container 2's scan is considered aborted, so its scan time is deliberately left untouched.
  long secondId = firstPass.next().getContainerData().getContainerID();
  assertEquals(2, secondId);

  // The iterator must still hand out container 3 and then be exhausted.
  long thirdId = firstPass.next().getContainerData().getContainerID();
  assertEquals(3, thirdId);
  newestData.updateDataScanTime(Instant.now());
  assertFalse(firstPass.hasNext());

  // Second pass: container 2 never recorded a completed scan, so it leads; the two containers whose
  // scan times were refreshed follow in the order 1, 3.
  Iterator<Container<?>> secondPass = containers.getContainerIterator(volume);
  for (long expectedId : new long[] {2, 1, 3}) {
    assertEquals(expectedId, secondPass.next().getContainerData().getContainerID());
  }
}

@ContainerLayoutTestInfo.ContainerTest
public void testGetContainerReport(ContainerLayoutVersion layout)
throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,53 @@ public void testScanTimestampUpdated() throws Exception {
eq(deletedContainer.getContainerData().getContainerID()), any());
}

/**
 * A container data checksum that differs from the value read when the scan started means the container was
 * updated by reconciliation while the scan was in progress. In that case the scanner is expected to scan the
 * container's data again rather than persist the possibly stale merkle tree it built.
 */
@Test
public void testContainerRescannedWhenChecksumChanges() throws Exception {
  Container<?> changing = mockKeyValueContainer();
  // Consecutive-call stubbing: the first checksum read returns 1, every later read returns 2.
  when(changing.getContainerData().getDataChecksum()).thenReturn(1L, 2L);
  when(changing.scanData(any(DataTransferThrottler.class), any(Canceler.class)))
      .thenReturn(getHealthyDataScanResult());
  when(changing.scanMetaData()).thenReturn(getHealthyMetadataScanResult());

  setContainers(changing, healthy);
  scanner.runIteration();

  // One initial data scan plus one rescan.
  verify(changing, times(2)).scanData(any(), any());
}

/**
 * When the data checksum keeps changing on every read, the scanner should stop after the default retry limit
 * and leave the container's scan timestamp, checksum and health state untouched.
 */
@Test
public void testContainerScanMaxRetries() throws Exception {
  Container<?> unstable = mockKeyValueContainer();
  // Every checksum read yields a new value, so no scan ever observes a stable checksum.
  when(unstable.getContainerData().getDataChecksum()).thenReturn(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L);
  when(unstable.scanData(any(DataTransferThrottler.class), any(Canceler.class)))
      .thenReturn(getHealthyDataScanResult());
  when(unstable.scanMetaData()).thenReturn(getHealthyMetadataScanResult());

  setContainers(unstable, healthy);
  scanner.runIteration();

  // Default max retries is 2: one initial scan followed by two retries.
  verify(unstable, times(3)).scanData(any(), any());

  long unstableId = unstable.getContainerData().getContainerID();
  // An aborted scan must not record a scan timestamp ...
  verify(controller, never()).updateDataScanTimestamp(eq(unstableId), any());
  // ... must not write a merkle tree ...
  verify(controller, never()).updateContainerChecksum(eq(unstableId), any());
  // ... and must not mark the container unhealthy.
  verifyContainerMarkedUnhealthy(unstable, never());
}

@Test
@Override
public void testUnhealthyContainerRescanned() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.hadoop.ozone.container.ozoneimpl;

import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State.UNHEALTHY;
import static org.apache.hadoop.ozone.container.common.ContainerTestUtils.getHealthyDataScanResult;
import static org.apache.hadoop.ozone.container.common.ContainerTestUtils.getHealthyMetadataScanResult;
import static org.apache.hadoop.ozone.container.common.ContainerTestUtils.getUnhealthyDataScanResult;
import static org.junit.jupiter.api.Assertions.assertEquals;
Expand Down Expand Up @@ -278,6 +279,55 @@ public void testShutdownDuringScan() throws Exception {
verifyContainerMarkedUnhealthy(healthy, never());
}

/**
 * A container data checksum that differs from the value read when the scan started means the container was
 * updated by reconciliation while the scan was in progress. In that case the scanner is expected to scan the
 * container's data again rather than persist the possibly stale merkle tree it built.
 */
@Test
public void testContainerRescannedWhenChecksumChanges() throws Exception {
  Container<?> changing = mockKeyValueContainer();
  // Consecutive-call stubbing: the first checksum read returns 1, every later read returns 2.
  when(changing.getContainerData().getDataChecksum()).thenReturn(1L, 2L);
  when(changing.scanData(any(DataTransferThrottler.class), any(Canceler.class)))
      .thenReturn(getHealthyDataScanResult());
  when(changing.scanMetaData()).thenReturn(getHealthyMetadataScanResult());

  scanContainer(changing);

  // One initial data scan plus one rescan.
  verify(changing, times(2)).scanData(any(), any());

  // The retried scan is expected to finish normally, so both updates happen exactly once.
  long changingId = changing.getContainerData().getContainerID();
  verify(controller, times(1)).updateDataScanTimestamp(eq(changingId), any());
  verify(controller, times(1)).updateContainerChecksum(eq(changingId), any());
}

/**
 * When the data checksum keeps changing on every read, the scanner should stop after the default retry limit
 * and leave the container's scan timestamp, checksum and health state untouched.
 */
@Test
public void testContainerScanMaxRetries() throws Exception {
  Container<?> unstable = mockKeyValueContainer();
  // Every checksum read yields a new value, so no scan ever observes a stable checksum.
  when(unstable.getContainerData().getDataChecksum()).thenReturn(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L);
  when(unstable.scanData(any(DataTransferThrottler.class), any(Canceler.class)))
      .thenReturn(getHealthyDataScanResult());
  when(unstable.scanMetaData()).thenReturn(getHealthyMetadataScanResult());

  scanContainer(unstable);

  // Default max retries is 2: one initial scan followed by two retries.
  verify(unstable, times(3)).scanData(any(), any());

  long unstableId = unstable.getContainerData().getContainerID();
  // An aborted scan must not record a scan timestamp ...
  verify(controller, never()).updateDataScanTimestamp(eq(unstableId), any());
  // ... must not write a merkle tree ...
  verify(controller, never()).updateContainerChecksum(eq(unstableId), any());
  // ... and must not mark the container unhealthy.
  verifyContainerMarkedUnhealthy(unstable, never());
}

@Test
@Override
public void testUnhealthyContainerRescanned() throws Exception {
Expand Down
Loading