diff --git a/.gitignore b/.gitignore index fa632b8..051890c 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,5 @@ build .envrc - -*.txt \ No newline at end of file +.env +*.txt +.idea \ No newline at end of file diff --git a/internal/data/db.sqlc.gen.go b/internal/data/db.sqlc.gen.go index 92f18a0..062888c 100644 --- a/internal/data/db.sqlc.gen.go +++ b/internal/data/db.sqlc.gen.go @@ -1,6 +1,6 @@ // Code generated by sqlc. DO NOT EDIT. // versions: -// sqlc v1.26.0 +// sqlc v1.28.0 package data diff --git a/internal/data/metrics.sql.go b/internal/data/metrics.sql.go index a827bc4..935779d 100644 --- a/internal/data/metrics.sql.go +++ b/internal/data/metrics.sql.go @@ -1,6 +1,6 @@ // Code generated by sqlc. DO NOT EDIT. // versions: -// sqlc v1.26.0 +// sqlc v1.28.0 // source: metrics.sql package data @@ -40,16 +40,15 @@ func (q *Queries) CreateBlock(ctx context.Context, arg CreateBlockParams) error } const createDecryptedTX = `-- name: CreateDecryptedTX :exec -INSERT into decrypted_tx( - slot, - tx_index, - tx_hash, - tx_status, - decryption_key_id, - transaction_submitted_event_id -) -VALUES ($1, $2, $3, $4, $5, $6) -ON CONFLICT DO NOTHING +INSERT INTO decrypted_tx ( + slot, + tx_index, + tx_hash, + tx_status, + inclusion_position, + decryption_key_id, + transaction_submitted_event_id +) VALUES ($1, $2, $3, $4, $5, $6, $7) ` type CreateDecryptedTXParams struct { @@ -57,6 +56,7 @@ type CreateDecryptedTXParams struct { TxIndex int64 TxHash []byte TxStatus TxStatusVal + InclusionPosition string DecryptionKeyID int64 TransactionSubmittedEventID int64 } @@ -67,6 +67,7 @@ func (q *Queries) CreateDecryptedTX(ctx context.Context, arg CreateDecryptedTXPa arg.TxIndex, arg.TxHash, arg.TxStatus, + arg.InclusionPosition, arg.DecryptionKeyID, arg.TransactionSubmittedEventID, ) @@ -651,20 +652,22 @@ func (q *Queries) UpsertGraffitiIfShutterized(ctx context.Context, arg UpsertGra const upsertTX = `-- name: UpsertTX :exec INSERT INTO decrypted_tx ( - slot, - tx_index, - tx_hash, - tx_status, - decryption_key_id, - transaction_submitted_event_id, - block_number + slot, + tx_index, + tx_hash, + tx_status, + inclusion_position, + decryption_key_id, + transaction_submitted_event_id, + block_number ) -VALUES ($1, $2, $3, $4, $5, $6, $7) -ON CONFLICT (slot, tx_index) -DO UPDATE -SET tx_status = $4, - block_number = $7, - updated_at = NOW() +VALUES ($1, $2, $3, $4, $5, $6, $7, $8) + ON CONFLICT (slot, tx_index) + DO UPDATE + SET tx_status = $4, + inclusion_position = $5, + block_number = $8, + updated_at = NOW() ` type UpsertTXParams struct { @@ -672,6 +675,7 @@ type UpsertTXParams struct { TxIndex int64 TxHash []byte TxStatus TxStatusVal + InclusionPosition string DecryptionKeyID int64 TransactionSubmittedEventID int64 BlockNumber pgtype.Int8 @@ -683,6 +687,7 @@ func (q *Queries) UpsertTX(ctx context.Context, arg UpsertTXParams) error { arg.TxIndex, arg.TxHash, arg.TxStatus, + arg.InclusionPosition, arg.DecryptionKeyID, arg.TransactionSubmittedEventID, arg.BlockNumber, diff --git a/internal/data/models.sqlc.gen.go b/internal/data/models.sqlc.gen.go index ba47d06..1c89c37 100644 --- a/internal/data/models.sqlc.gen.go +++ b/internal/data/models.sqlc.gen.go @@ -1,6 +1,6 @@ // Code generated by sqlc. DO NOT EDIT. // versions: -// sqlc v1.26.0 +// sqlc v1.28.0 package data @@ -14,14 +14,15 @@ import ( type TxStatusVal string const ( - TxStatusValIncluded TxStatusVal = "included" - TxStatusValNotincluded TxStatusVal = "not included" - TxStatusValNotdecrypted TxStatusVal = "not decrypted" - TxStatusValInvalid TxStatusVal = "invalid" - TxStatusValPending TxStatusVal = "pending" - TxStatusValShieldedinclusion TxStatusVal = "shielded inclusion" - TxStatusValUnshieldedinclusion TxStatusVal = "unshielded inclusion" - TxStatusValInvalidfeetoolow TxStatusVal = "invalid fee too low" + TxStatusValIncluded TxStatusVal = "included" + TxStatusValNotincluded TxStatusVal = "not included" + TxStatusValNotdecrypted TxStatusVal = "not decrypted" + TxStatusValInvalid TxStatusVal = "invalid" + TxStatusValPending TxStatusVal = "pending" + TxStatusValShieldedinclusion TxStatusVal = "shielded inclusion" + TxStatusValUnshieldedinclusion TxStatusVal = "unshielded inclusion" + TxStatusValInvalidfeetoolow TxStatusVal = "invalid fee too low" + TxStatusValTentativeshieldedinclusion TxStatusVal = "tentative shielded inclusion" ) func (e *TxStatusVal) Scan(src interface{}) error { @@ -122,6 +123,7 @@ type DecryptedTx struct { CreatedAt pgtype.Timestamptz UpdatedAt pgtype.Timestamptz BlockNumber pgtype.Int8 + InclusionPosition string } type DecryptionKey struct { diff --git a/internal/data/sql/queries/metrics.sql b/internal/data/sql/queries/metrics.sql index c905f24..cea3465 100644 --- a/internal/data/sql/queries/metrics.sql +++ b/internal/data/sql/queries/metrics.sql @@ -82,16 +82,15 @@ SELECT * FROM block WHERE slot = $1 FOR UPDATE; -- name: CreateDecryptedTX :exec -INSERT into decrypted_tx( - slot, - tx_index, - tx_hash, - tx_status, - decryption_key_id, - transaction_submitted_event_id -) -VALUES ($1, $2, $3, $4, $5, $6) -ON CONFLICT DO NOTHING; +INSERT INTO decrypted_tx ( + slot, + tx_index, + tx_hash, + tx_status, + inclusion_position, + decryption_key_id, + transaction_submitted_event_id +) VALUES ($1, $2, $3, $4, $5, $6, $7); -- name: CreateValidatorRegistryMessage :exec INSERT into validator_registration_message( @@ -161,20 +160,22 @@ ON CONFLICT DO NOTHING; -- name: UpsertTX :exec INSERT INTO decrypted_tx ( - slot, - tx_index, - tx_hash, - tx_status, - decryption_key_id, - transaction_submitted_event_id, - block_number + slot, + tx_index, + tx_hash, + tx_status, + inclusion_position, + decryption_key_id, + transaction_submitted_event_id, + block_number ) -VALUES ($1, $2, $3, $4, $5, $6, $7) -ON CONFLICT (slot, tx_index) -DO UPDATE -SET tx_status = $4, - block_number = $7, - updated_at = NOW(); +VALUES ($1, $2, $3, $4, $5, $6, $7, $8) + ON CONFLICT (slot, tx_index) + DO UPDATE + SET tx_status = $4, + inclusion_position = $5, + block_number = $8, + updated_at = NOW(); -- name: CreateTransactionSubmittedEventsSyncedUntil :exec INSERT INTO transaction_submitted_events_synced_until (block_hash, block_number) VALUES ($1, $2) @@ -206,5 +207,4 @@ WITH upserted AS ( updated_at = NOW() RETURNING 1 ) -SELECT EXISTS (SELECT 1 FROM upserted) AS did_upsert; - +SELECT EXISTS (SELECT 1 FROM upserted) AS did_upsert; \ No newline at end of file diff --git a/internal/metrics/tx_mapper.go b/internal/metrics/tx_mapper.go new file mode 100644 index 0000000..0f6aa2b --- /dev/null +++ b/internal/metrics/tx_mapper.go @@ -0,0 +1,174 @@ +// Package metrics provides the Prometheus exporter and the transaction mapper logic. +// +// TxMapperDB is the observer's persistence and classification pipeline. +// +// Flow: +// 1. AddTransactionSubmittedEvent stores sequencer tx events. +// 2. AddDecryptionKeysAndMessages decrypts slot txs and creates initial decrypted_tx rows. +// 3. processTransactionExecution sends txs and waits for receipts. +// 4. HandleBlock and receipt handling classify inclusion. +// 5. Validator methods persist validator-related state. +package metrics + +import ( + "context" + "sync" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/ethclient" + "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgxpool" + "github.com/pkg/errors" + sequencerBindings "github.com/shutter-network/gnosh-contracts/gnoshcontracts/sequencer" + validatorRegistryBindings "github.com/shutter-network/gnosh-contracts/gnoshcontracts/validatorregistry" + metricsCommon "github.com/shutter-network/observer/common" + "github.com/shutter-network/observer/internal/data" + "github.com/shutter-network/rolling-shutter/rolling-shutter/medley/beaconapiclient" +) + +const ReceiptWaitTimeout = 1 * time.Hour + +var errSendTransaction = errors.New("send transaction failed") + +type TxMapperDB struct { + db *pgxpool.Pool + dbQuery *data.Queries + config *metricsCommon.Config + ethClient *ethclient.Client + beaconAPIClient *beaconapiclient.Client + chainID int64 + genesisTimestamp uint64 + slotDuration uint64 + statusDone sync.Map + slotLocks sync.Map +} + +func (tm *TxMapperDB) slotLock(slot int64) *sync.Mutex { + v, _ := tm.slotLocks.LoadOrStore(slot, &sync.Mutex{}) + return v.(*sync.Mutex) +} + +func (tm *TxMapperDB) withSlotLock(slot int64, fn func() error) error { + mu := tm.slotLock(slot) + mu.Lock() + defer mu.Unlock() + return fn() +} + +type TxEventStore interface { + AddTransactionSubmittedEvent(ctx context.Context, tx pgx.Tx, st *sequencerBindings.SequencerTransactionSubmitted) error + AddBlock(ctx context.Context, b *data.Block) error + AddKeyShare(ctx context.Context, dks *data.DecryptionKeyShare) error +} + +type DecryptionStore interface { + AddDecryptionKeysAndMessages(ctx context.Context, dkam *DecKeysAndMessages) error +} + +type BlockClassifier interface { + HandleBlock(ctx context.Context, blockNumber int64, slot int64, txs types.Transactions) error +} + +type ValidatorStore interface { + QueryBlockNumberFromValidatorRegistryEventsSyncedUntil(ctx context.Context) (int64, error) + AddValidatorRegistryEvent(ctx context.Context, tx pgx.Tx, vr *validatorRegistryBindings.ValidatorregistryUpdated) error + UpdateValidatorStatus(ctx context.Context) error + AddProposerDuties(ctx context.Context, epoch uint64) error + UpsertGraffitiIfShutterized(ctx context.Context, validatorIndex int64, graffiti string, blockNumber int64) (bool, error) +} + +type TxMapper interface { + TxEventStore + DecryptionStore + BlockClassifier + ValidatorStore +} + +// markDone records that an inclusion classification was observed for this tx hash. +// Failure and timeout paths should not overwrite rows once a tx is done. +func (tm *TxMapperDB) markDone(hash common.Hash) { + tm.statusDone.Store(hash.Hex(), struct{}{}) +} + +// isDone reports whether an inclusion classification was already observed. +func (tm *TxMapperDB) isDone(hash common.Hash) bool { + _, ok := tm.statusDone.Load(hash.Hex()) + return ok +} + +func NewTxMapperDB( + ctx context.Context, + db *pgxpool.Pool, + config *metricsCommon.Config, + ethClient *ethclient.Client, + beaconAPIClient *beaconapiclient.Client, + chainID int64, + genesisTimestamp uint64, + slotDuration uint64, +) TxMapper { + return &TxMapperDB{ + db: db, + dbQuery: data.New(db), + config: config, + ethClient: ethClient, + beaconAPIClient: beaconAPIClient, + chainID: chainID, + genesisTimestamp: genesisTimestamp, + slotDuration: slotDuration, + } +} + +func (tm *TxMapperDB) AddTransactionSubmittedEvent(ctx context.Context, tx pgx.Tx, st *sequencerBindings.SequencerTransactionSubmitted) error { + q := tm.dbQuery + if tx != nil { + // Use transaction if available + q = tm.dbQuery.WithTx(tx) + } + err := q.CreateTransactionSubmittedEvent(ctx, data.CreateTransactionSubmittedEventParams{ + EventBlockHash: st.Raw.BlockHash.Bytes(), + EventBlockNumber: int64(st.Raw.BlockNumber), + EventTxIndex: int64(st.Raw.TxIndex), + EventLogIndex: int64(st.Raw.Index), + Eon: int64(st.Eon), + TxIndex: int64(st.TxIndex), + IdentityPrefix: st.IdentityPrefix[:], + Sender: st.Sender.Bytes(), + EncryptedTransaction: st.EncryptedTransaction, + EventTxHash: st.Raw.TxHash.Bytes(), + }) + if err != nil { + return err + } + metricsEncTxReceived.Inc() + return nil +} + +func (tm *TxMapperDB) AddKeyShare(ctx context.Context, dks *data.DecryptionKeyShare) error { + err := tm.dbQuery.CreateDecryptionKeyShare(ctx, data.CreateDecryptionKeyShareParams{ + Eon: dks.Eon, + DecryptionKeyShare: dks.DecryptionKeyShare, + Slot: dks.Slot, + IdentityPreimage: dks.IdentityPreimage, + KeyperIndex: dks.KeyperIndex, + }) + if err != nil { + return err + } + metricsKeyShareReceived.Inc() + return nil +} + +func (tm *TxMapperDB) AddBlock( + ctx context.Context, + b *data.Block, +) error { + err := tm.dbQuery.CreateBlock(ctx, data.CreateBlockParams{ + BlockHash: b.BlockHash, + BlockNumber: b.BlockNumber, + BlockTimestamp: b.BlockTimestamp, + Slot: b.Slot, + }) + return err +} \ No newline at end of file diff --git a/internal/metrics/tx_mapper_classification.go b/internal/metrics/tx_mapper_classification.go new file mode 100644 index 0000000..0c6ae9d --- /dev/null +++ b/internal/metrics/tx_mapper_classification.go @@ -0,0 +1,212 @@ +package metrics + +import ( + "context" + "math/big" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgtype" + "github.com/pkg/errors" + "github.com/rs/zerolog/log" + + "github.com/shutter-network/observer/internal/data" +) + +const ( + InclPosUnknown = "unknown" + InclPosExact = "exact" + InclPosLater = "later" + InclPosEarlier = "earlier" + InclPosWrong = "wrong_slot" +) + +func classifyInclusion(expectedIndex int, receiptIndex uint) (data.TxStatusVal, string) { + switch { + case receiptIndex == uint(expectedIndex): + return data.TxStatusValShieldedinclusion, InclPosExact + case receiptIndex > uint(expectedIndex): + return data.TxStatusValUnshieldedinclusion, InclPosLater + default: + return data.TxStatusValTentativeshieldedinclusion, InclPosEarlier + } +} + +type batchEntry struct { + hash common.Hash + status data.TxStatusVal + txIndex int64 + decryptionKeyID int64 + submittedEventID int64 +} + +func classifyWithPredecessors(expectedPos, blockPos int, entries []batchEntry) (data.TxStatusVal, + string) { + // later index always unshielded + if blockPos > expectedPos { + return data.TxStatusValUnshieldedinclusion, InclPosLater + } + + pos := InclPosExact + if blockPos < expectedPos { + pos = InclPosEarlier + } + + badPredecessor := false + allShieldedBefore := true + seenTentative := false + for i := 0; i < expectedPos; i++ { + switch entries[i].status { + case data.TxStatusValShieldedinclusion: + // ok + case data.TxStatusValTentativeshieldedinclusion: + allShieldedBefore = false + seenTentative = true + case data.TxStatusValUnshieldedinclusion: + badPredecessor = true + default: + allShieldedBefore = false + } + if badPredecessor { + break + } + } + + if badPredecessor { + return data.TxStatusValUnshieldedinclusion, pos + } + if pos == InclPosExact && allShieldedBefore { + return data.TxStatusValShieldedinclusion, pos + } + // exact with tentative predecessor, or earlier with good predecessors -> ambiguous (tentative) + _ = seenTentative // kept for readability; not used in decision + return data.TxStatusValTentativeshieldedinclusion, pos +} + +// HandleBlock performs predecessor-aware classification at block arrival. +// Receipt-based classification is a fallback and skips if a tx is already finalized. +func (tm *TxMapperDB) HandleBlock(ctx context.Context, blockNumber int64, slot int64, txs types.Transactions) error { + if len(txs) == 0 { + return nil + } + + log.Debug(). + Int64("slot", slot). + Int64("block_number", blockNumber). + Int("num_txs", len(txs)). + Msg("handling block for tx classification") + + return tm.withSlotLock(slot, func() error { + rows, err := tm.db.Query(ctx, ` + SELECT tx_hash, tx_index, tx_status, decryption_key_id, transaction_submitted_event_id + FROM decrypted_tx + WHERE slot = $1 + AND tx_hash <> '\x00' + ORDER BY tx_index`, slot) + if err != nil { + return err + } + defer rows.Close() + + var entries []batchEntry + indexByHash := make(map[string]int) + + for rows.Next() { + var ( + hashBytes []byte + txIdx int64 + status data.TxStatusVal + decID int64 + subID int64 + ) + if err := rows.Scan(&hashBytes, &txIdx, &status, &decID, &subID); err != nil { + return err + } + h := common.BytesToHash(hashBytes) + indexByHash[h.Hex()] = len(entries) + entries = append(entries, batchEntry{ + hash: h, + status: status, + txIndex: txIdx, + decryptionKeyID: decID, + submittedEventID: subID, + }) + } + + log.Debug(). + Int64("slot", slot). + Int("num_candidates", len(entries)). + Msg("loaded decrypted tx candidates for block classification") + + if len(entries) == 0 { + return nil + } + + for blockPos, tx := range txs { + h := tx.Hash() + expectedPos, ok := indexByHash[h.Hex()] + if !ok { + continue + } + + status, pos := classifyWithPredecessors(expectedPos, blockPos, entries) + entries[expectedPos].status = status + + log.Debug(). + Int64("slot", slot). + Int64("block_number", blockNumber). + Int("block_pos", blockPos). + Int("expected_pos", expectedPos). + Int64("tx_index", entries[expectedPos].txIndex). + Hex("tx_hash", h.Bytes()). + Str("tx_status", string(status)). + Str("inclusion_position", pos). + Msg("classified tx from block body") + + if err := tm.dbQuery.UpsertTX(ctx, data.UpsertTXParams{ + Slot: slot, + TxIndex: entries[expectedPos].txIndex, + TxHash: h.Bytes(), + TxStatus: status, + InclusionPosition: pos, + DecryptionKeyID: entries[expectedPos].decryptionKeyID, + TransactionSubmittedEventID: entries[expectedPos].submittedEventID, + BlockNumber: pgtype.Int8{Int64: blockNumber, Valid: true}, + }); err != nil { + log.Err(err).Hex("tx-hash", h.Bytes()).Msg("failed to upsert tx from block body") + continue + } + + tm.markDone(h) + } + + return nil + }) +} + +func (tm *TxMapperDB) maybeHandleStoredBlock(ctx context.Context, slot int64) { + storedBlock, err := tm.dbQuery.QueryBlockFromSlot(ctx, slot) + if err != nil { + if !errors.Is(err, pgx.ErrNoRows) { // handling the error check properly + log.Err(err).Int64("slot", slot).Msg("failed to query stored block") + } + return + } + + block, err := tm.ethClient.BlockByNumber(ctx, big.NewInt(storedBlock.BlockNumber)) + if err != nil { + log.Err(err). + Int64("slot", slot). + Int64("block_number", storedBlock.BlockNumber). + Msg("failed to fetch stored block") + return + } + + if err := tm.HandleBlock(ctx, storedBlock.BlockNumber, slot, block.Transactions()); err != nil { + log.Err(err). + Int64("slot", slot). + Int64("block_number", storedBlock.BlockNumber). + Msg("failed to handle stored block after decryption") + } +} \ No newline at end of file diff --git a/internal/metrics/tx_mapper_db.go b/internal/metrics/tx_mapper_db.go deleted file mode 100644 index adc84a5..0000000 --- a/internal/metrics/tx_mapper_db.go +++ /dev/null @@ -1,818 +0,0 @@ -package metrics - -import ( - "context" - "encoding/hex" - "fmt" - "math" - "strings" - "sync" - "time" - - "github.com/ethereum/go-ethereum" - "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/core/types" - "github.com/ethereum/go-ethereum/ethclient" - "github.com/jackc/pgx/v5" - "github.com/jackc/pgx/v5/pgtype" - "github.com/jackc/pgx/v5/pgxpool" - "github.com/pkg/errors" - "github.com/rs/zerolog/log" - sequencerBindings "github.com/shutter-network/gnosh-contracts/gnoshcontracts/sequencer" - validatorRegistryBindings "github.com/shutter-network/gnosh-contracts/gnoshcontracts/validatorregistry" - metricsCommon "github.com/shutter-network/observer/common" - dbTypes "github.com/shutter-network/observer/common/database" - "github.com/shutter-network/observer/common/utils" - "github.com/shutter-network/observer/internal/data" - "github.com/shutter-network/rolling-shutter/rolling-shutter/medley/beaconapiclient" - "github.com/shutter-network/rolling-shutter/rolling-shutter/medley/validatorregistry" - "github.com/shutter-network/shutter/shlib/shcrypto" - blst "github.com/supranational/blst/bindings/go" -) - -const ReceiptWaitTimeout = 1 * time.Hour - -var errSendTransaction = errors.New("send transaction failed") - -type TxMapperDB struct { - db *pgxpool.Pool - dbQuery *data.Queries - config *metricsCommon.Config - ethClient *ethclient.Client - beaconAPIClient *beaconapiclient.Client - chainID int64 - genesisTimestamp uint64 - slotDuration uint64 -} - -type validatorData struct { - validatorStatus string - validatorValidity data.ValidatorRegistrationValidity -} - -func NewTxMapperDB( - ctx context.Context, - db *pgxpool.Pool, - config *metricsCommon.Config, - ethClient *ethclient.Client, - beaconAPIClient *beaconapiclient.Client, - chainID int64, - genesisTimestamp uint64, - slotDuration uint64, -) TxMapper { - return &TxMapperDB{ - db: db, - dbQuery: data.New(db), - config: config, - ethClient: ethClient, - beaconAPIClient: beaconAPIClient, - chainID: chainID, - genesisTimestamp: genesisTimestamp, - slotDuration: slotDuration, - } -} - -func (tm *TxMapperDB) AddTransactionSubmittedEvent(ctx context.Context, tx pgx.Tx, st *sequencerBindings.SequencerTransactionSubmitted) error { - q := tm.dbQuery - if tx != nil { - // Use transaction if available - q = tm.dbQuery.WithTx(tx) - } - err := q.CreateTransactionSubmittedEvent(ctx, data.CreateTransactionSubmittedEventParams{ - EventBlockHash: st.Raw.BlockHash.Bytes(), - EventBlockNumber: int64(st.Raw.BlockNumber), - EventTxIndex: int64(st.Raw.TxIndex), - EventLogIndex: int64(st.Raw.Index), - Eon: int64(st.Eon), - TxIndex: int64(st.TxIndex), - IdentityPrefix: st.IdentityPrefix[:], - Sender: st.Sender.Bytes(), - EncryptedTransaction: st.EncryptedTransaction, - EventTxHash: st.Raw.TxHash.Bytes(), - }) - if err != nil { - return err - } - metricsEncTxReceived.Inc() - return nil -} - -func (tm *TxMapperDB) AddDecryptionKeysAndMessages( - ctx context.Context, - decKeysAndMessages *DecKeysAndMessages, -) error { - if len(decKeysAndMessages.Keys) == 0 { - return nil - } - tx, err := tm.db.Begin(ctx) - if err != nil { - return err - } - defer tx.Rollback(ctx) - qtx := tm.dbQuery.WithTx(tx) - - eons, slots, instanceIDs, txPointers, keyIndexes := getDecryptionMessageInfos(decKeysAndMessages) - decryptionKeyIDs, err := qtx.CreateDecryptionKeys(ctx, data.CreateDecryptionKeysParams{ - Column1: eons, - Column2: decKeysAndMessages.Identities, - Column3: decKeysAndMessages.Keys, - }) - if err != nil { - return err - } - if len(decryptionKeyIDs) == 0 || len(decryptionKeyIDs) != len(slots) { - log.Debug().Msg("no new decryption key was added") - return nil - } - err = qtx.CreateDecryptionKeyMessages(ctx, data.CreateDecryptionKeyMessagesParams{ - Column1: slots, - Column2: instanceIDs, - Column3: eons, - Column4: txPointers, - }) - if err != nil { - return err - } - - err = qtx.CreateDecryptionKeysMessageDecryptionKey(ctx, data.CreateDecryptionKeysMessageDecryptionKeyParams{ - Column1: slots, - Column2: keyIndexes, - Column3: decryptionKeyIDs, - }) - if err != nil { - return err - } - - totalDecKeysAndMessages := len(decKeysAndMessages.Keys) - err = tx.Commit(ctx) - if err != nil { - log.Err(err).Msg("unable to commit db transaction") - return err - } - - for i := 0; i < totalDecKeysAndMessages; i++ { - metricsDecKeyReceived.Inc() - } - - dkam := make([]*DecKeyAndMessage, totalDecKeysAndMessages) - for index, key := range decKeysAndMessages.Keys { - identityPreimage := decKeysAndMessages.Identities[index] - dkam[index] = &DecKeyAndMessage{ - Slot: decKeysAndMessages.Slot, - TxPointer: decKeysAndMessages.TxPointer, - Eon: decKeysAndMessages.Eon, - Key: key, - IdentityPreimage: identityPreimage, - KeyIndex: int64(index), - DecryptionKeyID: decryptionKeyIDs[index], - } - } - - if len(dkam) > 0 { - dkam = dkam[1:] - } - err = tm.processTransactionExecution(ctx, &TxExecution{ - DecKeysAndMessages: dkam, - }) - if err != nil { - log.Err(err).Int64("slot", decKeysAndMessages.Slot).Msg("failed to process transaction execution") - return err - } - return nil -} - -func (tm *TxMapperDB) AddKeyShare(ctx context.Context, dks *data.DecryptionKeyShare) error { - err := tm.dbQuery.CreateDecryptionKeyShare(ctx, data.CreateDecryptionKeyShareParams{ - Eon: dks.Eon, - DecryptionKeyShare: dks.DecryptionKeyShare, - Slot: dks.Slot, - IdentityPreimage: dks.IdentityPreimage, - KeyperIndex: dks.KeyperIndex, - }) - if err != nil { - return err - } - metricsKeyShareReceived.Inc() - return nil -} - -func (tm *TxMapperDB) AddBlock( - ctx context.Context, - b *data.Block, -) error { - err := tm.dbQuery.CreateBlock(ctx, data.CreateBlockParams{ - BlockHash: b.BlockHash, - BlockNumber: b.BlockNumber, - BlockTimestamp: b.BlockTimestamp, - Slot: b.Slot, - }) - return err -} - -func (tm *TxMapperDB) QueryBlockNumberFromValidatorRegistryEventsSyncedUntil(ctx context.Context) (int64, error) { - data, err := tm.dbQuery.QueryValidatorRegistryEventsSyncedUntil(ctx) - if err != nil { - return 0, err - } - return data.BlockNumber, nil -} - -func (tm *TxMapperDB) AddValidatorRegistryEvent(ctx context.Context, tx pgx.Tx, vr *validatorRegistryBindings.ValidatorregistryUpdated) error { - regMessage := &validatorregistry.AggregateRegistrationMessage{} - err := regMessage.Unmarshal(vr.Message) - if err != nil { - log.Err(err).Hex("tx-hash", vr.Raw.TxHash.Bytes()).Msg("error unmarshalling registration message") - } else { - validatorIDtoValidity, err := tm.validateValidatorRegistryEvent(ctx, vr, regMessage, uint64(tm.chainID), tm.config.ValidatorRegistryContractAddress) - if err != nil { - log.Err(err).Msg("error validating validator registry events") - return err - } - - q := tm.dbQuery - if tx != nil { - // Use transaction if available - q = tm.dbQuery.WithTx(tx) - } - - for validatorID, validatorData := range validatorIDtoValidity { - err := q.CreateValidatorRegistryMessage(ctx, data.CreateValidatorRegistryMessageParams{ - Version: dbTypes.Uint64ToPgTypeInt8(uint64(regMessage.Version)), - ChainID: dbTypes.Uint64ToPgTypeInt8(regMessage.ChainID), - ValidatorRegistryAddress: regMessage.ValidatorRegistryAddress.Bytes(), - ValidatorIndex: dbTypes.Int64ToPgTypeInt8(validatorID), - Nonce: dbTypes.Uint64ToPgTypeInt8(uint64(regMessage.Nonce)), - IsRegisteration: dbTypes.BoolToPgTypeBool(regMessage.IsRegistration), - Signature: vr.Signature, - EventBlockNumber: int64(vr.Raw.BlockNumber), - EventTxIndex: int64(vr.Raw.TxIndex), - EventLogIndex: int64(vr.Raw.Index), - Validity: validatorData.validatorValidity, - }) - if err != nil { - return err - } - - if validatorData.validatorValidity == data.ValidatorRegistrationValidityValid && - validatorData.validatorStatus != "" { - err := q.CreateValidatorStatus(ctx, data.CreateValidatorStatusParams{ - ValidatorIndex: dbTypes.Int64ToPgTypeInt8(validatorID), - Status: validatorData.validatorStatus, - }) - if err != nil { - return err - } - } - } - } - return nil -} - -func (tm *TxMapperDB) UpdateValidatorStatus(ctx context.Context) error { - batchSize := 100 - jumpBy := 0 - numWorkers := 5 - sem := make(chan struct{}, numWorkers) - var wg sync.WaitGroup - - for { - // Query a batch of validator statuses - validatorStatus, err := tm.dbQuery.QueryValidatorStatuses(ctx, data.QueryValidatorStatusesParams{ - Limit: int32(batchSize), - Offset: int32(jumpBy), - }) - if err != nil { - if errors.Is(err, pgx.ErrNoRows) { - break - } - return err - } - - if len(validatorStatus) == 0 { - break - } - - // Launch goroutines to process each status concurrently - for _, vs := range validatorStatus { - sem <- struct{}{} - wg.Add(1) - go func(vs data.QueryValidatorStatusesRow) { - defer wg.Done() - defer func() { <-sem }() - - validatorIndex := uint64(vs.ValidatorIndex.Int64) - //TODO: should we keep this log or remove it? - log.Debug().Uint64("validatorIndex", validatorIndex).Msg("validator status being updated") - validator, err := tm.beaconAPIClient.GetValidatorByIndex(ctx, "head", validatorIndex) - if err != nil { - log.Err(err).Uint64("validatorIndex", validatorIndex).Msg("failed to get validator from beacon chain") - return - } - if validator == nil { - return - } - - err = tm.dbQuery.CreateValidatorStatus(ctx, data.CreateValidatorStatusParams{ - ValidatorIndex: dbTypes.Uint64ToPgTypeInt8(validatorIndex), - Status: validator.Data.Status, - }) - if err != nil { - log.Err(err).Uint64("validatorIndex", validatorIndex).Msg("failed to create validator status") - return - } - }(vs) - } - - wg.Wait() - - // Wait for 3 seconds before processing the next batch - select { - case <-ctx.Done(): - return ctx.Err() // Handle context cancellation - case <-time.After(3 * time.Second): - } - - jumpBy += batchSize - } - - return nil -} - -func (tm *TxMapperDB) AddProposerDuties(ctx context.Context, epoch uint64) error { - proposerDuties, err := tm.beaconAPIClient.GetProposerDutiesByEpoch(ctx, epoch) - if err != nil { - return err - } - if proposerDuties == nil { - return errors.Errorf("no proposer duties found for epoch %d", epoch) - } - - log.Info().Uint64("epoch", epoch).Msg("processing proposer duties") - - publicKeys := make([]string, len(proposerDuties.Data)) - validatorIndices := make([]int64, len(proposerDuties.Data)) - slots := make([]int64, len(proposerDuties.Data)) - - for i := 0; i < len(proposerDuties.Data); i++ { - publicKeys[i] = proposerDuties.Data[i].Pubkey - validatorIndices[i] = int64(proposerDuties.Data[i].ValidatorIndex) - slots[i] = int64(proposerDuties.Data[i].Slot) - } - - err = tm.dbQuery.CreateProposerDuties(ctx, data.CreateProposerDutiesParams{ - Column1: publicKeys, - Column2: validatorIndices, - Column3: slots, - }) - return err -} - -func (tm *TxMapperDB) processTransactionExecution( - ctx context.Context, - te *TxExecution, -) error { - totalDecKeysAndMessages := len(te.DecKeysAndMessages) - if totalDecKeysAndMessages == 0 { - return nil - } - txSubEvents, err := tm.dbQuery.QueryTransactionSubmittedEvent(ctx, data.QueryTransactionSubmittedEventParams{ - Eon: te.DecKeysAndMessages[0].Eon, - TxIndex: te.DecKeysAndMessages[0].TxPointer, - Column3: totalDecKeysAndMessages, - }) - if err != nil { - return err - } - - if len(txSubEvents) != totalDecKeysAndMessages { - log.Debug().Int("total tx sub events", len(txSubEvents)). - Int("total decryption keys", totalDecKeysAndMessages). - Msg("total tx submitted events dont match total decryption keys") - return nil - } - - identityPreimageToDecKeyAndMsg := make(map[string]*DecKeyAndMessage) - for _, dkam := range te.DecKeysAndMessages { - identityPreimageToDecKeyAndMsg[hex.EncodeToString(dkam.IdentityPreimage)] = dkam - } - - slot := te.DecKeysAndMessages[0].Slot - - var wg sync.WaitGroup - for index, txSubEvent := range txSubEvents { - decryptionKeyID, err := getDecryptionKeyID(txSubEvent, identityPreimageToDecKeyAndMsg) - if err != nil { - log.Err(err).Msg("error while trying to retrieve decryption key ID") - continue - } - decryptedTx, err := getDecryptedTX(txSubEvent, identityPreimageToDecKeyAndMsg) - if err != nil { - log.Err(err).Msg("error while trying to get decrypted tx hash") - err := tm.dbQuery.CreateDecryptedTX(ctx, data.CreateDecryptedTXParams{ - Slot: slot, - TxIndex: txSubEvent.TxIndex, - TxHash: common.Hash{}.Bytes(), - TxStatus: data.TxStatusValNotdecrypted, - DecryptionKeyID: decryptionKeyID, - TransactionSubmittedEventID: txSubEvent.ID, - }) - if err != nil { - log.Err(err).Msg("failed to create decrypted tx") - } - continue - } - - log.Info().Uint64("gas", decryptedTx.Gas()). - Uint64("gas-price", decryptedTx.GasPrice().Uint64()). - Uint64("cost", decryptedTx.Cost().Uint64()). - Uint64("max-priority-fee-per-gas", decryptedTx.GasTipCap().Uint64()). - Uint64("max-fee-per-gas", decryptedTx.GasFeeCap().Uint64()). - Uint8("tx-type", decryptedTx.Type()). - Msg("tx-data") - - // channel to propagate errors between goroutines - txErrorSignalCh := make(chan error, 1) - - wg.Add(2) - - // First goroutine: Send transaction - go func(ctx context.Context, inclusionDelay int64, decryptedTx *types.Transaction, txSubEvent data.TransactionSubmittedEvent, slot int64, decryptionKeyID int64, txErrorSignalCh chan error) { - defer wg.Done() - - select { - case <-ctx.Done(): - txErrorSignalCh <- fmt.Errorf("transaction send cancelled due to context: %w", ctx.Err()) - return - case <-time.After(time.Duration(tm.config.InclusionDelay) * time.Second): - if err := tm.ethClient.SendTransaction(ctx, decryptedTx); err != nil { - log.Err(err).Msg("failed to send transaction") - if err.Error() == "AlreadyKnown" { - log.Debug().Hex("tx-hash", decryptedTx.Hash().Bytes()).Msg("already known") - err := tm.dbQuery.CreateDecryptedTX(ctx, data.CreateDecryptedTXParams{ - Slot: slot, - TxIndex: txSubEvent.TxIndex, - TxHash: decryptedTx.Hash().Bytes(), - TxStatus: data.TxStatusValPending, - DecryptionKeyID: decryptionKeyID, - TransactionSubmittedEventID: txSubEvent.ID, - }) - if err != nil { - txErrorSignalCh <- fmt.Errorf("failed to create decrypted tx: %w", err) - return - } - } else { - txStatus := data.TxStatusValInvalid - if isFeeTooLowError(err) { - txStatus = data.TxStatusValInvalidfeetoolow - } - err := tm.dbQuery.CreateDecryptedTX(ctx, data.CreateDecryptedTXParams{ - Slot: slot, - TxIndex: txSubEvent.TxIndex, - TxHash: decryptedTx.Hash().Bytes(), - TxStatus: txStatus, - DecryptionKeyID: decryptionKeyID, - TransactionSubmittedEventID: txSubEvent.ID, - }) - if err != nil { - log.Err(err).Msg("failed to create decrypted tx") - } - txErrorSignalCh <- fmt.Errorf("%w: %v", errSendTransaction, err) - return - } - } else { - log.Info().Hex("tx-hash", decryptedTx.Hash().Bytes()).Msg("transaction sent") - err := tm.dbQuery.CreateDecryptedTX(ctx, data.CreateDecryptedTXParams{ - Slot: slot, - TxIndex: txSubEvent.TxIndex, - TxHash: decryptedTx.Hash().Bytes(), - TxStatus: data.TxStatusValPending, - DecryptionKeyID: decryptionKeyID, - TransactionSubmittedEventID: txSubEvent.ID, - }) - if err != nil { - txErrorSignalCh <- fmt.Errorf("failed to create decrypted tx: %w", err) - return - } - } - } - }(ctx, tm.config.InclusionDelay, decryptedTx, txSubEvent, slot, decryptionKeyID, txErrorSignalCh) - - // Second goroutine: Wait for receipt - go func(ctx context.Context, index int, txHash common.Hash, txIndex int64, slot int64, decryptionKeyID int64, txSubEventID int64, txErrorSignalCh chan error) { - defer wg.Done() - - // Wait for the receipt with a timeout - receipt, err := tm.waitForReceiptWithTimeout(ctx, txHash, ReceiptWaitTimeout, txErrorSignalCh) - if err != nil { - log.Err(err).Msg("") - if errors.Is(err, errSendTransaction) { - return - } - // update/create status to not included - err := tm.dbQuery.UpsertTX(ctx, data.UpsertTXParams{ - Slot: slot, - TxIndex: txIndex, - TxHash: txHash[:], - TxStatus: data.TxStatusValNotincluded, - DecryptionKeyID: decryptionKeyID, - TransactionSubmittedEventID: txSubEventID, - }) - if err != nil { - log.Err(err).Msg("failed to upsert decrypted tx") - } - return - } - - // receipt found - log.Info().Hex("tx-hash", receipt.TxHash.Bytes()). - Uint64("receipt-status", receipt.Status). - Msg("transaction receipt found") - - block, err := tm.ethClient.BlockByNumber(ctx, receipt.BlockNumber) - if err != nil { - log.Err(err).Uint64("block-number", receipt.BlockNumber.Uint64()).Msg("failed to retrieve block") - return - } - - inclusionSlot := utils.GetSlotForBlock(block.Header().Time, tm.genesisTimestamp, tm.slotDuration) - txStatus := data.TxStatusValShieldedinclusion - - log.Info().Uint("tx-index", receipt.TransactionIndex). - Uint64("inclusion-slot", inclusionSlot). - Msg("receipt data") - - log.Info().Int("index", index). - Int64("inclusion-slot", slot). - Msg("local data") - - if receipt.TransactionIndex != uint(index) { - log.Info().Uint("tx-index", receipt.TransactionIndex).Msg("transaction index mismatch") - txStatus = data.TxStatusValUnshieldedinclusion - } - if inclusionSlot != uint64(slot) { - log.Info().Int64("slot", slot).Msg("transaction slot mismatch") - txStatus = data.TxStatusValUnshieldedinclusion - } - - err = tm.dbQuery.UpsertTX(ctx, data.UpsertTXParams{ - Slot: slot, - TxIndex: txIndex, - TxHash: receipt.TxHash.Bytes(), - TxStatus: txStatus, - DecryptionKeyID: decryptionKeyID, - TransactionSubmittedEventID: txSubEventID, - BlockNumber: pgtype.Int8{Int64: receipt.BlockNumber.Int64(), Valid: true}, - }) - if err != nil { - log.Err(err).Msg("failed to update decrypted tx") - } - }(ctx, index, decryptedTx.Hash(), txSubEvent.TxIndex, slot, decryptionKeyID, txSubEvent.ID, txErrorSignalCh) - } - - // Wait for all routines to end - wg.Wait() - return nil -} - -func (tm *TxMapperDB) validateValidatorRegistryEvent( - ctx context.Context, - vr *validatorRegistryBindings.ValidatorregistryUpdated, - regMessage *validatorregistry.AggregateRegistrationMessage, - chainID uint64, - validatorRegistryContractAddress string, -) (map[int64]*validatorData, error) { - staticRegistrationMessageValidity := validateValidatorRegistryMessageContents(regMessage, chainID, validatorRegistryContractAddress) - - var publicKeys []*blst.P1Affine - var validators []*beaconapiclient.GetValidatorByIndexResponse - validatorIDtoValidity := make(map[int64]*validatorData) - - for _, validatorIndex := range regMessage.ValidatorIndices() { - validatorIDtoValidity[validatorIndex] = &validatorData{validatorValidity: staticRegistrationMessageValidity} - nonceBefore, err := tm.dbQuery.QueryValidatorRegistrationMessageNonceBefore(ctx, data.QueryValidatorRegistrationMessageNonceBeforeParams{ - ValidatorIndex: dbTypes.Int64ToPgTypeInt8(validatorIndex), - EventBlockNumber: int64(vr.Raw.BlockNumber), - EventTxIndex: int64(vr.Raw.TxIndex), - EventLogIndex: int64(vr.Raw.Index), - }) - - if err != nil { - if errors.Is(err, pgx.ErrNoRows) { - // No previous nonce means the message is valid regarding nonce - nonceBefore = pgtype.Int8{Int64: -1, Valid: true} - } else { - return nil, errors.Wrapf(err, "failed to query latest nonce for validator %d", validatorIndex) - } - } - - if regMessage.Nonce > math.MaxInt32 || int64(regMessage.Nonce) <= nonceBefore.Int64 { - // skip the validator - log.Warn(). - Uint32("nonce", regMessage.Nonce). - Int64("before-nonce", nonceBefore.Int64). - Msg("ignoring validator with invalid nonce") - validatorIDtoValidity[validatorIndex].validatorValidity = data.ValidatorRegistrationValidityInvalidmessage - continue - } - validator, err := tm.beaconAPIClient.GetValidatorByIndex(ctx, "head", uint64(validatorIndex)) - if err != nil { - return nil, errors.Wrapf(err, "failed to get validator %d", validatorIndex) - } - if validator == nil { - // validator not found - log.Warn().Msg("registration message for unknown validator") - validatorIDtoValidity[validatorIndex].validatorValidity = data.ValidatorRegistrationValidityInvalidmessage - continue - } - validatorIDtoValidity[validatorIndex].validatorStatus = validator.Data.Status - publicKey, err := validator.Data.Validator.GetPubkey() - if err != nil { - return nil, errors.Wrapf(err, "failed to get public key of validator %d", validatorIndex) - } - publicKeys = append(publicKeys, publicKey) - validators = append(validators, validator) - } - if len(publicKeys) > 0 { - // now we need to check for signature verification depending on the message version - sig := new(blst.P2Affine).Uncompress(vr.Signature) - if sig == nil { - return nil, fmt.Errorf("ignoring registration message with undecodable signature") - } - - if regMessage.Version == validatorregistry.LegacyValidatorRegistrationMessageVersion { - regMessage := new(validatorregistry.LegacyRegistrationMessage) - err := regMessage.Unmarshal(vr.Message) - if err != nil { - return nil, errors.Wrapf(err, "failed to unmarshal legacy registration message") - } - if valid := validatorregistry.VerifySignature(sig, publicKeys[0], regMessage); !valid { - validatorIDtoValidity[int64(validators[0].Data.Index)].validatorValidity = data.ValidatorRegistrationValidityInvalidsignature - log.Warn().Msg("invalid legacy registration message with invalid signature") - } - } else { - if valid := validatorregistry.VerifyAggregateSignature(sig, publicKeys, regMessage); !valid { - for _, validator := range validators { - validatorIDtoValidity[int64(validator.Data.Index)].validatorValidity = data.ValidatorRegistrationValidityInvalidsignature - } - log.Warn().Msg("invalid aggregate registration message with invalid signature") - } - } - } - return validatorIDtoValidity, nil -} - -func validateValidatorRegistryMessageContents( - msg *validatorregistry.AggregateRegistrationMessage, - chainID uint64, - validatorRegistryContractAddress string, -) data.ValidatorRegistrationValidity { - validity := data.ValidatorRegistrationValidityValid - if msg.Version != validatorregistry.AggregateValidatorRegistrationMessageVersion && - msg.Version != validatorregistry.LegacyValidatorRegistrationMessageVersion { - return data.ValidatorRegistrationValidityInvalidmessage - } - if msg.ChainID != chainID { - return data.ValidatorRegistrationValidityInvalidmessage - } - if msg.ValidatorRegistryAddress.String() != validatorRegistryContractAddress { - return data.ValidatorRegistrationValidityInvalidmessage - } - if msg.ValidatorIndex > math.MaxInt64 { - return data.ValidatorRegistrationValidityInvalidmessage - } - return validity -} - -func getDecryptionMessageInfos(dkam *DecKeysAndMessages) ([]int64, []int64, []int64, []int64, []int64) { - eons := make([]int64, len(dkam.Keys)) - slots := make([]int64, len(dkam.Keys)) - instanceIDs := make([]int64, len(dkam.Keys)) - txPointers := make([]int64, len(dkam.Keys)) - keyIndexes := make([]int64, len(dkam.Keys)) - - for index := range dkam.Keys { - eons[index] = dkam.Eon - slots[index] = dkam.Slot - instanceIDs[index] = dkam.InstanceID - txPointers[index] = dkam.TxPointer - keyIndexes[index] = int64(index) - } - - return eons, slots, instanceIDs, txPointers, keyIndexes -} - -func computeIdentity(prefix []byte, sender common.Address) []byte { - imageBytes := append(prefix, sender.Bytes()...) - return imageBytes -} - -func getDecryptedTX( - txSubEvent data.TransactionSubmittedEvent, - identityPreimageToDecKeyAndMsg map[string]*DecKeyAndMessage, -) (*types.Transaction, error) { - identityPreimage := computeIdentity(txSubEvent.IdentityPrefix, common.BytesToAddress(txSubEvent.Sender)) - dkam, ok := identityPreimageToDecKeyAndMsg[hex.EncodeToString(identityPreimage)] - if !ok { - return nil, fmt.Errorf("identity preimage not found %s", hex.EncodeToString(identityPreimage)) - } - tx, err := decryptTransaction(dkam.Key, txSubEvent.EncryptedTransaction) - if err != nil { - return nil, err - } - return tx, nil -} - -func getDecryptionKeyID( - txSubEvent data.TransactionSubmittedEvent, - identityPreimageToDecKeyAndMsg map[string]*DecKeyAndMessage, -) (int64, error) { - identityPreimage := computeIdentity(txSubEvent.IdentityPrefix, common.BytesToAddress(txSubEvent.Sender)) - dkam, ok := identityPreimageToDecKeyAndMsg[hex.EncodeToString(identityPreimage)] - if !ok { - return 0, fmt.Errorf("identity preimage not found %s", hex.EncodeToString(identityPreimage)) - } - return dkam.DecryptionKeyID, nil -} - -func decryptTransaction(key []byte, encrypted []byte) (*types.Transaction, error) { - decryptionKey := new(shcrypto.EpochSecretKey) - err := decryptionKey.Unmarshal(key) - if err != nil { - return nil, errors.Wrapf(err, "invalid decryption key") - } - encryptedMsg := new(shcrypto.EncryptedMessage) - err = encryptedMsg.Unmarshal(encrypted) - if err != nil { - return nil, errors.Wrapf(err, "invalid encrypted msg") - } - decryptedMsg, err := encryptedMsg.Decrypt(decryptionKey) - if err != nil { - return nil, errors.Wrapf(err, "failed to decrypt message") - } - - tx := new(types.Transaction) - err = tx.UnmarshalBinary(decryptedMsg) - if err != nil { - return nil, errors.Wrapf(err, "Failed to unmarshal decrypted message to transaction type") - } - return tx, nil -} - -func isFeeTooLowError(err error) bool { - if err == nil { - return false - } - return strings.Contains(strings.ToLower(err.Error()), "feetoolow") || - strings.Contains(strings.ToLower(err.Error()), "underpriced") || - strings.Contains(strings.ToLower(err.Error()), "maxfeepergaslessthanblockbasefee") -} - -// waitForReceiptWithTimeout waits for a transaction receipt with a provided timeout. -func (tm *TxMapperDB) waitForReceiptWithTimeout(ctx context.Context, txHash common.Hash, receiptWaitTimeout time.Duration, txErrorSignalCh chan error) (*types.Receipt, error) { - ctx, cancel := context.WithTimeout(ctx, receiptWaitTimeout) - defer cancel() - - // wait for the transaction receipt - receipt, err := tm.waitForReceipt(ctx, txHash, txErrorSignalCh) - if err != nil { - return nil, fmt.Errorf("failed to get receipt for transaction %s: %w", txHash.Hex(), err) - } - return receipt, nil -} - -// waitForReceipt polls the Ethereum network for the transaction receipt until it's available or the context is canceled. -func (tm *TxMapperDB) waitForReceipt(ctx context.Context, txHash common.Hash, txErrorSignalCh chan error) (*types.Receipt, error) { - for { - // check if the context has been canceled or timed out - select { - case <-ctx.Done(): - return nil, ctx.Err() - case err := <-txErrorSignalCh: // Listen for errors from the sending goroutine - if err != nil { - return nil, err - } - default: - } - - // query for the transaction receipt - receipt, err := tm.ethClient.TransactionReceipt(ctx, txHash) - if err == ethereum.NotFound { - // If the receipt is not found, continue polling - time.Sleep(3 * time.Second) - continue - } else if err != nil { - return nil, err - } - - return receipt, nil - } -} - -func (tm *TxMapperDB) UpsertGraffitiIfShutterized(ctx context.Context, validatorIndex int64, graffiti string, blockNumber int64) (bool, error) { - upserted, err := tm.dbQuery.UpsertGraffitiIfShutterized(ctx, data.UpsertGraffitiIfShutterizedParams{ - ValidatorIndex: dbTypes.Int64ToPgTypeInt8(validatorIndex), - Graffiti: graffiti, - BlockNumber: blockNumber, - }) - return upserted, err -} diff --git a/internal/metrics/tx_mapper_decryption.go b/internal/metrics/tx_mapper_decryption.go new file mode 100644 index 0000000..47cf65f --- /dev/null +++ b/internal/metrics/tx_mapper_decryption.go @@ -0,0 +1,210 @@ +package metrics + +import ( + "context" + "encoding/hex" + "fmt" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + "github.com/pkg/errors" + "github.com/rs/zerolog/log" + + "github.com/shutter-network/observer/internal/data" + "github.com/shutter-network/shutter/shlib/shcrypto" +) + +type DecryptionData struct { + Key []byte + Slot int64 +} + +type KeyShare struct { + Share []byte + Slot int64 +} + +type Tx struct { + EncryptedTx []byte + DD *DecryptionData + KS *KeyShare + BlockHash []byte +} + +type DecKeysAndMessages struct { + Eon int64 + Keys [][]byte + Identities [][]byte + Slot int64 + InstanceID int64 + TxPointer int64 +} + +type DecKeyAndMessage struct { + Slot int64 + TxPointer int64 + Eon int64 + Key []byte + IdentityPreimage []byte + KeyIndex int64 + DecryptionKeyID int64 +} + +func (tm *TxMapperDB) AddDecryptionKeysAndMessages( + ctx context.Context, + decKeysAndMessages *DecKeysAndMessages, +) error { + if len(decKeysAndMessages.Keys) == 0 { + return nil + } + tx, err := tm.db.Begin(ctx) + if err != nil { + return err + } + defer tx.Rollback(ctx) + qtx := tm.dbQuery.WithTx(tx) + + eons, slots, instanceIDs, txPointers, keyIndexes := getDecryptionMessageInfos(decKeysAndMessages) + decryptionKeyIDs, err := qtx.CreateDecryptionKeys(ctx, data.CreateDecryptionKeysParams{ + Column1: eons, + Column2: decKeysAndMessages.Identities, + Column3: decKeysAndMessages.Keys, + }) + if err != nil { + return err + } + if len(decryptionKeyIDs) == 0 || len(decryptionKeyIDs) != len(slots) { + log.Debug().Msg("no new decryption key was added") + return nil + } + err = qtx.CreateDecryptionKeyMessages(ctx, data.CreateDecryptionKeyMessagesParams{ + Column1: slots, + Column2: instanceIDs, + Column3: eons, + Column4: txPointers, + }) + if err != nil { + return err + } + + err = qtx.CreateDecryptionKeysMessageDecryptionKey(ctx, data.CreateDecryptionKeysMessageDecryptionKeyParams{ + Column1: slots, + Column2: keyIndexes, + Column3: decryptionKeyIDs, + }) + if err != nil { + return err + } + + totalDecKeysAndMessages := len(decKeysAndMessages.Keys) + err = tx.Commit(ctx) + if err != nil { + log.Err(err).Msg("unable to commit db transaction") + return err + } + + for i := 0; i < totalDecKeysAndMessages; i++ { + metricsDecKeyReceived.Inc() + } + + dkam := make([]*DecKeyAndMessage, totalDecKeysAndMessages) + for index, key := range decKeysAndMessages.Keys { + identityPreimage := decKeysAndMessages.Identities[index] + dkam[index] = &DecKeyAndMessage{ + Slot: decKeysAndMessages.Slot, + TxPointer: decKeysAndMessages.TxPointer, + Eon: decKeysAndMessages.Eon, + Key: key, + IdentityPreimage: identityPreimage, + KeyIndex: int64(index), + DecryptionKeyID: decryptionKeyIDs[index], + } + } + + if len(dkam) > 0 { + dkam = dkam[1:] + } + err = tm.processTransactionExecution(ctx, &TxExecution{ + DecKeysAndMessages: dkam, + }) + if err != nil { + log.Err(err).Int64("slot", decKeysAndMessages.Slot).Msg("failed to process transaction execution") + return err + } + return nil +} + +func getDecryptionMessageInfos(dkam *DecKeysAndMessages) ([]int64, []int64, []int64, []int64, []int64) { + eons := make([]int64, len(dkam.Keys)) + slots := make([]int64, len(dkam.Keys)) + instanceIDs := make([]int64, len(dkam.Keys)) + txPointers := make([]int64, len(dkam.Keys)) + keyIndexes := make([]int64, len(dkam.Keys)) + + for index := range dkam.Keys { + eons[index] = dkam.Eon + slots[index] = dkam.Slot + instanceIDs[index] = dkam.InstanceID + txPointers[index] = dkam.TxPointer + keyIndexes[index] = int64(index) + } + + return eons, slots, instanceIDs, txPointers, keyIndexes +} + +func computeIdentity(prefix []byte, sender common.Address) []byte { + imageBytes := append(prefix, sender.Bytes()...) + return imageBytes +} + +func getDecryptedTX( + txSubEvent data.TransactionSubmittedEvent, + identityPreimageToDecKeyAndMsg map[string]*DecKeyAndMessage, +) (*types.Transaction, error) { + identityPreimage := computeIdentity(txSubEvent.IdentityPrefix, common.BytesToAddress(txSubEvent.Sender)) + dkam, ok := identityPreimageToDecKeyAndMsg[hex.EncodeToString(identityPreimage)] + if !ok { + return nil, fmt.Errorf("identity preimage not found %s", hex.EncodeToString(identityPreimage)) + } + tx, err := decryptTransaction(dkam.Key, txSubEvent.EncryptedTransaction) + if err != nil { + return nil, err + } + return tx, nil +} + +func getDecryptionKeyID( + txSubEvent data.TransactionSubmittedEvent, + identityPreimageToDecKeyAndMsg map[string]*DecKeyAndMessage, +) (int64, error) { + identityPreimage := computeIdentity(txSubEvent.IdentityPrefix, common.BytesToAddress(txSubEvent.Sender)) + dkam, ok := identityPreimageToDecKeyAndMsg[hex.EncodeToString(identityPreimage)] + if !ok { + return 0, fmt.Errorf("identity preimage not found %s", hex.EncodeToString(identityPreimage)) + } + return dkam.DecryptionKeyID, nil +} + +func decryptTransaction(key []byte, encrypted []byte) (*types.Transaction, error) { + decryptionKey := new(shcrypto.EpochSecretKey) + err := decryptionKey.Unmarshal(key) + if err != nil { + return nil, errors.Wrapf(err, "invalid decryption key") + } + encryptedMsg := new(shcrypto.EncryptedMessage) + err = encryptedMsg.Unmarshal(encrypted) + if err != nil { + return nil, errors.Wrapf(err, "invalid encrypted msg") + } + decryptedMsg, err := encryptedMsg.Decrypt(decryptionKey) + if err != nil { + return nil, errors.Wrapf(err, "failed to decrypt message") + } + + tx := new(types.Transaction) + err = tx.UnmarshalBinary(decryptedMsg) + if err != nil { + return nil, errors.Wrapf(err, "Failed to unmarshal decrypted message to transaction type") + } + return tx, nil +} \ No newline at end of file diff --git a/internal/metrics/tx_mapper_execution.go b/internal/metrics/tx_mapper_execution.go new file mode 100644 index 0000000..1b0b526 --- /dev/null +++ b/internal/metrics/tx_mapper_execution.go @@ -0,0 +1,353 @@ +package metrics + +import ( + "context" + "encoding/hex" + "fmt" + "strings" + "sync" + "time" + + "github.com/ethereum/go-ethereum" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + "github.com/jackc/pgx/v5/pgtype" + "github.com/pkg/errors" + "github.com/rs/zerolog/log" + + "github.com/shutter-network/observer/common/utils" + "github.com/shutter-network/observer/internal/data" +) + +type TxExecution struct { + // BlockNumber int64 + DecKeysAndMessages []*DecKeyAndMessage +} + +type txExecutionJob struct { + expectedIndex int + decryptedTx *types.Transaction + txSubEvent data.TransactionSubmittedEvent + decryptionKeyID int64 +} + +func (tm *TxMapperDB) processTransactionExecution( + ctx context.Context, + te *TxExecution, +) error { + totalDecKeysAndMessages := len(te.DecKeysAndMessages) + if totalDecKeysAndMessages == 0 { + return nil + } + txSubEvents, err := tm.dbQuery.QueryTransactionSubmittedEvent(ctx, data.QueryTransactionSubmittedEventParams{ + Eon: te.DecKeysAndMessages[0].Eon, + TxIndex: te.DecKeysAndMessages[0].TxPointer, + Column3: totalDecKeysAndMessages, + }) + if err != nil { + return err + } + + if len(txSubEvents) != totalDecKeysAndMessages { + log.Debug().Int("total tx sub events", len(txSubEvents)). + Int("total decryption keys", totalDecKeysAndMessages). + Msg("total tx submitted events dont match total decryption keys") + return nil + } + + identityPreimageToDecKeyAndMsg := make(map[string]*DecKeyAndMessage) + for _, dkam := range te.DecKeysAndMessages { + identityPreimageToDecKeyAndMsg[hex.EncodeToString(dkam.IdentityPreimage)] = dkam + } + + slot := te.DecKeysAndMessages[0].Slot + expectedIdx := 0 + jobs := make([]txExecutionJob, 0, len(txSubEvents)) + + err = tm.withSlotLock(slot, func() error { + for _, txSubEvent := range txSubEvents { + decryptionKeyID, err := getDecryptionKeyID(txSubEvent, identityPreimageToDecKeyAndMsg) + if err != nil { + log.Err(err).Msg("error while trying to retrieve decryption key ID") + continue + } + + decryptedTx, err := getDecryptedTX(txSubEvent, identityPreimageToDecKeyAndMsg) + if err != nil { + log.Err(err).Msg("error while trying to get decrypted tx hash") + err := tm.dbQuery.CreateDecryptedTX(ctx, data.CreateDecryptedTXParams{ + Slot: slot, + TxIndex: txSubEvent.TxIndex, + TxHash: common.Hash{}.Bytes(), + TxStatus: data.TxStatusValNotdecrypted, + InclusionPosition: InclPosUnknown, + DecryptionKeyID: decryptionKeyID, + TransactionSubmittedEventID: txSubEvent.ID, + }) + if err != nil { + log.Err(err).Msg("failed to create decrypted tx") + } + continue + } + + log.Info().Uint64("gas", decryptedTx.Gas()). + Uint64("gas-price", decryptedTx.GasPrice().Uint64()). + Uint64("cost", decryptedTx.Cost().Uint64()). + Uint64("max-priority-fee-per-gas", decryptedTx.GasTipCap().Uint64()). + Uint64("max-fee-per-gas", decryptedTx.GasFeeCap().Uint64()). + Uint8("tx-type", decryptedTx.Type()). + Msg("tx-data") + + err = tm.dbQuery.CreateDecryptedTX(ctx, data.CreateDecryptedTXParams{ + Slot: slot, + TxIndex: txSubEvent.TxIndex, + TxHash: decryptedTx.Hash().Bytes(), + TxStatus: data.TxStatusValPending, + InclusionPosition: InclPosUnknown, + DecryptionKeyID: decryptionKeyID, + TransactionSubmittedEventID: txSubEvent.ID, + }) + if err != nil { + log.Err(err).Msg("failed to create decrypted tx") + continue + } + + jobs = append(jobs, txExecutionJob{ + expectedIndex: expectedIdx, + decryptedTx: decryptedTx, + txSubEvent: txSubEvent, + decryptionKeyID: decryptionKeyID, + }) + expectedIdx++ + } + + return nil + }) + if err != nil { + return err + } + + // If the block already exists for this slot, classify now that rows are present. + tm.maybeHandleStoredBlock(ctx, slot) + + var wg sync.WaitGroup + for _, job := range jobs { + currExpected := job.expectedIndex + decryptedTx := job.decryptedTx + txSubEvent := job.txSubEvent + decryptionKeyID := job.decryptionKeyID + + txErrorSignalCh := make(chan error, 1) + wg.Add(2) + + go func(ctx context.Context, decryptedTx *types.Transaction, txSubEvent data.TransactionSubmittedEvent, slot int64, decryptionKeyID int64, txErrorSignalCh chan error) { + defer wg.Done() + + select { + case <-ctx.Done(): + txErrorSignalCh <- fmt.Errorf("transaction send cancelled due to context: %w", ctx.Err()) + return + case <-time.After(time.Duration(tm.config.InclusionDelay) * time.Second): + if tm.isDone(decryptedTx.Hash()) { + return + } + + if err := tm.ethClient.SendTransaction(ctx, decryptedTx); err != nil { + log.Err(err).Msg("failed to send transaction") + if err.Error() == "AlreadyKnown" { + log.Debug().Hex("tx-hash", decryptedTx.Hash().Bytes()).Msg("already known") + return + } + + txStatus := data.TxStatusValInvalid + if isFeeTooLowError(err) { + txStatus = data.TxStatusValInvalidfeetoolow + } + err := tm.withSlotLock(slot, func() error { + if tm.isDone(decryptedTx.Hash()) { + return nil + } + + return tm.dbQuery.UpsertTX(ctx, data.UpsertTXParams{ + Slot: slot, + TxIndex: txSubEvent.TxIndex, + TxHash: decryptedTx.Hash().Bytes(), + TxStatus: txStatus, + InclusionPosition: InclPosUnknown, + DecryptionKeyID: decryptionKeyID, + TransactionSubmittedEventID: txSubEvent.ID, + BlockNumber: pgtype.Int8{}, + }) + }) + if err != nil { + log.Err(err).Msg("failed to upsert decrypted tx") + } + txErrorSignalCh <- fmt.Errorf("%w: %v", errSendTransaction, err) + return + } + + log.Info().Hex("tx-hash", decryptedTx.Hash().Bytes()).Msg("transaction sent") + } + }(ctx, decryptedTx, txSubEvent, slot, decryptionKeyID, txErrorSignalCh) + + go func(ctx context.Context, expectedIndex int, txHash common.Hash, txIndex int64, slot int64, decryptionKeyID int64, txSubEventID int64, txErrorSignalCh chan error) { + defer wg.Done() + + if tm.isDone(txHash) { + return + } + + receipt, err := tm.waitForReceiptWithTimeout(ctx, txHash, ReceiptWaitTimeout, txErrorSignalCh) + if err != nil { + if errors.Is(err, context.Canceled) { + return + } + if errors.Is(err, errSendTransaction) { + log.Debug().Hex("tx-hash", txHash.Bytes()).Err(err).Msg("receipt wait stopped after send failure") + return + } + + log.Err(err).Hex("tx-hash", txHash.Bytes()).Msg("receipt wait failed") + err := tm.withSlotLock(slot, func() error { + if tm.isDone(txHash) { + return nil + } + + return tm.dbQuery.UpsertTX(ctx, data.UpsertTXParams{ + Slot: slot, + TxIndex: txIndex, + TxHash: txHash[:], + TxStatus: data.TxStatusValNotincluded, + InclusionPosition: InclPosUnknown, + DecryptionKeyID: decryptionKeyID, + TransactionSubmittedEventID: txSubEventID, + BlockNumber: pgtype.Int8{}, + }) + }) + if err != nil { + log.Err(err).Msg("failed to upsert decrypted tx") + } + return + } + + // Block classification may have completed while we were waiting. + if tm.isDone(txHash) { + return + } + + log.Info().Hex("tx-hash", receipt.TxHash.Bytes()). + Uint64("receipt-status", receipt.Status). + Msg("transaction receipt found") + + block, err := tm.ethClient.BlockByNumber(ctx, receipt.BlockNumber) + if err != nil { + log.Err(err).Uint64("block-number", receipt.BlockNumber.Uint64()).Msg("failed to retrieve block") + return + } + + // Block classification may have completed while we were fetching the block. + if tm.isDone(txHash) { + return + } + + inclusionSlot := utils.GetSlotForBlock(block.Header().Time, tm.genesisTimestamp, tm.slotDuration) + txStatus, inclusionPosition := data.TxStatusValShieldedinclusion, InclPosUnknown + if inclusionSlot != uint64(slot) { + txStatus = data.TxStatusValUnshieldedinclusion + inclusionPosition = InclPosWrong + } else { + txStatus, inclusionPosition = classifyInclusion(expectedIndex, receipt.TransactionIndex) + } + + log.Info(). + Int64("expected-slot", slot). + Uint64("receipt-slot", inclusionSlot). + Uint("expected-index", uint(expectedIndex)). + Uint("receipt-index", receipt.TransactionIndex). + Hex("tx-hash", receipt.TxHash.Bytes()). + Str("inclusion_position", inclusionPosition). + Str("tx_status", string(txStatus)). + Msg("transaction receipt classified") + + err = tm.withSlotLock(slot, func() error { + if tm.isDone(txHash) { + return nil + } + + if err := tm.dbQuery.UpsertTX(ctx, data.UpsertTXParams{ + Slot: slot, + TxIndex: txIndex, + TxHash: receipt.TxHash.Bytes(), + TxStatus: txStatus, + InclusionPosition: inclusionPosition, + DecryptionKeyID: decryptionKeyID, + TransactionSubmittedEventID: txSubEventID, + BlockNumber: pgtype.Int8{Int64: receipt.BlockNumber.Int64(), Valid: true}, + }); err != nil { + return err + } + + tm.markDone(txHash) + return nil + }) + if err != nil { + log.Err(err).Msg("failed to update decrypted tx") + } + }(ctx, currExpected, decryptedTx.Hash(), txSubEvent.TxIndex, slot, decryptionKeyID, txSubEvent.ID, txErrorSignalCh) + } + + wg.Wait() + return nil +} + +func (tm *TxMapperDB) waitForReceiptWithTimeout(ctx context.Context, txHash common.Hash, receiptWaitTimeout time.Duration, txErrorSignalCh chan error) (*types.Receipt, error) { + ctx, cancel := context.WithTimeout(ctx, receiptWaitTimeout) + defer cancel() + + // wait for the transaction receipt + receipt, err := tm.waitForReceipt(ctx, txHash, txErrorSignalCh) + if err != nil { + return nil, fmt.Errorf("failed to get receipt for transaction %s: %w", txHash.Hex(), err) + } + return receipt, nil +} + +func (tm *TxMapperDB) waitForReceipt(ctx context.Context, txHash common.Hash, txErrorSignalCh chan error) (*types.Receipt, error) { + for { + if tm.isDone(txHash) { + return nil, context.Canceled + } + + // check if the context has been canceled or timed out + select { + case <-ctx.Done(): + return nil, ctx.Err() + case err := <-txErrorSignalCh: // Listen for errors from the sending goroutine + if err != nil { + return nil, err + } + default: + } + + // query for the transaction receipt + receipt, err := tm.ethClient.TransactionReceipt(ctx, txHash) + if errors.Is(err, ethereum.NotFound) || err == ethereum.NotFound { + // If the receipt is not found, continue polling + time.Sleep(3 * time.Second) + continue + } else if err != nil { + return nil, err + } + + return receipt, nil + } +} + +func isFeeTooLowError(err error) bool { + if err == nil { + return false + } + return strings.Contains(strings.ToLower(err.Error()), "feetoolow") || + strings.Contains(strings.ToLower(err.Error()), "underpriced") || + strings.Contains(strings.ToLower(err.Error()), "maxfeepergaslessthanblockbasefee") +} \ No newline at end of file diff --git a/internal/metrics/tx_mapper_test.go b/internal/metrics/tx_mapper_test.go new file mode 100644 index 0000000..dfcbce0 --- /dev/null +++ b/internal/metrics/tx_mapper_test.go @@ -0,0 +1,114 @@ +package metrics + +import ( + "testing" + + "github.com/shutter-network/observer/internal/data" +) + +func TestClassifyInclusion(t *testing.T) { + t.Parallel() + cases := []struct { + name string + expectedIndex int + receiptIndex uint + wantStatus data.TxStatusVal + wantPos string + }{ + {"exact", 2, 2, data.TxStatusValShieldedinclusion, InclPosExact}, + {"later", 1, 3, data.TxStatusValUnshieldedinclusion, InclPosLater}, + {"earlier", 4, 1, data.TxStatusValTentativeshieldedinclusion, InclPosEarlier}, + } + for _, c := range cases { + gotStatus, gotPos := classifyInclusion(c.expectedIndex, c.receiptIndex) + if gotStatus != c.wantStatus || gotPos != c.wantPos { + t.Fatalf("%s: got (%v,%s), want (%v,%s)", c.name, gotStatus, gotPos, c.wantStatus, + c.wantPos) + } + } +} + +func TestClassifyWithPredecessors(t *testing.T) { + t.Parallel() + + makeEntries := func(statuses ...data.TxStatusVal) []batchEntry { + out := make([]batchEntry, len(statuses)) + for i, st := range statuses { + out[i] = batchEntry{status: st} + } + return out + } + + cases := []struct { + name string + expectedPos int + blockPos int + entries []batchEntry + wantStatus data.TxStatusVal + wantPos string + }{ + { + name: "pos0-exact-shielded", + expectedPos: 0, blockPos: 0, + entries: makeEntries(data.TxStatusValPending), + wantStatus: data.TxStatusValShieldedinclusion, wantPos: InclPosExact, + }, + { + name: "pos0-later-unshielded", + expectedPos: 0, blockPos: 1, + entries: makeEntries(data.TxStatusValPending), + wantStatus: data.TxStatusValUnshieldedinclusion, wantPos: InclPosLater, + }, + { + name: "exact-shielded", + expectedPos: 1, blockPos: 1, + entries: makeEntries(data.TxStatusValShieldedinclusion, data.TxStatusValPending), + wantStatus: data.TxStatusValShieldedinclusion, wantPos: InclPosExact, + }, + { + name: "later-unshielded", + expectedPos: 0, blockPos: 1, + entries: makeEntries(data.TxStatusValPending), + wantStatus: data.TxStatusValUnshieldedinclusion, wantPos: InclPosLater, + }, + { + name: "exact-tentative-predecessor", + expectedPos: 1, blockPos: 1, + entries: makeEntries(data.TxStatusValTentativeshieldedinclusion, data.TxStatusValPending), + wantStatus: data.TxStatusValTentativeshieldedinclusion, wantPos: InclPosExact, + }, + { + name: "earlier-tentative", + expectedPos: 2, blockPos: 1, + entries: makeEntries(data.TxStatusValShieldedinclusion, data.TxStatusValPending, data.TxStatusValPending), + wantStatus: data.TxStatusValTentativeshieldedinclusion, wantPos: InclPosEarlier, + }, + { + name: "bad-predecessor-unshielded", + expectedPos: 1, blockPos: 1, + entries: makeEntries(data.TxStatusValUnshieldedinclusion, data.TxStatusValPending), + wantStatus: data.TxStatusValUnshieldedinclusion, wantPos: InclPosExact, + }, + { + name: "earlier due to invalid predecessor -> tentative earlier", + expectedPos: 2, blockPos: 1, + entries: makeEntries(data.TxStatusValInvalid, data.TxStatusValPending, data.TxStatusValPending), + wantStatus: data.TxStatusValTentativeshieldedinclusion, wantPos: InclPosEarlier, + }, + { + name: "exact with not included predecessor -> tentative", + expectedPos: 1, blockPos: 1, + entries: makeEntries(data.TxStatusValNotincluded, data.TxStatusValPending), + wantStatus: data.TxStatusValTentativeshieldedinclusion, wantPos: InclPosExact, + }, + } + + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + gotStatus, gotPos := classifyWithPredecessors(c.expectedPos, c.blockPos, c.entries) + if gotStatus != c.wantStatus || gotPos != c.wantPos { + t.Errorf("got (%v, %s), want (%v, %s)", gotStatus, gotPos, c.wantStatus, c.wantPos) + } + }) + } +} \ No newline at end of file diff --git a/internal/metrics/tx_mapper_validators.go b/internal/metrics/tx_mapper_validators.go new file mode 100644 index 0000000..3aeea51 --- /dev/null +++ b/internal/metrics/tx_mapper_validators.go @@ -0,0 +1,298 @@ +package metrics + +import ( + "context" + "fmt" + "math" + "sync" + "time" + + "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgtype" + "github.com/pkg/errors" + "github.com/rs/zerolog/log" + + validatorRegistryBindings "github.com/shutter-network/gnosh-contracts/gnoshcontracts/validatorregistry" + dbTypes "github.com/shutter-network/observer/common/database" + "github.com/shutter-network/observer/internal/data" + "github.com/shutter-network/rolling-shutter/rolling-shutter/medley/beaconapiclient" + "github.com/shutter-network/rolling-shutter/rolling-shutter/medley/validatorregistry" + blst "github.com/supranational/blst/bindings/go" +) + +type validatorData struct { + validatorStatus string + validatorValidity data.ValidatorRegistrationValidity +} + +func (tm *TxMapperDB) QueryBlockNumberFromValidatorRegistryEventsSyncedUntil(ctx context.Context) (int64, error) { + data, err := tm.dbQuery.QueryValidatorRegistryEventsSyncedUntil(ctx) + if err != nil { + return 0, err + } + return data.BlockNumber, nil +} + +func (tm *TxMapperDB) UpsertGraffitiIfShutterized(ctx context.Context, validatorIndex int64, graffiti string, blockNumber int64) (bool, error) { + upserted, err := tm.dbQuery.UpsertGraffitiIfShutterized(ctx, data.UpsertGraffitiIfShutterizedParams{ + ValidatorIndex: dbTypes.Int64ToPgTypeInt8(validatorIndex), + Graffiti: graffiti, + BlockNumber: blockNumber, + }) + return upserted, err +} + +func (tm *TxMapperDB) AddValidatorRegistryEvent(ctx context.Context, tx pgx.Tx, vr *validatorRegistryBindings.ValidatorregistryUpdated) error { + regMessage := &validatorregistry.AggregateRegistrationMessage{} + err := regMessage.Unmarshal(vr.Message) + if err != nil { + log.Err(err).Hex("tx-hash", vr.Raw.TxHash.Bytes()).Msg("error unmarshalling registration message") + } else { + validatorIDtoValidity, err := tm.validateValidatorRegistryEvent(ctx, vr, regMessage, uint64(tm.chainID), tm.config.ValidatorRegistryContractAddress) + if err != nil { + log.Err(err).Msg("error validating validator registry events") + return err + } + + q := tm.dbQuery + if tx != nil { + // Use transaction if available + q = tm.dbQuery.WithTx(tx) + } + + for validatorID, validatorData := range validatorIDtoValidity { + err := q.CreateValidatorRegistryMessage(ctx, data.CreateValidatorRegistryMessageParams{ + Version: dbTypes.Uint64ToPgTypeInt8(uint64(regMessage.Version)), + ChainID: dbTypes.Uint64ToPgTypeInt8(regMessage.ChainID), + ValidatorRegistryAddress: regMessage.ValidatorRegistryAddress.Bytes(), + ValidatorIndex: dbTypes.Int64ToPgTypeInt8(validatorID), + Nonce: dbTypes.Uint64ToPgTypeInt8(uint64(regMessage.Nonce)), + IsRegisteration: dbTypes.BoolToPgTypeBool(regMessage.IsRegistration), + Signature: vr.Signature, + EventBlockNumber: int64(vr.Raw.BlockNumber), + EventTxIndex: int64(vr.Raw.TxIndex), + EventLogIndex: int64(vr.Raw.Index), + Validity: validatorData.validatorValidity, + }) + if err != nil { + return err + } + + if validatorData.validatorValidity == data.ValidatorRegistrationValidityValid && + validatorData.validatorStatus != "" { + err := q.CreateValidatorStatus(ctx, data.CreateValidatorStatusParams{ + ValidatorIndex: dbTypes.Int64ToPgTypeInt8(validatorID), + Status: validatorData.validatorStatus, + }) + if err != nil { + return err + } + } + } + } + return nil +} + +func (tm *TxMapperDB) UpdateValidatorStatus(ctx context.Context) error { + batchSize := 100 + jumpBy := 0 + numWorkers := 5 + sem := make(chan struct{}, numWorkers) + var wg sync.WaitGroup + + for { + // Query a batch of validator statuses + validatorStatus, err := tm.dbQuery.QueryValidatorStatuses(ctx, data.QueryValidatorStatusesParams{ + Limit: int32(batchSize), + Offset: int32(jumpBy), + }) + if err != nil { + if errors.Is(err, pgx.ErrNoRows) { + break + } + return err + } + + if len(validatorStatus) == 0 { + break + } + + // Launch goroutines to process each status concurrently + for _, vs := range validatorStatus { + sem <- struct{}{} + wg.Add(1) + go func(vs data.QueryValidatorStatusesRow) { + defer wg.Done() + defer func() { <-sem }() + + validatorIndex := uint64(vs.ValidatorIndex.Int64) + //TODO: should we keep this log or remove it? + log.Debug().Uint64("validatorIndex", validatorIndex).Msg("validator status being updated") + validator, err := tm.beaconAPIClient.GetValidatorByIndex(ctx, "head", validatorIndex) + if err != nil { + log.Err(err).Uint64("validatorIndex", validatorIndex).Msg("failed to get validator from beacon chain") + return + } + if validator == nil { + return + } + + err = tm.dbQuery.CreateValidatorStatus(ctx, data.CreateValidatorStatusParams{ + ValidatorIndex: dbTypes.Uint64ToPgTypeInt8(validatorIndex), + Status: validator.Data.Status, + }) + if err != nil { + log.Err(err).Uint64("validatorIndex", validatorIndex).Msg("failed to create validator status") + return + } + }(vs) + } + + wg.Wait() + + // Wait for 3 seconds before processing the next batch + select { + case <-ctx.Done(): + return ctx.Err() // Handle context cancellation + case <-time.After(3 * time.Second): + } + + jumpBy += batchSize + } + + return nil +} + +func (tm *TxMapperDB) AddProposerDuties(ctx context.Context, epoch uint64) error { + proposerDuties, err := tm.beaconAPIClient.GetProposerDutiesByEpoch(ctx, epoch) + if err != nil { + return err + } + if proposerDuties == nil { + return errors.Errorf("no proposer duties found for epoch %d", epoch) + } + + log.Info().Uint64("epoch", epoch).Msg("processing proposer duties") + + publicKeys := make([]string, len(proposerDuties.Data)) + validatorIndices := make([]int64, len(proposerDuties.Data)) + slots := make([]int64, len(proposerDuties.Data)) + + for i := 0; i < len(proposerDuties.Data); i++ { + publicKeys[i] = proposerDuties.Data[i].Pubkey + validatorIndices[i] = int64(proposerDuties.Data[i].ValidatorIndex) + slots[i] = int64(proposerDuties.Data[i].Slot) + } + + err = tm.dbQuery.CreateProposerDuties(ctx, data.CreateProposerDutiesParams{ + Column1: publicKeys, + Column2: validatorIndices, + Column3: slots, + }) + return err +} + +func (tm *TxMapperDB) validateValidatorRegistryEvent( + ctx context.Context, + vr *validatorRegistryBindings.ValidatorregistryUpdated, + regMessage *validatorregistry.AggregateRegistrationMessage, + chainID uint64, + validatorRegistryContractAddress string, +) (map[int64]*validatorData, error) { + staticRegistrationMessageValidity := validateValidatorRegistryMessageContents(regMessage, chainID, validatorRegistryContractAddress) + + var publicKeys []*blst.P1Affine + var validators []*beaconapiclient.GetValidatorByIndexResponse + validatorIDtoValidity := make(map[int64]*validatorData) + + for _, validatorIndex := range regMessage.ValidatorIndices() { + validatorIDtoValidity[validatorIndex] = &validatorData{validatorValidity: staticRegistrationMessageValidity} + nonceBefore, err := tm.dbQuery.QueryValidatorRegistrationMessageNonceBefore(ctx, data.QueryValidatorRegistrationMessageNonceBeforeParams{ + ValidatorIndex: dbTypes.Int64ToPgTypeInt8(validatorIndex), + EventBlockNumber: int64(vr.Raw.BlockNumber), + EventTxIndex: int64(vr.Raw.TxIndex), + EventLogIndex: int64(vr.Raw.Index), + }) + + if err != nil { + if errors.Is(err, pgx.ErrNoRows) { + // No previous nonce means the message is valid regarding nonce + nonceBefore = pgtype.Int8{Int64: -1, Valid: true} + } else { + return nil, errors.Wrapf(err, "failed to query latest nonce for validator %d", validatorIndex) + } + } + + if regMessage.Nonce > math.MaxInt32 || int64(regMessage.Nonce) <= nonceBefore.Int64 { + // skip the validator + log.Warn(). + Uint32("nonce", regMessage.Nonce). + Int64("before-nonce", nonceBefore.Int64). + Msg("ignoring validator with invalid nonce") + validatorIDtoValidity[validatorIndex].validatorValidity = data.ValidatorRegistrationValidityInvalidmessage + continue + } + validator, err := tm.beaconAPIClient.GetValidatorByIndex(ctx, "head", uint64(validatorIndex)) + if err != nil { + return nil, errors.Wrapf(err, "failed to get validator %d", validatorIndex) + } + if validator == nil { + // validator not found + log.Warn().Msg("registration message for unknown validator") + validatorIDtoValidity[validatorIndex].validatorValidity = data.ValidatorRegistrationValidityInvalidmessage + continue + } + validatorIDtoValidity[validatorIndex].validatorStatus = validator.Data.Status + publicKey, err := validator.Data.Validator.GetPubkey() + if err != nil { + return nil, errors.Wrapf(err, "failed to get public key of validator %d", validatorIndex) + } + publicKeys = append(publicKeys, publicKey) + validators = append(validators, validator) + } + if len(publicKeys) > 0 { + // now we need to check for signature verification depending on the message version + sig := new(blst.P2Affine).Uncompress(vr.Signature) + if sig == nil { + return nil, fmt.Errorf("ignoring registration message with undecodable signature") + } + + if regMessage.Version == validatorregistry.LegacyValidatorRegistrationMessageVersion { + regMessage := new(validatorregistry.LegacyRegistrationMessage) + err := regMessage.Unmarshal(vr.Message) + if err != nil { + return nil, errors.Wrapf(err, "failed to unmarshal legacy registration message") + } + if valid := validatorregistry.VerifySignature(sig, publicKeys[0], regMessage); !valid { + validatorIDtoValidity[int64(validators[0].Data.Index)].validatorValidity = data.ValidatorRegistrationValidityInvalidsignature + log.Warn().Msg("invalid legacy registration message with invalid signature") + } + } else { + if valid := validatorregistry.VerifyAggregateSignature(sig, publicKeys, regMessage); !valid { + for _, validator := range validators { + validatorIDtoValidity[int64(validator.Data.Index)].validatorValidity = data.ValidatorRegistrationValidityInvalidsignature + } + log.Warn().Msg("invalid aggregate registration message with invalid signature") + } + } + } + return validatorIDtoValidity, nil +} + +func validateValidatorRegistryMessageContents(msg *validatorregistry.AggregateRegistrationMessage, chainID uint64, validatorRegistryContractAddress string) data.ValidatorRegistrationValidity { + validity := data.ValidatorRegistrationValidityValid + if msg.Version != validatorregistry.AggregateValidatorRegistrationMessageVersion && + msg.Version != validatorregistry.LegacyValidatorRegistrationMessageVersion { + return data.ValidatorRegistrationValidityInvalidmessage + } + if msg.ChainID != chainID { + return data.ValidatorRegistrationValidityInvalidmessage + } + if msg.ValidatorRegistryAddress.String() != validatorRegistryContractAddress { + return data.ValidatorRegistrationValidityInvalidmessage + } + if msg.ValidatorIndex > math.MaxInt64 { + return data.ValidatorRegistrationValidityInvalidmessage + } + return validity +} \ No newline at end of file diff --git a/internal/metrics/types.go b/internal/metrics/types.go deleted file mode 100644 index ad94f58..0000000 --- a/internal/metrics/types.go +++ /dev/null @@ -1,69 +0,0 @@ -package metrics - -import ( - "context" - - "github.com/jackc/pgx/v5" - sequencerBindings "github.com/shutter-network/gnosh-contracts/gnoshcontracts/sequencer" - validatorRegistryBindings "github.com/shutter-network/gnosh-contracts/gnoshcontracts/validatorregistry" - "github.com/shutter-network/observer/internal/data" -) - -type DecryptionData struct { - Key []byte - Slot int64 -} - -type KeyShare struct { - Share []byte - Slot int64 -} - -type Tx struct { - EncryptedTx []byte - DD *DecryptionData - KS *KeyShare - BlockHash []byte -} - -type DecKeysAndMessages struct { - Eon int64 - Keys [][]byte - Identities [][]byte - Slot int64 - InstanceID int64 - TxPointer int64 -} - -type DecKeyAndMessage struct { - Slot int64 - TxPointer int64 - Eon int64 - Key []byte - IdentityPreimage []byte - KeyIndex int64 - DecryptionKeyID int64 -} - -type TxExecution struct { - // BlockNumber int64 - DecKeysAndMessages []*DecKeyAndMessage -} - -type TxMapper interface { - AddTransactionSubmittedEvent(ctx context.Context, tx pgx.Tx, st *sequencerBindings.SequencerTransactionSubmitted) error - AddDecryptionKeysAndMessages( - ctx context.Context, - dkam *DecKeysAndMessages, - ) error - AddKeyShare(ctx context.Context, dks *data.DecryptionKeyShare) error - AddBlock( - ctx context.Context, - b *data.Block, - ) error - QueryBlockNumberFromValidatorRegistryEventsSyncedUntil(ctx context.Context) (int64, error) - AddValidatorRegistryEvent(ctx context.Context, tx pgx.Tx, vr *validatorRegistryBindings.ValidatorregistryUpdated) error - UpdateValidatorStatus(ctx context.Context) error - AddProposerDuties(ctx context.Context, epoch uint64) error - UpsertGraffitiIfShutterized(ctx context.Context, validatorIndex int64, graffiti string, blockNumber int64) (bool, error) -} diff --git a/internal/watcher/blocks.go b/internal/watcher/blocks.go index 2357239..b03f31e 100644 --- a/internal/watcher/blocks.go +++ b/internal/watcher/blocks.go @@ -106,6 +106,18 @@ func (bw *BlocksWatcher) processBlock(ctx context.Context, header *types.Header) if err != nil { return err } + + // classify decrypted txs included in this block as early as possible + block, err := bw.ethClient.BlockByHash(ctx, header.Hash()) + if err == nil { + slot := utils.GetSlotForBlock(header.Time, GenesisTimestamp, SlotDuration) + if err := bw.txMapper.HandleBlock(ctx, block.Number().Int64(), int64(slot), block.Transactions()); err != nil { + log.Err(err).Msg("tx mapper handle block failed") + } + } else { + log.Err(err).Uint64("block", header.Number.Uint64()).Msg("failed to fetch block body for classification") + } + bw.clearOldBlocks(header) if err := bw.transactionSubmittedSyncer.Sync(ctx, header); err != nil { diff --git a/migrations/20260313160000_add_inclusion_position.sql b/migrations/20260313160000_add_inclusion_position.sql new file mode 100644 index 0000000..730b71e --- /dev/null +++ b/migrations/20260313160000_add_inclusion_position.sql @@ -0,0 +1,9 @@ +-- +goose Up +ALTER TABLE decrypted_tx + ADD COLUMN inclusion_position TEXT NOT NULL DEFAULT 'unknown'; + +ALTER TYPE tx_status_val ADD VALUE IF NOT EXISTS 'tentative shielded inclusion'; + +-- +goose Down +ALTER TABLE decrypted_tx DROP COLUMN inclusion_position; +-- cannot easily drop enum value; left as-is on down. \ No newline at end of file