From aed28479e401f772f0fa466c77324c10479854b1 Mon Sep 17 00:00:00 2001 From: Pedro Matias Date: Tue, 10 Mar 2026 16:38:50 +0000 Subject: [PATCH 1/6] Add optional field for Statement.execute() --- arrow-format/FlightSql.proto | 2 ++ .../jdbc/client/ArrowFlightSqlClientHandler.java | 6 ++++++ .../apache/arrow/flight/sql/FlightSqlClient.java | 13 +++++++++++++ 3 files changed, 21 insertions(+) diff --git a/arrow-format/FlightSql.proto b/arrow-format/FlightSql.proto index 566230c2a6..55b0178d8a 100644 --- a/arrow-format/FlightSql.proto +++ b/arrow-format/FlightSql.proto @@ -1550,6 +1550,8 @@ message ActionCreatePreparedStatementResult { // If the query provided contained parameters, parameter_schema contains the // schema of the expected parameters. It should be an IPC-encapsulated Schema, as described in Schema.fbs. bytes parameter_schema = 3; + + optional bool is_update = 4; } /* diff --git a/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/client/ArrowFlightSqlClientHandler.java b/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/client/ArrowFlightSqlClientHandler.java index f0ea284239..35b9d23dc7 100644 --- a/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/client/ArrowFlightSqlClientHandler.java +++ b/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/client/ArrowFlightSqlClientHandler.java @@ -456,6 +456,12 @@ public long executeUpdate() { @Override public StatementType getType() { + // If the server provided the is_update field, use it to determine the statement type + final Boolean isUpdate = preparedStatement.isUpdate(); + if (isUpdate != null) { + return isUpdate ? StatementType.UPDATE : StatementType.SELECT; + } + // Fall back to the legacy logic: check if the result set schema is empty final Schema schema = preparedStatement.getResultSetSchema(); return schema.getFields().isEmpty() ? StatementType.UPDATE : StatementType.SELECT; } diff --git a/flight/flight-sql/src/main/java/org/apache/arrow/flight/sql/FlightSqlClient.java b/flight/flight-sql/src/main/java/org/apache/arrow/flight/sql/FlightSqlClient.java index 623f9311e8..0af09faee1 100644 --- a/flight/flight-sql/src/main/java/org/apache/arrow/flight/sql/FlightSqlClient.java +++ b/flight/flight-sql/src/main/java/org/apache/arrow/flight/sql/FlightSqlClient.java @@ -1284,6 +1284,19 @@ public Schema getParameterSchema() { return parameterSchema; } + /** + * Returns whether the server indicated this prepared statement is an update query. + * + * @return true if the server indicated this is an update query, false if the server indicated + * this is a select query, or null if the server did not provide this information. + */ + public Boolean isUpdate() { + if (preparedStatementResult.hasIsUpdate()) { + return preparedStatementResult.getIsUpdate(); + } + return null; + } + /** Get the schema of the result set (should be identical to {@link #getResultSetSchema()}). */ public SchemaResult fetchSchema(CallOption... options) { checkOpen(); From 7c3fa3f0aa53415e44af47adf982535f8bead585 Mon Sep 17 00:00:00 2001 From: Pedro Matias Date: Tue, 10 Mar 2026 17:17:46 +0000 Subject: [PATCH 2/6] Add is_update field to ActionCreatePreparedStatementResult in PreparedStatement.execute() --- .../ArrowFlightJdbcFlightStreamResultSet.java | 2 +- ...owFlightJdbcVectorSchemaRootResultSet.java | 2 +- .../driver/jdbc/ArrowFlightMetaImpl.java | 23 ++++++++++++++----- .../client/ArrowFlightSqlClientHandler.java | 13 +++++++++++ 4 files changed, 32 insertions(+), 8 deletions(-) diff --git a/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/ArrowFlightJdbcFlightStreamResultSet.java b/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/ArrowFlightJdbcFlightStreamResultSet.java index 2885f7895b..376e5b11e7 100644 --- a/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/ArrowFlightJdbcFlightStreamResultSet.java +++ b/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/ArrowFlightJdbcFlightStreamResultSet.java @@ -106,7 +106,7 @@ static ArrowFlightJdbcFlightStreamResultSet fromFlightInfo( final TimeZone timeZone = TimeZone.getDefault(); final QueryState state = new QueryState(); - final Meta.Signature signature = ArrowFlightMetaImpl.newSignature(null, null, null); + final Meta.Signature signature = ArrowFlightMetaImpl.newSignature(null, null, null, null); final AvaticaResultSetMetaData resultSetMetaData = new AvaticaResultSetMetaData(null, null, signature); diff --git a/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/ArrowFlightJdbcVectorSchemaRootResultSet.java b/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/ArrowFlightJdbcVectorSchemaRootResultSet.java index 49334951de..58a97e13bd 100644 --- a/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/ArrowFlightJdbcVectorSchemaRootResultSet.java +++ b/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/ArrowFlightJdbcVectorSchemaRootResultSet.java @@ -73,7 +73,7 @@ public static ArrowFlightJdbcVectorSchemaRootResultSet fromVectorSchemaRoot( final TimeZone timeZone = TimeZone.getDefault(); final QueryState state = new QueryState(); - final Meta.Signature signature = ArrowFlightMetaImpl.newSignature(null, null, null); + final Meta.Signature signature = ArrowFlightMetaImpl.newSignature(null, null, null, null); final AvaticaResultSetMetaData resultSetMetaData = new AvaticaResultSetMetaData(null, null, signature); diff --git a/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/ArrowFlightMetaImpl.java b/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/ArrowFlightMetaImpl.java index 64529b50c8..0d85b5eddb 100644 --- a/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/ArrowFlightMetaImpl.java +++ b/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/ArrowFlightMetaImpl.java @@ -53,7 +53,8 @@ public ArrowFlightMetaImpl(final AvaticaConnection connection) { } /** Construct a signature. */ - static Signature newSignature(final String sql, Schema resultSetSchema, Schema parameterSchema) { + static Signature newSignature( + final String sql, Schema resultSetSchema, Schema parameterSchema, Boolean isUpdate) { List columnMetaData = resultSetSchema == null ? new ArrayList<>() @@ -62,10 +63,17 @@ static Signature newSignature(final String sql, Schema resultSetSchema, Schema p parameterSchema == null ? new ArrayList<>() : ConvertUtils.convertArrowFieldsToAvaticaParameters(parameterSchema.getFields()); - StatementType statementType = - resultSetSchema == null || resultSetSchema.getFields().isEmpty() - ? StatementType.IS_DML - : StatementType.SELECT; + // If the server provided the is_update field, use it to determine the statement type + StatementType statementType; + if (isUpdate != null) { + statementType = isUpdate ? StatementType.IS_DML : StatementType.SELECT; + } else { + // Fall back to the legacy logic: check if the result set schema is empty + statementType = + resultSetSchema == null || resultSetSchema.getFields().isEmpty() + ? StatementType.IS_DML + : StatementType.SELECT; + } return new Signature( columnMetaData, sql, @@ -178,7 +186,10 @@ private PreparedStatement prepareForHandle(final String query, StatementHandle h ((ArrowFlightConnection) connection).getClientHandler().prepare(query); handle.signature = newSignature( - query, preparedStatement.getDataSetSchema(), preparedStatement.getParameterSchema()); + query, + preparedStatement.getDataSetSchema(), + preparedStatement.getParameterSchema(), + preparedStatement.isUpdate()); statementHandlePreparedStatementMap.put(new StatementHandleKey(handle), preparedStatement); return preparedStatement; } diff --git a/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/client/ArrowFlightSqlClientHandler.java b/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/client/ArrowFlightSqlClientHandler.java index 35b9d23dc7..91370f920d 100644 --- a/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/client/ArrowFlightSqlClientHandler.java +++ b/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/client/ArrowFlightSqlClientHandler.java @@ -388,6 +388,14 @@ public interface PreparedStatement extends AutoCloseable { */ Schema getParameterSchema(); + /** + * Gets whether this {@link PreparedStatement} is an update statement. + * + * @return {@code true} if this is an update statement, {@code false} if it's a query, or {@code + * null} if the server did not provide this information. + */ + Boolean isUpdate(); + void setParameters(VectorSchemaRoot parameters); @Override @@ -481,6 +489,11 @@ public void setParameters(VectorSchemaRoot parameters) { preparedStatement.setParameters(parameters); } + @Override + public Boolean isUpdate() { + return preparedStatement.isUpdate(); + } + @Override public void close() { try { From f2a4521eb654d8a1348e5c1db3f66996038e226b Mon Sep 17 00:00:00 2001 From: Pedro Matias Date: Tue, 10 Mar 2026 17:22:28 +0000 Subject: [PATCH 3/6] Add docs for new field --- arrow-format/FlightSql.proto | 3 +++ 1 file changed, 3 insertions(+) diff --git a/arrow-format/FlightSql.proto b/arrow-format/FlightSql.proto index 55b0178d8a..b1dc57b33b 100644 --- a/arrow-format/FlightSql.proto +++ b/arrow-format/FlightSql.proto @@ -1551,6 +1551,9 @@ message ActionCreatePreparedStatementResult { // schema of the expected parameters. It should be an IPC-encapsulated Schema, as described in Schema.fbs. bytes parameter_schema = 3; + // When set to true, the query should be executed with CommandPreparedStatementUpdate, + // when set to false, the query should be executed with CommandPreparedStatementQuery. + // If not set, the client can choose how to execute the query. optional bool is_update = 4; } From de67e7d3e968370b90d370fb77fb40c0155609a1 Mon Sep 17 00:00:00 2001 From: Pedro Matias Date: Wed, 11 Mar 2026 13:28:51 +0000 Subject: [PATCH 4/6] Add server side implementation in mock Flight SQL producer --- .../jdbc/utils/MockFlightSqlProducer.java | 33 +++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/flight/flight-sql-jdbc-core/src/test/java/org/apache/arrow/driver/jdbc/utils/MockFlightSqlProducer.java b/flight/flight-sql-jdbc-core/src/test/java/org/apache/arrow/driver/jdbc/utils/MockFlightSqlProducer.java index 45c2a96404..72b50a872b 100644 --- a/flight/flight-sql-jdbc-core/src/test/java/org/apache/arrow/driver/jdbc/utils/MockFlightSqlProducer.java +++ b/flight/flight-sql-jdbc-core/src/test/java/org/apache/arrow/driver/jdbc/utils/MockFlightSqlProducer.java @@ -101,6 +101,7 @@ public final class MockFlightSqlProducer implements FlightSqlProducer { private final SqlInfoBuilder sqlInfoBuilder = new SqlInfoBuilder(); private final Map parameterSchemas = new HashMap<>(); private final Map>> expectedParameterValues = new HashMap<>(); + private final Map isUpdateMap = new HashMap<>(); private final Map actionTypeCounter = new HashMap<>(); @@ -176,6 +177,32 @@ public void addUpdateQuery(final String sqlCommand, final long updatedRows) { }); } + /** + * Registers a new {@link StatementType#SELECT} SQL query with is_update field set. + * + * @param sqlCommand the SQL command under which to register the new query. + * @param schema the schema to use for the query result. + * @param resultProviders the result provider for this query. + */ + public void addSelectQueryV2( + final String sqlCommand, + final Schema schema, + final List> resultProviders) { + addSelectQuery(sqlCommand, schema, resultProviders); + isUpdateMap.put(sqlCommand, false); + } + + /** + * Registers a new {@link StatementType#UPDATE} SQL query with is_update field set. + * + * @param sqlCommand the SQL command. + * @param updatedRows the number of rows affected. + */ + public void addUpdateQueryV2(final String sqlCommand, final long updatedRows) { + addUpdateQuery(sqlCommand, updatedRows); + isUpdateMap.put(sqlCommand, true); + } + /** * Adds a catalog query to the results. * @@ -247,6 +274,12 @@ public void createPreparedStatement( resultBuilder.setParameterSchema(ByteString.copyFrom(outputStream.toByteArray())); } + // Set is_update field if present + final Boolean isUpdate = isUpdateMap.get(query); + if (isUpdate != null) { + resultBuilder.setIsUpdate(isUpdate); + } + listener.onNext(new Result(pack(resultBuilder.build()).toByteArray())); } catch (final Throwable t) { listener.onError(t); From 1de96821250904f143683e2632f7dab359663d65 Mon Sep 17 00:00:00 2001 From: Pedro Matias Date: Wed, 11 Mar 2026 13:29:25 +0000 Subject: [PATCH 5/6] Add tests for new protocol version using PreparedStatement.execute() --- .../ArrowFlightPreparedStatementTest.java | 47 +++++++++++++++++++ 1 file changed, 47 insertions(+) diff --git a/flight/flight-sql-jdbc-core/src/test/java/org/apache/arrow/driver/jdbc/ArrowFlightPreparedStatementTest.java b/flight/flight-sql-jdbc-core/src/test/java/org/apache/arrow/driver/jdbc/ArrowFlightPreparedStatementTest.java index 0369c3a162..bdd7d1bb0c 100644 --- a/flight/flight-sql-jdbc-core/src/test/java/org/apache/arrow/driver/jdbc/ArrowFlightPreparedStatementTest.java +++ b/flight/flight-sql-jdbc-core/src/test/java/org/apache/arrow/driver/jdbc/ArrowFlightPreparedStatementTest.java @@ -98,6 +98,39 @@ public void testSimpleQueryNoParameterBindingWithExecute() throws SQLException { } } + @Test + public void testSimpleQueryNoParameterBindingWithExecuteV2() throws SQLException { + final String query = "SELECT * FROM TEST_V2"; + final Schema schema = + new Schema(Collections.singletonList(Field.nullable("", Types.MinorType.INT.getType()))); + PRODUCER.addSelectQueryV2( + query, + schema, + Collections.singletonList( + listener -> { + try (final BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE); + final VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator)) { + root.allocateNew(); + ((IntVector) root.getVector(0)).setSafe(0, 123); + root.setRowCount(1); + listener.start(root); + listener.putNext(); + } finally { + listener.completed(); + } + })); + try (final PreparedStatement preparedStatement = connection.prepareStatement(query)) { + boolean isResultSet = preparedStatement.execute(); + assertTrue(isResultSet); + final ResultSet resultSet = preparedStatement.getResultSet(); + assertTrue(resultSet.next()); + assertEquals(123, resultSet.getInt(1)); + assertFalse(resultSet.next()); + assertFalse(preparedStatement.getMoreResults()); + assertEquals(-1, preparedStatement.getUpdateCount()); + } + } + @Test public void testQueryWithParameterBinding() throws SQLException { final String query = "Fake query with parameters"; @@ -203,6 +236,20 @@ public void testUpdateQueryWithExecute() throws SQLException { } } + @Test + public void testUpdateQueryWithExecuteV2() throws SQLException { + String query = "Fake update with execute V2"; + PRODUCER.addUpdateQueryV2(query, /*updatedRows*/ 99); + try (final PreparedStatement stmt = connection.prepareStatement(query)) { + boolean isResultSet = stmt.execute(); + assertFalse(isResultSet); + int updated = stmt.getUpdateCount(); + assertEquals(99, updated); + assertFalse(stmt.getMoreResults()); + assertEquals(-1, stmt.getUpdateCount()); + } + } + @Test public void testUpdateQueryWithParameters() throws SQLException { String query = "Fake update with parameters"; From fbb0c8a34644450e4b2588cbcb71d4558c89d2e9 Mon Sep 17 00:00:00 2001 From: Pedro Matias Date: Wed, 11 Mar 2026 13:44:53 +0000 Subject: [PATCH 6/6] Add test for Statement.execute() with protocol changes --- .../jdbc/ArrowFlightStatementExecuteTest.java | 65 +++++++++++++++++++ 1 file changed, 65 insertions(+) diff --git a/flight/flight-sql-jdbc-core/src/test/java/org/apache/arrow/driver/jdbc/ArrowFlightStatementExecuteTest.java b/flight/flight-sql-jdbc-core/src/test/java/org/apache/arrow/driver/jdbc/ArrowFlightStatementExecuteTest.java index 632cb0ba56..5844f95202 100644 --- a/flight/flight-sql-jdbc-core/src/test/java/org/apache/arrow/driver/jdbc/ArrowFlightStatementExecuteTest.java +++ b/flight/flight-sql-jdbc-core/src/test/java/org/apache/arrow/driver/jdbc/ArrowFlightStatementExecuteTest.java @@ -62,6 +62,9 @@ public class ArrowFlightStatementExecuteTest { private static final String SAMPLE_LARGE_UPDATE_QUERY = "UPDATE this_large_table SET this_large_field = that_large_field FROM this_large_test WHERE this_large_condition"; private static final long SAMPLE_LARGE_UPDATE_COUNT = Long.MAX_VALUE; + private static final String SAMPLE_QUERY_CMD_V2 = "SELECT * FROM this_test_v2"; + private static final String SAMPLE_LARGE_UPDATE_QUERY_V2 = + "UPDATE this_large_table_v2 SET this_large_field = that_large_field FROM this_large_test WHERE this_large_condition"; private static final MockFlightSqlProducer PRODUCER = new MockFlightSqlProducer(); @RegisterExtension @@ -96,6 +99,30 @@ public static void setUpBeforeClass() { })); PRODUCER.addUpdateQuery(SAMPLE_UPDATE_QUERY, SAMPLE_UPDATE_COUNT); PRODUCER.addUpdateQuery(SAMPLE_LARGE_UPDATE_QUERY, SAMPLE_LARGE_UPDATE_COUNT); + + // V2 queries with is_update field set + PRODUCER.addSelectQueryV2( + SAMPLE_QUERY_CMD_V2, + SAMPLE_QUERY_SCHEMA, + Collections.singletonList( + listener -> { + try (final BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE); + final VectorSchemaRoot root = + VectorSchemaRoot.create(SAMPLE_QUERY_SCHEMA, allocator)) { + final UInt1Vector vector = (UInt1Vector) root.getVector(VECTOR_NAME); + IntStream.range(0, SAMPLE_QUERY_ROWS) + .forEach(index -> vector.setSafe(index, index)); + vector.setValueCount(SAMPLE_QUERY_ROWS); + root.setRowCount(SAMPLE_QUERY_ROWS); + listener.start(root); + listener.putNext(); + } catch (final Throwable throwable) { + listener.error(throwable); + } finally { + listener.completed(); + } + })); + PRODUCER.addUpdateQueryV2(SAMPLE_LARGE_UPDATE_QUERY_V2, SAMPLE_LARGE_UPDATE_COUNT); } @BeforeEach @@ -168,4 +195,42 @@ public void testUpdateCountShouldStartOnZero() throws SQLException { is(allOf(equalTo(statement.getLargeUpdateCount()), equalTo(0L)))); assertThat(statement.getResultSet(), is(nullValue())); } + + @Test + public void testExecuteShouldRunSelectQueryV2() throws SQLException { + assertThat(statement.execute(SAMPLE_QUERY_CMD_V2), is(true)); + final Set numbers = + IntStream.range(0, SAMPLE_QUERY_ROWS) + .boxed() + .map(Integer::byteValue) + .collect(Collectors.toCollection(HashSet::new)); + try (final ResultSet resultSet = statement.getResultSet()) { + final int columnCount = resultSet.getMetaData().getColumnCount(); + assertThat(columnCount, is(1)); + int rowCount = 0; + for (; resultSet.next(); rowCount++) { + assertThat(numbers.remove(resultSet.getByte(1)), is(true)); + } + assertThat(rowCount, is(equalTo(SAMPLE_QUERY_ROWS))); + } + assertThat(numbers, is(Collections.emptySet())); + assertThat( + (long) statement.getUpdateCount(), + is(allOf(equalTo(statement.getLargeUpdateCount()), equalTo(-1L)))); + } + + @Test + public void testExecuteShouldRunUpdateQueryForLargeUpdateV2() throws SQLException { + assertThat(statement.execute(SAMPLE_LARGE_UPDATE_QUERY_V2), is(false)); // UPDATE query. + final long updateCountSmall = statement.getUpdateCount(); + final long updateCountLarge = statement.getLargeUpdateCount(); + assertThat(updateCountLarge, is(equalTo(SAMPLE_LARGE_UPDATE_COUNT))); + assertThat( + updateCountSmall, + is( + allOf( + equalTo((long) AvaticaUtils.toSaturatedInt(updateCountLarge)), + not(equalTo(updateCountLarge))))); + assertThat(statement.getResultSet(), is(nullValue())); + } }