fix and improve hinted handoff
- optimize the load order of segments in hh
- optimize marshalWrite and unmarshalWrite in node processor
- support max-writes-pending in hh
- do not queue writes that fail with partial write errors to the hinted handoff
- prevent the hinted handoff from becoming blocked if it encounters field type errors
- fix issue where read bytes, blocked writes and dropped writes were not recorded in hh
- ensure the hinted handoff (hh) queue makes forward progress when segment errors occur
- verify and truncate the queue segment files if any are corrupted upon node startup
- improve hinted handoff metrics
- append bytes to buffer in hh queue to avoid OOM
chengshiwen committed Oct 16, 2022
1 parent beed46a commit dfa5aa3
Showing 10 changed files with 287 additions and 72 deletions.
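The hunks below reference ErrQueueBlocked and ErrQueueFull and thread a new MaxWritesPending setting into newQueue, but the queue change itself (services/hh/queue.go) is not among the hunks shown here. As a rough sketch only, with hypothetical names (boundedQueue, writesPending, maxWritesPending) that are not taken from the commit, the gating idea looks roughly like this:

package hh

import (
	"errors"
	"sync/atomic"
)

// Sentinel errors referenced by the diffs below; the message strings here are placeholders.
var (
	ErrQueueBlocked = errors.New("queue is blocked")
	ErrQueueFull    = errors.New("queue is full")
)

// boundedQueue is a hypothetical stand-in for the real hinted-handoff queue.
type boundedQueue struct {
	writesPending    int64 // writes currently waiting to be appended
	maxWritesPending int64 // configured max-writes-pending; <= 0 means unlimited
}

// Append fails fast with ErrQueueBlocked when too many writes are already pending,
// so the coordinator can skip the write instead of buffering unbounded data.
func (q *boundedQueue) Append(b []byte) error {
	if q.maxWritesPending > 0 {
		if atomic.AddInt64(&q.writesPending, 1) > q.maxWritesPending {
			atomic.AddInt64(&q.writesPending, -1)
			return ErrQueueBlocked
		}
		defer atomic.AddInt64(&q.writesPending, -1)
	}
	// The real queue appends b to the current on-disk segment here and
	// returns ErrQueueFull once MaxSize is exceeded.
	return nil
}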
18 changes: 5 additions & 13 deletions coordinator/points_writer.go
@@ -5,7 +5,6 @@ import (
	"errors"
	"fmt"
	"sort"
	"strings"
	"sync"
	"sync/atomic"
	"time"
@@ -554,7 +553,7 @@ func (w *PointsWriter) writeToShardWithContext(ctx context.Context, shard *meta.

			atomic.AddInt64(&w.stats.PointWriteReqRemote, int64(len(points)))
			err := w.ShardWriter.WriteShard(shardID, owner.NodeID, points)
			if err != nil && isRetryable(err) {
			if err != nil && hh.IsRetryable(err) {
				// The remote write failed so queue it via hinted handoff
				atomic.AddInt64(&w.stats.PointWriteReqHH, int64(len(points)))
				hherr := w.HintedHandoff.WriteShard(shardID, owner.NodeID, points)
@@ -594,6 +593,10 @@ func (w *PointsWriter) writeToShardWithContext(ctx context.Context, shard *meta.
				atomic.AddInt64(&w.stats.WriteErr, 1)
				w.Logger.Error("Write failed", zap.Uint64("node_id", result.Owner.NodeID), zap.Uint64("shard_id", shard.ID), zap.Error(result.Err))

				if result.Err.Error() == hh.ErrQueueBlocked.Error() {
					continue
				}

				// Keep track of the first error we see to return back to the client
				if writeError == nil {
					writeError = result.Err
@@ -622,14 +625,3 @@ func (w *PointsWriter) writeToShardWithContext(ctx context.Context, shard *meta.

	return ErrWriteFailed
}

// isRetryable returns true if this error is temporary and could be retried
func isRetryable(err error) bool {
	if err == nil {
		return true
	}
	if strings.HasPrefix(err.Error(), tsdb.ErrFieldTypeConflict.Error()) {
		return false
	}
	return true
}
3 changes: 3 additions & 0 deletions coordinator/rpc.go
@@ -47,6 +47,9 @@ func (w *WriteShardRequest) RetentionPolicy() string { return w.pb.GetRetentionP
// Points returns the time series Points
func (w *WriteShardRequest) Points() []models.Point { return w.unmarshalPoints() }

// SetBinaryPoints sets the time series binary points
func (w *WriteShardRequest) SetBinaryPoints(points [][]byte) { w.pb.Points = points }

// AddPoint adds a new time series point
func (w *WriteShardRequest) AddPoint(name string, value interface{}, timestamp time.Time, tags map[string]string) {
	pt, err := models.NewPoint(
15 changes: 14 additions & 1 deletion coordinator/shard_writer.go
@@ -40,6 +40,19 @@ func NewShardWriter(timeout, dialTimeout, idleTimeout time.Duration, maxIdleStre

// WriteShard writes time series points to a shard
func (w *ShardWriter) WriteShard(shardID, ownerID uint64, points []models.Point) error {
	pts := make([][]byte, 0, len(points))
	for _, p := range points {
		b, err := p.MarshalBinary()
		if err != nil {
			continue
		}
		pts = append(pts, b)
	}
	return w.WriteShardBinary(shardID, ownerID, pts)
}

// WriteShardBinary writes time series binary points to a shard
func (w *ShardWriter) WriteShardBinary(shardID, ownerID uint64, points [][]byte) error {
	c, err := w.dial(ownerID)
	if err != nil {
		return err
@@ -67,7 +80,7 @@ func (w *ShardWriter) WriteShard(shardID, ownerID uint64, points []models.Point)
	request.SetShardID(shardID)
	request.SetDatabase(db)
	request.SetRetentionPolicy(rp)
	request.AddPoints(points)
	request.SetBinaryPoints(points)

	// Marshal into protocol buffers.
	buf, err := request.MarshalBinary()
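With this split, WriteShard is now a thin wrapper that marshals each point and delegates to WriteShardBinary, so the hinted-handoff replay path can forward already-marshaled blocks without re-parsing them. A hedged usage sketch (the helper name and import path are illustrative, not part of the commit):

package coordinator

import "github.com/influxdata/influxdb/models"

// forwardWrites is a hypothetical helper contrasting the two call paths.
func forwardWrites(w *ShardWriter, shardID, ownerID uint64, live []models.Point, queued [][]byte) error {
	// Fresh writes arrive as parsed points and are marshaled inside WriteShard.
	if err := w.WriteShard(shardID, ownerID, live); err != nil {
		return err
	}
	// Replayed hinted-handoff blocks already hold MarshalBinary output
	// (see unmarshalWrite further down), so they go straight to WriteShardBinary.
	return w.WriteShardBinary(shardID, ownerID, queued)
}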
2 changes: 1 addition & 1 deletion etc/config.sample.toml
@@ -684,7 +684,7 @@ dir = "/var/lib/influxdb/hh"

# Maximum number of writes into the hinted-handoff queue that can be pending.
# This is writes incoming to the hh queue, not outbound from the queue.
# max-pending-writes = 1024
# max-writes-pending = 1024

###
### [anti-entropy]
3 changes: 3 additions & 0 deletions services/hh/config.go
@@ -99,6 +99,9 @@ func (c *Config) Validate() error {
	if c.PurgeInterval <= 0 {
		return errors.New("purge-interval must be positive")
	}
	if c.MaxWritesPending < 0 {
		return errors.New("max-writes-pending must be non-negative")
	}

	return nil
}
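The Config struct definition is outside the hunks shown here, so the excerpt below is an assumption inferred from the validation above and from etc/config.sample.toml: a MaxWritesPending field carrying the max-writes-pending key, with the new check rejecting negative values.

package hh

// Config (assumed excerpt; the real struct has more fields and the toml tag
// is inferred from the sample config rather than taken from this commit).
type Config struct {
	// MaxWritesPending caps how many incoming writes may be pending on the
	// hinted-handoff queue at once.
	MaxWritesPending int `toml:"max-writes-pending"`
}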
87 changes: 69 additions & 18 deletions services/hh/node_processor.go
@@ -1,9 +1,11 @@
package hh

import (
	"encoding/binary"
	"fmt"
	"io"
	"os"
	"strings"
	"sync"
	"sync/atomic"
	"time"
@@ -20,6 +22,7 @@ type NodeProcessor struct {
	RetryInterval time.Duration // Interval between periodic write-to-node attempts.
	RetryMaxInterval time.Duration // Max interval between periodic write-to-node attempts.
	RetryRateLimit int64 // Limits the rate data is sent to node.
	MaxWritesPending int // Maximum number of incoming pending writes.
	MaxSize int64 // Maximum size an underlying queue can get.
	MaxAge time.Duration // Maximum age queue data can get before purging.
	nodeID uint64
@@ -47,6 +50,7 @@ func NewNodeProcessor(cfg Config, nodeID, shardID uint64, dir string, w shardWri
		RetryInterval: time.Duration(cfg.RetryInterval),
		RetryMaxInterval: time.Duration(cfg.RetryMaxInterval),
		RetryRateLimit: cfg.RetryRateLimit,
		MaxWritesPending: cfg.MaxWritesPending,
		MaxSize: cfg.MaxSize,
		MaxAge: time.Duration(cfg.MaxAge),
		nodeID: nodeID,
@@ -84,7 +88,7 @@ func (n *NodeProcessor) Open() error {
	}

	// Create the queue of hinted-handoff data.
	queue, err := newQueue(n.dir, n.MaxSize)
	queue, err := newQueue(n.dir, n.MaxSize, n.MaxWritesPending)
	if err != nil {
		return err
	}
@@ -126,7 +130,7 @@ func (n *NodeProcessor) Statistics(tags map[string]string) []models.Statistic {
			statBytesRead: atomic.LoadInt64(&n.stats.BytesRead),
			statBytesWritten: atomic.LoadInt64(&n.stats.BytesWritten),
			statQueueBytes: n.queue.diskUsage(),
			statQueueDepth: len(n.queue.segments),
			statQueueDepth: int64(len(n.queue.segments)),
			statWriteBlocked: atomic.LoadInt64(&n.stats.WriteBlocked),
			statWriteDropped: atomic.LoadInt64(&n.stats.WriteDropped),
			statWriteShardReq: atomic.LoadInt64(&n.stats.WriteShardReq),
@@ -166,16 +170,22 @@ func (n *NodeProcessor) WriteShard(points []models.Point) error {

	i, j := 0, len(points)
	for i < j {
		b := marshalWrite(points[i:j])
		b := marshalWrite(n.shardID, points[i:j])
		for len(b) > defaultSegmentSize {
			if j == i+1 {
				return ErrSegmentFull
			}
			j = (i + j + 1) / 2
			b = marshalWrite(points[i:j])
			b = marshalWrite(n.shardID, points[i:j])
		}
		atomic.AddInt64(&n.stats.BytesWritten, int64(len(b)))
		if err := n.queue.Append(b); err != nil {
			switch err {
			case ErrQueueBlocked:
				atomic.AddInt64(&n.stats.WriteBlocked, 1)
			case ErrQueueFull:
				atomic.AddInt64(&n.stats.WriteDropped, 1)
			}
			return err
		}
		if j == len(points) {
@@ -263,25 +273,37 @@ func (n *NodeProcessor) SendWrite() (int, error) {
	// Get the current block from the queue
	buf, err := n.queue.Current()
	if err != nil {
		if err != io.EOF {
			n.Logger.Error("Failed to current queue", zap.Uint64("node", n.nodeID), zap.Uint64("shardID", n.shardID), zap.Error(err))
			// Try to truncate it.
			if err := n.queue.Truncate(); err != nil {
				n.Logger.Error("Failed to truncate queue", zap.Uint64("node", n.nodeID), zap.Uint64("shardID", n.shardID), zap.Error(err))
			}
		} else {
			// Try to skip it.
			if err := n.queue.Advance(); err != nil {
				n.Logger.Error("Failed to advance queue", zap.Uint64("node", n.nodeID), zap.Uint64("shardID", n.shardID), zap.Error(err))
			}
		}
		return 0, err
	}

	// unmarshal the byte slice back to shard ID and points
	points, err := unmarshalWrite(buf)
	_, points, err := unmarshalWrite(buf)
	if err != nil {
		atomic.AddInt64(&n.stats.WriteDropped, int64(len(buf)))
		n.Logger.Error("Unmarshal write failed", zap.Error(err))
		n.Logger.Error("Unmarshal write failed", zap.Uint64("node", n.nodeID), zap.Uint64("shardID", n.shardID), zap.Error(err))
		// Try to skip it.
		if err := n.queue.Advance(); err != nil {
			n.Logger.Error("Failed to advance queue", zap.Uint64("node", n.nodeID), zap.Uint64("shardID", n.shardID), zap.Error(err))
		}
		return 0, err
	}

	if err := n.writer.WriteShard(n.shardID, n.nodeID, points); err != nil {
	if err := n.writer.WriteShardBinary(n.shardID, n.nodeID, points); err != nil && IsRetryable(err) {
		atomic.AddInt64(&n.stats.WriteNodeReqFail, 1)
		return 0, err
	}
	atomic.AddInt64(&n.stats.BytesRead, int64(len(buf)))
	atomic.AddInt64(&n.stats.WriteNodeReq, 1)
	atomic.AddInt64(&n.stats.WriteNodeReqPoints, int64(len(points)))

@@ -324,21 +346,50 @@ func (n *NodeProcessor) Empty() bool {
	return n.queue.Empty()
}

func marshalWrite(points []models.Point) []byte {
	var b []byte
	if len(points) > 0 {
		b = make([]byte, 0, (len(points[0].String())+1)*len(points))
// IsRetryable returns true if this error is temporary and could be retried
func IsRetryable(err error) bool {
	if err == nil {
		return false
	}
	if strings.Contains(err.Error(), "field type conflict") || strings.Contains(err.Error(), "partial write") {
		return false
	}
	return true
}

func marshalWrite(shardID uint64, points []models.Point) []byte {
	b := make([]byte, 8)
	binary.BigEndian.PutUint64(b, shardID)
	nb := make([]byte, 4)
	for _, p := range points {
		b = append(b, []byte(p.String())...)
		b = append(b, '\n')
		pb, err := p.MarshalBinary()
		if err != nil {
			continue
		}
		binary.BigEndian.PutUint32(nb, uint32(len(pb)))
		b = append(b, nb...)
		b = append(b, pb...)
	}
	return b
}

func unmarshalWrite(b []byte) ([]models.Point, error) {
	if len(b) == 0 {
		return nil, fmt.Errorf("too short: zero")
func unmarshalWrite(b []byte) (uint64, [][]byte, error) {
	if len(b) < 8 {
		return 0, nil, fmt.Errorf("too short: len = %d", len(b))
	}
	shardID, b := binary.BigEndian.Uint64(b[:8]), b[8:]
	var points [][]byte
	var n int
	for len(b) > 0 {
		if len(b) < 4 {
			return shardID, points, io.ErrShortBuffer
		}
		n, b = int(binary.BigEndian.Uint32(b[:4])), b[4:]
		if len(b) < n {
			return shardID, points, io.ErrShortBuffer
		}
		points = append(points, b[:n])
		b = b[n:]
	}
	return models.ParsePoints(b)
	return shardID, points, nil
}
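For reference, the block that marshalWrite now appends to the queue is an 8-byte big-endian shard ID followed, per point, by a 4-byte big-endian length and the point's MarshalBinary bytes. Length prefixes let unmarshalWrite detect truncated blocks (io.ErrShortBuffer) and hand the raw payloads straight to WriteShardBinary without re-parsing line protocol. A standalone sketch of that framing (not code from the commit), with a hard-coded size check:

package main

import (
	"encoding/binary"
	"fmt"
)

// encodeBlock mirrors the framing used by marshalWrite:
// [8-byte shard ID][4-byte length][payload] ... repeated per point.
func encodeBlock(shardID uint64, payloads [][]byte) []byte {
	b := make([]byte, 8)
	binary.BigEndian.PutUint64(b, shardID)
	for _, p := range payloads {
		var n [4]byte
		binary.BigEndian.PutUint32(n[:], uint32(len(p)))
		b = append(b, n[:]...)
		b = append(b, p...)
	}
	return b
}

func main() {
	block := encodeBlock(127, [][]byte{[]byte("pt-a"), []byte("pt-bb")})
	// 8 (shard ID) + (4+4) + (4+5) = 25 bytes; the shard ID decodes back to 127.
	fmt.Println(len(block), binary.BigEndian.Uint64(block[:8]))
}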
47 changes: 41 additions & 6 deletions services/hh/node_processor_test.go
@@ -4,6 +4,7 @@ import (
	"io"
	"io/ioutil"
	"os"
	"strings"
	"testing"
	"time"

@@ -12,10 +13,10 @@ import (
)

type fakeShardWriter struct {
	ShardWriteFn func(shardID, nodeID uint64, points []models.Point) error
	ShardWriteFn func(shardID, nodeID uint64, points [][]byte) error
}

func (f *fakeShardWriter) WriteShard(shardID, nodeID uint64, points []models.Point) error {
func (f *fakeShardWriter) WriteShardBinary(shardID, nodeID uint64, points [][]byte) error {
	return f.ShardWriteFn(shardID, nodeID, points)
}

@@ -38,7 +39,7 @@ func TestNodeProcessorSendBlock(t *testing.T) {
	pt := models.MustNewPoint("cpu", models.NewTags(map[string]string{"foo": "bar"}), models.Fields{"value": 1.0}, time.Unix(0, 0))

	sh := &fakeShardWriter{
		ShardWriteFn: func(shardID, nodeID uint64, points []models.Point) error {
		ShardWriteFn: func(shardID, nodeID uint64, points [][]byte) error {
			count++
			if shardID != expShardID {
				t.Errorf("SendWrite() shardID mismatch: got %v, exp %v", shardID, expShardID)
@@ -51,8 +52,13 @@ func TestNodeProcessorSendBlock(t *testing.T) {
				t.Fatalf("SendWrite() points mismatch: got %v, exp %v", len(points), exp)
			}

			if points[0].String() != pt.String() {
				t.Fatalf("SendWrite() points mismatch:\n got %v\n exp %v", points[0].String(), pt.String())
			p, err := models.NewPointFromBytes(points[0])
			if err != nil {
				t.Fatalf("SendWrite() point bytes mismatch: got %v, exp %v", err, pt.String())
			}

			if p.String() != pt.String() {
				t.Fatalf("SendWrite() points mismatch:\n got %v\n exp %v", p.String(), pt.String())
			}

			return nil
@@ -110,7 +116,7 @@ func TestNodeProcessorSendBlock(t *testing.T) {
	}

	// Make the node inactive.
	sh.ShardWriteFn = func(shardID, nodeID uint64, points []models.Point) error {
	sh.ShardWriteFn = func(shardID, nodeID uint64, points [][]byte) error {
		t.Fatalf("write sent to inactive node")
		return nil
	}
@@ -153,3 +159,32 @@ func TestNodeProcessorSendBlock(t *testing.T) {
		t.Fatalf("Node processor directory still present after purge")
	}
}

func TestNodeProcessorMarshalWrite(t *testing.T) {
	expShardID := uint64(127)
	expPointsStr := `cpu value1=1.0,value2=1.0,value3=3.0,value4=4,value5="five" 1000000000
cpu,env=prod,host=serverA,region=us-west,tag1=value1,tag2=value2,tag3=value3,tag4=value4,tag5=value5,target=servers,zone=1c value=1i 1000000000`
	points, _ := models.ParsePointsString(expPointsStr)
	b := marshalWrite(expShardID, points)

	shardID, pts, err := unmarshalWrite(b)
	if err != nil {
		t.Fatalf("unexpected error: %s", err)
	}
	if shardID != expShardID {
		t.Fatalf("unexpected shardID: %d, exp: %d", shardID, expShardID)
	}

	var lines []string
	for _, pt := range pts {
		p, err := models.NewPointFromBytes(pt)
		if err != nil {
			t.Fatalf("unexpected error: %s", err)
		}
		lines = append(lines, p.String())
	}
	pointsStr := strings.Join(lines, "\n")
	if pointsStr != expPointsStr {
		t.Fatalf("unexpected points string: %s, exp: %s", pointsStr, expPointsStr)
	}
}