Skip to content

Commit

Permalink
Merge branch 'master' into enable-new-charset
Browse files Browse the repository at this point in the history
  • Loading branch information
lance6716 authored Dec 7, 2021
2 parents b4bfaae + 3eafa6d commit f0927fe
Show file tree
Hide file tree
Showing 99 changed files with 2,970 additions and 351 deletions.
12 changes: 12 additions & 0 deletions .github/workflows/dm_binlog_999999.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,12 @@ name: DM Binlog 999999
on:
schedule:
- cron: '0 17-23 * * *' # run at minute 0 every hour from 01:00 ~ 07:00 UTC+8
workflow_dispatch:
inputs:
pr:
description: 'Which PR do you want to trigger'
required: true
default: ''

jobs:
test-binlog-999999:
Expand All @@ -18,6 +24,12 @@ jobs:
- name: Check out code
uses: actions/checkout@v2

- name: Check out code by workflow dispatch
if: ${{ github.event.inputs.pr != '' }}
uses: actions/checkout@v2
with:
ref: refs/pull/${{ github.event.inputs.pr }}/head

- name: Cache go modules
uses: actions/cache@v2
with:
Expand Down
12 changes: 12 additions & 0 deletions .github/workflows/dm_chaos.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,12 @@ name: DM Chaos
on:
schedule:
- cron: '0 17-23 * * *' # run at minute 0 every hour from 01:00 ~ 07:00 UTC+8
workflow_dispatch:
inputs:
pr:
description: 'Which PR do you want to trigger'
required: true
default: ''

# See: https://docs.github.com/en/actions/reference/workflow-syntax-for-github-actions#concurrency.
concurrency:
Expand Down Expand Up @@ -42,6 +48,12 @@ jobs:
- name: Check out code
uses: actions/checkout@v2

- name: Check out code by workflow dispatch
if: ${{ github.event.inputs.pr != '' }}
uses: actions/checkout@v2
with:
ref: refs/pull/${{ github.event.inputs.pr }}/head

- name: Cache go modules
uses: actions/cache@v2
with:
Expand Down
32 changes: 14 additions & 18 deletions .github/workflows/dm_upstream_switch.yaml
Original file line number Diff line number Diff line change
@@ -1,24 +1,14 @@
name: Upstream Database Switch

on:
push:
branches:
- master
paths:
- "dm/**"
- "go.mod"
- "go.sum"
- "Makefile"
- ".github/**"
pull_request:
branches:
- master
paths:
- "dm/**"
- "go.mod"
- "go.sum"
- "Makefile"
- ".github/**"
schedule:
- cron: '0 17-23 * * *' # run at minute 0 every hour from 01:00 ~ 07:00 UTC+8
workflow_dispatch:
inputs:
pr:
description: 'Which PR do you want to trigger'
required: true
default: ''

jobs:
upstream-database-switch:
Expand All @@ -34,6 +24,12 @@ jobs:
- name: Check out code
uses: actions/checkout@v2

- name: Check out code by workflow dispatch
if: ${{ github.event.inputs.pr != '' }}
uses: actions/checkout@v2
with:
ref: refs/pull/${{ github.event.inputs.pr }}/head

- name: Cache go modules
uses: actions/cache@v2
with:
Expand Down
31 changes: 26 additions & 5 deletions .github/workflows/upgrade_dm_via_tiup.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,12 @@ on:
- cron: '0 17-23 * * *' # run at minute 0 every hour from 01:00 ~ 07:00 UTC+8

workflow_dispatch:
inputs: # TODO: use these inputs
inputs:
pr:
description: 'Which PR do you want to trigger'
required: true
default: ''
# TODO: use these inputs
fromVer:
dmVer: "v1.0.7"
toVer:
Expand All @@ -28,11 +33,19 @@ jobs:
uses: actions/setup-go@v2
with:
go-version: 1.16

