Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: events: add lotus-shed indexes inspect-events health-check #12346

Merged
merged 1 commit into from
Aug 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@
## New features

- feat: Add trace filter API supporting RPC method `trace_filter` ([filecoin-project/lotus#12123](https://github.com/filecoin-project/lotus/pull/12123)). Configuring `EthTraceFilterMaxResults` sets a limit on how many results are returned in any individual `trace_filter` RPC API call.

- feat: `FilecoinAddressToEthAddress` RPC can now return ETH addresses for all Filecoin address types ("f0"/"f1"/"f2"/"f3") based on client's re-org tolerance. This is a breaking change if you are using the API via the go-jsonrpc library or by using Lotus as a library, but is a non-breaking change when using the API via any other RPC method as it adds an optional second argument.
([filecoin-project/lotus#12324](https://github.com/filecoin-project/lotus/pull/12324)).
- feat: Added `lotus-shed indexes inspect-events` health-check command ([filecoin-project/lotus#12346](https://github.com/filecoin-project/lotus/pull/12346)).

# v1.28.1 / 2024-07-24

Expand Down
195 changes: 193 additions & 2 deletions cmd/lotus-shed/indexes.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"database/sql"
"fmt"
"math"
"path"
"path/filepath"
"strings"
Expand All @@ -27,9 +28,12 @@ import (
const (
// same as in chain/events/index.go
eventExists = `SELECT MAX(id) FROM event WHERE height=? AND tipset_key=? AND tipset_key_cid=? AND emitter_addr=? AND event_index=? AND message_cid=? AND message_index=?`
eventCount = `SELECT COUNT(*) FROM event WHERE tipset_key_cid=?`
entryCount = `SELECT COUNT(*) FROM event_entry JOIN event ON event_entry.event_id=event.id WHERE event.tipset_key_cid=?`
insertEvent = `INSERT OR IGNORE INTO event(height, tipset_key, tipset_key_cid, emitter_addr, event_index, message_cid, message_index, reverted) VALUES(?, ?, ?, ?, ?, ?, ?, ?)`
insertEntry = `INSERT OR IGNORE INTO event_entry(event_id, indexed, flags, key, codec, value) VALUES(?, ?, ?, ?, ?, ?)`
upsertEventsSeen = `INSERT INTO events_seen(height, tipset_key_cid, reverted) VALUES(?, ?, false) ON CONFLICT(height, tipset_key_cid) DO UPDATE SET reverted=false`
tipsetSeen = `SELECT height,reverted FROM events_seen WHERE tipset_key_cid=?`
)

func withCategory(cat string, cmd *cli.Command) *cli.Command {
Expand All @@ -46,12 +50,13 @@ var indexesCmd = &cli.Command{
withCategory("msgindex", pruneMsgIndexCmd),
withCategory("txhash", backfillTxHashCmd),
withCategory("events", backfillEventsCmd),
withCategory("events", inspectEventsCmd),
},
}

var backfillEventsCmd = &cli.Command{
Name: "backfill-events",
Usage: "Backfill the events.db for a number of epochs starting from a specified height",
Usage: "Backfill the events.db for a number of epochs starting from a specified height and working backward",
Flags: []cli.Flag{
&cli.UintFlag{
Name: "from",
Expand All @@ -61,7 +66,7 @@ var backfillEventsCmd = &cli.Command{
&cli.IntFlag{
Name: "epochs",
Value: 2000,
Usage: "the number of epochs to backfill",
Usage: "the number of epochs to backfill (working backwards)",
},
&cli.BoolFlag{
Name: "temporary-index",
Expand Down Expand Up @@ -387,6 +392,192 @@ var backfillEventsCmd = &cli.Command{
},
}

var inspectEventsCmd = &cli.Command{
Name: "inspect-events",
Usage: "Perform a health-check on the events.db for a number of epochs starting from a specified height and working backward. " +
"Logs tipsets with problems and optionally logs tipsets without problems. Without specifying a fixed number of epochs, " +
"the command will continue until it reaches a tipset that is not in the blockstore.",
Flags: []cli.Flag{
&cli.UintFlag{
Name: "from",
Value: 0,
Usage: "the tipset height to start inspecting from (0 is head of chain)",
},
&cli.IntFlag{
Name: "epochs",
Value: 0,
Usage: "the number of epochs to inspect (working backwards) [0 = until we reach a block we don't have]",
},
&cli.BoolFlag{
Name: "log-good",
Usage: "log tipsets that have no detected problems",
Value: false,
},
},
Action: func(cctx *cli.Context) error {
srv, err := lcli.GetFullNodeServices(cctx)
if err != nil {
return err
}
defer srv.Close() //nolint:errcheck

api := srv.FullNodeAPI()
ctx := lcli.ReqContext(cctx)

// currTs will be the tipset where we start backfilling from
currTs, err := api.ChainHead(ctx)
if err != nil {
return err
}
if cctx.IsSet("from") {
// we need to fetch the tipset after the epoch being specified since we will need to advance currTs
currTs, err = api.ChainGetTipSetAfterHeight(ctx, abi.ChainEpoch(cctx.Int("from")+1), currTs.Key())
if err != nil {
return err
}
}

logGood := cctx.Bool("log-good")

// advance currTs by one epoch and maintain prevTs as the previous tipset (this allows us to easily use the ChainGetParentMessages/Receipt API)
prevTs := currTs
currTs, err = api.ChainGetTipSet(ctx, currTs.Parents())
if err != nil {
return fmt.Errorf("failed to load tipset %s: %w", prevTs.Parents(), err)
}

epochs := cctx.Int("epochs")
if epochs <= 0 {
epochs = math.MaxInt32
}

basePath, err := homedir.Expand(cctx.String("repo"))
if err != nil {
return err
}

dbPath := path.Join(basePath, "sqlite", "events.db")
db, err := sql.Open("sqlite3", dbPath+"?mode=ro")
if err != nil {
return err
}

defer func() {
err := db.Close()
if err != nil {
fmt.Printf("ERROR: closing db: %s", err)
}
}()

stmtEventCount, err := db.Prepare(eventCount)
if err != nil {
return err
}
stmtEntryCount, err := db.Prepare(entryCount)
if err != nil {
return err
}
stmtTipsetSeen, err := db.Prepare(tipsetSeen)
if err != nil {
return err
}

processHeight := func(ctx context.Context, ts *types.TipSet, receipts []*types.MessageReceipt) error {
tsKeyCid, err := ts.Key().Cid()
if err != nil {
return fmt.Errorf("failed to get tipset key cid: %w", err)
}

var expectEvents int
var expectEntries int

for _, receipt := range receipts {
if receipt.ExitCode != exitcode.Ok || receipt.EventsRoot == nil {
continue
}
events, err := api.ChainGetEvents(ctx, *receipt.EventsRoot)
if err != nil {
return fmt.Errorf("failed to load events for tipset %s: %w", currTs, err)
}
expectEvents += len(events)
for _, event := range events {
expectEntries += len(event.Entries)
}
}

var problems []string

var seenHeight int
var seenReverted int
if err := stmtTipsetSeen.QueryRow(tsKeyCid.Bytes()).Scan(&seenHeight, &seenReverted); err != nil {
if err == sql.ErrNoRows {
if expectEvents > 0 {
problems = append(problems, "not in events_seen table")
} else {
problems = append(problems, "zero-event epoch not in events_seen table")
}
} else {
return fmt.Errorf("failed to check if tipset is seen: %w", err)
}
} else {
if seenHeight != int(ts.Height()) {
problems = append(problems, fmt.Sprintf("events_seen height mismatch (%d)", seenHeight))
}
if seenReverted != 0 {
problems = append(problems, "events_seen marked as reverted")
}
}

var actualEvents int
if err := stmtEventCount.QueryRow(tsKeyCid.Bytes()).Scan(&actualEvents); err != nil {
rvagg marked this conversation as resolved.
Show resolved Hide resolved
return fmt.Errorf("failed to count events for epoch %d (tsk CID %s): %w", ts.Height(), tsKeyCid, err)
}
var actualEntries int
if err := stmtEntryCount.QueryRow(tsKeyCid.Bytes()).Scan(&actualEntries); err != nil {
return fmt.Errorf("failed to count entries for epoch %d (tsk CID %s): %w", ts.Height(), tsKeyCid, err)
}

if actualEvents != expectEvents {
problems = append(problems, fmt.Sprintf("expected %d events, got %d", expectEvents, actualEvents))
}
if actualEntries != expectEntries {
problems = append(problems, fmt.Sprintf("expected %d entries, got %d", expectEntries, actualEntries))
}

if len(problems) > 0 {
_, _ = fmt.Fprintf(cctx.App.Writer, "✗ Epoch %d (%s): %s\n", ts.Height(), tsKeyCid, problems)
} else if logGood {
_, _ = fmt.Fprintf(cctx.App.Writer, "✓ Epoch %d (%s)\n", ts.Height(), tsKeyCid)
}

return nil
}

for i := 0; ctx.Err() == nil && i < epochs; i++ {
// get receipts for the parent of the previous tipset (which will be currTs)
receipts, err := api.ChainGetParentReceipts(ctx, prevTs.Blocks()[0].Cid())
if err != nil {
_, _ = fmt.Fprintf(cctx.App.ErrWriter, "Missing parent receipts for epoch %d (checked %d epochs)", prevTs.Height(), i)
break
}

err = processHeight(ctx, currTs, receipts)
if err != nil {
return err
}

// advance prevTs and currTs up the chain
prevTs = currTs
currTs, err = api.ChainGetTipSet(ctx, currTs.Parents())
if err != nil {
return fmt.Errorf("failed to load tipset %s: %w", currTs, err)
}
}

return nil
},
}

var backfillMsgIndexCmd = &cli.Command{
Name: "backfill-msgindex",
Usage: "Backfill the msgindex.db for a number of epochs starting from a specified height",
Expand Down