Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,14 @@
import org.apache.fluss.memory.MemorySegment;
import org.apache.fluss.metadata.PhysicalTablePath;
import org.apache.fluss.record.ChangeType;
import org.apache.fluss.record.LogRecordBatchStatisticsCollector;
import org.apache.fluss.record.MemoryLogRecordsArrowBuilder;
import org.apache.fluss.record.bytesview.BytesView;
import org.apache.fluss.row.InternalRow;
import org.apache.fluss.row.arrow.ArrowWriter;
import org.apache.fluss.rpc.messages.ProduceLogRequest;

import javax.annotation.Nullable;
import javax.annotation.concurrent.NotThreadSafe;

import java.io.IOException;
Expand All @@ -55,11 +57,13 @@ public ArrowLogWriteBatch(
int schemaId,
ArrowWriter arrowWriter,
AbstractPagedOutputView outputView,
long createdMs) {
long createdMs,
@Nullable LogRecordBatchStatisticsCollector statisticsCollector) {
super(bucketId, physicalTablePath, createdMs);
this.outputView = outputView;
this.recordsBuilder =
MemoryLogRecordsArrowBuilder.builder(schemaId, arrowWriter, outputView, true);
MemoryLogRecordsArrowBuilder.builder(
schemaId, arrowWriter, outputView, true, statisticsCollector);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.fluss.metadata.TableBucket;
import org.apache.fluss.metadata.TableInfo;
import org.apache.fluss.metrics.MetricNames;
import org.apache.fluss.record.LogRecordBatchStatisticsCollector;
import org.apache.fluss.row.arrow.ArrowWriter;
import org.apache.fluss.row.arrow.ArrowWriterPool;
import org.apache.fluss.shaded.arrow.org.apache.arrow.memory.BufferAllocator;
Expand Down Expand Up @@ -628,13 +629,20 @@ private WriteBatch createWriteBatch(
outputView.getPreAllocatedSize(),
tableInfo.getRowType(),
tableInfo.getTableConfig().getArrowCompressionInfo());
LogRecordBatchStatisticsCollector statisticsCollector = null;
if (tableInfo.isStatisticsEnabled()) {
statisticsCollector =
new LogRecordBatchStatisticsCollector(
tableInfo.getRowType(), tableInfo.getStatsIndexMapping());
}
return new ArrowLogWriteBatch(
bucketId,
physicalTablePath,
tableInfo.getSchemaId(),
arrowWriter,
outputView,
clock.milliseconds());
clock.milliseconds(),
statisticsCollector);

case COMPACTED_LOG:
return new CompactedLogWriteBatch(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,8 @@ void testAppendWithPreAllocatedMemorySegments() throws Exception {
DATA1_ROW_TYPE,
DEFAULT_COMPRESSION),
new PreAllocatedPagedOutputView(memorySegmentList),
System.currentTimeMillis());
System.currentTimeMillis(),
null);
assertThat(arrowLogWriteBatch.pooledMemorySegments()).isEqualTo(memorySegmentList);

int count = 0;
Expand Down Expand Up @@ -210,7 +211,8 @@ void testArrowCompressionRatioEstimated() throws Exception {
DATA1_TABLE_INFO.getSchemaId(),
arrowWriter,
new PreAllocatedPagedOutputView(memorySegmentList),
System.currentTimeMillis());
System.currentTimeMillis(),
null);

int recordCount = 0;
while (arrowLogWriteBatch.tryAppend(
Expand Down Expand Up @@ -310,7 +312,8 @@ private ArrowLogWriteBatch createArrowLogWriteBatch(TableBucket tb, int maxSizeI
DATA1_ROW_TYPE,
DEFAULT_COMPRESSION),
new UnmanagedPagedOutputView(128),
System.currentTimeMillis());
System.currentTimeMillis(),
null);
}

private WriteCallback newWriteCallback() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1579,6 +1579,22 @@ public class ConfigOptions {
+ "This mode reduces storage and transmission costs but loses the ability to track previous values. "
+ "This option only affects primary key tables.");

