Skip to content

Commit

Permalink
Merge branch 'master' into analyze-concurrency-hotfix
Browse files Browse the repository at this point in the history
  • Loading branch information
hawkingrei authored Aug 15, 2022
2 parents f1c4198 + 39e00eb commit b5cbba8
Show file tree
Hide file tree
Showing 36 changed files with 441 additions and 321 deletions.
56 changes: 56 additions & 0 deletions .github/workflows/integration-test-dumpling-common.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
name: DumplingCommon
on:
workflow_call:
inputs:
debug:
type: boolean
description: 'set tmate on failure'
required: true
mysql_version:
type: string
description: 'specify mysql version'
required: true

jobs:
integration-test:
runs-on: ubuntu-latest
timeout-minutes: 15
strategy:
fail-fast: true
services:
mysql:
image: mysql:${{ inputs.mysql_version }}
env:
MYSQL_ALLOW_EMPTY_PASSWORD: yes
ports:
- 3306:3306
options: --health-cmd="mysqladmin ping" --health-interval=10s --health-timeout=5s --health-retries=3
steps:
- name: 'checkout repository'
uses: actions/checkout@v3
- name: 'set up golang'
uses: actions/setup-go@v3
with:
go-version: 1.18
- name: 'try to use build cache'
uses: actions/cache@v3
with:
path: |
~/.cache/go-build
~/go/pkg/mod
~/Library/Caches/go-build
~\AppData\Local\go-build
key: ${{ runner.os }}-go-${{ hashFiles('**/go.sum') }}
restore-keys: |
${{ runner.os }}-go-
- name: 'download binary dependencies'
run: sh dumpling/install.sh
- name: 'build tidb'
run: make server
- name: 'build lightning'
run: make build_lightning
- name: 'integration test'
run: make dumpling_integration_test VERBOSE="true"
- name: 'set up tmate session if necessary'
if: ${{ failure() && inputs.debug }}
uses: mxschmitt/action-tmate@v3
130 changes: 22 additions & 108 deletions .github/workflows/integration-test-dumpling.yml
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
name: Dumpling
on:
workflow_dispatch:
inputs:
debug:
type: boolean
description: 'Run the build with tmate debugging enabled'
required: false
default: false
push:
branches:
- master
Expand Down Expand Up @@ -32,113 +39,20 @@ concurrency:
cancel-in-progress: true

jobs:
integration-test-mysql-5735:
runs-on: ubuntu-latest
timeout-minutes: 15
strategy:
fail-fast: true
services:
mysql:
image: mysql:5.7.35
env:
MYSQL_ALLOW_EMPTY_PASSWORD: yes
ports:
- 3306:3306
options: --health-cmd="mysqladmin ping" --health-interval=10s --health-timeout=5s --health-retries=3
steps:
- uses: actions/checkout@v2
- name: Shutdown Ubuntu MySQL (SUDO)
run: sudo service mysql stop # Shutdown the Default MySQL, "sudo" is necessary, please not remove it
- name: Set up Go 1.18
uses: actions/setup-go@v2
with:
go-version: 1.18
- uses: actions/cache@v2
with:
path: ~/go/pkg/mod
key: ${{ runner.os }}-go-${{ hashFiles('**/go.sum') }}
restore-keys: |
${{ runner.os }}-go-
- name: Get dependencies
run: go mod download
- name: Download dependencies
run: sh dumpling/install.sh
- name: Integration test
run: make dumpling_integration_test VERBOSE="true"
- name: Set up tmate session
if: ${{ failure() }}
uses: mxschmitt/action-tmate@v3
mysql-5735:
uses: ./.github/workflows/integration-test-dumpling-common.yml
with:
debug: ${{ github.event_name == 'workflow_dispatch' && github.event.inputs.debug }}
mysql_version: 5.7.35

