diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfig.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfig.java index 0876bd22ea4f..ed1c7f68b68f 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfig.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfig.java @@ -138,10 +138,31 @@ public class ScmConfig extends ReconfigurableConfig { ) private int transactionToDNsCommitMapLimit = 5000000; + @Config(key = "hdds.scm.container.pending-allocation.roll-interval", + defaultValue = "10m", + type = ConfigType.TIME, + tags = { ConfigTag.SCM, ConfigTag.CONTAINER }, + description = + "Time interval for rolling the pending container allocation window. " + + "Pending container allocations are tracked in a two-window tumbling bucket " + + "pattern. Each window has this duration. " + + "After 2x this interval, allocations that haven't been confirmed via " + + "container reports will automatically age out. Default is 10 minutes." 
+ ) + private Duration pendingContainerAllocationRollInterval = Duration.ofMinutes(10); + public int getTransactionToDNsCommitMapLimit() { return transactionToDNsCommitMapLimit; } + public Duration getPendingContainerAllocationRollInterval() { + return pendingContainerAllocationRollInterval; + } + + public void setPendingContainerAllocationRollInterval(Duration duration) { + this.pendingContainerAllocationRollInterval = duration; + } + public Duration getBlockDeletionInterval() { return blockDeletionInterval; } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerImpl.java index 432c9890e98a..c5947c36f414 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerImpl.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerImpl.java @@ -34,20 +34,26 @@ import org.apache.hadoop.conf.StorageUnit; import org.apache.hadoop.hdds.client.ECReplicationConfig; import org.apache.hadoop.hdds.client.ReplicationConfig; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ContainerInfoProto; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.StorageReportProto; +import org.apache.hadoop.hdds.scm.ScmConfig; import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdds.scm.container.metrics.SCMContainerManagerMetrics; import org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaPendingOps; import 
org.apache.hadoop.hdds.scm.ha.SCMHAManager; import org.apache.hadoop.hdds.scm.ha.SequenceIdGenerator; +import org.apache.hadoop.hdds.scm.node.DatanodeInfo; import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.hdds.scm.pipeline.PipelineManager; import org.apache.hadoop.hdds.utils.db.Table; import org.apache.hadoop.ozone.common.statemachine.InvalidStateTransitionException; +import org.apache.hadoop.ozone.container.common.volume.VolumeUsage; import org.apache.hadoop.util.Time; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -81,6 +87,8 @@ public class ContainerManagerImpl implements ContainerManager { private final long maxContainerSize; + private final PendingContainerTracker pendingContainerTracker; + /** * */ @@ -109,7 +117,19 @@ public ContainerManagerImpl( maxContainerSize = (long) conf.getStorageSize(ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE, ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT, StorageUnit.BYTES); + // Get pending container roll interval from configuration + OzoneConfiguration ozoneConf = (conf instanceof OzoneConfiguration) + ? (OzoneConfiguration) conf + : new OzoneConfiguration(conf); + ScmConfig scmConfig = ozoneConf.getObject(ScmConfig.class); + long rollIntervalMs = scmConfig.getPendingContainerAllocationRollInterval().toMillis(); + this.scmContainerManagerMetrics = SCMContainerManagerMetrics.create(); + this.pendingContainerTracker = new PendingContainerTracker( + maxContainerSize, rollIntervalMs, scmContainerManagerMetrics); + + LOG.info("Container allocation pending tracker initialized with maxContainerSize={}B, rollInterval={}ms", + maxContainerSize, rollIntervalMs); } @Override @@ -242,12 +262,75 @@ private ContainerInfo createContainer(Pipeline pipeline, String owner) return containerInfo; } + /** + * Check if a pipeline has sufficient space after considering pending allocations. + * Tracks containers scheduled but not yet written to DataNodes, preventing over-allocation. 
+ * + * @param pipeline The pipeline to check + * @return true if sufficient space is available, false otherwise + */ + private boolean hasSpaceAfterPendingAllocations(Pipeline pipeline) { + try { + for (DatanodeDetails node : pipeline.getNodes()) { + // Get DN's storage statistics + DatanodeInfo datanodeInfo = pipelineManager.getDatanodeInfo(node); + if (datanodeInfo == null) { + LOG.warn("DatanodeInfo not found for node {}", node.getUuidString()); + return false; + } + + List storageReports = datanodeInfo.getStorageReports(); + if (storageReports == null || storageReports.isEmpty()) { + LOG.warn("No storage reports for node {}", node.getUuidString()); + return false; + } + + // Calculate total capacity and effective allocatable space + // For each disk, calculate how many containers can actually fit, + // since containers are written to individual disks, not spread across them. + // Example: disk1=9GB, disk2=14GB with 5GB containers + // (1*5GB) + (2*5GB) = 15GB → actually 3 containers + long totalCapacity = 0L; + long effectiveAllocatableSpace = 0L; + for (StorageReportProto report : storageReports) { + totalCapacity += report.getCapacity(); + long usableSpace = VolumeUsage.getUsableSpace(report); + // Calculate how many containers can fit on this disk + long containersOnThisDisk = usableSpace / maxContainerSize; + // Add effective space (containers that fit * container size) + effectiveAllocatableSpace += containersOnThisDisk * maxContainerSize; + } + + // Get pending allocations from tracker + long pendingAllocations = pendingContainerTracker.getPendingAllocationSize(node); + + // Calculate effective remaining space after pending allocations + long effectiveRemaining = effectiveAllocatableSpace - pendingAllocations; + + // Check if there's enough space for a new container + if (effectiveRemaining < maxContainerSize) { + LOG.info("Node {} insufficient space: capacity={}, effective allocatable={}, " + + "pending allocations={}, effective remaining={}, 
required={}", + node.getUuidString(), totalCapacity, effectiveAllocatableSpace, + pendingAllocations, effectiveRemaining, maxContainerSize); + return false; + } + } + + return true; + } catch (Exception e) { + LOG.warn("Error checking space for pipeline {}", pipeline.getId(), e); + return true; + } + } + private ContainerInfo allocateContainer(final Pipeline pipeline, final String owner) throws IOException { - if (!pipelineManager.hasEnoughSpace(pipeline, maxContainerSize)) { - LOG.debug("Cannot allocate a new container because pipeline {} does not have the required space {}.", - pipeline, maxContainerSize); + // Check if pipeline has sufficient space after considering recent allocations + if (!hasSpaceAfterPendingAllocations(pipeline)) { + LOG.warn("Pipeline {} does not have sufficient space after considering recent allocations", + pipeline.getId()); return null; } @@ -278,6 +361,11 @@ private ContainerInfo allocateContainer(final Pipeline pipeline, containerStateManager.addContainer(containerInfoBuilder.build()); scmContainerManagerMetrics.incNumSuccessfulCreateContainers(); + // Record pending allocation - tracks containers scheduled but not yet written + pendingContainerTracker.recordPendingAllocation(pipeline, containerID); + LOG.debug("Allocated container {} on pipeline {}. Recorded as pending on {} DataNodes", + containerID, pipeline.getId(), pipeline.getNodes().size()); + return containerStateManager.getContainer(containerID); } @@ -468,4 +556,14 @@ public ContainerStateManager getContainerStateManager() { public SCMHAManager getSCMHAManager() { return haManager; } + + /** + * Get the pending container tracker for tracking scheduled containers. + * Used for removing pending containers when they are confirmed via reports. 
+ * + * @return PendingContainerTracker instance + */ + public PendingContainerTracker getPendingContainerTracker() { + return pendingContainerTracker; + } } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java index 0cebcb10ef2c..b9358096d5d7 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java @@ -175,6 +175,15 @@ public void onMessage(final ContainerReportFromDatanode reportFromDatanode, if (!alreadyInDn) { // This is a new Container not in the nodeManager -> dn map yet getNodeManager().addContainer(datanodeDetails, cid); + + // Remove from pending tracker when container is added to DN + // This container was just confirmed for the first time on this DN + // No need to remove on subsequent reports (it's already been removed) + if (container != null && getContainerManager() instanceof ContainerManagerImpl) { + ((ContainerManagerImpl) getContainerManager()) + .getPendingContainerTracker() + .removePendingAllocation(datanodeDetails, cid); + } } if (container == null || ContainerReportValidator .validate(container, datanodeDetails, replica)) { diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/IncrementalContainerReportHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/IncrementalContainerReportHandler.java index 247e3667d9ef..ff4eae77f97d 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/IncrementalContainerReportHandler.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/IncrementalContainerReportHandler.java @@ -89,6 +89,15 @@ protected void processICR(IncrementalContainerReportFromDatanode 
report, ContainerID id = ContainerID.valueOf(replicaProto.getContainerID()); final ContainerInfo container; try { + // Check if container is already known to this DN before adding + boolean alreadyOnDn = false; + try { + alreadyOnDn = getNodeManager().getContainers(dd).contains(id); + } catch (NodeNotFoundException e) { + // DN not found, treat as not already on DN + getLogger().debug("Datanode not found when checking containers: {}", dd); + } + try { container = getContainerManager().getContainer(id); // Ensure we reuse the same ContainerID instance in containerInfo @@ -103,6 +112,13 @@ protected void processICR(IncrementalContainerReportFromDatanode report, } if (ContainerReportValidator.validate(container, dd, replicaProto)) { processContainerReplica(dd, container, replicaProto, publisher, detailsForLogging); + + // Remove from pending tracker when container is added to DN + if (!alreadyOnDn && getContainerManager() instanceof ContainerManagerImpl) { + ((ContainerManagerImpl) getContainerManager()) + .getPendingContainerTracker() + .removePendingAllocation(dd, id); + } } success = true; } catch (ContainerNotFoundException e) { diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/PendingContainerTracker.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/PendingContainerTracker.java new file mode 100644 index 000000000000..4866416b392d --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/PendingContainerTracker.java @@ -0,0 +1,338 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.scm.container; + +import com.google.common.annotations.VisibleForTesting; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.scm.container.metrics.SCMContainerManagerMetrics; +import org.apache.hadoop.hdds.scm.pipeline.Pipeline; +import org.apache.hadoop.util.Time; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Tracks pending container allocations using a Two Window Tumbling Bucket pattern. + * Similar to HDFS HADOOP-3707. + * + * Two Window Tumbling Bucket for automatic aging and cleanup. + * + * How It Works: + *
  • Each DataNode has two sets: currentWindow and previousWindow
  • + *
  • New allocations go into currentWindow
  • + *
  • Every ROLL_INTERVAL (default 10 minutes): + * + *
  • + *
  • When checking pending: return union of currentWindow + previousWindow
  • + * + * + * Example Timeline: + *
    + * Time  | Action                    | CurrentWindow | PreviousWindow | Total Pending
    + * ------+---------------------------+---------------+----------------+--------------
    + * 00:00 | Allocate Container-1      | {C1}          | {}             | {C1}
    + * 00:05 | Allocate Container-2      | {C1, C2}      | {}             | {C1, C2}
    + * 00:10 | [ROLL] Window tumbles     | {}            | {C1, C2}       | {C1, C2}
    + * 00:12 | Allocate Container-3      | {C3}          | {C1, C2}       | {C1, C2, C3}
    + * 00:15 | Report confirms C1        | {C3}          | {C2}           | {C2, C3}
    + * 00:20 | [ROLL] Window tumbles     | {}            | {C3}           | {C3}
    + *       | (C2 aged out if not reported)
    + * 
    + * + */ +public class PendingContainerTracker { + + private static final Logger LOG = LoggerFactory.getLogger(PendingContainerTracker.class); + + /** + * Roll interval in milliseconds. + * Configurable via hdds.scm.container.pending-allocation.roll-interval. + * Default: 10 minutes. + * Containers automatically age out after 2 × rollIntervalMs. + */ + private final long rollIntervalMs; + + /** + * Map of DataNode UUID to TwoWindowBucket. + */ + private final ConcurrentHashMap datanodeBuckets; + + /** + * Maximum container size in bytes. + */ + private final long maxContainerSize; + + /** + * Metrics for tracking pending containers. + */ + private final SCMContainerManagerMetrics metrics; + + /** + * Two-window bucket for a single DataNode. + * Contains current and previous window sets, plus last roll timestamp. + */ + private static class TwoWindowBucket { + private Set currentWindow = ConcurrentHashMap.newKeySet(); + private Set previousWindow = ConcurrentHashMap.newKeySet(); + private long lastRollTime = Time.monotonicNow(); + private final long rollIntervalMs; + + TwoWindowBucket(long rollIntervalMs) { + this.rollIntervalMs = rollIntervalMs; + } + + /** + * Roll the windows: previous = current, current = empty. + * Called when current time exceeds lastRollTime + rollIntervalMs. + */ + synchronized void rollIfNeeded() { + long now = Time.monotonicNow(); + if (now - lastRollTime >= rollIntervalMs) { + // Shift: current becomes previous + previousWindow = currentWindow; + // Reset: new empty current window + currentWindow = ConcurrentHashMap.newKeySet(); + lastRollTime = now; + LOG.debug("Rolled window. Previous window size: {}, Current window reset to empty", previousWindow.size()); + } + } + + /** + * Get union of both windows (all pending containers). + */ + synchronized Set getAllPending() { + Set all = new HashSet<>(); + all.addAll(currentWindow); + all.addAll(previousWindow); + return all; + } + + /** + * Add container to current window. 
+ */ + synchronized boolean add(ContainerID containerID) { + return currentWindow.add(containerID); + } + + /** + * Remove container from both windows. + */ + synchronized boolean remove(ContainerID containerID) { + boolean removedFromCurrent = currentWindow.remove(containerID); + boolean removedFromPrevious = previousWindow.remove(containerID); + return removedFromCurrent || removedFromPrevious; + } + + /** + * Check if either window is non-empty. + */ + synchronized boolean isEmpty() { + return currentWindow.isEmpty() && previousWindow.isEmpty(); + } + + /** + * Get count of all pending containers (union). + */ + synchronized int getCount() { + return getAllPending().size(); + } + } + + public PendingContainerTracker(long maxContainerSize) { + this(maxContainerSize, 10 * 60 * 1000, null); // Default 10 minutes + } + + public PendingContainerTracker(long maxContainerSize, long rollIntervalMs, + SCMContainerManagerMetrics metrics) { + this.datanodeBuckets = new ConcurrentHashMap<>(); + this.maxContainerSize = maxContainerSize; + this.rollIntervalMs = rollIntervalMs; + this.metrics = metrics; + LOG.info("PendingContainerTracker initialized with maxContainerSize={}B, rollInterval={}ms", + maxContainerSize, rollIntervalMs); + } + + /** + * Record a pending container allocation for all DataNodes in the pipeline. + * Container is added to the current window. + * + * @param pipeline The pipeline where container is allocated + * @param containerID The container being allocated + */ + public void recordPendingAllocation(Pipeline pipeline, ContainerID containerID) { + if (pipeline == null || containerID == null) { + LOG.warn("Ignoring null pipeline or containerID"); + return; + } + + for (DatanodeDetails node : pipeline.getNodes()) { + recordPendingAllocationForDatanode(node, containerID); + } + } + + /** + * Record a pending container allocation for a single DataNode. + * Container is added to the current window. 
+ * + * @param node The DataNode where container is being allocated/replicated + * @param containerID The container being allocated/replicated + */ + public void recordPendingAllocationForDatanode(DatanodeDetails node, ContainerID containerID) { + if (node == null || containerID == null) { + LOG.warn("Ignoring null node or containerID"); + return; + } + + TwoWindowBucket bucket = datanodeBuckets.computeIfAbsent( + node.getUuid(), + k -> new TwoWindowBucket(rollIntervalMs) + ); + + // Roll window if needed before adding + bucket.rollIfNeeded(); + + boolean added = bucket.add(containerID); + LOG.info("Recorded pending container {} on DataNode {}. Added={}, Total pending={}", + containerID, node.getUuidString(), added, bucket.getCount()); + + // Increment metrics counter + if (added && metrics != null) { + metrics.incNumPendingContainersAdded(); + } + } + + /** + * Remove a pending container allocation from a specific DataNode. + * Removes from both current and previous windows. + * Called when container is confirmed. + * + * @param node The DataNode + * @param containerID The container to remove from pending + */ + public void removePendingAllocation(DatanodeDetails node, ContainerID containerID) { + if (node == null || containerID == null) { + return; + } + + TwoWindowBucket bucket = datanodeBuckets.get(node.getUuid()); + if (bucket != null) { + // Roll window if needed before removing + bucket.rollIfNeeded(); + + boolean removed = bucket.remove(containerID); + LOG.info("Removed pending container {} from DataNode {}. Removed={}, Remaining={}", + containerID, node.getUuidString(), removed, bucket.getCount()); + + // Increment metrics counter + if (removed && metrics != null) { + metrics.incNumPendingContainersRemoved(); + } + + // Cleanup empty buckets to prevent memory leak + if (bucket.isEmpty()) { + LOG.info("Cleanup pending bucket"); + datanodeBuckets.remove(node.getUuid(), bucket); + } + } + } + + /** + * Get the total size of pending allocations on a DataNode. 
+ * Returns union of current and previous windows. + * + * @param node The DataNode + * @return Total bytes of pending container allocations + */ + public long getPendingAllocationSize(DatanodeDetails node) { + if (node == null) { + return 0; + } + + TwoWindowBucket bucket = datanodeBuckets.get(node.getUuid()); + LOG.info("Get pending from DataNode {}", + node.getUuidString()); + if (bucket == null) { + LOG.info("Get pending from DataNode {} is null", + node.getUuidString()); + return 0; + } + + // Roll window if needed before querying + bucket.rollIfNeeded(); + + // Each pending container assumes max size + return (long) bucket.getCount() * maxContainerSize; + } + + /** + * Get the set of pending container IDs for a DataNode. + * Returns union of current and previous windows. + * Useful for debugging and monitoring. + * + * @param node The DataNode + * @return Set of pending container IDs + */ + public Set getPendingContainers(DatanodeDetails node) { + if (node == null) { + return Collections.emptySet(); + } + + TwoWindowBucket bucket = datanodeBuckets.get(node.getUuid()); + if (bucket == null) { + return Collections.emptySet(); + } + + bucket.rollIfNeeded(); + return bucket.getAllPending(); + } + + /** + * Get total number of DataNodes with pending allocations. + * + * @return Count of DataNodes + */ + public int getDataNodeCount() { + return datanodeBuckets.size(); + } + + /** + * Get total number of pending containers across all DataNodes. + * Note: Same container on multiple DataNodes is counted once per DataNode. + * The count may include containers from the previous window (up to 2x the roll interval old; 20 minutes by default). 
+ * + * @return Total pending container count + */ + public long getTotalPendingCount() { + return datanodeBuckets.values().stream() + .mapToLong(TwoWindowBucket::getCount) + .sum(); + } + + @VisibleForTesting + public SCMContainerManagerMetrics getMetrics() { + return metrics; + } +} diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/metrics/SCMContainerManagerMetrics.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/metrics/SCMContainerManagerMetrics.java index ed3670ea6645..32993984285d 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/metrics/SCMContainerManagerMetrics.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/metrics/SCMContainerManagerMetrics.java @@ -48,6 +48,10 @@ public final class SCMContainerManagerMetrics { private @Metric MutableCounterLong numICRReportsProcessedSuccessful; private @Metric MutableCounterLong numICRReportsProcessedFailed; + // Pending container allocation metrics + private @Metric MutableCounterLong numPendingContainersAdded; + private @Metric MutableCounterLong numPendingContainersRemoved; + private SCMContainerManagerMetrics() { } @@ -141,4 +145,22 @@ public long getNumListContainersOps() { return numListContainerOps.value(); } + // Pending container allocation metrics + + public void incNumPendingContainersAdded() { + this.numPendingContainersAdded.incr(); + } + + public long getNumPendingContainersAdded() { + return numPendingContainersAdded.value(); + } + + public void incNumPendingContainersRemoved() { + this.numPendingContainersRemoved.incr(); + } + + public long getNumPendingContainersRemoved() { + return numPendingContainersRemoved.value(); + } + } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java index 6a448d6c88df..64a32174c13d 
100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java @@ -27,6 +27,7 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.scm.container.ContainerID; import org.apache.hadoop.hdds.scm.container.ContainerReplica; +import org.apache.hadoop.hdds.scm.node.DatanodeInfo; import org.apache.hadoop.hdds.utils.db.CodecException; import org.apache.hadoop.hdds.utils.db.RocksDatabaseException; import org.apache.hadoop.hdds.utils.db.Table; @@ -227,4 +228,11 @@ void reinitialize(Table pipelineStore) * Get the pipeline metrics. */ SCMPipelineMetrics getMetrics(); + + /** + * Get DatanodeInfo for a specific DataNode which includes per-volume storage reports. + * @param datanodeDetails The datanode to get info for + * @return DatanodeInfo containing detailed node information including per-disk stats, or null if not available + */ + DatanodeInfo getDatanodeInfo(DatanodeDetails datanodeDetails); } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java index 9c529e22e7e1..f13167abfad2 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java @@ -649,6 +649,11 @@ public boolean hasEnoughSpace(Pipeline pipeline, long containerSize) { return true; } + @Override + public DatanodeInfo getDatanodeInfo(DatanodeDetails datanodeDetails) { + return nodeManager.getDatanodeInfo(datanodeDetails); + } + /** * Schedules a fixed interval job to create pipelines. 
*/ diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerManagerImpl.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerManagerImpl.java index 218a2137e3e6..fafcd3b05173 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerManagerImpl.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerManagerImpl.java @@ -26,8 +26,7 @@ import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyLong; -import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; @@ -43,17 +42,22 @@ import org.apache.hadoop.hdds.client.RatisReplicationConfig; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.DatanodeID; import org.apache.hadoop.hdds.protocol.MockDatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos; +import org.apache.hadoop.hdds.scm.HddsTestUtils; import org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaPendingOps; import org.apache.hadoop.hdds.scm.container.states.ContainerStateMap; import org.apache.hadoop.hdds.scm.ha.SCMHAManager; import org.apache.hadoop.hdds.scm.ha.SCMHAManagerStub; import org.apache.hadoop.hdds.scm.ha.SequenceIdGenerator; import org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition; +import 
org.apache.hadoop.hdds.scm.node.DatanodeInfo; import org.apache.hadoop.hdds.scm.node.NodeManager; +import org.apache.hadoop.hdds.scm.node.NodeStatus; import org.apache.hadoop.hdds.scm.pipeline.MockPipelineManager; import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.hdds.scm.pipeline.PipelineManager; @@ -103,7 +107,24 @@ void setUp() throws Exception { pipelineManager = spy(base); // Default: allow allocation in tests unless a test overrides it. - doReturn(true).when(pipelineManager).hasEnoughSpace(any(Pipeline.class), anyLong()); + doAnswer(invocation -> { + DatanodeDetails dn = invocation.getArgument(0); + DatanodeInfo info = new DatanodeInfo(dn, + NodeStatus.valueOf(HddsProtos.NodeOperationalState.IN_SERVICE, HddsProtos.NodeState.HEALTHY), + null); + long gb = 1024L * 1024 * 1024; + // 50GB usable => multiple 5GB slots under default OZONE_SCM_CONTAINER_SIZE. + StorageContainerDatanodeProtocolProtos.StorageReportProto report = HddsTestUtils.createStorageReport( + DatanodeID.of(dn.getUuid()), + "/data", + 100L * gb, + 0, + 50L * gb, + HddsProtos.StorageTypeProto.DISK, + false); + info.updateStorageReports(Collections.singletonList(report)); + return info; + }).when(pipelineManager).getDatanodeInfo(any(DatanodeDetails.class)); pipelineManager.createPipeline(RatisReplicationConfig.getInstance( ReplicationFactor.THREE)); @@ -141,50 +162,62 @@ void testAllocateContainer() throws Exception { */ @Test public void testGetMatchingContainerReturnsNullWhenNotEnoughSpaceInDatanodes() throws IOException { - doReturn(false).when(pipelineManager).hasEnoughSpace(any(), anyLong()); - long sizeRequired = 256 * 1024 * 1024; // 256 MB - Pipeline pipeline = pipelineManager.getPipelines().iterator().next(); - // MockPipelineManager#hasEnoughSpace always returns false - // the pipeline has no existing containers, so a new container gets allocated in getMatchingContainer - ContainerInfo container = containerManager + PipelineManager spyPipelineManager = 
spy(pipelineManager); + doAnswer(invocation -> { + DatanodeDetails dn = invocation.getArgument(0); + DatanodeInfo info = new DatanodeInfo(dn, + NodeStatus.valueOf(HddsProtos.NodeOperationalState.IN_SERVICE, HddsProtos.NodeState.HEALTHY), + null); + long gb = 1024L * 1024 * 1024; + // Default SCM max container size is 5GB; 1GB usable => 0 slots => effective remaining < max container size. + StorageContainerDatanodeProtocolProtos.StorageReportProto report = HddsTestUtils.createStorageReport( + DatanodeID.of(dn.getUuid()), + "/data", + 100L * gb, + 0, + 1 * gb, + HddsProtos.StorageTypeProto.DISK, + false); + info.updateStorageReports(Collections.singletonList(report)); + return info; + }).when(spyPipelineManager).getDatanodeInfo(any(DatanodeDetails.class)); + + File mgrDir = new File(testDir, "matchingContainerNullSpace"); + OzoneConfiguration conf = SCMTestUtils.getConf(mgrDir); + ContainerManager manager = new ContainerManagerImpl(conf, + scmhaManager, sequenceIdGen, spyPipelineManager, + SCMDBDefinition.CONTAINERS.getTable(dbStore), pendingOpsMock); + + Pipeline pipeline = spyPipelineManager.getPipelines().iterator().next(); + ContainerInfo container = manager .getMatchingContainer(sizeRequired, "test", pipeline, Collections.emptySet()); assertNull(container); - // create an EC pipeline to test for EC containers ECReplicationConfig ecReplicationConfig = new ECReplicationConfig(3, 2); - pipelineManager.createPipeline(ecReplicationConfig); - pipeline = pipelineManager.getPipelines(ecReplicationConfig).iterator().next(); - container = containerManager.getMatchingContainer(sizeRequired, "test", pipeline, Collections.emptySet()); + spyPipelineManager.createPipeline(ecReplicationConfig); + pipeline = spyPipelineManager.getPipelines(ecReplicationConfig).iterator().next(); + container = manager.getMatchingContainer(sizeRequired, "test", pipeline, Collections.emptySet()); assertNull(container); } @Test public void 
testGetMatchingContainerReturnsContainerWhenEnoughSpaceInDatanodes() throws IOException { long sizeRequired = 256 * 1024 * 1024; // 256 MB - - // create a spy to mock hasEnoughSpace to always return true - PipelineManager spyPipelineManager = spy(pipelineManager); - doReturn(true).when(spyPipelineManager) - .hasEnoughSpace(any(Pipeline.class), anyLong()); - - // create a new ContainerManager using the spy File tempDir = new File(testDir, "tempDir"); OzoneConfiguration conf = SCMTestUtils.getConf(tempDir); ContainerManager manager = new ContainerManagerImpl(conf, - scmhaManager, sequenceIdGen, spyPipelineManager, + scmhaManager, sequenceIdGen, pipelineManager, SCMDBDefinition.CONTAINERS.getTable(dbStore), pendingOpsMock); - Pipeline pipeline = spyPipelineManager.getPipelines().iterator().next(); - // the pipeline has no existing containers, so a new container gets allocated in getMatchingContainer + Pipeline pipeline = pipelineManager.getPipelines().iterator().next(); ContainerInfo container = manager .getMatchingContainer(sizeRequired, "test", pipeline, Collections.emptySet()); assertNotNull(container); - // create an EC pipeline to test for EC containers ECReplicationConfig ecReplicationConfig = new ECReplicationConfig(3, 2); - spyPipelineManager.createPipeline(ecReplicationConfig); - pipeline = spyPipelineManager.getPipelines(ecReplicationConfig).iterator().next(); + pipelineManager.createPipeline(ecReplicationConfig); + pipeline = pipelineManager.getPipelines(ecReplicationConfig).iterator().next(); container = manager.getMatchingContainer(sizeRequired, "test", pipeline, Collections.emptySet()); assertNotNull(container); } diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestPendingContainerTracker.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestPendingContainerTracker.java new file mode 100644 index 000000000000..f7126cf678ba --- /dev/null +++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestPendingContainerTracker.java @@ -0,0 +1,375 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.scm.container; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; + +import java.io.IOException; +import java.util.Set; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.MockDatanodeDetails; +import org.apache.hadoop.hdds.scm.pipeline.MockPipeline; +import org.apache.hadoop.hdds.scm.pipeline.Pipeline; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +/** + * Tests for PendingContainerTracker. 
+ */ +public class TestPendingContainerTracker { + + private static final long MAX_CONTAINER_SIZE = 5L * 1024 * 1024 * 1024; // 5GB + + private PendingContainerTracker tracker; + private Pipeline pipeline; + private DatanodeDetails dn1; + private DatanodeDetails dn2; + private DatanodeDetails dn3; + private ContainerID container1; + private ContainerID container2; + private ContainerID container3; + + @BeforeEach + public void setUp() throws IOException { + tracker = new PendingContainerTracker(MAX_CONTAINER_SIZE); + + // Create a 3-node Ratis pipeline + pipeline = MockPipeline.createPipeline(3); + dn1 = pipeline.getNodes().get(0); + dn2 = pipeline.getNodes().get(1); + dn3 = pipeline.getNodes().get(2); + + container1 = ContainerID.valueOf(1L); + container2 = ContainerID.valueOf(2L); + container3 = ContainerID.valueOf(3L); + } + + @Test + public void testRecordPendingAllocation() { + // Initially no pending containers + assertEquals(0, tracker.getPendingContainers(dn1).size()); + assertEquals(0, tracker.getPendingAllocationSize(dn1)); + + // Record a pending allocation + tracker.recordPendingAllocation(pipeline, container1); + + // All 3 DNs should have the container pending + assertEquals(1, tracker.getPendingContainers(dn1).size()); + assertEquals(1, tracker.getPendingContainers(dn2).size()); + assertEquals(1, tracker.getPendingContainers(dn3).size()); + + // Size should be MAX_CONTAINER_SIZE for each DN + assertEquals(MAX_CONTAINER_SIZE, tracker.getPendingAllocationSize(dn1)); + assertEquals(MAX_CONTAINER_SIZE, tracker.getPendingAllocationSize(dn2)); + assertEquals(MAX_CONTAINER_SIZE, tracker.getPendingAllocationSize(dn3)); + } + + @Test + public void testRecordMultiplePendingAllocations() { + tracker.recordPendingAllocation(pipeline, container1); + tracker.recordPendingAllocation(pipeline, container2); + tracker.recordPendingAllocation(pipeline, container3); + + // Each DN should have 3 pending containers + assertEquals(3, 
tracker.getPendingContainers(dn1).size()); + assertEquals(3, tracker.getPendingContainers(dn2).size()); + assertEquals(3, tracker.getPendingContainers(dn3).size()); + + // Size should be 3 × MAX_CONTAINER_SIZE + assertEquals(3 * MAX_CONTAINER_SIZE, tracker.getPendingAllocationSize(dn1)); + } + + @Test + public void testIdempotentRecording() { + tracker.recordPendingAllocation(pipeline, container1); + tracker.recordPendingAllocation(pipeline, container1); // Duplicate + + // Should still be 1 container (Set deduplication) + assertEquals(1, tracker.getPendingContainers(dn1).size()); + assertEquals(MAX_CONTAINER_SIZE, tracker.getPendingAllocationSize(dn1)); + } + + @Test + public void testRemovePendingAllocation() { + tracker.recordPendingAllocation(pipeline, container1); + tracker.recordPendingAllocation(pipeline, container2); + + assertEquals(2, tracker.getPendingContainers(dn1).size()); + + // Remove one container from DN1 + tracker.removePendingAllocation(dn1, container1); + + assertEquals(1, tracker.getPendingContainers(dn1).size()); + assertEquals(MAX_CONTAINER_SIZE, tracker.getPendingAllocationSize(dn1)); + + // DN2 and DN3 should still have both containers + assertEquals(2, tracker.getPendingContainers(dn2).size()); + assertEquals(2, tracker.getPendingContainers(dn3).size()); + } + + @Test + public void testRemovePendingAllocationFromPipeline() { + tracker.recordPendingAllocation(pipeline, container1); + tracker.recordPendingAllocation(pipeline, container2); + + // Remove container1 from all nodes in pipeline + for (DatanodeDetails dn : pipeline.getNodes()) { + tracker.removePendingAllocation(dn, container1); + } + + // All DNs should have only container2 remaining + assertEquals(1, tracker.getPendingContainers(dn1).size()); + assertEquals(1, tracker.getPendingContainers(dn2).size()); + assertEquals(1, tracker.getPendingContainers(dn3).size()); + } + + @Test + public void testRemoveNonExistentContainer() { + tracker.recordPendingAllocation(pipeline, 
container1); + + // Remove a container that was never added - should not throw exception + tracker.removePendingAllocation(dn1, container2); + + // DN1 should still have container1 + assertEquals(1, tracker.getPendingContainers(dn1).size()); + } + + @Test + public void testGetPendingContainers() { + tracker.recordPendingAllocation(pipeline, container1); + tracker.recordPendingAllocation(pipeline, container2); + + Set pending = tracker.getPendingContainers(dn1); + + assertEquals(2, pending.size()); + assertThat(pending.contains(container1)); + assertThat(pending.contains(container2)); + + // Returned set should be a copy - modifying it shouldn't affect tracker + pending.add(container3); + assertEquals(2, tracker.getPendingContainers(dn1).size()); // Should still be 2 + } + + @Test + public void testGetPendingContainersForNonExistentDN() { + DatanodeDetails unknownDN = MockDatanodeDetails.randomDatanodeDetails(); + + Set pending = tracker.getPendingContainers(unknownDN); + + assertThat(pending.isEmpty()); + } + + @Test + public void testGetTotalPendingCount() { + assertEquals(0, tracker.getTotalPendingCount()); + + tracker.recordPendingAllocation(pipeline, container1); + + // 1 container × 3 DNs = 3 total pending + assertEquals(3, tracker.getTotalPendingCount()); + + tracker.recordPendingAllocation(pipeline, container2); + + // 2 containers × 3 DNs = 6 total pending + assertEquals(6, tracker.getTotalPendingCount()); + + // Remove from one DN + tracker.removePendingAllocation(dn1, container1); + + // (2 containers × 2 DNs) + (1 container × 1 DN) = 5 total + assertEquals(5, tracker.getTotalPendingCount()); + } + + @Test + public void testConcurrentModification() throws InterruptedException { + // Test thread-safety by having multiple threads add/remove containers + final int numThreads = 10; + final int operationsPerThread = 100; + + Thread[] threads = new Thread[numThreads]; + + for (int i = 0; i < numThreads; i++) { + final int threadId = i; + threads[i] = new 
Thread(() -> { + for (int j = 0; j < operationsPerThread; j++) { + ContainerID cid = ContainerID.valueOf(threadId * 1000L + j); + tracker.recordPendingAllocation(pipeline, cid); + + if (j % 2 == 0) { + tracker.removePendingAllocation(dn1, cid); + } + } + }); + } + + // Start all threads + for (Thread thread : threads) { + thread.start(); + } + + // Wait for all to finish + for (Thread thread : threads) { + thread.join(); + } + + // Verify no exceptions occurred and counts are reasonable + assertThat(tracker.getTotalPendingCount() >= 0); + assertThat(tracker.getDataNodeCount() <= 3); + } + + @Test + public void testMemoryCleanupOnEmptySet() { + tracker.recordPendingAllocation(pipeline, container1); + + assertEquals(3, tracker.getDataNodeCount()); + + // Remove the only pending container from DN1 + tracker.removePendingAllocation(dn1, container1); + + // DN1 should be removed from the map (memory cleanup) + assertEquals(2, tracker.getDataNodeCount()); + } + + @Test + public void testPendingContainer() { + // Simulate allocation and confirmation flow + + // Allocate 3 containers + tracker.recordPendingAllocation(pipeline, container1); + tracker.recordPendingAllocation(pipeline, container2); + tracker.recordPendingAllocation(pipeline, container3); + + // Each DN should have 3 pending, 15GB total + assertEquals(3, tracker.getPendingContainers(dn1).size()); + assertEquals(3 * MAX_CONTAINER_SIZE, tracker.getPendingAllocationSize(dn1)); + + // DN1 confirms container1 via container report + tracker.removePendingAllocation(dn1, container1); + + // DN1 now has 2 pending, 10GB + assertEquals(2, tracker.getPendingContainers(dn1).size()); + assertEquals(2 * MAX_CONTAINER_SIZE, tracker.getPendingAllocationSize(dn1)); + + // DN2 and DN3 still have 3 pending + assertEquals(3, tracker.getPendingContainers(dn2).size()); + assertEquals(3, tracker.getPendingContainers(dn3).size()); + + // All DNs eventually confirm all containers + for (DatanodeDetails dn : pipeline.getNodes()) { + 
tracker.removePendingAllocation(dn, container1); + tracker.removePendingAllocation(dn, container2); + tracker.removePendingAllocation(dn, container3); + } + + // All DNs should have 0 pending + assertEquals(0, tracker.getPendingContainers(dn1).size()); + assertEquals(0, tracker.getPendingContainers(dn2).size()); + assertEquals(0, tracker.getPendingContainers(dn3).size()); + assertEquals(0, tracker.getTotalPendingCount()); + assertEquals(0, tracker.getDataNodeCount()); + } + + @Test + public void testRemoveFromBothWindows() { + // This test verifies that removal works from both current and previous windows + // In general, a container could be in previous window after a roll + + // Add containers + tracker.recordPendingAllocation(pipeline, container1); + tracker.recordPendingAllocation(pipeline, container2); + + assertEquals(2, tracker.getPendingContainers(dn1).size()); + + // Remove container1 - should work regardless of which window it's in + tracker.removePendingAllocation(dn1, container1); + + assertEquals(1, tracker.getPendingContainers(dn1).size()); + + Set pending = tracker.getPendingContainers(dn1); + assertFalse(pending.contains(container1)); + assertThat(pending.contains(container2)); + } + + @Test + public void testUnionOfBothWindows() { + // This test verifies the two-window concept: + // getPendingContainers should return union of current + previous windows + + // Add container1 + tracker.recordPendingAllocation(pipeline, container1); + + assertEquals(1, tracker.getPendingContainers(dn1).size()); + Set pending1 = tracker.getPendingContainers(dn1); + assertThat(pending1.contains(container1)); + + // Add container2 - should be in same window initially + tracker.recordPendingAllocation(pipeline, container2); + + assertEquals(2, tracker.getPendingContainers(dn1).size()); + Set pending2 = tracker.getPendingContainers(dn1); + assertThat(pending2.contains(container1)); + assertThat(pending2.contains(container2)); + + // Both containers should be in the union + 
assertEquals(2, pending2.size()); + } + + @Test + public void testIdempotencyAcrossWindows() { + // Recording same container multiple times should only count it once + // This should work even if it spans windows + + tracker.recordPendingAllocation(pipeline, container1); + assertEquals(1, tracker.getPendingContainers(dn1).size()); + + // Record again - should still be 1 (idempotency via Set) + tracker.recordPendingAllocation(pipeline, container1); + assertEquals(1, tracker.getPendingContainers(dn1).size()); + + // Add different container + tracker.recordPendingAllocation(pipeline, container2); + assertEquals(2, tracker.getPendingContainers(dn1).size()); + + // Record container1 again + tracker.recordPendingAllocation(pipeline, container1); + assertEquals(2, tracker.getPendingContainers(dn1).size()); // Still 2, not 3 + } + + @Test + public void testExplicitRemoval() { + + tracker.recordPendingAllocation(pipeline, container1); + tracker.recordPendingAllocation(pipeline, container2); + tracker.recordPendingAllocation(pipeline, container3); + + assertEquals(3, tracker.getPendingContainers(dn1).size()); + + // Simulate container report confirms container1 and container2 + tracker.removePendingAllocation(dn1, container1); + tracker.removePendingAllocation(dn1, container2); + + // Immediately reflects the removal (doesn't wait for aging) + assertEquals(1, tracker.getPendingContainers(dn1).size()); + + Set pending = tracker.getPendingContainers(dn1); + assertEquals(1, pending.size()); + assertThat(pending.contains(container3)); + } +} diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java index 9b7c5c77b2cd..e3eb241e8f15 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java +++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java @@ -26,8 +26,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyLong; -import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.when; @@ -37,6 +36,7 @@ import java.time.Clock; import java.time.ZoneId; import java.util.Arrays; +import java.util.Collections; import java.util.List; import org.apache.commons.io.IOUtils; import org.apache.hadoop.hdds.HddsConfigKeys; @@ -160,7 +160,25 @@ SCMNodeManager createNodeManager(OzoneConfiguration config) { ContainerManager createContainerManager() throws IOException { pipelineManager = spy(pipelineManager); - doReturn(true).when(pipelineManager).hasEnoughSpace(any(), anyLong()); + + doAnswer(invocation -> { + DatanodeDetails dn = invocation.getArgument(0); + DatanodeInfo info = new DatanodeInfo(dn, + NodeStatus.valueOf(HddsProtos.NodeOperationalState.IN_SERVICE, HddsProtos.NodeState.HEALTHY), + null); + long gb = 1024L * 1024 * 1024; + // 50GB usable => multiple 5GB slots under default OZONE_SCM_CONTAINER_SIZE. 
+ StorageContainerDatanodeProtocolProtos.StorageReportProto report = HddsTestUtils.createStorageReport( + DatanodeID.of(dn.getUuid()), + "/data", + 100L * gb, + 0, + 50L * gb, + HddsProtos.StorageTypeProto.DISK, + false); + info.updateStorageReports(Collections.singletonList(report)); + return info; + }).when(pipelineManager).getDatanodeInfo(any(DatanodeDetails.class)); return new ContainerManagerImpl(conf, scmhaManager, sequenceIdGen, pipelineManager, diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipelineManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipelineManager.java index d6a3fc546352..32904701e647 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipelineManager.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipelineManager.java @@ -346,4 +346,9 @@ public int openContainerLimit(List datanodes) { public SCMPipelineMetrics getMetrics() { return null; } + + @Override + public org.apache.hadoop.hdds.scm.node.DatanodeInfo getDatanodeInfo(DatanodeDetails datanodeDetails) { + return null; + } } diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/container/TestPendingContainerTrackerIntegration.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/container/TestPendingContainerTrackerIntegration.java new file mode 100644 index 000000000000..f2a1280a64b1 --- /dev/null +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/container/TestPendingContainerTrackerIntegration.java @@ -0,0 +1,267 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.scm.container; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; + +import java.util.ArrayList; +import java.util.List; +import java.util.Set; +import org.apache.hadoop.hdds.HddsConfigKeys; +import org.apache.hadoop.hdds.client.RatisReplicationConfig; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor; +import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline; +import org.apache.hadoop.hdds.scm.container.metrics.SCMContainerManagerMetrics; +import org.apache.hadoop.hdds.scm.pipeline.Pipeline; +import org.apache.hadoop.hdds.scm.server.StorageContainerManager; +import org.apache.hadoop.ozone.MiniOzoneCluster; +import org.apache.hadoop.ozone.TestDataUtil; +import org.apache.hadoop.ozone.client.OzoneBucket; +import org.apache.hadoop.ozone.client.OzoneClient; +import org.apache.hadoop.ozone.client.io.OzoneOutputStream; +import org.apache.ozone.test.GenericTestUtils; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import 
org.junit.jupiter.api.Timeout; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Integration tests for PendingContainerTracker. + */ +@Timeout(300) +public class TestPendingContainerTrackerIntegration { + + private static final Logger LOG = + LoggerFactory.getLogger(TestPendingContainerTrackerIntegration.class); + private MiniOzoneCluster cluster; + private StorageContainerManager scm; + private OzoneClient client; + private ContainerManager containerManager; + private PendingContainerTracker pendingTracker; + private SCMContainerManagerMetrics metrics; + private OzoneBucket bucket; + + @BeforeEach + public void setup() throws Exception { + OzoneConfiguration conf = new OzoneConfiguration(); + + conf.set(HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL, "60s"); + + // Reduce heartbeat interval for faster container reports + conf.set(HddsConfigKeys.HDDS_HEARTBEAT_INTERVAL, "10s"); + + conf.set("ozone.scm.container.size", "100MB"); + conf.set("ozone.scm.pipeline.owner.container.count", "1"); + conf.set("ozone.scm.pipeline.per.metadata.disk", "1"); + conf.set("ozone.scm.datanode.pipeline.limit", "1"); + + cluster = MiniOzoneCluster.newBuilder(conf) + .setNumDatanodes(3) + .build(); + cluster.waitForClusterToBeReady(); + cluster.waitTobeOutOfSafeMode(); + + scm = cluster.getStorageContainerManager(); + containerManager = scm.getContainerManager(); + client = cluster.newClient(); + + // Create bucket for testing + bucket = TestDataUtil.createVolumeAndBucket(client); + + // Get the pending tracker + if (containerManager instanceof ContainerManagerImpl) { + pendingTracker = ((ContainerManagerImpl) containerManager) + .getPendingContainerTracker(); + assertNotNull(pendingTracker, "PendingContainerTracker should be initialized"); + } + metrics = pendingTracker.getMetrics(); + // metrics = SCMContainerManagerMetrics.create(); + + LOG.info("Test setup complete - ICR interval: 5s, Heartbeat interval: 1s"); + } + + @AfterEach + public void cleanup() throws 
Exception { + if (metrics != null) { + metrics.unRegister(); + } + if (client != null) { + client.close(); + } + if (cluster != null) { + cluster.shutdown(); + } + } + + /** + * Test: Write key → Container allocation → Pending tracked → ICR → Pending removed. + */ + @Test + public void testKeyWriteRecordsPendingAndICRRemovesIt() throws Exception { + long initialAdded = metrics.getNumPendingContainersAdded(); + long initialRemoved = metrics.getNumPendingContainersRemoved(); + + // Allocate a container directly + ContainerInfo container = containerManager.allocateContainer( + RatisReplicationConfig.getInstance(ReplicationFactor.THREE), + "omServiceIdDefault"); + + // Find the container that was allocated + ContainerInfo containerInfo = scm.getContainerManager().getContainers().get(0); + ContainerWithPipeline containerWithPipeline = + scm.getClientProtocolServer().getContainerWithPipeline( + containerInfo.getContainerID()); + + Pipeline pipeline = containerWithPipeline.getPipeline(); + + // Verify pending containers are tracked for all nodes in pipeline + List nodesWithPending = new ArrayList<>(); + for (DatanodeDetails dn : pipeline.getNodes()) { + long pendingSize = pendingTracker.getPendingAllocationSize(dn); + if (pendingSize > 0) { + nodesWithPending.add(dn); + LOG.info("DataNode {} has {} bytes pending", dn.getUuidString(), pendingSize); + + Set pendingContainers = pendingTracker.getPendingContainers(dn); + assertThat(pendingContainers.contains(container.containerID())); + } + } + + assertThat(!nodesWithPending.isEmpty()); + + // Verify metrics increased + long afterAdded = metrics.getNumPendingContainersAdded(); + assertThat(afterAdded > initialAdded); + + LOG.info("Pending tracked successfully. 
Waiting for ICR to remove pending..."); + + // Write a key + String keyName = "testKey1"; + byte[] data = "Hello Ozone - Testing Pending Container Tracker".getBytes(UTF_8); + + LOG.info("Writing key: {}", keyName); + try (OzoneOutputStream out = bucket.createKey(keyName, data.length, + RatisReplicationConfig.getInstance(ReplicationFactor.THREE), + new java.util.HashMap<>())) { + out.write(data); + } + LOG.info("Key written successfully"); + + // Wait for ICRs to be sent + GenericTestUtils.waitFor(() -> { + for (DatanodeDetails dn : nodesWithPending) { + Set pendingContainers = pendingTracker.getPendingContainers(dn); + if (pendingContainers.contains(container.containerID())) { + LOG.info("Still waiting for ICR from DN {}", dn.getUuidString()); + return false; + } + } + + LOG.info("All pending containers removed via ICR!"); + return true; + }, 100, 5000); + + // Verify all pending containers removed + for (DatanodeDetails dn : nodesWithPending) { + Set pendingContainers = pendingTracker.getPendingContainers(dn); + assertThat(!pendingContainers.contains(container.containerID())); + } + + // Verify remove metrics increased + long afterRemoved = metrics.getNumPendingContainersRemoved(); + assertThat(afterRemoved > initialRemoved); + + LOG.info("After added = " + afterAdded); + + } + + /** + * Test: Verify idempotency - container reported multiple times. 
+ */ + @Test + public void testIdempotentPendingTracking() throws Exception { + // Allocate a container directly + ContainerInfo container = containerManager.allocateContainer( + RatisReplicationConfig.getInstance(ReplicationFactor.THREE), + "omServiceIdDefault"); + + Pipeline pipeline = scm.getPipelineManager().getPipeline(container.getPipelineID()); + DatanodeDetails firstNode = pipeline.getFirstNode(); + + // Record initial state + long initialSize = pendingTracker.getPendingAllocationSize(firstNode); + int initialCount = pendingTracker.getPendingContainers(firstNode).size(); + + LOG.info("Initial pending state: size={}, count={}", initialSize, initialCount); + + // Try adding the same container again (simulates retry or duplicate allocation) + pendingTracker.recordPendingAllocationForDatanode(firstNode, container.containerID()); + + long afterSize = pendingTracker.getPendingAllocationSize(firstNode); + int afterCount = pendingTracker.getPendingContainers(firstNode).size(); + + // Size and count should remain the same (idempotent) + assertEquals(initialSize, afterSize, + "Pending size should not change when adding duplicate container"); + assertEquals(initialCount, afterCount, + "Pending count should not change when adding duplicate container"); + + } + + /** + * Test: Verify metrics are updated correctly. 
+ */ + @Test + public void testMetricsUpdateThroughLifecycle() throws Exception { + long initialAdded = metrics.getNumPendingContainersAdded(); + long initialRemoved = metrics.getNumPendingContainersRemoved(); + + LOG.info("Initial metrics: added={}, removed={}", initialAdded, initialRemoved); + + // Write multiple keys + for (int i = 0; i < 3; i++) { + String keyName = "metricsTestKey" + i; + byte[] data = ("Metrics test " + i).getBytes(UTF_8); + + try (OzoneOutputStream out = bucket.createKey(keyName, data.length, + RatisReplicationConfig.getInstance(ReplicationFactor.THREE), + new java.util.HashMap<>())) { + out.write(data); + } + } + + // addedMetrics should increase as containers are allocated + GenericTestUtils.waitFor(() -> { + long afterAdded = metrics.getNumPendingContainersAdded(); + return afterAdded > initialAdded; + }, 100, 5000); + + // Removed metric should increase after icr process + GenericTestUtils.waitFor(() -> { + long afterRemoved = metrics.getNumPendingContainersRemoved(); + return initialRemoved < afterRemoved; + }, 100, 5000); + } +}