Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: convert lockKeys to key flag #18966

Merged
merged 3 commits into from
Aug 7, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -955,7 +955,6 @@ func newLockCtx(seVars *variable.SessionVars, lockWaitTime int64) *kv.LockCtx {
LockKeysDuration: &seVars.StmtCtx.LockKeysDuration,
LockKeysCount: &seVars.StmtCtx.LockKeysCount,
LockExpired: &seVars.TxnCtx.LockExpire,
CheckKeyExists: seVars.StmtCtx.CheckKeyExists,
}
}

Expand Down Expand Up @@ -1653,7 +1652,6 @@ func ResetContextOfStmt(ctx sessionctx.Context, s ast.StmtNode) (err error) {
}

sc.TblInfo2UnionScan = make(map[*model.TableInfo]bool)
sc.CheckKeyExists = make(map[string]struct{})
errCount, warnCount := vars.StmtCtx.NumErrorWarnings()
vars.SysErrorCount = errCount
vars.SysWarningCount = warnCount
Expand Down
15 changes: 13 additions & 2 deletions kv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,16 +160,28 @@ type RetrieverMutator interface {
Mutator
}

// MemBufferIterator is an Iterator with KeyFlags related functions.
type MemBufferIterator interface {
Iterator
HasValue() bool
Flags() KeyFlags
}

