From 646b7a5105fb1fed6e312b096d182cb1e43b5e58 Mon Sep 17 00:00:00 2001 From: Shogo Hyodo Date: Mon, 14 Nov 2022 16:58:59 +0900 Subject: [PATCH 1/3] Fix duplicated tx index --- state/txindex/indexer_service.go | 48 ++++++++ state/txindex/indexer_service_test.go | 161 ++++++++++++++++++++++++++ 2 files changed, 209 insertions(+) diff --git a/state/txindex/indexer_service.go b/state/txindex/indexer_service.go index 56b483ca8..268cd2239 100644 --- a/state/txindex/indexer_service.go +++ b/state/txindex/indexer_service.go @@ -3,6 +3,7 @@ package txindex import ( "context" + abci "github.com/line/ostracon/abci/types" "github.com/line/ostracon/libs/service" "github.com/line/ostracon/state/indexer" "github.com/line/ostracon/types" @@ -82,6 +83,11 @@ func (is *IndexerService) OnStart() error { is.Logger.Info("indexed block", "height", height) } + batch.Ops, err = DeduplicateBatch(batch.Ops, is.txIdxr) + if err != nil { + is.Logger.Error("deduplicate batch", "height", height) + } + if err = is.txIdxr.AddBatch(batch); err != nil { is.Logger.Error("failed to index block txs", "height", height, "err", err) } else { @@ -98,3 +104,45 @@ func (is *IndexerService) OnStop() { _ = is.eventBus.UnsubscribeAll(context.Background(), subscriber) } } + +// DeduplicateBatch consider the case of duplicate txs. +// if the current one under investigation is NOT OK, then we need to check +// whether there's a previously indexed tx. +// SKIP the current tx if the previously indexed record is found and successful. +func DeduplicateBatch(ops []*abci.TxResult, txIdxr TxIndexer) ([]*abci.TxResult, error) { + result := make([]*abci.TxResult, 0, len(ops)) + + // keep track of successful txs in this block in order to suppress latter ones being indexed. + var successfulTxsInThisBlock = make(map[string]struct{}) + + for _, txResult := range ops { + hash := types.Tx(txResult.Tx).Hash() + + if txResult.Result.IsOK() { + successfulTxsInThisBlock[string(hash)] = struct{}{} + } else { + // if it already appeared in current block and was successful, skip. + if _, found := successfulTxsInThisBlock[string(hash)]; found { + continue + } + + // check if this tx hash is already indexed + old, err := txIdxr.Get(hash) + + // if db op errored + // Not found is not an error + if err != nil { + return nil, err + } + + // if it's already indexed in an older block and was successful, skip. + if old != nil && old.Result.Code == abci.CodeTypeOK { + continue + } + } + + result = append(result, txResult) + } + + return result, nil +} diff --git a/state/txindex/indexer_service_test.go b/state/txindex/indexer_service_test.go index ba1c3f8fa..59ff8fa17 100644 --- a/state/txindex/indexer_service_test.go +++ b/state/txindex/indexer_service_test.go @@ -79,3 +79,164 @@ func TestIndexerServiceIndexesBlocks(t *testing.T) { require.NoError(t, err) require.Equal(t, txResult2, res) } + +func TestTxIndexDuplicatePreviouslySuccessful(t *testing.T) { + var mockTx = types.Tx("MOCK_TX_HASH") + + testCases := []struct { + name string + tx1 abci.TxResult + tx2 abci.TxResult + expSkip bool // do we expect the second tx to be skipped by tx indexer + }{ + {"skip, previously successful", + abci.TxResult{ + Height: 1, + Index: 0, + Tx: mockTx, + Result: abci.ResponseDeliverTx{ + Code: abci.CodeTypeOK, + }, + }, + abci.TxResult{ + Height: 2, + Index: 0, + Tx: mockTx, + Result: abci.ResponseDeliverTx{ + Code: abci.CodeTypeOK + 1, + }, + }, + true, + }, + {"not skip, previously unsuccessful", + abci.TxResult{ + Height: 1, + Index: 0, + Tx: mockTx, + Result: abci.ResponseDeliverTx{ + Code: abci.CodeTypeOK + 1, + }, + }, + abci.TxResult{ + Height: 2, + Index: 0, + Tx: mockTx, + Result: abci.ResponseDeliverTx{ + Code: abci.CodeTypeOK + 1, + }, + }, + false, + }, + {"not skip, both successful", + abci.TxResult{ + Height: 1, + Index: 0, + Tx: mockTx, + Result: abci.ResponseDeliverTx{ + Code: abci.CodeTypeOK, + }, + }, + abci.TxResult{ + Height: 2, + Index: 0, + Tx: mockTx, + Result: abci.ResponseDeliverTx{ + Code: abci.CodeTypeOK, + }, + }, + false, + }, + {"not skip, both unsuccessful", + abci.TxResult{ + Height: 1, + Index: 0, + Tx: mockTx, + Result: abci.ResponseDeliverTx{ + Code: abci.CodeTypeOK + 1, + }, + }, + abci.TxResult{ + Height: 2, + Index: 0, + Tx: mockTx, + Result: abci.ResponseDeliverTx{ + Code: abci.CodeTypeOK + 1, + }, + }, + false, + }, + {"skip, same block, previously successful", + abci.TxResult{ + Height: 1, + Index: 0, + Tx: mockTx, + Result: abci.ResponseDeliverTx{ + Code: abci.CodeTypeOK, + }, + }, + abci.TxResult{ + Height: 1, + Index: 0, + Tx: mockTx, + Result: abci.ResponseDeliverTx{ + Code: abci.CodeTypeOK + 1, + }, + }, + true, + }, + {"not skip, same block, previously unsuccessful", + abci.TxResult{ + Height: 1, + Index: 0, + Tx: mockTx, + Result: abci.ResponseDeliverTx{ + Code: abci.CodeTypeOK + 1, + }, + }, + abci.TxResult{ + Height: 1, + Index: 0, + Tx: mockTx, + Result: abci.ResponseDeliverTx{ + Code: abci.CodeTypeOK, + }, + }, + false, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + indexer := kv.NewTxIndex(db.NewMemDB()) + + if tc.tx1.Height != tc.tx2.Height { + // index the first tx + err := indexer.AddBatch(&txindex.Batch{ + Ops: []*abci.TxResult{&tc.tx1}, + }) + require.NoError(t, err) + + // check if the second one should be skipped. + ops, err := txindex.DeduplicateBatch([]*abci.TxResult{&tc.tx2}, indexer) + require.NoError(t, err) + + if tc.expSkip { + require.Empty(t, ops) + } else { + require.Equal(t, []*abci.TxResult{&tc.tx2}, ops) + } + } else { + // same block + ops := []*abci.TxResult{&tc.tx1, &tc.tx2} + ops, err := txindex.DeduplicateBatch(ops, indexer) + require.NoError(t, err) + if tc.expSkip { + // the second one is skipped + require.Equal(t, []*abci.TxResult{&tc.tx1}, ops) + } else { + require.Equal(t, []*abci.TxResult{&tc.tx1, &tc.tx2}, ops) + } + } + }) + } +} From e02f6b8519fd302e172f6f82aa88d8d8d70dffef Mon Sep 17 00:00:00 2001 From: Shogo Hyodo Date: Mon, 14 Nov 2022 18:22:19 +0900 Subject: [PATCH 2/3] Update CHANGELOG --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 5e488e1a3..7c073eadf 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -23,6 +23,7 @@ ### BUG FIXES - [state] [\#458](https://github.com/line/ostracon/pull/458) Fix the thread-unsafe of PeerState logging +- [indexer] [#498](https://github.com/tendermint/tendermint/pull/498) Fix overriding tx index of duplicated txs ## v1.0.6 From 4941250f713029a39137111e29183e77436d69d3 Mon Sep 17 00:00:00 2001 From: Shogo Hyodo Date: Mon, 14 Nov 2022 18:29:47 +0900 Subject: [PATCH 3/3] Revert "Update CHANGELOG" This reverts commit e02f6b8519fd302e172f6f82aa88d8d8d70dffef. --- CHANGELOG.md | 1 - 1 file changed, 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 7c073eadf..5e488e1a3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -23,7 +23,6 @@ ### BUG FIXES - [state] [\#458](https://github.com/line/ostracon/pull/458) Fix the thread-unsafe of PeerState logging -- [indexer] [#498](https://github.com/tendermint/tendermint/pull/498) Fix overriding tx index of duplicated txs ## v1.0.6