integration-test-mysql-8026:
runs-on: ubuntu-latest
timeout-minutes: 15
strategy:
fail-fast: true
services:
mysql:
image: mysql:8.0.26
env:
MYSQL_ALLOW_EMPTY_PASSWORD: yes
ports:
- 3306:3306
options: --health-cmd="mysqladmin ping" --health-interval=10s --health-timeout=5s --health-retries=3
steps:
- uses: actions/checkout@v2
- name: Shutdown Ubuntu MySQL (SUDO)
run: sudo service mysql stop # Shutdown the Default MySQL, "sudo" is necessary, please not remove it
- name: Set up Go 1.18
uses: actions/setup-go@v2
with:
go-version: 1.18
- uses: actions/cache@v2
with:
path: ~/go/pkg/mod
key: ${{ runner.os }}-go-${{ hashFiles('**/go.sum') }}
restore-keys: |
${{ runner.os }}-go-
- name: Get dependencies
run: go mod download
- name: Download dependencies
run: sh dumpling/install.sh
- name: Integration test
run: make dumpling_integration_test VERBOSE="true"
- name: Set up tmate session
if: ${{ failure() }}
uses: mxschmitt/action-tmate@v3
mysql-8022:
uses: ./.github/workflows/integration-test-dumpling-common.yml
with:
debug: ${{ github.event_name == 'workflow_dispatch' && github.event.inputs.debug }}
mysql_version: 8.0.22

integration-test-mysql-8022:
runs-on: ubuntu-latest
timeout-minutes: 15
strategy:
fail-fast: true
services:
mysql:
image: mysql:8.0.22
env:
MYSQL_ALLOW_EMPTY_PASSWORD: yes
ports:
- 3306:3306
options: --health-cmd="mysqladmin ping" --health-interval=10s --health-timeout=5s --health-retries=3
steps:
- uses: actions/checkout@v2
- name: Shutdown Ubuntu MySQL (SUDO)
run: sudo service mysql stop # Shutdown the Default MySQL, "sudo" is necessary, please not remove it
- name: Set up Go 1.18
uses: actions/setup-go@v2
with:
go-version: 1.18
- uses: actions/cache@v2
with:
path: ~/go/pkg/mod
key: ${{ runner.os }}-go-${{ hashFiles('**/go.sum') }}
restore-keys: |
${{ runner.os }}-go-
- name: Get dependencies
run: go mod download
- name: Download dependencies
run: sh dumpling/install.sh
- name: Integration test
run: make dumpling_integration_test VERBOSE="true"
- name: Set up tmate session
if: ${{ failure() }}
uses: mxschmitt/action-tmate@v3
mysql-8026:
uses: ./.github/workflows/integration-test-dumpling-common.yml
with:
debug: ${{ github.event_name == 'workflow_dispatch' && github.event.inputs.debug }}
mysql_version: 8.0.26
3 changes: 2 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,8 @@ dumpling_unit_test_in_verify_ci: failpoint-enable tools/bin/gotestsum
$(RACE_FLAG) -coverprofile="$(TEST_COVERAGE_DIR)/dumpling_cov.unit_test.out" || ( make failpoint-disable && exit 1 )
@make failpoint-disable

dumpling_integration_test: dumpling_bins failpoint-enable build_dumpling
dumpling_integration_test: dumpling_bins failpoint-enable
@make build_dumpling
@make failpoint-disable
./dumpling/tests/run.sh $(CASE)