- name: Check out code
uses: actions/checkout@v2
with:
path: go/src/github.com/pingcap/ticdc

- name: Check out code by workflow dispatch
if: ${{ github.event.inputs.pr != '' }}
uses: actions/checkout@v2
with:
path: go/src/github.com/pingcap/ticdc
ref: refs/pull/${{ github.event.inputs.pr }}/head

- name: Setup containers
working-directory: ${{ env.working-directory }}
run: |
Expand Down Expand Up @@ -77,18 +90,26 @@ jobs:
uses: actions/setup-go@v2
with:
go-version: 1.16

- name: Check out code
uses: actions/checkout@v2
with:
path: go/src/github.com/pingcap/ticdc

- name: Check out code by workflow dispatch
if: ${{ github.event.inputs.pr != '' }}
uses: actions/checkout@v2
with:
path: go/src/github.com/pingcap/ticdc
ref: refs/pull/${{ github.event.inputs.pr }}/head

- name: Build
if: ${{ github.ref != 'refs/heads/master' }}
if: ${{ github.ref != 'refs/heads/master' || github.event.inputs.pr != '' }}
working-directory: ${{ env.working-directory }}
run: make dm

- name: Package files
if: ${{ github.ref != 'refs/heads/master' }}
if: ${{ github.ref != 'refs/heads/master' || github.event.inputs.pr != '' }}
run: |
mkdir ${{ github.workspace }}/package
cd ${{ github.workspace }}/package
Expand Down Expand Up @@ -121,7 +142,7 @@ jobs:
GOPATH=${GITHUB_WORKSPACE}/go docker-compose up -d
- name: Copy package files
if: ${{ github.ref != 'refs/heads/master' }}
if: ${{ github.ref != 'refs/heads/master' || github.event.inputs.pr != '' }}
run: |
cd ${{ github.workspace }}/package
docker cp dm-master-nightly-linux-amd64.tar.gz control:/tmp
Expand All @@ -133,7 +154,7 @@ jobs:
working-directory: ${{ env.working-directory }}
run: |
cd ${{ env.working-directory }}/dm/tests/tiup/docker
docker-compose exec -e ref=${{ github.ref }} -T control bash -c "cd /go/src/github.com/pingcap/ticdc/dm && ./tests/tiup/upgrade-from-v2.sh ${{ matrix.previous_v2 }} nightly"
docker-compose exec -e ref=${{ github.ref }} -e id=${{ github.event.inputs.pr }} -T control bash -c "cd /go/src/github.com/pingcap/ticdc/dm && ./tests/tiup/upgrade-from-v2.sh ${{ matrix.previous_v2 }} nightly"
# if above step is passed, logs will be removed by tiup dm destroy
- name: Copy logs to hack permission
Expand Down
3 changes: 2 additions & 1 deletion CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ make

### Running tests

This project contains unit tests and integration tests with coverage collection. See [tests/README.md](./tests/README.md) for how to execute and add tests.
This project contains unit tests and integration tests with coverage collection.
See [tests/integration_tests/README.md](./tests/integration_tests/README.md) for how to execute and add tests.

For more information on how to trigger these tests, please see the [command help](./docs/ci/command.md).

Expand Down
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ $ make test

Note that TiCDC supports building with Go version `Go >= 1.16`.

When TiCDC is built successfully, you can find binary in the `bin` directory. Instructions for unit test and integration test can be found in [Running tests](tests/README.md).
When TiCDC is built successfully, you can find binary in the `bin` directory. Instructions for unit test and integration
test can be found in [Running tests](./tests/integration_tests/README.md).

## Deployment

