From a5fb8ea91113d642e27d59267ea4460479ed7875 Mon Sep 17 00:00:00 2001 From: Nadav Gov-Ari Date: Mon, 16 Mar 2026 11:24:42 -0400 Subject: [PATCH 1/5] Fix index reincarnation bug --- quickwit/Cargo.lock | 1 + .../src/ingest_v2/broadcast/capacity_score.rs | 89 ++++++++++++++- .../src/ingest_v2/broadcast/mod.rs | 11 +- .../src/ingest_v2/routing_table.rs | 105 ++++++++++++++++-- .../quickwit-integration-tests/Cargo.toml | 1 + .../src/tests/ingest_v2_tests.rs | 102 +++++++++++++++++ .../tag_fields/0002_negative_tags.yaml | 8 +- .../scenarii/tag_fields/_setup.quickwit.yaml | 10 +- .../tag_fields/_teardown.quickwit.yaml | 2 +- 9 files changed, 298 insertions(+), 31 deletions(-) diff --git a/quickwit/Cargo.lock b/quickwit/Cargo.lock index 621246f4baf..6204f0ae0a1 100644 --- a/quickwit/Cargo.lock +++ b/quickwit/Cargo.lock @@ -7024,6 +7024,7 @@ dependencies = [ "quickwit-common", "quickwit-config", "quickwit-indexing", + "quickwit-ingest", "quickwit-metastore", "quickwit-opentelemetry", "quickwit-proto", diff --git a/quickwit/quickwit-ingest/src/ingest_v2/broadcast/capacity_score.rs b/quickwit/quickwit-ingest/src/ingest_v2/broadcast/capacity_score.rs index 062539db797..572c5c5288f 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/broadcast/capacity_score.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/broadcast/capacity_score.rs @@ -40,6 +40,7 @@ pub struct IngesterCapacityScore { pub struct BroadcastIngesterCapacityScoreTask { cluster: Cluster, weak_state: WeakIngesterState, + pending_removal: BTreeSet, } impl BroadcastIngesterCapacityScoreTask { @@ -47,6 +48,7 @@ impl BroadcastIngesterCapacityScoreTask { let mut broadcaster = Self { cluster, weak_state, + pending_removal: BTreeSet::new(), }; tokio::spawn(async move { broadcaster.run().await }) } @@ -102,7 +104,7 @@ impl BroadcastIngesterCapacityScoreTask { } async fn broadcast_capacity( - &self, + &mut self, capacity_score: usize, open_shard_counts: &OpenShardCounts, previous_sources: &BTreeSet, @@ -122,12 +124,38 @@ 
impl BroadcastIngesterCapacityScoreTask { let value = serde_json::to_string(&capacity) .expect("`IngesterCapacityScore` should be JSON serializable"); self.cluster.set_self_key_value(key, value).await; + self.pending_removal.remove(&source_uid); current_sources.insert(source_uid); } - for removed_source in previous_sources.difference(&current_sources) { - let key = make_key(INGESTER_CAPACITY_SCORE_PREFIX, removed_source); - self.cluster.remove_self_key(&key).await; + let removed_sources: Vec<SourceUid> = previous_sources + .difference(&current_sources) + .cloned() + .collect(); + + // When a source disappears (e.g. index deleted), broadcast open_shard_count=0 for one + // cycle, then remove the key the second cycle. This is the only way routers learn to clear + // stale routing entries. + for removed_source in removed_sources { + if self.pending_removal.remove(&removed_source) { + // this is the second broadcast cycle that this source has been in the "removed" + // state, clean it up. + let key = make_key(INGESTER_CAPACITY_SCORE_PREFIX, &removed_source); + self.cluster.remove_self_key(&key).await; + } else { + // this is the first broadcast cycle that this key has been in the "removed" state, + // broadcast that it has no open shards. 
+ let key = make_key(INGESTER_CAPACITY_SCORE_PREFIX, &removed_source); + let capacity = IngesterCapacityScore { + capacity_score, + open_shard_count: 0, + }; + let value = serde_json::to_string(&capacity) + .expect("`IngesterCapacityScore` should be JSON serializable"); + self.cluster.set_self_key_value(key, value).await; + self.pending_removal.insert(removed_source.clone()); + current_sources.insert(removed_source); + } } current_sources @@ -195,6 +223,7 @@ mod tests { let task = BroadcastIngesterCapacityScoreTask { cluster, weak_state, + pending_removal: BTreeSet::new(), }; assert!(task.snapshot().await.is_err()); } @@ -228,9 +257,10 @@ mod tests { assert_eq!(capacity_score, 6); - let task = BroadcastIngesterCapacityScoreTask { + let mut task = BroadcastIngesterCapacityScoreTask { cluster: cluster.clone(), weak_state: state.weak(), + pending_removal: BTreeSet::new(), }; let update_counter = Arc::new(AtomicUsize::new(0)); @@ -264,4 +294,53 @@ mod tests { assert_eq!(deserialized.capacity_score, 6); assert_eq!(deserialized.open_shard_count, 1); } + + #[tokio::test] + async fn test_removed_source_broadcasts_zero_then_deletes_key() { + let transport = ChannelTransport::default(); + let cluster = create_cluster_for_test(Vec::new(), &["indexer"], &transport, true) + .await + .unwrap(); + + let (_temp_dir, state) = IngesterState::for_test(cluster.clone()).await; + let mut task = BroadcastIngesterCapacityScoreTask { + cluster: cluster.clone(), + weak_state: state.weak(), + pending_removal: BTreeSet::new(), + }; + + let index_uid = IndexUid::for_test("test-index", 0); + let source_uid = SourceUid { + index_uid: index_uid.clone(), + source_id: SourceId::from("test-source"), + }; + let key = make_key(INGESTER_CAPACITY_SCORE_PREFIX, &source_uid); + let empty_counts: Vec<(IndexUid, SourceId, usize)> = vec![]; + + // Cycle 1: source is alive. 
+ let open_counts: Vec<(IndexUid, SourceId, usize)> = + vec![(index_uid.clone(), "test-source".into(), 3)]; + let current = task + .broadcast_capacity(7, &open_counts, &BTreeSet::new()) + .await; + + let value = cluster.get_self_key_value(&key).await.unwrap(); + let parsed: IngesterCapacityScore = serde_json::from_str(&value).unwrap(); + assert_eq!(parsed.open_shard_count, 3); + + // Cycle 2: source disappears. Broadcasts 0, key still exists. + let current = task.broadcast_capacity(7, &empty_counts, &current).await; + + assert!(task.pending_removal.contains(&source_uid)); + let value = cluster.get_self_key_value(&key).await.unwrap(); + let parsed: IngesterCapacityScore = serde_json::from_str(&value).unwrap(); + assert_eq!(parsed.capacity_score, 7); + assert_eq!(parsed.open_shard_count, 0); + + // Cycle 3: source still gone. Key removed. + let _current = task.broadcast_capacity(7, &empty_counts, &current).await; + + assert!(!task.pending_removal.contains(&source_uid)); + assert!(cluster.get_self_key_value(&key).await.is_none()); + } } diff --git a/quickwit/quickwit-ingest/src/ingest_v2/broadcast/mod.rs b/quickwit/quickwit-ingest/src/ingest_v2/broadcast/mod.rs index b579382af78..15e75311993 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/broadcast/mod.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/broadcast/mod.rs @@ -20,11 +20,12 @@ use std::time::Duration; use quickwit_proto::types::SourceUid; -pub(in crate::ingest_v2) const BROADCAST_INTERVAL_PERIOD: Duration = if cfg!(test) { - Duration::from_millis(50) -} else { - Duration::from_secs(5) -}; +pub(in crate::ingest_v2) const BROADCAST_INTERVAL_PERIOD: Duration = + if cfg!(any(test, feature = "testsuite")) { + Duration::from_millis(50) + } else { + Duration::from_secs(5) + }; pub use capacity_score::{ BroadcastIngesterCapacityScoreTask, IngesterCapacityScoreUpdate, diff --git a/quickwit/quickwit-ingest/src/ingest_v2/routing_table.rs b/quickwit/quickwit-ingest/src/ingest_v2/routing_table.rs index 529922d657a..f4bf58b9602 
100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/routing_table.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/routing_table.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::cmp::Ordering; use std::collections::{HashMap, HashSet}; use itertools::Itertools; @@ -29,8 +30,6 @@ use crate::IngesterPool; pub(super) struct IngesterNode { pub node_id: NodeId, pub index_uid: IndexUid, - #[allow(unused)] - pub source_id: SourceId, /// Score from 0-10. Higher means more available capacity. pub capacity_score: usize, /// Number of open shards on this node for this (index, source) pair. Tiebreaker for power of @@ -40,6 +39,7 @@ pub(super) struct IngesterNode { #[derive(Debug, Default)] pub(super) struct RoutingEntry { + pub index_uid: Option, pub nodes: HashMap, /// Whether this entry has been seeded from a control plane response. During a rolling /// deployment, Chitchat broadcasts from already-upgraded nodes may populate the table @@ -71,6 +71,18 @@ fn pick_from(candidates: Vec<&IngesterNode>) -> Option<&IngesterNode> { } impl RoutingEntry { + /// Compares the incoming `IndexUid` against the stored incarnation. + /// - `Greater` or `Equal` when nothing is stored (treat as new). + /// - `Equal` when incarnations match — caller should apply the update. + /// - `Greater` when incoming is newer — caller should clear stale nodes, then apply. + /// - `Less` when incoming is older — caller should discard the update. + fn compare_incarnation(&self, incoming: &IndexUid) -> Ordering { + let Some(stored) = &self.index_uid else { + return Ordering::Greater; + }; + incoming.cmp(stored) + } + /// Pick an ingester node to persist the request to. Uses power of two choices based on reported /// ingester capacity, if more than one eligible node exists. Prefers nodes in the same /// availability zone, falling back to remote nodes. 
@@ -210,13 +222,21 @@ impl RoutingTable { capacity_score: usize, open_shard_count: usize, ) { - let key = (index_uid.index_id.to_string(), source_id.clone()); + let key = (index_uid.index_id.to_string(), source_id); let entry = self.table.entry(key).or_default(); + match entry.compare_incarnation(&index_uid) { + Ordering::Less => return, + Ordering::Greater => { + entry.nodes.clear(); + entry.seeded_from_cp = false; + } + Ordering::Equal => {} + } + entry.index_uid = Some(index_uid.clone()); let ingester_node = IngesterNode { node_id: node_id.clone(), index_uid, - source_id, capacity_score, open_shard_count, }; @@ -233,6 +253,16 @@ impl RoutingTable { source_id: SourceId, shards: Vec, ) { + let key = (index_uid.index_id.to_string(), source_id); + let entry = self.table.entry(key).or_default(); + + match entry.compare_incarnation(&index_uid) { + Ordering::Less => return, + Ordering::Greater => entry.nodes.clear(), + Ordering::Equal => {} + } + entry.index_uid = Some(index_uid.clone()); + let per_leader_count: HashMap = shards .iter() .map(|shard| { @@ -243,9 +273,6 @@ impl RoutingTable { .into_grouping_map() .sum(); - let key = (index_uid.index_id.to_string(), source_id.clone()); - let entry = self.table.entry(key).or_default(); - for (node_id, open_shard_count) in per_leader_count { entry .nodes @@ -254,7 +281,6 @@ impl RoutingTable { .or_insert_with(|| IngesterNode { node_id, index_uid: index_uid.clone(), - source_id: source_id.clone(), capacity_score: 5, open_shard_count, }); @@ -511,21 +537,18 @@ mod tests { let high = IngesterNode { node_id: "high".into(), index_uid: IndexUid::for_test("idx", 0), - source_id: "src".into(), capacity_score: 9, open_shard_count: 2, }; let mid = IngesterNode { node_id: "mid".into(), index_uid: IndexUid::for_test("idx", 0), - source_id: "src".into(), capacity_score: 5, open_shard_count: 2, }; let low = IngesterNode { node_id: "low".into(), index_uid: IndexUid::for_test("idx", 0), - source_id: "src".into(), capacity_score: 1, 
open_shard_count: 2, }; @@ -621,4 +644,64 @@ mod tests { "az_unaware" ); } + + #[test] + fn test_incarnation_check_clears_stale_nodes() { + let mut table = RoutingTable::default(); + let key = ("test-index".to_string(), "test-source".to_string()); + + // Populate with incarnation 0: two nodes. + table.apply_capacity_update( + "node-1".into(), + IndexUid::for_test("test-index", 0), + "test-source".into(), + 8, + 3, + ); + table.apply_capacity_update( + "node-2".into(), + IndexUid::for_test("test-index", 0), + "test-source".into(), + 6, + 2, + ); + let entry = table.table.get(&key).unwrap(); + assert_eq!(entry.nodes.len(), 2); + assert_eq!(entry.index_uid, Some(IndexUid::for_test("test-index", 0))); + + // Capacity update with incarnation 1 clears stale nodes. + table.apply_capacity_update( + "node-3".into(), + IndexUid::for_test("test-index", 1), + "test-source".into(), + 5, + 1, + ); + let entry = table.table.get(&key).unwrap(); + assert_eq!(entry.nodes.len(), 1); + assert!(entry.nodes.contains_key("node-3")); + assert!(!entry.nodes.contains_key("node-1")); + assert!(!entry.nodes.contains_key("node-2")); + assert_eq!(entry.index_uid, Some(IndexUid::for_test("test-index", 1))); + + // merge_from_shards with incarnation 2 clears stale nodes. 
+ let shards = vec![Shard { + index_uid: Some(IndexUid::for_test("test-index", 2)), + source_id: "test-source".to_string(), + shard_id: Some(ShardId::from(1u64)), + shard_state: ShardState::Open as i32, + leader_id: "node-4".to_string(), + ..Default::default() + }]; + table.merge_from_shards( + IndexUid::for_test("test-index", 2), + "test-source".into(), + shards, + ); + let entry = table.table.get(&key).unwrap(); + assert_eq!(entry.nodes.len(), 1); + assert!(entry.nodes.contains_key("node-4")); + assert!(!entry.nodes.contains_key("node-3")); + assert_eq!(entry.index_uid, Some(IndexUid::for_test("test-index", 2))); + } } diff --git a/quickwit/quickwit-integration-tests/Cargo.toml b/quickwit/quickwit-integration-tests/Cargo.toml index 55308cff556..e7f1dab23db 100644 --- a/quickwit/quickwit-integration-tests/Cargo.toml +++ b/quickwit/quickwit-integration-tests/Cargo.toml @@ -40,6 +40,7 @@ quickwit-cli = { workspace = true } quickwit-common = { workspace = true, features = ["testsuite"] } quickwit-config = { workspace = true, features = ["testsuite"] } quickwit-indexing = { workspace = true, features = ["testsuite"] } +quickwit-ingest = { workspace = true, features = ["testsuite"] } quickwit-metastore = { workspace = true, features = ["testsuite"] } quickwit-opentelemetry = { workspace = true, features = ["testsuite"] } quickwit-proto = { workspace = true, features = ["testsuite"] } diff --git a/quickwit/quickwit-integration-tests/src/tests/ingest_v2_tests.rs b/quickwit/quickwit-integration-tests/src/tests/ingest_v2_tests.rs index e998cff0b3c..b63058b3d9c 100644 --- a/quickwit/quickwit-integration-tests/src/tests/ingest_v2_tests.rs +++ b/quickwit/quickwit-integration-tests/src/tests/ingest_v2_tests.rs @@ -1001,3 +1001,105 @@ async fn test_graceful_shutdown_no_data_loss() { .unwrap() .unwrap(); } + +/// Verifies that after deleting an index and recreating it with the same name, +/// ingest works correctly once the capacity broadcast has propagated (>2 broadcast +/// 
cycles). Uses 2 ingesters to exercise the Chitchat broadcast path. +#[tokio::test] +async fn test_ingest_after_index_recreate_multi_node() { + quickwit_common::setup_logging_for_tests(); + + let sandbox = ClusterSandboxBuilder::default() + .add_node([QuickwitService::Indexer, QuickwitService::Janitor]) + .add_node([QuickwitService::Indexer, QuickwitService::Janitor]) + .add_node([ + QuickwitService::ControlPlane, + QuickwitService::Metastore, + QuickwitService::Searcher, + ]) + .build_and_start() + .await; + + let index_id = "test-recreate-multi-node"; + let index_config = format!( + r#" + version: 0.8 + index_id: {index_id} + doc_mapping: + field_mappings: + - name: body + type: text + indexing_settings: + commit_timeout_secs: 1 + ingest_settings: + min_shards: 2 + "# + ); + + // Step 1: Create index and ingest to seed the routing table. + let original_metadata = sandbox + .rest_client(QuickwitService::Indexer) + .indexes() + .create(index_config.clone(), ConfigFormat::Yaml, false) + .await + .unwrap(); + + ingest( + &sandbox.rest_client(QuickwitService::Indexer), + index_id, + ingest_json!({"body": "first incarnation"}), + CommitType::Force, + ) + .await + .unwrap(); + + // Wait for the broadcast to propagate routing entries to all routers. + tokio::time::sleep(Duration::from_millis(60)).await; + + // Step 2: Delete the index. + sandbox + .rest_client(QuickwitService::Indexer) + .indexes() + .delete(index_id, false) + .await + .unwrap(); + + // Step 3: Wait for 2+ broadcast cycles (50ms each with testsuite feature) so that + // ingesters broadcast open_shard_count=0 and routers clear stale entries. + tokio::time::sleep(Duration::from_millis(120)).await; + + // Step 4: Recreate with the same name — new incarnation. 
+ let new_metadata = sandbox + .rest_client(QuickwitService::Indexer) + .indexes() + .create(index_config, ConfigFormat::Yaml, false) + .await + .unwrap(); + + assert_ne!( + original_metadata.index_uid.incarnation_id, + new_metadata.index_uid.incarnation_id + ); + + // Step 5: Ingest into the recreated index. If stale routing entries weren't + // cleared, this would fail with NoShardsAvailable after exhausting retries. + let ingest_resp = ingest( + &sandbox.rest_client(QuickwitService::Indexer), + index_id, + ingest_json!({"body": "second incarnation"}), + CommitType::Force, + ) + .await + .unwrap(); + assert_eq!(ingest_resp.num_ingested_docs, Some(1)); + + // Cleanup. + sandbox + .rest_client(QuickwitService::Indexer) + .indexes() + .delete(index_id, false) + .await + .unwrap(); + + sandbox.shutdown().await.unwrap(); +} diff --git a/quickwit/rest-api-tests/scenarii/tag_fields/0002_negative_tags.yaml b/quickwit/rest-api-tests/scenarii/tag_fields/0002_negative_tags.yaml index 99f0e5ed285..f1f900c342c 100644 --- a/quickwit/rest-api-tests/scenarii/tag_fields/0002_negative_tags.yaml +++ b/quickwit/rest-api-tests/scenarii/tag_fields/0002_negative_tags.yaml @@ -1,23 +1,23 @@ # regression test for https://github.com/quickwit-oss/quickwit/issues/4698 -endpoint: tag-simple/search +endpoint: simple/search params: query: "tag:1" expected: num_hits: 3 --- -endpoint: tag-simple/search +endpoint: simple/search params: query: "-tag:2" expected: num_hits: 4 --- -endpoint: tag-simple/search +endpoint: simple/search params: query: "tag:2" expected: num_hits: 1 --- -endpoint: tag-simple/search +endpoint: simple/search params: query: "-tag:1" expected: diff --git a/quickwit/rest-api-tests/scenarii/tag_fields/_setup.quickwit.yaml b/quickwit/rest-api-tests/scenarii/tag_fields/_setup.quickwit.yaml index 4ae0b2eb465..1208ca48343 100644 --- a/quickwit/rest-api-tests/scenarii/tag_fields/_setup.quickwit.yaml +++ b/quickwit/rest-api-tests/scenarii/tag_fields/_setup.quickwit.yaml @@ -4,14 
+4,14 @@ endpoint: indexes/allowedtypes status_code: null --- method: DELETE -endpoint: indexes/tag-simple +endpoint: indexes/simple status_code: null --- method: POST endpoint: indexes/ json: version: "0.7" - index_id: tag-simple + index_id: simple doc_mapping: field_mappings: - name: seq @@ -21,7 +21,7 @@ json: tag_fields: ["tag"] --- method: POST -endpoint: tag-simple/ingest +endpoint: simple/ingest params: commit: force ndjson: @@ -29,7 +29,7 @@ ndjson: - {"seq": 2, "tag": 2} --- method: POST -endpoint: tag-simple/ingest +endpoint: simple/ingest params: commit: force ndjson: @@ -37,7 +37,7 @@ ndjson: - {"seq": 3, "tag": null} --- method: POST -endpoint: tag-simple/ingest +endpoint: simple/ingest params: commit: force ndjson: diff --git a/quickwit/rest-api-tests/scenarii/tag_fields/_teardown.quickwit.yaml b/quickwit/rest-api-tests/scenarii/tag_fields/_teardown.quickwit.yaml index 0c3ac8fd6a4..fa0ca391b51 100644 --- a/quickwit/rest-api-tests/scenarii/tag_fields/_teardown.quickwit.yaml +++ b/quickwit/rest-api-tests/scenarii/tag_fields/_teardown.quickwit.yaml @@ -3,4 +3,4 @@ endpoint: indexes/allowedtypes status_code: null --- method: DELETE -endpoint: indexes/tag-simple +endpoint: indexes/simple From 37c92ab71ab2986ab297e2a943d96c7612abb40d Mon Sep 17 00:00:00 2001 From: Nadav Gov-Ari Date: Wed, 25 Mar 2026 14:12:20 -0400 Subject: [PATCH 2/5] Use chitchat TTL instead of manual deletion --- .../src/ingest_v2/broadcast/capacity_score.rs | 69 ++++++------------- 1 file changed, 21 insertions(+), 48 deletions(-) diff --git a/quickwit/quickwit-ingest/src/ingest_v2/broadcast/capacity_score.rs b/quickwit/quickwit-ingest/src/ingest_v2/broadcast/capacity_score.rs index 572c5c5288f..ad1ba5b148c 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/broadcast/capacity_score.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/broadcast/capacity_score.rs @@ -40,7 +40,6 @@ pub struct IngesterCapacityScore { pub struct BroadcastIngesterCapacityScoreTask { cluster: Cluster, weak_state: 
WeakIngesterState, - pending_removal: BTreeSet<SourceUid>, } impl BroadcastIngesterCapacityScoreTask { @@ -48,7 +47,6 @@ impl BroadcastIngesterCapacityScoreTask { let mut broadcaster = Self { cluster, weak_state, - pending_removal: BTreeSet::new(), }; tokio::spawn(async move { broadcaster.run().await }) } @@ -104,7 +102,7 @@ impl BroadcastIngesterCapacityScoreTask { } async fn broadcast_capacity( - &mut self, + &self, capacity_score: usize, open_shard_counts: &OpenShardCounts, previous_sources: &BTreeSet<SourceUid>, @@ -124,38 +122,23 @@ impl BroadcastIngesterCapacityScoreTask { let value = serde_json::to_string(&capacity) .expect("`IngesterCapacityScore` should be JSON serializable"); self.cluster.set_self_key_value(key, value).await; - self.pending_removal.remove(&source_uid); current_sources.insert(source_uid); } - let removed_sources: Vec<SourceUid> = previous_sources - .difference(&current_sources) - .cloned() - .collect(); - - // When a source disappears (e.g. index deleted), broadcast open_shard_count=0 for one - // cycle, then remove the key the second cycle. This is the only way routers learn to clear - // stale routing entries. - for removed_source in removed_sources { - if self.pending_removal.remove(&removed_source) { - // this is the second broadcast cycle that this source has been in the "removed" - // state, clean it up. - let key = make_key(INGESTER_CAPACITY_SCORE_PREFIX, &removed_source); - self.cluster.remove_self_key(&key).await; - } else { - // this is the first broadcast cycle that this key has been in the "removed" state, - // broadcast that it has no open shards. 
- let key = make_key(INGESTER_CAPACITY_SCORE_PREFIX, &removed_source); - let capacity = IngesterCapacityScore { - capacity_score, - open_shard_count: 0, - }; - let value = serde_json::to_string(&capacity) - .expect("`IngesterCapacityScore` should be JSON serializable"); - self.cluster.set_self_key_value(key, value).await; - self.pending_removal.insert(removed_source.clone()); - current_sources.insert(removed_source); - } + // When a source disappears (e.g. index deleted), broadcast open_shard_count=0 with a TTL + // so Chitchat auto-cleans the key. This is the only way routers learn to clear stale + // routing entries — Chitchat key removal doesn't reliably fire subscribers. + for removed_source in previous_sources.difference(&current_sources) { + let key = make_key(INGESTER_CAPACITY_SCORE_PREFIX, removed_source); + let capacity = IngesterCapacityScore { + capacity_score, + open_shard_count: 0, + }; + let value = serde_json::to_string(&capacity) + .expect("`IngesterCapacityScore` should be JSON serializable"); + self.cluster + .set_self_key_value_delete_after_ttl(key, value) + .await; } current_sources @@ -223,7 +206,6 @@ mod tests { let task = BroadcastIngesterCapacityScoreTask { cluster, weak_state, - pending_removal: BTreeSet::new(), }; assert!(task.snapshot().await.is_err()); } @@ -257,10 +239,9 @@ mod tests { assert_eq!(capacity_score, 6); - let mut task = BroadcastIngesterCapacityScoreTask { + let task = BroadcastIngesterCapacityScoreTask { cluster: cluster.clone(), weak_state: state.weak(), - pending_removal: BTreeSet::new(), }; let update_counter = Arc::new(AtomicUsize::new(0)); @@ -296,17 +277,16 @@ mod tests { } #[tokio::test] - async fn test_removed_source_broadcasts_zero_then_deletes_key() { + async fn test_removed_source_broadcasts_zero_with_ttl() { let transport = ChannelTransport::default(); let cluster = create_cluster_for_test(Vec::new(), &["indexer"], &transport, true) .await .unwrap(); let (_temp_dir, state) = 
IngesterState::for_test(cluster.clone()).await; - let mut task = BroadcastIngesterCapacityScoreTask { + let task = BroadcastIngesterCapacityScoreTask { cluster: cluster.clone(), weak_state: state.weak(), - pending_removal: BTreeSet::new(), }; let index_uid = IndexUid::for_test("test-index", 0); @@ -315,7 +295,6 @@ mod tests { source_id: SourceId::from("test-source"), }; let key = make_key(INGESTER_CAPACITY_SCORE_PREFIX, &source_uid); - let empty_counts: Vec<(IndexUid, SourceId, usize)> = vec![]; // Cycle 1: source is alive. let open_counts: Vec<(IndexUid, SourceId, usize)> = @@ -328,19 +307,13 @@ mod tests { let parsed: IngesterCapacityScore = serde_json::from_str(&value).unwrap(); assert_eq!(parsed.open_shard_count, 3); - // Cycle 2: source disappears. Broadcasts 0, key still exists. - let current = task.broadcast_capacity(7, &empty_counts, &current).await; + // Cycle 2: source disappears. Broadcasts 0 with TTL, key still exists. + let empty_counts: Vec<(IndexUid, SourceId, usize)> = vec![]; + let _current = task.broadcast_capacity(7, &empty_counts, &current).await; - assert!(task.pending_removal.contains(&source_uid)); let value = cluster.get_self_key_value(&key).await.unwrap(); let parsed: IngesterCapacityScore = serde_json::from_str(&value).unwrap(); assert_eq!(parsed.capacity_score, 7); assert_eq!(parsed.open_shard_count, 0); - - // Cycle 3: source still gone. Key removed. 
- let _current = task.broadcast_capacity(7, &empty_counts, &current).await; - - assert!(!task.pending_removal.contains(&source_uid)); - assert!(cluster.get_self_key_value(&key).await.is_none()); } } From e20f9141cc74375fe94eae21e8b61100b9533633 Mon Sep 17 00:00:00 2001 From: Nadav Gov-Ari Date: Wed, 25 Mar 2026 14:23:07 -0400 Subject: [PATCH 3/5] fix tests --- quickwit/quickwit-ingest/src/ingest_v2/routing_table.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/quickwit/quickwit-ingest/src/ingest_v2/routing_table.rs b/quickwit/quickwit-ingest/src/ingest_v2/routing_table.rs index f4bf58b9602..83e5ab04e9c 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/routing_table.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/routing_table.rs @@ -407,7 +407,7 @@ mod tests { // Node with capacity_score=0 is not eligible. table.apply_capacity_update( "node-2".into(), - IndexUid::for_test("test-index", 0), + index_uid.clone(), "test-source".into(), 0, 2, From 3ea98e7197210b7693b7db0e06b5a51fb6feb549 Mon Sep 17 00:00:00 2001 From: Nadav Gov-Ari Date: Wed, 25 Mar 2026 15:17:25 -0400 Subject: [PATCH 4/5] PR comments --- .../src/ingest_v2/broadcast/capacity_score.rs | 3 +- .../src/ingest_v2/routing_table.rs | 65 +++++++++++-------- 2 files changed, 38 insertions(+), 30 deletions(-) diff --git a/quickwit/quickwit-ingest/src/ingest_v2/broadcast/capacity_score.rs b/quickwit/quickwit-ingest/src/ingest_v2/broadcast/capacity_score.rs index ad1ba5b148c..d37e12ad11c 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/broadcast/capacity_score.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/broadcast/capacity_score.rs @@ -308,8 +308,7 @@ mod tests { assert_eq!(parsed.open_shard_count, 3); // Cycle 2: source disappears. Broadcasts 0 with TTL, key still exists. 
- let empty_counts: Vec<(IndexUid, SourceId, usize)> = vec![]; - let _current = task.broadcast_capacity(7, &empty_counts, &current).await; + let _current = task.broadcast_capacity(7, &[], &current).await; let value = cluster.get_self_key_value(&key).await.unwrap(); let parsed: IngesterCapacityScore = serde_json::from_str(&value).unwrap(); diff --git a/quickwit/quickwit-ingest/src/ingest_v2/routing_table.rs b/quickwit/quickwit-ingest/src/ingest_v2/routing_table.rs index 83e5ab04e9c..edd1b4a36aa 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/routing_table.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/routing_table.rs @@ -37,9 +37,9 @@ pub(super) struct IngesterNode { pub open_shard_count: usize, } -#[derive(Debug, Default)] +#[derive(Debug)] pub(super) struct RoutingEntry { - pub index_uid: Option<IndexUid>, + pub index_uid: IndexUid, pub nodes: HashMap<NodeId, IngesterNode>, /// Whether this entry has been seeded from a control plane response. During a rolling /// deployment, Chitchat broadcasts from already-upgraded nodes may populate the table @@ -48,6 +48,16 @@ pub(super) struct RoutingEntry { seeded_from_cp: bool, } +impl RoutingEntry { + fn new(index_uid: IndexUid) -> Self { + Self { + index_uid, + nodes: HashMap::new(), + seeded_from_cp: false, + } + } +} + /// Given a slice of candidates, picks the better of two random choices. /// Higher capacity_score wins; tiebreak on more open_shard_count (more landing spots). fn power_of_two_choices<'a>(candidates: &[&'a IngesterNode]) -> &'a IngesterNode { @@ -71,18 +81,6 @@ fn pick_from(candidates: Vec<&IngesterNode>) -> Option<&IngesterNode> { } impl RoutingEntry { 
- fn compare_incarnation(&self, incoming: &IndexUid) -> Ordering { - let Some(stored) = &self.index_uid else { - return Ordering::Greater; - }; - incoming.cmp(stored) - } - /// Pick an ingester node to persist the request to. Uses power of two choices based on reported /// ingester capacity, if more than one eligible node exists. Prefers nodes in the same /// availability zone, falling back to remote nodes. @@ -224,16 +222,21 @@ impl RoutingTable { ) { let key = (index_uid.index_id.to_string(), source_id); - let entry = self.table.entry(key).or_default(); - match entry.compare_incarnation(&index_uid) { - Ordering::Less => return, - Ordering::Greater => { + let entry = self + .table + .entry(key) + .or_insert_with(|| RoutingEntry::new(index_uid.clone())); + match entry.index_uid.cmp(&index_uid) { + // If we receive an update for a new incarnation of the index, then we clear the entry. + Ordering::Less => { + entry.index_uid = index_uid.clone(); entry.nodes.clear(); entry.seeded_from_cp = false; } + // If we receive an update for a previous incarnation of the index, then we ignore it. + Ordering::Greater => return, Ordering::Equal => {} } - entry.index_uid = Some(index_uid.clone()); let ingester_node = IngesterNode { node_id: node_id.clone(), index_uid, @@ -254,14 +257,20 @@ impl RoutingTable { shards: Vec, ) { let key = (index_uid.index_id.to_string(), source_id); - let entry = self.table.entry(key).or_default(); - - match entry.compare_incarnation(&index_uid) { - Ordering::Less => return, - Ordering::Greater => entry.nodes.clear(), + let entry = self + .table + .entry(key) + .or_insert_with(|| RoutingEntry::new(index_uid.clone())); + match entry.index_uid.cmp(&index_uid) { + // If we receive an update for a new incarnation of the index, then we clear the entry. + Ordering::Less => { + entry.index_uid = index_uid.clone(); + entry.nodes.clear(); + } + // If we receive an update for a previous incarnation of the index, then we ignore it. 
+ Ordering::Greater => return, Ordering::Equal => {} } - entry.index_uid = Some(index_uid.clone()); let per_leader_count: HashMap = shards .iter() @@ -667,7 +676,7 @@ mod tests { ); let entry = table.table.get(&key).unwrap(); assert_eq!(entry.nodes.len(), 2); - assert_eq!(entry.index_uid, Some(IndexUid::for_test("test-index", 0))); + assert_eq!(entry.index_uid, IndexUid::for_test("test-index", 0)); // Capacity update with incarnation 1 clears stale nodes. table.apply_capacity_update( @@ -682,7 +691,7 @@ mod tests { assert!(entry.nodes.contains_key("node-3")); assert!(!entry.nodes.contains_key("node-1")); assert!(!entry.nodes.contains_key("node-2")); - assert_eq!(entry.index_uid, Some(IndexUid::for_test("test-index", 1))); + assert_eq!(entry.index_uid, IndexUid::for_test("test-index", 1)); // merge_from_shards with incarnation 2 clears stale nodes. let shards = vec![Shard { @@ -702,6 +711,6 @@ mod tests { assert_eq!(entry.nodes.len(), 1); assert!(entry.nodes.contains_key("node-4")); assert!(!entry.nodes.contains_key("node-3")); - assert_eq!(entry.index_uid, Some(IndexUid::for_test("test-index", 2))); + assert_eq!(entry.index_uid, IndexUid::for_test("test-index", 2)); } } From d650ff3e5f809a6b26b32f32ac5f3c4547ef84cb Mon Sep 17 00:00:00 2001 From: Nadav Gov-Ari Date: Wed, 25 Mar 2026 15:22:57 -0400 Subject: [PATCH 5/5] fix vec issue --- .../quickwit-ingest/src/ingest_v2/broadcast/capacity_score.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/quickwit/quickwit-ingest/src/ingest_v2/broadcast/capacity_score.rs b/quickwit/quickwit-ingest/src/ingest_v2/broadcast/capacity_score.rs index d37e12ad11c..d2eede8cb88 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/broadcast/capacity_score.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/broadcast/capacity_score.rs @@ -308,7 +308,7 @@ mod tests { assert_eq!(parsed.open_shard_count, 3); // Cycle 2: source disappears. Broadcasts 0 with TTL, key still exists. 
- let _current = task.broadcast_capacity(7, &[], &current).await; + let _current = task.broadcast_capacity(7, &vec![], &current).await; let value = cluster.get_self_key_value(&key).await.unwrap(); let parsed: IngesterCapacityScore = serde_json::from_str(&value).unwrap();