Skip to content

Commit

Permalink
support allow-out-of-order-writes
Browse files Browse the repository at this point in the history
  • Loading branch information
chengshiwen committed Oct 16, 2022
1 parent 19ed984 commit f393b49
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 9 deletions.
1 change: 1 addition & 0 deletions cmd/influxd/run/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,7 @@ func NewServer(c *Config, buildInfo *BuildInfo) (*Server, error) {

// Initialize points writer.
s.PointsWriter = coordinator.NewPointsWriter()
s.PointsWriter.AllowOutOfOrderWrites = c.Coordinator.AllowOutOfOrderWrites
s.PointsWriter.WriteTimeout = time.Duration(c.Coordinator.WriteTimeout)
s.PointsWriter.TSDBStore = s.TSDBStore
s.PointsWriter.ShardWriter = s.ShardWriter
Expand Down
20 changes: 11 additions & 9 deletions coordinator/points_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,11 @@ var (

// PointsWriter handles writes across multiple local and remote data nodes.
type PointsWriter struct {
mu sync.RWMutex
closing chan struct{}
WriteTimeout time.Duration
Logger *zap.Logger
mu sync.RWMutex
closing chan struct{}
AllowOutOfOrderWrites bool
WriteTimeout time.Duration
Logger *zap.Logger

MetaClient interface {
NodeID() uint64
Expand Down Expand Up @@ -103,10 +104,11 @@ func (w *WritePointsRequest) AddPoint(name string, value interface{}, timestamp
// NewPointsWriter returns a new instance of PointsWriter for a node.
func NewPointsWriter() *PointsWriter {
return &PointsWriter{
closing: make(chan struct{}),
WriteTimeout: DefaultWriteTimeout,
Logger: zap.NewNop(),
stats: &WriteStatistics{},
closing: make(chan struct{}),
AllowOutOfOrderWrites: false,
WriteTimeout: DefaultWriteTimeout,
Logger: zap.NewNop(),
stats: &WriteStatistics{},
}
}

Expand Down Expand Up @@ -538,7 +540,7 @@ func (w *PointsWriter) writeToShardWithContext(ctx context.Context, shard *meta.
return
}

if !w.HintedHandoff.Empty(shardID, owner.NodeID) {
if !w.AllowOutOfOrderWrites && !w.HintedHandoff.Empty(shardID, owner.NodeID) {
atomic.AddInt64(&w.stats.PointWriteReqHH, int64(len(points)))
hherr := w.HintedHandoff.WriteShard(shardID, owner.NodeID, points)
if hherr != nil {
Expand Down

0 comments on commit f393b49

Please sign in to comment.