Expand Down
57 changes: 40 additions & 17 deletions cdc/sink/buffer_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,12 @@ import (

type bufferSink struct {
Sink
checkpointTs uint64
buffer map[model.TableID][]*model.RowChangedEvent
bufferMu sync.Mutex
flushTsChan chan uint64
drawbackChan chan drawbackMsg
changeFeedCheckpointTs uint64
tableCheckpointTsMap sync.Map
buffer map[model.TableID][]*model.RowChangedEvent
bufferMu sync.Mutex
flushTsChan chan flushMsg
drawbackChan chan drawbackMsg
}

func newBufferSink(
Expand All @@ -42,14 +43,14 @@ func newBufferSink(
errCh chan error,
checkpointTs model.Ts,
drawbackChan chan drawbackMsg,
) Sink {
) *bufferSink {
sink := &bufferSink{
Sink: backendSink,
// buffer shares the same flow control with table sink
buffer: make(map[model.TableID][]*model.RowChangedEvent),
checkpointTs: checkpointTs,
flushTsChan: make(chan uint64, 128),
drawbackChan: drawbackChan,
buffer: make(map[model.TableID][]*model.RowChangedEvent),
changeFeedCheckpointTs: checkpointTs,
flushTsChan: make(chan flushMsg, 128),
drawbackChan: drawbackChan,
}
go sink.run(ctx, errCh)
return sink
Expand Down Expand Up @@ -81,8 +82,9 @@ func (b *bufferSink) run(ctx context.Context, errCh chan error) {
delete(b.buffer, drawback.tableID)
b.bufferMu.Unlock()
close(drawback.callback)
case resolvedTs := <-b.flushTsChan:
case flushEvent := <-b.flushTsChan:
b.bufferMu.Lock()
resolvedTs := flushEvent.resolvedTs
// find all rows before resolvedTs and emit to backend sink
for tableID, rows := range b.buffer {
i := sort.Search(len(rows), func(i int) bool {
Expand All @@ -109,15 +111,15 @@ func (b *bufferSink) run(ctx context.Context, errCh chan error) {
b.bufferMu.Unlock()

start := time.Now()
// todo: use real table ID
checkpointTs, err := b.Sink.FlushRowChangedEvents(ctx, 0, resolvedTs)
tableID := flushEvent.tableID
checkpointTs, err := b.Sink.FlushRowChangedEvents(ctx, flushEvent.tableID, resolvedTs)
if err != nil {
if errors.Cause(err) != context.Canceled {
errCh <- err
}
return
}
atomic.StoreUint64(&b.checkpointTs, checkpointTs)
b.tableCheckpointTsMap.Store(tableID, checkpointTs)

dur := time.Since(start)
metricFlushDuration.Observe(dur.Seconds())
Expand Down Expand Up @@ -150,8 +152,29 @@ func (b *bufferSink) EmitRowChangedEvents(ctx context.Context, rows ...*model.Ro
func (b *bufferSink) FlushRowChangedEvents(ctx context.Context, tableID model.TableID, resolvedTs uint64) (uint64, error) {
select {
case <-ctx.Done():
return atomic.LoadUint64(&b.checkpointTs), ctx.Err()
case b.flushTsChan <- resolvedTs:
return b.getTableCheckpointTs(tableID), ctx.Err()
case b.flushTsChan <- flushMsg{
tableID: tableID,
resolvedTs: resolvedTs,
}:
}
return atomic.LoadUint64(&b.checkpointTs), nil
return b.getTableCheckpointTs(tableID), nil
}

type flushMsg struct {
tableID model.TableID
resolvedTs uint64
}

func (b *bufferSink) getTableCheckpointTs(tableID model.TableID) uint64 {
checkPoints, ok := b.tableCheckpointTsMap.Load(tableID)
if ok {
return checkPoints.(uint64)
}
return atomic.LoadUint64(&b.changeFeedCheckpointTs)
}

// UpdateChangeFeedCheckpointTs update the changeFeedCheckpointTs every processor tick
func (b *bufferSink) UpdateChangeFeedCheckpointTs(checkpointTs uint64) {
atomic.StoreUint64(&b.changeFeedCheckpointTs, checkpointTs)
}
91 changes: 91 additions & 0 deletions cdc/sink/buffer_sink_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package sink

import (
"context"
"testing"
"time"

"github.com/pingcap/ticdc/cdc/model"
"github.com/stretchr/testify/require"
)

func TestTableIsNotFlushed(t *testing.T) {
b := bufferSink{changeFeedCheckpointTs: 1}
require.Equal(t, uint64(1), b.getTableCheckpointTs(2))
b.UpdateChangeFeedCheckpointTs(3)
require.Equal(t, uint64(3), b.getTableCheckpointTs(2))
}

func TestFlushTable(t *testing.T) {
ctx, cancel := context.WithCancel(context.TODO())
defer func() {
cancel()
}()
b := newBufferSink(ctx, newBlackHoleSink(ctx, make(map[string]string)), make(chan error), 5, make(chan drawbackMsg))
require.Equal(t, uint64(5), b.getTableCheckpointTs(2))
require.Nil(t, b.EmitRowChangedEvents(ctx))
tbl1 := &model.TableName{TableID: 1}
tbl2 := &model.TableName{TableID: 2}
tbl3 := &model.TableName{TableID: 3}
tbl4 := &model.TableName{TableID: 4}
require.Nil(t, b.EmitRowChangedEvents(ctx, []*model.RowChangedEvent{
{CommitTs: 6, Table: tbl1},
{CommitTs: 6, Table: tbl2},
{CommitTs: 6, Table: tbl3},
{CommitTs: 6, Table: tbl4},
{CommitTs: 10, Table: tbl1},
{CommitTs: 10, Table: tbl2},
{CommitTs: 10, Table: tbl3},
{CommitTs: 10, Table: tbl4},
}...))
checkpoint, err := b.FlushRowChangedEvents(ctx, 1, 7)
require.True(t, checkpoint <= 7)
require.Nil(t, err)
checkpoint, err = b.FlushRowChangedEvents(ctx, 2, 6)
require.True(t, checkpoint <= 6)
require.Nil(t, err)
checkpoint, err = b.FlushRowChangedEvents(ctx, 3, 8)
require.True(t, checkpoint <= 8)
require.Nil(t, err)
time.Sleep(200 * time.Millisecond)
require.Equal(t, uint64(7), b.getTableCheckpointTs(1))
require.Equal(t, uint64(6), b.getTableCheckpointTs(2))
require.Equal(t, uint64(8), b.getTableCheckpointTs(3))
require.Equal(t, uint64(5), b.getTableCheckpointTs(4))
b.UpdateChangeFeedCheckpointTs(6)
require.Equal(t, uint64(7), b.getTableCheckpointTs(1))
require.Equal(t, uint64(6), b.getTableCheckpointTs(2))
require.Equal(t, uint64(8), b.getTableCheckpointTs(3))
require.Equal(t, uint64(6), b.getTableCheckpointTs(4))
}

func TestFlushFailed(t *testing.T) {
ctx, cancel := context.WithCancel(context.TODO())
b := newBufferSink(ctx, newBlackHoleSink(ctx, make(map[string]string)), make(chan error), 5, make(chan drawbackMsg))
checkpoint, err := b.FlushRowChangedEvents(ctx, 3, 8)
require.True(t, checkpoint <= 8)
require.Nil(t, err)
time.Sleep(200 * time.Millisecond)
require.Equal(t, uint64(8), b.getTableCheckpointTs(3))
cancel()
checkpoint, _ = b.FlushRowChangedEvents(ctx, 3, 18)
require.Equal(t, uint64(8), checkpoint)
checkpoint, _ = b.FlushRowChangedEvents(ctx, 1, 18)
require.Equal(t, uint64(5), checkpoint)
time.Sleep(200 * time.Millisecond)
require.Equal(t, uint64(8), b.getTableCheckpointTs(3))
require.Equal(t, uint64(5), b.getTableCheckpointTs(1))
}
Loading

0 comments on commit f0927fe

Please sign in to comment.