Skip to content
This repository has been archived by the owner on Dec 8, 2021. It is now read-only.

Commit

Permalink
backend: fix load partition table with local backend (#402)
Browse files Browse the repository at this point in the history
* fix load partition table with local backend

* fix log and typo

* fix comment

* fix integration test

* fix parenthesis
  • Loading branch information
glorv authored Sep 17, 2020
1 parent 6e4c898 commit e8c9b88
Show file tree
Hide file tree
Showing 8 changed files with 58 additions and 55 deletions.
70 changes: 27 additions & 43 deletions lightning/backend/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -614,60 +614,44 @@ func (local *local) readAndSplitIntoRange(engineFile *LocalFile, engineUUID uuid
return ranges
}
if tablecodec.IsIndexKey(firstKey) {
// index engine
tableID, startIndexID, _, err := tablecodec.DecodeKeyHead(firstKey)
if err != nil {
return nil, err
type tblIndexRange struct {
tblID int64
indexID int64
startKey []byte
endKey []byte
}
tableID, endIndexID, _, err := tablecodec.DecodeKeyHead(lastKey)
if err != nil {
return nil, err
}
indexCount := (endIndexID - startIndexID) + 1

// each index has to be split into n / indexCount ranges
indexRangeCount := (n + indexCount - 1) / indexCount
for i := startIndexID; i <= endIndexID; i++ {
k := tablecodec.EncodeTableIndexPrefix(tableID, i)
iter.SeekGE(k)
// get first key of index i
startKeyOfIndex := append([]byte{}, iter.Key()...)

k = tablecodec.EncodeTableIndexPrefix(tableID, i+1)
// get last key of index i
iter.SeekLT(k)

lastKeyOfIndex := append([]byte{}, iter.Key()...)
// for partitioned table, there will be multiple physical tables and each physical table contains multiple indices
indexRanges := make([]*tblIndexRange, 0)
iter.First()
for iter.Valid() {
startKey := append([]byte{}, iter.Key()...)

_, startIndexID, _, err := tablecodec.DecodeKeyHead(startKeyOfIndex)
if err != nil {
return nil, err
}
_, endIndexID, _, err := tablecodec.DecodeKeyHead(lastKeyOfIndex)
tableID, indexID, _, err := tablecodec.DecodeKeyHead(startKey)
if err != nil {
return nil, err
}

if startIndexID != endIndexID {
// this shouldn't happen
log.L().DPanic("index ID not match",
zap.Int64("startID", startIndexID), zap.Int64("endID", endIndexID))
return nil, errors.New("index ID not match")
}
k := tablecodec.EncodeTableIndexPrefix(tableID, indexID+1)
iter.SeekLT(k)

endKey := append([]byte{}, iter.Key()...)
indexRanges = append(indexRanges, &tblIndexRange{tableID, indexID, startKey, endKey})
log.L().Debug("index key range", zap.Int64("tableID", tableID), zap.Int64("index", indexID),
zap.Binary("startKey", startKey), zap.Binary("endKey", endKey))

// if index is Unique or Primary, the key is encoded as
// tablePrefix{tableID}_indexPrefixSep{indexID}_indexedColumnsValue
// if index is non-Unique, key is encoded as
// tablePrefix{tableID}_indexPrefixSep{indexID}_indexedColumnsValue_rowID
iter.Next()
}

// we can split by indexColumnsValue to get indexRangeCount ranges from above Keys
indexRangeCount := (int(n) + len(indexRanges)) / len(indexRanges)

log.L().Info("split index to range",
zap.Int64("indexID", i), zap.Int64("rangeCount", indexRangeCount),
zap.Binary("start", startKeyOfIndex), zap.Binary("end", lastKeyOfIndex))
log.L().Info("split table index kv to range",
zap.Int("total index count", len(indexRanges)), zap.Int64("ranges", n),
zap.Int("index range count", indexRangeCount))

values := engineFile.splitValuesToRange(startKeyOfIndex, nextKey(lastKeyOfIndex), indexRangeCount, int(indexCount))
ranges = appendRanges(ranges, startKeyOfIndex, values)
for _, indexRange := range indexRanges {
values := engineFile.splitValuesToRange(indexRange.startKey, nextKey(indexRange.endKey), int64(indexRangeCount), len(indexRanges))
ranges = appendRanges(ranges, indexRange.startKey, values)
}
} else {
// data engine, we split keys by sample keys instead of by handle
Expand Down
2 changes: 1 addition & 1 deletion tests/common_handle/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
check_cluster_version 4 0 0 'local backend' || exit 0

# enable cluster index
run_sql 'set @@global.tidb_enable_clustered_index = 1' || (echo "tidb does not support cluster index yet, skip this test!" && exit 0)
run_sql 'set @@global.tidb_enable_clustered_index = 1' || exit 0
# wait for global variable cache invalid
sleep 2

Expand Down
3 changes: 3 additions & 0 deletions tests/parquet/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ check_row_count() {
}

for BACKEND in local importer tidb; do
if [ "$BACKEND" = 'local' ]; then
check_cluster_version 4 0 0 'local backend' || continue
fi
run_sql 'DROP DATABASE IF EXISTS test'
run_sql 'CREATE DATABASE test'
run_sql -D test "source tests/$TEST_NAME/db.sql;"
Expand Down
2 changes: 2 additions & 0 deletions tests/partitioned-table/config.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
[tikv-importer]
region-split-size = 50
2 changes: 1 addition & 1 deletion tests/partitioned-table/data/partitioned.a-schema.sql
Original file line number Diff line number Diff line change
@@ -1 +1 @@
create table a (a int) partition by hash(a) partitions 5;
create table a (a int, b varchar(16), KEY key_b (`b`)) partition by hash(a) partitions 5;
10 changes: 9 additions & 1 deletion tests/partitioned-table/data/partitioned.a.sql
Original file line number Diff line number Diff line change
@@ -1,2 +1,10 @@
insert into a values (268435456), (1), (262144), (32), (4), (65536), (8388608);
insert into a values
(268435456, '268435456'),
(1, 'adgagdadgagag'),
(262144, 'gadgagaha'),
(32, '32'),
(4, 'hahaha'),
(65536, 'luck dog'),
(8388608, 'heyhey'),
(0, '999');

20 changes: 13 additions & 7 deletions tests/partitioned-table/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,19 @@

set -eu

run_sql 'DROP DATABASE IF EXISTS partitioned;'
for BACKEND in tidb importer local; do
if [ "$BACKEND" = 'local' ]; then
check_cluster_version 4 0 0 'local backend' || continue
fi

run_lightning
run_sql 'DROP DATABASE IF EXISTS partitioned;'

run_sql 'SELECT count(1), sum(a) FROM partitioned.a;'
check_contains 'count(1): 7'
check_contains 'sum(a): 277151781'
run_lightning --backend $BACKEND

run_sql "SHOW TABLE STATUS FROM partitioned WHERE name = 'a';"
check_contains 'Create_options: partitioned'
run_sql 'SELECT count(1), sum(a) FROM partitioned.a;'
check_contains 'count(1): 8'
check_contains 'sum(a): 277151781'

run_sql "SHOW TABLE STATUS FROM partitioned WHERE name = 'a';"
check_contains 'Create_options: partitioned'
done
4 changes: 2 additions & 2 deletions tests/tiflash/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.

# if cluster version < v3.1.0 or $TIFLASH is not set, skip this test
(check_cluster_version 3 1 0 'TiFlash' && [ -n "$TIFLASH" ]) || exit 0
# before v4.0.5 tiflash doesn't support tls, so we should skip this test then
(check_cluster_version 4 0 5 'TiFlash' && [ -n "$TIFLASH" ]) || exit 0

set -euE
# Populate the mydumper source
Expand Down

0 comments on commit e8c9b88

Please sign in to comment.