From d8c50a9220336ea55cdc8fdf19c954e3da7b8637 Mon Sep 17 00:00:00 2001 From: Rod Vagg Date: Tue, 6 Aug 2024 14:49:22 +1000 Subject: [PATCH] feat: events: add `lotus-shed indexes inspect-events` health-check --- CHANGELOG.md | 2 +- cmd/lotus-shed/indexes.go | 195 +++++++++++++++++++++++++++++++++++++- 2 files changed, 194 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index c519d607778..e0201118a6c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -22,9 +22,9 @@ ## New features - feat: Add trace filter API supporting RPC method `trace_filter` ([filecoin-project/lotus#12123](https://github.com/filecoin-project/lotus/pull/12123)). Configuring `EthTraceFilterMaxResults` sets a limit on how many results are returned in any individual `trace_filter` RPC API call. - - feat: `FilecoinAddressToEthAddress` RPC can now return ETH addresses for all Filecoin address types ("f0"/"f1"/"f2"/"f3") based on client's re-org tolerance. This is a breaking change if you are using the API via the go-jsonrpc library or by using Lotus as a library, but is a non-breaking change when using the API via any other RPC method as it adds an optional second argument. ([filecoin-project/lotus#12324](https://github.com/filecoin-project/lotus/pull/12324)). +- feat: Added `lotus-shed indexes inspect-events` health-check command ([filecoin-project/lotus#12346](https://github.com/filecoin-project/lotus/pull/12346)). # v1.28.1 / 2024-07-24 diff --git a/cmd/lotus-shed/indexes.go b/cmd/lotus-shed/indexes.go index f81f89f1eab..07c934d5162 100644 --- a/cmd/lotus-shed/indexes.go +++ b/cmd/lotus-shed/indexes.go @@ -4,6 +4,7 @@ import ( "context" "database/sql" "fmt" + "math" "path" "path/filepath" "strings" @@ -27,9 +28,12 @@ import ( const ( // same as in chain/events/index.go eventExists = `SELECT MAX(id) FROM event WHERE height=? AND tipset_key=? AND tipset_key_cid=? AND emitter_addr=? AND event_index=? AND message_cid=? AND message_index=?` + eventCount = `SELECT COUNT(*) FROM event WHERE tipset_key_cid=?` + entryCount = `SELECT COUNT(*) FROM event_entry JOIN event ON event_entry.event_id=event.id WHERE event.tipset_key_cid=?` insertEvent = `INSERT OR IGNORE INTO event(height, tipset_key, tipset_key_cid, emitter_addr, event_index, message_cid, message_index, reverted) VALUES(?, ?, ?, ?, ?, ?, ?, ?)` insertEntry = `INSERT OR IGNORE INTO event_entry(event_id, indexed, flags, key, codec, value) VALUES(?, ?, ?, ?, ?, ?)` upsertEventsSeen = `INSERT INTO events_seen(height, tipset_key_cid, reverted) VALUES(?, ?, false) ON CONFLICT(height, tipset_key_cid) DO UPDATE SET reverted=false` + tipsetSeen = `SELECT height,reverted FROM events_seen WHERE tipset_key_cid=?` ) func withCategory(cat string, cmd *cli.Command) *cli.Command { @@ -46,12 +50,13 @@ var indexesCmd = &cli.Command{ withCategory("msgindex", pruneMsgIndexCmd), withCategory("txhash", backfillTxHashCmd), withCategory("events", backfillEventsCmd), + withCategory("events", inspectEventsCmd), }, } var backfillEventsCmd = &cli.Command{ Name: "backfill-events", - Usage: "Backfill the events.db for a number of epochs starting from a specified height", + Usage: "Backfill the events.db for a number of epochs starting from a specified height and working backward", Flags: []cli.Flag{ &cli.UintFlag{ Name: "from", @@ -61,7 +66,7 @@ var backfillEventsCmd = &cli.Command{ &cli.IntFlag{ Name: "epochs", Value: 2000, - Usage: "the number of epochs to backfill", + Usage: "the number of epochs to backfill (working backwards)", }, &cli.BoolFlag{ Name: "temporary-index", @@ -387,6 +392,192 @@ var backfillEventsCmd = &cli.Command{ }, } +var inspectEventsCmd = &cli.Command{ + Name: "inspect-events", + Usage: "Perform a health-check on the events.db for a number of epochs starting from a specified height and working backward. " + + "Logs tipsets with problems and optionally logs tipsets without problems. Without specifying a fixed number of epochs, " + + "the command will continue until it reaches a tipset that is not in the blockstore.", + Flags: []cli.Flag{ + &cli.UintFlag{ + Name: "from", + Value: 0, + Usage: "the tipset height to start inspecting from (0 is head of chain)", + }, + &cli.IntFlag{ + Name: "epochs", + Value: 0, + Usage: "the number of epochs to inspect (working backwards) [0 = until we reach a block we don't have]", + }, + &cli.BoolFlag{ + Name: "log-good", + Usage: "log tipsets that have no detected problems", + Value: false, + }, + }, + Action: func(cctx *cli.Context) error { + srv, err := lcli.GetFullNodeServices(cctx) + if err != nil { + return err + } + defer srv.Close() //nolint:errcheck + + api := srv.FullNodeAPI() + ctx := lcli.ReqContext(cctx) + + // currTs will be the tipset where we start backfilling from + currTs, err := api.ChainHead(ctx) + if err != nil { + return err + } + if cctx.IsSet("from") { + // we need to fetch the tipset after the epoch being specified since we will need to advance currTs + currTs, err = api.ChainGetTipSetAfterHeight(ctx, abi.ChainEpoch(cctx.Int("from")+1), currTs.Key()) + if err != nil { + return err + } + } + + logGood := cctx.Bool("log-good") + + // advance currTs by one epoch and maintain prevTs as the previous tipset (this allows us to easily use the ChainGetParentMessages/Receipt API) + prevTs := currTs + currTs, err = api.ChainGetTipSet(ctx, currTs.Parents()) + if err != nil { + return fmt.Errorf("failed to load tipset %s: %w", prevTs.Parents(), err) + } + + epochs := cctx.Int("epochs") + if epochs <= 0 { + epochs = math.MaxInt32 + } + + basePath, err := homedir.Expand(cctx.String("repo")) + if err != nil { + return err + } + + dbPath := path.Join(basePath, "sqlite", "events.db") + db, err := sql.Open("sqlite3", dbPath+"?mode=ro") + if err != nil { + return err + } + + defer func() { + err := db.Close() + if err != nil { + fmt.Printf("ERROR: closing db: %s", err) + } + }() + + stmtEventCount, err := db.Prepare(eventCount) + if err != nil { + return err + } + stmtEntryCount, err := db.Prepare(entryCount) + if err != nil { + return err + } + stmtTipsetSeen, err := db.Prepare(tipsetSeen) + if err != nil { + return err + } + + processHeight := func(ctx context.Context, ts *types.TipSet, receipts []*types.MessageReceipt) error { + tsKeyCid, err := ts.Key().Cid() + if err != nil { + return fmt.Errorf("failed to get tipset key cid: %w", err) + } + + var expectEvents int + var expectEntries int + + for _, receipt := range receipts { + if receipt.ExitCode != exitcode.Ok || receipt.EventsRoot == nil { + continue + } + events, err := api.ChainGetEvents(ctx, *receipt.EventsRoot) + if err != nil { + return fmt.Errorf("failed to load events for tipset %s: %w", currTs, err) + } + expectEvents += len(events) + for _, event := range events { + expectEntries += len(event.Entries) + } + } + + var problems []string + + var seenHeight int + var seenReverted int + if err := stmtTipsetSeen.QueryRow(tsKeyCid.Bytes()).Scan(&seenHeight, &seenReverted); err != nil { + if err == sql.ErrNoRows { + if expectEvents > 0 { + problems = append(problems, "not in events_seen table") + } else { + problems = append(problems, "zero-event epoch not in events_seen table") + } + } else { + return fmt.Errorf("failed to check if tipset is seen: %w", err) + } + } else { + if seenHeight != int(ts.Height()) { + problems = append(problems, fmt.Sprintf("events_seen height mismatch (%d)", seenHeight)) + } + if seenReverted != 0 { + problems = append(problems, "events_seen marked as reverted") + } + } + + var actualEvents int + if err := stmtEventCount.QueryRow(tsKeyCid.Bytes()).Scan(&actualEvents); err != nil { + return fmt.Errorf("failed to count events for epoch %d (tsk CID %s): %w", ts.Height(), tsKeyCid, err) + } + var actualEntries int + if err := stmtEntryCount.QueryRow(tsKeyCid.Bytes()).Scan(&actualEntries); err != nil { + return fmt.Errorf("failed to count entries for epoch %d (tsk CID %s): %w", ts.Height(), tsKeyCid, err) + } + + if actualEvents != expectEvents { + problems = append(problems, fmt.Sprintf("expected %d events, got %d", expectEvents, actualEvents)) + } + if actualEntries != expectEntries { + problems = append(problems, fmt.Sprintf("expected %d entries, got %d", expectEntries, actualEntries)) + } + + if len(problems) > 0 { + _, _ = fmt.Fprintf(cctx.App.Writer, "✗ Epoch %d (%s): %s\n", ts.Height(), tsKeyCid, problems) + } else if logGood { + _, _ = fmt.Fprintf(cctx.App.Writer, "✓ Epoch %d (%s)\n", ts.Height(), tsKeyCid) + } + + return nil + } + + for i := 0; ctx.Err() == nil && i < epochs; i++ { + // get receipts for the parent of the previous tipset (which will be currTs) + receipts, err := api.ChainGetParentReceipts(ctx, prevTs.Blocks()[0].Cid()) + if err != nil { + _, _ = fmt.Fprintf(cctx.App.ErrWriter, "Missing parent receipts for epoch %d (checked %d epochs)", prevTs.Height(), i) + break + } + + err = processHeight(ctx, currTs, receipts) + if err != nil { + return err + } + + // advance prevTs and currTs up the chain + prevTs = currTs + currTs, err = api.ChainGetTipSet(ctx, currTs.Parents()) + if err != nil { + return fmt.Errorf("failed to load tipset %s: %w", currTs, err) + } + } + + return nil + }, +} + var backfillMsgIndexCmd = &cli.Command{ Name: "backfill-msgindex", Usage: "Backfill the msgindex.db for a number of epochs starting from a specified height",