Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions adapter/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ func (i *Internal) Forward(_ context.Context, req *pb.ForwardRequest) (*pb.Forwa
if i.raft.State() != raft.Leader {
return nil, errors.WithStack(ErrNotLeader)
}
if err := i.raft.VerifyLeader().Error(); err != nil {
return nil, errors.WithStack(ErrNotLeader)
}

if err := i.stampTimestamps(req); err != nil {
return &pb.ForwardResponse{
Expand Down
43 changes: 26 additions & 17 deletions cmd/server/demo.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,29 @@ func joinClusterWaitError(err error) error {
return err
}

// setupFSMStore opens the Pebble-backed MVCC store that backs the Raft FSM.
//
// When cfg.raftDataDir is set, the store is persisted at
// <raftDataDir>/fsm.db. Otherwise a throwaway temp directory is created
// and its removal is registered on the cleanup stack. In both cases the
// store's Close is pushed onto cleanup so it is released on shutdown.
//
// Returns the opened store, or a stack-annotated error if the temp
// directory or the store itself could not be created.
func setupFSMStore(cfg config, cleanup *internalutil.CleanupStack) (store.MVCCStore, error) {
	// Resolve the on-disk location first so the store is opened in
	// exactly one place below, instead of duplicating the open +
	// error-check sequence in each branch.
	path := filepath.Join(cfg.raftDataDir, "fsm.db")
	if cfg.raftDataDir == "" {
		fsmDir, err := os.MkdirTemp("", "elastickv-fsm-*")
		if err != nil {
			return nil, errors.WithStack(err)
		}
		// Best-effort removal of the ephemeral directory on shutdown.
		cleanup.Add(func() { os.RemoveAll(fsmDir) })
		path = fsmDir
	}

	st, err := store.NewPebbleStore(path)
	if err != nil {
		return nil, errors.WithStack(err)
	}
	cleanup.Add(func() { st.Close() })
	return st, nil
}

func setupStorage(dir string) (raft.LogStore, raft.StableStore, raft.SnapshotStore, error) {
if dir == "" {
return raft.NewInmemStore(), raft.NewInmemStore(), raft.NewInmemSnapshotStore(), nil
Expand Down Expand Up @@ -376,24 +399,10 @@ func run(ctx context.Context, eg *errgroup.Group, cfg config) error {
return err
}

var st store.MVCCStore
if cfg.raftDataDir != "" {
st, err = store.NewPebbleStore(filepath.Join(cfg.raftDataDir, "fsm.db"))
if err != nil {
return errors.WithStack(err)
}
} else {
fsmDir, tmpErr := os.MkdirTemp("", "elastickv-fsm-*")
if tmpErr != nil {
return errors.WithStack(tmpErr)
}
cleanup.Add(func() { os.RemoveAll(fsmDir) })
st, err = store.NewPebbleStore(fsmDir)
if err != nil {
return errors.WithStack(err)
}
st, err := setupFSMStore(cfg, &cleanup)
if err != nil {
return err
}
cleanup.Add(func() { st.Close() })
fsm := kv.NewKvFSM(st)
readTracker := kv.NewActiveTimestampTracker()

Expand Down
167 changes: 167 additions & 0 deletions docs/review_todo.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
# Review TODO

Critical and high-severity issues found during a comprehensive code review.
Items are ordered by priority within each section.

---

## 1. Data Loss

### ~~1.1 [Critical] `saveLastCommitTS` uses `pebble.NoSync` — timestamp rollback after crash~~ DONE

- **Status:** Fixed. `saveLastCommitTS` changed to `pebble.Sync`; `ApplyMutations` writes `lastCommitTS` atomically in the same `WriteBatch`.

### 1.2 [Low — downgraded] Batch Apply in FSM allows partial application without rollback

- **File:** `kv/fsm.go:44-69`
- **Problem:** When a `RaftCommand` contains multiple requests, each is applied individually. If request N fails, requests 1..N-1 are already persisted with no rollback.
- **Analysis:** Downgraded from Critical. Batch items are independent raw writes from different clients. `applyRawBatch` returns per-item errors via `fsmApplyResponse`, so each caller receives their correct result. Partial success is by design for the raw batching optimization.
- **Remaining concern:** If a future code path batches requests that must be atomic, this would need revisiting.

### ~~1.3 [High] `pebbleStore.Compact()` is unimplemented — unbounded version accumulation~~ DONE

- **Status:** Fixed. Implemented MVCC GC for pebbleStore and `RetentionController` interface (`MinRetainedTS`/`SetMinRetainedTS`).

### ~~1.4 [High] Secondary commit is best-effort — lock residue on failure~~ DONE

- **Status:** Fixed. Added `LockResolver` background worker (`kv/lock_resolver.go`) that runs every 10s on each leader, scans `!txn|lock|` keys for expired locks, checks primary transaction status, and resolves (commit/abort) orphaned locks.

### ~~1.5 [High] `abortPreparedTxn` silently ignores errors~~ DONE

- **Status:** Fixed. Errors are now logged with full context (gid, primary_key, start_ts, abort_ts).

### ~~1.6 [High] MVCC compaction does not distinguish transaction internal keys~~ DONE

- **Status:** Fixed. Both mvccStore and pebbleStore compaction now skip keys with `!txn|` prefix.

---

## 2. Concurrency / Distributed Failures

### ~~2.1 [Critical] TOCTOU in `pebbleStore.ApplyMutations`~~ DONE

- **Status:** Fixed. `ApplyMutations` now holds `mtx.Lock()` from conflict check through batch commit.

### ~~2.2 [High] Leader proxy forward loop risk~~ DONE

- **Status:** Fixed. Added `forwardWithRetry` with `maxForwardRetries=3`. Each retry re-fetches leader address via `LeaderWithID()`. Returns immediately on `ErrLeaderNotFound`.

### ~~2.3 [High] Secondary commit failure leaves locks indefinitely~~ DONE

- **Status:** Fixed. (Same as 1.4) Background `LockResolver` worker resolves expired orphaned locks.

### ~~2.4 [Medium] `redirect` in Coordinate has no timeout~~ DONE

- **Status:** Fixed. Added 5s `context.WithTimeout` to redirect gRPC forward call.

### ~~2.5 [Medium] Proxy gRPC calls (`proxyRawGet` etc.) have no timeout~~ DONE

- **Status:** Fixed. Added 5s `context.WithTimeout` to `proxyRawGet`, `proxyRawScanAt`, and `proxyLatestCommitTS`.

### 2.6 [Low — downgraded] `GRPCConnCache` allows ConnFor after Close

- **File:** `kv/grpc_conn_cache.go`
- **Analysis:** Downgraded. `Close` properly closes all existing connections. Subsequent `ConnFor` lazily re-inits and creates fresh connections — this is by design and tested.

### ~~2.7 [Medium] `Forward` handler skips `VerifyLeader`~~ DONE

- **Status:** Fixed. Added `raft.VerifyLeader()` quorum check to `Forward` handler.

---

## 3. Performance

### ~~3.1 [Critical] `mvccStore.ScanAt` scans the entire treemap~~ DONE

- **Status:** Fixed. Replaced `tree.Each()` with `Iterator()` loop that breaks on limit and seeks via `Ceiling(start)`.

### ~~3.2 [Critical] PebbleStore uses unbounded iterators in `GetAt` / `LatestCommitTS`~~ DONE

- **Status:** Fixed. Both methods now use bounded `IterOptions` scoped to the target key.

### ~~3.3 [High] FSM double-deserialization for single requests~~ DONE

- **Status:** Fixed. Added prefix byte (`0x00` single, `0x01` batch) to `marshalRaftCommand` and `decodeRaftRequests` with legacy fallback.

### ~~3.4 [High] Excessive `pebble.Sync` on every write~~ DONE

- **Status:** Fixed. `PutAt`, `DeleteAt`, `ExpireAt` changed to `pebble.NoSync`. `ApplyMutations` retains `pebble.Sync`.

### 3.5 [High] `VerifyLeader` called on every read — network round-trip (deferred)

- **File:** `kv/shard_store.go:53-58`
- **Problem:** Each read triggers a quorum-based leader verification with network round-trip.
- **Trade-off:** A lease-based cache improves latency but widens the stale-read TOCTOU window (see 4.3). The current approach is the safe default — linearizable reads at the cost of one quorum RTT per read. Implementing leader leases is a major architectural decision requiring careful analysis of acceptable staleness bounds.

### ~~3.6 [High] `mvccStore.Compact` holds exclusive lock during full tree scan~~ DONE

- **Status:** Fixed. Split into 2 phases: scan under RLock, then apply updates in batched Lock/Unlock cycles (batch size 500).

### ~~3.7 [High] `isTxnInternalKey` allocates `[]byte` on every call (5x)~~ DONE

- **Status:** Fixed. Added package-level `var` for all prefix byte slices and common prefix fast-path check.

### ~~3.8 [Medium] txn codec `bytes.Buffer` allocation per encode~~ DONE

- **Status:** Fixed. `EncodeTxnMeta`, `encodeTxnLock`, `encodeTxnIntent` now use direct `make([]byte, size)` + `binary.BigEndian.PutUint64`.

### ~~3.9 [Medium] `decodeKey` copies key bytes on every iteration step~~ DONE

- **Status:** Fixed. Added `decodeKeyUnsafe` returning a zero-copy slice reference. Used in all temporary comparison sites (`GetAt`, `ExistsAt`, `skipToNextUserKey`, `LatestCommitTS`, `Compact`, reverse scan seek). `decodeKey` (copying) retained for `nextScannableUserKey`/`prevScannableUserKey` whose results are stored in `KVPair`.

---

## 4. Data Consistency

### ~~4.1 [High] pebbleStore key encoding ambiguity with meta keys~~ DONE

- **Status:** Fixed. Meta key changed to `\x00_meta_last_commit_ts` prefix. Added `isMetaKey()` helper for both new and legacy keys. `findMaxCommitTS()` migrates legacy key on startup. All scan/compact functions use `isMetaKey()` to skip meta keys.

### ~~4.2 [High] Write Skew not prevented in one-phase transactions~~ DONE

- **Status:** Fixed. Documented as Snapshot Isolation in `handleOnePhaseTxnRequest` and `MVCCStore.ApplyMutations` interface. Write skew is a known limitation; SSI requires read-set tracking at a higher layer.

### ~~4.3 [High] VerifyLeader-to-read TOCTOU allows stale reads~~ DONE (documented)

- **Status:** Documented as known limitation. The TOCTOU window between quorum-verified `VerifyLeader` and the read is inherently small. Full fix requires Raft ReadIndex protocol which is a significant architectural change. Added doc comment to `leaderOKForKey` explaining the trade-off.

### ~~4.4 [High] DynamoDB `ConditionCheck` does not prevent write skew~~ DONE

- **Status:** Already addressed. `buildConditionCheckLockRequest` writes a dummy Put (re-writing the current value) or Del for the checked key. This includes the key in the transaction's write set, so write-write conflict detection covers it.

### ~~4.5 [Medium] Cross-shard `ScanAt` does not guarantee a consistent snapshot~~ DONE

- **Status:** Documented. Added doc comment to `ShardStore.ScanAt` explaining the limitation and recommending transactions or a snapshot fence for callers requiring cross-shard consistency.

---

## 5. Test Coverage

Overall coverage: **60.5%**, with **1,043 functions at 0% coverage**.

### ~~5.1 [Critical] FSM Abort path entirely untested~~ DONE

- **Status:** Fixed. Added `kv/fsm_abort_test.go` with 10 tests: Prepare→Abort flow, commit rejection, lock/intent cleanup, rollback record verification, non-primary abort, timestamp validation, idempotent abort conflict, missing primary key, empty mutations.

### ~~5.2 [Critical] PebbleStore transaction functions entirely untested~~ DONE

- **Status:** Fixed. Added `store/lsm_store_txn_test.go` with 14 tests: ApplyMutations (put/delete/TTL/conflict/no-conflict/atomicity/lastCommitTS update), LatestCommitTS (single/multi/not-found), Compact (old version removal/newest retained/tombstone cleanup/meta key skip/txn internal key skip/multi-key).

### ~~5.3 [Critical] `Coordinate.Dispatch` untested~~ DONE

- **Status:** Fixed. Added `kv/coordinator_dispatch_test.go` with 6 tests: raw put, raw delete, one-phase txn, nil request, empty elems, startTS assignment.

### ~~5.4 [High] ShardedCoordinator Abort rollback flow untested~~ DONE

- **Status:** Fixed. Added `kv/sharded_coordinator_abort_test.go` testing that when Shard2 Prepare fails, Shard1's locks are cleaned up via Abort.

### 5.5 [High] Jepsen tests are single-shard, single-workload only

- **Current:** Append workload on one Raft group, 30s duration.
- **Needed:** Multi-shard transactions, CAS workload, longer duration (5-10 min).

### ~~5.6 [Medium] No concurrent access tests for ShardStore / ShardedCoordinator~~ DONE

- **Status:** Fixed. Expanded `store/mvcc_store_concurrency_test.go` from 1 to 8 tests with race detection: concurrent PutAt (different/same keys), concurrent GetAt+PutAt, concurrent ApplyMutations (single/multi-key), concurrent ScanAt+PutAt, scan snapshot consistency.

### 5.7 [Medium] No error-path tests (I/O failure, corrupt data, gRPC connection failure)
7 changes: 6 additions & 1 deletion kv/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,15 @@ package kv
import (
"bytes"
"context"
"time"

pb "github.com/bootjp/elastickv/proto"
"github.com/cockroachdb/errors"
"github.com/hashicorp/raft"
)

const redirectForwardTimeout = 5 * time.Second

func NewCoordinator(txm Transactional, r *raft.Raft) *Coordinate {
return &Coordinate{
transactionManager: txm,
Expand Down Expand Up @@ -218,7 +221,9 @@ func (c *Coordinate) redirect(ctx context.Context, reqs *OperationGroup[OP]) (*C
}
}

r, err := cli.Forward(ctx, c.toForwardRequest(requests))
fwdCtx, cancel := context.WithTimeout(ctx, redirectForwardTimeout)
defer cancel()
r, err := cli.Forward(fwdCtx, c.toForwardRequest(requests))
if err != nil {
return nil, errors.WithStack(err)
}
Expand Down
Loading