Skip to content
Open
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,21 @@ public abstract class ItemsFromStdin implements Iterable<String> {
": one or more, separated by spaces. To read from stdin, specify '-' and supply one item per line.";

private List<String> items = new ArrayList<>();
private boolean readFromStdin = false;

protected void setItems(List<String> arguments) {
items = readItemsFromStdinIfNeeded(arguments);
readFromStdin = arguments != null && !arguments.isEmpty() &&
"-".equals(arguments.iterator().next());

if (readFromStdin) {
items = readItemsFromStdin();
} else {
items = arguments == null ? new ArrayList<>() : arguments;
}
}

public boolean isReadFromStdin() {
return readFromStdin;
}

public List<String> getItems() {
Expand All @@ -52,11 +64,7 @@ public int size() {
return items.size();
}

private static List<String> readItemsFromStdinIfNeeded(List<String> parameters) {
if (parameters.isEmpty() || !"-".equals(parameters.iterator().next())) {
return parameters;
}

private static List<String> readItemsFromStdin() {
List<String> items = new ArrayList<>();
Scanner scanner = new Scanner(System.in, StandardCharsets.UTF_8.name());
while (scanner.hasNextLine()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static java.lang.Math.max;

import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.time.Clock;
import java.time.Instant;
Expand Down Expand Up @@ -87,6 +88,7 @@ public final class ContainerInfo implements Comparable<ContainerInfo> {
private long sequenceId;
// Health state of the container (determined by ReplicationManager)
private ContainerHealthState healthState;
private boolean suppressed;

private ContainerInfo(Builder b) {
containerID = ContainerID.valueOf(b.containerID);
Expand All @@ -102,6 +104,7 @@ private ContainerInfo(Builder b) {
replicationConfig = b.replicationConfig;
clock = b.clock;
healthState = b.healthState != null ? b.healthState : ContainerHealthState.HEALTHY;
suppressed = b.suppressed;
}

public static Codec<ContainerInfo> getCodec() {
Expand All @@ -123,6 +126,10 @@ public static ContainerInfo fromProtobuf(HddsProtos.ContainerInfoProto info) {
.setReplicationConfig(config)
.setSequenceId(info.getSequenceId());

if (info.hasSuppressed()) {
builder.setSuppressed(info.getSuppressed());
}

if (info.hasPipelineID()) {
builder.setPipelineID(PipelineID.getFromProtobuf(info.getPipelineID()));
}
Expand Down Expand Up @@ -263,6 +270,26 @@ public void setHealthState(ContainerHealthState newHealthState) {
this.healthState = newHealthState;
}

/**
* Check if container is suppressed.
* Only included in JSON output when true.
*
* @return boolean
*/
@JsonInclude(JsonInclude.Include.NON_DEFAULT)
public boolean isSuppressed() {
return suppressed;
}

/**
* Set the boolean for suppressed.
*
* @param suppressed checks if container is suppressed or not
*/
public void setSuppressed(boolean suppressed) {
this.suppressed = suppressed;
}

@JsonIgnore
public HddsProtos.ContainerInfoProto getProtobuf() {
HddsProtos.ContainerInfoProto.Builder builder =
Expand All @@ -288,6 +315,10 @@ public HddsProtos.ContainerInfoProto getProtobuf() {
builder.setPipelineID(getPipelineID().getProtobuf());
}

if (suppressed) {
builder.setSuppressed(true);
}

return builder.build();
}

Expand Down Expand Up @@ -390,6 +421,7 @@ public static class Builder {
private PipelineID pipelineID;
private ReplicationConfig replicationConfig;
private ContainerHealthState healthState;
private boolean suppressed;

public Builder setPipelineID(PipelineID pipelineId) {
this.pipelineID = pipelineId;
Expand Down Expand Up @@ -447,6 +479,11 @@ public Builder setHealthState(ContainerHealthState healthState) {
return this;
}

public Builder setSuppressed(boolean suppressed) {
this.suppressed = suppressed;
return this;
}

/**
* Also resets {@code stateEnterTime}, so make sure to set clock first.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,25 @@ ContainerListResult listContainer(long startContainerID, int count,
ReplicationConfig replicationConfig)
throws IOException;

/**
* Lists a range of containers and get their info.
*
* @param startContainerID start containerID.
* @param count count must be {@literal >} 0.
* @param state Container of this state will be returned.
* @param replicationConfig container replication Config.
* @param suppressed container to be suppressed/unsuppressed from report
* @return a list of containers capped by max count allowed
* in "ozone.scm.container.list.max.count" and total number of containers.
* @throws IOException
*/
ContainerListResult listContainer(long startContainerID, int count,
HddsProtos.LifeCycleState state,
HddsProtos.ReplicationType replicationType,
ReplicationConfig replicationConfig,
Boolean suppressed)
throws IOException;

/**
* Read meta data from an existing container.
* @param containerID - ID of the container.
Expand Down Expand Up @@ -465,4 +484,14 @@ DecommissionScmResponseProto decommissionScm(
*/
void reconcileContainer(long containerID) throws IOException;

/**
* Suppress or unsuppress containers from reports.
* Suppressed containers are excluded from replication manager reports
* regardless of their health state.
*
* @param containerIds container IDs to suppress or unsuppress
* @param suppress true to suppress, false to unsuppress
* @throws IOException
*/
List<Long> suppressContainers(List<Long> containerIds, boolean suppress) throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,30 @@ ContainerListResult listContainer(long startContainerID,
HddsProtos.ReplicationType replicationType,
ReplicationConfig replicationConfig) throws IOException;

/**
* Ask SCM for a list of containers with a range of container ID, state
* and replication config, and the limit of count.
* The containers are returned from startID (exclusive), and
* filtered by state and replication config. The returned list is limited to
* count entries.
*
* @param startContainerID start container ID.
* @param count count, if count {@literal <} 0, the max size is unlimited.(
* Usually the count will be replace with a very big
* value instead of being unlimited in case the db is very big)
* @param state Container with this state will be returned.
* @param replicationConfig Replication config for the containers
* @param suppressed container to be suppressed/unsuppressed from report
* @return a list of containers capped by max count allowed
* in "ozone.scm.container.list.max.count" and total number of containers.
* @throws IOException
*/
ContainerListResult listContainer(long startContainerID,
int count, HddsProtos.LifeCycleState state,
HddsProtos.ReplicationType replicationType,
ReplicationConfig replicationConfig,
Boolean suppressed) throws IOException;

/**
* Deletes a container in SCM.
*
Expand Down Expand Up @@ -521,4 +545,13 @@ DecommissionScmResponseProto decommissionScm(
* @throws IOException On error
*/
void reconcileContainer(long containerID) throws IOException;

/**
* Suppress or unsuppress containers from reports.
*
* @param containerIds container IDs to suppress or unsuppress
* @param suppress true to suppress, false to unsuppress
* @throws IOException
*/
List<Long> suppressContainers(List<Long> containerIds, boolean suppress) throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,8 @@
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.StartReplicationManagerRequestProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.StopContainerBalancerRequestProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.StopReplicationManagerRequestProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.SuppressContainerRequestProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.SuppressContainerResponseProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.Type;
import org.apache.hadoop.hdds.scm.DatanodeAdminError;
import org.apache.hadoop.hdds.scm.ScmInfo;
Expand Down Expand Up @@ -445,6 +447,16 @@ public ContainerListResult listContainer(long startContainerID, int count,
HddsProtos.ReplicationType replicationType,
ReplicationConfig replicationConfig)
throws IOException {
return listContainer(startContainerID, count, state, replicationType, replicationConfig, null);
}

@Override
public ContainerListResult listContainer(long startContainerID, int count,
HddsProtos.LifeCycleState state,
HddsProtos.ReplicationType replicationType,
ReplicationConfig replicationConfig,
Boolean suppressed)
throws IOException {
Preconditions.checkState(startContainerID >= 0,
"Container ID cannot be negative.");
Preconditions.checkState(count > 0,
Expand All @@ -454,6 +466,9 @@ public ContainerListResult listContainer(long startContainerID, int count,
builder.setStartContainerID(startContainerID);
builder.setCount(count);
builder.setTraceID(TracingUtil.exportCurrentSpan());
if (suppressed != null) {
builder.setSuppressed(suppressed);
}
if (state != null) {
builder.setState(state);
}
Expand Down Expand Up @@ -1312,6 +1327,19 @@ public void reconcileContainer(long containerID) throws IOException {
submitRequest(Type.ReconcileContainer, builder -> builder.setReconcileContainerRequest(request));
}

@Override
public List<Long> suppressContainers(List<Long> containerIds, boolean suppress)
throws IOException {
SuppressContainerRequestProto request = SuppressContainerRequestProto.newBuilder()
.addAllContainerIDs(containerIds)
.setSuppress(suppress)
.build();
SuppressContainerResponseProto response =
submitRequest(Type.SuppressContainer, builder -> builder.setSuppressContainerRequest(request))
.getSuppressContainerResponse();
return response.getFailedContainerIDsList();
}

/**
* Holder class to store the target SCM node ID for routing requests.
* This allows requests to be directed to specific SCM nodes in an HA cluster.
Expand Down
14 changes: 14 additions & 0 deletions hadoop-hdds/interface-admin/src/main/proto/ScmAdminProtocol.proto
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ message ScmContainerLocationRequest {
optional ReconcileContainerRequestProto reconcileContainerRequest = 49;
optional GetDeletedBlocksTxnSummaryRequestProto getDeletedBlocksTxnSummaryRequest = 50;
optional SCMListContainerIDsRequestProto scmListContainerIDsRequest = 51;
optional SuppressContainerRequestProto suppressContainerRequest = 52;
}

message ScmContainerLocationResponse {
Expand Down Expand Up @@ -147,6 +148,7 @@ message ScmContainerLocationResponse {
optional ReconcileContainerResponseProto reconcileContainerResponse = 49;
optional GetDeletedBlocksTxnSummaryResponseProto getDeletedBlocksTxnSummaryResponse = 50;
optional SCMListContainerIDsResponseProto scmListContainerIDsResponse = 51;
optional SuppressContainerResponseProto suppressContainerResponse = 52;

enum Status {
OK = 1;
Expand Down Expand Up @@ -205,6 +207,7 @@ enum Type {
ReconcileContainer = 45;
GetDeletedBlocksTransactionSummary = 46;
ListContainerIDs = 47;
SuppressContainer = 48;
}

/**
Expand Down Expand Up @@ -313,6 +316,7 @@ message SCMListContainerRequestProto {
optional ReplicationFactor factor = 5;
optional ReplicationType type = 6;
optional ECReplicationConfig ecReplicationConfig = 7;
optional bool suppressed = 8;
}

message SCMListContainerResponseProto {
Expand Down Expand Up @@ -711,6 +715,16 @@ message ReconcileContainerRequestProto {
message ReconcileContainerResponseProto {
}

message SuppressContainerRequestProto {
repeated int64 containerIDs = 1;
optional bool suppress = 2;
}

message SuppressContainerResponseProto {
// Container IDs that failed to suppress or unsuppress. Empty if all succeeded.
repeated int64 failedContainerIDs = 1;
}

/**
* Protocol used from an HDFS node to StorageContainerManager. See the request
* and response messages for details of the RPC calls.
Expand Down
1 change: 1 addition & 0 deletions hadoop-hdds/interface-client/src/main/proto/hdds.proto
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,7 @@ message ContainerInfoProto {
optional ReplicationFactor replicationFactor = 10;
required ReplicationType replicationType = 11;
optional ECReplicationConfig ecReplicationConfig = 12;
optional bool suppressed = 13;
}

message ContainerWithPipeline {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ContainerInfoProto;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
Expand Down Expand Up @@ -238,4 +239,14 @@ void deleteContainer(ContainerID containerID)
* @return containerStateManger
*/
ContainerStateManager getContainerStateManager();

/**
* Update container info in the container manager.
* This is used for updating container metadata like ackMissing flag.
*
* @param containerInfo Updated container info proto
* @throws IOException
*/
void updateContainerInfo(ContainerID containerID, ContainerInfoProto containerInfo)
throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,21 @@ public void updateContainerState(final ContainerID cid,
}
}

@Override
public void updateContainerInfo(final ContainerID cid, ContainerInfoProto containerInfo)
throws IOException {
lock.lock();
try {
if (containerExist(cid)) {
containerStateManager.updateContainerInfo(containerInfo);
} else {
throw new ContainerNotFoundException(cid);
}
} finally {
lock.unlock();
}
}

@Override
public void transitionDeletingOrDeletedToTargetState(ContainerID containerID, LifeCycleState targetState)
throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,4 +219,14 @@ void removeContainer(HddsProtos.ContainerID containerInfo)
*/
void reinitialize(Table<ContainerID, ContainerInfo> containerStore)
throws IOException;

/**
* Update container info.
*
* @param containerInfo Updated container info proto
* @throws IOException
*/
@Replicate
void updateContainerInfo(HddsProtos.ContainerInfoProto containerInfo)
throws IOException;
}
Loading
Loading