Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ proptest = "1.0.0"
regex = "1.5.6"
criterion = { version = "0.7.0", features = ["async_tokio"] }
ldk-node-062 = { package = "ldk-node", version = "=0.6.2" }
ldk-node-070 = { package = "ldk-node", version = "=0.7.0" }

[target.'cfg(not(no_download))'.dev-dependencies]
electrsd = { version = "0.36.1", default-features = false, features = ["legacy", "esplora_a33e97e1", "corepc-node_27_2"] }
Expand Down
277 changes: 228 additions & 49 deletions src/io/sqlite_store/migrations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,74 +9,167 @@ use lightning::io;
use rusqlite::Connection;

pub(super) fn migrate_schema(
connection: &mut Connection, kv_table_name: &str, from_version: u16, to_version: u16,
connection: &mut Connection, kv_table_name: &str, mut from_version: u16, to_version: u16,
) -> io::Result<()> {
assert!(from_version < to_version);
if from_version == 1 && to_version == 2 {
let tx = connection.transaction().map_err(|e| {
let msg = format!(
"Failed to migrate table {} from user_version {} to {}: {}",
kv_table_name, from_version, to_version, e
);
if from_version == 1 && to_version >= 2 {
migrate_v1_to_v2(connection, kv_table_name)?;
from_version = 2;
}
if from_version == 2 && to_version >= 3 {
migrate_v2_to_v3(connection, kv_table_name)?;
}

Ok(())
}

fn migrate_v1_to_v2(connection: &mut Connection, kv_table_name: &str) -> io::Result<()> {
let tx = connection.transaction().map_err(|e| {
let msg = format!("Failed to migrate table {} from v1 to v2: {}", kv_table_name, e);
io::Error::new(io::ErrorKind::Other, msg)
})?;

// Rename 'namespace' column to 'primary_namespace'
let sql = format!(
"ALTER TABLE {}
RENAME COLUMN namespace TO primary_namespace;",
kv_table_name
);

tx.execute(&sql, []).map_err(|e| {
let msg = format!("Failed to migrate table {} from v1 to v2: {}", kv_table_name, e);
io::Error::new(io::ErrorKind::Other, msg)
})?;

// Add new 'secondary_namespace' column
let sql = format!(
"ALTER TABLE {}
ADD secondary_namespace TEXT DEFAULT \"\" NOT NULL;",
kv_table_name
);

tx.execute(&sql, []).map_err(|e| {
let msg = format!("Failed to migrate table {} from v1 to v2: {}", kv_table_name, e);
io::Error::new(io::ErrorKind::Other, msg)
})?;

// Update user_version
tx.pragma(Some(rusqlite::DatabaseName::Main), "user_version", 2u16, |_| Ok(())).map_err(
|e| {
let msg = format!("Failed to upgrade user_version from 1 to 2: {}", e);
io::Error::new(io::ErrorKind::Other, msg)
})?;
},
)?;

tx.commit().map_err(|e| {
let msg = format!("Failed to migrate table {} from v1 to v2: {}", kv_table_name, e);
io::Error::new(io::ErrorKind::Other, msg)
})?;

Ok(())
}

fn migrate_v2_to_v3(connection: &mut Connection, kv_table_name: &str) -> io::Result<()> {
let map_err = |e: rusqlite::Error| -> io::Error {
let msg = format!("Failed to migrate table {} from v2 to v3: {}", kv_table_name, e);
io::Error::new(io::ErrorKind::Other, msg)
};

// Check whether the primary key already includes secondary_namespace.
// Tables migrated from v1 have PK (primary_namespace, key) only — missing
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Huh, I'm confused, why is this true? We explictly have

	// Add new 'secondary_namespace' column
	let sql = format!(
		"ALTER TABLE {}
			ADD secondary_namespace TEXT DEFAULT \"\" NOT NULL;",
		kv_table_name
	);

in migrate_v1_to_v2? Am I missing something, or is this Claude hallucinating?

Copy link
Contributor Author

@benthecarman benthecarman Feb 25, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In v1 to v2 is added but its not a part of the primary key so we can't use ON CONFLICT on writes for just updating the value. Before we could use replace but now we want to keep the created_at date. This rewrites the table to make it a part of the primary key, otherwise we need to do a query inside to look up the created_at date when replacing. Could add a unique index instead but this felt cleaner

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Still confused

This rewrites the table to make it a part of the primary key

But it doesn't?

			PRIMARY KEY (primary_namespace, secondary_namespace, key)

And arguably created_at really shouldn't be part of the key?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When the v1->v2 migration added secondary_namespace but didn't update the primary key, so databases that started with v1 had PK (primary_namespace, key) instead of PK (primary_namespace, secondary_namespace, key).

This issue didn't cause problems because we used INSERT OR REPLACE so we'd silently overwrite rows without errors (besides potentially losing data). However now we want to use ON CONFLICT so we don't replace the created_at and just update the value. This however now breaks for databases that started in v1 because we'll get conflicts even when the secondary namespace is different because the primary namespace and key are the primary key. So instead of silently replacing data, we now actually error because we're trying to do the ON CONFLICT updates

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, right. But can we check whether we already have the right primary key, before we initiate a potentially costly/laggy migration? Node that still have the v1 schema would need to have been upgraded since LDK Node v0.2. I doubt there would be (m)any nodes requiring this migration at all, given this is such an early version that almost nobody ran in production.

AFAICT you should be able to use PRAGMA table_info(table_name) and check the pk column.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done!

// secondary_namespace. Tables created directly as v2 already have the correct
// PK (primary_namespace, secondary_namespace, key).
let needs_table_rewrite = {
let sql = format!("PRAGMA table_info({})", kv_table_name);
let mut stmt = connection.prepare(&sql).map_err(map_err)?;
let mut pk_cols: Vec<(i64, String)> = stmt
.query_map([], |row| Ok((row.get::<_, i64>(5)?, row.get::<_, String>(1)?)))
.map_err(map_err)?
.collect::<Result<Vec<_>, _>>()
.map_err(map_err)?
.into_iter()
.filter(|(pk, _)| *pk > 0)
.collect();
pk_cols.sort_by_key(|(pk, _)| *pk);
let pk_names: Vec<&str> = pk_cols.iter().map(|(_, name)| name.as_str()).collect();
pk_names != vec!["primary_namespace", "secondary_namespace", "key"]
};

let tx = connection.transaction().map_err(|e| {
let msg = format!("Failed to migrate table {} from v2 to v3: {}", kv_table_name, e);
io::Error::new(io::ErrorKind::Other, msg)
})?;

if needs_table_rewrite {
// Full table rewrite to fix the primary key.
let old_table = format!("{}_v2_old", kv_table_name);

let rename_sql = format!("ALTER TABLE {} RENAME TO {}", kv_table_name, old_table);
tx.execute(&rename_sql, []).map_err(map_err)?;

// Rename 'namespace' column to 'primary_namespace'
let sql = format!(
"ALTER TABLE {}
RENAME COLUMN namespace TO primary_namespace;",
let create_table_sql = format!(
"CREATE TABLE {} (
primary_namespace TEXT NOT NULL,
secondary_namespace TEXT DEFAULT \"\" NOT NULL,
key TEXT NOT NULL CHECK (key <> ''),
value BLOB,
sort_order INTEGER NOT NULL DEFAULT 0,
PRIMARY KEY (primary_namespace, secondary_namespace, key)
)",
kv_table_name
);
tx.execute(&create_table_sql, []).map_err(map_err)?;

tx.execute(&sql, []).map_err(|e| {
let msg = format!(
"Failed to migrate table {} from user_version {} to {}: {}",
kv_table_name, from_version, to_version, e
);
io::Error::new(io::ErrorKind::Other, msg)
})?;
// Copy data and backfill sort_order from ROWID for relative ordering
let copy_sql = format!(
"INSERT INTO {} (primary_namespace, secondary_namespace, key, value, sort_order)
SELECT primary_namespace, secondary_namespace, key, value, ROWID FROM {}",
kv_table_name, old_table
);
tx.execute(&copy_sql, []).map_err(map_err)?;

// Add new 'secondary_namespace' column
let sql = format!(
"ALTER TABLE {}
ADD secondary_namespace TEXT DEFAULT \"\" NOT NULL;",
let drop_old_sql = format!("DROP TABLE {}", old_table);
tx.execute(&drop_old_sql, []).map_err(map_err)?;
} else {
// Primary key is already correct — just add the sort_order column and backfill.
let add_col_sql = format!(
"ALTER TABLE {} ADD COLUMN sort_order INTEGER NOT NULL DEFAULT 0",
kv_table_name
);
tx.execute(&add_col_sql, []).map_err(map_err)?;

tx.execute(&sql, []).map_err(|e| {
let msg = format!(
"Failed to migrate table {} from user_version {} to {}: {}",
kv_table_name, from_version, to_version, e
);
io::Error::new(io::ErrorKind::Other, msg)
})?;

// Update user_version
tx.pragma(Some(rusqlite::DatabaseName::Main), "user_version", to_version, |_| Ok(()))
.map_err(|e| {
let msg = format!(
"Failed to upgrade user_version from {} to {}: {}",
from_version, to_version, e
);
io::Error::new(io::ErrorKind::Other, msg)
})?;
let backfill_sql = format!("UPDATE {} SET sort_order = ROWID", kv_table_name);
tx.execute(&backfill_sql, []).map_err(map_err)?;
}

tx.commit().map_err(|e| {
let msg = format!(
"Failed to migrate table {} from user_version {} to {}: {}",
kv_table_name, from_version, to_version, e
);
// Create composite index for paginated listing
let sql = format!(
"CREATE INDEX idx_{}_paginated ON {} (primary_namespace, secondary_namespace, sort_order DESC, key ASC)",
kv_table_name, kv_table_name
);
tx.execute(&sql, []).map_err(map_err)?;

// Update user_version
tx.pragma(Some(rusqlite::DatabaseName::Main), "user_version", 3u16, |_| Ok(())).map_err(
|e| {
let msg = format!("Failed to upgrade user_version from 2 to 3: {}", e);
io::Error::new(io::ErrorKind::Other, msg)
})?;
}
},
)?;

tx.commit().map_err(|e| {
let msg = format!("Failed to migrate table {} from v2 to v3: {}", kv_table_name, e);
io::Error::new(io::ErrorKind::Other, msg)
})?;

Ok(())
}

#[cfg(test)]
mod tests {
use std::fs;

use lightning::util::persist::KVStoreSync;
use lightning::util::persist::{KVStoreSync, PaginatedKVStoreSync};
use rusqlite::{named_params, Connection};

use crate::io::sqlite_store::SqliteStore;
Expand Down Expand Up @@ -128,7 +221,7 @@ mod tests {
let sql = format!(
"INSERT OR REPLACE INTO {} (namespace, key, value) VALUES (:namespace, :key, :value);",
kv_table_name
);
);
let mut stmt = connection.prepare_cached(&sql).unwrap();

stmt.execute(named_params! {
Expand Down Expand Up @@ -166,4 +259,90 @@ mod tests {
// Check we can continue to use the store just fine.
do_read_write_remove_list_persist(&store);
}

#[test]
fn rwrl_post_schema_2_migration() {
let old_schema_version = 2u16;

let mut temp_path = random_storage_path();
temp_path.push("rwrl_post_schema_2_migration");

let db_file_name = "test_db".to_string();
let kv_table_name = "test_table".to_string();

let test_ns = "testspace";
let test_sub = "testsub";

{
// Create a v2 database manually
fs::create_dir_all(temp_path.clone()).unwrap();
let mut db_file_path = temp_path.clone();
db_file_path.push(db_file_name.clone());

let connection = Connection::open(db_file_path.clone()).unwrap();

connection
.pragma(
Some(rusqlite::DatabaseName::Main),
"user_version",
old_schema_version,
|_| Ok(()),
)
.unwrap();

let sql = format!(
"CREATE TABLE IF NOT EXISTS {} (
primary_namespace TEXT NOT NULL,
secondary_namespace TEXT DEFAULT \"\" NOT NULL,
key TEXT NOT NULL CHECK (key <> ''),
value BLOB, PRIMARY KEY ( primary_namespace, secondary_namespace, key )
);",
kv_table_name
);
connection.execute(&sql, []).unwrap();

// Insert 3 rows in a known order
for i in 0..3 {
let key = format!("key_{}", i);
let sql = format!(
"INSERT INTO {} (primary_namespace, secondary_namespace, key, value) VALUES (:ns, :sub, :key, :value);",
kv_table_name
);
let mut stmt = connection.prepare_cached(&sql).unwrap();
stmt.execute(named_params! {
":ns": test_ns,
":sub": test_sub,
":key": key,
":value": vec![i as u8; 8],
})
.unwrap();
}
}

// Open with new code, triggering v2→v3 migration
let store = SqliteStore::new(temp_path, Some(db_file_name), Some(kv_table_name)).unwrap();

// Verify data survived
for i in 0..3 {
let key = format!("key_{}", i);
let data = store.read(test_ns, test_sub, &key).unwrap();
assert_eq!(data, vec![i as u8; 8]);
}

// Verify paginated listing works and returns entries in ROWID-backfilled order (newest first)
let response =
PaginatedKVStoreSync::list_paginated(&store, test_ns, test_sub, None).unwrap();
assert_eq!(response.keys.len(), 3);
// ROWIDs were 1, 2, 3 so sort_order was backfilled as 1, 2, 3; newest first
assert_eq!(response.keys, vec!["key_2", "key_1", "key_0"]);

// Verify we can write new entries and they get proper ordering
KVStoreSync::write(&store, test_ns, test_sub, "key_new", vec![99u8; 8]).unwrap();
let response =
PaginatedKVStoreSync::list_paginated(&store, test_ns, test_sub, None).unwrap();
assert_eq!(response.keys[0], "key_new");

// Check we can continue to use the store just fine.
do_read_write_remove_list_persist(&store);
}
}
Loading
Loading