Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -138,10 +138,31 @@ public class ScmConfig extends ReconfigurableConfig {
)
private int transactionToDNsCommitMapLimit = 5000000;

@Config(key = "hdds.scm.container.pending-allocation.roll-interval",
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can name the config key hdds.scm.container.pending.allocation.roll.interval instead.

defaultValue = "10m",
type = ConfigType.TIME,
tags = { ConfigTag.SCM, ConfigTag.CONTAINER },
description =
"Time interval for rolling the pending container allocation window. " +
"Pending container allocations are tracked in a two-window tumbling bucket " +
"pattern. Each window has this duration. " +
"After 2x this interval, allocations that haven't been confirmed via " +
"container reports will automatically age out. Default is 10 minutes."
)
private Duration pendingContainerAllocationRollInterval = Duration.ofMinutes(10);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the rolling period 5 min or 10 min? I mean, the previous bucket should stay around for an additional 5 min to capture the list of containers that are still in progress.


/**
 * Returns the current value of the {@code transactionToDNsCommitMapLimit} field.
 *
 * @return the configured limit as an {@code int}
 */
public int getTransactionToDNsCommitMapLimit() {
  return this.transactionToDNsCommitMapLimit;
}

/**
 * Returns the roll interval of the pending container allocation window
 * (config key {@code hdds.scm.container.pending-allocation.roll-interval}).
 *
 * @return the currently held roll interval
 */
public Duration getPendingContainerAllocationRollInterval() {
  return this.pendingContainerAllocationRollInterval;
}

/**
 * Sets the roll interval of the pending container allocation window.
 *
 * @param duration the new roll interval; must not be {@code null}
 * @throws NullPointerException if {@code duration} is {@code null}
 */
public void setPendingContainerAllocationRollInterval(Duration duration) {
  // Fail fast: a null interval would otherwise only surface later, when a
  // consumer dereferences the getter's result (e.g. calls toMillis() on it).
  this.pendingContainerAllocationRollInterval =
      java.util.Objects.requireNonNull(duration, "duration");
}

/**
 * Returns the current value of the {@code blockDeletionInterval} field.
 *
 * @return the block deletion interval
 */
