Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 28 additions & 8 deletions crates/core/common/src/incrementalizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ impl BlockNumForm {
/// ## Special cases
/// - If a join has an `on` condition that is an inequality on `_block_num`, e.g. `l._block_num <= r._block_num`, then Δ(L⋈R)=(L[t−1]⋈ΔR)∪(ΔL⋈ΔR), i.e. a term is optimized away.
/// This is because the inequality can be relaxed to `start <= r._block_num`. More generally, if the `on` clause can be relaxed by a lower start bound, we can push it down and potentially eliminate a term. This is not yet implemented.
/// - If a join has a `r._block_num = l.block_num` condition, then Δ(L⋈R)=ΔL⋈ΔR. These joins may stack with each other and with linear operators, or on top of output of general joins. This special case is not yet implemented.
/// - If a join has a `l._block_num = r._block_num` condition, then Δ(L⋈R)=ΔL⋈ΔR. These joins may stack with each other and with linear operators, or on top of output of general joins.
///
/// ## Further reading
/// - The inner join update formula is well-known and commonly implemented in incremental view maintenance systems. For one academic reference see:
Expand Down Expand Up @@ -123,7 +123,13 @@ impl TreeNodeRewriter for Incrementalizer {
match incremental_op_kind(&node, BlockNumForm::Propagated)
.map_err(|e| DataFusionError::External(e.into()))?
{
IncrementalOpKind::Linear => Ok(Transformed::no(node)),
IncrementalOpKind::Linear | IncrementalOpKind::BlockNumEqJoin => {
// Linear ops and _block_num equality joins just push the current range
// to both children via normal tree recursion. For BlockNumEqJoin this works
// because _block_num equality guarantees temporal alignment:
// Δ(L⋈R) = ΔL⋈ΔR and History(L⋈R) = History(L)⋈History(R).
Ok(Transformed::no(node))
}
IncrementalOpKind::InnerJoin => {
let LogicalPlan::Join(join) = node else {
unreachable!("IncrementalOpKind::InnerJoin only returned for Join nodes")
Expand Down Expand Up @@ -279,6 +285,9 @@ pub enum NonIncrementalQueryError {
/// Classification of a logical plan node, as returned by `incremental_op_kind`,
/// that tells the incrementalizer how the node must be rewritten.
pub enum IncrementalOpKind {
/// A linear operator: the rewriter leaves the node unchanged and the current
/// range is pushed to its children via normal tree recursion.
Linear,
/// An inner or semi join without a `_block_num` equality condition; it is
/// handled by the dedicated join rewrite rather than passed through unchanged.
InnerJoin,
/// An inner or semi join with `l._block_num = r._block_num` in its equi-join conditions.
/// Acts like a linear operator: the range filter can be pushed to both children.
BlockNumEqJoin,
/// NOTE(review): presumably a leaf table scan; the code that produces this
/// variant is outside this view, so confirm before relying on this.
Table,
}

Expand All @@ -305,12 +314,13 @@ pub fn incremental_op_kind(

// Joins
Join(join) => match join.join_type {
// TODO: detect and split out `l._block_num = r._block_num` joins
JoinType::Inner => Ok(InnerJoin),

// Semi-joins are just projections of inner joins
JoinType::LeftSemi => Ok(InnerJoin),
JoinType::RightSemi => Ok(InnerJoin),
JoinType::Inner | JoinType::LeftSemi | JoinType::RightSemi => {
if has_block_num_eq_condition(&join.on) {
Ok(BlockNumEqJoin)
} else {
Ok(InnerJoin)
}
}

// Outer and anti joins are not incremental
JoinType::Left
Expand Down Expand Up @@ -360,6 +370,16 @@ pub fn incremental_op_kind(
}
}

/// Reports whether the equi-join keys contain at least one pair in which both the
/// left and the right expression are `_block_num` columns.
///
/// An empty `on` list yields `false`.
fn has_block_num_eq_condition(on: &[(Expr, Expr)]) -> bool {
    for (left_key, right_key) in on {
        // Both sides of the same pair must be the reserved column.
        if is_block_num_col(left_key) && is_block_num_col(right_key) {
            return true;
        }
    }
    false
}

/// Returns `true` only when `expr` is a plain column reference whose name equals
/// `RESERVED_BLOCK_NUM_COLUMN_NAME`; every other expression kind yields `false`.
fn is_block_num_col(expr: &Expr) -> bool {
    match expr {
        Expr::Column(column) => column.name == RESERVED_BLOCK_NUM_COLUMN_NAME,
        _ => false,
    }
}

fn empty_relation(schema: DFSchemaRef) -> EmptyRelation {
EmptyRelation {
produce_one_row: false,
Expand Down
23 changes: 23 additions & 0 deletions tests/config/packages/block_num_eq_join/amp.config.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import { defineDataset } from "@edgeandnode/amp"

// Fixture dataset for the streaming `_block_num` equality join test.
export default defineDataset(() => {
  return {
    name: "block_num_eq_join",
    network: "anvil",
    dependencies: {
      anvil_rpc: "_/anvil_rpc@0.0.0",
    },
    tables: {
      // `lefty` and `righty` both select one row per block from anvil_rpc.blocks.
      // Each adds a non-join column with a predictable, side-specific value
      // (block_num * 10 + 1 on the left, block_num * 10 + 2 on the right), so
      // result assertions can check that every joined row paired the same block.
      lefty: {
        sql: `SELECT block_num, CAST(block_num AS BIGINT) * 10 + 1 AS left_val FROM anvil_rpc.blocks`,
        network: "anvil",
      },
      righty: {
        sql: `SELECT block_num, CAST(block_num AS BIGINT) * 10 + 2 AS right_val FROM anvil_rpc.blocks`,
        network: "anvil",
      },
    },
    functions: {},
  }
})
5 changes: 5 additions & 0 deletions tests/config/packages/block_num_eq_join/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
{
"name": "@amp-datasets/block-num-eq-join",
"private": true,
"type": "module"
}
4 changes: 4 additions & 0 deletions tests/config/packages/block_num_eq_join/tsconfig.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
{
"extends": "../tsconfig.json",
"include": ["amp.config.ts"]
}
119 changes: 119 additions & 0 deletions tests/specs/streaming-join-block-num-eq-anvil.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
# Streaming block_num = block_num join test using Anvil
#
# Validates correctness of streaming joins where the join condition is
# `l._block_num = r._block_num` — the pattern targeted by the _block_num
# equality incrementalizer optimization (where Δ(L⋈R) = ΔL⋈ΔR).
#
# Two derived tables (lefty, righty) each carry a distinct computed column
# per block (left_val = block_num*10+1, right_val = block_num*10+2).
# A streaming query joins them on block_num. Result assertions verify
# the non-join columns, proving the join matched the correct rows on both
# sides — not just that the right number of rows appeared.
#
# Four incremental batches with varying sizes stress history/delta
# boundaries: as history grows (0→3→5→6→9), any leakage of
# history×delta or delta×history terms would surface as duplicates,
# and any mis-matched rows would show wrong left_val/right_val.

- anvil: {}

# ── Batch 1: genesis + 3 mined = blocks 0-3 ──────────────────────────

- name: mine_initial
anvil_mine: 3

- name: dump_anvil_rpc_initial
dataset: _/anvil_rpc@0.0.0
end: 3

- name: register_block_num_eq_join
dataset: block_num_eq_join
tag: "0.0.0"

- name: dump_derived_initial
dataset: _/block_num_eq_join@0.0.0
end: 3

- name: register_stream
stream: |
SELECT l.block_num, l.left_val, r.right_val
FROM block_num_eq_join.lefty l
JOIN block_num_eq_join.righty r ON l._block_num = r._block_num
SETTINGS stream = true

- name: take_batch_1
stream: register_stream
take: 4
results: |
[
{"block_num": 0, "left_val": 1, "right_val": 2},
{"block_num": 1, "left_val": 11, "right_val": 12},
{"block_num": 2, "left_val": 21, "right_val": 22},
{"block_num": 3, "left_val": 31, "right_val": 32}
]

# ── Batch 2: 2 new blocks (4-5), history = 0-3 ──────────────────────

- name: mine_batch_2
anvil_mine: 2

- name: dump_anvil_rpc_batch_2
dataset: _/anvil_rpc@0.0.0
end: 5

- name: dump_derived_batch_2
dataset: _/block_num_eq_join@0.0.0
end: 5

- name: take_batch_2
stream: register_stream
take: 2
results: |
[
{"block_num": 4, "left_val": 41, "right_val": 42},
{"block_num": 5, "left_val": 51, "right_val": 52}
]

# ── Batch 3: 1 new block (6), history = 0-5 ─────────────────────────

- name: mine_batch_3
anvil_mine: 1

- name: dump_anvil_rpc_batch_3
dataset: _/anvil_rpc@0.0.0
end: 6

- name: dump_derived_batch_3
dataset: _/block_num_eq_join@0.0.0
end: 6

- name: take_batch_3
stream: register_stream
take: 1
results: |
[
{"block_num": 6, "left_val": 61, "right_val": 62}
]

# ── Batch 4: 3 new blocks (7-9), history = 0-6 ──────────────────────

- name: mine_batch_4
anvil_mine: 3

- name: dump_anvil_rpc_batch_4
dataset: _/anvil_rpc@0.0.0
end: 9

- name: dump_derived_batch_4
dataset: _/block_num_eq_join@0.0.0
end: 9

- name: take_batch_4
stream: register_stream
take: 3
results: |
[
{"block_num": 7, "left_val": 71, "right_val": 72},
{"block_num": 8, "left_val": 81, "right_val": 82},
{"block_num": 9, "left_val": 91, "right_val": 92}
]
26 changes: 26 additions & 0 deletions tests/src/tests/it_streaming_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,3 +69,29 @@ async fn streaming_join_with_reorg() {
.await
.expect("Failed to run streaming join with reorg spec");
}

/// Runs the `streaming-join-block-num-eq-anvil` spec end to end against a test
/// environment built with Anvil IPC and the `anvil_rpc` dataset manifest.
#[tokio::test(flavor = "multi_thread")]
async fn streaming_join_block_num_eq() {
    logging::init();

    // Assemble the environment first, then build it in a separate step.
    let builder = TestCtxBuilder::new("streaming_join_block_num_eq")
        .with_anvil_ipc()
        .with_dataset_manifest("anvil_rpc");
    let test_ctx = builder
        .build()
        .await
        .expect("Failed to create test environment");

    let connection = test_ctx.new_flight_client().await;
    let mut client = connection.expect("Failed to connect FlightClient");

    let spec_name = "streaming-join-block-num-eq-anvil";
    let outcome = run_spec(spec_name, &test_ctx, &mut client, None).await;
    outcome.expect("Failed to run streaming join block_num eq spec");
}
Loading