Skip to content

Commit

Permalink
Merge branch 'master' into fix_hist
Browse files Browse the repository at this point in the history
  • Loading branch information
hawkingrei authored Jan 31, 2023
2 parents ccbda88 + eb53aa8 commit da1913e
Show file tree
Hide file tree
Showing 7 changed files with 146 additions and 6 deletions.
4 changes: 4 additions & 0 deletions br/pkg/utils/json.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,10 @@ func makeJSONSchema(schema *backuppb.Schema) (*jsonSchema, error) {

func fromJSONSchema(jSchema *jsonSchema) (*backuppb.Schema, error) {
schema := jSchema.Schema
if schema == nil {
schema = &backuppb.Schema{}
}

var err error
schema.Db, err = json.Marshal(jSchema.DB)
if err != nil {
Expand Down
38 changes: 38 additions & 0 deletions br/pkg/utils/json_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,44 @@ var testMetaJSONs = [][]byte{
"is_raw_kv": true,
"br_version": "BR\nRelease Version: v5.0.0-master\nGit Commit Hash: c0d60dae4998cf9ac40f02e5444731c15f0b2522\nGit Branch: HEAD\nGo Version: go1.13.4\nUTC Build Time: 2021-03-25 08:10:08\nRace Enabled: false"
}`),
[]byte(`{
"files": [
{
"sha256": "3ae857ef9b379d498ae913434f1d47c3e90a55f3a4cd9074950bfbd163d5e5fc",
"start_key": "7480000000000000115f720000000000000000",
"end_key": "7480000000000000115f72ffffffffffffffff00",
"name": "1_20_9_36adb8cedcd7af34708edff520499e712e2cfdcb202f5707dc9305a031d55a98_1675066275424_write.sst",
"end_version": 439108573623222300,
"crc64xor": 16261462091570213000,
"total_kvs": 15,
"total_bytes": 1679,
"cf": "write",
"size": 2514,
"cipher_iv": "56MTbxA4CaNILpirKnBxUw=="
}
],
"schemas": [
{
"db": {
"charset": "utf8mb4",
"collate": "utf8mb4_bin",
"db_name": {
"L": "test",
"O": "test"
},
"id": 1,
"policy_ref_info": null,
"state": 5
}
}
],
"ddls": [],
"cluster_id": 7194351714070942000,
"cluster_version": "\"6.1.0\"\n",
"br_version": "BR\nRelease Version: v6.1.0\nGit Commit Hash: 1a89decdb192cbdce6a7b0020d71128bc964d30f\nGit Branch: heads/refs/tags/v6.1.0\nGo Version: go1.18.2\nUTC Build Time: 2022-06-05 05:09:12\nRace Enabled: false",
"end_version": 439108573623222300,
"new_collations_enabled": "True"
}`),
}

func TestEncodeAndDecode(t *testing.T) {
Expand Down
8 changes: 8 additions & 0 deletions ddl/ddl_tiflash_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -424,6 +424,14 @@ func (d *ddl) refreshTiFlashTicker(ctx sessionctx.Context, pollTiFlashContext *T
return err
}
}

failpoint.Inject("OneTiFlashStoreDown", func() {
for storeID, store := range pollTiFlashContext.TiFlashStores {
store.Store.StateName = "Down"
pollTiFlashContext.TiFlashStores[storeID] = store
break
}
})
pollTiFlashContext.PollCounter++

// Start to process every table.
Expand Down
20 changes: 20 additions & 0 deletions ddl/tiflashtest/ddl_tiflash_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1334,3 +1334,23 @@ func TestTiFlashAvailableAfterAddPartition(t *testing.T) {
require.NotNil(t, pi)
require.Equal(t, len(pi.Definitions), 2)
}

func TestTiFlashAvailableAfterDownOneStore(t *testing.T) {
s, teardown := createTiFlashContext(t)
defer teardown()
tk := testkit.NewTestKit(t, s.store)

tk.MustExec("use test")
tk.MustExec("drop table if exists ddltiflash")
tk.MustExec("create table ddltiflash(z int) PARTITION BY RANGE(z) (PARTITION p0 VALUES LESS THAN (10))")
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/OneTiFlashStoreDown", `return`))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/domain/infosync/OneTiFlashStoreDown", `return`))
defer func() {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/OneTiFlashStoreDown"))
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/domain/infosync/OneTiFlashStoreDown"))
}()

tk.MustExec("alter table ddltiflash set tiflash replica 1")
time.Sleep(ddl.PollTiFlashInterval * RoundToBeAvailable * 3)
CheckTableAvailable(s.dom, t, 1, []string{})
}
12 changes: 11 additions & 1 deletion domain/infosync/tiflash_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (

"github.com/gorilla/mux"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/ddl/placement"
"github.com/pingcap/tidb/store/helper"
"github.com/pingcap/tidb/tablecodec"
Expand Down Expand Up @@ -89,10 +90,19 @@ func getTiFlashPeerWithoutLagCount(tiFlashStores map[int64]helper.StoreStat, tab
for _, store := range tiFlashStores {
regionReplica := make(map[int64]int)
err := helper.CollectTiFlashStatus(store.Store.StatusAddress, tableID, &regionReplica)
failpoint.Inject("OneTiFlashStoreDown", func() {
if store.Store.StateName == "Down" {
err = errors.New("mock TiFlasah down")
}
})
if err != nil {
logutil.BgLogger().Error("Fail to get peer status from TiFlash.",
zap.Int64("tableID", tableID))
return 0, err
// Just skip down or offline or tomestone stores, because PD will migrate regions from these stores.
if store.Store.StateName == "Up" || store.Store.StateName == "Disconnected" {
return 0, err
}
continue
}
flashPeerCount += len(regionReplica)
}
Expand Down
25 changes: 20 additions & 5 deletions server/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -668,12 +668,12 @@ func (cc *clientConn) readOptionalSSLRequestAndHandshakeResponse(ctx context.Con

switch resp.AuthPlugin {
case mysql.AuthCachingSha2Password:
resp.Auth, err = cc.authSha(ctx)
resp.Auth, err = cc.authSha(ctx, resp)
if err != nil {
return err
}
case mysql.AuthTiDBSM3Password:
resp.Auth, err = cc.authSM3(ctx)
resp.Auth, err = cc.authSM3(ctx, resp)
if err != nil {
return err
}
Expand Down Expand Up @@ -727,14 +727,21 @@ func (cc *clientConn) handleAuthPlugin(ctx context.Context, resp *handshakeRespo
}

// authSha implements the caching_sha2_password specific part of the protocol.
func (cc *clientConn) authSha(ctx context.Context) ([]byte, error) {
func (cc *clientConn) authSha(ctx context.Context, resp handshakeResponse41) ([]byte, error) {
const (
shaCommand = 1
requestRsaPubKey = 2 // Not supported yet, only TLS is supported as secure channel.
fastAuthOk = 3
fastAuthFail = 4
)

// If no password is specified, we don't send the FastAuthFail to do the full authentication
// as that doesn't make sense without a password and confuses the client.
// https://github.com/pingcap/tidb/issues/40831
if len(resp.Auth) == 0 {
return []byte{}, nil
}

// Currently we always send a "FastAuthFail" as the cached part of the protocol isn't implemented yet.
// This triggers the client to send the full response.
err := cc.writePacket([]byte{0, 0, 0, 0, shaCommand, fastAuthFail})
Expand All @@ -757,8 +764,16 @@ func (cc *clientConn) authSha(ctx context.Context) ([]byte, error) {
}

// authSM3 implements the tidb_sm3_password specific part of the protocol.
func (cc *clientConn) authSM3(ctx context.Context) ([]byte, error) {
err := cc.writePacket([]byte{0, 0, 0, 0, 1, 4})
// tidb_sm3_password is very similar to caching_sha2_password.
func (cc *clientConn) authSM3(ctx context.Context, resp handshakeResponse41) ([]byte, error) {
// If no password is specified, we don't send the FastAuthFail to do the full authentication
// as that doesn't make sense without a password and confuses the client.
// https://github.com/pingcap/tidb/issues/40831
if len(resp.Auth) == 0 {
return []byte{}, nil
}

err := cc.writePacket([]byte{0, 0, 0, 0, 1, 4}) // fastAuthFail
if err != nil {
logutil.Logger(ctx).Error("authSM3 packet write failed", zap.Error(err))
return nil, err
Expand Down
45 changes: 45 additions & 0 deletions server/conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1806,3 +1806,48 @@ func TestExtensionChangeUser(t *testing.T) {
require.Equal(t, expectedConnInfo.Error, logInfo.Error)
require.Equal(t, *(expectedConnInfo.ConnectionInfo), *(logInfo.ConnectionInfo))
}

func TestAuthSha(t *testing.T) {
store := testkit.CreateMockStore(t)

var outBuffer bytes.Buffer
tidbdrv := NewTiDBDriver(store)
cfg := newTestConfig()
cfg.Port, cfg.Status.StatusPort = 0, 0
cfg.Status.ReportStatus = false
server, err := NewServer(cfg, tidbdrv)
require.NoError(t, err)
defer server.Close()

cc := &clientConn{
connectionID: 1,
salt: []byte{0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0A, 0x0B, 0x0C, 0x0D, 0x0E, 0x0F, 0x10, 0x11, 0x12, 0x13, 0x14},
server: server,
pkt: &packetIO{
bufWriter: bufio.NewWriter(&outBuffer),
},
collation: mysql.DefaultCollationID,
peerHost: "localhost",
alloc: arena.NewAllocator(512),
chunkAlloc: chunk.NewAllocator(),
capability: mysql.ClientProtocol41,
}

tk := testkit.NewTestKit(t, store)
ctx := &TiDBContext{Session: tk.Session()}
cc.setCtx(ctx)

resp := handshakeResponse41{
Capability: mysql.ClientProtocol41 | mysql.ClientPluginAuth,
AuthPlugin: mysql.AuthCachingSha2Password,
Auth: []byte{}, // No password
}

authData, err := cc.authSha(context.Background(), resp)
require.NoError(t, err)

// If no password is specified authSha() should return an empty byte slice
// which differs from when a password is specified as that should trigger
// fastAuthFail and the rest of the auth process.
require.Equal(t, authData, []byte{})
}

0 comments on commit da1913e

Please sign in to comment.