Expand Down
2 changes: 1 addition & 1 deletion bindinfo/optimize_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func TestOptimizeOnlyOnce(t *testing.T) {
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(a int, b int, index idxa(a))")
tk.MustExec("create global binding for select * from t using select * from t use index(idxa)")
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/planner/checkOptimizeCountOne", "return"))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/planner/checkOptimizeCountOne", "return(\"select * from t\")"))
defer func() {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/planner/checkOptimizeCountOne"))
}()
Expand Down
7 changes: 3 additions & 4 deletions br/pkg/lightning/mydump/parquet_parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,11 +186,10 @@ func NewParquetParser(

columns := make([]string, 0, len(reader.Footer.Schema)-1)
columnMetas := make([]*parquet.SchemaElement, 0, len(reader.Footer.Schema)-1)
for _, c := range reader.SchemaHandler.SchemaElements {
for i, c := range reader.SchemaHandler.SchemaElements {
if c.GetNumChildren() == 0 {
// NOTE: the SchemaElement.Name is capitalized, SchemaHandler.Infos.ExName is the raw column name
// though in this context, there is no difference between these two fields
columns = append(columns, strings.ToLower(c.Name))
// we need to use the raw name, SchemaElement.Name might be prefixed with PARGO_PERFIX_
columns = append(columns, strings.ToLower(reader.SchemaHandler.GetExName(i)))
// transfer old ConvertedType to LogicalType
columnMeta := c
if c.ConvertedType != nil && c.LogicalType == nil {
Expand Down
10 changes: 4 additions & 6 deletions br/pkg/version/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -387,16 +387,14 @@ func ParseServerInfo(src string) ServerInfo {
var err error
serverInfo.ServerVersion, err = semver.NewVersion(versionStr)
if err != nil {
log.L().Warn("fail to parse version",
log.L().Warn("fail to parse version, fallback to 0.0.0",
zap.String("version", versionStr))
serverInfo.ServerVersion = semver.New("0.0.0")
}
var version string
if serverInfo.ServerVersion != nil {
version = serverInfo.ServerVersion.String()
}

log.L().Info("detect server version",
zap.String("type", serverInfo.ServerType.String()),
zap.String("version", version))
zap.String("version", serverInfo.ServerVersion.String()))

return serverInfo
}
2 changes: 1 addition & 1 deletion br/pkg/version/version_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -455,7 +455,7 @@ func TestDetectServerInfo(t *testing.T) {
{3, "5.7.25-TiDB-v4.0.0-alpha-1263-g635f2e1af", ServerTypeTiDB, mkVer(4, 0, 0, "alpha-1263-g635f2e1af")},
{4, "5.7.25-TiDB-v3.0.7-58-g6adce2367", ServerTypeTiDB, mkVer(3, 0, 7, "58-g6adce2367")},
{5, "5.7.25-TiDB-3.0.6", ServerTypeTiDB, mkVer(3, 0, 6, "")},
{6, "invalid version", ServerTypeUnknown, (*semver.Version)(nil)},
{6, "invalid version", ServerTypeUnknown, mkVer(0, 0, 0, "")},
{7, "Release Version: v5.2.1\nEdition: Community\nGit Commit Hash: cd8fb24c5f7ebd9d479ed228bb41848bd5e97445", ServerTypeTiDB, mkVer(5, 2, 1, "")},
{8, "Release Version: v5.4.0-alpha-21-g86caab907\nEdition: Community\nGit Commit Hash: 86caab907c481bbc4243b5a3346ec13907cc8721\nGit Branch: master", ServerTypeTiDB, mkVer(5, 4, 0, "alpha-21-g86caab907")},
}
Expand Down
Binary file not shown.
8 changes: 8 additions & 0 deletions br/tests/lightning_parquet/db.sql
Original file line number Diff line number Diff line change
Expand Up @@ -218,3 +218,11 @@ CREATE TABLE `warehouse` (
PRIMARY KEY (`w_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;
/*!40101 SET character_set_client = @saved_cs_client */;

DROP TABLE IF EXISTS `special_col_name`;
CREATE TABLE `special_col_name` (
`c1` varchar(128) DEFAULT NULL,
`_c2` timestamp NULL DEFAULT NULL,
`123_c3` timestamp NULL DEFAULT NULL,
`中_c4` timestamp NULL DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;
1 change: 1 addition & 0 deletions br/tests/lightning_parquet/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ for BACKEND in local tidb; do
check_row_count orders 100
check_row_count stock 50
check_row_count warehouse 1
check_row_count special_col_name 1

run_sql 'select sum(c_id) from test.customer;'
check_contains "sum(c_id): 210"
Expand Down
9 changes: 9 additions & 0 deletions build/nogo_config.json
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,9 @@
".*_generated\\.go$": "ignore generated code"
},
"only_files": {
"ddl/backfilling.go": "ddl/backfilling.go",
"ddl/column.go": "ddl/column.go",
"ddl/index.go": "ddl/index.go",
"server/conn.go": "server/conn.go",
"server/conn_stmt.go": "server/conn_stmt.go",
"server/conn_test.go": "server/conn_test.go",
Expand Down Expand Up @@ -283,6 +286,9 @@
".*_generated\\.go$": "ignore generated code"
},
"only_files": {
"ddl/backfilling.go": "ddl/backfilling.go",
"ddl/column.go": "ddl/column.go",
"ddl/index.go": "ddl/index.go",
"expression/builtin_cast.go": "expression/builtin_cast code",
"server/conn.go": "server/conn.go",
"server/conn_stmt.go": "server/conn_stmt.go",
Expand Down Expand Up @@ -638,6 +644,9 @@
".*_generated\\.go$": "ignore generated code"
},
"only_files": {
"ddl/backfilling.go": "ddl/backfilling.go",
"ddl/column.go": "ddl/column.go",
"ddl/index.go": "ddl/index.go",
"expression/builtin_cast.go": "enable expression/builtin_cast.go",
"planner/core/plan.go": "planner/core/plan.go",
"server/conn.go": "server/conn.go",
Expand Down
15 changes: 7 additions & 8 deletions ddl/backfilling.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ func newBackfillWorker(sessCtx sessionctx.Context, id int, t table.PhysicalTable
sessCtx: sessCtx,
taskCh: make(chan *reorgBackfillTask, 1),
resultCh: make(chan *backfillResult, 1),
priority: kv.PriorityLow,
priority: reorgInfo.Job.Priority,
}
}

Expand Down Expand Up @@ -365,7 +365,7 @@ func splitTableRanges(t table.PhysicalTable, store kv.Storage, startKey, endKey
return ranges, nil
}

func (w *worker) waitTaskResults(workers []*backfillWorker, taskCnt int,
func (*worker) waitTaskResults(workers []*backfillWorker, taskCnt int,
totalAddedCount *int64, startKey kv.Key) (kv.Key, int64, error) {
var (
addedCount int64
Expand Down Expand Up @@ -585,7 +585,7 @@ func setSessCtxLocation(sctx sessionctx.Context, info *reorgInfo) error {
//
// The above operations are completed in a transaction.
// Finally, update the concurrent processing of the total number of rows, and store the completed handle value.
func (w *worker) writePhysicalTableRecord(t table.PhysicalTable, bfWorkerType backfillWorkerType, indexInfo *model.IndexInfo, oldColInfo, colInfo *model.ColumnInfo, reorgInfo *reorgInfo) error {
func (w *worker) writePhysicalTableRecord(t table.PhysicalTable, bfWorkerType backfillWorkerType, reorgInfo *reorgInfo) error {
job := reorgInfo.Job
totalAddedCount := job.GetRowCount()

Expand All @@ -604,6 +604,7 @@ func (w *worker) writePhysicalTableRecord(t table.PhysicalTable, bfWorkerType ba
}

failpoint.Inject("MockCaseWhenParseFailure", func(val failpoint.Value) {
//nolint:forcetypeassert
if val.(bool) {
failpoint.Return(errors.New("job.ErrCount:" + strconv.Itoa(int(job.ErrorCount)) + ", mock unknown type: ast.whenClause."))
}
Expand Down Expand Up @@ -656,20 +657,17 @@ func (w *worker) writePhysicalTableRecord(t table.PhysicalTable, bfWorkerType ba

switch bfWorkerType {
case typeAddIndexWorker:
idxWorker := newAddIndexWorker(sessCtx, i, t, indexInfo, decodeColMap, reorgInfo, jc)
idxWorker.priority = job.Priority
idxWorker := newAddIndexWorker(sessCtx, i, t, decodeColMap, reorgInfo, jc)
backfillWorkers = append(backfillWorkers, idxWorker.backfillWorker)
go idxWorker.backfillWorker.run(reorgInfo.d, idxWorker, job)
case typeUpdateColumnWorker:
// Setting InCreateOrAlterStmt tells the difference between SELECT casting and ALTER COLUMN casting.
sessCtx.GetSessionVars().StmtCtx.InCreateOrAlterStmt = true
updateWorker := newUpdateColumnWorker(sessCtx, i, t, oldColInfo, colInfo, decodeColMap, reorgInfo, jc)
updateWorker.priority = job.Priority
updateWorker := newUpdateColumnWorker(sessCtx, i, t, decodeColMap, reorgInfo, jc)
backfillWorkers = append(backfillWorkers, updateWorker.backfillWorker)
go updateWorker.backfillWorker.run(reorgInfo.d, updateWorker, job)
case typeCleanUpIndexWorker:
idxWorker := newCleanUpIndexWorker(sessCtx, i, t, decodeColMap, reorgInfo, jc)
idxWorker.priority = job.Priority
backfillWorkers = append(backfillWorkers, idxWorker.backfillWorker)
go idxWorker.backfillWorker.run(reorgInfo.d, idxWorker, job)
default:
Expand All @@ -684,6 +682,7 @@ func (w *worker) writePhysicalTableRecord(t table.PhysicalTable, bfWorkerType ba
}

failpoint.Inject("checkBackfillWorkerNum", func(val failpoint.Value) {
//nolint:forcetypeassert
if val.(bool) {
num := int(atomic.LoadInt32(&TestCheckWorkerNumber))
if num != 0 {
Expand Down
Loading

0 comments on commit b5cbba8

Please sign in to comment.