From 656aa40d3d699ff9328317b3fc13ca206b24d800 Mon Sep 17 00:00:00 2001 From: ylembachar Date: Fri, 13 Mar 2026 21:59:34 +0100 Subject: [PATCH 1/9] feat: add inclusion_position and mark earlier index as tentative shielded --- .gitignore | 5 +- internal/data/db.sqlc.gen.go | 2 +- internal/data/metrics.sql.go | 53 ++++++++++-------- internal/data/models.sqlc.gen.go | 20 ++++--- internal/data/sql/queries/metrics.sql | 50 ++++++++--------- internal/metrics/tx_mapper_db.go | 56 +++++++++++++------ .../20260313160000_add_inclusion_position.sql | 9 +++ 7 files changed, 118 insertions(+), 77 deletions(-) create mode 100644 migrations/20260313160000_add_inclusion_position.sql diff --git a/.gitignore b/.gitignore index fa632b8..051890c 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,5 @@ build .envrc - -*.txt \ No newline at end of file +.env +*.txt +.idea \ No newline at end of file diff --git a/internal/data/db.sqlc.gen.go b/internal/data/db.sqlc.gen.go index 92f18a0..062888c 100644 --- a/internal/data/db.sqlc.gen.go +++ b/internal/data/db.sqlc.gen.go @@ -1,6 +1,6 @@ // Code generated by sqlc. DO NOT EDIT. // versions: -// sqlc v1.26.0 +// sqlc v1.28.0 package data diff --git a/internal/data/metrics.sql.go b/internal/data/metrics.sql.go index a827bc4..935779d 100644 --- a/internal/data/metrics.sql.go +++ b/internal/data/metrics.sql.go @@ -1,6 +1,6 @@ // Code generated by sqlc. DO NOT EDIT. // versions: -// sqlc v1.26.0 +// sqlc v1.28.0 // source: metrics.sql package data @@ -40,16 +40,15 @@ func (q *Queries) CreateBlock(ctx context.Context, arg CreateBlockParams) error } const createDecryptedTX = `-- name: CreateDecryptedTX :exec -INSERT into decrypted_tx( - slot, - tx_index, - tx_hash, - tx_status, - decryption_key_id, - transaction_submitted_event_id -) -VALUES ($1, $2, $3, $4, $5, $6) -ON CONFLICT DO NOTHING +INSERT INTO decrypted_tx ( + slot, + tx_index, + tx_hash, + tx_status, + inclusion_position, + decryption_key_id, + transaction_submitted_event_id +) VALUES ($1, $2, $3, $4, $5, $6, $7) ` type CreateDecryptedTXParams struct { @@ -57,6 +56,7 @@ type CreateDecryptedTXParams struct { TxIndex int64 TxHash []byte TxStatus TxStatusVal + InclusionPosition string DecryptionKeyID int64 TransactionSubmittedEventID int64 } @@ -67,6 +67,7 @@ func (q *Queries) CreateDecryptedTX(ctx context.Context, arg CreateDecryptedTXPa arg.TxIndex, arg.TxHash, arg.TxStatus, + arg.InclusionPosition, arg.DecryptionKeyID, arg.TransactionSubmittedEventID, ) @@ -651,20 +652,22 @@ func (q *Queries) UpsertGraffitiIfShutterized(ctx context.Context, arg UpsertGra const upsertTX = `-- name: UpsertTX :exec INSERT INTO decrypted_tx ( - slot, - tx_index, - tx_hash, - tx_status, - decryption_key_id, - transaction_submitted_event_id, - block_number + slot, + tx_index, + tx_hash, + tx_status, + inclusion_position, + decryption_key_id, + transaction_submitted_event_id, + block_number ) -VALUES ($1, $2, $3, $4, $5, $6, $7) -ON CONFLICT (slot, tx_index) -DO UPDATE -SET tx_status = $4, - block_number = $7, - updated_at = NOW() +VALUES ($1, $2, $3, $4, $5, $6, $7, $8) + ON CONFLICT (slot, tx_index) + DO UPDATE + SET tx_status = $4, + inclusion_position = $5, + block_number = $8, + updated_at = NOW() ` type UpsertTXParams struct { @@ -672,6 +675,7 @@ type UpsertTXParams struct { TxIndex int64 TxHash []byte TxStatus TxStatusVal + InclusionPosition string DecryptionKeyID int64 TransactionSubmittedEventID int64 BlockNumber pgtype.Int8 @@ -683,6 +687,7 @@ func (q *Queries) UpsertTX(ctx context.Context, arg UpsertTXParams) error { arg.TxIndex, arg.TxHash, arg.TxStatus, + arg.InclusionPosition, arg.DecryptionKeyID, arg.TransactionSubmittedEventID, arg.BlockNumber, diff --git a/internal/data/models.sqlc.gen.go b/internal/data/models.sqlc.gen.go index ba47d06..1c89c37 100644 --- a/internal/data/models.sqlc.gen.go +++ b/internal/data/models.sqlc.gen.go @@ -1,6 +1,6 @@ // Code generated by sqlc. DO NOT EDIT. // versions: -// sqlc v1.26.0 +// sqlc v1.28.0 package data @@ -14,14 +14,15 @@ import ( type TxStatusVal string const ( - TxStatusValIncluded TxStatusVal = "included" - TxStatusValNotincluded TxStatusVal = "not included" - TxStatusValNotdecrypted TxStatusVal = "not decrypted" - TxStatusValInvalid TxStatusVal = "invalid" - TxStatusValPending TxStatusVal = "pending" - TxStatusValShieldedinclusion TxStatusVal = "shielded inclusion" - TxStatusValUnshieldedinclusion TxStatusVal = "unshielded inclusion" - TxStatusValInvalidfeetoolow TxStatusVal = "invalid fee too low" + TxStatusValIncluded TxStatusVal = "included" + TxStatusValNotincluded TxStatusVal = "not included" + TxStatusValNotdecrypted TxStatusVal = "not decrypted" + TxStatusValInvalid TxStatusVal = "invalid" + TxStatusValPending TxStatusVal = "pending" + TxStatusValShieldedinclusion TxStatusVal = "shielded inclusion" + TxStatusValUnshieldedinclusion TxStatusVal = "unshielded inclusion" + TxStatusValInvalidfeetoolow TxStatusVal = "invalid fee too low" + TxStatusValTentativeshieldedinclusion TxStatusVal = "tentative shielded inclusion" ) func (e *TxStatusVal) Scan(src interface{}) error { @@ -122,6 +123,7 @@ type DecryptedTx struct { CreatedAt pgtype.Timestamptz UpdatedAt pgtype.Timestamptz BlockNumber pgtype.Int8 + InclusionPosition string } type DecryptionKey struct { diff --git a/internal/data/sql/queries/metrics.sql b/internal/data/sql/queries/metrics.sql index c905f24..cea3465 100644 --- a/internal/data/sql/queries/metrics.sql +++ b/internal/data/sql/queries/metrics.sql @@ -82,16 +82,15 @@ SELECT * FROM block WHERE slot = $1 FOR UPDATE; -- name: CreateDecryptedTX :exec -INSERT into decrypted_tx( - slot, - tx_index, - tx_hash, - tx_status, - decryption_key_id, - transaction_submitted_event_id -) -VALUES ($1, $2, $3, $4, $5, $6) -ON CONFLICT DO NOTHING; +INSERT INTO decrypted_tx ( + slot, + tx_index, + tx_hash, + tx_status, + inclusion_position, + decryption_key_id, + transaction_submitted_event_id +) VALUES ($1, $2, $3, $4, $5, $6, $7); -- name: CreateValidatorRegistryMessage :exec INSERT into validator_registration_message( @@ -161,20 +160,22 @@ ON CONFLICT DO NOTHING; -- name: UpsertTX :exec INSERT INTO decrypted_tx ( - slot, - tx_index, - tx_hash, - tx_status, - decryption_key_id, - transaction_submitted_event_id, - block_number + slot, + tx_index, + tx_hash, + tx_status, + inclusion_position, + decryption_key_id, + transaction_submitted_event_id, + block_number ) -VALUES ($1, $2, $3, $4, $5, $6, $7) -ON CONFLICT (slot, tx_index) -DO UPDATE -SET tx_status = $4, - block_number = $7, - updated_at = NOW(); +VALUES ($1, $2, $3, $4, $5, $6, $7, $8) + ON CONFLICT (slot, tx_index) + DO UPDATE + SET tx_status = $4, + inclusion_position = $5, + block_number = $8, + updated_at = NOW(); -- name: CreateTransactionSubmittedEventsSyncedUntil :exec INSERT INTO transaction_submitted_events_synced_until (block_hash, block_number) VALUES ($1, $2) @@ -206,5 +207,4 @@ WITH upserted AS ( updated_at = NOW() RETURNING 1 ) -SELECT EXISTS (SELECT 1 FROM upserted) AS did_upsert; - +SELECT EXISTS (SELECT 1 FROM upserted) AS did_upsert; \ No newline at end of file diff --git a/internal/metrics/tx_mapper_db.go b/internal/metrics/tx_mapper_db.go index adc84a5..118a8fb 100644 --- a/internal/metrics/tx_mapper_db.go +++ b/internal/metrics/tx_mapper_db.go @@ -367,6 +367,25 @@ func (tm *TxMapperDB) AddProposerDuties(ctx context.Context, epoch uint64) error return err } +const ( + InclPosUnknown = "unknown" + InclPosExact = "exact" + InclPosLater = "later" + InclPosEarlier = "earlier" + InclPosWrong = "wrong_slot" +) + +func classifyInclusion(expectedIndex int, receiptIndex uint) (data.TxStatusVal, string) { + switch { + case receiptIndex == uint(expectedIndex): + return data.TxStatusValShieldedinclusion, InclPosExact + case receiptIndex > uint(expectedIndex): + return data.TxStatusValUnshieldedinclusion, InclPosLater + default: + return data.TxStatusValTentativeshieldedinclusion, InclPosEarlier + } +} + func (tm *TxMapperDB) processTransactionExecution( ctx context.Context, te *TxExecution, @@ -413,6 +432,7 @@ func (tm *TxMapperDB) processTransactionExecution( TxIndex: txSubEvent.TxIndex, TxHash: common.Hash{}.Bytes(), TxStatus: data.TxStatusValNotdecrypted, + InclusionPosition: InclPosUnknown, DecryptionKeyID: decryptionKeyID, TransactionSubmittedEventID: txSubEvent.ID, }) @@ -453,6 +473,7 @@ func (tm *TxMapperDB) processTransactionExecution( TxIndex: txSubEvent.TxIndex, TxHash: decryptedTx.Hash().Bytes(), TxStatus: data.TxStatusValPending, + InclusionPosition: InclPosUnknown, DecryptionKeyID: decryptionKeyID, TransactionSubmittedEventID: txSubEvent.ID, }) @@ -470,6 +491,7 @@ func (tm *TxMapperDB) processTransactionExecution( TxIndex: txSubEvent.TxIndex, TxHash: decryptedTx.Hash().Bytes(), TxStatus: txStatus, + InclusionPosition: InclPosUnknown, DecryptionKeyID: decryptionKeyID, TransactionSubmittedEventID: txSubEvent.ID, }) @@ -486,6 +508,7 @@ func (tm *TxMapperDB) processTransactionExecution( TxIndex: txSubEvent.TxIndex, TxHash: decryptedTx.Hash().Bytes(), TxStatus: data.TxStatusValPending, + InclusionPosition: InclPosUnknown, DecryptionKeyID: decryptionKeyID, TransactionSubmittedEventID: txSubEvent.ID, }) @@ -514,6 +537,7 @@ func (tm *TxMapperDB) processTransactionExecution( TxIndex: txIndex, TxHash: txHash[:], TxStatus: data.TxStatusValNotincluded, + InclusionPosition: InclPosUnknown, DecryptionKeyID: decryptionKeyID, TransactionSubmittedEventID: txSubEventID, }) @@ -535,30 +559,30 @@ func (tm *TxMapperDB) processTransactionExecution( } inclusionSlot := utils.GetSlotForBlock(block.Header().Time, tm.genesisTimestamp, tm.slotDuration) - txStatus := data.TxStatusValShieldedinclusion - - log.Info().Uint("tx-index", receipt.TransactionIndex). - Uint64("inclusion-slot", inclusionSlot). - Msg("receipt data") - - log.Info().Int("index", index). - Int64("inclusion-slot", slot). - Msg("local data") - - if receipt.TransactionIndex != uint(index) { - log.Info().Uint("tx-index", receipt.TransactionIndex).Msg("transaction index mismatch") - txStatus = data.TxStatusValUnshieldedinclusion - } + txStatus, inclusionPosition := data.TxStatusValShieldedinclusion, InclPosUnknown if inclusionSlot != uint64(slot) { - log.Info().Int64("slot", slot).Msg("transaction slot mismatch") txStatus = data.TxStatusValUnshieldedinclusion + inclusionPosition = InclPosWrong + } else { + txStatus, inclusionPosition = classifyInclusion(index, receipt.TransactionIndex) } + log.Info(). + Int64("expected-slot", slot). + Uint64("receipt-slot", inclusionSlot). + Uint("expected-index", uint(index)). + Uint("receipt-index", receipt.TransactionIndex). + Hex("tx-hash", receipt.TxHash.Bytes()). + Str("inclusion_position", inclusionPosition). + Str("tx_status", string(txStatus)). + Msg("transaction receipt classified") + err = tm.dbQuery.UpsertTX(ctx, data.UpsertTXParams{ Slot: slot, TxIndex: txIndex, TxHash: receipt.TxHash.Bytes(), TxStatus: txStatus, + InclusionPosition: inclusionPosition, DecryptionKeyID: decryptionKeyID, TransactionSubmittedEventID: txSubEventID, BlockNumber: pgtype.Int8{Int64: receipt.BlockNumber.Int64(), Valid: true}, @@ -815,4 +839,4 @@ func (tm *TxMapperDB) UpsertGraffitiIfShutterized(ctx context.Context, validator BlockNumber: blockNumber, }) return upserted, err -} +} \ No newline at end of file diff --git a/migrations/20260313160000_add_inclusion_position.sql b/migrations/20260313160000_add_inclusion_position.sql new file mode 100644 index 0000000..730b71e --- /dev/null +++ b/migrations/20260313160000_add_inclusion_position.sql @@ -0,0 +1,9 @@ +-- +goose Up +ALTER TABLE decrypted_tx + ADD COLUMN inclusion_position TEXT NOT NULL DEFAULT 'unknown'; + +ALTER TYPE tx_status_val ADD VALUE IF NOT EXISTS 'tentative shielded inclusion'; + +-- +goose Down +ALTER TABLE decrypted_tx DROP COLUMN inclusion_position; +-- cannot easily drop enum value; left as-is on down. \ No newline at end of file From b53c5aef5da90321fadeb4df905fb0936908b5be Mon Sep 17 00:00:00 2001 From: ylembachar Date: Fri, 13 Mar 2026 23:24:31 +0100 Subject: [PATCH 2/9] feat: improve inclusion classification and expected ordering --- internal/metrics/tx_mapper_db.go | 187 ++++++++++++++++++++++++++++--- internal/metrics/types.go | 2 + internal/watcher/blocks.go | 12 ++ 3 files changed, 184 insertions(+), 17 deletions(-) diff --git a/internal/metrics/tx_mapper_db.go b/internal/metrics/tx_mapper_db.go index 118a8fb..aaeb13f 100644 --- a/internal/metrics/tx_mapper_db.go +++ b/internal/metrics/tx_mapper_db.go @@ -43,6 +43,19 @@ type TxMapperDB struct { chainID int64 genesisTimestamp uint64 slotDuration uint64 + statusDone sync.Map +} + +// markDone records that a tx hash has been finalized (by block or receipt) +// to prevent double classification. +func (tm *TxMapperDB) markDone(hash common.Hash) { + tm.statusDone.Store(hash.Hex(), struct{}{}) +} + +// isDone reports whether a tx hash was already finalized. +func (tm *TxMapperDB) isDone(hash common.Hash) bool { + _, ok := tm.statusDone.Load(hash.Hex()) + return ok } type validatorData struct { @@ -386,6 +399,133 @@ func classifyInclusion(expectedIndex int, receiptIndex uint) (data.TxStatusVal, } } +type batchEntry struct { + hash common.Hash + status data.TxStatusVal + txIndex int64 + decryptionKeyID int64 + submittedEventID int64 +} + +func classifyWithPredecessors(expectedPos, blockPos int, entries []batchEntry) (data.TxStatusVal, + string) { + // later index always unshielded + if blockPos > expectedPos { + return data.TxStatusValUnshieldedinclusion, InclPosLater + } + + pos := InclPosExact + if blockPos < expectedPos { + pos = InclPosEarlier + } + + badPredecessor := false + allShieldedBefore := true + seenTentative := false + for i := 0; i < expectedPos; i++ { + switch entries[i].status { + case data.TxStatusValShieldedinclusion: + // ok + case data.TxStatusValTentativeshieldedinclusion: + allShieldedBefore = false + seenTentative = true + case data.TxStatusValUnshieldedinclusion: + badPredecessor = true + default: + allShieldedBefore = false + } + if badPredecessor { + break + } + } + + if badPredecessor { + return data.TxStatusValUnshieldedinclusion, pos + } + if pos == InclPosExact && allShieldedBefore { + return data.TxStatusValShieldedinclusion, pos + } + // exact with tentative predecessor, or earlier with good predecessors -> ambiguous (tentative) + _ = seenTentative // kept for readability; not used in decision + return data.TxStatusValTentativeshieldedinclusion, pos +} + +// HandleBlock performs predecessor-aware classification at block arrival. +// Receipt-based classification is a fallback and skips if a tx is already finalized. +func (tm *TxMapperDB) HandleBlock(ctx context.Context, blockNumber int64, slot int64, txs types.Transactions) error { + if len(txs) == 0 { + return nil + } + + rows, err := tm.db.Query(ctx, ` + SELECT tx_hash, tx_index, tx_status, decryption_key_id, transaction_submitted_event_id + FROM decrypted_tx + WHERE slot = $1 + AND tx_hash <> '\x00' + ORDER BY tx_index`, slot) + if err != nil { + return err + } + defer rows.Close() + + var entries []batchEntry + indexByHash := make(map[string]int) + + for rows.Next() { + var ( + hashBytes []byte + txIdx int64 + status data.TxStatusVal + decID int64 + subID int64 + ) + if err := rows.Scan(&hashBytes, &txIdx, &status, &decID, &subID); err != nil { + return err + } + h := common.BytesToHash(hashBytes) + indexByHash[h.Hex()] = len(entries) + entries = append(entries, batchEntry{ + hash: h, + status: status, + txIndex: txIdx, + decryptionKeyID: decID, + submittedEventID: subID, + }) + } + + if len(entries) == 0 { + return nil + } + + for blockPos, tx := range txs { + h := tx.Hash() + expectedPos, ok := indexByHash[h.Hex()] + if !ok || tm.isDone(h) { + continue + } + + status, pos := classifyWithPredecessors(expectedPos, blockPos, entries) + entries[expectedPos].status = status // update for later predecessor checks + + if err := tm.dbQuery.UpsertTX(ctx, data.UpsertTXParams{ + Slot: slot, + TxIndex: entries[expectedPos].txIndex, + TxHash: h.Bytes(), + TxStatus: status, + InclusionPosition: pos, + DecryptionKeyID: entries[expectedPos].decryptionKeyID, + TransactionSubmittedEventID: entries[expectedPos].submittedEventID, + BlockNumber: pgtype.Int8{Int64: blockNumber, Valid: true}, + }); err != nil { + log.Err(err).Hex("tx-hash", h.Bytes()).Msg("failed to upsert tx from block body") + continue + } + tm.markDone(h) + } + + return nil +} + func (tm *TxMapperDB) processTransactionExecution( ctx context.Context, te *TxExecution, @@ -416,9 +556,10 @@ func (tm *TxMapperDB) processTransactionExecution( } slot := te.DecKeysAndMessages[0].Slot + expectedIdx := 0 var wg sync.WaitGroup - for index, txSubEvent := range txSubEvents { + for _, txSubEvent := range txSubEvents { decryptionKeyID, err := getDecryptionKeyID(txSubEvent, identityPreimageToDecKeyAndMsg) if err != nil { log.Err(err).Msg("error while trying to retrieve decryption key ID") @@ -450,6 +591,9 @@ func (tm *TxMapperDB) processTransactionExecution( Uint8("tx-type", decryptedTx.Type()). Msg("tx-data") + currExpected := expectedIdx + expectedIdx++ + // channel to propagate errors between goroutines txErrorSignalCh := make(chan error, 1) @@ -521,9 +665,14 @@ func (tm *TxMapperDB) processTransactionExecution( }(ctx, tm.config.InclusionDelay, decryptedTx, txSubEvent, slot, decryptionKeyID, txErrorSignalCh) // Second goroutine: Wait for receipt - go func(ctx context.Context, index int, txHash common.Hash, txIndex int64, slot int64, decryptionKeyID int64, txSubEventID int64, txErrorSignalCh chan error) { + go func(ctx context.Context, expectedIndex int, txHash common.Hash, txIndex int64, slot int64, decryptionKeyID int64, txSubEventID int64, txErrorSignalCh chan error) { defer wg.Done() + // Fallback path: skip if block-time classification already finalized this tx. + if tm.isDone(txHash) { + return + } + // Wait for the receipt with a timeout receipt, err := tm.waitForReceiptWithTimeout(ctx, txHash, ReceiptWaitTimeout, txErrorSignalCh) if err != nil { @@ -531,18 +680,21 @@ func (tm *TxMapperDB) processTransactionExecution( if errors.Is(err, errSendTransaction) { return } - // update/create status to not included - err := tm.dbQuery.UpsertTX(ctx, data.UpsertTXParams{ - Slot: slot, - TxIndex: txIndex, - TxHash: txHash[:], - TxStatus: data.TxStatusValNotincluded, - InclusionPosition: InclPosUnknown, - DecryptionKeyID: decryptionKeyID, - TransactionSubmittedEventID: txSubEventID, - }) - if err != nil { - log.Err(err).Msg("failed to upsert decrypted tx") + if !tm.isDone(txHash) { // guard against concurrent HandleBlock + err := tm.dbQuery.UpsertTX(ctx, data.UpsertTXParams{ + Slot: slot, + TxIndex: txIndex, + TxHash: txHash[:], + TxStatus: data.TxStatusValNotincluded, + InclusionPosition: InclPosUnknown, + DecryptionKeyID: decryptionKeyID, + TransactionSubmittedEventID: txSubEventID, + BlockNumber: pgtype.Int8{}, + }) + if err != nil { + log.Err(err).Msg("failed to upsert decrypted tx") + } + // tm.markDone(txHash) } return } @@ -564,13 +716,13 @@ func (tm *TxMapperDB) processTransactionExecution( txStatus = data.TxStatusValUnshieldedinclusion inclusionPosition = InclPosWrong } else { - txStatus, inclusionPosition = classifyInclusion(index, receipt.TransactionIndex) + txStatus, inclusionPosition = classifyInclusion(expectedIndex, receipt.TransactionIndex) } log.Info(). Int64("expected-slot", slot). Uint64("receipt-slot", inclusionSlot). - Uint("expected-index", uint(index)). + Uint("expected-index", uint(expectedIndex)). Uint("receipt-index", receipt.TransactionIndex). Hex("tx-hash", receipt.TxHash.Bytes()). Str("inclusion_position", inclusionPosition). @@ -590,7 +742,8 @@ func (tm *TxMapperDB) processTransactionExecution( if err != nil { log.Err(err).Msg("failed to update decrypted tx") } - }(ctx, index, decryptedTx.Hash(), txSubEvent.TxIndex, slot, decryptionKeyID, txSubEvent.ID, txErrorSignalCh) + }(ctx, currExpected, decryptedTx.Hash(), txSubEvent.TxIndex, slot, decryptionKeyID, txSubEvent.ID, + txErrorSignalCh) } // Wait for all routines to end diff --git a/internal/metrics/types.go b/internal/metrics/types.go index ad94f58..f925a03 100644 --- a/internal/metrics/types.go +++ b/internal/metrics/types.go @@ -3,6 +3,7 @@ package metrics import ( "context" + "github.com/ethereum/go-ethereum/core/types" "github.com/jackc/pgx/v5" sequencerBindings "github.com/shutter-network/gnosh-contracts/gnoshcontracts/sequencer" validatorRegistryBindings "github.com/shutter-network/gnosh-contracts/gnoshcontracts/validatorregistry" @@ -66,4 +67,5 @@ type TxMapper interface { UpdateValidatorStatus(ctx context.Context) error AddProposerDuties(ctx context.Context, epoch uint64) error UpsertGraffitiIfShutterized(ctx context.Context, validatorIndex int64, graffiti string, blockNumber int64) (bool, error) + HandleBlock(ctx context.Context, blockNumber int64, slot int64, txs types.Transactions) error } diff --git a/internal/watcher/blocks.go b/internal/watcher/blocks.go index 2357239..b03f31e 100644 --- a/internal/watcher/blocks.go +++ b/internal/watcher/blocks.go @@ -106,6 +106,18 @@ func (bw *BlocksWatcher) processBlock(ctx context.Context, header *types.Header) if err != nil { return err } + + // classify decrypted txs included in this block as early as possible + block, err := bw.ethClient.BlockByHash(ctx, header.Hash()) + if err == nil { + slot := utils.GetSlotForBlock(header.Time, GenesisTimestamp, SlotDuration) + if err := bw.txMapper.HandleBlock(ctx, block.Number().Int64(), int64(slot), block.Transactions()); err != nil { + log.Err(err).Msg("tx mapper handle block failed") + } + } else { + log.Err(err).Uint64("block", header.Number.Uint64()).Msg("failed to fetch block body for classification") + } + bw.clearOldBlocks(header) if err := bw.transactionSubmittedSyncer.Sync(ctx, header); err != nil { From f8ea2572da4e1df8c45d2577765d2533ff6e8c0d Mon Sep 17 00:00:00 2001 From: ylembachar Date: Mon, 16 Mar 2026 17:48:27 +0100 Subject: [PATCH 3/9] test: add inclusion classification unit tests --- internal/metrics/tx_mapper_db_test.go | 114 ++++++++++++++++++++++++++ 1 file changed, 114 insertions(+) create mode 100644 internal/metrics/tx_mapper_db_test.go diff --git a/internal/metrics/tx_mapper_db_test.go b/internal/metrics/tx_mapper_db_test.go new file mode 100644 index 0000000..dfcbce0 --- /dev/null +++ b/internal/metrics/tx_mapper_db_test.go @@ -0,0 +1,114 @@ +package metrics + +import ( + "testing" + + "github.com/shutter-network/observer/internal/data" +) + +func TestClassifyInclusion(t *testing.T) { + t.Parallel() + cases := []struct { + name string + expectedIndex int + receiptIndex uint + wantStatus data.TxStatusVal + wantPos string + }{ + {"exact", 2, 2, data.TxStatusValShieldedinclusion, InclPosExact}, + {"later", 1, 3, data.TxStatusValUnshieldedinclusion, InclPosLater}, + {"earlier", 4, 1, data.TxStatusValTentativeshieldedinclusion, InclPosEarlier}, + } + for _, c := range cases { + gotStatus, gotPos := classifyInclusion(c.expectedIndex, c.receiptIndex) + if gotStatus != c.wantStatus || gotPos != c.wantPos { + t.Fatalf("%s: got (%v,%s), want (%v,%s)", c.name, gotStatus, gotPos, c.wantStatus, + c.wantPos) + } + } +} + +func TestClassifyWithPredecessors(t *testing.T) { + t.Parallel() + + makeEntries := func(statuses ...data.TxStatusVal) []batchEntry { + out := make([]batchEntry, len(statuses)) + for i, st := range statuses { + out[i] = batchEntry{status: st} + } + return out + } + + cases := []struct { + name string + expectedPos int + blockPos int + entries []batchEntry + wantStatus data.TxStatusVal + wantPos string + }{ + { + name: "pos0-exact-shielded", + expectedPos: 0, blockPos: 0, + entries: makeEntries(data.TxStatusValPending), + wantStatus: data.TxStatusValShieldedinclusion, wantPos: InclPosExact, + }, + { + name: "pos0-later-unshielded", + expectedPos: 0, blockPos: 1, + entries: makeEntries(data.TxStatusValPending), + wantStatus: data.TxStatusValUnshieldedinclusion, wantPos: InclPosLater, + }, + { + name: "exact-shielded", + expectedPos: 1, blockPos: 1, + entries: makeEntries(data.TxStatusValShieldedinclusion, data.TxStatusValPending), + wantStatus: data.TxStatusValShieldedinclusion, wantPos: InclPosExact, + }, + { + name: "later-unshielded", + expectedPos: 0, blockPos: 1, + entries: makeEntries(data.TxStatusValPending), + wantStatus: data.TxStatusValUnshieldedinclusion, wantPos: InclPosLater, + }, + { + name: "exact-tentative-predecessor", + expectedPos: 1, blockPos: 1, + entries: makeEntries(data.TxStatusValTentativeshieldedinclusion, data.TxStatusValPending), + wantStatus: data.TxStatusValTentativeshieldedinclusion, wantPos: InclPosExact, + }, + { + name: "earlier-tentative", + expectedPos: 2, blockPos: 1, + entries: makeEntries(data.TxStatusValShieldedinclusion, data.TxStatusValPending, data.TxStatusValPending), + wantStatus: data.TxStatusValTentativeshieldedinclusion, wantPos: InclPosEarlier, + }, + { + name: "bad-predecessor-unshielded", + expectedPos: 1, blockPos: 1, + entries: makeEntries(data.TxStatusValUnshieldedinclusion, data.TxStatusValPending), + wantStatus: data.TxStatusValUnshieldedinclusion, wantPos: InclPosExact, + }, + { + name: "earlier due to invalid predecessor -> tentative earlier", + expectedPos: 2, blockPos: 1, + entries: makeEntries(data.TxStatusValInvalid, data.TxStatusValPending, data.TxStatusValPending), + wantStatus: data.TxStatusValTentativeshieldedinclusion, wantPos: InclPosEarlier, + }, + { + name: "exact with not included predecessor -> tentative", + expectedPos: 1, blockPos: 1, + entries: makeEntries(data.TxStatusValNotincluded, data.TxStatusValPending), + wantStatus: data.TxStatusValTentativeshieldedinclusion, wantPos: InclPosExact, + }, + } + + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + gotStatus, gotPos := classifyWithPredecessors(c.expectedPos, c.blockPos, c.entries) + if gotStatus != c.wantStatus || gotPos != c.wantPos { + t.Errorf("got (%v, %s), want (%v, %s)", gotStatus, gotPos, c.wantStatus, c.wantPos) + } + }) + } +} \ No newline at end of file From 4db5db7b48dae7ea939a2da23674076d69cc8096 Mon Sep 17 00:00:00 2001 From: ylembachar Date: Tue, 17 Mar 2026 17:45:40 +0100 Subject: [PATCH 4/9] feat: add debug logs for block-based tx classification --- internal/metrics/tx_mapper_db.go | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/internal/metrics/tx_mapper_db.go b/internal/metrics/tx_mapper_db.go index aaeb13f..86e4181 100644 --- a/internal/metrics/tx_mapper_db.go +++ b/internal/metrics/tx_mapper_db.go @@ -457,6 +457,12 @@ func (tm *TxMapperDB) HandleBlock(ctx context.Context, blockNumber int64, slot i return nil } + log.Debug(). + Int64("slot", slot). + Int64("block_number", blockNumber). + Int("num_txs", len(txs)). + Msg("handling block for tx classification") + rows, err := tm.db.Query(ctx, ` SELECT tx_hash, tx_index, tx_status, decryption_key_id, transaction_submitted_event_id FROM decrypted_tx @@ -493,6 +499,11 @@ func (tm *TxMapperDB) HandleBlock(ctx context.Context, blockNumber int64, slot i }) } + log.Debug(). + Int64("slot", slot). + Int("num_candidates", len(entries)). + Msg("loaded decrypted tx candidates for block classification") + if len(entries) == 0 { return nil } @@ -507,6 +518,17 @@ func (tm *TxMapperDB) HandleBlock(ctx context.Context, blockNumber int64, slot i status, pos := classifyWithPredecessors(expectedPos, blockPos, entries) entries[expectedPos].status = status // update for later predecessor checks + log.Debug(). + Int64("slot", slot). + Int64("block_number", blockNumber). + Int("block_pos", blockPos). + Int("expected_pos", expectedPos). + Int64("tx_index", entries[expectedPos].txIndex). + Hex("tx_hash", h.Bytes()). + Str("tx_status", string(status)). + Str("inclusion_position", pos). + Msg("classified tx from block body") + if err := tm.dbQuery.UpsertTX(ctx, data.UpsertTXParams{ Slot: slot, TxIndex: entries[expectedPos].txIndex, From 38d48130c919b130f8d087885a0169ff0243ab93 Mon Sep 17 00:00:00 2001 From: ylembachar Date: Wed, 18 Mar 2026 01:16:17 +0100 Subject: [PATCH 5/9] fix: trigger block classification after decrypted txs are stored --- internal/metrics/tx_mapper_db.go | 138 +++++++++++++++++++------------ 1 file changed, 85 insertions(+), 53 deletions(-) diff --git a/internal/metrics/tx_mapper_db.go b/internal/metrics/tx_mapper_db.go index 86e4181..e3cfd64 100644 --- a/internal/metrics/tx_mapper_db.go +++ b/internal/metrics/tx_mapper_db.go @@ -5,6 +5,7 @@ import ( "encoding/hex" "fmt" "math" + "math/big" "strings" "sync" "time" @@ -548,6 +549,39 @@ func (tm *TxMapperDB) HandleBlock(ctx context.Context, blockNumber int64, slot i return nil } +func (tm *TxMapperDB) maybeHandleStoredBlock(ctx context.Context, slot int64) { + storedBlock, err := tm.dbQuery.QueryBlockFromSlot(ctx, slot) + if err != nil { + if !errors.Is(err, pgx.ErrNoRows) { + log.Err(err).Int64("slot", slot).Msg("failed to query stored block") + } + return + } + + block, err := tm.ethClient.BlockByNumber(ctx, big.NewInt(storedBlock.BlockNumber)) + if err != nil { + log.Err(err). + Int64("slot", slot). + Int64("block_number", storedBlock.BlockNumber). + Msg("failed to fetch stored block") + return + } + + if err := tm.HandleBlock(ctx, storedBlock.BlockNumber, slot, block.Transactions()); err != nil { + log.Err(err). + Int64("slot", slot). + Int64("block_number", storedBlock.BlockNumber). + Msg("failed to handle stored block after decryption") + } +} + +type txExecutionJob struct { + expectedIndex int + decryptedTx *types.Transaction + txSubEvent data.TransactionSubmittedEvent + decryptionKeyID int64 +} + func (tm *TxMapperDB) processTransactionExecution( ctx context.Context, te *TxExecution, @@ -579,14 +613,16 @@ func (tm *TxMapperDB) processTransactionExecution( slot := te.DecKeysAndMessages[0].Slot expectedIdx := 0 + jobs := make([]txExecutionJob, 0, len(txSubEvents)) - var wg sync.WaitGroup + // First pass: decrypt and create initial decrypted_tx rows synchronously. for _, txSubEvent := range txSubEvents { decryptionKeyID, err := getDecryptionKeyID(txSubEvent, identityPreimageToDecKeyAndMsg) if err != nil { log.Err(err).Msg("error while trying to retrieve decryption key ID") continue } + decryptedTx, err := getDecryptedTX(txSubEvent, identityPreimageToDecKeyAndMsg) if err != nil { log.Err(err).Msg("error while trying to get decrypted tx hash") @@ -613,16 +649,43 @@ func (tm *TxMapperDB) processTransactionExecution( Uint8("tx-type", decryptedTx.Type()). Msg("tx-data") - currExpected := expectedIdx + err = tm.dbQuery.CreateDecryptedTX(ctx, data.CreateDecryptedTXParams{ + Slot: slot, + TxIndex: txSubEvent.TxIndex, + TxHash: decryptedTx.Hash().Bytes(), + TxStatus: data.TxStatusValPending, + InclusionPosition: InclPosUnknown, + DecryptionKeyID: decryptionKeyID, + TransactionSubmittedEventID: txSubEvent.ID, + }) + if err != nil { + log.Err(err).Msg("failed to create decrypted tx") + continue + } + + jobs = append(jobs, txExecutionJob{ + expectedIndex: expectedIdx, + decryptedTx: decryptedTx, + txSubEvent: txSubEvent, + decryptionKeyID: decryptionKeyID, + }) expectedIdx++ + } - // channel to propagate errors between goroutines - txErrorSignalCh := make(chan error, 1) + // If the block already exists for this slot, classify now that rows are present. + tm.maybeHandleStoredBlock(ctx, slot) + + var wg sync.WaitGroup + for _, job := range jobs { + currExpected := job.expectedIndex + decryptedTx := job.decryptedTx + txSubEvent := job.txSubEvent + decryptionKeyID := job.decryptionKeyID + txErrorSignalCh := make(chan error, 1) wg.Add(2) - // First goroutine: Send transaction - go func(ctx context.Context, inclusionDelay int64, decryptedTx *types.Transaction, txSubEvent data.TransactionSubmittedEvent, slot int64, decryptionKeyID int64, txErrorSignalCh chan error) { + go func(ctx context.Context, decryptedTx *types.Transaction, txSubEvent data.TransactionSubmittedEvent, slot int64, decryptionKeyID int64, txErrorSignalCh chan error) { defer wg.Done() select { @@ -634,75 +697,48 @@ func (tm *TxMapperDB) processTransactionExecution( log.Err(err).Msg("failed to send transaction") if err.Error() == "AlreadyKnown" { log.Debug().Hex("tx-hash", decryptedTx.Hash().Bytes()).Msg("already known") - err := tm.dbQuery.CreateDecryptedTX(ctx, data.CreateDecryptedTXParams{ - Slot: slot, - TxIndex: txSubEvent.TxIndex, - TxHash: decryptedTx.Hash().Bytes(), - TxStatus: data.TxStatusValPending, - InclusionPosition: InclPosUnknown, - DecryptionKeyID: decryptionKeyID, - TransactionSubmittedEventID: txSubEvent.ID, - }) - if err != nil { - txErrorSignalCh <- fmt.Errorf("failed to create decrypted tx: %w", err) - return - } - } else { - txStatus := data.TxStatusValInvalid - if isFeeTooLowError(err) { - txStatus = data.TxStatusValInvalidfeetoolow - } - err := tm.dbQuery.CreateDecryptedTX(ctx, data.CreateDecryptedTXParams{ - Slot: slot, - TxIndex: txSubEvent.TxIndex, - TxHash: decryptedTx.Hash().Bytes(), - TxStatus: txStatus, - InclusionPosition: InclPosUnknown, - DecryptionKeyID: decryptionKeyID, - TransactionSubmittedEventID: txSubEvent.ID, - }) - if err != nil { - log.Err(err).Msg("failed to create decrypted tx") - } - txErrorSignalCh <- fmt.Errorf("%w: %v", errSendTransaction, err) return } - } else { - log.Info().Hex("tx-hash", decryptedTx.Hash().Bytes()).Msg("transaction sent") - err := tm.dbQuery.CreateDecryptedTX(ctx, data.CreateDecryptedTXParams{ + + txStatus := data.TxStatusValInvalid + if isFeeTooLowError(err) { + txStatus = data.TxStatusValInvalidfeetoolow + } + err := tm.dbQuery.UpsertTX(ctx, data.UpsertTXParams{ Slot: slot, TxIndex: txSubEvent.TxIndex, TxHash: decryptedTx.Hash().Bytes(), - TxStatus: data.TxStatusValPending, + TxStatus: txStatus, InclusionPosition: InclPosUnknown, DecryptionKeyID: decryptionKeyID, TransactionSubmittedEventID: txSubEvent.ID, + BlockNumber: pgtype.Int8{}, }) if err != nil { - txErrorSignalCh <- fmt.Errorf("failed to create decrypted tx: %w", err) - return + log.Err(err).Msg("failed to upsert decrypted tx") } + txErrorSignalCh <- fmt.Errorf("%w: %v", errSendTransaction, err) + return } + + log.Info().Hex("tx-hash", decryptedTx.Hash().Bytes()).Msg("transaction sent") } - }(ctx, tm.config.InclusionDelay, decryptedTx, txSubEvent, slot, decryptionKeyID, txErrorSignalCh) + }(ctx, decryptedTx, txSubEvent, slot, decryptionKeyID, txErrorSignalCh) - // Second goroutine: Wait for receipt go func(ctx context.Context, expectedIndex int, txHash common.Hash, txIndex int64, slot int64, decryptionKeyID int64, txSubEventID int64, txErrorSignalCh chan error) { defer wg.Done() - // Fallback path: skip if block-time classification already finalized this tx. if tm.isDone(txHash) { return } - // Wait for the receipt with a timeout receipt, err := tm.waitForReceiptWithTimeout(ctx, txHash, ReceiptWaitTimeout, txErrorSignalCh) if err != nil { log.Err(err).Msg("") if errors.Is(err, errSendTransaction) { return } - if !tm.isDone(txHash) { // guard against concurrent HandleBlock + if !tm.isDone(txHash) { err := tm.dbQuery.UpsertTX(ctx, data.UpsertTXParams{ Slot: slot, TxIndex: txIndex, @@ -716,12 +752,10 @@ func (tm *TxMapperDB) processTransactionExecution( if err != nil { log.Err(err).Msg("failed to upsert decrypted tx") } - // tm.markDone(txHash) } return } - // receipt found log.Info().Hex("tx-hash", receipt.TxHash.Bytes()). Uint64("receipt-status", receipt.Status). Msg("transaction receipt found") @@ -764,11 +798,9 @@ func (tm *TxMapperDB) processTransactionExecution( if err != nil { log.Err(err).Msg("failed to update decrypted tx") } - }(ctx, currExpected, decryptedTx.Hash(), txSubEvent.TxIndex, slot, decryptionKeyID, txSubEvent.ID, - txErrorSignalCh) + }(ctx, currExpected, decryptedTx.Hash(), txSubEvent.TxIndex, slot, decryptionKeyID, txSubEvent.ID, txErrorSignalCh) } - // Wait for all routines to end wg.Wait() return nil } From ac2d5bf4f1701f0b7fbb76453fc2b98a9f764602 Mon Sep 17 00:00:00 2001 From: ylembachar Date: Wed, 18 Mar 2026 02:30:59 +0100 Subject: [PATCH 6/9] feat: split tx mapper logic into focused files --- internal/metrics/tx_mapper.go | 161 +++ internal/metrics/tx_mapper_classification.go | 209 ++++ internal/metrics/tx_mapper_db.go | 1049 ----------------- internal/metrics/tx_mapper_decryption.go | 210 ++++ internal/metrics/tx_mapper_execution.go | 303 +++++ ...tx_mapper_db_test.go => tx_mapper_test.go} | 0 internal/metrics/tx_mapper_validators.go | 298 +++++ internal/metrics/types.go | 71 -- 8 files changed, 1181 insertions(+), 1120 deletions(-) create mode 100644 internal/metrics/tx_mapper.go create mode 100644 internal/metrics/tx_mapper_classification.go delete mode 100644 internal/metrics/tx_mapper_db.go create mode 100644 internal/metrics/tx_mapper_decryption.go create mode 100644 internal/metrics/tx_mapper_execution.go rename internal/metrics/{tx_mapper_db_test.go => tx_mapper_test.go} (100%) create mode 100644 internal/metrics/tx_mapper_validators.go delete mode 100644 internal/metrics/types.go diff --git a/internal/metrics/tx_mapper.go b/internal/metrics/tx_mapper.go new file mode 100644 index 0000000..cbd3650 --- /dev/null +++ b/internal/metrics/tx_mapper.go @@ -0,0 +1,161 @@ +// Package metrics provides the Prometheus exporter and the transaction mapper logic. +// +// TxMapperDB is the observer's persistence and classification pipeline. +// +// Flow: +// 1. AddTransactionSubmittedEvent stores sequencer tx events. +// 2. AddDecryptionKeysAndMessages decrypts slot txs and creates initial decrypted_tx rows. +// 3. processTransactionExecution sends txs and waits for receipts. +// 4. HandleBlock and receipt handling classify inclusion. +// 5. Validator methods persist validator-related state. +package metrics + +import ( + "context" + "sync" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/ethclient" + "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgxpool" + "github.com/pkg/errors" + sequencerBindings "github.com/shutter-network/gnosh-contracts/gnoshcontracts/sequencer" + validatorRegistryBindings "github.com/shutter-network/gnosh-contracts/gnoshcontracts/validatorregistry" + metricsCommon "github.com/shutter-network/observer/common" + "github.com/shutter-network/observer/internal/data" + "github.com/shutter-network/rolling-shutter/rolling-shutter/medley/beaconapiclient" +) + +const ReceiptWaitTimeout = 1 * time.Hour + +var errSendTransaction = errors.New("send transaction failed") + +type TxMapperDB struct { + db *pgxpool.Pool + dbQuery *data.Queries + config *metricsCommon.Config + ethClient *ethclient.Client + beaconAPIClient *beaconapiclient.Client + chainID int64 + genesisTimestamp uint64 + slotDuration uint64 + statusDone sync.Map +} + +type TxEventStore interface { + AddTransactionSubmittedEvent(ctx context.Context, tx pgx.Tx, st *sequencerBindings.SequencerTransactionSubmitted) error + AddBlock(ctx context.Context, b *data.Block) error + AddKeyShare(ctx context.Context, dks *data.DecryptionKeyShare) error +} + +type DecryptionStore interface { + AddDecryptionKeysAndMessages(ctx context.Context, dkam *DecKeysAndMessages) error +} + +type BlockClassifier interface { + HandleBlock(ctx context.Context, blockNumber int64, slot int64, txs types.Transactions) error +} + +type ValidatorStore interface { + QueryBlockNumberFromValidatorRegistryEventsSyncedUntil(ctx context.Context) (int64, error) + AddValidatorRegistryEvent(ctx context.Context, tx pgx.Tx, vr *validatorRegistryBindings.ValidatorregistryUpdated) error + UpdateValidatorStatus(ctx context.Context) error + AddProposerDuties(ctx context.Context, epoch uint64) error + UpsertGraffitiIfShutterized(ctx context.Context, validatorIndex int64, graffiti string, blockNumber int64) (bool, error) +} + +type TxMapper interface { + TxEventStore + DecryptionStore + BlockClassifier + ValidatorStore +} + +// markDone records that a tx hash has been finalized (by block or receipt) +// to prevent double classification. +func (tm *TxMapperDB) markDone(hash common.Hash) { + tm.statusDone.Store(hash.Hex(), struct{}{}) +} + +// isDone reports whether a tx hash was already finalized. +func (tm *TxMapperDB) isDone(hash common.Hash) bool { + _, ok := tm.statusDone.Load(hash.Hex()) + return ok +} + +func NewTxMapperDB( + ctx context.Context, + db *pgxpool.Pool, + config *metricsCommon.Config, + ethClient *ethclient.Client, + beaconAPIClient *beaconapiclient.Client, + chainID int64, + genesisTimestamp uint64, + slotDuration uint64, +) TxMapper { + return &TxMapperDB{ + db: db, + dbQuery: data.New(db), + config: config, + ethClient: ethClient, + beaconAPIClient: beaconAPIClient, + chainID: chainID, + genesisTimestamp: genesisTimestamp, + slotDuration: slotDuration, + } +} + +func (tm *TxMapperDB) AddTransactionSubmittedEvent(ctx context.Context, tx pgx.Tx, st *sequencerBindings.SequencerTransactionSubmitted) error { + q := tm.dbQuery + if tx != nil { + // Use transaction if available + q = tm.dbQuery.WithTx(tx) + } + err := q.CreateTransactionSubmittedEvent(ctx, data.CreateTransactionSubmittedEventParams{ + EventBlockHash: st.Raw.BlockHash.Bytes(), + EventBlockNumber: int64(st.Raw.BlockNumber), + EventTxIndex: int64(st.Raw.TxIndex), + EventLogIndex: int64(st.Raw.Index), + Eon: int64(st.Eon), + TxIndex: int64(st.TxIndex), + IdentityPrefix: st.IdentityPrefix[:], + Sender: st.Sender.Bytes(), + EncryptedTransaction: st.EncryptedTransaction, + EventTxHash: st.Raw.TxHash.Bytes(), + }) + if err != nil { + return err + } + metricsEncTxReceived.Inc() + return nil +} + +func (tm *TxMapperDB) AddKeyShare(ctx context.Context, dks *data.DecryptionKeyShare) error { + err := tm.dbQuery.CreateDecryptionKeyShare(ctx, data.CreateDecryptionKeyShareParams{ + Eon: dks.Eon, + DecryptionKeyShare: dks.DecryptionKeyShare, + Slot: dks.Slot, + IdentityPreimage: dks.IdentityPreimage, + KeyperIndex: dks.KeyperIndex, + }) + if err != nil { + return err + } + metricsKeyShareReceived.Inc() + return nil +} + +func (tm *TxMapperDB) AddBlock( + ctx context.Context, + b *data.Block, +) error { + err := tm.dbQuery.CreateBlock(ctx, data.CreateBlockParams{ + BlockHash: b.BlockHash, + BlockNumber: b.BlockNumber, + BlockTimestamp: b.BlockTimestamp, + Slot: b.Slot, + }) + return err +} \ No newline at end of file diff --git a/internal/metrics/tx_mapper_classification.go b/internal/metrics/tx_mapper_classification.go new file mode 100644 index 0000000..c3e5540 --- /dev/null +++ b/internal/metrics/tx_mapper_classification.go @@ -0,0 +1,209 @@ +package metrics + +import ( + "context" + "math/big" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgtype" + "github.com/pkg/errors" + "github.com/rs/zerolog/log" + + "github.com/shutter-network/observer/internal/data" +) + +const ( + InclPosUnknown = "unknown" + InclPosExact = "exact" + InclPosLater = "later" + InclPosEarlier = "earlier" + InclPosWrong = "wrong_slot" +) + +func classifyInclusion(expectedIndex int, receiptIndex uint) (data.TxStatusVal, string) { + switch { + case receiptIndex == uint(expectedIndex): + return data.TxStatusValShieldedinclusion, InclPosExact + case receiptIndex > uint(expectedIndex): + return data.TxStatusValUnshieldedinclusion, InclPosLater + default: + return data.TxStatusValTentativeshieldedinclusion, InclPosEarlier + } +} + +type batchEntry struct { + hash common.Hash + status data.TxStatusVal + txIndex int64 + decryptionKeyID int64 + submittedEventID int64 +} + +func classifyWithPredecessors(expectedPos, blockPos int, entries []batchEntry) (data.TxStatusVal, + string) { + // later index always unshielded + if blockPos > expectedPos { + return data.TxStatusValUnshieldedinclusion, InclPosLater + } + + pos := InclPosExact + if blockPos < expectedPos { + pos = InclPosEarlier + } + + badPredecessor := false + allShieldedBefore := true + seenTentative := false + for i := 0; i < expectedPos; i++ { + switch entries[i].status { + case data.TxStatusValShieldedinclusion: + // ok + case data.TxStatusValTentativeshieldedinclusion: + allShieldedBefore = false + seenTentative = true + case data.TxStatusValUnshieldedinclusion: + badPredecessor = true + default: + allShieldedBefore = false + } + if badPredecessor { + break + } + } + + if badPredecessor { + return data.TxStatusValUnshieldedinclusion, pos + } + if pos == InclPosExact && allShieldedBefore { + return data.TxStatusValShieldedinclusion, pos + } + // exact with tentative predecessor, or earlier with good predecessors -> ambiguous (tentative) + _ = seenTentative // kept for readability; not used in decision + return data.TxStatusValTentativeshieldedinclusion, pos +} + +// HandleBlock performs predecessor-aware classification at block arrival. +// Receipt-based classification is a fallback and skips if a tx is already finalized. +func (tm *TxMapperDB) HandleBlock(ctx context.Context, blockNumber int64, slot int64, txs types.Transactions) error { + if len(txs) == 0 { + return nil + } + + log.Debug(). + Int64("slot", slot). + Int64("block_number", blockNumber). + Int("num_txs", len(txs)). + Msg("handling block for tx classification") + + rows, err := tm.db.Query(ctx, ` + SELECT tx_hash, tx_index, tx_status, decryption_key_id, transaction_submitted_event_id + FROM decrypted_tx + WHERE slot = $1 + AND tx_hash <> '\x00' + ORDER BY tx_index`, slot) + if err != nil { + return err + } + defer rows.Close() + + var entries []batchEntry + indexByHash := make(map[string]int) + + for rows.Next() { + var ( + hashBytes []byte + txIdx int64 + status data.TxStatusVal + decID int64 + subID int64 + ) + if err := rows.Scan(&hashBytes, &txIdx, &status, &decID, &subID); err != nil { + return err + } + h := common.BytesToHash(hashBytes) + indexByHash[h.Hex()] = len(entries) + entries = append(entries, batchEntry{ + hash: h, + status: status, + txIndex: txIdx, + decryptionKeyID: decID, + submittedEventID: subID, + }) + } + + log.Debug(). + Int64("slot", slot). + Int("num_candidates", len(entries)). + Msg("loaded decrypted tx candidates for block classification") + + if len(entries) == 0 { + return nil + } + + for blockPos, tx := range txs { + h := tx.Hash() + expectedPos, ok := indexByHash[h.Hex()] + if !ok || tm.isDone(h) { + continue + } + + status, pos := classifyWithPredecessors(expectedPos, blockPos, entries) + entries[expectedPos].status = status // update for later predecessor checks + + log.Debug(). + Int64("slot", slot). + Int64("block_number", blockNumber). + Int("block_pos", blockPos). + Int("expected_pos", expectedPos). + Int64("tx_index", entries[expectedPos].txIndex). + Hex("tx_hash", h.Bytes()). + Str("tx_status", string(status)). + Str("inclusion_position", pos). + Msg("classified tx from block body") + + if err := tm.dbQuery.UpsertTX(ctx, data.UpsertTXParams{ + Slot: slot, + TxIndex: entries[expectedPos].txIndex, + TxHash: h.Bytes(), + TxStatus: status, + InclusionPosition: pos, + DecryptionKeyID: entries[expectedPos].decryptionKeyID, + TransactionSubmittedEventID: entries[expectedPos].submittedEventID, + BlockNumber: pgtype.Int8{Int64: blockNumber, Valid: true}, + }); err != nil { + log.Err(err).Hex("tx-hash", h.Bytes()).Msg("failed to upsert tx from block body") + continue + } + tm.markDone(h) + } + + return nil +} + +func (tm *TxMapperDB) maybeHandleStoredBlock(ctx context.Context, slot int64) { + storedBlock, err := tm.dbQuery.QueryBlockFromSlot(ctx, slot) + if err != nil { + if !errors.Is(err, pgx.ErrNoRows) { // handling the error check properly + log.Err(err).Int64("slot", slot).Msg("failed to query stored block") + } + return + } + + block, err := tm.ethClient.BlockByNumber(ctx, big.NewInt(storedBlock.BlockNumber)) + if err != nil { + log.Err(err). + Int64("slot", slot). + Int64("block_number", storedBlock.BlockNumber). + Msg("failed to fetch stored block") + return + } + + if err := tm.HandleBlock(ctx, storedBlock.BlockNumber, slot, block.Transactions()); err != nil { + log.Err(err). + Int64("slot", slot). + Int64("block_number", storedBlock.BlockNumber). + Msg("failed to handle stored block after decryption") + } +} \ No newline at end of file diff --git a/internal/metrics/tx_mapper_db.go b/internal/metrics/tx_mapper_db.go deleted file mode 100644 index e3cfd64..0000000 --- a/internal/metrics/tx_mapper_db.go +++ /dev/null @@ -1,1049 +0,0 @@ -package metrics - -import ( - "context" - "encoding/hex" - "fmt" - "math" - "math/big" - "strings" - "sync" - "time" - - "github.com/ethereum/go-ethereum" - "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/core/types" - "github.com/ethereum/go-ethereum/ethclient" - "github.com/jackc/pgx/v5" - "github.com/jackc/pgx/v5/pgtype" - "github.com/jackc/pgx/v5/pgxpool" - "github.com/pkg/errors" - "github.com/rs/zerolog/log" - sequencerBindings "github.com/shutter-network/gnosh-contracts/gnoshcontracts/sequencer" - validatorRegistryBindings "github.com/shutter-network/gnosh-contracts/gnoshcontracts/validatorregistry" - metricsCommon "github.com/shutter-network/observer/common" - dbTypes "github.com/shutter-network/observer/common/database" - "github.com/shutter-network/observer/common/utils" - "github.com/shutter-network/observer/internal/data" - "github.com/shutter-network/rolling-shutter/rolling-shutter/medley/beaconapiclient" - "github.com/shutter-network/rolling-shutter/rolling-shutter/medley/validatorregistry" - "github.com/shutter-network/shutter/shlib/shcrypto" - blst "github.com/supranational/blst/bindings/go" -) - -const ReceiptWaitTimeout = 1 * time.Hour - -var errSendTransaction = errors.New("send transaction failed") - -type TxMapperDB struct { - db *pgxpool.Pool - dbQuery *data.Queries - config *metricsCommon.Config - ethClient *ethclient.Client - beaconAPIClient *beaconapiclient.Client - chainID int64 - genesisTimestamp uint64 - slotDuration uint64 - statusDone sync.Map -} - -// markDone records that a tx hash has been finalized (by block or receipt) -// to prevent double classification. -func (tm *TxMapperDB) markDone(hash common.Hash) { - tm.statusDone.Store(hash.Hex(), struct{}{}) -} - -// isDone reports whether a tx hash was already finalized. -func (tm *TxMapperDB) isDone(hash common.Hash) bool { - _, ok := tm.statusDone.Load(hash.Hex()) - return ok -} - -type validatorData struct { - validatorStatus string - validatorValidity data.ValidatorRegistrationValidity -} - -func NewTxMapperDB( - ctx context.Context, - db *pgxpool.Pool, - config *metricsCommon.Config, - ethClient *ethclient.Client, - beaconAPIClient *beaconapiclient.Client, - chainID int64, - genesisTimestamp uint64, - slotDuration uint64, -) TxMapper { - return &TxMapperDB{ - db: db, - dbQuery: data.New(db), - config: config, - ethClient: ethClient, - beaconAPIClient: beaconAPIClient, - chainID: chainID, - genesisTimestamp: genesisTimestamp, - slotDuration: slotDuration, - } -} - -func (tm *TxMapperDB) AddTransactionSubmittedEvent(ctx context.Context, tx pgx.Tx, st *sequencerBindings.SequencerTransactionSubmitted) error { - q := tm.dbQuery - if tx != nil { - // Use transaction if available - q = tm.dbQuery.WithTx(tx) - } - err := q.CreateTransactionSubmittedEvent(ctx, data.CreateTransactionSubmittedEventParams{ - EventBlockHash: st.Raw.BlockHash.Bytes(), - EventBlockNumber: int64(st.Raw.BlockNumber), - EventTxIndex: int64(st.Raw.TxIndex), - EventLogIndex: int64(st.Raw.Index), - Eon: int64(st.Eon), - TxIndex: int64(st.TxIndex), - IdentityPrefix: st.IdentityPrefix[:], - Sender: st.Sender.Bytes(), - EncryptedTransaction: st.EncryptedTransaction, - EventTxHash: st.Raw.TxHash.Bytes(), - }) - if err != nil { - return err - } - metricsEncTxReceived.Inc() - return nil -} - -func (tm *TxMapperDB) AddDecryptionKeysAndMessages( - ctx context.Context, - decKeysAndMessages *DecKeysAndMessages, -) error { - if len(decKeysAndMessages.Keys) == 0 { - return nil - } - tx, err := tm.db.Begin(ctx) - if err != nil { - return err - } - defer tx.Rollback(ctx) - qtx := tm.dbQuery.WithTx(tx) - - eons, slots, instanceIDs, txPointers, keyIndexes := getDecryptionMessageInfos(decKeysAndMessages) - decryptionKeyIDs, err := qtx.CreateDecryptionKeys(ctx, data.CreateDecryptionKeysParams{ - Column1: eons, - Column2: decKeysAndMessages.Identities, - Column3: decKeysAndMessages.Keys, - }) - if err != nil { - return err - } - if len(decryptionKeyIDs) == 0 || len(decryptionKeyIDs) != len(slots) { - log.Debug().Msg("no new decryption key was added") - return nil - } - err = qtx.CreateDecryptionKeyMessages(ctx, data.CreateDecryptionKeyMessagesParams{ - Column1: slots, - Column2: instanceIDs, - Column3: eons, - Column4: txPointers, - }) - if err != nil { - return err - } - - err = qtx.CreateDecryptionKeysMessageDecryptionKey(ctx, data.CreateDecryptionKeysMessageDecryptionKeyParams{ - Column1: slots, - Column2: keyIndexes, - Column3: decryptionKeyIDs, - }) - if err != nil { - return err - } - - totalDecKeysAndMessages := len(decKeysAndMessages.Keys) - err = tx.Commit(ctx) - if err != nil { - log.Err(err).Msg("unable to commit db transaction") - return err - } - - for i := 0; i < totalDecKeysAndMessages; i++ { - metricsDecKeyReceived.Inc() - } - - dkam := make([]*DecKeyAndMessage, totalDecKeysAndMessages) - for index, key := range decKeysAndMessages.Keys { - identityPreimage := decKeysAndMessages.Identities[index] - dkam[index] = &DecKeyAndMessage{ - Slot: decKeysAndMessages.Slot, - TxPointer: decKeysAndMessages.TxPointer, - Eon: decKeysAndMessages.Eon, - Key: key, - IdentityPreimage: identityPreimage, - KeyIndex: int64(index), - DecryptionKeyID: decryptionKeyIDs[index], - } - } - - if len(dkam) > 0 { - dkam = dkam[1:] - } - err = tm.processTransactionExecution(ctx, &TxExecution{ - DecKeysAndMessages: dkam, - }) - if err != nil { - log.Err(err).Int64("slot", decKeysAndMessages.Slot).Msg("failed to process transaction execution") - return err - } - return nil -} - -func (tm *TxMapperDB) AddKeyShare(ctx context.Context, dks *data.DecryptionKeyShare) error { - err := tm.dbQuery.CreateDecryptionKeyShare(ctx, data.CreateDecryptionKeyShareParams{ - Eon: dks.Eon, - DecryptionKeyShare: dks.DecryptionKeyShare, - Slot: dks.Slot, - IdentityPreimage: dks.IdentityPreimage, - KeyperIndex: dks.KeyperIndex, - }) - if err != nil { - return err - } - metricsKeyShareReceived.Inc() - return nil -} - -func (tm *TxMapperDB) AddBlock( - ctx context.Context, - b *data.Block, -) error { - err := tm.dbQuery.CreateBlock(ctx, data.CreateBlockParams{ - BlockHash: b.BlockHash, - BlockNumber: b.BlockNumber, - BlockTimestamp: b.BlockTimestamp, - Slot: b.Slot, - }) - return err -} - -func (tm *TxMapperDB) QueryBlockNumberFromValidatorRegistryEventsSyncedUntil(ctx context.Context) (int64, error) { - data, err := tm.dbQuery.QueryValidatorRegistryEventsSyncedUntil(ctx) - if err != nil { - return 0, err - } - return data.BlockNumber, nil -} - -func (tm *TxMapperDB) AddValidatorRegistryEvent(ctx context.Context, tx pgx.Tx, vr *validatorRegistryBindings.ValidatorregistryUpdated) error { - regMessage := &validatorregistry.AggregateRegistrationMessage{} - err := regMessage.Unmarshal(vr.Message) - if err != nil { - log.Err(err).Hex("tx-hash", vr.Raw.TxHash.Bytes()).Msg("error unmarshalling registration message") - } else { - validatorIDtoValidity, err := tm.validateValidatorRegistryEvent(ctx, vr, regMessage, uint64(tm.chainID), tm.config.ValidatorRegistryContractAddress) - if err != nil { - log.Err(err).Msg("error validating validator registry events") - return err - } - - q := tm.dbQuery - if tx != nil { - // Use transaction if available - q = tm.dbQuery.WithTx(tx) - } - - for validatorID, validatorData := range validatorIDtoValidity { - err := q.CreateValidatorRegistryMessage(ctx, data.CreateValidatorRegistryMessageParams{ - Version: dbTypes.Uint64ToPgTypeInt8(uint64(regMessage.Version)), - ChainID: dbTypes.Uint64ToPgTypeInt8(regMessage.ChainID), - ValidatorRegistryAddress: regMessage.ValidatorRegistryAddress.Bytes(), - ValidatorIndex: dbTypes.Int64ToPgTypeInt8(validatorID), - Nonce: dbTypes.Uint64ToPgTypeInt8(uint64(regMessage.Nonce)), - IsRegisteration: dbTypes.BoolToPgTypeBool(regMessage.IsRegistration), - Signature: vr.Signature, - EventBlockNumber: int64(vr.Raw.BlockNumber), - EventTxIndex: int64(vr.Raw.TxIndex), - EventLogIndex: int64(vr.Raw.Index), - Validity: validatorData.validatorValidity, - }) - if err != nil { - return err - } - - if validatorData.validatorValidity == data.ValidatorRegistrationValidityValid && - validatorData.validatorStatus != "" { - err := q.CreateValidatorStatus(ctx, data.CreateValidatorStatusParams{ - ValidatorIndex: dbTypes.Int64ToPgTypeInt8(validatorID), - Status: validatorData.validatorStatus, - }) - if err != nil { - return err - } - } - } - } - return nil -} - -func (tm *TxMapperDB) UpdateValidatorStatus(ctx context.Context) error { - batchSize := 100 - jumpBy := 0 - numWorkers := 5 - sem := make(chan struct{}, numWorkers) - var wg sync.WaitGroup - - for { - // Query a batch of validator statuses - validatorStatus, err := tm.dbQuery.QueryValidatorStatuses(ctx, data.QueryValidatorStatusesParams{ - Limit: int32(batchSize), - Offset: int32(jumpBy), - }) - if err != nil { - if errors.Is(err, pgx.ErrNoRows) { - break - } - return err - } - - if len(validatorStatus) == 0 { - break - } - - // Launch goroutines to process each status concurrently - for _, vs := range validatorStatus { - sem <- struct{}{} - wg.Add(1) - go func(vs data.QueryValidatorStatusesRow) { - defer wg.Done() - defer func() { <-sem }() - - validatorIndex := uint64(vs.ValidatorIndex.Int64) - //TODO: should we keep this log or remove it? - log.Debug().Uint64("validatorIndex", validatorIndex).Msg("validator status being updated") - validator, err := tm.beaconAPIClient.GetValidatorByIndex(ctx, "head", validatorIndex) - if err != nil { - log.Err(err).Uint64("validatorIndex", validatorIndex).Msg("failed to get validator from beacon chain") - return - } - if validator == nil { - return - } - - err = tm.dbQuery.CreateValidatorStatus(ctx, data.CreateValidatorStatusParams{ - ValidatorIndex: dbTypes.Uint64ToPgTypeInt8(validatorIndex), - Status: validator.Data.Status, - }) - if err != nil { - log.Err(err).Uint64("validatorIndex", validatorIndex).Msg("failed to create validator status") - return - } - }(vs) - } - - wg.Wait() - - // Wait for 3 seconds before processing the next batch - select { - case <-ctx.Done(): - return ctx.Err() // Handle context cancellation - case <-time.After(3 * time.Second): - } - - jumpBy += batchSize - } - - return nil -} - -func (tm *TxMapperDB) AddProposerDuties(ctx context.Context, epoch uint64) error { - proposerDuties, err := tm.beaconAPIClient.GetProposerDutiesByEpoch(ctx, epoch) - if err != nil { - return err - } - if proposerDuties == nil { - return errors.Errorf("no proposer duties found for epoch %d", epoch) - } - - log.Info().Uint64("epoch", epoch).Msg("processing proposer duties") - - publicKeys := make([]string, len(proposerDuties.Data)) - validatorIndices := make([]int64, len(proposerDuties.Data)) - slots := make([]int64, len(proposerDuties.Data)) - - for i := 0; i < len(proposerDuties.Data); i++ { - publicKeys[i] = proposerDuties.Data[i].Pubkey - validatorIndices[i] = int64(proposerDuties.Data[i].ValidatorIndex) - slots[i] = int64(proposerDuties.Data[i].Slot) - } - - err = tm.dbQuery.CreateProposerDuties(ctx, data.CreateProposerDutiesParams{ - Column1: publicKeys, - Column2: validatorIndices, - Column3: slots, - }) - return err -} - -const ( - InclPosUnknown = "unknown" - InclPosExact = "exact" - InclPosLater = "later" - InclPosEarlier = "earlier" - InclPosWrong = "wrong_slot" -) - -func classifyInclusion(expectedIndex int, receiptIndex uint) (data.TxStatusVal, string) { - switch { - case receiptIndex == uint(expectedIndex): - return data.TxStatusValShieldedinclusion, InclPosExact - case receiptIndex > uint(expectedIndex): - return data.TxStatusValUnshieldedinclusion, InclPosLater - default: - return data.TxStatusValTentativeshieldedinclusion, InclPosEarlier - } -} - -type batchEntry struct { - hash common.Hash - status data.TxStatusVal - txIndex int64 - decryptionKeyID int64 - submittedEventID int64 -} - -func classifyWithPredecessors(expectedPos, blockPos int, entries []batchEntry) (data.TxStatusVal, - string) { - // later index always unshielded - if blockPos > expectedPos { - return data.TxStatusValUnshieldedinclusion, InclPosLater - } - - pos := InclPosExact - if blockPos < expectedPos { - pos = InclPosEarlier - } - - badPredecessor := false - allShieldedBefore := true - seenTentative := false - for i := 0; i < expectedPos; i++ { - switch entries[i].status { - case data.TxStatusValShieldedinclusion: - // ok - case data.TxStatusValTentativeshieldedinclusion: - allShieldedBefore = false - seenTentative = true - case data.TxStatusValUnshieldedinclusion: - badPredecessor = true - default: - allShieldedBefore = false - } - if badPredecessor { - break - } - } - - if badPredecessor { - return data.TxStatusValUnshieldedinclusion, pos - } - if pos == InclPosExact && allShieldedBefore { - return data.TxStatusValShieldedinclusion, pos - } - // exact with tentative predecessor, or earlier with good predecessors -> ambiguous (tentative) - _ = seenTentative // kept for readability; not used in decision - return data.TxStatusValTentativeshieldedinclusion, pos -} - -// HandleBlock performs predecessor-aware classification at block arrival. -// Receipt-based classification is a fallback and skips if a tx is already finalized. -func (tm *TxMapperDB) HandleBlock(ctx context.Context, blockNumber int64, slot int64, txs types.Transactions) error { - if len(txs) == 0 { - return nil - } - - log.Debug(). - Int64("slot", slot). - Int64("block_number", blockNumber). - Int("num_txs", len(txs)). - Msg("handling block for tx classification") - - rows, err := tm.db.Query(ctx, ` - SELECT tx_hash, tx_index, tx_status, decryption_key_id, transaction_submitted_event_id - FROM decrypted_tx - WHERE slot = $1 - AND tx_hash <> '\x00' - ORDER BY tx_index`, slot) - if err != nil { - return err - } - defer rows.Close() - - var entries []batchEntry - indexByHash := make(map[string]int) - - for rows.Next() { - var ( - hashBytes []byte - txIdx int64 - status data.TxStatusVal - decID int64 - subID int64 - ) - if err := rows.Scan(&hashBytes, &txIdx, &status, &decID, &subID); err != nil { - return err - } - h := common.BytesToHash(hashBytes) - indexByHash[h.Hex()] = len(entries) - entries = append(entries, batchEntry{ - hash: h, - status: status, - txIndex: txIdx, - decryptionKeyID: decID, - submittedEventID: subID, - }) - } - - log.Debug(). - Int64("slot", slot). - Int("num_candidates", len(entries)). - Msg("loaded decrypted tx candidates for block classification") - - if len(entries) == 0 { - return nil - } - - for blockPos, tx := range txs { - h := tx.Hash() - expectedPos, ok := indexByHash[h.Hex()] - if !ok || tm.isDone(h) { - continue - } - - status, pos := classifyWithPredecessors(expectedPos, blockPos, entries) - entries[expectedPos].status = status // update for later predecessor checks - - log.Debug(). - Int64("slot", slot). - Int64("block_number", blockNumber). - Int("block_pos", blockPos). - Int("expected_pos", expectedPos). - Int64("tx_index", entries[expectedPos].txIndex). - Hex("tx_hash", h.Bytes()). - Str("tx_status", string(status)). - Str("inclusion_position", pos). - Msg("classified tx from block body") - - if err := tm.dbQuery.UpsertTX(ctx, data.UpsertTXParams{ - Slot: slot, - TxIndex: entries[expectedPos].txIndex, - TxHash: h.Bytes(), - TxStatus: status, - InclusionPosition: pos, - DecryptionKeyID: entries[expectedPos].decryptionKeyID, - TransactionSubmittedEventID: entries[expectedPos].submittedEventID, - BlockNumber: pgtype.Int8{Int64: blockNumber, Valid: true}, - }); err != nil { - log.Err(err).Hex("tx-hash", h.Bytes()).Msg("failed to upsert tx from block body") - continue - } - tm.markDone(h) - } - - return nil -} - -func (tm *TxMapperDB) maybeHandleStoredBlock(ctx context.Context, slot int64) { - storedBlock, err := tm.dbQuery.QueryBlockFromSlot(ctx, slot) - if err != nil { - if !errors.Is(err, pgx.ErrNoRows) { - log.Err(err).Int64("slot", slot).Msg("failed to query stored block") - } - return - } - - block, err := tm.ethClient.BlockByNumber(ctx, big.NewInt(storedBlock.BlockNumber)) - if err != nil { - log.Err(err). - Int64("slot", slot). - Int64("block_number", storedBlock.BlockNumber). - Msg("failed to fetch stored block") - return - } - - if err := tm.HandleBlock(ctx, storedBlock.BlockNumber, slot, block.Transactions()); err != nil { - log.Err(err). - Int64("slot", slot). - Int64("block_number", storedBlock.BlockNumber). - Msg("failed to handle stored block after decryption") - } -} - -type txExecutionJob struct { - expectedIndex int - decryptedTx *types.Transaction - txSubEvent data.TransactionSubmittedEvent - decryptionKeyID int64 -} - -func (tm *TxMapperDB) processTransactionExecution( - ctx context.Context, - te *TxExecution, -) error { - totalDecKeysAndMessages := len(te.DecKeysAndMessages) - if totalDecKeysAndMessages == 0 { - return nil - } - txSubEvents, err := tm.dbQuery.QueryTransactionSubmittedEvent(ctx, data.QueryTransactionSubmittedEventParams{ - Eon: te.DecKeysAndMessages[0].Eon, - TxIndex: te.DecKeysAndMessages[0].TxPointer, - Column3: totalDecKeysAndMessages, - }) - if err != nil { - return err - } - - if len(txSubEvents) != totalDecKeysAndMessages { - log.Debug().Int("total tx sub events", len(txSubEvents)). - Int("total decryption keys", totalDecKeysAndMessages). - Msg("total tx submitted events dont match total decryption keys") - return nil - } - - identityPreimageToDecKeyAndMsg := make(map[string]*DecKeyAndMessage) - for _, dkam := range te.DecKeysAndMessages { - identityPreimageToDecKeyAndMsg[hex.EncodeToString(dkam.IdentityPreimage)] = dkam - } - - slot := te.DecKeysAndMessages[0].Slot - expectedIdx := 0 - jobs := make([]txExecutionJob, 0, len(txSubEvents)) - - // First pass: decrypt and create initial decrypted_tx rows synchronously. - for _, txSubEvent := range txSubEvents { - decryptionKeyID, err := getDecryptionKeyID(txSubEvent, identityPreimageToDecKeyAndMsg) - if err != nil { - log.Err(err).Msg("error while trying to retrieve decryption key ID") - continue - } - - decryptedTx, err := getDecryptedTX(txSubEvent, identityPreimageToDecKeyAndMsg) - if err != nil { - log.Err(err).Msg("error while trying to get decrypted tx hash") - err := tm.dbQuery.CreateDecryptedTX(ctx, data.CreateDecryptedTXParams{ - Slot: slot, - TxIndex: txSubEvent.TxIndex, - TxHash: common.Hash{}.Bytes(), - TxStatus: data.TxStatusValNotdecrypted, - InclusionPosition: InclPosUnknown, - DecryptionKeyID: decryptionKeyID, - TransactionSubmittedEventID: txSubEvent.ID, - }) - if err != nil { - log.Err(err).Msg("failed to create decrypted tx") - } - continue - } - - log.Info().Uint64("gas", decryptedTx.Gas()). - Uint64("gas-price", decryptedTx.GasPrice().Uint64()). - Uint64("cost", decryptedTx.Cost().Uint64()). - Uint64("max-priority-fee-per-gas", decryptedTx.GasTipCap().Uint64()). - Uint64("max-fee-per-gas", decryptedTx.GasFeeCap().Uint64()). - Uint8("tx-type", decryptedTx.Type()). - Msg("tx-data") - - err = tm.dbQuery.CreateDecryptedTX(ctx, data.CreateDecryptedTXParams{ - Slot: slot, - TxIndex: txSubEvent.TxIndex, - TxHash: decryptedTx.Hash().Bytes(), - TxStatus: data.TxStatusValPending, - InclusionPosition: InclPosUnknown, - DecryptionKeyID: decryptionKeyID, - TransactionSubmittedEventID: txSubEvent.ID, - }) - if err != nil { - log.Err(err).Msg("failed to create decrypted tx") - continue - } - - jobs = append(jobs, txExecutionJob{ - expectedIndex: expectedIdx, - decryptedTx: decryptedTx, - txSubEvent: txSubEvent, - decryptionKeyID: decryptionKeyID, - }) - expectedIdx++ - } - - // If the block already exists for this slot, classify now that rows are present. - tm.maybeHandleStoredBlock(ctx, slot) - - var wg sync.WaitGroup - for _, job := range jobs { - currExpected := job.expectedIndex - decryptedTx := job.decryptedTx - txSubEvent := job.txSubEvent - decryptionKeyID := job.decryptionKeyID - - txErrorSignalCh := make(chan error, 1) - wg.Add(2) - - go func(ctx context.Context, decryptedTx *types.Transaction, txSubEvent data.TransactionSubmittedEvent, slot int64, decryptionKeyID int64, txErrorSignalCh chan error) { - defer wg.Done() - - select { - case <-ctx.Done(): - txErrorSignalCh <- fmt.Errorf("transaction send cancelled due to context: %w", ctx.Err()) - return - case <-time.After(time.Duration(tm.config.InclusionDelay) * time.Second): - if err := tm.ethClient.SendTransaction(ctx, decryptedTx); err != nil { - log.Err(err).Msg("failed to send transaction") - if err.Error() == "AlreadyKnown" { - log.Debug().Hex("tx-hash", decryptedTx.Hash().Bytes()).Msg("already known") - return - } - - txStatus := data.TxStatusValInvalid - if isFeeTooLowError(err) { - txStatus = data.TxStatusValInvalidfeetoolow - } - err := tm.dbQuery.UpsertTX(ctx, data.UpsertTXParams{ - Slot: slot, - TxIndex: txSubEvent.TxIndex, - TxHash: decryptedTx.Hash().Bytes(), - TxStatus: txStatus, - InclusionPosition: InclPosUnknown, - DecryptionKeyID: decryptionKeyID, - TransactionSubmittedEventID: txSubEvent.ID, - BlockNumber: pgtype.Int8{}, - }) - if err != nil { - log.Err(err).Msg("failed to upsert decrypted tx") - } - txErrorSignalCh <- fmt.Errorf("%w: %v", errSendTransaction, err) - return - } - - log.Info().Hex("tx-hash", decryptedTx.Hash().Bytes()).Msg("transaction sent") - } - }(ctx, decryptedTx, txSubEvent, slot, decryptionKeyID, txErrorSignalCh) - - go func(ctx context.Context, expectedIndex int, txHash common.Hash, txIndex int64, slot int64, decryptionKeyID int64, txSubEventID int64, txErrorSignalCh chan error) { - defer wg.Done() - - if tm.isDone(txHash) { - return - } - - receipt, err := tm.waitForReceiptWithTimeout(ctx, txHash, ReceiptWaitTimeout, txErrorSignalCh) - if err != nil { - log.Err(err).Msg("") - if errors.Is(err, errSendTransaction) { - return - } - if !tm.isDone(txHash) { - err := tm.dbQuery.UpsertTX(ctx, data.UpsertTXParams{ - Slot: slot, - TxIndex: txIndex, - TxHash: txHash[:], - TxStatus: data.TxStatusValNotincluded, - InclusionPosition: InclPosUnknown, - DecryptionKeyID: decryptionKeyID, - TransactionSubmittedEventID: txSubEventID, - BlockNumber: pgtype.Int8{}, - }) - if err != nil { - log.Err(err).Msg("failed to upsert decrypted tx") - } - } - return - } - - log.Info().Hex("tx-hash", receipt.TxHash.Bytes()). - Uint64("receipt-status", receipt.Status). - Msg("transaction receipt found") - - block, err := tm.ethClient.BlockByNumber(ctx, receipt.BlockNumber) - if err != nil { - log.Err(err).Uint64("block-number", receipt.BlockNumber.Uint64()).Msg("failed to retrieve block") - return - } - - inclusionSlot := utils.GetSlotForBlock(block.Header().Time, tm.genesisTimestamp, tm.slotDuration) - txStatus, inclusionPosition := data.TxStatusValShieldedinclusion, InclPosUnknown - if inclusionSlot != uint64(slot) { - txStatus = data.TxStatusValUnshieldedinclusion - inclusionPosition = InclPosWrong - } else { - txStatus, inclusionPosition = classifyInclusion(expectedIndex, receipt.TransactionIndex) - } - - log.Info(). - Int64("expected-slot", slot). - Uint64("receipt-slot", inclusionSlot). - Uint("expected-index", uint(expectedIndex)). - Uint("receipt-index", receipt.TransactionIndex). - Hex("tx-hash", receipt.TxHash.Bytes()). - Str("inclusion_position", inclusionPosition). - Str("tx_status", string(txStatus)). - Msg("transaction receipt classified") - - err = tm.dbQuery.UpsertTX(ctx, data.UpsertTXParams{ - Slot: slot, - TxIndex: txIndex, - TxHash: receipt.TxHash.Bytes(), - TxStatus: txStatus, - InclusionPosition: inclusionPosition, - DecryptionKeyID: decryptionKeyID, - TransactionSubmittedEventID: txSubEventID, - BlockNumber: pgtype.Int8{Int64: receipt.BlockNumber.Int64(), Valid: true}, - }) - if err != nil { - log.Err(err).Msg("failed to update decrypted tx") - } - }(ctx, currExpected, decryptedTx.Hash(), txSubEvent.TxIndex, slot, decryptionKeyID, txSubEvent.ID, txErrorSignalCh) - } - - wg.Wait() - return nil -} - -func (tm *TxMapperDB) validateValidatorRegistryEvent( - ctx context.Context, - vr *validatorRegistryBindings.ValidatorregistryUpdated, - regMessage *validatorregistry.AggregateRegistrationMessage, - chainID uint64, - validatorRegistryContractAddress string, -) (map[int64]*validatorData, error) { - staticRegistrationMessageValidity := validateValidatorRegistryMessageContents(regMessage, chainID, validatorRegistryContractAddress) - - var publicKeys []*blst.P1Affine - var validators []*beaconapiclient.GetValidatorByIndexResponse - validatorIDtoValidity := make(map[int64]*validatorData) - - for _, validatorIndex := range regMessage.ValidatorIndices() { - validatorIDtoValidity[validatorIndex] = &validatorData{validatorValidity: staticRegistrationMessageValidity} - nonceBefore, err := tm.dbQuery.QueryValidatorRegistrationMessageNonceBefore(ctx, data.QueryValidatorRegistrationMessageNonceBeforeParams{ - ValidatorIndex: dbTypes.Int64ToPgTypeInt8(validatorIndex), - EventBlockNumber: int64(vr.Raw.BlockNumber), - EventTxIndex: int64(vr.Raw.TxIndex), - EventLogIndex: int64(vr.Raw.Index), - }) - - if err != nil { - if errors.Is(err, pgx.ErrNoRows) { - // No previous nonce means the message is valid regarding nonce - nonceBefore = pgtype.Int8{Int64: -1, Valid: true} - } else { - return nil, errors.Wrapf(err, "failed to query latest nonce for validator %d", validatorIndex) - } - } - - if regMessage.Nonce > math.MaxInt32 || int64(regMessage.Nonce) <= nonceBefore.Int64 { - // skip the validator - log.Warn(). - Uint32("nonce", regMessage.Nonce). - Int64("before-nonce", nonceBefore.Int64). - Msg("ignoring validator with invalid nonce") - validatorIDtoValidity[validatorIndex].validatorValidity = data.ValidatorRegistrationValidityInvalidmessage - continue - } - validator, err := tm.beaconAPIClient.GetValidatorByIndex(ctx, "head", uint64(validatorIndex)) - if err != nil { - return nil, errors.Wrapf(err, "failed to get validator %d", validatorIndex) - } - if validator == nil { - // validator not found - log.Warn().Msg("registration message for unknown validator") - validatorIDtoValidity[validatorIndex].validatorValidity = data.ValidatorRegistrationValidityInvalidmessage - continue - } - validatorIDtoValidity[validatorIndex].validatorStatus = validator.Data.Status - publicKey, err := validator.Data.Validator.GetPubkey() - if err != nil { - return nil, errors.Wrapf(err, "failed to get public key of validator %d", validatorIndex) - } - publicKeys = append(publicKeys, publicKey) - validators = append(validators, validator) - } - if len(publicKeys) > 0 { - // now we need to check for signature verification depending on the message version - sig := new(blst.P2Affine).Uncompress(vr.Signature) - if sig == nil { - return nil, fmt.Errorf("ignoring registration message with undecodable signature") - } - - if regMessage.Version == validatorregistry.LegacyValidatorRegistrationMessageVersion { - regMessage := new(validatorregistry.LegacyRegistrationMessage) - err := regMessage.Unmarshal(vr.Message) - if err != nil { - return nil, errors.Wrapf(err, "failed to unmarshal legacy registration message") - } - if valid := validatorregistry.VerifySignature(sig, publicKeys[0], regMessage); !valid { - validatorIDtoValidity[int64(validators[0].Data.Index)].validatorValidity = data.ValidatorRegistrationValidityInvalidsignature - log.Warn().Msg("invalid legacy registration message with invalid signature") - } - } else { - if valid := validatorregistry.VerifyAggregateSignature(sig, publicKeys, regMessage); !valid { - for _, validator := range validators { - validatorIDtoValidity[int64(validator.Data.Index)].validatorValidity = data.ValidatorRegistrationValidityInvalidsignature - } - log.Warn().Msg("invalid aggregate registration message with invalid signature") - } - } - } - return validatorIDtoValidity, nil -} - -func validateValidatorRegistryMessageContents( - msg *validatorregistry.AggregateRegistrationMessage, - chainID uint64, - validatorRegistryContractAddress string, -) data.ValidatorRegistrationValidity { - validity := data.ValidatorRegistrationValidityValid - if msg.Version != validatorregistry.AggregateValidatorRegistrationMessageVersion && - msg.Version != validatorregistry.LegacyValidatorRegistrationMessageVersion { - return data.ValidatorRegistrationValidityInvalidmessage - } - if msg.ChainID != chainID { - return data.ValidatorRegistrationValidityInvalidmessage - } - if msg.ValidatorRegistryAddress.String() != validatorRegistryContractAddress { - return data.ValidatorRegistrationValidityInvalidmessage - } - if msg.ValidatorIndex > math.MaxInt64 { - return data.ValidatorRegistrationValidityInvalidmessage - } - return validity -} - -func getDecryptionMessageInfos(dkam *DecKeysAndMessages) ([]int64, []int64, []int64, []int64, []int64) { - eons := make([]int64, len(dkam.Keys)) - slots := make([]int64, len(dkam.Keys)) - instanceIDs := make([]int64, len(dkam.Keys)) - txPointers := make([]int64, len(dkam.Keys)) - keyIndexes := make([]int64, len(dkam.Keys)) - - for index := range dkam.Keys { - eons[index] = dkam.Eon - slots[index] = dkam.Slot - instanceIDs[index] = dkam.InstanceID - txPointers[index] = dkam.TxPointer - keyIndexes[index] = int64(index) - } - - return eons, slots, instanceIDs, txPointers, keyIndexes -} - -func computeIdentity(prefix []byte, sender common.Address) []byte { - imageBytes := append(prefix, sender.Bytes()...) - return imageBytes -} - -func getDecryptedTX( - txSubEvent data.TransactionSubmittedEvent, - identityPreimageToDecKeyAndMsg map[string]*DecKeyAndMessage, -) (*types.Transaction, error) { - identityPreimage := computeIdentity(txSubEvent.IdentityPrefix, common.BytesToAddress(txSubEvent.Sender)) - dkam, ok := identityPreimageToDecKeyAndMsg[hex.EncodeToString(identityPreimage)] - if !ok { - return nil, fmt.Errorf("identity preimage not found %s", hex.EncodeToString(identityPreimage)) - } - tx, err := decryptTransaction(dkam.Key, txSubEvent.EncryptedTransaction) - if err != nil { - return nil, err - } - return tx, nil -} - -func getDecryptionKeyID( - txSubEvent data.TransactionSubmittedEvent, - identityPreimageToDecKeyAndMsg map[string]*DecKeyAndMessage, -) (int64, error) { - identityPreimage := computeIdentity(txSubEvent.IdentityPrefix, common.BytesToAddress(txSubEvent.Sender)) - dkam, ok := identityPreimageToDecKeyAndMsg[hex.EncodeToString(identityPreimage)] - if !ok { - return 0, fmt.Errorf("identity preimage not found %s", hex.EncodeToString(identityPreimage)) - } - return dkam.DecryptionKeyID, nil -} - -func decryptTransaction(key []byte, encrypted []byte) (*types.Transaction, error) { - decryptionKey := new(shcrypto.EpochSecretKey) - err := decryptionKey.Unmarshal(key) - if err != nil { - return nil, errors.Wrapf(err, "invalid decryption key") - } - encryptedMsg := new(shcrypto.EncryptedMessage) - err = encryptedMsg.Unmarshal(encrypted) - if err != nil { - return nil, errors.Wrapf(err, "invalid encrypted msg") - } - decryptedMsg, err := encryptedMsg.Decrypt(decryptionKey) - if err != nil { - return nil, errors.Wrapf(err, "failed to decrypt message") - } - - tx := new(types.Transaction) - err = tx.UnmarshalBinary(decryptedMsg) - if err != nil { - return nil, errors.Wrapf(err, "Failed to unmarshal decrypted message to transaction type") - } - return tx, nil -} - -func isFeeTooLowError(err error) bool { - if err == nil { - return false - } - return strings.Contains(strings.ToLower(err.Error()), "feetoolow") || - strings.Contains(strings.ToLower(err.Error()), "underpriced") || - strings.Contains(strings.ToLower(err.Error()), "maxfeepergaslessthanblockbasefee") -} - -// waitForReceiptWithTimeout waits for a transaction receipt with a provided timeout. -func (tm *TxMapperDB) waitForReceiptWithTimeout(ctx context.Context, txHash common.Hash, receiptWaitTimeout time.Duration, txErrorSignalCh chan error) (*types.Receipt, error) { - ctx, cancel := context.WithTimeout(ctx, receiptWaitTimeout) - defer cancel() - - // wait for the transaction receipt - receipt, err := tm.waitForReceipt(ctx, txHash, txErrorSignalCh) - if err != nil { - return nil, fmt.Errorf("failed to get receipt for transaction %s: %w", txHash.Hex(), err) - } - return receipt, nil -} - -// waitForReceipt polls the Ethereum network for the transaction receipt until it's available or the context is canceled. -func (tm *TxMapperDB) waitForReceipt(ctx context.Context, txHash common.Hash, txErrorSignalCh chan error) (*types.Receipt, error) { - for { - // check if the context has been canceled or timed out - select { - case <-ctx.Done(): - return nil, ctx.Err() - case err := <-txErrorSignalCh: // Listen for errors from the sending goroutine - if err != nil { - return nil, err - } - default: - } - - // query for the transaction receipt - receipt, err := tm.ethClient.TransactionReceipt(ctx, txHash) - if err == ethereum.NotFound { - // If the receipt is not found, continue polling - time.Sleep(3 * time.Second) - continue - } else if err != nil { - return nil, err - } - - return receipt, nil - } -} - -func (tm *TxMapperDB) UpsertGraffitiIfShutterized(ctx context.Context, validatorIndex int64, graffiti string, blockNumber int64) (bool, error) { - upserted, err := tm.dbQuery.UpsertGraffitiIfShutterized(ctx, data.UpsertGraffitiIfShutterizedParams{ - ValidatorIndex: dbTypes.Int64ToPgTypeInt8(validatorIndex), - Graffiti: graffiti, - BlockNumber: blockNumber, - }) - return upserted, err -} \ No newline at end of file diff --git a/internal/metrics/tx_mapper_decryption.go b/internal/metrics/tx_mapper_decryption.go new file mode 100644 index 0000000..47cf65f --- /dev/null +++ b/internal/metrics/tx_mapper_decryption.go @@ -0,0 +1,210 @@ +package metrics + +import ( + "context" + "encoding/hex" + "fmt" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + "github.com/pkg/errors" + "github.com/rs/zerolog/log" + + "github.com/shutter-network/observer/internal/data" + "github.com/shutter-network/shutter/shlib/shcrypto" +) + +type DecryptionData struct { + Key []byte + Slot int64 +} + +type KeyShare struct { + Share []byte + Slot int64 +} + +type Tx struct { + EncryptedTx []byte + DD *DecryptionData + KS *KeyShare + BlockHash []byte +} + +type DecKeysAndMessages struct { + Eon int64 + Keys [][]byte + Identities [][]byte + Slot int64 + InstanceID int64 + TxPointer int64 +} + +type DecKeyAndMessage struct { + Slot int64 + TxPointer int64 + Eon int64 + Key []byte + IdentityPreimage []byte + KeyIndex int64 + DecryptionKeyID int64 +} + +func (tm *TxMapperDB) AddDecryptionKeysAndMessages( + ctx context.Context, + decKeysAndMessages *DecKeysAndMessages, +) error { + if len(decKeysAndMessages.Keys) == 0 { + return nil + } + tx, err := tm.db.Begin(ctx) + if err != nil { + return err + } + defer tx.Rollback(ctx) + qtx := tm.dbQuery.WithTx(tx) + + eons, slots, instanceIDs, txPointers, keyIndexes := getDecryptionMessageInfos(decKeysAndMessages) + decryptionKeyIDs, err := qtx.CreateDecryptionKeys(ctx, data.CreateDecryptionKeysParams{ + Column1: eons, + Column2: decKeysAndMessages.Identities, + Column3: decKeysAndMessages.Keys, + }) + if err != nil { + return err + } + if len(decryptionKeyIDs) == 0 || len(decryptionKeyIDs) != len(slots) { + log.Debug().Msg("no new decryption key was added") + return nil + } + err = qtx.CreateDecryptionKeyMessages(ctx, data.CreateDecryptionKeyMessagesParams{ + Column1: slots, + Column2: instanceIDs, + Column3: eons, + Column4: txPointers, + }) + if err != nil { + return err + } + + err = qtx.CreateDecryptionKeysMessageDecryptionKey(ctx, data.CreateDecryptionKeysMessageDecryptionKeyParams{ + Column1: slots, + Column2: keyIndexes, + Column3: decryptionKeyIDs, + }) + if err != nil { + return err + } + + totalDecKeysAndMessages := len(decKeysAndMessages.Keys) + err = tx.Commit(ctx) + if err != nil { + log.Err(err).Msg("unable to commit db transaction") + return err + } + + for i := 0; i < totalDecKeysAndMessages; i++ { + metricsDecKeyReceived.Inc() + } + + dkam := make([]*DecKeyAndMessage, totalDecKeysAndMessages) + for index, key := range decKeysAndMessages.Keys { + identityPreimage := decKeysAndMessages.Identities[index] + dkam[index] = &DecKeyAndMessage{ + Slot: decKeysAndMessages.Slot, + TxPointer: decKeysAndMessages.TxPointer, + Eon: decKeysAndMessages.Eon, + Key: key, + IdentityPreimage: identityPreimage, + KeyIndex: int64(index), + DecryptionKeyID: decryptionKeyIDs[index], + } + } + + if len(dkam) > 0 { + dkam = dkam[1:] + } + err = tm.processTransactionExecution(ctx, &TxExecution{ + DecKeysAndMessages: dkam, + }) + if err != nil { + log.Err(err).Int64("slot", decKeysAndMessages.Slot).Msg("failed to process transaction execution") + return err + } + return nil +} + +func getDecryptionMessageInfos(dkam *DecKeysAndMessages) ([]int64, []int64, []int64, []int64, []int64) { + eons := make([]int64, len(dkam.Keys)) + slots := make([]int64, len(dkam.Keys)) + instanceIDs := make([]int64, len(dkam.Keys)) + txPointers := make([]int64, len(dkam.Keys)) + keyIndexes := make([]int64, len(dkam.Keys)) + + for index := range dkam.Keys { + eons[index] = dkam.Eon + slots[index] = dkam.Slot + instanceIDs[index] = dkam.InstanceID + txPointers[index] = dkam.TxPointer + keyIndexes[index] = int64(index) + } + + return eons, slots, instanceIDs, txPointers, keyIndexes +} + +func computeIdentity(prefix []byte, sender common.Address) []byte { + imageBytes := append(prefix, sender.Bytes()...) + return imageBytes +} + +func getDecryptedTX( + txSubEvent data.TransactionSubmittedEvent, + identityPreimageToDecKeyAndMsg map[string]*DecKeyAndMessage, +) (*types.Transaction, error) { + identityPreimage := computeIdentity(txSubEvent.IdentityPrefix, common.BytesToAddress(txSubEvent.Sender)) + dkam, ok := identityPreimageToDecKeyAndMsg[hex.EncodeToString(identityPreimage)] + if !ok { + return nil, fmt.Errorf("identity preimage not found %s", hex.EncodeToString(identityPreimage)) + } + tx, err := decryptTransaction(dkam.Key, txSubEvent.EncryptedTransaction) + if err != nil { + return nil, err + } + return tx, nil +} + +func getDecryptionKeyID( + txSubEvent data.TransactionSubmittedEvent, + identityPreimageToDecKeyAndMsg map[string]*DecKeyAndMessage, +) (int64, error) { + identityPreimage := computeIdentity(txSubEvent.IdentityPrefix, common.BytesToAddress(txSubEvent.Sender)) + dkam, ok := identityPreimageToDecKeyAndMsg[hex.EncodeToString(identityPreimage)] + if !ok { + return 0, fmt.Errorf("identity preimage not found %s", hex.EncodeToString(identityPreimage)) + } + return dkam.DecryptionKeyID, nil +} + +func decryptTransaction(key []byte, encrypted []byte) (*types.Transaction, error) { + decryptionKey := new(shcrypto.EpochSecretKey) + err := decryptionKey.Unmarshal(key) + if err != nil { + return nil, errors.Wrapf(err, "invalid decryption key") + } + encryptedMsg := new(shcrypto.EncryptedMessage) + err = encryptedMsg.Unmarshal(encrypted) + if err != nil { + return nil, errors.Wrapf(err, "invalid encrypted msg") + } + decryptedMsg, err := encryptedMsg.Decrypt(decryptionKey) + if err != nil { + return nil, errors.Wrapf(err, "failed to decrypt message") + } + + tx := new(types.Transaction) + err = tx.UnmarshalBinary(decryptedMsg) + if err != nil { + return nil, errors.Wrapf(err, "Failed to unmarshal decrypted message to transaction type") + } + return tx, nil +} \ No newline at end of file diff --git a/internal/metrics/tx_mapper_execution.go b/internal/metrics/tx_mapper_execution.go new file mode 100644 index 0000000..ae11bab --- /dev/null +++ b/internal/metrics/tx_mapper_execution.go @@ -0,0 +1,303 @@ +package metrics + +import ( + "context" + "encoding/hex" + "fmt" + "strings" + "sync" + "time" + + "github.com/ethereum/go-ethereum" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + "github.com/jackc/pgx/v5/pgtype" + "github.com/pkg/errors" + "github.com/rs/zerolog/log" + + "github.com/shutter-network/observer/common/utils" + "github.com/shutter-network/observer/internal/data" +) + +type TxExecution struct { + // BlockNumber int64 + DecKeysAndMessages []*DecKeyAndMessage +} + +type txExecutionJob struct { + expectedIndex int + decryptedTx *types.Transaction + txSubEvent data.TransactionSubmittedEvent + decryptionKeyID int64 +} + +func (tm *TxMapperDB) processTransactionExecution( + ctx context.Context, + te *TxExecution, +) error { + totalDecKeysAndMessages := len(te.DecKeysAndMessages) + if totalDecKeysAndMessages == 0 { + return nil + } + txSubEvents, err := tm.dbQuery.QueryTransactionSubmittedEvent(ctx, data.QueryTransactionSubmittedEventParams{ + Eon: te.DecKeysAndMessages[0].Eon, + TxIndex: te.DecKeysAndMessages[0].TxPointer, + Column3: totalDecKeysAndMessages, + }) + if err != nil { + return err + } + + if len(txSubEvents) != totalDecKeysAndMessages { + log.Debug().Int("total tx sub events", len(txSubEvents)). + Int("total decryption keys", totalDecKeysAndMessages). + Msg("total tx submitted events dont match total decryption keys") + return nil + } + + identityPreimageToDecKeyAndMsg := make(map[string]*DecKeyAndMessage) + for _, dkam := range te.DecKeysAndMessages { + identityPreimageToDecKeyAndMsg[hex.EncodeToString(dkam.IdentityPreimage)] = dkam + } + + slot := te.DecKeysAndMessages[0].Slot + expectedIdx := 0 + jobs := make([]txExecutionJob, 0, len(txSubEvents)) + + // First pass: decrypt and create initial decrypted_tx rows synchronously. + for _, txSubEvent := range txSubEvents { + decryptionKeyID, err := getDecryptionKeyID(txSubEvent, identityPreimageToDecKeyAndMsg) + if err != nil { + log.Err(err).Msg("error while trying to retrieve decryption key ID") + continue + } + + decryptedTx, err := getDecryptedTX(txSubEvent, identityPreimageToDecKeyAndMsg) + if err != nil { + log.Err(err).Msg("error while trying to get decrypted tx hash") + err := tm.dbQuery.CreateDecryptedTX(ctx, data.CreateDecryptedTXParams{ + Slot: slot, + TxIndex: txSubEvent.TxIndex, + TxHash: common.Hash{}.Bytes(), + TxStatus: data.TxStatusValNotdecrypted, + InclusionPosition: InclPosUnknown, + DecryptionKeyID: decryptionKeyID, + TransactionSubmittedEventID: txSubEvent.ID, + }) + if err != nil { + log.Err(err).Msg("failed to create decrypted tx") + } + continue + } + + log.Info().Uint64("gas", decryptedTx.Gas()). + Uint64("gas-price", decryptedTx.GasPrice().Uint64()). + Uint64("cost", decryptedTx.Cost().Uint64()). + Uint64("max-priority-fee-per-gas", decryptedTx.GasTipCap().Uint64()). + Uint64("max-fee-per-gas", decryptedTx.GasFeeCap().Uint64()). + Uint8("tx-type", decryptedTx.Type()). + Msg("tx-data") + + err = tm.dbQuery.CreateDecryptedTX(ctx, data.CreateDecryptedTXParams{ + Slot: slot, + TxIndex: txSubEvent.TxIndex, + TxHash: decryptedTx.Hash().Bytes(), + TxStatus: data.TxStatusValPending, + InclusionPosition: InclPosUnknown, + DecryptionKeyID: decryptionKeyID, + TransactionSubmittedEventID: txSubEvent.ID, + }) + if err != nil { + log.Err(err).Msg("failed to create decrypted tx") + continue + } + + jobs = append(jobs, txExecutionJob{ + expectedIndex: expectedIdx, + decryptedTx: decryptedTx, + txSubEvent: txSubEvent, + decryptionKeyID: decryptionKeyID, + }) + expectedIdx++ + } + + // If the block already exists for this slot, classify now that rows are present. + tm.maybeHandleStoredBlock(ctx, slot) + + var wg sync.WaitGroup + for _, job := range jobs { + currExpected := job.expectedIndex + decryptedTx := job.decryptedTx + txSubEvent := job.txSubEvent + decryptionKeyID := job.decryptionKeyID + + txErrorSignalCh := make(chan error, 1) + wg.Add(2) + + go func(ctx context.Context, decryptedTx *types.Transaction, txSubEvent data.TransactionSubmittedEvent, slot int64, decryptionKeyID int64, txErrorSignalCh chan error) { + defer wg.Done() + + select { + case <-ctx.Done(): + txErrorSignalCh <- fmt.Errorf("transaction send cancelled due to context: %w", ctx.Err()) + return + case <-time.After(time.Duration(tm.config.InclusionDelay) * time.Second): + if err := tm.ethClient.SendTransaction(ctx, decryptedTx); err != nil { + log.Err(err).Msg("failed to send transaction") + if err.Error() == "AlreadyKnown" { + log.Debug().Hex("tx-hash", decryptedTx.Hash().Bytes()).Msg("already known") + return + } + + txStatus := data.TxStatusValInvalid + if isFeeTooLowError(err) { + txStatus = data.TxStatusValInvalidfeetoolow + } + err := tm.dbQuery.UpsertTX(ctx, data.UpsertTXParams{ + Slot: slot, + TxIndex: txSubEvent.TxIndex, + TxHash: decryptedTx.Hash().Bytes(), + TxStatus: txStatus, + InclusionPosition: InclPosUnknown, + DecryptionKeyID: decryptionKeyID, + TransactionSubmittedEventID: txSubEvent.ID, + BlockNumber: pgtype.Int8{}, + }) + if err != nil { + log.Err(err).Msg("failed to upsert decrypted tx") + } + txErrorSignalCh <- fmt.Errorf("%w: %v", errSendTransaction, err) + return + } + + log.Info().Hex("tx-hash", decryptedTx.Hash().Bytes()).Msg("transaction sent") + } + }(ctx, decryptedTx, txSubEvent, slot, decryptionKeyID, txErrorSignalCh) + + go func(ctx context.Context, expectedIndex int, txHash common.Hash, txIndex int64, slot int64, decryptionKeyID int64, txSubEventID int64, txErrorSignalCh chan error) { + defer wg.Done() + + if tm.isDone(txHash) { + return + } + + receipt, err := tm.waitForReceiptWithTimeout(ctx, txHash, ReceiptWaitTimeout, txErrorSignalCh) + if err != nil { + log.Err(err).Msg("") + if errors.Is(err, errSendTransaction) { + return + } + if !tm.isDone(txHash) { + err := tm.dbQuery.UpsertTX(ctx, data.UpsertTXParams{ + Slot: slot, + TxIndex: txIndex, + TxHash: txHash[:], + TxStatus: data.TxStatusValNotincluded, + InclusionPosition: InclPosUnknown, + DecryptionKeyID: decryptionKeyID, + TransactionSubmittedEventID: txSubEventID, + BlockNumber: pgtype.Int8{}, + }) + if err != nil { + log.Err(err).Msg("failed to upsert decrypted tx") + } + } + return + } + + log.Info().Hex("tx-hash", receipt.TxHash.Bytes()). + Uint64("receipt-status", receipt.Status). + Msg("transaction receipt found") + + block, err := tm.ethClient.BlockByNumber(ctx, receipt.BlockNumber) + if err != nil { + log.Err(err).Uint64("block-number", receipt.BlockNumber.Uint64()).Msg("failed to retrieve block") + return + } + + inclusionSlot := utils.GetSlotForBlock(block.Header().Time, tm.genesisTimestamp, tm.slotDuration) + txStatus, inclusionPosition := data.TxStatusValShieldedinclusion, InclPosUnknown + if inclusionSlot != uint64(slot) { + txStatus = data.TxStatusValUnshieldedinclusion + inclusionPosition = InclPosWrong + } else { + txStatus, inclusionPosition = classifyInclusion(expectedIndex, receipt.TransactionIndex) + } + + log.Info(). + Int64("expected-slot", slot). + Uint64("receipt-slot", inclusionSlot). + Uint("expected-index", uint(expectedIndex)). + Uint("receipt-index", receipt.TransactionIndex). + Hex("tx-hash", receipt.TxHash.Bytes()). + Str("inclusion_position", inclusionPosition). + Str("tx_status", string(txStatus)). + Msg("transaction receipt classified") + + err = tm.dbQuery.UpsertTX(ctx, data.UpsertTXParams{ + Slot: slot, + TxIndex: txIndex, + TxHash: receipt.TxHash.Bytes(), + TxStatus: txStatus, + InclusionPosition: inclusionPosition, + DecryptionKeyID: decryptionKeyID, + TransactionSubmittedEventID: txSubEventID, + BlockNumber: pgtype.Int8{Int64: receipt.BlockNumber.Int64(), Valid: true}, + }) + if err != nil { + log.Err(err).Msg("failed to update decrypted tx") + } + }(ctx, currExpected, decryptedTx.Hash(), txSubEvent.TxIndex, slot, decryptionKeyID, txSubEvent.ID, txErrorSignalCh) + } + + wg.Wait() + return nil +} + +func (tm *TxMapperDB) waitForReceiptWithTimeout(ctx context.Context, txHash common.Hash, receiptWaitTimeout time.Duration, txErrorSignalCh chan error) (*types.Receipt, error) { + ctx, cancel := context.WithTimeout(ctx, receiptWaitTimeout) + defer cancel() + + // wait for the transaction receipt + receipt, err := tm.waitForReceipt(ctx, txHash, txErrorSignalCh) + if err != nil { + return nil, fmt.Errorf("failed to get receipt for transaction %s: %w", txHash.Hex(), err) + } + return receipt, nil +} + +func (tm *TxMapperDB) waitForReceipt(ctx context.Context, txHash common.Hash, txErrorSignalCh chan error) (*types.Receipt, error) { + for { + // check if the context has been canceled or timed out + select { + case <-ctx.Done(): + return nil, ctx.Err() + case err := <-txErrorSignalCh: // Listen for errors from the sending goroutine + if err != nil { + return nil, err + } + default: + } + + // query for the transaction receipt + receipt, err := tm.ethClient.TransactionReceipt(ctx, txHash) + if errors.Is(err, ethereum.NotFound) || err == ethereum.NotFound { + // If the receipt is not found, continue polling + time.Sleep(3 * time.Second) + continue + } else if err != nil { + return nil, err + } + + return receipt, nil + } +} + +func isFeeTooLowError(err error) bool { + if err == nil { + return false + } + return strings.Contains(strings.ToLower(err.Error()), "feetoolow") || + strings.Contains(strings.ToLower(err.Error()), "underpriced") || + strings.Contains(strings.ToLower(err.Error()), "maxfeepergaslessthanblockbasefee") +} \ No newline at end of file diff --git a/internal/metrics/tx_mapper_db_test.go b/internal/metrics/tx_mapper_test.go similarity index 100% rename from internal/metrics/tx_mapper_db_test.go rename to internal/metrics/tx_mapper_test.go diff --git a/internal/metrics/tx_mapper_validators.go b/internal/metrics/tx_mapper_validators.go new file mode 100644 index 0000000..3aeea51 --- /dev/null +++ b/internal/metrics/tx_mapper_validators.go @@ -0,0 +1,298 @@ +package metrics + +import ( + "context" + "fmt" + "math" + "sync" + "time" + + "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgtype" + "github.com/pkg/errors" + "github.com/rs/zerolog/log" + + validatorRegistryBindings "github.com/shutter-network/gnosh-contracts/gnoshcontracts/validatorregistry" + dbTypes "github.com/shutter-network/observer/common/database" + "github.com/shutter-network/observer/internal/data" + "github.com/shutter-network/rolling-shutter/rolling-shutter/medley/beaconapiclient" + "github.com/shutter-network/rolling-shutter/rolling-shutter/medley/validatorregistry" + blst "github.com/supranational/blst/bindings/go" +) + +type validatorData struct { + validatorStatus string + validatorValidity data.ValidatorRegistrationValidity +} + +func (tm *TxMapperDB) QueryBlockNumberFromValidatorRegistryEventsSyncedUntil(ctx context.Context) (int64, error) { + data, err := tm.dbQuery.QueryValidatorRegistryEventsSyncedUntil(ctx) + if err != nil { + return 0, err + } + return data.BlockNumber, nil +} + +func (tm *TxMapperDB) UpsertGraffitiIfShutterized(ctx context.Context, validatorIndex int64, graffiti string, blockNumber int64) (bool, error) { + upserted, err := tm.dbQuery.UpsertGraffitiIfShutterized(ctx, data.UpsertGraffitiIfShutterizedParams{ + ValidatorIndex: dbTypes.Int64ToPgTypeInt8(validatorIndex), + Graffiti: graffiti, + BlockNumber: blockNumber, + }) + return upserted, err +} + +func (tm *TxMapperDB) AddValidatorRegistryEvent(ctx context.Context, tx pgx.Tx, vr *validatorRegistryBindings.ValidatorregistryUpdated) error { + regMessage := &validatorregistry.AggregateRegistrationMessage{} + err := regMessage.Unmarshal(vr.Message) + if err != nil { + log.Err(err).Hex("tx-hash", vr.Raw.TxHash.Bytes()).Msg("error unmarshalling registration message") + } else { + validatorIDtoValidity, err := tm.validateValidatorRegistryEvent(ctx, vr, regMessage, uint64(tm.chainID), tm.config.ValidatorRegistryContractAddress) + if err != nil { + log.Err(err).Msg("error validating validator registry events") + return err + } + + q := tm.dbQuery + if tx != nil { + // Use transaction if available + q = tm.dbQuery.WithTx(tx) + } + + for validatorID, validatorData := range validatorIDtoValidity { + err := q.CreateValidatorRegistryMessage(ctx, data.CreateValidatorRegistryMessageParams{ + Version: dbTypes.Uint64ToPgTypeInt8(uint64(regMessage.Version)), + ChainID: dbTypes.Uint64ToPgTypeInt8(regMessage.ChainID), + ValidatorRegistryAddress: regMessage.ValidatorRegistryAddress.Bytes(), + ValidatorIndex: dbTypes.Int64ToPgTypeInt8(validatorID), + Nonce: dbTypes.Uint64ToPgTypeInt8(uint64(regMessage.Nonce)), + IsRegisteration: dbTypes.BoolToPgTypeBool(regMessage.IsRegistration), + Signature: vr.Signature, + EventBlockNumber: int64(vr.Raw.BlockNumber), + EventTxIndex: int64(vr.Raw.TxIndex), + EventLogIndex: int64(vr.Raw.Index), + Validity: validatorData.validatorValidity, + }) + if err != nil { + return err + } + + if validatorData.validatorValidity == data.ValidatorRegistrationValidityValid && + validatorData.validatorStatus != "" { + err := q.CreateValidatorStatus(ctx, data.CreateValidatorStatusParams{ + ValidatorIndex: dbTypes.Int64ToPgTypeInt8(validatorID), + Status: validatorData.validatorStatus, + }) + if err != nil { + return err + } + } + } + } + return nil +} + +func (tm *TxMapperDB) UpdateValidatorStatus(ctx context.Context) error { + batchSize := 100 + jumpBy := 0 + numWorkers := 5 + sem := make(chan struct{}, numWorkers) + var wg sync.WaitGroup + + for { + // Query a batch of validator statuses + validatorStatus, err := tm.dbQuery.QueryValidatorStatuses(ctx, data.QueryValidatorStatusesParams{ + Limit: int32(batchSize), + Offset: int32(jumpBy), + }) + if err != nil { + if errors.Is(err, pgx.ErrNoRows) { + break + } + return err + } + + if len(validatorStatus) == 0 { + break + } + + // Launch goroutines to process each status concurrently + for _, vs := range validatorStatus { + sem <- struct{}{} + wg.Add(1) + go func(vs data.QueryValidatorStatusesRow) { + defer wg.Done() + defer func() { <-sem }() + + validatorIndex := uint64(vs.ValidatorIndex.Int64) + //TODO: should we keep this log or remove it? + log.Debug().Uint64("validatorIndex", validatorIndex).Msg("validator status being updated") + validator, err := tm.beaconAPIClient.GetValidatorByIndex(ctx, "head", validatorIndex) + if err != nil { + log.Err(err).Uint64("validatorIndex", validatorIndex).Msg("failed to get validator from beacon chain") + return + } + if validator == nil { + return + } + + err = tm.dbQuery.CreateValidatorStatus(ctx, data.CreateValidatorStatusParams{ + ValidatorIndex: dbTypes.Uint64ToPgTypeInt8(validatorIndex), + Status: validator.Data.Status, + }) + if err != nil { + log.Err(err).Uint64("validatorIndex", validatorIndex).Msg("failed to create validator status") + return + } + }(vs) + } + + wg.Wait() + + // Wait for 3 seconds before processing the next batch + select { + case <-ctx.Done(): + return ctx.Err() // Handle context cancellation + case <-time.After(3 * time.Second): + } + + jumpBy += batchSize + } + + return nil +} + +func (tm *TxMapperDB) AddProposerDuties(ctx context.Context, epoch uint64) error { + proposerDuties, err := tm.beaconAPIClient.GetProposerDutiesByEpoch(ctx, epoch) + if err != nil { + return err + } + if proposerDuties == nil { + return errors.Errorf("no proposer duties found for epoch %d", epoch) + } + + log.Info().Uint64("epoch", epoch).Msg("processing proposer duties") + + publicKeys := make([]string, len(proposerDuties.Data)) + validatorIndices := make([]int64, len(proposerDuties.Data)) + slots := make([]int64, len(proposerDuties.Data)) + + for i := 0; i < len(proposerDuties.Data); i++ { + publicKeys[i] = proposerDuties.Data[i].Pubkey + validatorIndices[i] = int64(proposerDuties.Data[i].ValidatorIndex) + slots[i] = int64(proposerDuties.Data[i].Slot) + } + + err = tm.dbQuery.CreateProposerDuties(ctx, data.CreateProposerDutiesParams{ + Column1: publicKeys, + Column2: validatorIndices, + Column3: slots, + }) + return err +} + +func (tm *TxMapperDB) validateValidatorRegistryEvent( + ctx context.Context, + vr *validatorRegistryBindings.ValidatorregistryUpdated, + regMessage *validatorregistry.AggregateRegistrationMessage, + chainID uint64, + validatorRegistryContractAddress string, +) (map[int64]*validatorData, error) { + staticRegistrationMessageValidity := validateValidatorRegistryMessageContents(regMessage, chainID, validatorRegistryContractAddress) + + var publicKeys []*blst.P1Affine + var validators []*beaconapiclient.GetValidatorByIndexResponse + validatorIDtoValidity := make(map[int64]*validatorData) + + for _, validatorIndex := range regMessage.ValidatorIndices() { + validatorIDtoValidity[validatorIndex] = &validatorData{validatorValidity: staticRegistrationMessageValidity} + nonceBefore, err := tm.dbQuery.QueryValidatorRegistrationMessageNonceBefore(ctx, data.QueryValidatorRegistrationMessageNonceBeforeParams{ + ValidatorIndex: dbTypes.Int64ToPgTypeInt8(validatorIndex), + EventBlockNumber: int64(vr.Raw.BlockNumber), + EventTxIndex: int64(vr.Raw.TxIndex), + EventLogIndex: int64(vr.Raw.Index), + }) + + if err != nil { + if errors.Is(err, pgx.ErrNoRows) { + // No previous nonce means the message is valid regarding nonce + nonceBefore = pgtype.Int8{Int64: -1, Valid: true} + } else { + return nil, errors.Wrapf(err, "failed to query latest nonce for validator %d", validatorIndex) + } + } + + if regMessage.Nonce > math.MaxInt32 || int64(regMessage.Nonce) <= nonceBefore.Int64 { + // skip the validator + log.Warn(). + Uint32("nonce", regMessage.Nonce). + Int64("before-nonce", nonceBefore.Int64). + Msg("ignoring validator with invalid nonce") + validatorIDtoValidity[validatorIndex].validatorValidity = data.ValidatorRegistrationValidityInvalidmessage + continue + } + validator, err := tm.beaconAPIClient.GetValidatorByIndex(ctx, "head", uint64(validatorIndex)) + if err != nil { + return nil, errors.Wrapf(err, "failed to get validator %d", validatorIndex) + } + if validator == nil { + // validator not found + log.Warn().Msg("registration message for unknown validator") + validatorIDtoValidity[validatorIndex].validatorValidity = data.ValidatorRegistrationValidityInvalidmessage + continue + } + validatorIDtoValidity[validatorIndex].validatorStatus = validator.Data.Status + publicKey, err := validator.Data.Validator.GetPubkey() + if err != nil { + return nil, errors.Wrapf(err, "failed to get public key of validator %d", validatorIndex) + } + publicKeys = append(publicKeys, publicKey) + validators = append(validators, validator) + } + if len(publicKeys) > 0 { + // now we need to check for signature verification depending on the message version + sig := new(blst.P2Affine).Uncompress(vr.Signature) + if sig == nil { + return nil, fmt.Errorf("ignoring registration message with undecodable signature") + } + + if regMessage.Version == validatorregistry.LegacyValidatorRegistrationMessageVersion { + regMessage := new(validatorregistry.LegacyRegistrationMessage) + err := regMessage.Unmarshal(vr.Message) + if err != nil { + return nil, errors.Wrapf(err, "failed to unmarshal legacy registration message") + } + if valid := validatorregistry.VerifySignature(sig, publicKeys[0], regMessage); !valid { + validatorIDtoValidity[int64(validators[0].Data.Index)].validatorValidity = data.ValidatorRegistrationValidityInvalidsignature + log.Warn().Msg("invalid legacy registration message with invalid signature") + } + } else { + if valid := validatorregistry.VerifyAggregateSignature(sig, publicKeys, regMessage); !valid { + for _, validator := range validators { + validatorIDtoValidity[int64(validator.Data.Index)].validatorValidity = data.ValidatorRegistrationValidityInvalidsignature + } + log.Warn().Msg("invalid aggregate registration message with invalid signature") + } + } + } + return validatorIDtoValidity, nil +} + +func validateValidatorRegistryMessageContents(msg *validatorregistry.AggregateRegistrationMessage, chainID uint64, validatorRegistryContractAddress string) data.ValidatorRegistrationValidity { + validity := data.ValidatorRegistrationValidityValid + if msg.Version != validatorregistry.AggregateValidatorRegistrationMessageVersion && + msg.Version != validatorregistry.LegacyValidatorRegistrationMessageVersion { + return data.ValidatorRegistrationValidityInvalidmessage + } + if msg.ChainID != chainID { + return data.ValidatorRegistrationValidityInvalidmessage + } + if msg.ValidatorRegistryAddress.String() != validatorRegistryContractAddress { + return data.ValidatorRegistrationValidityInvalidmessage + } + if msg.ValidatorIndex > math.MaxInt64 { + return data.ValidatorRegistrationValidityInvalidmessage + } + return validity +} \ No newline at end of file diff --git a/internal/metrics/types.go b/internal/metrics/types.go deleted file mode 100644 index f925a03..0000000 --- a/internal/metrics/types.go +++ /dev/null @@ -1,71 +0,0 @@ -package metrics - -import ( - "context" - - "github.com/ethereum/go-ethereum/core/types" - "github.com/jackc/pgx/v5" - sequencerBindings "github.com/shutter-network/gnosh-contracts/gnoshcontracts/sequencer" - validatorRegistryBindings "github.com/shutter-network/gnosh-contracts/gnoshcontracts/validatorregistry" - "github.com/shutter-network/observer/internal/data" -) - -type DecryptionData struct { - Key []byte - Slot int64 -} - -type KeyShare struct { - Share []byte - Slot int64 -} - -type Tx struct { - EncryptedTx []byte - DD *DecryptionData - KS *KeyShare - BlockHash []byte -} - -type DecKeysAndMessages struct { - Eon int64 - Keys [][]byte - Identities [][]byte - Slot int64 - InstanceID int64 - TxPointer int64 -} - -type DecKeyAndMessage struct { - Slot int64 - TxPointer int64 - Eon int64 - Key []byte - IdentityPreimage []byte - KeyIndex int64 - DecryptionKeyID int64 -} - -type TxExecution struct { - // BlockNumber int64 - DecKeysAndMessages []*DecKeyAndMessage -} - -type TxMapper interface { - AddTransactionSubmittedEvent(ctx context.Context, tx pgx.Tx, st *sequencerBindings.SequencerTransactionSubmitted) error - AddDecryptionKeysAndMessages( - ctx context.Context, - dkam *DecKeysAndMessages, - ) error - AddKeyShare(ctx context.Context, dks *data.DecryptionKeyShare) error - AddBlock( - ctx context.Context, - b *data.Block, - ) error - QueryBlockNumberFromValidatorRegistryEventsSyncedUntil(ctx context.Context) (int64, error) - AddValidatorRegistryEvent(ctx context.Context, tx pgx.Tx, vr *validatorRegistryBindings.ValidatorregistryUpdated) error - UpdateValidatorStatus(ctx context.Context) error - AddProposerDuties(ctx context.Context, epoch uint64) error - UpsertGraffitiIfShutterized(ctx context.Context, validatorIndex int64, graffiti string, blockNumber int64) (bool, error) - HandleBlock(ctx context.Context, blockNumber int64, slot int64, txs types.Transactions) error -} From a3208d3cfc293b459c73c267c5d7d6f95bc0876d Mon Sep 17 00:00:00 2001 From: ylembachar Date: Wed, 18 Mar 2026 02:41:35 +0100 Subject: [PATCH 7/9] fix: stop receipt classification after block finalization --- internal/metrics/tx_mapper_execution.go | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/internal/metrics/tx_mapper_execution.go b/internal/metrics/tx_mapper_execution.go index ae11bab..b631a13 100644 --- a/internal/metrics/tx_mapper_execution.go +++ b/internal/metrics/tx_mapper_execution.go @@ -205,6 +205,11 @@ func (tm *TxMapperDB) processTransactionExecution( return } + // Block classification may have completed while we were waiting. + if tm.isDone(txHash) { + return + } + log.Info().Hex("tx-hash", receipt.TxHash.Bytes()). Uint64("receipt-status", receipt.Status). Msg("transaction receipt found") @@ -215,6 +220,11 @@ func (tm *TxMapperDB) processTransactionExecution( return } + // Block classification may have completed while we were fetching the block. + if tm.isDone(txHash) { + return + } + inclusionSlot := utils.GetSlotForBlock(block.Header().Time, tm.genesisTimestamp, tm.slotDuration) txStatus, inclusionPosition := data.TxStatusValShieldedinclusion, InclPosUnknown if inclusionSlot != uint64(slot) { From 32040344931984549179716e23f338f81a81da0b Mon Sep 17 00:00:00 2001 From: ylembachar Date: Wed, 18 Mar 2026 03:29:28 +0100 Subject: [PATCH 8/9] fix: prevent tx status races between receipt and block classification --- internal/metrics/tx_mapper.go | 13 ++++ internal/metrics/tx_mapper_classification.go | 56 +++++++------- internal/metrics/tx_mapper_execution.go | 78 ++++++++++++++------ 3 files changed, 99 insertions(+), 48 deletions(-) diff --git a/internal/metrics/tx_mapper.go b/internal/metrics/tx_mapper.go index cbd3650..ae2ef6f 100644 --- a/internal/metrics/tx_mapper.go +++ b/internal/metrics/tx_mapper.go @@ -42,6 +42,19 @@ type TxMapperDB struct { genesisTimestamp uint64 slotDuration uint64 statusDone sync.Map + txLocks sync.Map +} + +func (tm *TxMapperDB) txLock(hash common.Hash) *sync.Mutex { + v, _ := tm.txLocks.LoadOrStore(hash.Hex(), &sync.Mutex{}) + return v.(*sync.Mutex) +} + +func (tm *TxMapperDB) withTxLock(hash common.Hash, fn func() error) error { + mu := tm.txLock(hash) + mu.Lock() + defer mu.Unlock() + return fn() } type TxEventStore interface { diff --git a/internal/metrics/tx_mapper_classification.go b/internal/metrics/tx_mapper_classification.go index c3e5540..e530308 100644 --- a/internal/metrics/tx_mapper_classification.go +++ b/internal/metrics/tx_mapper_classification.go @@ -145,38 +145,44 @@ func (tm *TxMapperDB) HandleBlock(ctx context.Context, blockNumber int64, slot i for blockPos, tx := range txs { h := tx.Hash() expectedPos, ok := indexByHash[h.Hex()] - if !ok || tm.isDone(h) { + if !ok { continue } - status, pos := classifyWithPredecessors(expectedPos, blockPos, entries) - entries[expectedPos].status = status // update for later predecessor checks - - log.Debug(). - Int64("slot", slot). - Int64("block_number", blockNumber). - Int("block_pos", blockPos). - Int("expected_pos", expectedPos). - Int64("tx_index", entries[expectedPos].txIndex). - Hex("tx_hash", h.Bytes()). - Str("tx_status", string(status)). - Str("inclusion_position", pos). - Msg("classified tx from block body") - - if err := tm.dbQuery.UpsertTX(ctx, data.UpsertTXParams{ - Slot: slot, - TxIndex: entries[expectedPos].txIndex, - TxHash: h.Bytes(), - TxStatus: status, - InclusionPosition: pos, - DecryptionKeyID: entries[expectedPos].decryptionKeyID, - TransactionSubmittedEventID: entries[expectedPos].submittedEventID, - BlockNumber: pgtype.Int8{Int64: blockNumber, Valid: true}, + if err := tm.withTxLock(h, func() error { + status, pos := classifyWithPredecessors(expectedPos, blockPos, entries) + entries[expectedPos].status = status // update for later predecessor checks + + log.Debug(). + Int64("slot", slot). + Int64("block_number", blockNumber). + Int("block_pos", blockPos). + Int("expected_pos", expectedPos). + Int64("tx_index", entries[expectedPos].txIndex). + Hex("tx_hash", h.Bytes()). + Str("tx_status", string(status)). + Str("inclusion_position", pos). + Msg("classified tx from block body") + + if err := tm.dbQuery.UpsertTX(ctx, data.UpsertTXParams{ + Slot: slot, + TxIndex: entries[expectedPos].txIndex, + TxHash: h.Bytes(), + TxStatus: status, + InclusionPosition: pos, + DecryptionKeyID: entries[expectedPos].decryptionKeyID, + TransactionSubmittedEventID: entries[expectedPos].submittedEventID, + BlockNumber: pgtype.Int8{Int64: blockNumber, Valid: true}, + }); err != nil { + return err + } + + tm.markDone(h) + return nil }); err != nil { log.Err(err).Hex("tx-hash", h.Bytes()).Msg("failed to upsert tx from block body") continue } - tm.markDone(h) } return nil diff --git a/internal/metrics/tx_mapper_execution.go b/internal/metrics/tx_mapper_execution.go index b631a13..ee33966 100644 --- a/internal/metrics/tx_mapper_execution.go +++ b/internal/metrics/tx_mapper_execution.go @@ -142,6 +142,10 @@ func (tm *TxMapperDB) processTransactionExecution( txErrorSignalCh <- fmt.Errorf("transaction send cancelled due to context: %w", ctx.Err()) return case <-time.After(time.Duration(tm.config.InclusionDelay) * time.Second): + if tm.isDone(decryptedTx.Hash()) { + return + } + if err := tm.ethClient.SendTransaction(ctx, decryptedTx); err != nil { log.Err(err).Msg("failed to send transaction") if err.Error() == "AlreadyKnown" { @@ -153,15 +157,21 @@ func (tm *TxMapperDB) processTransactionExecution( if isFeeTooLowError(err) { txStatus = data.TxStatusValInvalidfeetoolow } - err := tm.dbQuery.UpsertTX(ctx, data.UpsertTXParams{ - Slot: slot, - TxIndex: txSubEvent.TxIndex, - TxHash: decryptedTx.Hash().Bytes(), - TxStatus: txStatus, - InclusionPosition: InclPosUnknown, - DecryptionKeyID: decryptionKeyID, - TransactionSubmittedEventID: txSubEvent.ID, - BlockNumber: pgtype.Int8{}, + err := tm.withTxLock(decryptedTx.Hash(), func() error { + if tm.isDone(decryptedTx.Hash()) { + return nil + } + + return tm.dbQuery.UpsertTX(ctx, data.UpsertTXParams{ + Slot: slot, + TxIndex: txSubEvent.TxIndex, + TxHash: decryptedTx.Hash().Bytes(), + TxStatus: txStatus, + InclusionPosition: InclPosUnknown, + DecryptionKeyID: decryptionKeyID, + TransactionSubmittedEventID: txSubEvent.ID, + BlockNumber: pgtype.Int8{}, + }) }) if err != nil { log.Err(err).Msg("failed to upsert decrypted tx") @@ -183,12 +193,19 @@ func (tm *TxMapperDB) processTransactionExecution( receipt, err := tm.waitForReceiptWithTimeout(ctx, txHash, ReceiptWaitTimeout, txErrorSignalCh) if err != nil { + if errors.Is(err, context.Canceled) { + return + } log.Err(err).Msg("") if errors.Is(err, errSendTransaction) { return } - if !tm.isDone(txHash) { - err := tm.dbQuery.UpsertTX(ctx, data.UpsertTXParams{ + err := tm.withTxLock(txHash, func() error { + if tm.isDone(txHash) { + return nil + } + + return tm.dbQuery.UpsertTX(ctx, data.UpsertTXParams{ Slot: slot, TxIndex: txIndex, TxHash: txHash[:], @@ -198,9 +215,9 @@ func (tm *TxMapperDB) processTransactionExecution( TransactionSubmittedEventID: txSubEventID, BlockNumber: pgtype.Int8{}, }) - if err != nil { - log.Err(err).Msg("failed to upsert decrypted tx") - } + }) + if err != nil { + log.Err(err).Msg("failed to upsert decrypted tx") } return } @@ -244,15 +261,26 @@ func (tm *TxMapperDB) processTransactionExecution( Str("tx_status", string(txStatus)). Msg("transaction receipt classified") - err = tm.dbQuery.UpsertTX(ctx, data.UpsertTXParams{ - Slot: slot, - TxIndex: txIndex, - TxHash: receipt.TxHash.Bytes(), - TxStatus: txStatus, - InclusionPosition: inclusionPosition, - DecryptionKeyID: decryptionKeyID, - TransactionSubmittedEventID: txSubEventID, - BlockNumber: pgtype.Int8{Int64: receipt.BlockNumber.Int64(), Valid: true}, + err = tm.withTxLock(txHash, func() error { + if tm.isDone(txHash) { + return nil + } + + if err := tm.dbQuery.UpsertTX(ctx, data.UpsertTXParams{ + Slot: slot, + TxIndex: txIndex, + TxHash: receipt.TxHash.Bytes(), + TxStatus: txStatus, + InclusionPosition: inclusionPosition, + DecryptionKeyID: decryptionKeyID, + TransactionSubmittedEventID: txSubEventID, + BlockNumber: pgtype.Int8{Int64: receipt.BlockNumber.Int64(), Valid: true}, + }); err != nil { + return err + } + + tm.markDone(txHash) + return nil }) if err != nil { log.Err(err).Msg("failed to update decrypted tx") @@ -278,6 +306,10 @@ func (tm *TxMapperDB) waitForReceiptWithTimeout(ctx context.Context, txHash comm func (tm *TxMapperDB) waitForReceipt(ctx context.Context, txHash common.Hash, txErrorSignalCh chan error) (*types.Receipt, error) { for { + if tm.isDone(txHash) { + return nil, context.Canceled + } + // check if the context has been canceled or timed out select { case <-ctx.Done(): From 9b786b12d0dd5432f282a5970d902b6ca06f8f2a Mon Sep 17 00:00:00 2001 From: ylembachar Date: Wed, 18 Mar 2026 13:40:36 +0100 Subject: [PATCH 9/9] fix: add slot-level locking for transaction status classification --- internal/metrics/tx_mapper.go | 16 +-- internal/metrics/tx_mapper_classification.go | 107 +++++++++---------- internal/metrics/tx_mapper_execution.go | 100 +++++++++-------- 3 files changed, 114 insertions(+), 109 deletions(-) diff --git a/internal/metrics/tx_mapper.go b/internal/metrics/tx_mapper.go index ae2ef6f..0f6aa2b 100644 --- a/internal/metrics/tx_mapper.go +++ b/internal/metrics/tx_mapper.go @@ -42,16 +42,16 @@ type TxMapperDB struct { genesisTimestamp uint64 slotDuration uint64 statusDone sync.Map - txLocks sync.Map + slotLocks sync.Map } -func (tm *TxMapperDB) txLock(hash common.Hash) *sync.Mutex { - v, _ := tm.txLocks.LoadOrStore(hash.Hex(), &sync.Mutex{}) +func (tm *TxMapperDB) slotLock(slot int64) *sync.Mutex { + v, _ := tm.slotLocks.LoadOrStore(slot, &sync.Mutex{}) return v.(*sync.Mutex) } -func (tm *TxMapperDB) withTxLock(hash common.Hash, fn func() error) error { - mu := tm.txLock(hash) +func (tm *TxMapperDB) withSlotLock(slot int64, fn func() error) error { + mu := tm.slotLock(slot) mu.Lock() defer mu.Unlock() return fn() @@ -86,13 +86,13 @@ type TxMapper interface { ValidatorStore } -// markDone records that a tx hash has been finalized (by block or receipt) -// to prevent double classification. +// markDone records that an inclusion classification was observed for this tx hash. +// Failure and timeout paths should not overwrite rows once a tx is done. func (tm *TxMapperDB) markDone(hash common.Hash) { tm.statusDone.Store(hash.Hex(), struct{}{}) } -// isDone reports whether a tx hash was already finalized. +// isDone reports whether an inclusion classification was already observed. func (tm *TxMapperDB) isDone(hash common.Hash) bool { _, ok := tm.statusDone.Load(hash.Hex()) return ok diff --git a/internal/metrics/tx_mapper_classification.go b/internal/metrics/tx_mapper_classification.go index e530308..0c6ae9d 100644 --- a/internal/metrics/tx_mapper_classification.go +++ b/internal/metrics/tx_mapper_classification.go @@ -97,61 +97,61 @@ func (tm *TxMapperDB) HandleBlock(ctx context.Context, blockNumber int64, slot i Int("num_txs", len(txs)). Msg("handling block for tx classification") - rows, err := tm.db.Query(ctx, ` - SELECT tx_hash, tx_index, tx_status, decryption_key_id, transaction_submitted_event_id - FROM decrypted_tx - WHERE slot = $1 - AND tx_hash <> '\x00' - ORDER BY tx_index`, slot) - if err != nil { - return err - } - defer rows.Close() - - var entries []batchEntry - indexByHash := make(map[string]int) - - for rows.Next() { - var ( - hashBytes []byte - txIdx int64 - status data.TxStatusVal - decID int64 - subID int64 - ) - if err := rows.Scan(&hashBytes, &txIdx, &status, &decID, &subID); err != nil { + return tm.withSlotLock(slot, func() error { + rows, err := tm.db.Query(ctx, ` + SELECT tx_hash, tx_index, tx_status, decryption_key_id, transaction_submitted_event_id + FROM decrypted_tx + WHERE slot = $1 + AND tx_hash <> '\x00' + ORDER BY tx_index`, slot) + if err != nil { return err } - h := common.BytesToHash(hashBytes) - indexByHash[h.Hex()] = len(entries) - entries = append(entries, batchEntry{ - hash: h, - status: status, - txIndex: txIdx, - decryptionKeyID: decID, - submittedEventID: subID, - }) - } - - log.Debug(). - Int64("slot", slot). - Int("num_candidates", len(entries)). - Msg("loaded decrypted tx candidates for block classification") + defer rows.Close() + + var entries []batchEntry + indexByHash := make(map[string]int) + + for rows.Next() { + var ( + hashBytes []byte + txIdx int64 + status data.TxStatusVal + decID int64 + subID int64 + ) + if err := rows.Scan(&hashBytes, &txIdx, &status, &decID, &subID); err != nil { + return err + } + h := common.BytesToHash(hashBytes) + indexByHash[h.Hex()] = len(entries) + entries = append(entries, batchEntry{ + hash: h, + status: status, + txIndex: txIdx, + decryptionKeyID: decID, + submittedEventID: subID, + }) + } - if len(entries) == 0 { - return nil - } + log.Debug(). + Int64("slot", slot). + Int("num_candidates", len(entries)). + Msg("loaded decrypted tx candidates for block classification") - for blockPos, tx := range txs { - h := tx.Hash() - expectedPos, ok := indexByHash[h.Hex()] - if !ok { - continue + if len(entries) == 0 { + return nil } - if err := tm.withTxLock(h, func() error { + for blockPos, tx := range txs { + h := tx.Hash() + expectedPos, ok := indexByHash[h.Hex()] + if !ok { + continue + } + status, pos := classifyWithPredecessors(expectedPos, blockPos, entries) - entries[expectedPos].status = status // update for later predecessor checks + entries[expectedPos].status = status log.Debug(). Int64("slot", slot). @@ -174,18 +174,15 @@ func (tm *TxMapperDB) HandleBlock(ctx context.Context, blockNumber int64, slot i TransactionSubmittedEventID: entries[expectedPos].submittedEventID, BlockNumber: pgtype.Int8{Int64: blockNumber, Valid: true}, }); err != nil { - return err + log.Err(err).Hex("tx-hash", h.Bytes()).Msg("failed to upsert tx from block body") + continue } tm.markDone(h) - return nil - }); err != nil { - log.Err(err).Hex("tx-hash", h.Bytes()).Msg("failed to upsert tx from block body") - continue } - } - return nil + return nil + }) } func (tm *TxMapperDB) maybeHandleStoredBlock(ctx context.Context, slot int64) { diff --git a/internal/metrics/tx_mapper_execution.go b/internal/metrics/tx_mapper_execution.go index ee33966..1b0b526 100644 --- a/internal/metrics/tx_mapper_execution.go +++ b/internal/metrics/tx_mapper_execution.go @@ -64,61 +64,67 @@ func (tm *TxMapperDB) processTransactionExecution( expectedIdx := 0 jobs := make([]txExecutionJob, 0, len(txSubEvents)) - // First pass: decrypt and create initial decrypted_tx rows synchronously. - for _, txSubEvent := range txSubEvents { - decryptionKeyID, err := getDecryptionKeyID(txSubEvent, identityPreimageToDecKeyAndMsg) - if err != nil { - log.Err(err).Msg("error while trying to retrieve decryption key ID") - continue - } + err = tm.withSlotLock(slot, func() error { + for _, txSubEvent := range txSubEvents { + decryptionKeyID, err := getDecryptionKeyID(txSubEvent, identityPreimageToDecKeyAndMsg) + if err != nil { + log.Err(err).Msg("error while trying to retrieve decryption key ID") + continue + } + + decryptedTx, err := getDecryptedTX(txSubEvent, identityPreimageToDecKeyAndMsg) + if err != nil { + log.Err(err).Msg("error while trying to get decrypted tx hash") + err := tm.dbQuery.CreateDecryptedTX(ctx, data.CreateDecryptedTXParams{ + Slot: slot, + TxIndex: txSubEvent.TxIndex, + TxHash: common.Hash{}.Bytes(), + TxStatus: data.TxStatusValNotdecrypted, + InclusionPosition: InclPosUnknown, + DecryptionKeyID: decryptionKeyID, + TransactionSubmittedEventID: txSubEvent.ID, + }) + if err != nil { + log.Err(err).Msg("failed to create decrypted tx") + } + continue + } + + log.Info().Uint64("gas", decryptedTx.Gas()). + Uint64("gas-price", decryptedTx.GasPrice().Uint64()). + Uint64("cost", decryptedTx.Cost().Uint64()). + Uint64("max-priority-fee-per-gas", decryptedTx.GasTipCap().Uint64()). + Uint64("max-fee-per-gas", decryptedTx.GasFeeCap().Uint64()). + Uint8("tx-type", decryptedTx.Type()). + Msg("tx-data") - decryptedTx, err := getDecryptedTX(txSubEvent, identityPreimageToDecKeyAndMsg) - if err != nil { - log.Err(err).Msg("error while trying to get decrypted tx hash") - err := tm.dbQuery.CreateDecryptedTX(ctx, data.CreateDecryptedTXParams{ + err = tm.dbQuery.CreateDecryptedTX(ctx, data.CreateDecryptedTXParams{ Slot: slot, TxIndex: txSubEvent.TxIndex, - TxHash: common.Hash{}.Bytes(), - TxStatus: data.TxStatusValNotdecrypted, + TxHash: decryptedTx.Hash().Bytes(), + TxStatus: data.TxStatusValPending, InclusionPosition: InclPosUnknown, DecryptionKeyID: decryptionKeyID, TransactionSubmittedEventID: txSubEvent.ID, }) if err != nil { log.Err(err).Msg("failed to create decrypted tx") + continue } - continue - } - log.Info().Uint64("gas", decryptedTx.Gas()). - Uint64("gas-price", decryptedTx.GasPrice().Uint64()). - Uint64("cost", decryptedTx.Cost().Uint64()). - Uint64("max-priority-fee-per-gas", decryptedTx.GasTipCap().Uint64()). - Uint64("max-fee-per-gas", decryptedTx.GasFeeCap().Uint64()). - Uint8("tx-type", decryptedTx.Type()). - Msg("tx-data") - - err = tm.dbQuery.CreateDecryptedTX(ctx, data.CreateDecryptedTXParams{ - Slot: slot, - TxIndex: txSubEvent.TxIndex, - TxHash: decryptedTx.Hash().Bytes(), - TxStatus: data.TxStatusValPending, - InclusionPosition: InclPosUnknown, - DecryptionKeyID: decryptionKeyID, - TransactionSubmittedEventID: txSubEvent.ID, - }) - if err != nil { - log.Err(err).Msg("failed to create decrypted tx") - continue + jobs = append(jobs, txExecutionJob{ + expectedIndex: expectedIdx, + decryptedTx: decryptedTx, + txSubEvent: txSubEvent, + decryptionKeyID: decryptionKeyID, + }) + expectedIdx++ } - jobs = append(jobs, txExecutionJob{ - expectedIndex: expectedIdx, - decryptedTx: decryptedTx, - txSubEvent: txSubEvent, - decryptionKeyID: decryptionKeyID, - }) - expectedIdx++ + return nil + }) + if err != nil { + return err } // If the block already exists for this slot, classify now that rows are present. @@ -157,7 +163,7 @@ func (tm *TxMapperDB) processTransactionExecution( if isFeeTooLowError(err) { txStatus = data.TxStatusValInvalidfeetoolow } - err := tm.withTxLock(decryptedTx.Hash(), func() error { + err := tm.withSlotLock(slot, func() error { if tm.isDone(decryptedTx.Hash()) { return nil } @@ -196,11 +202,13 @@ func (tm *TxMapperDB) processTransactionExecution( if errors.Is(err, context.Canceled) { return } - log.Err(err).Msg("") if errors.Is(err, errSendTransaction) { + log.Debug().Hex("tx-hash", txHash.Bytes()).Err(err).Msg("receipt wait stopped after send failure") return } - err := tm.withTxLock(txHash, func() error { + + log.Err(err).Hex("tx-hash", txHash.Bytes()).Msg("receipt wait failed") + err := tm.withSlotLock(slot, func() error { if tm.isDone(txHash) { return nil } @@ -261,7 +269,7 @@ func (tm *TxMapperDB) processTransactionExecution( Str("tx_status", string(txStatus)). Msg("transaction receipt classified") - err = tm.withTxLock(txHash, func() error { + err = tm.withSlotLock(slot, func() error { if tm.isDone(txHash) { return nil }