diff --git a/docs/sources/configure/_index.md b/docs/sources/configure/_index.md
index 5122c5b92868..d2292423f936 100644
--- a/docs/sources/configure/_index.md
+++ b/docs/sources/configure/_index.md
@@ -1442,7 +1442,23 @@ lifecycler:
 # CLI flag: -ingester.flush-check-period
 [flush_check_period: <duration> | default = 30s]
 
-# The timeout before a flush is cancelled.
+flush_op_backoff:
+  # Minimum backoff period when a flush fails. Each concurrent flush has its own
+  # backoff, see `ingester.concurrent-flushes`.
+  # CLI flag: -ingester.flush-op-backoff-min-period
+  [min_period: <duration> | default = 10s]
+
+  # Maximum backoff period when a flush fails. Each concurrent flush has its own
+  # backoff, see `ingester.concurrent-flushes`.
+  # CLI flag: -ingester.flush-op-backoff-max-period
+  [max_period: <duration> | default = 1m]
+
+  # Maximum retries for failed flushes.
+  # CLI flag: -ingester.flush-op-backoff-retries
+  [max_retries: <int> | default = 10]
+
+# The timeout for an individual flush. Will be retried up to
+# `flush-op-backoff-retries` times.
 # CLI flag: -ingester.flush-op-timeout
 [flush_op_timeout: <duration> | default = 10m]
 
diff --git a/pkg/ingester/flush.go b/pkg/ingester/flush.go
index be4899325ed4..bffe1d2d372b 100644
--- a/pkg/ingester/flush.go
+++ b/pkg/ingester/flush.go
@@ -7,15 +7,16 @@ import (
 	"sync"
 	"time"
 
+	"github.com/go-kit/log"
 	"github.com/go-kit/log/level"
+	"github.com/grafana/dskit/backoff"
+	"github.com/grafana/dskit/tenant"
 	"github.com/grafana/dskit/user"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/common/model"
 	"github.com/prometheus/prometheus/model/labels"
 	"golang.org/x/net/context"
 
-	"github.com/grafana/dskit/tenant"
-
 	"github.com/grafana/loki/pkg/chunkenc"
 	"github.com/grafana/loki/pkg/storage/chunk"
 	"github.com/grafana/loki/pkg/util"
@@ -126,8 +127,9 @@ func (i *Ingester) sweepStream(instance *instance, stream *stream, immediate boo
 }
 
 func (i *Ingester) flushLoop(j int) {
+	l := log.With(util_log.Logger, "loop", j)
 	defer func() {
-		level.Debug(util_log.Logger).Log("msg", "Ingester.flushLoop() exited")
+		level.Debug(l).Log("msg", "Ingester.flushLoop() exited")
 		i.flushQueuesDone.Done()
 	}()
 
@@ -138,9 +140,10 @@ func (i *Ingester) flushLoop(j int) {
 		}
 		op := o.(*flushOp)
 
-		err := i.flushUserSeries(op.userID, op.fp, op.immediate)
+		m := util_log.WithUserID(op.userID, l)
+		err := i.flushOp(m, op)
 		if err != nil {
-			level.Error(util_log.WithUserID(op.userID, util_log.Logger)).Log("msg", "failed to flush", "err", err)
+			level.Error(m).Log("msg", "failed to flush", "err", err)
 		}
 
 		// If we're exiting & we failed to flush, put the failed operation
@@ -152,7 +155,23 @@ func (i *Ingester) flushLoop(j int) {
 	}
 }
 
-func (i *Ingester) flushUserSeries(userID string, fp model.Fingerprint, immediate bool) error {
+func (i *Ingester) flushOp(l log.Logger, op *flushOp) error {
+	ctx, cancelFunc := context.WithCancel(context.Background())
+	defer cancelFunc()
+
+	b := backoff.New(ctx, i.cfg.FlushOpBackoff)
+	for b.Ongoing() {
+		err := i.flushUserSeries(ctx, op.userID, op.fp, op.immediate)
+		if err == nil {
+			break
+		}
+		level.Error(l).Log("msg", "failed to flush", "retries", b.NumRetries(), "err", err)
+		b.Wait()
+	}
+	return b.Err()
+}
+
+func (i *Ingester) flushUserSeries(ctx context.Context, userID string, fp model.Fingerprint, immediate bool) error {
 	instance, ok := i.getInstanceByID(userID)
 	if !ok {
 		return nil
@@ -166,9 +185,9 @@ func (i *Ingester) flushUserSeries(userID string, fp model.Fingerprint, immediat
 	lbs := labels.String()
 	level.Info(util_log.Logger).Log("msg", "flushing stream", "user", userID, "fp", fp, "immediate", immediate, "num_chunks", len(chunks), "labels", lbs)
 
-	ctx := user.InjectOrgID(context.Background(), userID)
-	ctx, cancel := context.WithTimeout(ctx, i.cfg.FlushOpTimeout)
-	defer cancel()
+	ctx = user.InjectOrgID(ctx, userID)
+	ctx, cancelFunc := context.WithTimeout(ctx, i.cfg.FlushOpTimeout)
+	defer cancelFunc()
 	err := i.flushChunks(ctx, fp, labels, chunks, chunkMtx)
 	if err != nil {
 		return fmt.Errorf("failed to flush chunks: %w, num_chunks: %d, labels: %s", err, len(chunks), lbs)
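
The new `flushOp` wraps every `flushUserSeries` attempt in a `dskit/backoff` loop, so a failed flush is retried with growing delays instead of waiting for the next sweep. A minimal sketch of that retry pattern, using only the public `dskit/backoff` calls already visible in this diff; `flakyFlush` is a hypothetical stand-in for `flushUserSeries`:

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"

	"github.com/grafana/dskit/backoff"
)

// flakyFlush stands in for flushUserSeries: it fails twice, then succeeds.
func flakyFlush(attempt int) error {
	if attempt < 2 {
		return errors.New("transient store error")
	}
	return nil
}

func main() {
	b := backoff.New(context.Background(), backoff.Config{
		MinBackoff: 100 * time.Millisecond, // delay before the first retry
		MaxBackoff: time.Second,            // retry delays are capped here
		MaxRetries: 5,                      // give up after this many attempts
	})
	for b.Ongoing() {
		if err := flakyFlush(b.NumRetries()); err == nil {
			break // success: the backoff is still ongoing, so b.Err() is nil
		}
		b.Wait() // sleeps, roughly doubling the delay up to MaxBackoff
	}
	fmt.Println("result:", b.Err()) // nil here; "terminated after 5 retries" if exhausted
}
```

Note the subtlety `flushOp` relies on: breaking out on success leaves the backoff ongoing, so `b.Err()` returns nil; only an exhausted or cancelled backoff reports an error.
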
userID, "fp", fp, "immediate", immediate, "num_chunks", len(chunks), "labels", lbs) - ctx := user.InjectOrgID(context.Background(), userID) - ctx, cancel := context.WithTimeout(ctx, i.cfg.FlushOpTimeout) - defer cancel() + ctx = user.InjectOrgID(ctx, userID) + ctx, cancelFunc := context.WithTimeout(ctx, i.cfg.FlushOpTimeout) + defer cancelFunc() err := i.flushChunks(ctx, fp, labels, chunks, chunkMtx) if err != nil { return fmt.Errorf("failed to flush chunks: %w, num_chunks: %d, labels: %s", err, len(chunks), lbs) diff --git a/pkg/ingester/flush_test.go b/pkg/ingester/flush_test.go index 904b4d582496..0ee3ecc93837 100644 --- a/pkg/ingester/flush_test.go +++ b/pkg/ingester/flush_test.go @@ -1,6 +1,7 @@ package ingester import ( + "errors" "fmt" "os" "sort" @@ -100,6 +101,67 @@ func Benchmark_FlushLoop(b *testing.B) { } } +func Test_FlushOp(t *testing.T) { + t.Run("no error", func(t *testing.T) { + cfg := defaultIngesterTestConfig(t) + cfg.FlushOpBackoff.MinBackoff = time.Second + cfg.FlushOpBackoff.MaxBackoff = 10 * time.Second + cfg.FlushOpBackoff.MaxRetries = 1 + cfg.FlushCheckPeriod = 100 * time.Millisecond + + _, ing := newTestStore(t, cfg, nil) + + ctx := user.InjectOrgID(context.Background(), "foo") + ins, err := ing.GetOrCreateInstance("foo") + require.NoError(t, err) + + lbs := makeRandomLabels() + req := &logproto.PushRequest{Streams: []logproto.Stream{{ + Labels: lbs.String(), + Entries: entries(5, time.Now()), + }}} + require.NoError(t, ins.Push(ctx, req)) + + time.Sleep(cfg.FlushCheckPeriod) + require.NoError(t, ing.flushOp(gokitlog.NewNopLogger(), &flushOp{ + immediate: true, + userID: "foo", + fp: ins.getHashForLabels(lbs), + })) + }) + + t.Run("max retries exceeded", func(t *testing.T) { + cfg := defaultIngesterTestConfig(t) + cfg.FlushOpBackoff.MinBackoff = time.Second + cfg.FlushOpBackoff.MaxBackoff = 10 * time.Second + cfg.FlushOpBackoff.MaxRetries = 1 + cfg.FlushCheckPeriod = 100 * time.Millisecond + + store, ing := newTestStore(t, cfg, nil) + store.onPut = func(_ context.Context, _ []chunk.Chunk) error { + return errors.New("failed to write chunks") + } + + ctx := user.InjectOrgID(context.Background(), "foo") + ins, err := ing.GetOrCreateInstance("foo") + require.NoError(t, err) + + lbs := makeRandomLabels() + req := &logproto.PushRequest{Streams: []logproto.Stream{{ + Labels: lbs.String(), + Entries: entries(5, time.Now()), + }}} + require.NoError(t, ins.Push(ctx, req)) + + time.Sleep(cfg.FlushCheckPeriod) + require.EqualError(t, ing.flushOp(gokitlog.NewNopLogger(), &flushOp{ + immediate: true, + userID: "foo", + fp: ins.getHashForLabels(lbs), + }), "terminated after 1 retries") + }) +} + func Test_Flush(t *testing.T) { var ( store, ing = newTestStore(t, defaultIngesterTestConfig(t), nil) @@ -295,6 +357,10 @@ func defaultIngesterTestConfig(t testing.TB) Config { cfg := Config{} flagext.DefaultValues(&cfg) + cfg.FlushOpBackoff.MinBackoff = 100 * time.Millisecond + cfg.FlushOpBackoff.MaxBackoff = 10 * time.Second + cfg.FlushOpBackoff.MaxRetries = 1 + cfg.FlushOpTimeout = 15 * time.Second cfg.FlushCheckPeriod = 99999 * time.Hour cfg.MaxChunkIdle = 99999 * time.Hour cfg.ConcurrentFlushes = 1 diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index c6b339baa03f..ea6a019d511c 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -13,6 +13,7 @@ import ( "time" "github.com/go-kit/log/level" + "github.com/grafana/dskit/backoff" "github.com/grafana/dskit/concurrency" "github.com/grafana/dskit/modules" "github.com/grafana/dskit/multierror" @@ -80,6 
diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go
index c6b339baa03f..ea6a019d511c 100644
--- a/pkg/ingester/ingester.go
+++ b/pkg/ingester/ingester.go
@@ -13,6 +13,7 @@ import (
 	"time"
 
 	"github.com/go-kit/log/level"
+	"github.com/grafana/dskit/backoff"
 	"github.com/grafana/dskit/concurrency"
 	"github.com/grafana/dskit/modules"
 	"github.com/grafana/dskit/multierror"
@@ -80,6 +81,7 @@ type Config struct {
 
 	ConcurrentFlushes int           `yaml:"concurrent_flushes"`
 	FlushCheckPeriod  time.Duration `yaml:"flush_check_period"`
+	FlushOpBackoff    backoff.Config `yaml:"flush_op_backoff"`
 	FlushOpTimeout    time.Duration `yaml:"flush_op_timeout"`
 	RetainPeriod      time.Duration `yaml:"chunk_retain_period"`
 	MaxChunkIdle      time.Duration `yaml:"chunk_idle_period"`
@@ -123,7 +125,10 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
 	f.IntVar(&cfg.MaxTransferRetries, "ingester.max-transfer-retries", 0, "Number of times to try and transfer chunks before falling back to flushing. If set to 0 or negative value, transfers are disabled.")
 	f.IntVar(&cfg.ConcurrentFlushes, "ingester.concurrent-flushes", 32, "How many flushes can happen concurrently from each stream.")
 	f.DurationVar(&cfg.FlushCheckPeriod, "ingester.flush-check-period", 30*time.Second, "How often should the ingester see if there are any blocks to flush. The first flush check is delayed by a random time up to 0.8x the flush check period. Additionally, there is +/- 1% jitter added to the interval.")
-	f.DurationVar(&cfg.FlushOpTimeout, "ingester.flush-op-timeout", 10*time.Minute, "The timeout before a flush is cancelled.")
+	f.DurationVar(&cfg.FlushOpBackoff.MinBackoff, "ingester.flush-op-backoff-min-period", 10*time.Second, "Minimum backoff period when a flush fails. Each concurrent flush has its own backoff, see `ingester.concurrent-flushes`.")
+	f.DurationVar(&cfg.FlushOpBackoff.MaxBackoff, "ingester.flush-op-backoff-max-period", time.Minute, "Maximum backoff period when a flush fails. Each concurrent flush has its own backoff, see `ingester.concurrent-flushes`.")
+	f.IntVar(&cfg.FlushOpBackoff.MaxRetries, "ingester.flush-op-backoff-retries", 10, "Maximum retries for failed flushes.")
+	f.DurationVar(&cfg.FlushOpTimeout, "ingester.flush-op-timeout", 10*time.Minute, "The timeout for an individual flush. Will be retried up to `flush-op-backoff-retries` times.")
 	f.DurationVar(&cfg.RetainPeriod, "ingester.chunks-retain-period", 0, "How long chunks should be retained in-memory after they've been flushed.")
 	f.DurationVar(&cfg.MaxChunkIdle, "ingester.chunks-idle-period", 30*time.Minute, "How long chunks should sit in-memory with no updates before being flushed if they don't hit the max block size. This means that half-empty chunks will still be flushed after a certain period as long as they receive no further activity.")
 	f.IntVar(&cfg.BlockSize, "ingester.chunks-block-size", 256*1024, "The targeted _uncompressed_ size in bytes of a chunk block. When this threshold is exceeded the head block will be cut and compressed inside the chunk.")
@@ -151,8 +156,14 @@ func (cfg *Config) Validate() error {
 		return err
 	}
 
-	if cfg.MaxTransferRetries > 0 && cfg.WAL.Enabled {
-		return errors.New("the use of the write ahead log (WAL) is incompatible with chunk transfers. It's suggested to use the WAL. Please try setting ingester.max-transfer-retries to 0 to disable transfers")
+	if cfg.FlushOpBackoff.MinBackoff > cfg.FlushOpBackoff.MaxBackoff {
+		return errors.New("invalid flush op min backoff: cannot be larger than max backoff")
+	}
+	if cfg.FlushOpBackoff.MaxRetries <= 0 {
+		return fmt.Errorf("invalid flush op max retries: %d", cfg.FlushOpBackoff.MaxRetries)
+	}
+	if cfg.FlushOpTimeout <= 0 {
+		return fmt.Errorf("invalid flush op timeout: %s", cfg.FlushOpTimeout)
 	}
 
 	if cfg.IndexShards <= 0 {
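
The new flags all land in `FlushOpBackoff`, and `Validate` cross-checks them. A hypothetical test (not part of this diff) that starts from the registered flag defaults, which pass validation, and then breaks the min/max ordering:

```go
package ingester

import (
	"testing"
	"time"

	"github.com/grafana/dskit/flagext"
	"github.com/stretchr/testify/require"
)

// TestFlushOpBackoffValidation is illustrative only: flag defaults are
// assumed to produce a Validate-clean Config, as the test helpers rely on.
func TestFlushOpBackoffValidation(t *testing.T) {
	var cfg Config
	flagext.DefaultValues(&cfg)

	// Break the ordering: a 2m minimum cannot exceed a 1m maximum.
	cfg.FlushOpBackoff.MinBackoff = 2 * time.Minute
	cfg.FlushOpBackoff.MaxBackoff = time.Minute

	require.EqualError(t, cfg.Validate(),
		"invalid flush op min backoff: cannot be larger than max backoff")
}
```
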
diff --git a/pkg/ingester/instance_test.go b/pkg/ingester/instance_test.go
index ff54db7c33c2..729b32e55388 100644
--- a/pkg/ingester/instance_test.go
+++ b/pkg/ingester/instance_test.go
@@ -10,6 +10,7 @@ import (
 	"testing"
 	"time"
 
+	"github.com/grafana/dskit/backoff"
 	"github.com/grafana/dskit/flagext"
 	"github.com/pkg/errors"
 	"github.com/prometheus/common/model"
@@ -30,9 +31,15 @@ import (
 
 func defaultConfig() *Config {
 	cfg := Config{
-		BlockSize:     512,
-		ChunkEncoding: "gzip",
-		IndexShards:   32,
+		BlockSize:      512,
+		ChunkEncoding:  "gzip",
+		IndexShards:    32,
+		FlushOpTimeout: 15 * time.Second,
+		FlushOpBackoff: backoff.Config{
+			MinBackoff: 100 * time.Millisecond,
+			MaxBackoff: 10 * time.Second,
+			MaxRetries: 1,
+		},
 	}
 	if err := cfg.Validate(); err != nil {
 		panic(errors.Wrap(err, "error building default test config"))
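
This helper change is forced by the stricter `Validate`: a zero `FlushOpBackoff.MaxRetries` (or a zero `FlushOpTimeout`) is now rejected, and `defaultConfig` panics on any validation error. A hypothetical check showing why the old field set no longer survives:

```go
package ingester

import (
	"testing"

	"github.com/stretchr/testify/require"
)

// TestOldDefaultConfigNowInvalid is illustrative only: the pre-diff
// defaultConfig body fails the new MaxRetries check before anything else.
func TestOldDefaultConfigNowInvalid(t *testing.T) {
	cfg := Config{
		BlockSize:     512,
		ChunkEncoding: "gzip",
		IndexShards:   32,
	}
	require.EqualError(t, cfg.Validate(), "invalid flush op max retries: 0")
}
```
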