Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions quickwit/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -125,9 +125,20 @@ impl BroadcastIngesterCapacityScoreTask {
current_sources.insert(source_uid);
}

// When a source disappears (e.g. index deleted), broadcast open_shard_count=0 with a TTL
// so Chitchat auto-cleans the key. This is the only way routers learn to clear stale
// routing entries — Chitchat key removal doesn't reliably fire subscribers.
for removed_source in previous_sources.difference(&current_sources) {
let key = make_key(INGESTER_CAPACITY_SCORE_PREFIX, removed_source);
self.cluster.remove_self_key(&key).await;
let capacity = IngesterCapacityScore {
capacity_score,
open_shard_count: 0,
};
let value = serde_json::to_string(&capacity)
.expect("`IngesterCapacityScore` should be JSON serializable");
self.cluster
.set_self_key_value_delete_after_ttl(key, value)
.await;
}

current_sources
Expand Down Expand Up @@ -264,4 +275,44 @@ mod tests {
assert_eq!(deserialized.capacity_score, 6);
assert_eq!(deserialized.open_shard_count, 1);
}

/// Verifies that when a source disappears between two broadcast cycles, the
/// task publishes a zeroed `open_shard_count` under the same Chitchat key
/// (with a TTL) instead of silently dropping it, so routers observe the
/// update and can clear their stale routing entries.
#[tokio::test]
async fn test_removed_source_broadcasts_zero_with_ttl() {
    let transport = ChannelTransport::default();
    let cluster = create_cluster_for_test(Vec::new(), &["indexer"], &transport, true)
        .await
        .unwrap();

    let (_temp_dir, ingester_state) = IngesterState::for_test(cluster.clone()).await;
    let task = BroadcastIngesterCapacityScoreTask {
        cluster: cluster.clone(),
        weak_state: ingester_state.weak(),
    };

    let index_uid = IndexUid::for_test("test-index", 0);
    let source_uid = SourceUid {
        index_uid: index_uid.clone(),
        source_id: SourceId::from("test-source"),
    };
    let chitchat_key = make_key(INGESTER_CAPACITY_SCORE_PREFIX, &source_uid);

    // First cycle: the source exists and reports three open shards.
    let open_counts: Vec<(IndexUid, SourceId, usize)> =
        vec![(index_uid.clone(), "test-source".into(), 3)];
    let broadcast_sources = task
        .broadcast_capacity(7, &open_counts, &BTreeSet::new())
        .await;

    let raw_value = cluster.get_self_key_value(&chitchat_key).await.unwrap();
    let capacity: IngesterCapacityScore = serde_json::from_str(&raw_value).unwrap();
    assert_eq!(capacity.open_shard_count, 3);

    // Second cycle: the source is gone. The key must still be readable and
    // now carry `open_shard_count == 0` (broadcast with a TTL).
    let _broadcast_sources = task
        .broadcast_capacity(7, &Vec::new(), &broadcast_sources)
        .await;

    let raw_value = cluster.get_self_key_value(&chitchat_key).await.unwrap();
    let capacity: IngesterCapacityScore = serde_json::from_str(&raw_value).unwrap();
    assert_eq!(capacity.capacity_score, 7);
    assert_eq!(capacity.open_shard_count, 0);
}
}
11 changes: 6 additions & 5 deletions quickwit/quickwit-ingest/src/ingest_v2/broadcast/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,12 @@ use std::time::Duration;

use quickwit_proto::types::SourceUid;

pub(in crate::ingest_v2) const BROADCAST_INTERVAL_PERIOD: Duration = if cfg!(test) {
Duration::from_millis(50)
} else {
Duration::from_secs(5)
};
// Interval between two consecutive capacity-score broadcast cycles.
// Shortened when compiled under `cfg(test)` OR with the `testsuite` feature:
// integration tests in a separate crate enable `testsuite` rather than
// building this crate with `--test`, so `cfg!(test)` alone would leave them
// waiting 5s per cycle (see the integration-tests Cargo.toml enabling the
// `testsuite` feature on quickwit-ingest).
pub(in crate::ingest_v2) const BROADCAST_INTERVAL_PERIOD: Duration =
    if cfg!(any(test, feature = "testsuite")) {
        Duration::from_millis(50)
    } else {
        Duration::from_secs(5)
    };

