From 6679da118b36ca19cfd096771f73d87928e71228 Mon Sep 17 00:00:00 2001 From: "hao.hu" Date: Mon, 26 Nov 2018 20:23:28 +0800 Subject: [PATCH 1/2] support system variable wait_timeout. (#8245) --- server/conn.go | 29 +++++++++++++++++++++++++---- server/conn_test.go | 32 +++++++++++++++++++++++++++++++- server/packetio.go | 17 ++++++++++++++++- session/session.go | 1 + sessionctx/variable/sysvar.go | 6 +++++- sessionctx/variable/varsutil.go | 2 ++ 6 files changed, 80 insertions(+), 7 deletions(-) diff --git a/server/conn.go b/server/conn.go index c98d488fb90f0..4e0691fa92e0a 100644 --- a/server/conn.go +++ b/server/conn.go @@ -215,6 +215,18 @@ func (cc *clientConn) writePacket(data []byte) error { return cc.pkt.writePacket(data) } +// getSessionVarsWaitTimeout get session variable wait_timeout +func (cc *clientConn) getSessionVarsWaitTimeout() uint64 { + valStr, _ := cc.ctx.GetSessionVars().GetSystemVar(variable.WaitTimeout) + waitTimeout, err := strconv.ParseUint(valStr, 10, 64) + if err != nil { + log.Errorf("con:%d get sysval wait_timeout error, use default value.", cc.connectionID) + // if get waitTimeout error, use default value + waitTimeout, _ = strconv.ParseUint(variable.WaitTimeoutDefaultValue, 10, 64) + } + return waitTimeout +} + type handshakeResponse41 struct { Capability uint32 Collation uint8 @@ -449,13 +461,22 @@ func (cc *clientConn) Run() { } cc.alloc.Reset() + // close connection when idle time is more than wait_timout + waitTimeout := cc.getSessionVarsWaitTimeout() + cc.pkt.setReadTimeout(time.Duration(waitTimeout) * time.Second) + start := time.Now() data, err := cc.readPacket() if err != nil { if terror.ErrorNotEqual(err, io.EOF) { - errStack := errors.ErrorStack(err) - if !strings.Contains(errStack, "use of closed network connection") { - log.Errorf("con:%d read packet error, close this connection %s", - cc.connectionID, errStack) + if netErr, isNetErr := errors.Cause(err).(net.Error); isNetErr && netErr.Timeout() { + idleTime := time.Now().Sub(start) + log.Infof("con:%d read packet timeout, close this connection, idle: %v, wait_timeout: %v", cc.connectionID, idleTime, waitTimeout) + } else { + errStack := errors.ErrorStack(err) + if !strings.Contains(errStack, "use of closed network connection") { + log.Errorf("con:%d read packet error, close this connection %s", + cc.connectionID, errStack) + } } } return diff --git a/server/conn_test.go b/server/conn_test.go index 8e56bad74c9e3..7e104607fd311 100644 --- a/server/conn_test.go +++ b/server/conn_test.go @@ -20,9 +20,16 @@ import ( . "github.com/pingcap/check" "github.com/pingcap/parser/mysql" + "github.com/pingcap/tidb/domain" + "github.com/pingcap/tidb/kv" + "github.com/pingcap/tidb/session" + "github.com/pingcap/tidb/store/mockstore" ) -type ConnTestSuite struct{} +type ConnTestSuite struct { + dom *domain.Domain + store kv.Storage +} var _ = Suite(ConnTestSuite{}) @@ -149,6 +156,29 @@ func (ts ConnTestSuite) TestInitialHandshake(c *C) { c.Assert(outBuffer.Bytes()[4:], DeepEquals, expected.Bytes()) } +func (ts ConnTestSuite) testGetSessionVarsWaitTimeout(c *C) { + c.Parallel() + var err error + ts.store, err = mockstore.NewMockTikvStore() + c.Assert(err, IsNil) + ts.dom, err = session.BootstrapSession(ts.store) + c.Assert(err, IsNil) + se, err := session.CreateSession4Test(ts.store) + c.Assert(err, IsNil) + tc := &TiDBContext{ + session: se, + stmts: make(map[int]*TiDBStatement), + } + cc := &clientConn{ + connectionID: 1, + server: &Server{ + capability: defaultCapability, + }, + ctx: tc, + } + c.Assert(cc.getSessionVarsWaitTimeout(), Equals, 28800) +} + func mapIdentical(m1, m2 map[string]string) bool { return mapBelong(m1, m2) && mapBelong(m2, m1) } diff --git a/server/packetio.go b/server/packetio.go index 74b07b68bc291..f4b2a539da491 100644 --- a/server/packetio.go +++ b/server/packetio.go @@ -37,6 +37,7 @@ package server import ( "bufio" "io" + "time" "github.com/pingcap/errors" "github.com/pingcap/parser/mysql" @@ -50,6 +51,7 @@ type packetIO struct { bufReadConn *bufferedReadConn bufWriter *bufio.Writer sequence uint8 + readTimeout time.Duration } func newPacketIO(bufReadConn *bufferedReadConn) *packetIO { @@ -63,9 +65,17 @@ func (p *packetIO) setBufferedReadConn(bufReadConn *bufferedReadConn) { p.bufWriter = bufio.NewWriterSize(bufReadConn, defaultWriterSize) } +func (p *packetIO) setReadTimeout(timeout time.Duration) { + p.readTimeout = timeout +} + func (p *packetIO) readOnePacket() ([]byte, error) { var header [4]byte - + if p.readTimeout > 0 { + if err := p.bufReadConn.SetReadDeadline(time.Now().Add(p.readTimeout)); err != nil { + return nil, err + } + } if _, err := io.ReadFull(p.bufReadConn, header[:]); err != nil { return nil, errors.Trace(err) } @@ -80,6 +90,11 @@ func (p *packetIO) readOnePacket() ([]byte, error) { length := int(uint32(header[0]) | uint32(header[1])<<8 | uint32(header[2])<<16) data := make([]byte, length) + if p.readTimeout > 0 { + if err := p.bufReadConn.SetReadDeadline(time.Now().Add(p.readTimeout)); err != nil { + return nil, err + } + } if _, err := io.ReadFull(p.bufReadConn, data); err != nil { return nil, errors.Trace(err) } diff --git a/session/session.go b/session/session.go index 4efb0544576a4..48ae24724b267 100644 --- a/session/session.go +++ b/session/session.go @@ -1373,6 +1373,7 @@ const loadCommonGlobalVarsSQL = "select HIGH_PRIORITY * from mysql.global_variab variable.MaxAllowedPacket + quoteCommaQuote + variable.TimeZone + quoteCommaQuote + variable.BlockEncryptionMode + quoteCommaQuote + + variable.WaitTimeout + quoteCommaQuote + /* TiDB specific global variables: */ variable.TiDBSkipUTF8Check + quoteCommaQuote + variable.TiDBIndexJoinBatchSize + quoteCommaQuote + diff --git a/sessionctx/variable/sysvar.go b/sessionctx/variable/sysvar.go index 942ef270f9c16..e86c77ad97042 100644 --- a/sessionctx/variable/sysvar.go +++ b/sessionctx/variable/sysvar.go @@ -535,7 +535,7 @@ var defaultSysVars = []*SysVar{ {ScopeGlobal, "innodb_buffer_pool_size", "134217728"}, {ScopeGlobal, "innodb_adaptive_flushing", "ON"}, {ScopeNone, "datadir", "/usr/local/mysql/data/"}, - {ScopeGlobal | ScopeSession, "wait_timeout", "28800"}, + {ScopeGlobal | ScopeSession, WaitTimeout, WaitTimeoutDefaultValue}, {ScopeGlobal, "innodb_monitor_enable", ""}, {ScopeNone, "date_format", "%Y-%m-%d"}, {ScopeGlobal, "innodb_buffer_pool_filename", "ib_buffer_pool"}, @@ -774,6 +774,10 @@ const ( SyncBinlog = "sync_binlog" // BlockEncryptionMode is the name for 'block_encryption_mode' system variable. BlockEncryptionMode = "block_encryption_mode" + // WaitTimeout is the name for 'wait_timeout' system variable. + WaitTimeout = "wait_timeout" + // WaitTimeoutDefaultValue is the default value of 'wait_timeout' system variable. + WaitTimeoutDefaultValue = "28800" ) // GlobalVarAccessor is the interface for accessing global scope system and status variables. diff --git a/sessionctx/variable/varsutil.go b/sessionctx/variable/varsutil.go index de81da330c483..5eec4bb784393 100644 --- a/sessionctx/variable/varsutil.go +++ b/sessionctx/variable/varsutil.go @@ -280,6 +280,8 @@ func ValidateSetSystemVar(vars *SessionVars, name string, value string) (string, return checkUInt64SystemVar(name, value, 400, 524288, vars) case TmpTableSize: return checkUInt64SystemVar(name, value, 1024, math.MaxUint64, vars) + case WaitTimeout: + return checkUInt64SystemVar(name, value, 1, 31536000, vars) case TimeZone: if strings.EqualFold(value, "SYSTEM") { return "SYSTEM", nil From 753c9eda2ad09fa1e6ea3f89247aedb7d0909dcd Mon Sep 17 00:00:00 2001 From: "hao.hu" Date: Tue, 27 Nov 2018 21:04:00 +0800 Subject: [PATCH 2/2] move DefWaitTimeout to tidb_vars.go --- server/conn.go | 2 +- sessionctx/variable/sysvar.go | 4 +--- sessionctx/variable/tidb_vars.go | 1 + 3 files changed, 3 insertions(+), 4 deletions(-) diff --git a/server/conn.go b/server/conn.go index 4e0691fa92e0a..2bf031080c4cb 100644 --- a/server/conn.go +++ b/server/conn.go @@ -222,7 +222,7 @@ func (cc *clientConn) getSessionVarsWaitTimeout() uint64 { if err != nil { log.Errorf("con:%d get sysval wait_timeout error, use default value.", cc.connectionID) // if get waitTimeout error, use default value - waitTimeout, _ = strconv.ParseUint(variable.WaitTimeoutDefaultValue, 10, 64) + waitTimeout = variable.DefWaitTimeout } return waitTimeout } diff --git a/sessionctx/variable/sysvar.go b/sessionctx/variable/sysvar.go index e86c77ad97042..d5373da3951af 100644 --- a/sessionctx/variable/sysvar.go +++ b/sessionctx/variable/sysvar.go @@ -535,7 +535,7 @@ var defaultSysVars = []*SysVar{ {ScopeGlobal, "innodb_buffer_pool_size", "134217728"}, {ScopeGlobal, "innodb_adaptive_flushing", "ON"}, {ScopeNone, "datadir", "/usr/local/mysql/data/"}, - {ScopeGlobal | ScopeSession, WaitTimeout, WaitTimeoutDefaultValue}, + {ScopeGlobal | ScopeSession, WaitTimeout, strconv.FormatInt(DefWaitTimeout, 10)}, {ScopeGlobal, "innodb_monitor_enable", ""}, {ScopeNone, "date_format", "%Y-%m-%d"}, {ScopeGlobal, "innodb_buffer_pool_filename", "ib_buffer_pool"}, @@ -776,8 +776,6 @@ const ( BlockEncryptionMode = "block_encryption_mode" // WaitTimeout is the name for 'wait_timeout' system variable. WaitTimeout = "wait_timeout" - // WaitTimeoutDefaultValue is the default value of 'wait_timeout' system variable. - WaitTimeoutDefaultValue = "28800" ) // GlobalVarAccessor is the interface for accessing global scope system and status variables. diff --git a/sessionctx/variable/tidb_vars.go b/sessionctx/variable/tidb_vars.go index 6bbfda2d9fc4e..519631a2ab7d2 100644 --- a/sessionctx/variable/tidb_vars.go +++ b/sessionctx/variable/tidb_vars.go @@ -240,6 +240,7 @@ const ( DefCurretTS = 0 DefMaxChunkSize = 32 DefDMLBatchSize = 20000 + DefWaitTimeout = 28800 DefTiDBMemQuotaHashJoin = 32 << 30 // 32GB. DefTiDBMemQuotaMergeJoin = 32 << 30 // 32GB. DefTiDBMemQuotaSort = 32 << 30 // 32GB.