diff --git a/util/chunk/chunk.go b/util/chunk/chunk.go
index d09150f1ab84b..3b91cf3bb693d 100644
--- a/util/chunk/chunk.go
+++ b/util/chunk/chunk.go
@@ -277,6 +277,78 @@ func (c *Chunk) AppendPartialRow(colIdx int, row Row) {
 	}
 }
 
+// PreAlloc reserves space in the Chunk for one row shaped like `row` without
+// copying its values. For every column it appends the null flag, appends the
+// end offset if the column is variable-length, increments the column length
+// and extends the data buffer by the length of the element.
+// NOTE:
+// 1. The Chunk must be empty or hold no useful data: when a data buffer has
+//    to grow, a fresh buffer is allocated and the old bytes are not copied.
+// 2. The schema of the Row must be the same as the schema of the Chunk.
+// 3. This API is paired with the `Insert()` function, which writes the row
+//    data into the Chunk after the pre-allocation.
+// 4. The null bitmap is set here instead of in the Insert() function because
+//    when Insert() is called concurrently, a data race on a bitmap byte can
+//    not be avoided even though the manipulated bits inside it are different.
+func (c *Chunk) PreAlloc(row Row) {
+	for i, srcCol := range row.c.columns {
+		dstCol := c.columns[i]
+		dstCol.appendNullBitmap(!srcCol.isNull(row.idx))
+		elemLen := len(srcCol.elemBuf)
+		if !srcCol.isFixed() {
+			elemLen = int(srcCol.offsets[row.idx+1] - srcCol.offsets[row.idx])
+			dstCol.offsets = append(dstCol.offsets, int32(len(dstCol.data)+elemLen))
+		}
+		dstCol.length++
+		needCap := len(dstCol.data) + elemLen
+		if needCap <= cap(dstCol.data) {
+			// The spare capacity is enough, extend the length in place.
+			dstCol.data = dstCol.data[:needCap]
+			continue
+		}
+		// Grow the capacity according to golang.growslice.
+		newCap := cap(dstCol.data)
+		doubleCap := newCap << 1
+		switch {
+		case needCap > doubleCap:
+			newCap = needCap
+		case len(dstCol.data) < 1024:
+			newCap = doubleCap
+		default:
+			for 0 < newCap && newCap < needCap {
+				newCap += newCap / 4
+			}
+			// A non-positive newCap means the computation overflowed.
+			if newCap <= 0 {
+				newCap = needCap
+			}
+		}
+		dstCol.data = make([]byte, needCap, newCap)
+	}
+}
+
+// Insert copies the non-NULL values of `row` into the position specified by
+// `rowIdx`, overwriting the bytes stored there.
+// Note: Insert should be called after PreAlloc has reserved the position.
+func (c *Chunk) Insert(rowIdx int, row Row) {
+	for i, srcCol := range row.c.columns {
+		if row.IsNull(i) {
+			continue
+		}
+		dstCol := c.columns[i]
+		var srcStart, srcEnd, destStart, destEnd int
+		if srcCol.isFixed() {
+			srcElemLen, destElemLen := len(srcCol.elemBuf), len(dstCol.elemBuf)
+			srcStart, destStart = row.idx*srcElemLen, rowIdx*destElemLen
+			srcEnd, destEnd = srcStart+srcElemLen, destStart+destElemLen
+		} else {
+			srcStart, srcEnd = int(srcCol.offsets[row.idx]), int(srcCol.offsets[row.idx+1])
+			destStart, destEnd = int(dstCol.offsets[rowIdx]), int(dstCol.offsets[rowIdx+1])
+		}
+		copy(dstCol.data[destStart:destEnd], srcCol.data[srcStart:srcEnd])
+	}
+}
+
 // Append appends rows in [begin, end) in another Chunk to a Chunk.
 func (c *Chunk) Append(other *Chunk, begin, end int) {
 	for colID, src := range other.columns {
diff --git a/util/chunk/chunk_test.go b/util/chunk/chunk_test.go
index 799a6e703ca64..4edce4b9f35ea 100644
--- a/util/chunk/chunk_test.go
+++ b/util/chunk/chunk_test.go
@@ -18,6 +18,8 @@ import (
 	"fmt"
 	"math"
 	"strconv"
+	"strings"
+	"sync"
 	"testing"
 	"time"
 	"unsafe"
@@ -517,6 +519,110 @@ func (s *testChunkSuite) TestSwapColumn(c *check.C) {
 	checkRef()
 }
 
+// TestPreAlloc4RowAndInsert checks that PreAlloc reproduces the column layout
+// of the source chunk with zeroed data, and that Insert then fills in the same
+// data both when called serially and when called from concurrent goroutines.
+func (s *testChunkSuite) TestPreAlloc4RowAndInsert(c *check.C) {
+	fieldTypes := make([]*types.FieldType, 0, 4)
+	fieldTypes = append(fieldTypes, &types.FieldType{Tp: mysql.TypeFloat})
+	fieldTypes = append(fieldTypes, &types.FieldType{Tp: mysql.TypeLonglong})
+	fieldTypes = append(fieldTypes, &types.FieldType{Tp: mysql.TypeNewDecimal})
+	fieldTypes = append(fieldTypes, &types.FieldType{Tp: mysql.TypeVarchar})
+
+	srcChk := NewChunkWithCapacity(fieldTypes, 10)
+	for i := int64(0); i < 10; i++ {
+		srcChk.AppendFloat32(0, float32(i))
+		srcChk.AppendInt64(1, i)
+		srcChk.AppendMyDecimal(2, types.NewDecFromInt(i))
+		srcChk.AppendString(3, strings.Repeat(strconv.FormatInt(i, 10), int(i)))
+	}
+
+	destChk := NewChunkWithCapacity(fieldTypes, 3)
+
+	// Test Chunk.PreAlloc.
+	for i := 0; i < srcChk.NumRows(); i++ {
+		c.Assert(destChk.NumRows(), check.Equals, i)
+		destChk.PreAlloc(srcChk.GetRow(i))
+	}
+	for i, srcCol := range srcChk.columns {
+		destCol := destChk.columns[i]
+		c.Assert(len(srcCol.elemBuf), check.Equals, len(destCol.elemBuf))
+		c.Assert(len(srcCol.data), check.Equals, len(destCol.data))
+		c.Assert(len(srcCol.offsets), check.Equals, len(destCol.offsets))
+		c.Assert(len(srcCol.nullBitmap), check.Equals, len(destCol.nullBitmap))
+		c.Assert(srcCol.length, check.Equals, destCol.length)
+		c.Assert(srcCol.nullCount, check.Equals, destCol.nullCount)
+
+		for _, val := range destCol.data {
+			c.Assert(val == 0, check.IsTrue)
+		}
+		for j, val := range srcCol.offsets {
+			c.Assert(val, check.Equals, destCol.offsets[j])
+		}
+		for j, val := range srcCol.nullBitmap {
+			c.Assert(val, check.Equals, destCol.nullBitmap[j])
+		}
+		for _, val := range destCol.elemBuf {
+			c.Assert(val == 0, check.IsTrue)
+		}
+	}
+
+	// Test Chunk.Insert.
+	for i := srcChk.NumRows() - 1; i >= 0; i-- {
+		destChk.Insert(i, srcChk.GetRow(i))
+	}
+	for i, srcCol := range srcChk.columns {
+		destCol := destChk.columns[i]
+
+		for j, val := range srcCol.data {
+			c.Assert(val, check.Equals, destCol.data[j])
+		}
+		for j, val := range srcCol.offsets {
+			c.Assert(val, check.Equals, destCol.offsets[j])
+		}
+		for j, val := range srcCol.nullBitmap {
+			c.Assert(val, check.Equals, destCol.nullBitmap[j])
+		}
+		for _, val := range destCol.elemBuf {
+			c.Assert(val == 0, check.IsTrue)
+		}
+	}
+
+	// Test parallel Chunk.Insert.
+	destChk.Reset()
+	startWg, endWg := &sync.WaitGroup{}, &sync.WaitGroup{}
+	startWg.Add(1)
+	for i := 0; i < srcChk.NumRows(); i++ {
+		destChk.PreAlloc(srcChk.GetRow(i))
+		endWg.Add(1)
+		go func(rowIdx int) {
+			defer func() {
+				endWg.Done()
+			}()
+			startWg.Wait()
+			destChk.Insert(rowIdx, srcChk.GetRow(rowIdx))
+		}(i)
+	}
+	startWg.Done()
+	endWg.Wait()
+	for i, srcCol := range srcChk.columns {
+		destCol := destChk.columns[i]
+
+		for j, val := range srcCol.data {
+			c.Assert(val, check.Equals, destCol.data[j])
+		}
+		for j, val := range srcCol.offsets {
+			c.Assert(val, check.Equals, destCol.offsets[j])
+		}
+		for j, val := range srcCol.nullBitmap {
+			c.Assert(val, check.Equals, destCol.nullBitmap[j])
+		}
+		for _, val := range destCol.elemBuf {
+			c.Assert(val == 0, check.IsTrue)
+		}
+	}
+}
+
 func BenchmarkAppendInt(b *testing.B) {
 	b.ReportAllocs()
 	chk := newChunk(8)
diff --git a/util/chunk/list.go b/util/chunk/list.go
index da789211d5a0d..4acee7d199216 100644
--- a/util/chunk/list.go
+++ b/util/chunk/list.go
@@ -140,6 +140,40 @@ func (l *List) Reset() {
 	l.consumedIdx = -1
 }
 
+// PreAlloc4Row pre-allocates the storage memory for a Row and returns the
+// RowPtr of the reserved position. A new chunk is allocated when the List
+// has no chunk yet or its last chunk has reached its capacity.
+// NOTE:
+// 1. The List must be empty or hold no useful data.
+// 2. The schema of the Row must be the same as the schema of the List.
+// 3. This API is paired with the `Insert()` function, which writes the row
+//    data into the List at the returned RowPtr after the pre-allocation.
+func (l *List) PreAlloc4Row(row Row) (ptr RowPtr) {
+	chkIdx := len(l.chunks) - 1
+	if chkIdx == -1 || l.chunks[chkIdx].NumRows() >= l.chunks[chkIdx].Capacity() {
+		newChk := l.allocChunk()
+		l.chunks = append(l.chunks, newChk)
+		// Report the previous last chunk to the memory tracker unless its index is already recorded.
+		if chkIdx != l.consumedIdx {
+			l.memTracker.Consume(l.chunks[chkIdx].MemoryUsage())
+			l.consumedIdx = chkIdx
+		}
+		chkIdx++
+	}
+	chk := l.chunks[chkIdx]
+	rowIdx := chk.NumRows()
+	chk.PreAlloc(row)
+	l.length++
+	return RowPtr{ChkIdx: uint32(chkIdx), RowIdx: uint32(rowIdx)}
+}
+
+// Insert copies `row` into the position specified by `ptr`, overwriting the
+// data stored there.
+// Note: Insert should be called after PreAlloc4Row has reserved the position.
+func (l *List) Insert(ptr RowPtr, row Row) {
+	l.chunks[ptr.ChkIdx].Insert(int(ptr.RowIdx), row)
+}
+
 // ListWalkFunc is used to walk the list.
 // If error is returned, it will stop walking.
 type ListWalkFunc = func(row Row) error
diff --git a/util/chunk/list_test.go b/util/chunk/list_test.go
index 646812331ceb8..81ac3f3b29d52 100644
--- a/util/chunk/list_test.go
+++ b/util/chunk/list_test.go
@@ -15,6 +15,8 @@ package chunk
 
 import (
 	"math"
+	"strconv"
+	"strings"
 	"testing"
 	"time"
 
@@ -114,6 +116,49 @@ func (s *testChunkSuite) TestListMemoryUsage(c *check.C) {
 	c.Assert(list.GetMemTracker().BytesConsumed(), check.Equals, memUsage+srcChk.MemoryUsage())
 }
 
+// TestListPrePreAlloc4RowAndInsert checks that rows written through
+// List.PreAlloc4Row followed by List.Insert read back equal to the source rows.
+func (s *testChunkSuite) TestListPrePreAlloc4RowAndInsert(c *check.C) {
+	fieldTypes := make([]*types.FieldType, 0, 4)
+	fieldTypes = append(fieldTypes, &types.FieldType{Tp: mysql.TypeFloat})
+	fieldTypes = append(fieldTypes, &types.FieldType{Tp: mysql.TypeLonglong})
+	fieldTypes = append(fieldTypes, &types.FieldType{Tp: mysql.TypeNewDecimal})
+	fieldTypes = append(fieldTypes, &types.FieldType{Tp: mysql.TypeVarchar})
+
+	srcChk := NewChunkWithCapacity(fieldTypes, 10)
+	for i := int64(0); i < 10; i++ {
+		srcChk.AppendFloat32(0, float32(i))
+		srcChk.AppendInt64(1, i)
+		srcChk.AppendMyDecimal(2, types.NewDecFromInt(i))
+		srcChk.AppendString(3, strings.Repeat(strconv.FormatInt(i, 10), int(i)))
+	}
+
+	srcList := NewList(fieldTypes, 3, 3)
+	destList := NewList(fieldTypes, 5, 5)
+	destRowPtr := make([]RowPtr, srcChk.NumRows())
+	for i := 0; i < srcChk.NumRows(); i++ {
+		srcList.AppendRow(srcChk.GetRow(i))
+		destRowPtr[i] = destList.PreAlloc4Row(srcChk.GetRow(i))
+	}
+
+	c.Assert(srcList.NumChunks(), check.Equals, 4)
+	c.Assert(destList.NumChunks(), check.Equals, 2)
+
+	iter4Src := NewIterator4List(srcList)
+	for row, i := iter4Src.Begin(), 0; row != iter4Src.End(); row, i = iter4Src.Next(), i+1 {
+		destList.Insert(destRowPtr[i], row)
+	}
+
+	iter4Dest := NewIterator4List(destList)
+	srcRow, destRow := iter4Src.Begin(), iter4Dest.Begin()
+	for ; srcRow != iter4Src.End(); srcRow, destRow = iter4Src.Next(), iter4Dest.Next() {
+		c.Assert(srcRow.GetFloat32(0), check.Equals, destRow.GetFloat32(0))
+		c.Assert(srcRow.GetInt64(1), check.Equals, destRow.GetInt64(1))
+		c.Assert(srcRow.GetMyDecimal(2).Compare(destRow.GetMyDecimal(2)) == 0, check.IsTrue)
+		c.Assert(srcRow.GetString(3), check.Equals, destRow.GetString(3))
+	}
+}
+
 func BenchmarkListMemoryUsage(b *testing.B) {
 	fieldTypes := make([]*types.FieldType, 0, 4)
 	fieldTypes = append(fieldTypes, &types.FieldType{Tp: mysql.TypeFloat})