pub use capacity_score::{
BroadcastIngesterCapacityScoreTask, IngesterCapacityScoreUpdate,
Expand Down
122 changes: 107 additions & 15 deletions quickwit/quickwit-ingest/src/ingest_v2/routing_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::cmp::Ordering;
use std::collections::{HashMap, HashSet};

use itertools::Itertools;
Expand All @@ -29,17 +30,16 @@ use crate::IngesterPool;
pub(super) struct IngesterNode {
pub node_id: NodeId,
pub index_uid: IndexUid,
#[allow(unused)]
pub source_id: SourceId,
/// Score from 0-10. Higher means more available capacity.
pub capacity_score: usize,
/// Number of open shards on this node for this (index, source) pair. Tiebreaker for power of
/// two choices comparison - we favor a node with more open shards.
pub open_shard_count: usize,
}

#[derive(Debug, Default)]
#[derive(Debug)]
pub(super) struct RoutingEntry {
pub index_uid: IndexUid,
pub nodes: HashMap<NodeId, IngesterNode>,
/// Whether this entry has been seeded from a control plane response. During a rolling
/// deployment, Chitchat broadcasts from already-upgraded nodes may populate the table
Expand All @@ -48,6 +48,16 @@ pub(super) struct RoutingEntry {
seeded_from_cp: bool,
}

impl RoutingEntry {
fn new(index_uid: IndexUid) -> Self {
Self {
index_uid,
nodes: HashMap::new(),
seeded_from_cp: false,
}
}
}

/// Given a slice of candidates, picks the better of two random choices.
/// Higher capacity_score wins; tiebreak on more open_shard_count (more landing spots).
fn power_of_two_choices<'a>(candidates: &[&'a IngesterNode]) -> &'a IngesterNode {
Expand Down Expand Up @@ -210,13 +220,26 @@ impl RoutingTable {
capacity_score: usize,
open_shard_count: usize,
) {
let key = (index_uid.index_id.to_string(), source_id.clone());

let entry = self.table.entry(key).or_default();
let key = (index_uid.index_id.to_string(), source_id);

let entry = self
.table
.entry(key)
.or_insert_with(|| RoutingEntry::new(index_uid.clone()));
match entry.index_uid.cmp(&index_uid) {
// If we receive an update for a new incarnation of the index, then we clear the entry.
Ordering::Less => {
entry.index_uid = index_uid.clone();
entry.nodes.clear();
entry.seeded_from_cp = false;
}
// If we receive an update for a previous incarnation of the index, then we ignore it.
Ordering::Greater => return,
Ordering::Equal => {}
}
let ingester_node = IngesterNode {
node_id: node_id.clone(),
index_uid,
source_id,
capacity_score,
open_shard_count,
};
Expand All @@ -233,6 +256,22 @@ impl RoutingTable {
source_id: SourceId,
shards: Vec<Shard>,
) {
let key = (index_uid.index_id.to_string(), source_id);
let entry = self
.table
.entry(key)
.or_insert_with(|| RoutingEntry::new(index_uid.clone()));
match entry.index_uid.cmp(&index_uid) {
// If we receive an update for a new incarnation of the index, then we clear the entry.
Ordering::Less => {
entry.index_uid = index_uid.clone();
entry.nodes.clear();
}
// If we receive an update for a previous incarnation of the index, then we ignore it.
Ordering::Greater => return,
Ordering::Equal => {}
}

let per_leader_count: HashMap<NodeId, usize> = shards
.iter()
.map(|shard| {
Expand All @@ -243,9 +282,6 @@ impl RoutingTable {
.into_grouping_map()
.sum();

let key = (index_uid.index_id.to_string(), source_id.clone());
let entry = self.table.entry(key).or_default();

for (node_id, open_shard_count) in per_leader_count {
entry
.nodes
Expand All @@ -254,7 +290,6 @@ impl RoutingTable {
.or_insert_with(|| IngesterNode {
node_id,
index_uid: index_uid.clone(),
source_id: source_id.clone(),
capacity_score: 5,
open_shard_count,
});
Expand Down Expand Up @@ -381,7 +416,7 @@ mod tests {
// Node with capacity_score=0 is not eligible.
table.apply_capacity_update(
"node-2".into(),
IndexUid::for_test("test-index", 0),
index_uid.clone(),
"test-source".into(),
0,
2,
Expand Down Expand Up @@ -511,21 +546,18 @@ mod tests {
let high = IngesterNode {
node_id: "high".into(),
index_uid: IndexUid::for_test("idx", 0),
source_id: "src".into(),
capacity_score: 9,
open_shard_count: 2,
};
let mid = IngesterNode {
node_id: "mid".into(),
index_uid: IndexUid::for_test("idx", 0),
source_id: "src".into(),
capacity_score: 5,
open_shard_count: 2,
};
let low = IngesterNode {
node_id: "low".into(),
index_uid: IndexUid::for_test("idx", 0),
source_id: "src".into(),
capacity_score: 1,
open_shard_count: 2,
};
Expand Down Expand Up @@ -621,4 +653,64 @@ mod tests {
"az_unaware"
);
}

/// Checks that both `apply_capacity_update` and `merge_from_shards` honor the
/// index incarnation carried by `IndexUid`: an update tagged with a newer
/// incarnation wipes the nodes recorded for the older one, keeping only the
/// freshly reported node.
#[test]
fn test_incarnation_check_clears_stale_nodes() {
    let mut routing_table = RoutingTable::default();
    let table_key = ("test-index".to_string(), "test-source".to_string());

    // Seed the entry at incarnation 0 with two nodes.
    for (node, score, shard_count) in [("node-1", 8, 3), ("node-2", 6, 2)] {
        routing_table.apply_capacity_update(
            node.into(),
            IndexUid::for_test("test-index", 0),
            "test-source".into(),
            score,
            shard_count,
        );
    }
    let entry = routing_table.table.get(&table_key).unwrap();
    assert_eq!(entry.nodes.len(), 2);
    assert_eq!(entry.index_uid, IndexUid::for_test("test-index", 0));

    // A capacity update tagged with incarnation 1 must evict both old nodes.
    routing_table.apply_capacity_update(
        "node-3".into(),
        IndexUid::for_test("test-index", 1),
        "test-source".into(),
        5,
        1,
    );
    let entry = routing_table.table.get(&table_key).unwrap();
    assert_eq!(entry.nodes.len(), 1);
    assert!(entry.nodes.contains_key("node-3"));
    assert!(!entry.nodes.contains_key("node-1"));
    assert!(!entry.nodes.contains_key("node-2"));
    assert_eq!(entry.index_uid, IndexUid::for_test("test-index", 1));

    // Likewise, `merge_from_shards` at incarnation 2 must evict incarnation 1.
    let shards = vec![Shard {
        index_uid: Some(IndexUid::for_test("test-index", 2)),
        source_id: "test-source".to_string(),
        shard_id: Some(ShardId::from(1u64)),
        shard_state: ShardState::Open as i32,
        leader_id: "node-4".to_string(),
        ..Default::default()
    }];
    routing_table.merge_from_shards(
        IndexUid::for_test("test-index", 2),
        "test-source".into(),
        shards,
    );
    let entry = routing_table.table.get(&table_key).unwrap();
    assert_eq!(entry.nodes.len(), 1);
    assert!(entry.nodes.contains_key("node-4"));
    assert!(!entry.nodes.contains_key("node-3"));
    assert_eq!(entry.index_uid, IndexUid::for_test("test-index", 2));
}
}
1 change: 1 addition & 0 deletions quickwit/quickwit-integration-tests/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ quickwit-cli = { workspace = true }
quickwit-common = { workspace = true, features = ["testsuite"] }
quickwit-config = { workspace = true, features = ["testsuite"] }
quickwit-indexing = { workspace = true, features = ["testsuite"] }
quickwit-ingest = { workspace = true, features = ["testsuite"] }
quickwit-metastore = { workspace = true, features = ["testsuite"] }
quickwit-opentelemetry = { workspace = true, features = ["testsuite"] }
quickwit-proto = { workspace = true, features = ["testsuite"] }
Expand Down
Loading
Loading