// MemBuffer is an in-memory kv collection, can be used to buffer write operations.
type MemBuffer interface {
RetrieverMutator

// GetFlags returns the latest flags associated with key.
GetFlags(Key) (KeyFlags, error)
// IterWithFlags returns a MemBufferIterator.
IterWithFlags(k Key, upperBound Key) MemBufferIterator
// IterReverseWithFlags returns a reversed MemBufferIterator.
IterReverseWithFlags(k Key) MemBufferIterator
// SetWithFlags put key-value into the last active staging buffer with the given KeyFlags.
SetWithFlags(Key, []byte, ...FlagsOp) error
// UpdateFlags update the flags associated with key.
UpdateFlags(Key, ...FlagsOp)

// Reset reset the MemBuffer to initial states.
Reset()
// DiscardValues releases the memory used by all values.
Expand All @@ -186,7 +198,7 @@ type MemBuffer interface {
// If the changes are not published by `Release`, they will be discarded.
Cleanup(StagingHandle)
// InspectStage used to inspect the value updates in the given stage.
InspectStage(handle StagingHandle, f func(Key, KeyFlags, []byte))
InspectStage(StagingHandle, func(Key, KeyFlags, []byte))

// Size returns sum of keys and values length.
Size() int
Expand Down Expand Up @@ -256,7 +268,6 @@ type LockCtx struct {
Values map[string]ReturnedValue
ValuesLock sync.Mutex
LockExpired *uint32
CheckKeyExists map[string]struct{}
}

// ReturnedValue pairs the Value and AlreadyLocked flag for PessimisticLock return values result.
Expand Down
99 changes: 66 additions & 33 deletions kv/memdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,12 @@ import (

const (
flagPresumeKNE KeyFlags = 1 << iota
flagPessimisticLock
flagKeyLocked
flagKeyLockedValExist
flagNeedCheckExists
flagNoNeedCommit

persistentFlags = flagPessimisticLock
persistentFlags = flagKeyLocked | flagKeyLockedValExist
// bit 1 => red, bit 0 => black
nodeColorBit uint8 = 0x80
nodeFlagsMask = ^nodeColorBit
Expand All @@ -35,14 +37,24 @@ const (
// KeyFlags are metadata associated with key
type KeyFlags uint8

// HasPresumeKeyNotExists retruns whether the associated key use lazy check.
// HasPresumeKeyNotExists returns whether the associated key use lazy check.
func (f KeyFlags) HasPresumeKeyNotExists() bool {
return f&flagPresumeKNE != 0
}

// HasPessimisticLock retruns whether the associated key has acquired pessimistic lock.
func (f KeyFlags) HasPessimisticLock() bool {
return f&flagPessimisticLock != 0
// HasLocked returns whether the associated key has acquired pessimistic lock.
func (f KeyFlags) HasLocked() bool {
return f&flagKeyLocked != 0
}

// HasLockedValueExists returns whether the value exists when key locked.
func (f KeyFlags) HasLockedValueExists() bool {
return f&flagKeyLockedValExist != 0
}

// HasNeedCheckExists returns whether the key need to check existence when it has been locked.
func (f KeyFlags) HasNeedCheckExists() bool {
return f&flagNeedCheckExists != 0
}

// HasNoNeedCommit returns whether the key should be used in 2pc commit phase.
Expand All @@ -51,17 +63,24 @@ func (f KeyFlags) HasNoNeedCommit() bool {
}

// FlagsOp describes KeyFlags modify operation.
type FlagsOp uint8
type FlagsOp uint16

const (
// SetPresumeKeyNotExists marks the existence of the associated key is checked lazily.
// Implies KeyFlags.HasNeedCheckExists() == true.
SetPresumeKeyNotExists FlagsOp = 1 << iota
// DelPresumeKeyNotExists reverts SetPresumeKeyNotExists.
DelPresumeKeyNotExists
// SetPessimisticLock marks the associated key has acquired pessimistic lock.
SetPessimisticLock
// DelPessimisticLock reverts SetPessimisticLock.
DelPessimisticLock
// SetKeyLocked marks the associated key has acquired lock.
SetKeyLocked
// DelKeyLocked reverts SetKeyLocked.
DelKeyLocked
// SetKeyLockedValueExists marks the value exists when key has been locked in Transaction.LockKeys.
SetKeyLockedValueExists
// SetKeyLockedValueNotExists marks the value doesn't exists when key has been locked in Transaction.LockKeys.
SetKeyLockedValueNotExists
// DelNeedCheckExists marks the key no need to be checked in Transaction.LockKeys.
DelNeedCheckExists
// SetNoNeedCommit marks the key shouldn't be used in 2pc commit phase.
SetNoNeedCommit
)
Expand All @@ -70,13 +89,19 @@ func applyFlagsOps(origin KeyFlags, ops ...FlagsOp) KeyFlags {
for _, op := range ops {
switch op {
case SetPresumeKeyNotExists:
origin |= flagPresumeKNE
origin |= flagPresumeKNE | flagNeedCheckExists
case DelPresumeKeyNotExists:
origin &= ^flagPresumeKNE
case SetPessimisticLock:
origin |= flagPessimisticLock
case DelPessimisticLock:
origin &= ^flagPessimisticLock
origin &= ^(flagPresumeKNE | flagNeedCheckExists)
case SetKeyLocked:
origin |= flagKeyLocked
case DelKeyLocked:
origin &= ^flagKeyLocked
case SetKeyLockedValueExists:
origin |= flagKeyLockedValExist
case DelNeedCheckExists:
origin &= ^flagNeedCheckExists
case SetKeyLockedValueNotExists:
origin &= ^flagKeyLockedValExist
case SetNoNeedCommit:
origin |= flagNoNeedCommit
}
Expand All @@ -92,7 +117,7 @@ var tombstone = []byte{}
// The value map is rollbackable, that means you can use the `Staging`, `Release` and `Cleanup` API to safely modify KVs.
//
// The flags map is not rollbackable. There are two types of flag, persistent and non-persistent.
// When discading a newly added KV in `Cleanup`, the non-persistent flags will be cleared.
// When discarding a newly added KV in `Cleanup`, the non-persistent flags will be cleared.
// If there are persistent flags associated with key, we will keep this key in node without value.
type memdb struct {
root memdbArenaAddr
Expand Down Expand Up @@ -126,7 +151,7 @@ func (db *memdb) Staging() StagingHandle {

func (db *memdb) Release(h StagingHandle) {
if int(h) != len(db.stages) {
// This should never happens in production environmen.
// This should never happens in production environment.
// Use panic to make debug easier.
panic("cannot release staging buffer")
}
Expand All @@ -144,7 +169,7 @@ func (db *memdb) Cleanup(h StagingHandle) {
return
}
if int(h) < len(db.stages) {
// This should never happens in production environmen.
// This should never happens in production environment.
// Use panic to make debug easier.
panic("cannot cleanup staging buffer")
}
Expand Down Expand Up @@ -186,7 +211,7 @@ func (db *memdb) Get(_ context.Context, key Key) ([]byte, error) {
panic("vlog is resetted")
}

x := db.tranverse(key, false)
x := db.traverse(key, false)
if x.isNull() {
return nil, ErrNotExist
}
Expand All @@ -198,7 +223,7 @@ func (db *memdb) Get(_ context.Context, key Key) ([]byte, error) {
}

func (db *memdb) GetFlags(key Key) (KeyFlags, error) {
x := db.tranverse(key, false)
x := db.traverse(key, false)
if x.isNull() {
return 0, ErrNotExist
}
Expand Down Expand Up @@ -254,14 +279,13 @@ func (db *memdb) set(key Key, value []byte, ops ...FlagsOp) error {
if len(db.stages) == 0 {
db.dirty = true
}
x := db.tranverse(key, true)
if x.vptr.isNull() && value != nil {
db.size += len(key)
db.count++
}
x := db.traverse(key, true)

if len(ops) != 0 {
flags := applyFlagsOps(x.getKeyFlags(), ops...)
if flags&persistentFlags != 0 {
db.dirty = true
}
x.setKeyFlags(flags)
}

Expand Down Expand Up @@ -299,9 +323,9 @@ func (db *memdb) setValue(x memdbNodeAddr, value []byte) {
db.size = db.size - len(oldVal) + len(value)
}

// tranverse search for and if not found and insert is true, will add a new node in.
// traverse search for and if not found and insert is true, will add a new node in.
// Returns a pointer to the new node, or the node found.
func (db *memdb) tranverse(key Key, insert bool) memdbNodeAddr {
func (db *memdb) traverse(key Key, insert bool) memdbNodeAddr {
x := db.getRoot()
y := memdbNodeAddr{nil, nullAddr}
found := false
Expand Down Expand Up @@ -437,7 +461,8 @@ func (db *memdb) leftRotate(x memdbNodeAddr) {

// If B is not null, set it's parent to be X
if !y.left.isNull() {
y.getLeft(db).up = x.addr
left := y.getLeft(db)
left.up = x.addr
}

// Set Y's parent to be what X's parent was
Expand Down Expand Up @@ -470,7 +495,8 @@ func (db *memdb) rightRotate(y memdbNodeAddr) {

// If B is not null, set it's parent to be Y
if !x.right.isNull() {
x.getRight(db).up = y.addr
right := x.getRight(db)
right.up = y.addr
}

// Set X's parent to be what Y's parent was
Expand Down Expand Up @@ -498,6 +524,9 @@ func (db *memdb) rightRotate(y memdbNodeAddr) {
func (db *memdb) deleteNode(z memdbNodeAddr) {
var x, y memdbNodeAddr

db.count--
db.size -= int(z.klen)

if z.left.isNull() || z.right.isNull() {
y = z
} else {
Expand Down Expand Up @@ -551,10 +580,12 @@ func (db *memdb) replaceNode(old memdbNodeAddr, new memdbNodeAddr) {
}
new.up = old.up

old.getLeft(db).up = new.addr
left := old.getLeft(db)
left.up = new.addr
new.left = old.left

old.getRight(db).up = new.addr
right := old.getRight(db)
right.up = new.addr
new.right = old.right

if old.isBlack() {
Expand Down Expand Up @@ -693,6 +724,8 @@ func (db *memdb) getRoot() memdbNodeAddr {
}

func (db *memdb) allocNode(key Key) memdbNodeAddr {
db.size += len(key)
db.count++
x, xn := db.allocator.allocNode(key)
return memdbNodeAddr{xn, x}
}
Expand Down
2 changes: 0 additions & 2 deletions kv/memdb_arena.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,8 +288,6 @@ func (l *memdbVlog) revertToCheckpoint(db *memdb, cp *memdbCheckpoint) {
db.size -= int(hdr.valueLen)
// oldValue.isNull() == true means this is a newly added value.
if hdr.oldValue.isNull() {
db.count--
db.size -= int(node.klen)
// If there are no flags associated with this key, we need to delete this node.
keptFlags := node.getKeyFlags() & persistentFlags
if keptFlags == 0 {
Expand Down
30 changes: 30 additions & 0 deletions kv/memdb_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,28 @@ func (db *memdb) IterReverse(k Key) (Iterator, error) {
return i, nil
}

func (db *memdb) IterWithFlags(k Key, upperBound Key) MemBufferIterator {
i := &memdbIterator{
db: db,
start: k,
end: upperBound,
includeFlags: true,
}
i.init()
return i
}

func (db *memdb) IterReverseWithFlags(k Key) MemBufferIterator {
i := &memdbIterator{
db: db,
end: k,
reverse: true,
includeFlags: true,
}
i.init()
return i
}

func (i *memdbIterator) init() {
if i.reverse {
if len(i.end) == 0 {
Expand Down Expand Up @@ -72,6 +94,14 @@ func (i *memdbIterator) Valid() bool {
return !i.curr.isNull()
}

func (i *memdbIterator) Flags() KeyFlags {
return i.curr.getKeyFlags()
}

func (i *memdbIterator) HasValue() bool {
return !i.isFlagsOnly()
}

func (i *memdbIterator) Key() Key {
return i.curr.getKey()
}
Expand Down
Loading