public Duration getBlockDeletionInterval() {
  return this.blockDeletionInterval;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,20 +34,26 @@
import org.apache.hadoop.conf.StorageUnit;
import org.apache.hadoop.hdds.client.ECReplicationConfig;
import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ContainerInfoProto;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.StorageReportProto;
import org.apache.hadoop.hdds.scm.ScmConfig;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.container.metrics.SCMContainerManagerMetrics;
import org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaPendingOps;
import org.apache.hadoop.hdds.scm.ha.SCMHAManager;
import org.apache.hadoop.hdds.scm.ha.SequenceIdGenerator;
import org.apache.hadoop.hdds.scm.node.DatanodeInfo;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
import org.apache.hadoop.hdds.utils.db.Table;
import org.apache.hadoop.ozone.common.statemachine.InvalidStateTransitionException;
import org.apache.hadoop.ozone.container.common.volume.VolumeUsage;
import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -81,6 +87,8 @@ public class ContainerManagerImpl implements ContainerManager {

private final long maxContainerSize;

private final PendingContainerTracker pendingContainerTracker;

/**
*
*/
Expand Down Expand Up @@ -109,7 +117,19 @@ public ContainerManagerImpl(
maxContainerSize = (long) conf.getStorageSize(ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE,
ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT, StorageUnit.BYTES);

// Get pending container roll interval from configuration
OzoneConfiguration ozoneConf = (conf instanceof OzoneConfiguration)
? (OzoneConfiguration) conf
: new OzoneConfiguration(conf);
ScmConfig scmConfig = ozoneConf.getObject(ScmConfig.class);
long rollIntervalMs = scmConfig.getPendingContainerAllocationRollInterval().toMillis();

this.scmContainerManagerMetrics = SCMContainerManagerMetrics.create();
this.pendingContainerTracker = new PendingContainerTracker(
maxContainerSize, rollIntervalMs, scmContainerManagerMetrics);

LOG.info("Container allocation pending tracker initialized with maxContainerSize={}B, rollInterval={}ms",
maxContainerSize, rollIntervalMs);
}

@Override
Expand Down Expand Up @@ -242,12 +262,75 @@ private ContainerInfo createContainer(Pipeline pipeline, String owner)
return containerInfo;
}

/**
* Check if a pipeline has sufficient space after considering pending allocations.
* Tracks containers scheduled but not yet written to DataNodes, preventing over-allocation.
*
* @param pipeline The pipeline to check
* @return true if sufficient space is available, false otherwise
*/
private boolean hasSpaceAfterPendingAllocations(Pipeline pipeline) {
try {
for (DatanodeDetails node : pipeline.getNodes()) {
// Get DN's storage statistics
DatanodeInfo datanodeInfo = pipelineManager.getDatanodeInfo(node);
if (datanodeInfo == null) {
LOG.warn("DatanodeInfo not found for node {}", node.getUuidString());
return false;
}

List<StorageReportProto> storageReports = datanodeInfo.getStorageReports();
if (storageReports == null || storageReports.isEmpty()) {
LOG.warn("No storage reports for node {}", node.getUuidString());
return false;
}

// Calculate total capacity and effective allocatable space
// For each disk, calculate how many containers can actually fit,
// since containers are written to individual disks, not spread across them.
// Example: disk1=9GB, disk2=14GB with 5GB containers
// (1*5GB) + (2*5GB) = 15GB, i.e. 3 containers (not 4, as the 23GB total would suggest)
long totalCapacity = 0L;
long effectiveAllocatableSpace = 0L;
for (StorageReportProto report : storageReports) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of calculating all the available space and then subtracting, we can do this progressively, like:
required = pending + newAllocation
for each report
required = required - volumeUsage (rounded-off value)
if (required <= 0)
return true

But we also need to reserve the space: we can add first and then check, and if space is not available, remove the containerId.

Or, the other way:
when the DN storage report is handled, the consolidated total can also be kept in memory to avoid looping on every call.

totalCapacity += report.getCapacity();
long usableSpace = VolumeUsage.getUsableSpace(report);
// Calculate how many containers can fit on this disk
long containersOnThisDisk = usableSpace / maxContainerSize;
// Add effective space (containers that fit * container size)
effectiveAllocatableSpace += containersOnThisDisk * maxContainerSize;
}

// Get pending allocations from tracker
long pendingAllocations = pendingContainerTracker.getPendingAllocationSize(node);

// Calculate effective remaining space after pending allocations
long effectiveRemaining = effectiveAllocatableSpace - pendingAllocations;

// Check if there's enough space for a new container
if (effectiveRemaining < maxContainerSize) {
LOG.info("Node {} insufficient space: capacity={}, effective allocatable={}, " +
"pending allocations={}, effective remaining={}, required={}",
node.getUuidString(), totalCapacity, effectiveAllocatableSpace,
pendingAllocations, effectiveRemaining, maxContainerSize);
return false;
}
}

return true;
} catch (Exception e) {
LOG.warn("Error checking space for pipeline {}", pipeline.getId(), e);
return true;
}
}

