Skip to content

Commit

Permalink
ddl: Add log details and tests (pingcap#5021)
Browse files Browse the repository at this point in the history
* ddl: add log details and tests
  • Loading branch information
zimulala authored Nov 6, 2017
1 parent 7a71b14 commit f7533a6
Show file tree
Hide file tree
Showing 5 changed files with 34 additions and 14 deletions.
5 changes: 5 additions & 0 deletions ddl/column.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,11 @@ func (d *ddl) onDropColumn(t *meta.Meta, job *model.Job) (ver int64, _ error) {
return ver, errors.Trace(err)
}

const (
defaultBatchCnt = 1024
defaultSmallBatchCnt = 128
)

// addTableColumn adds a column to the table.
// TODO: Use it when updating the column type or remove it.
// How to backfill column data in reorganization state?
Expand Down
18 changes: 17 additions & 1 deletion ddl/ddl_db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ const (

var _ = Suite(&testDBSuite{})

const defaultBatchSize = 4196
const defaultBatchSize = 2048

type testDBSuite struct {
store kv.Storage
Expand Down Expand Up @@ -459,6 +459,21 @@ func (s *testDBSuite) TestAddIndex(c *C) {
sql := fmt.Sprintf("insert into test_add_index values (%d, %d, %d)", i, i, i)
s.mustExec(c, sql)
}
// Add some discrete rows.
maxBatch := 20
batchCnt := 100
otherKeys := make([]int, 0, batchCnt*maxBatch)
// Make sure there are no duplicate keys.
base := defaultBatchSize * 20
for i := 1; i < batchCnt; i++ {
n := base + i*defaultBatchSize + i
for j := 0; j < rand.Intn(maxBatch); j++ {
n += j
sql := fmt.Sprintf("insert into test_add_index values (%d, %d, %d)", n, n, n)
s.mustExec(c, sql)
otherKeys = append(otherKeys, n)
}
}

sessionExecInGoroutine(c, s.store, "create index c3_index on test_add_index (c3)", done)

Expand Down Expand Up @@ -503,6 +518,7 @@ LOOP:
}
keys = append(keys, i)
}
keys = append(keys, otherKeys...)

// test index key
expectedRows := make([][]interface{}, 0, len(keys))
Expand Down
13 changes: 5 additions & 8 deletions ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -474,8 +474,6 @@ func (w *worker) getIndexRecord(t table.Table, colMap map[int64]*types.FieldType
}

const (
defaultBatchCnt = 1024
defaultSmallBatchCnt = 128
defaultTaskHandleCnt = 128
defaultWorkers = 16
)
Expand Down Expand Up @@ -569,10 +567,9 @@ func (d *ddl) addTableIndex(t table.Table, indexInfo *model.IndexInfo, reorgInfo
wg := sync.WaitGroup{}
for i := 0; i < workerCnt; i++ {
wg.Add(1)
workers[i].setTaskNewRange(baseHandle, baseHandle+taskBatch)
workers[i].setTaskNewRange(baseHandle+int64(i)*taskBatch, baseHandle+int64(i+1)*taskBatch)
// TODO: Consider one worker to one goroutine.
go workers[i].doBackfillIndexTask(t, colMap, &wg)
baseHandle += taskBatch
}
wg.Wait()

Expand All @@ -587,14 +584,14 @@ func (d *ddl) addTableIndex(t table.Table, indexInfo *model.IndexInfo, reorgInfo
err1 := kv.RunInNewTxn(d.store, true, func(txn kv.Transaction) error {
return errors.Trace(reorgInfo.UpdateHandle(txn, nextHandle))
})
log.Warnf("[ddl] total added index for %d rows, this task add index for %d failed, take time %v, update handle err %v",
addedCount, taskAddedCount, sub, err1)
log.Warnf("[ddl] total added index for %d rows, this task [%d,%d) add index for %d failed %v, take time %v, update handle err %v",
addedCount, baseHandle, nextHandle, taskAddedCount, err, sub, err1)
return errors.Trace(err)
}
d.setReorgRowCount(addedCount)
batchHandleDataHistogram.WithLabelValues(batchAddIdx).Observe(sub)
log.Infof("[ddl] total added index for %d rows, this task added index for %d rows, take time %v",
addedCount, taskAddedCount, sub)
log.Infof("[ddl] total added index for %d rows, this task [%d,%d) added index for %d rows, take time %v",
addedCount, baseHandle, nextHandle, taskAddedCount, sub)

if isEnd {
return nil
Expand Down
10 changes: 6 additions & 4 deletions ddl/reorg.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,10 +69,11 @@ func (d *ddl) runReorgJob(job *model.Job, f func() error) error {
// wait reorganization job done or timeout
select {
case err := <-d.reorgDoneCh:
log.Info("[ddl] run reorg job done")
rowCount := d.getReorgRowCount()
log.Infof("[ddl] run reorg job done, handled %d rows", rowCount)
d.reorgDoneCh = nil
// Update a job's RowCount.
job.SetRowCount(d.getReorgRowCount())
job.SetRowCount(rowCount)
d.setReorgRowCount(0)
return errors.Trace(err)
case <-d.quitCh:
Expand All @@ -81,9 +82,10 @@ func (d *ddl) runReorgJob(job *model.Job, f func() error) error {
// We return errWaitReorgTimeout here too, so that outer loop will break.
return errWaitReorgTimeout
case <-time.After(waitTimeout):
log.Infof("[ddl] run reorg job wait timeout %v", waitTimeout)
rowCount := d.getReorgRowCount()
log.Infof("[ddl] run reorg job wait timeout %v, handled %d rows", waitTimeout, rowCount)
// Update a job's RowCount.
job.SetRowCount(d.getReorgRowCount())
job.SetRowCount(rowCount)
// If timeout, we will return, check the owner and retry to wait job done again.
return errWaitReorgTimeout
}
Expand Down
2 changes: 1 addition & 1 deletion domain/schema_validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func (s *schemaValidator) Update(leaseGrantTS uint64, oldVer, currVer int64, cha

// Update the schema deltaItem information.
if currVer != oldVer {
log.Debug("update schema validator:", oldVer, currVer, changedTableIDs)
log.Debugf("update schema validator, old ver %d, curr ver %d, changed IDs %v", oldVer, currVer, changedTableIDs)
s.enqueue(currVer, changedTableIDs)
}
}
Expand Down

0 comments on commit f7533a6

Please sign in to comment.