Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: fix overriding tx index of duplicated txs #498

Merged
merged 3 commits into from
Nov 15, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 48 additions & 0 deletions state/txindex/indexer_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package txindex
import (
"context"

abci "github.com/line/ostracon/abci/types"
"github.com/line/ostracon/libs/service"
"github.com/line/ostracon/state/indexer"
"github.com/line/ostracon/types"
Expand Down Expand Up @@ -82,6 +83,11 @@ func (is *IndexerService) OnStart() error {
is.Logger.Info("indexed block", "height", height)
}

batch.Ops, err = DeduplicateBatch(batch.Ops, is.txIdxr)
if err != nil {
is.Logger.Error("deduplicate batch", "height", height)
}

if err = is.txIdxr.AddBatch(batch); err != nil {
is.Logger.Error("failed to index block txs", "height", height, "err", err)
} else {
Expand All @@ -98,3 +104,45 @@ func (is *IndexerService) OnStop() {
_ = is.eventBus.UnsubscribeAll(context.Background(), subscriber)
}
}

// DeduplicateBatch consider the case of duplicate txs.
// if the current one under investigation is NOT OK, then we need to check
// whether there's a previously indexed tx.
// SKIP the current tx if the previously indexed record is found and successful.
func DeduplicateBatch(ops []*abci.TxResult, txIdxr TxIndexer) ([]*abci.TxResult, error) {
result := make([]*abci.TxResult, 0, len(ops))

// keep track of successful txs in this block in order to suppress latter ones being indexed.
var successfulTxsInThisBlock = make(map[string]struct{})

for _, txResult := range ops {
hash := types.Tx(txResult.Tx).Hash()

if txResult.Result.IsOK() {
successfulTxsInThisBlock[string(hash)] = struct{}{}
} else {
// if it already appeared in current block and was successful, skip.
if _, found := successfulTxsInThisBlock[string(hash)]; found {
continue
}

// check if this tx hash is already indexed
old, err := txIdxr.Get(hash)

// if db op errored
// Not found is not an error
if err != nil {
return nil, err
}

// if it's already indexed in an older block and was successful, skip.
if old != nil && old.Result.Code == abci.CodeTypeOK {
continue
}
}

result = append(result, txResult)
}

return result, nil
}
161 changes: 161 additions & 0 deletions state/txindex/indexer_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,3 +79,164 @@ func TestIndexerServiceIndexesBlocks(t *testing.T) {
require.NoError(t, err)
require.Equal(t, txResult2, res)
}

func TestTxIndexDuplicatePreviouslySuccessful(t *testing.T) {
var mockTx = types.Tx("MOCK_TX_HASH")

testCases := []struct {
name string
tx1 abci.TxResult
tx2 abci.TxResult
expSkip bool // do we expect the second tx to be skipped by tx indexer
}{
{"skip, previously successful",
abci.TxResult{
Height: 1,
Index: 0,
Tx: mockTx,
Result: abci.ResponseDeliverTx{
Code: abci.CodeTypeOK,
},
},
abci.TxResult{
Height: 2,
Index: 0,
Tx: mockTx,
Result: abci.ResponseDeliverTx{
Code: abci.CodeTypeOK + 1,
},
},
true,
},
{"not skip, previously unsuccessful",
abci.TxResult{
Height: 1,
Index: 0,
Tx: mockTx,
Result: abci.ResponseDeliverTx{
Code: abci.CodeTypeOK + 1,
},
},
abci.TxResult{
Height: 2,
Index: 0,
Tx: mockTx,
Result: abci.ResponseDeliverTx{
Code: abci.CodeTypeOK + 1,
},
},
false,
},
{"not skip, both successful",
abci.TxResult{
Height: 1,
Index: 0,
Tx: mockTx,
Result: abci.ResponseDeliverTx{
Code: abci.CodeTypeOK,
},
},
abci.TxResult{
Height: 2,
Index: 0,
Tx: mockTx,
Result: abci.ResponseDeliverTx{
Code: abci.CodeTypeOK,
},
},
false,
},
{"not skip, both unsuccessful",
abci.TxResult{
Height: 1,
Index: 0,
Tx: mockTx,
Result: abci.ResponseDeliverTx{
Code: abci.CodeTypeOK + 1,
},
},
abci.TxResult{
Height: 2,
Index: 0,
Tx: mockTx,
Result: abci.ResponseDeliverTx{
Code: abci.CodeTypeOK + 1,
},
},
false,
},
{"skip, same block, previously successful",
abci.TxResult{
Height: 1,
Index: 0,
Tx: mockTx,
Result: abci.ResponseDeliverTx{
Code: abci.CodeTypeOK,
},
},
abci.TxResult{
Height: 1,
Index: 0,
Tx: mockTx,
Result: abci.ResponseDeliverTx{
Code: abci.CodeTypeOK + 1,
},
},
true,
},
{"not skip, same block, previously unsuccessful",
abci.TxResult{
Height: 1,
Index: 0,
Tx: mockTx,
Result: abci.ResponseDeliverTx{
Code: abci.CodeTypeOK + 1,
},
},
abci.TxResult{
Height: 1,
Index: 0,
Tx: mockTx,
Result: abci.ResponseDeliverTx{
Code: abci.CodeTypeOK,
},
},
false,
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
indexer := kv.NewTxIndex(db.NewMemDB())

if tc.tx1.Height != tc.tx2.Height {
// index the first tx
err := indexer.AddBatch(&txindex.Batch{
Ops: []*abci.TxResult{&tc.tx1},
})
require.NoError(t, err)

// check if the second one should be skipped.
ops, err := txindex.DeduplicateBatch([]*abci.TxResult{&tc.tx2}, indexer)
require.NoError(t, err)

if tc.expSkip {
require.Empty(t, ops)
} else {
require.Equal(t, []*abci.TxResult{&tc.tx2}, ops)
}
} else {
// same block
ops := []*abci.TxResult{&tc.tx1, &tc.tx2}
ops, err := txindex.DeduplicateBatch(ops, indexer)
require.NoError(t, err)
if tc.expSkip {
// the second one is skipped
require.Equal(t, []*abci.TxResult{&tc.tx1}, ops)
} else {
require.Equal(t, []*abci.TxResult{&tc.tx1, &tc.tx2}, ops)
}
}
})
}
}