diff --git a/quickwit/Cargo.lock b/quickwit/Cargo.lock index 621246f4baf..6204f0ae0a1 100644 --- a/quickwit/Cargo.lock +++ b/quickwit/Cargo.lock @@ -7024,6 +7024,7 @@ dependencies = [ "quickwit-common", "quickwit-config", "quickwit-indexing", + "quickwit-ingest", "quickwit-metastore", "quickwit-opentelemetry", "quickwit-proto", diff --git a/quickwit/quickwit-ingest/src/ingest_v2/broadcast/capacity_score.rs b/quickwit/quickwit-ingest/src/ingest_v2/broadcast/capacity_score.rs index 062539db797..d2eede8cb88 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/broadcast/capacity_score.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/broadcast/capacity_score.rs @@ -125,9 +125,20 @@ impl BroadcastIngesterCapacityScoreTask { current_sources.insert(source_uid); } + // When a source disappears (e.g. index deleted), broadcast open_shard_count=0 with a TTL + // so Chitchat auto-cleans the key. This is the only way routers learn to clear stale + // routing entries — Chitchat key removal doesn't reliably fire subscribers. 
for removed_source in previous_sources.difference(&current_sources) { let key = make_key(INGESTER_CAPACITY_SCORE_PREFIX, removed_source); - self.cluster.remove_self_key(&key).await; + let capacity = IngesterCapacityScore { + capacity_score, + open_shard_count: 0, + }; + let value = serde_json::to_string(&capacity) + .expect("`IngesterCapacityScore` should be JSON serializable"); + self.cluster + .set_self_key_value_delete_after_ttl(key, value) + .await; } current_sources @@ -264,4 +275,44 @@ mod tests { assert_eq!(deserialized.capacity_score, 6); assert_eq!(deserialized.open_shard_count, 1); } + + #[tokio::test] + async fn test_removed_source_broadcasts_zero_with_ttl() { + let transport = ChannelTransport::default(); + let cluster = create_cluster_for_test(Vec::new(), &["indexer"], &transport, true) + .await + .unwrap(); + + let (_temp_dir, state) = IngesterState::for_test(cluster.clone()).await; + let task = BroadcastIngesterCapacityScoreTask { + cluster: cluster.clone(), + weak_state: state.weak(), + }; + + let index_uid = IndexUid::for_test("test-index", 0); + let source_uid = SourceUid { + index_uid: index_uid.clone(), + source_id: SourceId::from("test-source"), + }; + let key = make_key(INGESTER_CAPACITY_SCORE_PREFIX, &source_uid); + + // Cycle 1: source is alive. + let open_counts: Vec<(IndexUid, SourceId, usize)> = + vec![(index_uid.clone(), "test-source".into(), 3)]; + let current = task + .broadcast_capacity(7, &open_counts, &BTreeSet::new()) + .await; + + let value = cluster.get_self_key_value(&key).await.unwrap(); + let parsed: IngesterCapacityScore = serde_json::from_str(&value).unwrap(); + assert_eq!(parsed.open_shard_count, 3); + + // Cycle 2: source disappears. Broadcasts 0 with TTL, key still exists. 
+ let _current = task.broadcast_capacity(7, &vec![], &current).await; + let value = cluster.get_self_key_value(&key).await.unwrap(); + let parsed: IngesterCapacityScore = serde_json::from_str(&value).unwrap(); + assert_eq!(parsed.capacity_score, 7); + assert_eq!(parsed.open_shard_count, 0); + } } diff --git a/quickwit/quickwit-ingest/src/ingest_v2/broadcast/mod.rs b/quickwit/quickwit-ingest/src/ingest_v2/broadcast/mod.rs index b579382af78..15e75311993 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/broadcast/mod.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/broadcast/mod.rs @@ -20,11 +20,12 @@ use std::time::Duration; use quickwit_proto::types::SourceUid; -pub(in crate::ingest_v2) const BROADCAST_INTERVAL_PERIOD: Duration = if cfg!(test) { - Duration::from_millis(50) -} else { - Duration::from_secs(5) -}; +pub(in crate::ingest_v2) const BROADCAST_INTERVAL_PERIOD: Duration = + if cfg!(any(test, feature = "testsuite")) { + Duration::from_millis(50) + } else { + Duration::from_secs(5) + }; pub use capacity_score::{ BroadcastIngesterCapacityScoreTask, IngesterCapacityScoreUpdate, diff --git a/quickwit/quickwit-ingest/src/ingest_v2/routing_table.rs b/quickwit/quickwit-ingest/src/ingest_v2/routing_table.rs index 529922d657a..edd1b4a36aa 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/routing_table.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/routing_table.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::cmp::Ordering; use std::collections::{HashMap, HashSet}; use itertools::Itertools; @@ -29,8 +30,6 @@ use crate::IngesterPool; pub(super) struct IngesterNode { pub node_id: NodeId, pub index_uid: IndexUid, - #[allow(unused)] - pub source_id: SourceId, /// Score from 0-10. Higher means more available capacity. pub capacity_score: usize, /// Number of open shards on this node for this (index, source) pair. 
Tiebreaker for power of @@ -38,8 +37,9 @@ pub(super) struct IngesterNode { pub open_shard_count: usize, } -#[derive(Debug, Default)] +#[derive(Debug)] pub(super) struct RoutingEntry { + pub index_uid: IndexUid, pub nodes: HashMap, /// Whether this entry has been seeded from a control plane response. During a rolling /// deployment, Chitchat broadcasts from already-upgraded nodes may populate the table @@ -48,6 +48,16 @@ pub(super) struct RoutingEntry { seeded_from_cp: bool, } +impl RoutingEntry { + fn new(index_uid: IndexUid) -> Self { + Self { + index_uid, + nodes: HashMap::new(), + seeded_from_cp: false, + } + } +} + /// Given a slice of candidates, picks the better of two random choices. /// Higher capacity_score wins; tiebreak on more open_shard_count (more landing spots). fn power_of_two_choices<'a>(candidates: &[&'a IngesterNode]) -> &'a IngesterNode { @@ -210,13 +220,26 @@ impl RoutingTable { capacity_score: usize, open_shard_count: usize, ) { - let key = (index_uid.index_id.to_string(), source_id.clone()); - - let entry = self.table.entry(key).or_default(); + let key = (index_uid.index_id.to_string(), source_id); + + let entry = self + .table + .entry(key) + .or_insert_with(|| RoutingEntry::new(index_uid.clone())); + match entry.index_uid.cmp(&index_uid) { + // If we receive an update for a new incarnation of the index, then we clear the entry. + Ordering::Less => { + entry.index_uid = index_uid.clone(); + entry.nodes.clear(); + entry.seeded_from_cp = false; + } + // If we receive an update for a previous incarnation of the index, then we ignore it. 
+ Ordering::Greater => return, + Ordering::Equal => {} + } let ingester_node = IngesterNode { node_id: node_id.clone(), index_uid, - source_id, capacity_score, open_shard_count, }; @@ -233,6 +256,22 @@ impl RoutingTable { source_id: SourceId, shards: Vec, ) { + let key = (index_uid.index_id.to_string(), source_id); + let entry = self + .table + .entry(key) + .or_insert_with(|| RoutingEntry::new(index_uid.clone())); + match entry.index_uid.cmp(&index_uid) { + // If we receive an update for a new incarnation of the index, then we clear the entry. + Ordering::Less => { + entry.index_uid = index_uid.clone(); + entry.nodes.clear(); + } + // If we receive an update for a previous incarnation of the index, then we ignore it. + Ordering::Greater => return, + Ordering::Equal => {} + } + let per_leader_count: HashMap = shards .iter() .map(|shard| { @@ -243,9 +282,6 @@ impl RoutingTable { .into_grouping_map() .sum(); - let key = (index_uid.index_id.to_string(), source_id.clone()); - let entry = self.table.entry(key).or_default(); - for (node_id, open_shard_count) in per_leader_count { entry .nodes @@ -254,7 +290,6 @@ impl RoutingTable { .or_insert_with(|| IngesterNode { node_id, index_uid: index_uid.clone(), - source_id: source_id.clone(), capacity_score: 5, open_shard_count, }); @@ -381,7 +416,7 @@ mod tests { // Node with capacity_score=0 is not eligible. 
table.apply_capacity_update( "node-2".into(), - IndexUid::for_test("test-index", 0), + index_uid.clone(), "test-source".into(), 0, 2, @@ -511,21 +546,18 @@ mod tests { let high = IngesterNode { node_id: "high".into(), index_uid: IndexUid::for_test("idx", 0), - source_id: "src".into(), capacity_score: 9, open_shard_count: 2, }; let mid = IngesterNode { node_id: "mid".into(), index_uid: IndexUid::for_test("idx", 0), - source_id: "src".into(), capacity_score: 5, open_shard_count: 2, }; let low = IngesterNode { node_id: "low".into(), index_uid: IndexUid::for_test("idx", 0), - source_id: "src".into(), capacity_score: 1, open_shard_count: 2, }; @@ -621,4 +653,64 @@ mod tests { "az_unaware" ); } + + #[test] + fn test_incarnation_check_clears_stale_nodes() { + let mut table = RoutingTable::default(); + let key = ("test-index".to_string(), "test-source".to_string()); + + // Populate with incarnation 0: two nodes. + table.apply_capacity_update( + "node-1".into(), + IndexUid::for_test("test-index", 0), + "test-source".into(), + 8, + 3, + ); + table.apply_capacity_update( + "node-2".into(), + IndexUid::for_test("test-index", 0), + "test-source".into(), + 6, + 2, + ); + let entry = table.table.get(&key).unwrap(); + assert_eq!(entry.nodes.len(), 2); + assert_eq!(entry.index_uid, IndexUid::for_test("test-index", 0)); + + // Capacity update with incarnation 1 clears stale nodes. + table.apply_capacity_update( + "node-3".into(), + IndexUid::for_test("test-index", 1), + "test-source".into(), + 5, + 1, + ); + let entry = table.table.get(&key).unwrap(); + assert_eq!(entry.nodes.len(), 1); + assert!(entry.nodes.contains_key("node-3")); + assert!(!entry.nodes.contains_key("node-1")); + assert!(!entry.nodes.contains_key("node-2")); + assert_eq!(entry.index_uid, IndexUid::for_test("test-index", 1)); + + // merge_from_shards with incarnation 2 clears stale nodes. 
+ let shards = vec![Shard { + index_uid: Some(IndexUid::for_test("test-index", 2)), + source_id: "test-source".to_string(), + shard_id: Some(ShardId::from(1u64)), + shard_state: ShardState::Open as i32, + leader_id: "node-4".to_string(), + ..Default::default() + }]; + table.merge_from_shards( + IndexUid::for_test("test-index", 2), + "test-source".into(), + shards, + ); + let entry = table.table.get(&key).unwrap(); + assert_eq!(entry.nodes.len(), 1); + assert!(entry.nodes.contains_key("node-4")); + assert!(!entry.nodes.contains_key("node-3")); + assert_eq!(entry.index_uid, IndexUid::for_test("test-index", 2)); + } } diff --git a/quickwit/quickwit-integration-tests/Cargo.toml b/quickwit/quickwit-integration-tests/Cargo.toml index 55308cff556..e7f1dab23db 100644 --- a/quickwit/quickwit-integration-tests/Cargo.toml +++ b/quickwit/quickwit-integration-tests/Cargo.toml @@ -40,6 +40,7 @@ quickwit-cli = { workspace = true } quickwit-common = { workspace = true, features = ["testsuite"] } quickwit-config = { workspace = true, features = ["testsuite"] } quickwit-indexing = { workspace = true, features = ["testsuite"] } +quickwit-ingest = { workspace = true, features = ["testsuite"] } quickwit-metastore = { workspace = true, features = ["testsuite"] } quickwit-opentelemetry = { workspace = true, features = ["testsuite"] } quickwit-proto = { workspace = true, features = ["testsuite"] } diff --git a/quickwit/quickwit-integration-tests/src/tests/ingest_v2_tests.rs b/quickwit/quickwit-integration-tests/src/tests/ingest_v2_tests.rs index e998cff0b3c..b63058b3d9c 100644 --- a/quickwit/quickwit-integration-tests/src/tests/ingest_v2_tests.rs +++ b/quickwit/quickwit-integration-tests/src/tests/ingest_v2_tests.rs @@ -1001,3 +1001,105 @@ async fn test_graceful_shutdown_no_data_loss() { .unwrap() .unwrap(); } + +/// Verifies that after deleting an index and recreating it with the same name, +/// ingest works correctly once the capacity broadcast has propagated (>2 broadcast +/// cycles). 
Uses 2 ingesters to exercise the Chitchat broadcast path. +#[tokio::test] +async fn test_ingest_after_index_recreate_multi_node() { + quickwit_common::setup_logging_for_tests(); + + let sandbox = ClusterSandboxBuilder::default() + .add_node([QuickwitService::Indexer, QuickwitService::Janitor]) + .add_node([QuickwitService::Indexer, QuickwitService::Janitor]) + .add_node([ + QuickwitService::ControlPlane, + QuickwitService::Metastore, + QuickwitService::Searcher, + ]) + .build_and_start() + .await; + + let index_id = "test-recreate-multi-node"; + let index_config = format!( + r#" + version: 0.8 + index_id: {index_id} + doc_mapping: + field_mappings: + - name: body + type: text + indexing_settings: + commit_timeout_secs: 1 + ingest_settings: + min_shards: 2 + "# + ); + + // Step 1: Create index and ingest to seed the routing table. + let original_metadata = sandbox + .rest_client(QuickwitService::Indexer) + .indexes() + .create(index_config.clone(), ConfigFormat::Yaml, false) + .await + .unwrap(); + + ingest( + &sandbox.rest_client(QuickwitService::Indexer), + index_id, + ingest_json!({"body": "first incarnation"}), + CommitType::Force, + ) + .await + .unwrap(); + + // Wait for the broadcast to propagate routing entries to all routers. + tokio::time::sleep(Duration::from_millis(60)).await; + + // Step 2: Delete the index. + sandbox + .rest_client(QuickwitService::Indexer) + .indexes() + .delete(index_id, false) + .await + .unwrap(); + + // Step 3: Wait for 2+ broadcast cycles (50ms each with testsuite feature) so that + // ingesters broadcast open_shard_count=0 and routers clear stale entries. + tokio::time::sleep(Duration::from_millis(120)).await; + + // Step 4: Recreate with the same name — new incarnation. 
+ let new_metadata = sandbox + .rest_client(QuickwitService::Indexer) + .indexes() + .create(index_config, ConfigFormat::Yaml, false) + .await + .unwrap(); + + assert_ne!( + original_metadata.index_uid.incarnation_id, + new_metadata.index_uid.incarnation_id + ); + + // Step 5: Ingest into the recreated index. If stale routing entries weren't + // cleared, this would fail with NoShardsAvailable after exhausting retries. + let ingest_resp = ingest( + &sandbox.rest_client(QuickwitService::Indexer), + index_id, + ingest_json!({"body": "second incarnation"}), + CommitType::Force, + ) + .await + .unwrap(); + assert_eq!(ingest_resp.num_ingested_docs, Some(1)); + + // Cleanup. + sandbox + .rest_client(QuickwitService::Indexer) + .indexes() + .delete(index_id, false) + .await + .unwrap(); + + sandbox.shutdown().await.unwrap(); +} diff --git a/quickwit/rest-api-tests/scenarii/tag_fields/0002_negative_tags.yaml b/quickwit/rest-api-tests/scenarii/tag_fields/0002_negative_tags.yaml index 99f0e5ed285..f1f900c342c 100644 --- a/quickwit/rest-api-tests/scenarii/tag_fields/0002_negative_tags.yaml +++ b/quickwit/rest-api-tests/scenarii/tag_fields/0002_negative_tags.yaml @@ -1,23 +1,23 @@ # regression test for https://github.com/quickwit-oss/quickwit/issues/4698 -endpoint: tag-simple/search +endpoint: simple/search params: query: "tag:1" expected: num_hits: 3 --- -endpoint: tag-simple/search +endpoint: simple/search params: query: "-tag:2" expected: num_hits: 4 --- -endpoint: tag-simple/search +endpoint: simple/search params: query: "tag:2" expected: num_hits: 1 --- -endpoint: tag-simple/search +endpoint: simple/search params: query: "-tag:1" expected: diff --git a/quickwit/rest-api-tests/scenarii/tag_fields/_setup.quickwit.yaml b/quickwit/rest-api-tests/scenarii/tag_fields/_setup.quickwit.yaml index 4ae0b2eb465..1208ca48343 100644 --- a/quickwit/rest-api-tests/scenarii/tag_fields/_setup.quickwit.yaml +++ b/quickwit/rest-api-tests/scenarii/tag_fields/_setup.quickwit.yaml @@ -4,14 
+4,14 @@ endpoint: indexes/allowedtypes status_code: null --- method: DELETE -endpoint: indexes/tag-simple +endpoint: indexes/simple status_code: null --- method: POST endpoint: indexes/ json: version: "0.7" - index_id: tag-simple + index_id: simple doc_mapping: field_mappings: - name: seq @@ -21,7 +21,7 @@ json: tag_fields: ["tag"] --- method: POST -endpoint: tag-simple/ingest +endpoint: simple/ingest params: commit: force ndjson: @@ -29,7 +29,7 @@ ndjson: - {"seq": 2, "tag": 2} --- method: POST -endpoint: tag-simple/ingest +endpoint: simple/ingest params: commit: force ndjson: @@ -37,7 +37,7 @@ ndjson: - {"seq": 3, "tag": null} --- method: POST -endpoint: tag-simple/ingest +endpoint: simple/ingest params: commit: force ndjson: diff --git a/quickwit/rest-api-tests/scenarii/tag_fields/_teardown.quickwit.yaml b/quickwit/rest-api-tests/scenarii/tag_fields/_teardown.quickwit.yaml index 0c3ac8fd6a4..fa0ca391b51 100644 --- a/quickwit/rest-api-tests/scenarii/tag_fields/_teardown.quickwit.yaml +++ b/quickwit/rest-api-tests/scenarii/tag_fields/_teardown.quickwit.yaml @@ -3,4 +3,4 @@ endpoint: indexes/allowedtypes status_code: null --- method: DELETE -endpoint: indexes/tag-simple +endpoint: indexes/simple