Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 26 additions & 17 deletions cmd/server/demo.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,29 @@ func joinClusterWaitError(err error) error {
return err
}

func setupFSMStore(cfg config, cleanup *internalutil.CleanupStack) (store.MVCCStore, error) {
var st store.MVCCStore
var err error
if cfg.raftDataDir != "" {
st, err = store.NewPebbleStore(filepath.Join(cfg.raftDataDir, "fsm.db"))
if err != nil {
return nil, errors.WithStack(err)
}
} else {
fsmDir, tmpErr := os.MkdirTemp("", "elastickv-fsm-*")
if tmpErr != nil {
return nil, errors.WithStack(tmpErr)
}
cleanup.Add(func() { os.RemoveAll(fsmDir) })
st, err = store.NewPebbleStore(fsmDir)
if err != nil {
return nil, errors.WithStack(err)
}
}
cleanup.Add(func() { st.Close() })
return st, nil
}

func setupStorage(dir string) (raft.LogStore, raft.StableStore, raft.SnapshotStore, error) {
if dir == "" {
return raft.NewInmemStore(), raft.NewInmemStore(), raft.NewInmemSnapshotStore(), nil
Expand Down Expand Up @@ -376,24 +399,10 @@ func run(ctx context.Context, eg *errgroup.Group, cfg config) error {
return err
}

var st store.MVCCStore
if cfg.raftDataDir != "" {
st, err = store.NewPebbleStore(filepath.Join(cfg.raftDataDir, "fsm.db"))
if err != nil {
return errors.WithStack(err)
}
} else {
fsmDir, tmpErr := os.MkdirTemp("", "elastickv-fsm-*")
if tmpErr != nil {
return errors.WithStack(tmpErr)
}
cleanup.Add(func() { os.RemoveAll(fsmDir) })
st, err = store.NewPebbleStore(fsmDir)
if err != nil {
return errors.WithStack(err)
}
st, err := setupFSMStore(cfg, &cleanup)
if err != nil {
return err
}
cleanup.Add(func() { st.Close() })
fsm := kv.NewKvFSM(st)
readTracker := kv.NewActiveTimestampTracker()

Expand Down
23 changes: 13 additions & 10 deletions kv/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,17 +95,20 @@ func decodeRaftRequests(data []byte) ([]*pb.Request, error) {
}
return cmd.Requests, nil
default:
// Legacy format (no prefix byte) — try both for backward compatibility.
cmd := &pb.RaftCommand{}
if err := proto.Unmarshal(data, cmd); err == nil && len(cmd.Requests) > 0 {
return cmd.Requests, nil
}
req := &pb.Request{}
if err := proto.Unmarshal(data, req); err != nil {
return nil, errors.WithStack(err)
}
return []*pb.Request{req}, nil
return decodeLegacyRaftRequest(data)
}
}

func decodeLegacyRaftRequest(data []byte) ([]*pb.Request, error) {
cmd := &pb.RaftCommand{}
if err := proto.Unmarshal(data, cmd); err == nil && len(cmd.Requests) > 0 {
return cmd.Requests, nil
}
req := &pb.Request{}
if err := proto.Unmarshal(data, req); err != nil {
return nil, errors.WithStack(err)
}
return []*pb.Request{req}, nil
}

func requestCommitTS(r *pb.Request) (uint64, error) {
Expand Down
10 changes: 6 additions & 4 deletions kv/lock_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ const (
lockResolverInterval = 10 * time.Second
// lockResolverBatchSize limits the number of locks scanned per group per cycle.
lockResolverBatchSize = 100
// txnNamespaceSentinel is appended to a namespace prefix key to produce a
// key that sorts just after all valid keys in that namespace.
txnNamespaceSentinel byte = 0xFF
)

// LockResolver periodically scans for expired transaction locks and resolves
Expand Down Expand Up @@ -92,8 +95,8 @@ func (lr *LockResolver) resolveGroupLocks(ctx context.Context, gid uint64, g *Sh
}

// Scan lock key range: [!txn|lock| ... !txn|lock|<max>)
lockStart := txnLockKey(nil) // "!txn|lock|"
lockEnd := append(txnLockKey(nil), byte(0xFF)) // one past the lock namespace
lockStart := txnLockKey(nil) // "!txn|lock|"
lockEnd := append(txnLockKey(nil), txnNamespaceSentinel) // one past the lock namespace

lockKVs, err := g.Store.ScanAt(ctx, lockStart, lockEnd, lockResolverBatchSize, math.MaxUint64)
if err != nil {
Expand All @@ -103,9 +106,8 @@ func (lr *LockResolver) resolveGroupLocks(ctx context.Context, gid uint64, g *Sh
var resolved, skipped int
for _, kvp := range lockKVs {
if ctx.Err() != nil {
return ctx.Err()
return errors.WithStack(ctx.Err())
}

userKey, ok := txnUserKeyFromLockKey(kvp.Key)
if !ok {
continue
Expand Down
2 changes: 1 addition & 1 deletion kv/sharded_coordinator_abort_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ import (
"context"
"testing"

pb "github.com/bootjp/elastickv/proto"
"github.com/bootjp/elastickv/distribution"
pb "github.com/bootjp/elastickv/proto"
"github.com/bootjp/elastickv/store"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/require"
Expand Down
9 changes: 6 additions & 3 deletions kv/txn_codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ const (

const txnLockFlagPrimary byte = 0x01

// uint64FieldSize is the byte size of a serialized uint64 field.
const uint64FieldSize = 8

// TxnMeta is embedded into transactional raft log requests via a synthetic
// mutation (key prefix "!txn|meta|"). It is not persisted in the MVCC store.
type TxnMeta struct {
Expand All @@ -30,7 +33,7 @@ type TxnMeta struct {

func EncodeTxnMeta(m TxnMeta) []byte {
// version(1) + LockTTLms(8) + CommitTS(8) + primaryLen(8) + primaryKey
size := 1 + 8 + 8 + 8 + len(m.PrimaryKey)
size := 1 + uint64FieldSize + uint64FieldSize + uint64FieldSize + len(m.PrimaryKey)
b := make([]byte, size)
b[0] = txnMetaVersion
binary.BigEndian.PutUint64(b[1:], m.LockTTLms)
Expand Down Expand Up @@ -79,7 +82,7 @@ type txnLock struct {

func encodeTxnLock(l txnLock) []byte {
// version(1) + StartTS(8) + TTLExpireAt(8) + flags(1) + primaryLen(8) + primaryKey
size := 1 + 8 + 8 + 1 + 8 + len(l.PrimaryKey)
size := 1 + uint64FieldSize + uint64FieldSize + 1 + uint64FieldSize + len(l.PrimaryKey)
b := make([]byte, size)
b[0] = txnLockVersion
binary.BigEndian.PutUint64(b[1:], l.StartTS)
Expand Down Expand Up @@ -143,7 +146,7 @@ const (

func encodeTxnIntent(i txnIntent) []byte {
// version(1) + StartTS(8) + Op(1) + valLen(8) + value
size := 1 + 8 + 1 + 8 + len(i.Value)
size := 1 + uint64FieldSize + 1 + uint64FieldSize + len(i.Value)
b := make([]byte, size)
b[0] = txnIntentVersion
binary.BigEndian.PutUint64(b[1:], i.StartTS)
Expand Down
8 changes: 4 additions & 4 deletions kv/txn_keys.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,15 @@ var (
const txnStartTSSuffixLen = 8

func txnLockKey(userKey []byte) []byte {
k := make([]byte, 0, len(txnLockPrefix)+len(userKey))
k = append(k, txnLockPrefix...)
k := make([]byte, 0, len(txnLockPrefixBytes)+len(userKey))
k = append(k, txnLockPrefixBytes...)
k = append(k, userKey...)
return k
}

func txnIntentKey(userKey []byte) []byte {
k := make([]byte, 0, len(txnIntentPrefix)+len(userKey))
k = append(k, txnIntentPrefix...)
k := make([]byte, 0, len(txnIntentPrefixBytes)+len(userKey))
k = append(k, txnIntentPrefixBytes...)
k = append(k, userKey...)
return k
}
Expand Down
Loading
Loading