Merged
21 commits
28808ed
fix: address critical data safety, consistency, and performance issues
bootjp Mar 24, 2026
18672cb
feat: implement pebbleStore MVCC compaction and improve mvccStore com…
bootjp Mar 24, 2026
1b3b3f4
fix: skip txn internal keys in compaction and optimize txn codec enco…
bootjp Mar 24, 2026
dd5b560
fix: meta key namespace, lock resolver, forward retry, and decodeKeyU…
bootjp Mar 25, 2026
a4b3315
test: add critical test coverage for FSM abort, pebbleStore txn, and …
bootjp Mar 25, 2026
e327789
test: add ShardedCoordinator abort rollback and concurrent access tests
bootjp Mar 25, 2026
97560f3
Initial plan
Copilot Mar 25, 2026
e89d609
fix: correct iterator bounds in GetAt/LatestCommitTS and use pre-allo…
Copilot Mar 25, 2026
217891a
fix(gci): auto-fix import formatting in lock_resolver.go, sharded_coo…
Copilot Mar 25, 2026
12ee197
refactor: reduce cyclomatic and cognitive complexity via helper extra…
Copilot Mar 25, 2026
bfafb89
fix: resolve all 23 golangci-lint issues (cyclop, gocognit, gci, forc…
Copilot Mar 25, 2026
e156718
fix: update compaction tests for readTSCompacted watermark check
bootjp Mar 25, 2026
0bc48e8
fix: data race in ApplyMutations accessing lastCommitTS without mtx
bootjp Mar 25, 2026
33ee065
fix: address PR review comments
bootjp Mar 25, 2026
ac93800
fix: guard marshalRaftCommand against allocation overflow
bootjp Mar 26, 2026
4183e90
fix: address second round of PR review comments
bootjp Mar 26, 2026
27e34e4
Apply suggestion from @Copilot
bootjp Mar 26, 2026
7180133
fix: restore newVersions field in compactEntry struct
bootjp Mar 26, 2026
2914e7a
fix: address review comments — size guard, compactEntry, minRetainedTS
bootjp Mar 26, 2026
1605faa
fix: address review comments — scan bounds, encodeKey dedup, size check
bootjp Mar 26, 2026
8ea3ba9
fix: address review comments — compaction, ApplyMutations, resolver t…
bootjp Mar 26, 2026
3 changes: 3 additions & 0 deletions adapter/internal.go
@@ -38,6 +38,9 @@ func (i *Internal) Forward(_ context.Context, req *pb.ForwardRequest) (*pb.Forwa
if i.raft.State() != raft.Leader {
return nil, errors.WithStack(ErrNotLeader)
}
if err := i.raft.VerifyLeader().Error(); err != nil {
return nil, errors.WithStack(ErrNotLeader)
}

if err := i.stampTimestamps(req); err != nil {
return &pb.ForwardResponse{
167 changes: 167 additions & 0 deletions docs/review_todo.md
@@ -0,0 +1,167 @@
# Review TODO

Critical and high-severity issues found during a comprehensive code review.
Items are ordered by priority within each section.

---

## 1. Data Loss

### ~~1.1 [Critical] `saveLastCommitTS` uses `pebble.NoSync` — timestamp rollback after crash~~ DONE

- **Status:** Fixed. Durable `lastCommitTS` is now guaranteed by the synced `ApplyMutations` `WriteBatch`; `saveLastCommitTS` remains a best-effort hint written with `pebble.NoSync`.

### 1.2 [Low — downgraded] Batch Apply in FSM allows partial application without rollback

- **File:** `kv/fsm.go:44-69`
- **Problem:** When a `RaftCommand` contains multiple requests, each is applied individually. If request N fails, requests 1..N-1 are already persisted with no rollback.
- **Analysis:** Downgraded from Critical. Batch items are independent raw writes from different clients. `applyRawBatch` returns per-item errors via `fsmApplyResponse`, so each caller receives their correct result. Partial success is by design for the raw batching optimization.
- **Remaining concern:** If a future code path batches requests that must be atomic, this would need revisiting.

### ~~1.3 [High] `pebbleStore.Compact()` is unimplemented — unbounded version accumulation~~ DONE

- **Status:** Fixed. Implemented MVCC GC for pebbleStore and `RetentionController` interface (`MinRetainedTS`/`SetMinRetainedTS`).

### ~~1.4 [High] Secondary commit is best-effort — lock residue on failure~~ DONE

- **Status:** Fixed. Added `LockResolver` background worker (`kv/lock_resolver.go`) that runs every 10s on each leader, scans `!txn|lock|` keys for expired locks, checks primary transaction status, and resolves (commit/abort) orphaned locks.
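
  The resolver's per-tick decision can be sketched as a pure function. This is a minimal sketch, not the actual `kv/lock_resolver.go` code: `lockEntry`, `resolveExpired`, and the status enum are illustrative names, and the real worker reads lock records from storage rather than a slice.

  ```go
  package main

  import (
  	"fmt"
  	"time"
  )

  // txnStatus mirrors the primary transaction record's state.
  type txnStatus int

  const (
  	statusCommitted txnStatus = iota
  	statusAborted
  	statusPending
  )

  // lockEntry is a simplified view of a `!txn|lock|` record.
  type lockEntry struct {
  	key       string
  	primary   string
  	startedAt time.Time
  }

  // resolveExpired decides, per expired lock, whether to roll it forward
  // (primary committed) or roll it back (primary aborted). Locks whose
  // primary is still pending are left for the next pass.
  func resolveExpired(locks []lockEntry, primaryStatus map[string]txnStatus, ttl time.Duration, now time.Time) (commit, abort []string) {
  	for _, l := range locks {
  		if now.Sub(l.startedAt) < ttl {
  			continue // lock still within its TTL; owner may be alive
  		}
  		switch primaryStatus[l.primary] {
  		case statusCommitted:
  			commit = append(commit, l.key) // primary committed: roll forward
  		case statusAborted:
  			abort = append(abort, l.key) // primary aborted: clean up
  		case statusPending:
  			// primary still in flight; retry on the next 10s tick
  		}
  	}
  	return commit, abort
  }

  func main() {
  	now := time.Now()
  	locks := []lockEntry{
  		{key: "a", primary: "p1", startedAt: now.Add(-30 * time.Second)},
  		{key: "b", primary: "p2", startedAt: now.Add(-30 * time.Second)},
  		{key: "c", primary: "p3", startedAt: now.Add(-1 * time.Second)},
  	}
  	status := map[string]txnStatus{"p1": statusCommitted, "p2": statusAborted, "p3": statusPending}
  	commit, abort := resolveExpired(locks, status, 10*time.Second, now)
  	fmt.Println(commit, abort) // prints "[a] [b]"
  }
  ```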

### ~~1.5 [High] `abortPreparedTxn` silently ignores errors~~ DONE

- **Status:** Fixed. Errors are now logged with full context (gid, primary_key, start_ts, abort_ts).

### ~~1.6 [High] MVCC compaction does not distinguish transaction internal keys~~ DONE

- **Status:** Fixed. Both mvccStore and pebbleStore compaction now skip keys with `!txn|` prefix.

---

## 2. Concurrency / Distributed Failures

### ~~2.1 [Critical] TOCTOU in `pebbleStore.ApplyMutations`~~ DONE

- **Status:** Fixed. `ApplyMutations` now holds `mtx.Lock()` from conflict check through batch commit.

### ~~2.2 [High] Leader proxy forward loop risk~~ DONE

- **Status:** Fixed. Added `forwardWithRetry` with `maxForwardRetries=3`. Each retry re-fetches leader address via `LeaderWithID()`. Returns immediately on `ErrLeaderNotFound`.
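
  The retry shape can be sketched with plain function values standing in for `LeaderWithID()` and the gRPC forward call; the names and signatures below are illustrative, not the actual `forwardWithRetry` implementation.

  ```go
  package main

  import (
  	"errors"
  	"fmt"
  )

  // forwardWithRetry re-resolves the leader address before each attempt and
  // gives up after maxRetries. When no leader is known it returns
  // immediately instead of retrying.
  func forwardWithRetry(maxRetries int, leaderAddr func() (string, error), forward func(addr string) error) error {
  	var lastErr error
  	for i := 0; i < maxRetries; i++ {
  		addr, err := leaderAddr()
  		if err != nil {
  			return err // e.g. ErrLeaderNotFound: no point retrying
  		}
  		if lastErr = forward(addr); lastErr == nil {
  			return nil
  		}
  	}
  	return lastErr
  }

  func main() {
  	attempts := 0
  	err := forwardWithRetry(3,
  		func() (string, error) { return "node-2:50051", nil },
  		func(addr string) error {
  			attempts++
  			if attempts < 3 {
  				return errors.New("transient")
  			}
  			return nil
  		})
  	fmt.Println(err, attempts) // succeeds on the third attempt
  }
  ```

  Re-fetching the leader address inside the loop matters: the first failure is often caused by a leadership change, so retrying against the stale address would just fail again.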

### ~~2.3 [High] Secondary commit failure leaves locks indefinitely~~ DONE

- **Status:** Fixed. (Same as 1.4) Background `LockResolver` worker resolves expired orphaned locks.

### ~~2.4 [Medium] `redirect` in Coordinate has no timeout~~ DONE

- **Status:** Fixed. Added 5s `context.WithTimeout` to redirect gRPC forward call.

### ~~2.5 [Medium] Proxy gRPC calls (`proxyRawGet` etc.) have no timeout~~ DONE

- **Status:** Fixed. Added 5s `context.WithTimeout` to `proxyRawGet`, `proxyRawScanAt`, and `proxyLatestCommitTS`.

### 2.6 [Low — downgraded] `GRPCConnCache` allows ConnFor after Close

- **File:** `kv/grpc_conn_cache.go`
- **Analysis:** Downgraded. `Close` properly closes all existing connections. Subsequent `ConnFor` lazily re-inits and creates fresh connections — this is by design and tested.

### ~~2.7 [Medium] `Forward` handler skips `VerifyLeader`~~ DONE

- **Status:** Fixed. Added `raft.VerifyLeader()` quorum check to `Forward` handler.

---

## 3. Performance

### ~~3.1 [Critical] `mvccStore.ScanAt` scans the entire treemap~~ DONE

- **Status:** Fixed. Replaced `tree.Each()` with `Iterator()` loop that breaks on limit and seeks via `Ceiling(start)`.

### ~~3.2 [Critical] PebbleStore uses unbounded iterators in `GetAt` / `LatestCommitTS`~~ DONE

- **Status:** Fixed. Both methods now use bounded `IterOptions` scoped to the target key.

### ~~3.3 [High] FSM double-deserialization for single requests~~ DONE

- **Status:** Fixed. Added prefix byte (`0x00` single, `0x01` batch) to `marshalRaftCommand` and `decodeRaftRequests` with legacy fallback.
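
  The tagging scheme can be sketched as follows; `marshalCommand`/`unmarshalCommand` are illustrative stand-ins for `marshalRaftCommand`/`decodeRaftRequests`, and the payload here is an opaque byte slice rather than a serialized request.

  ```go
  package main

  import (
  	"errors"
  	"fmt"
  )

  const (
  	prefixSingle = 0x00
  	prefixBatch  = 0x01
  )

  // marshalCommand tags the payload so the FSM can pick the right decoder
  // in one pass instead of trying the single-request decoder and falling
  // back to the batch decoder on failure.
  func marshalCommand(batch bool, payload []byte) []byte {
  	p := byte(prefixSingle)
  	if batch {
  		p = prefixBatch
  	}
  	out := make([]byte, 0, 1+len(payload))
  	out = append(out, p)
  	return append(out, payload...)
  }

  // unmarshalCommand reads the tag byte; an unrecognized first byte falls
  // through to legacy handling (here: treat the whole buffer as payload).
  func unmarshalCommand(b []byte) (batch bool, payload []byte, err error) {
  	if len(b) == 0 {
  		return false, nil, errors.New("empty command")
  	}
  	switch b[0] {
  	case prefixSingle:
  		return false, b[1:], nil
  	case prefixBatch:
  		return true, b[1:], nil
  	default:
  		return false, b, nil // legacy fallback for pre-prefix log entries
  	}
  }

  func main() {
  	enc := marshalCommand(true, []byte("reqs"))
  	batch, payload, _ := unmarshalCommand(enc)
  	fmt.Println(batch, string(payload)) // prints "true reqs"
  }
  ```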

### ~~3.4 [High] Excessive `pebble.Sync` on every write~~ DONE

- **Status:** Fixed. `PutAt`, `DeleteAt`, `ExpireAt` changed to `pebble.NoSync`. `ApplyMutations` retains `pebble.Sync`.

### 3.5 [High] `VerifyLeader` called on every read — network round-trip (deferred)

- **File:** `kv/shard_store.go:53-58`
- **Problem:** Each read triggers a quorum-based leader verification with network round-trip.
- **Trade-off:** A lease-based cache improves latency but widens the stale-read TOCTOU window (see 4.3). The current approach is the safe default — linearizable reads at the cost of one quorum RTT per read. Implementing leader leases is a major architectural decision requiring careful analysis of acceptable staleness bounds.

### ~~3.6 [High] `mvccStore.Compact` holds exclusive lock during full tree scan~~ DONE

- **Status:** Fixed. Split into 2 phases: scan under RLock, then apply updates in batched Lock/Unlock cycles (batch size 500).
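
  The two-phase pattern can be sketched against a toy versioned map; `versionedStore` stands in for the real treemap-backed `mvccStore`, and the retention rule (keep the newest version at or below the watermark plus everything above it) is a simplification of the actual GC policy.

  ```go
  package main

  import (
  	"fmt"
  	"sync"
  )

  // versionedStore holds multiple versions per key behind one RWMutex.
  type versionedStore struct {
  	mu   sync.RWMutex
  	data map[string][]uint64 // key -> ascending commit timestamps
  }

  // compact collects stale versions under RLock, then deletes them in small
  // batches under the write lock so readers and writers can interleave
  // between batches instead of stalling for the whole scan.
  func (s *versionedStore) compact(minRetainedTS uint64, batchSize int) int {
  	// Phase 1: scan under the read lock only.
  	type victim struct {
  		key string
  		ts  uint64
  	}
  	var victims []victim
  	s.mu.RLock()
  	for k, versions := range s.data {
  		for i, ts := range versions {
  			// Removable when a newer version also sits at/below the watermark.
  			if ts < minRetainedTS && i < len(versions)-1 && versions[i+1] <= minRetainedTS {
  				victims = append(victims, victim{k, ts})
  			}
  		}
  	}
  	s.mu.RUnlock()

  	// Phase 2: apply deletions in batched Lock/Unlock cycles.
  	removed := 0
  	for start := 0; start < len(victims); start += batchSize {
  		end := start + batchSize
  		if end > len(victims) {
  			end = len(victims)
  		}
  		s.mu.Lock()
  		for _, v := range victims[start:end] {
  			kept := s.data[v.key][:0]
  			for _, ts := range s.data[v.key] {
  				if ts != v.ts {
  					kept = append(kept, ts)
  				}
  			}
  			s.data[v.key] = kept
  			removed++
  		}
  		s.mu.Unlock()
  	}
  	return removed
  }

  func main() {
  	s := &versionedStore{data: map[string][]uint64{"k": {1, 2, 5}}}
  	fmt.Println(s.compact(3, 500), s.data["k"]) // prints "1 [2 5]"
  }
  ```

  The trade-off: versions written between phase 1 and phase 2 are simply missed this cycle and picked up by the next compaction, which is acceptable for GC.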

### ~~3.7 [High] `isTxnInternalKey` allocates `[]byte` on every call (5x)~~ DONE

- **Status:** Fixed. Added package-level `var` for all prefix byte slices and common prefix fast-path check.
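
  The shape of the fix can be sketched as below. The exact prefix set is an assumption for illustration (the real code checks five prefixes); only the `!txn|` common prefix is confirmed by this document.

  ```go
  package main

  import (
  	"bytes"
  	"fmt"
  )

  // Package-level prefixes avoid re-allocating []byte("!txn|...") on every call.
  var (
  	txnCommonPrefix = []byte("!txn|")
  	txnPrefixes     = [][]byte{ // illustrative subset of the real prefix set
  		[]byte("!txn|lock|"),
  		[]byte("!txn|intent|"),
  		[]byte("!txn|meta|"),
  	}
  )

  // isTxnInternalKey checks the shared "!txn|" prefix first, so user keys
  // (the overwhelmingly common case) bail out after a single comparison.
  func isTxnInternalKey(key []byte) bool {
  	if !bytes.HasPrefix(key, txnCommonPrefix) {
  		return false // fast path: ordinary user key
  	}
  	for _, p := range txnPrefixes {
  		if bytes.HasPrefix(key, p) {
  			return true
  		}
  	}
  	return false
  }

  func main() {
  	fmt.Println(isTxnInternalKey([]byte("!txn|lock|user/1")), isTxnInternalKey([]byte("user/1"))) // prints "true false"
  }
  ```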

### ~~3.8 [Medium] txn codec `bytes.Buffer` allocation per encode~~ DONE

- **Status:** Fixed. `EncodeTxnMeta`, `encodeTxnLock`, `encodeTxnIntent` now use direct `make([]byte, size)` + `binary.BigEndian.PutUint64`.
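
  The direct-encoding pattern looks like this; the field layout (startTS, TTL, primary key) is illustrative and does not claim to match the real `encodeTxnLock` wire format.

  ```go
  package main

  import (
  	"encoding/binary"
  	"fmt"
  )

  // encodeTxnLock writes a fixed-size header followed by the primary key
  // into one pre-sized allocation, replacing a bytes.Buffer-based encoder
  // that allocated and grew incrementally.
  func encodeTxnLock(startTS, ttl uint64, primary []byte) []byte {
  	b := make([]byte, 16+len(primary))
  	binary.BigEndian.PutUint64(b[0:8], startTS)
  	binary.BigEndian.PutUint64(b[8:16], ttl)
  	copy(b[16:], primary)
  	return b
  }

  func decodeTxnLock(b []byte) (startTS, ttl uint64, primary []byte) {
  	return binary.BigEndian.Uint64(b[0:8]), binary.BigEndian.Uint64(b[8:16]), b[16:]
  }

  func main() {
  	enc := encodeTxnLock(42, 10, []byte("pk"))
  	s, t, p := decodeTxnLock(enc)
  	fmt.Println(s, t, string(p), len(enc)) // prints "42 10 pk 18"
  }
  ```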

### ~~3.9 [Medium] `decodeKey` copies key bytes on every iteration step~~ DONE

- **Status:** Fixed. Added `decodeKeyUnsafe` returning a zero-copy slice reference. Used in all temporary comparison sites (`GetAt`, `ExistsAt`, `skipToNextUserKey`, `LatestCommitTS`, `Compact`, reverse scan seek). `decodeKey` (copying) retained for `nextScannableUserKey`/`prevScannableUserKey` whose results are stored in `KVPair`.
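
  The safe/unsafe split can be sketched against a simple key-plus-timestamp layout; the 8-byte big-endian suffix here is an assumed encoding for illustration, not necessarily the store's actual codec.

  ```go
  package main

  import (
  	"bytes"
  	"encoding/binary"
  	"fmt"
  )

  // encodeKey appends an 8-byte big-endian commit timestamp to the user key.
  func encodeKey(userKey []byte, ts uint64) []byte {
  	b := make([]byte, len(userKey)+8)
  	copy(b, userKey)
  	binary.BigEndian.PutUint64(b[len(userKey):], ts)
  	return b
  }

  // decodeKeyUnsafe returns a slice aliasing the encoded buffer: valid only
  // until the iterator advances, but allocation-free. Fine for transient
  // comparisons inside a scan loop.
  func decodeKeyUnsafe(encoded []byte) []byte {
  	return encoded[:len(encoded)-8]
  }

  // decodeKey copies, so the result survives iterator movement; required
  // when the key is stored in a result (e.g. a KVPair) returned to the caller.
  func decodeKey(encoded []byte) []byte {
  	return append([]byte(nil), decodeKeyUnsafe(encoded)...)
  }

  func main() {
  	enc := encodeKey([]byte("user/1"), 7)
  	fmt.Println(bytes.Equal(decodeKeyUnsafe(enc), decodeKey(enc)),
  		&enc[0] == &decodeKeyUnsafe(enc)[0], // unsafe view aliases the buffer
  		&enc[0] == &decodeKey(enc)[0])       // the copy does not
  	// prints "true true false"
  }
  ```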

---

## 4. Data Consistency

### ~~4.1 [High] pebbleStore key encoding ambiguity with meta keys~~ DONE

- **Status:** Fixed. Meta key changed to `\x00_meta_last_commit_ts` prefix. Added `isMetaKey()` helper for both new and legacy keys. `findMaxCommitTS()` migrates legacy key on startup. All scan/compact functions use `isMetaKey()` to skip meta keys.
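
  A minimal sketch of the helper, under one loud assumption: the legacy key is taken here to be the same name without the `\x00` prefix, which this document does not actually state — the real legacy key lives in the store package.

  ```go
  package main

  import (
  	"bytes"
  	"fmt"
  )

  var (
  	metaKeyLastCommitTS = []byte("\x00_meta_last_commit_ts")
  	// legacyMetaKey is an assumed name for illustration; see lead-in.
  	legacyMetaKey = []byte("_meta_last_commit_ts")
  )

  // isMetaKey recognizes both the namespaced meta key and its legacy form so
  // scans and compaction can skip them. The \x00 prefix keeps the new key
  // out of the plain user keyspace, removing the encoding ambiguity.
  func isMetaKey(key []byte) bool {
  	return bytes.Equal(key, metaKeyLastCommitTS) || bytes.Equal(key, legacyMetaKey)
  }

  func main() {
  	fmt.Println(isMetaKey(metaKeyLastCommitTS), isMetaKey(legacyMetaKey), isMetaKey([]byte("user/1"))) // prints "true true false"
  }
  ```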

### ~~4.2 [High] Write Skew not prevented in one-phase transactions~~ DONE

- **Status:** Fixed. Documented as Snapshot Isolation in `handleOnePhaseTxnRequest` and `MVCCStore.ApplyMutations` interface. Write skew is a known limitation; SSI requires read-set tracking at a higher layer.

### ~~4.3 [High] VerifyLeader-to-read TOCTOU allows stale reads~~ DONE (documented)

- **Status:** Documented as known limitation. The TOCTOU window between quorum-verified `VerifyLeader` and the read is inherently small. Full fix requires Raft ReadIndex protocol which is a significant architectural change. Added doc comment to `leaderOKForKey` explaining the trade-off.

### ~~4.4 [High] DynamoDB `ConditionCheck` does not prevent write skew~~ DONE

- **Status:** Already addressed. `buildConditionCheckLockRequest` writes a dummy Put (re-writing the current value) or Del for the checked key. This includes the key in the transaction's write set, so write-write conflict detection covers it.

### ~~4.5 [Medium] Cross-shard `ScanAt` does not guarantee a consistent snapshot~~ DONE

- **Status:** Documented. Added doc comment to `ShardStore.ScanAt` explaining the limitation and recommending transactions or a snapshot fence for callers requiring cross-shard consistency.

---

## 5. Test Coverage

Overall coverage: **60.5%** — **1,043 functions at 0%**.

### ~~5.1 [Critical] FSM Abort path entirely untested~~ DONE

- **Status:** Fixed. Added `kv/fsm_abort_test.go` with 10 tests: Prepare→Abort flow, commit rejection, lock/intent cleanup, rollback record verification, non-primary abort, timestamp validation, idempotent abort conflict, missing primary key, empty mutations.

### ~~5.2 [Critical] PebbleStore transaction functions entirely untested~~ DONE

- **Status:** Fixed. Added `store/lsm_store_txn_test.go` with 14 tests: ApplyMutations (put/delete/TTL/conflict/no-conflict/atomicity/lastCommitTS update), LatestCommitTS (single/multi/not-found), Compact (old version removal/newest retained/tombstone cleanup/meta key skip/txn internal key skip/multi-key).

### ~~5.3 [Critical] `Coordinate.Dispatch` untested~~ DONE

- **Status:** Fixed. Added `kv/coordinator_dispatch_test.go` with 6 tests: raw put, raw delete, one-phase txn, nil request, empty elems, startTS assignment.

### ~~5.4 [High] ShardedCoordinator Abort rollback flow untested~~ DONE

- **Status:** Fixed. Added `kv/sharded_coordinator_abort_test.go` testing that when Shard2 Prepare fails, Shard1's locks are cleaned up via Abort.

### 5.5 [High] Jepsen tests are single-shard, single-workload only

- **Current:** Append workload on one Raft group, 30s duration.
- **Needed:** Multi-shard transactions, CAS workload, longer duration (5-10 min).

### ~~5.6 [Medium] No concurrent access tests for ShardStore / ShardedCoordinator~~ DONE

- **Status:** Fixed. Expanded `store/mvcc_store_concurrency_test.go` from 1 to 8 tests with race detection: concurrent PutAt (different/same keys), concurrent GetAt+PutAt, concurrent ApplyMutations (single/multi-key), concurrent ScanAt+PutAt, scan snapshot consistency.

### 5.7 [Medium] No error-path tests (I/O failure, corrupt data, gRPC connection failure)
7 changes: 6 additions & 1 deletion kv/coordinator.go
@@ -3,12 +3,15 @@ package kv
import (
"bytes"
"context"
"time"

pb "github.com/bootjp/elastickv/proto"
"github.com/cockroachdb/errors"
"github.com/hashicorp/raft"
)

const redirectForwardTimeout = 5 * time.Second

func NewCoordinator(txm Transactional, r *raft.Raft) *Coordinate {
return &Coordinate{
transactionManager: txm,
@@ -237,7 +240,9 @@ func (c *Coordinate) redirect(ctx context.Context, reqs *OperationGroup[OP]) (*C
}
}

r, err := cli.Forward(ctx, c.toForwardRequest(requests))
fwdCtx, cancel := context.WithTimeout(ctx, redirectForwardTimeout)
defer cancel()
r, err := cli.Forward(fwdCtx, c.toForwardRequest(requests))
if err != nil {
return nil, errors.WithStack(err)
}
181 changes: 181 additions & 0 deletions kv/coordinator_dispatch_test.go
@@ -0,0 +1,181 @@
package kv

import (
"context"
"testing"

"github.com/bootjp/elastickv/store"
"github.com/stretchr/testify/require"
)

func TestCoordinateDispatch_RawPut(t *testing.T) {
t.Parallel()

st := store.NewMVCCStore()
fsm := NewKvFSM(st)
r, stop := newSingleRaft(t, "dispatch-raw-put", fsm)
t.Cleanup(stop)

tm := NewTransaction(r)
c := NewCoordinator(tm, r)
ctx := context.Background()

resp, err := c.Dispatch(ctx, &OperationGroup[OP]{
Elems: []*Elem[OP]{
{Op: Put, Key: []byte("k1"), Value: []byte("v1")},
},
})
require.NoError(t, err)
require.NotNil(t, resp)

// Verify the value was written.
val, err := st.GetAt(ctx, []byte("k1"), ^uint64(0))
require.NoError(t, err)
require.Equal(t, []byte("v1"), val)
}

func TestCoordinateDispatch_RawDel(t *testing.T) {
t.Parallel()

st := store.NewMVCCStore()
fsm := NewKvFSM(st)
r, stop := newSingleRaft(t, "dispatch-raw-del", fsm)
t.Cleanup(stop)

tm := NewTransaction(r)
c := NewCoordinator(tm, r)
ctx := context.Background()

// Write a value first.
_, err := c.Dispatch(ctx, &OperationGroup[OP]{
Elems: []*Elem[OP]{
{Op: Put, Key: []byte("k1"), Value: []byte("v1")},
},
})
require.NoError(t, err)

// Delete the value.
_, err = c.Dispatch(ctx, &OperationGroup[OP]{
Elems: []*Elem[OP]{
{Op: Del, Key: []byte("k1")},
},
})
require.NoError(t, err)

// Verify the key is gone.
_, err = st.GetAt(ctx, []byte("k1"), ^uint64(0))
require.ErrorIs(t, err, store.ErrKeyNotFound)
}

func TestCoordinateDispatch_TxnOnePhase(t *testing.T) {
t.Parallel()

st := store.NewMVCCStore()
fsm := NewKvFSM(st)
r, stop := newSingleRaft(t, "dispatch-txn", fsm)
t.Cleanup(stop)

tm := NewTransaction(r)
c := NewCoordinator(tm, r)
ctx := context.Background()

startTS := c.clock.Next()
resp, err := c.Dispatch(ctx, &OperationGroup[OP]{
IsTxn: true,
StartTS: startTS,
Elems: []*Elem[OP]{
{Op: Put, Key: []byte("a"), Value: []byte("1")},
{Op: Put, Key: []byte("b"), Value: []byte("2")},
},
})
require.NoError(t, err)
require.NotNil(t, resp)

// Both keys should be readable.
v1, err := st.GetAt(ctx, []byte("a"), ^uint64(0))
require.NoError(t, err)
require.Equal(t, []byte("1"), v1)

v2, err := st.GetAt(ctx, []byte("b"), ^uint64(0))
require.NoError(t, err)
require.Equal(t, []byte("2"), v2)
}

func TestCoordinateDispatch_NilRequest(t *testing.T) {
t.Parallel()

c := &Coordinate{
clock: NewHLC(),
}

_, err := c.Dispatch(context.Background(), nil)
require.ErrorIs(t, err, ErrInvalidRequest)
}

func TestCoordinateDispatch_EmptyElems(t *testing.T) {
t.Parallel()

c := &Coordinate{
clock: NewHLC(),
}

_, err := c.Dispatch(context.Background(), &OperationGroup[OP]{})
require.ErrorIs(t, err, ErrInvalidRequest)
}

func TestCoordinateDispatch_TxnAssignsStartTS(t *testing.T) {
t.Parallel()

tx := &stubTransactional{}
st := store.NewMVCCStore()
fsm := NewKvFSM(st)
r, stop := newSingleRaft(t, "dispatch-ts-assign", fsm)
t.Cleanup(stop)

c := &Coordinate{
transactionManager: tx,
raft: r,
clock: NewHLC(),
}

// When StartTS is 0 for a txn, Dispatch should assign one.
resp, err := c.Dispatch(context.Background(), &OperationGroup[OP]{
IsTxn: true,
Elems: []*Elem[OP]{
{Op: Put, Key: []byte("k"), Value: []byte("v")},
},
})
require.NoError(t, err)
require.NotNil(t, resp)
require.Equal(t, 1, tx.commits)

// The request should have a non-zero startTS.
require.Len(t, tx.reqs, 1)
require.Len(t, tx.reqs[0], 1)
require.Greater(t, tx.reqs[0][0].Ts, uint64(0))
}

func TestCoordinateDispatchRaw_CallsTransactionManager(t *testing.T) {
t.Parallel()

tx := &stubTransactional{}
st := store.NewMVCCStore()
fsm := NewKvFSM(st)
r, stop := newSingleRaft(t, "dispatch-raw-tm", fsm)
t.Cleanup(stop)

c := &Coordinate{
transactionManager: tx,
raft: r,
clock: NewHLC(),
}

resp, err := c.Dispatch(context.Background(), &OperationGroup[OP]{
Elems: []*Elem[OP]{
{Op: Put, Key: []byte("k"), Value: []byte("v")},
},
})
require.NoError(t, err)
require.NotNil(t, resp)
require.Equal(t, 1, tx.commits)
}