
util: add PreAlloc4Row and Insert for Chunk and List #7916

Merged
13 commits merged on Oct 18, 2018
55 changes: 55 additions & 0 deletions util/chunk/chunk.go
@@ -15,6 +15,7 @@ package chunk

import (
"encoding/binary"
"reflect"
"unsafe"

"github.com/cznic/mathutil"
@@ -277,6 +278,60 @@ func (c *Chunk) AppendPartialRow(colIdx int, row Row) {
}
}

// PreAlloc4Row pre-allocates the memory space for a Row.
// The null elem info will be pre-written.
Member:

  1. s/elem info/bitmap/
  2. should the schema of row and c be the same?

Contributor Author:

Yes, but that should be guaranteed by the caller.

func (c *Chunk) PreAlloc4Row(row Row) {
Member:

Can we preallocate the memory for a batch of rows? The memory growth strategy may allocate a lot of unused memory.

Contributor Author:

  1. Preallocating memory for a batch of rows cannot avoid the waste caused by the growth of chunk.data,
    unless the batch contains all of the rows that need to be pre-allocated.

  2. I adjusted the strategy according to https://github.com/golang/go/blob/master/src/runtime/slice.go#L116-L135 (see the sketch below).
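
For reference, a minimal sketch of the doubling-based growth idea being discussed (illustration only, not the PR's code; growSlice is a hypothetical helper, and the real runtime rules linked above are more involved):

// growSlice is a hypothetical helper showing the doubling strategy: when the
// requested length no longer fits, reallocate with at least twice the old
// capacity, so repeated per-row pre-allocations amortize the copying cost.
func growSlice(data []byte, need int) []byte {
	newLen := len(data) + need
	if newLen <= cap(data) {
		return data[:newLen] // still fits; just extend the length
	}
	newCap := 2 * cap(data)
	if newCap < newLen {
		newCap = newLen
	}
	grown := make([]byte, newLen, newCap)
	copy(grown, data)
	return grown
}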

Member (@zz-jason, Oct 18, 2018):

how about:

  1. s/PreAlloc4Row/PreAlloc/
  2. change the comment to:
// PreAlloc pre-allocates the memory space in a Chunk to store the Row.
// NOTE:
// 1. The Chunk must be empty or holds no useful data.
// 2. The schema of the Row must be the same with the Chunk.
// 3. This API is paired with the `Insert()` function, which inserts all the
//    rows into the Chunk after the pre-allocation.

for i, rowCol := range row.c.columns {
chkCol := c.columns[i]
Member:

how about:

  • s/chkCol/dstCol/
  • s/rowCol/srcCol/

chkCol.appendNullBitmap(!rowCol.isNull(row.idx))
if rowCol.isFixed() {
Member:

how about:

elemLen := len(rowCol.elemBuf)
if !rowCol.isFixed() {
	elemLen = int(rowCol.offsets[row.idx+1] - rowCol.offsets[row.idx])
	chkCol.offsets = append(chkCol.offsets, len(chkCol.data)+elemLen)
}
if len(chkCol.data)+elemLen >= cap(chkCol.data) {
...

elemLen := len(rowCol.elemBuf)
if len(chkCol.data)+elemLen >= cap(chkCol.data) {
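// Capacity exhausted: reallocate with doubled capacity. The old bytes are not
// copied because, per the intended contract noted in the review above, the
// chunk holds no useful data at pre-allocation time and Insert will overwrite
// this region.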
chkCol.data = make([]byte, len(chkCol.data)+elemLen, 2*cap(chkCol.data))
} else {
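// Enough capacity left: extend the slice length in place through its
// SliceHeader, avoiding a reallocation and copy.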
(*reflect.SliceHeader)(unsafe.Pointer(&chkCol.data)).Len = len(chkCol.data) + elemLen
}
} else {
elemLen := int(rowCol.offsets[row.idx+1] - rowCol.offsets[row.idx])
if len(chkCol.data)+elemLen >= cap(chkCol.data) {
chkCol.data = make([]byte, len(chkCol.data)+elemLen, 2*cap(chkCol.data))
} else {
(*reflect.SliceHeader)(unsafe.Pointer(&chkCol.data)).Len = len(chkCol.data) + elemLen
}
chkCol.offsets = append(chkCol.offsets, int32(len(chkCol.data)))
}
chkCol.length++
}
c.numVirtualRows++
Contributor:

why do we increase numVirtualRows?

}

// Insert inserts `row` at the position specified by `rowIdx`.
// Note: Insert overwrites the original data; it should be called after
// PreAlloc4Row.
func (c *Chunk) Insert(rowIdx int, row Row) {
// Check the data length of every column between `row` and the original data.
// Overwrite the original data if the check passes.
for i, rowCol := range row.c.columns {
Member:

s/rowCol/srcCol/
s/chkCol/dstCol/

chkCol := c.columns[i]
if chkCol.isFixed() != rowCol.isFixed() {
Member:

this check can be removed

panic("unexcepted error happens during Chunk.Insert")
}
var srcStart, srcEnd, destStart, destEnd int
if rowCol.isFixed() {
srcElemLen, destElemLen := len(rowCol.elemBuf), len(chkCol.elemBuf)
srcStart, destStart = row.idx*srcElemLen, rowIdx*destElemLen
srcEnd, destEnd = srcStart+srcElemLen, destStart+destElemLen
} else {
srcStart, srcEnd = int(rowCol.offsets[row.idx]), int(rowCol.offsets[row.idx+1])
destStart, destEnd = int(chkCol.offsets[rowIdx]), int(chkCol.offsets[rowIdx+1])
}
if destEnd-destStart != srcEnd-srcStart {
Contributor:

Do we really need to add this length check?

Contributor Author:

In practice, this should be guaranteed by the caller, as well as line 316.

Member:

this check can be removed.

panic("unexcepted error happens during Chunk.Insert")
}
copy(chkCol.data[destStart:destEnd], rowCol.data[srcStart:srcEnd])
}
}
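
As the review comment above proposes, PreAlloc4Row and Insert are meant to be used as a pair: reserve space for all rows first, then fill the reserved slots. A minimal usage sketch, not part of the PR (copyRowsWithPreAlloc and the package name are illustrative; it assumes dst and src share the same schema and dst holds no useful data):

package example

import "github.com/pingcap/tidb/util/chunk"

// copyRowsWithPreAlloc first reserves space for every source row in dst and
// then overwrites each reserved slot with Insert.
func copyRowsWithPreAlloc(dst, src *chunk.Chunk) {
	// Phase 1: pre-allocate one slot per source row (single goroutine).
	for i := 0; i < src.NumRows(); i++ {
		dst.PreAlloc4Row(src.GetRow(i))
	}
	// Phase 2: fill the reserved slots. Each Insert only writes into the
	// region reserved for its own row index, which is why the test further
	// below can run this phase from multiple goroutines.
	for i := 0; i < src.NumRows(); i++ {
		dst.Insert(i, src.GetRow(i))
	}
}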

// Append appends rows in [begin, end) in another Chunk to a Chunk.
func (c *Chunk) Append(other *Chunk, begin, end int) {
for colID, src := range other.columns {
103 changes: 103 additions & 0 deletions util/chunk/chunk_test.go
@@ -18,6 +18,8 @@ import (
"fmt"
"math"
"strconv"
"strings"
"sync"
"testing"
"time"
"unsafe"
@@ -517,6 +519,107 @@ func (s *testChunkSuite) TestSwapColumn(c *check.C) {
checkRef()
}

func (s *testChunkSuite) TestPreAlloc4RowAndInsert(c *check.C) {
fieldTypes := make([]*types.FieldType, 0, 4)
fieldTypes = append(fieldTypes, &types.FieldType{Tp: mysql.TypeFloat})
fieldTypes = append(fieldTypes, &types.FieldType{Tp: mysql.TypeLonglong})
fieldTypes = append(fieldTypes, &types.FieldType{Tp: mysql.TypeNewDecimal})
fieldTypes = append(fieldTypes, &types.FieldType{Tp: mysql.TypeVarchar})

srcChk := NewChunkWithCapacity(fieldTypes, 10)
for i := int64(0); i < 10; i++ {
srcChk.AppendFloat32(0, float32(i))
srcChk.AppendInt64(1, i)
srcChk.AppendMyDecimal(2, types.NewDecFromInt(i))
srcChk.AppendString(3, strings.Repeat(strconv.FormatInt(i, 10), int(i)))
}

destChk := NewChunkWithCapacity(fieldTypes, 3)

// Test Chunk.PreAlloc4Row.
for i := 0; i < srcChk.NumRows(); i++ {
c.Assert(destChk.NumRows(), check.Equals, i)
destChk.PreAlloc4Row(srcChk.GetRow(i))
}
for i, srcCol := range srcChk.columns {
destCol := destChk.columns[i]
c.Assert(len(srcCol.elemBuf), check.Equals, len(destCol.elemBuf))
c.Assert(len(srcCol.data), check.Equals, len(destCol.data))
c.Assert(len(srcCol.offsets), check.Equals, len(destCol.offsets))
c.Assert(len(srcCol.nullBitmap), check.Equals, len(destCol.nullBitmap))
c.Assert(srcCol.length, check.Equals, destCol.length)
c.Assert(srcCol.nullCount, check.Equals, destCol.nullCount)

for _, val := range destCol.data {
c.Assert(val == 0, check.IsTrue)
}
for j, val := range srcCol.offsets {
c.Assert(val, check.Equals, destCol.offsets[j])
}
for j, val := range srcCol.nullBitmap {
c.Assert(val, check.Equals, destCol.nullBitmap[j])
}
for _, val := range destCol.elemBuf {
c.Assert(val == 0, check.IsTrue)
}
}

// Test Chunk.Insert.
for i := srcChk.NumRows() - 1; i >= 0; i-- {
destChk.Insert(i, srcChk.GetRow(i))
}
for i, srcCol := range srcChk.columns {
destCol := destChk.columns[i]

for j, val := range srcCol.data {
c.Assert(val, check.Equals, destCol.data[j])
}
for j, val := range srcCol.offsets {
c.Assert(val, check.Equals, destCol.offsets[j])
}
for j, val := range srcCol.nullBitmap {
c.Assert(val, check.Equals, destCol.nullBitmap[j])
}
for _, val := range destCol.elemBuf {
c.Assert(val == 0, check.IsTrue)
}
}

// Test parallel Chunk.Insert.
destChk.Reset()
startWg, endWg := &sync.WaitGroup{}, &sync.WaitGroup{}
startWg.Add(1)
for i := 0; i < srcChk.NumRows(); i++ {
destChk.PreAlloc4Row(srcChk.GetRow(i))
endWg.Add(1)
go func(rowIdx int) {
defer func() {
endWg.Done()
}()
startWg.Wait()
destChk.Insert(rowIdx, srcChk.GetRow(rowIdx))
}(i)
}
startWg.Done()
endWg.Wait()
for i, srcCol := range srcChk.columns {
destCol := destChk.columns[i]

for j, val := range srcCol.data {
c.Assert(val, check.Equals, destCol.data[j])
}
for j, val := range srcCol.offsets {
c.Assert(val, check.Equals, destCol.offsets[j])
}
for j, val := range srcCol.nullBitmap {
c.Assert(val, check.Equals, destCol.nullBitmap[j])
}
for _, val := range destCol.elemBuf {
c.Assert(val == 0, check.IsTrue)
}
}
}

func BenchmarkAppendInt(b *testing.B) {
b.ReportAllocs()
chk := newChunk(8)
26 changes: 26 additions & 0 deletions util/chunk/list.go
@@ -140,6 +140,32 @@ func (l *List) Reset() {
l.consumedIdx = -1
}

// PreAlloc4Row pre-allocates the storage memory for a Row.
func (l *List) PreAlloc4Row(row Row) (ptr RowPtr) {
Member:

the comment of this function should also be updated.

chkIdx := len(l.chunks) - 1
if chkIdx == -1 || l.chunks[chkIdx].NumRows() >= l.chunks[chkIdx].Capacity() || chkIdx == l.consumedIdx {
Member:

this check can be removed: chkIdx == l.consumedIdx

newChk := l.allocChunk()
l.chunks = append(l.chunks, newChk)
if chkIdx != l.consumedIdx {
l.memTracker.Consume(l.chunks[chkIdx].MemoryUsage())
l.consumedIdx = chkIdx
}
chkIdx++
}
chk := l.chunks[chkIdx]
rowIdx := chk.NumRows()
chk.PreAlloc4Row(row)
l.length++
return RowPtr{ChkIdx: uint32(chkIdx), RowIdx: uint32(rowIdx)}
}

// Insert inserts `row` at the position specified by `ptr`.
// Note: Insert overwrites the original data; it should be called after
// PreAlloc4Row.
func (l *List) Insert(ptr RowPtr, row Row) {
l.chunks[ptr.ChkIdx].Insert(int(ptr.RowIdx), row)
}
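
The List variants follow the same pattern, except that PreAlloc4Row returns a RowPtr that later addresses the reserved slot. A minimal sketch, not part of the PR (buildListWithPreAlloc and the package name are illustrative; it assumes the schema of src matches the list):

package example

import "github.com/pingcap/tidb/util/chunk"

// buildListWithPreAlloc reserves one slot per source row and then fills each
// slot through the RowPtr returned by PreAlloc4Row.
func buildListWithPreAlloc(dst *chunk.List, src *chunk.Chunk) []chunk.RowPtr {
	ptrs := make([]chunk.RowPtr, 0, src.NumRows())
	// Phase 1: reserve space; each RowPtr records the chunk index and row
	// index of the pre-allocated slot inside the list.
	for i := 0; i < src.NumRows(); i++ {
		ptrs = append(ptrs, dst.PreAlloc4Row(src.GetRow(i)))
	}
	// Phase 2: overwrite the reserved slots addressed by the RowPtrs.
	for i := 0; i < src.NumRows(); i++ {
		dst.Insert(ptrs[i], src.GetRow(i))
	}
	return ptrs
}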

// ListWalkFunc is used to walk the list.
// If error is returned, it will stop walking.
type ListWalkFunc = func(row Row) error
43 changes: 43 additions & 0 deletions util/chunk/list_test.go
@@ -22,6 +22,8 @@ import (
"github.com/pingcap/tidb/mysql"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/types/json"
"strconv"
"strings"
)

func (s *testChunkSuite) TestList(c *check.C) {
@@ -114,6 +116,47 @@ func (s *testChunkSuite) TestListMemoryUsage(c *check.C) {
c.Assert(list.GetMemTracker().BytesConsumed(), check.Equals, memUsage+srcChk.MemoryUsage())
}

func (s *testChunkSuite) TestListPrePreAlloc4RowAndInsert(c *check.C) {
fieldTypes := make([]*types.FieldType, 0, 4)
fieldTypes = append(fieldTypes, &types.FieldType{Tp: mysql.TypeFloat})
fieldTypes = append(fieldTypes, &types.FieldType{Tp: mysql.TypeLonglong})
fieldTypes = append(fieldTypes, &types.FieldType{Tp: mysql.TypeNewDecimal})
fieldTypes = append(fieldTypes, &types.FieldType{Tp: mysql.TypeVarchar})

srcChk := NewChunkWithCapacity(fieldTypes, 10)
for i := int64(0); i < 10; i++ {
srcChk.AppendFloat32(0, float32(i))
srcChk.AppendInt64(1, i)
srcChk.AppendMyDecimal(2, types.NewDecFromInt(i))
srcChk.AppendString(3, strings.Repeat(strconv.FormatInt(i, 10), int(i)))
}

srcList := NewList(fieldTypes, 3, 3)
destList := NewList(fieldTypes, 5, 5)
destRowPtr := make([]RowPtr, srcChk.NumRows())
for i := 0; i < srcChk.NumRows(); i++ {
srcList.AppendRow(srcChk.GetRow(i))
destRowPtr[i] = destList.PreAlloc4Row(srcChk.GetRow(i))
}

c.Assert(srcList.NumChunks(), check.Equals, 4)
c.Assert(destList.NumChunks(), check.Equals, 2)

iter4Src := NewIterator4List(srcList)
for row, i := iter4Src.Begin(), 0; row != iter4Src.End(); row, i = iter4Src.Next(), i+1 {
destList.Insert(destRowPtr[i], row)
}

iter4Dest := NewIterator4List(destList)
srcRow, destRow := iter4Src.Begin(), iter4Dest.Begin()
for ; srcRow != iter4Src.End(); srcRow, destRow = iter4Src.Next(), iter4Dest.Next() {
c.Assert(srcRow.GetFloat32(0), check.Equals, destRow.GetFloat32(0))
c.Assert(srcRow.GetInt64(1), check.Equals, destRow.GetInt64(1))
c.Assert(srcRow.GetMyDecimal(2).Compare(destRow.GetMyDecimal(2)) == 0, check.IsTrue)
c.Assert(srcRow.GetString(3), check.Equals, destRow.GetString(3))
}
}

func BenchmarkListMemoryUsage(b *testing.B) {
fieldTypes := make([]*types.FieldType, 0, 4)
fieldTypes = append(fieldTypes, &types.FieldType{Tp: mysql.TypeFloat})