Cherry-picks for 2.10.21-RC.2 (#5911)
Includes the following:

- #5901
- #5904
- #5900
- #5906
- #5908
- #5907

Signed-off-by: Neil Twigg <neil@nats.io>
neilalexander authored Sep 20, 2024
2 parents 290ae91 + 4683479 commit fd3a2e6
Showing 13 changed files with 689 additions and 269 deletions.
137 changes: 18 additions & 119 deletions server/filestore.go
@@ -1748,134 +1748,33 @@ func (fs *fileStore) recoverFullState() (rerr error) {
return errPriorState
}
if matched = bytes.Equal(mb.lastChecksum(), lchk[:]); !matched {
// If we are tracking max msgs per subject and we are not up to date we should rebuild.
if fs.cfg.MaxMsgsPer > 0 {
fs.warn("Stream state block state outdated, will rebuild")
return errPriorState
}

// Remove the last message block since recover will add in the new one.
fs.removeMsgBlockFromList(mb)
// Reverse update of tracking state for this mb, will add new state in below.
mstate.Msgs -= mb.msgs
mstate.Bytes -= mb.bytes
if nmb, err := fs.recoverMsgBlock(mb.index); err != nil && !os.IsNotExist(err) {
fs.warn("Stream state could not recover last msg block")
os.Remove(fn)
return errCorruptState
} else if nmb != nil {
fs.adjustAccounting(mb, nmb)
updateTrackingState(&mstate, nmb)
}
}

// On success double check our state.
checkState := func() error {
// We check first and last seq and number of msgs and bytes. If there is a difference,
// return an error so we rebuild from the message block state on disk.
if !trackingStatesEqual(&fs.state, &mstate) {
fs.warn("Stream state encountered internal inconsistency on recover")
os.Remove(fn)
return errCorruptState
}
return nil
}

// We may need to check other blocks. Even if we matched last checksum we will see if there is another block.
for bi := blkIndex + 1; ; bi++ {
nmb, err := fs.recoverMsgBlock(bi)
if err != nil {
if os.IsNotExist(err) {
return checkState()
}
os.Remove(fn)
fs.warn("Stream state could not recover msg block %d", bi)
return err
}
if nmb != nil {
// Update top level accounting
if fseq := atomic.LoadUint64(&nmb.first.seq); fs.state.FirstSeq == 0 || fseq < fs.state.FirstSeq {
fs.state.FirstSeq = fseq
if nmb.first.ts == 0 {
fs.state.FirstTime = time.Time{}
} else {
fs.state.FirstTime = time.Unix(0, nmb.first.ts).UTC()
}
}
if lseq := atomic.LoadUint64(&nmb.last.seq); lseq > fs.state.LastSeq {
fs.state.LastSeq = lseq
if mb.last.ts == 0 {
fs.state.LastTime = time.Time{}
} else {
fs.state.LastTime = time.Unix(0, nmb.last.ts).UTC()
}
}
fs.state.Msgs += nmb.msgs
fs.state.Bytes += nmb.bytes
updateTrackingState(&mstate, nmb)
}
// Detected a stale index.db; we didn't write it upon shutdown so we can't rely on it being correct.
fs.warn("Stream state outdated, last block has additional entries, will rebuild")
return errPriorState
}
}

// adjustAccounting will be called when a stream state was only partially accounted for
// within a message block, e.g. additional records were added after the stream state.
// Lock should be held.
func (fs *fileStore) adjustAccounting(mb, nmb *msgBlock) {
nmb.mu.Lock()
defer nmb.mu.Unlock()
// We need to see if any blocks exist after our last one even though we matched the last record exactly.
mdir := filepath.Join(fs.fcfg.StoreDir, msgDir)
var dirs []os.DirEntry

// First make sure the new block is loaded.
if nmb.cacheNotLoaded() {
nmb.loadMsgsWithLock()
<-dios
if f, err := os.Open(mdir); err == nil {
dirs, _ = f.ReadDir(-1)
f.Close()
}
nmb.ensurePerSubjectInfoLoaded()

var smv StoreMsg
dios <- struct{}{}

// Need to walk previous messages and undo psim stats.
// We already undid msgs and bytes accounting.
for seq, lseq := atomic.LoadUint64(&mb.first.seq), atomic.LoadUint64(&mb.last.seq); seq <= lseq; seq++ {
// Look up the message. If there is an error the message will be deleted, so we can skip it.
sm, err := nmb.cacheLookup(seq, &smv)
if err != nil {
continue
}
if len(sm.subj) > 0 && fs.psim != nil {
if info, ok := fs.psim.Find(stringToBytes(sm.subj)); ok {
info.total--
var index uint32
for _, fi := range dirs {
if n, err := fmt.Sscanf(fi.Name(), blkScan, &index); err == nil && n == 1 {
if index > blkIndex {
fs.warn("Stream state outdated, found extra blocks, will rebuild")
return errPriorState
}
}
}

// Walk only new messages and update accounting at fs level. Any messages that should have
// triggered limits exceeded will be handled after the recovery and prior to the stream
// being available to the system.
for seq, lseq := atomic.LoadUint64(&mb.last.seq)+1, atomic.LoadUint64(&nmb.last.seq); seq <= lseq; seq++ {
// Look up the message. If there is an error the message will be deleted, so we can skip it.
sm, err := nmb.cacheLookup(seq, &smv)
if err != nil {
continue
}
// Since we found it we just need to adjust fs totals and psim.
fs.state.Msgs++
fs.state.Bytes += fileStoreMsgSize(sm.subj, sm.hdr, sm.msg)
}

// Now check to see if we had a higher first for the recovered state mb vs nmb.
if atomic.LoadUint64(&nmb.first.seq) < atomic.LoadUint64(&mb.first.seq) {
// Now set first for nmb.
atomic.StoreUint64(&nmb.first.seq, atomic.LoadUint64(&mb.first.seq))
}

// Update top level accounting.
if fseq := atomic.LoadUint64(&nmb.first.seq); fs.state.FirstSeq == 0 || fseq < fs.state.FirstSeq {
fs.state.FirstSeq = fseq
fs.state.FirstTime = time.Unix(0, nmb.first.ts).UTC()
}
if lseq := atomic.LoadUint64(&nmb.last.seq); lseq > fs.state.LastSeq {
fs.state.LastSeq = lseq
fs.state.LastTime = time.Unix(0, nmb.last.ts).UTC()
}
return nil
}

// Grabs last checksum for the named block file.
66 changes: 66 additions & 0 deletions server/filestore_test.go
@@ -7540,6 +7540,72 @@ func TestFileStoreDmapBlockRecoverAfterCompact(t *testing.T) {
require_Equal(t, dmap.Size(), 4)
}

func TestFileStoreRestoreIndexWithMatchButLeftOverBlocks(t *testing.T) {
sd := t.TempDir()
fs, err := newFileStore(
FileStoreConfig{StoreDir: sd, BlockSize: 256},
StreamConfig{Name: "zzz", Subjects: []string{"foo.*"}, Storage: FileStorage, MaxMsgsPer: 1})
require_NoError(t, err)
defer fs.Stop()

msg := []byte("hello")

// 6 msgs per block.
// Fill the first 2 blocks.
for i := 1; i <= 12; i++ {
fs.StoreMsg(fmt.Sprintf("foo.%d", i), nil, msg)
}
require_Equal(t, fs.numMsgBlocks(), 2)

// We will now stop, which will create the index.db file that will
// match the last record exactly.
sfile := filepath.Join(sd, msgDir, streamStreamStateFile)
fs.Stop()

// Grab it since we will put it back.
buf, err := os.ReadFile(sfile)
require_NoError(t, err)
require_True(t, len(buf) > 0)

// Now do an additional block, but with MaxMsgsPer set this will remove the first block
// and leave the second, so on recovery we will match the checksum for the last msg in the second block.

fs, err = newFileStore(
FileStoreConfig{StoreDir: sd, BlockSize: 256},
StreamConfig{Name: "zzz", Subjects: []string{"foo.*"}, Storage: FileStorage, MaxMsgsPer: 1})
require_NoError(t, err)
defer fs.Stop()

for i := 1; i <= 6; i++ {
fs.StoreMsg(fmt.Sprintf("foo.%d", i), nil, msg)
}

// Grab correct state, we will use it to make sure we do the right thing.
var state StreamState
fs.FastState(&state)

require_Equal(t, state.Msgs, 12)
require_Equal(t, state.FirstSeq, 7)
require_Equal(t, state.LastSeq, 18)
// This will be block 2 and 3.
require_Equal(t, fs.numMsgBlocks(), 2)

fs.Stop()
// Put old stream state back.
require_NoError(t, os.WriteFile(sfile, buf, defaultFilePerms))

fs, err = newFileStore(
FileStoreConfig{StoreDir: sd, BlockSize: 256},
StreamConfig{Name: "zzz", Subjects: []string{"foo.*"}, Storage: FileStorage, MaxMsgsPer: 1})
require_NoError(t, err)
defer fs.Stop()

fs.FastState(&state)
require_Equal(t, state.Msgs, 12)
require_Equal(t, state.FirstSeq, 7)
require_Equal(t, state.LastSeq, 18)
}

///////////////////////////////////////////////////////////////////////////
// Benchmarks
///////////////////////////////////////////////////////////////////////////
4 changes: 4 additions & 0 deletions server/jetstream.go
@@ -105,6 +105,7 @@ type jetStream struct {
storeReserved int64
memUsed int64
storeUsed int64
queueLimit int64
clustered int32
mu sync.RWMutex
srv *Server
@@ -377,6 +378,9 @@ func (s *Server) enableJetStream(cfg JetStreamConfig) error {
}
s.gcbMu.Unlock()

// TODO: Not currently reloadable.
atomic.StoreInt64(&js.queueLimit, s.getOpts().JetStreamRequestQueueLimit)

s.js.Store(js)

// FIXME(dlc) - Allow memory only operation?
25 changes: 22 additions & 3 deletions server/jetstream_api.go
@@ -299,6 +299,9 @@ const (
// JSAdvisoryServerRemoved notification that a server has been removed from the system.
JSAdvisoryServerRemoved = "$JS.EVENT.ADVISORY.SERVER.REMOVED"

// JSAdvisoryAPILimitReached notification that a server has reached the JS API hard limit.
JSAdvisoryAPILimitReached = "$JS.EVENT.ADVISORY.API.LIMIT_REACHED"

// JSAuditAdvisory is a notification about JetStream API access.
// FIXME - Add in details about who..
JSAuditAdvisory = "$JS.EVENT.ADVISORY.API"
@@ -346,6 +349,10 @@ const JSMaxMetadataLen = 128 * 1024
// Picked 255 as it seems to be a widely used file name limit
const JSMaxNameLen = 255

// JSDefaultRequestQueueLimit is the default number of entries that we will
// put on the global request queue before we react.
const JSDefaultRequestQueueLimit = 10_000

// Responses for API calls.

// ApiResponse is a standard response from the JetStream JSON API
@@ -825,10 +832,22 @@ func (js *jetStream) apiDispatch(sub *subscription, c *client, acc *Account, sub
// Copy the state. Note the JSAPI only uses the hdr index to piece apart the
// header from the msg body. No other references are needed.
// Check pending and warn if getting backed up.
const warnThresh = 128
pending := s.jsAPIRoutedReqs.push(&jsAPIRoutedReq{jsub, sub, acc, subject, reply, copyBytes(rmsg), c.pa})
if pending >= warnThresh {
s.rateLimitFormatWarnf("JetStream request queue has high pending count: %d", pending)
limit := atomic.LoadInt64(&js.queueLimit)
if pending >= int(limit) {
s.rateLimitFormatWarnf("JetStream API queue limit reached, dropping %d requests", pending)
s.jsAPIRoutedReqs.drain()

s.publishAdvisory(nil, JSAdvisoryAPILimitReached, JSAPILimitReachedAdvisory{
TypedEvent: TypedEvent{
Type: JSAPILimitReachedAdvisoryType,
ID: nuid.Next(),
Time: time.Now().UTC(),
},
Server: s.Name(),
Domain: js.config.Domain,
Dropped: int64(pending),
})
}
}

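The dispatch change above boils down to a drop-and-notify pattern: push returns the pending count, and once that count reaches the configured limit the whole queue is drained and an advisory is published. Below is a minimal standalone sketch of that pattern in Go; the notifyQueue type and its onLimit callback are illustrative assumptions, not the server's internal ipQueue implementation.

package main

import (
	"fmt"
	"sync"
)

// notifyQueue is a hypothetical bounded queue that mirrors the drop-and-notify
// logic in apiDispatch: push returns the pending count, and when that count
// reaches the limit the queue is drained and a callback fires.
type notifyQueue[T any] struct {
	mu      sync.Mutex
	items   []T
	limit   int
	onLimit func(dropped int) // e.g. publish an advisory
}

// push appends an item and returns the pending count. If the limit is
// reached, all pending items are dropped and onLimit is invoked.
func (q *notifyQueue[T]) push(item T) int {
	q.mu.Lock()
	defer q.mu.Unlock()
	q.items = append(q.items, item)
	pending := len(q.items)
	if pending >= q.limit {
		q.items = nil // drop everything, akin to s.jsAPIRoutedReqs.drain() above
		if q.onLimit != nil {
			q.onLimit(pending)
		}
	}
	return pending
}

func main() {
	q := &notifyQueue[string]{
		limit: 3,
		onLimit: func(dropped int) {
			fmt.Printf("limit reached, dropped %d requests\n", dropped)
		},
	}
	for i := 0; i < 5; i++ {
		q.push(fmt.Sprintf("req-%d", i))
	}
}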
74 changes: 73 additions & 1 deletion server/jetstream_cluster_4_test.go
@@ -18,6 +18,7 @@ package server

import (
"context"
"encoding/json"
"errors"
"fmt"
"math/rand"
@@ -2055,7 +2056,7 @@ func TestJetStreamClusterAndNamesWithSpaces(t *testing.T) {
}
accounts {
sys {
sys {
users = [ { user: sys, pass: sys } ] }
js {
jetstream: enabled
@@ -3445,3 +3446,74 @@ func TestJetStreamClusterAckDeleted(t *testing.T) {
)
}
}

func TestJetStreamClusterAPILimitDefault(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()

for _, s := range c.servers {
s.optsMu.RLock()
lim := s.opts.JetStreamRequestQueueLimit
s.optsMu.RUnlock()

require_Equal(t, lim, JSDefaultRequestQueueLimit)
require_Equal(t, atomic.LoadInt64(&s.getJetStream().queueLimit), JSDefaultRequestQueueLimit)
}
}

func TestJetStreamClusterAPILimitAdvisory(t *testing.T) {
// Hit the limit straight away.
const queueLimit = 1

config := `
listen: 127.0.0.1:-1
server_name: %s
jetstream: {
max_mem_store: 256MB
max_file_store: 2GB
store_dir: '%s'
request_queue_limit: ` + fmt.Sprintf("%d", queueLimit) + `
}
cluster {
name: %s
listen: 127.0.0.1:%d
routes = [%s]
}
accounts { $SYS { users = [ { user: "admin", pass: "s3cr3t!" } ] } }
`
c := createJetStreamClusterWithTemplate(t, config, "R3S", 3)
defer c.shutdown()

c.waitOnLeader()
s := c.randomNonLeader()

for _, s := range c.servers {
lim := atomic.LoadInt64(&s.getJetStream().queueLimit)
require_Equal(t, lim, queueLimit)
}

nc, _ := jsClientConnect(t, s)
defer nc.Close()

snc, _ := jsClientConnect(t, c.randomServer(), nats.UserInfo("admin", "s3cr3t!"))
defer snc.Close()

sub, err := snc.SubscribeSync(JSAdvisoryAPILimitReached)
require_NoError(t, err)

ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()

require_NoError(t, nc.PublishMsg(&nats.Msg{
Subject: fmt.Sprintf(JSApiConsumerListT, "TEST"),
Reply: nc.NewInbox(),
}))

// Wait for the advisory to come in.
msg, err := sub.NextMsgWithContext(ctx)
require_NoError(t, err)
var advisory JSAPILimitReachedAdvisory
require_NoError(t, json.Unmarshal(msg.Data, &advisory))
require_Equal(t, advisory.Domain, _EMPTY_) // No JetStream domain was set.
require_Equal(t, advisory.Dropped, queueLimit) // Configured queue limit.
}
11 changes: 11 additions & 0 deletions server/jetstream_events.go
@@ -283,3 +283,14 @@ type JSServerRemovedAdvisory struct {
Cluster string `json:"cluster"`
Domain string `json:"domain,omitempty"`
}

// JSAPILimitReachedAdvisoryType is sent when the JS API request queue limit is reached.
const JSAPILimitReachedAdvisoryType = "io.nats.jetstream.advisory.v1.api_limit_reached"

// JSAPILimitReachedAdvisory is an advisory published when JetStream hits the queue length limit.
type JSAPILimitReachedAdvisory struct {
TypedEvent
Server string `json:"server"` // Server that created the event, name or ID
Domain string `json:"domain,omitempty"` // Domain the server belongs to
Dropped int64 `json:"dropped"` // How many messages did we drop from the queue
}
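To show how the new advisory might be consumed, here is a rough sketch that connects with a system-account user and decodes the fields defined above using the nats.go client. The URL and credentials are placeholders, and the local apiLimitReached struct is an assumption that simply mirrors the advisory's JSON fields (including the TypedEvent fields) rather than importing the server package. The subject string is the value of JSAdvisoryAPILimitReached from jetstream_api.go.

package main

import (
	"encoding/json"
	"log"

	"github.com/nats-io/nats.go"
)

// apiLimitReached mirrors the JSON shape of JSAPILimitReachedAdvisory.
type apiLimitReached struct {
	Type    string `json:"type"`
	ID      string `json:"id"`
	Time    string `json:"timestamp"`
	Server  string `json:"server"`
	Domain  string `json:"domain,omitempty"`
	Dropped int64  `json:"dropped"`
}

func main() {
	// System-account credentials below are placeholders.
	nc, err := nats.Connect("nats://127.0.0.1:4222", nats.UserInfo("admin", "s3cr3t!"))
	if err != nil {
		log.Fatal(err)
	}
	defer nc.Drain()

	// Subject matches JSAdvisoryAPILimitReached.
	_, err = nc.Subscribe("$JS.EVENT.ADVISORY.API.LIMIT_REACHED", func(m *nats.Msg) {
		var adv apiLimitReached
		if err := json.Unmarshal(m.Data, &adv); err != nil {
			log.Printf("bad advisory: %v", err)
			return
		}
		log.Printf("server %q dropped %d JS API requests (domain %q)", adv.Server, adv.Dropped, adv.Domain)
	})
	if err != nil {
		log.Fatal(err)
	}
	select {} // block and keep listening
}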