public static final ConfigOption<String> TABLE_STATISTICS_COLUMNS =
key("table.statistics.columns")
.stringType()
.noDefaultValue()
.withDescription(
"Configures column-level statistics collection for the table. "
+ "By default this option is not set and no column statistics are collected. "
+ "The value '*' means collect statistics for all supported columns. "
+ "A comma-separated list of column names means collect statistics only for the specified columns. "
+ "Supported types include: BOOLEAN, TINYINT, SMALLINT, INTEGER, BIGINT, FLOAT, DOUBLE, "
+ "STRING, CHAR, DECIMAL, DATE, TIME, TIMESTAMP, and TIMESTAMP_LTZ. "
+ "Example: 'id,name,timestamp' to collect statistics only for specified columns. "
+ "Note: enabling column statistics requires the V1 batch format. "
+ "Downstream consumers must be upgraded to Fluss v1.0+ before enabling this option, "
+ "as older versions cannot parse the extended batch format.");

// ------------------------------------------------------------------------
// ConfigOptions for Kv
// ------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.fluss.config;

import org.apache.fluss.annotation.PublicEvolving;

import java.util.Collections;
import java.util.List;
import java.util.Objects;

import static org.apache.fluss.utils.Preconditions.checkNotNull;

/**
* Configuration for statistics columns collection with three states:
*
* <ul>
* <li>{@link Mode#DISABLED}: Statistics collection is disabled (config not set).
* <li>{@link Mode#ALL}: Collect statistics for all supported columns ("*" configuration).
* <li>{@link Mode#SPECIFIED}: Collect statistics for specific columns only.
* </ul>
*
* @since 1.0
*/
@PublicEvolving
public class StatisticsColumnsConfig {

/** The mode of statistics columns collection. */
public enum Mode {
/** Statistics collection is disabled. */
DISABLED,
/** Collect statistics for all supported columns. */
ALL,
/** Collect statistics for specified columns only. */
SPECIFIED
}

private static final StatisticsColumnsConfig DISABLED_INSTANCE =
new StatisticsColumnsConfig(Mode.DISABLED, Collections.emptyList());

private static final StatisticsColumnsConfig ALL_INSTANCE =
new StatisticsColumnsConfig(Mode.ALL, Collections.emptyList());

private final Mode mode;
private final List<String> columns;

private StatisticsColumnsConfig(Mode mode, List<String> columns) {
this.mode = mode;
this.columns = columns;
}

/** Creates a disabled statistics columns configuration. */
public static StatisticsColumnsConfig disabled() {
return DISABLED_INSTANCE;
}

/** Creates a configuration that collects statistics for all supported columns. */
public static StatisticsColumnsConfig all() {
return ALL_INSTANCE;
}

/** Creates a configuration that collects statistics for the specified columns. */
public static StatisticsColumnsConfig of(List<String> columns) {
checkNotNull(columns, "columns must not be null");
return new StatisticsColumnsConfig(Mode.SPECIFIED, Collections.unmodifiableList(columns));
}

/** Returns the mode of statistics columns collection. */
public Mode getMode() {
return mode;
}

/** Returns the specified columns. Only meaningful when mode is {@link Mode#SPECIFIED}. */
public List<String> getColumns() {
return columns;
}

/** Returns whether statistics collection is enabled (mode is not DISABLED). */
public boolean isEnabled() {
return mode != Mode.DISABLED;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
StatisticsColumnsConfig that = (StatisticsColumnsConfig) o;
return mode == that.mode && Objects.equals(columns, that.columns);
}

@Override
public int hashCode() {
return Objects.hash(mode, columns);
}

@Override
public String toString() {
switch (mode) {
case DISABLED:
return "StatisticsColumnsConfig{DISABLED}";
case ALL:
return "StatisticsColumnsConfig{ALL}";
case SPECIFIED:
return "StatisticsColumnsConfig{SPECIFIED: " + columns + "}";
default:
return "StatisticsColumnsConfig{UNKNOWN}";
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.fluss.config;

import org.apache.fluss.annotation.Internal;
import org.apache.fluss.exception.InvalidConfigException;
import org.apache.fluss.metadata.TableDescriptor;
import org.apache.fluss.types.DataField;
import org.apache.fluss.types.DataType;
import org.apache.fluss.types.DataTypeChecks;
import org.apache.fluss.types.RowType;

import java.util.Map;
import java.util.stream.Collectors;

/**
* Utility class for validating table statistics configuration.
*
* <p>This provides simple validation methods that can be called during CREATE TABLE operations to
* ensure statistics configuration is valid and compatible with the table schema.
*/
@Internal
public class StatisticsConfigUtils {

private StatisticsConfigUtils() {}

/**
* Validates statistics configuration for a table descriptor.
*
* @param tableDescriptor the table descriptor to validate
* @throws InvalidConfigException if the statistics configuration is invalid
*/
public static void validateStatisticsConfig(TableDescriptor tableDescriptor) {
Map<String, String> properties = tableDescriptor.getProperties();
String statisticsColumns = properties.get(ConfigOptions.TABLE_STATISTICS_COLUMNS.key());

// Not set means statistics disabled - no validation needed
if (statisticsColumns == null) {
return;
}

RowType rowType = tableDescriptor.getSchema().getRowType();

// Wildcard means all supported columns - no validation needed
if ("*".equals(statisticsColumns.trim())) {
return;
}

// Parse using TableConfig's logic via StatisticsColumnsConfig
Configuration config = new Configuration();
config.setString(ConfigOptions.TABLE_STATISTICS_COLUMNS.key(), statisticsColumns);
StatisticsColumnsConfig columnsConfig = new TableConfig(config).getStatisticsColumns();

if (columnsConfig.getMode() == StatisticsColumnsConfig.Mode.SPECIFIED
&& columnsConfig.getColumns().isEmpty()) {
throw new InvalidConfigException(
"Statistics columns configuration cannot be empty. "
+ "Use '*' to collect statistics for all supported columns, "
+ "or remove the property to disable statistics collection.");
}

if (columnsConfig.getMode() == StatisticsColumnsConfig.Mode.SPECIFIED) {
validateColumns(rowType, columnsConfig);
}
}

/**
* Validates that the specified columns exist in the schema and are of supported types.
*
* @param rowType the table schema
* @param columnsConfig the statistics columns configuration
* @throws InvalidConfigException if validation fails
*/
private static void validateColumns(RowType rowType, StatisticsColumnsConfig columnsConfig) {
Map<String, DataType> columnTypeMap = buildColumnTypeMap(rowType);

for (String columnName : columnsConfig.getColumns()) {
// Check if column exists
if (!columnTypeMap.containsKey(columnName)) {
throw new InvalidConfigException(
String.format(
"Column '%s' specified in statistics collection does not exist in table schema",
columnName));
}

// Check if column type is supported (whitelist approach)
DataType dataType = columnTypeMap.get(columnName);
if (!DataTypeChecks.isSupportedStatisticsType(dataType)) {
throw new InvalidConfigException(
String.format(
"Column '%s' of type '%s' is not supported for statistics collection. "
+ "Supported types are: BOOLEAN, TINYINT, SMALLINT, INTEGER, BIGINT, "
+ "FLOAT, DOUBLE, STRING, CHAR, DECIMAL, DATE, TIME, TIMESTAMP, "
+ "and TIMESTAMP_LTZ.",
columnName, dataType.asSummaryString()));
}
}
}

/**
* Builds a map from column name to data type for quick lookup.
*
* @param rowType the table schema
* @return map of column name to data type
*/
private static Map<String, DataType> buildColumnTypeMap(RowType rowType) {
return rowType.getFields().stream()
.collect(Collectors.toMap(DataField::getName, DataField::getType));
}
}
Loading