private ContainerInfo allocateContainer(final Pipeline pipeline,
final String owner)
throws IOException {
if (!pipelineManager.hasEnoughSpace(pipeline, maxContainerSize)) {
LOG.debug("Cannot allocate a new container because pipeline {} does not have the required space {}.",
pipeline, maxContainerSize);
// Check if pipeline has sufficient space after considering recent allocations
if (!hasSpaceAfterPendingAllocations(pipeline)) {
LOG.warn("Pipeline {} does not have sufficient space after considering recent allocations",
pipeline.getId());
return null;
}

Expand Down Expand Up @@ -278,6 +361,11 @@ private ContainerInfo allocateContainer(final Pipeline pipeline,

containerStateManager.addContainer(containerInfoBuilder.build());
scmContainerManagerMetrics.incNumSuccessfulCreateContainers();
// Record pending allocation - tracks containers scheduled but not yet written
pendingContainerTracker.recordPendingAllocation(pipeline, containerID);
LOG.debug("Allocated container {} on pipeline {}. Recorded as pending on {} DataNodes",
containerID, pipeline.getId(), pipeline.getNodes().size());

return containerStateManager.getContainer(containerID);
}

Expand Down Expand Up @@ -468,4 +556,14 @@ public ContainerStateManager getContainerStateManager() {
public SCMHAManager getSCMHAManager() {
  // Plain accessor: hands back the HA manager held in the haManager field.
  return this.haManager;
}

/**
 * Exposes the tracker that records containers which have been allocated
 * but not yet confirmed through container reports, so that report handlers
 * can remove an entry once the container is reported.
 *
 * @return the {@link PendingContainerTracker} owned by this manager
 */
public PendingContainerTracker getPendingContainerTracker() {
  return this.pendingContainerTracker;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,15 @@ public void onMessage(final ContainerReportFromDatanode reportFromDatanode,
if (!alreadyInDn) {
// This is a new Container not in the nodeManager -> dn map yet
getNodeManager().addContainer(datanodeDetails, cid);

// Remove from pending tracker when container is added to DN
// This container was just confirmed for the first time on this DN
// No need to remove on subsequent reports (it's already been removed)
if (container != null && getContainerManager() instanceof ContainerManagerImpl) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The handling here differs between ICR and FCR; it can be the same for both.

((ContainerManagerImpl) getContainerManager())
.getPendingContainerTracker()
.removePendingAllocation(datanodeDetails, cid);
}
}
if (container == null || ContainerReportValidator
.validate(container, datanodeDetails, replica)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,15 @@ protected void processICR(IncrementalContainerReportFromDatanode report,
ContainerID id = ContainerID.valueOf(replicaProto.getContainerID());
final ContainerInfo container;
try {
// Check if container is already known to this DN before adding
boolean alreadyOnDn = false;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we really need to check whether the container exists, or can we just remove it if it exists, in a single call?

try {
alreadyOnDn = getNodeManager().getContainers(dd).contains(id);
} catch (NodeNotFoundException e) {
// DN not found, treat as not already on DN
getLogger().debug("Datanode not found when checking containers: {}", dd);
}

try {
container = getContainerManager().getContainer(id);
// Ensure we reuse the same ContainerID instance in containerInfo
Expand All @@ -103,6 +112,13 @@ protected void processICR(IncrementalContainerReportFromDatanode report,
}
if (ContainerReportValidator.validate(container, dd, replicaProto)) {
processContainerReplica(dd, container, replicaProto, publisher, detailsForLogging);

// Remove from pending tracker when container is added to DN
if (!alreadyOnDn && getContainerManager() instanceof ContainerManagerImpl) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please check whether the node report is also sent with the ICR; the reason is that the node information should be updated at the same time as the ICR.

((ContainerManagerImpl) getContainerManager())
.getPendingContainerTracker()
.removePendingAllocation(dd, id);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Say the DN is healthy, all containers are confirmed, and there are no new allocations → that DN's bucket never rolls even though heartbeats come every 30 seconds, right?

t=0    Container C1 allocated → pending recorded in tracker

t=60-120  FCR arrives from DN
          → cid = C1
          → alreadyInDn = expectedContainersInDatanode.remove(C1) = FALSE
          → !alreadyInDn = TRUE → removePendingAllocation called → rollIfNeeded fires ✓
          → C1 added to NM DN-set

How about rolling on every processHeartbeat, i.e. every 30 seconds, regardless of container state changes?

}
}
success = true;
} catch (ContainerNotFoundException e) {
Expand Down
Loading