Skip to content

Commit

Permalink
first commit
Browse files Browse the repository at this point in the history
  • Loading branch information
cirth9 committed Apr 3, 2024
0 parents commit 27869ad
Show file tree
Hide file tree
Showing 31 changed files with 1,356 additions and 0 deletions.
8 changes: 8 additions & 0 deletions .idea/.gitignore

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions .idea/modules.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 9 additions & 0 deletions .idea/reids-by-go.iml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions .idea/vcs.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
# reids-by-go
# reids-by-go
29 changes: 29 additions & 0 deletions aof/aof.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package aof

import (
"context"
"os"
"sync"
)

type CmdLine = [][]byte

type payload struct {
cmdline CmdLine
dbIndex int
wg *sync.WaitGroup
}

type Persist struct {
ctx context.Context
cancel context.CancelFunc
aofChan chan *payload
aofFile *os.File
aofFileName string
aofFsync string
aofFinished chan struct{}
pausingAof sync.Mutex
currentDB int

buffer []CmdLine
}
43 changes: 43 additions & 0 deletions database/database.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package database

import (
"reids-by-go/datastruct/dict"
)

const (
dataDictSize = 1 << 16
ttlDictSize = 1 << 10
)

type DB struct {
index int
// key -> DataEntity
data *dict.ConcurrentDict

// key -> expireTime (time.Time)
ttlMap *dict.ConcurrentDict

// key -> version(uint32)
versionMap *dict.ConcurrentDict

//// callbacks
//insertCallback database.KeyEventCallback
//deleteCallback database.KeyEventCallback

//todo about aof

// addaof is used to add command to aof
addAof func(CmdLine)
}

type CmdLine = [][]byte

func NewDatabase() DB {
return DB{
index: 0,
data: dict.MakeConcurrent(dataDictSize),
ttlMap: dict.MakeConcurrent(ttlDictSize),
versionMap: dict.MakeConcurrent(dataDictSize),
addAof: func(line CmdLine) {},
}
}
64 changes: 64 additions & 0 deletions database/exec.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package database

import (
"bytes"
"reids-by-go/interface/redis"
"reids-by-go/redis/protocol"
"strings"
)

func (db *DB) Exec(cmd CmdLine) redis.Reply {
var cmdStrings []string
for _, bytesCmd := range cmd {
cmdStrings = append(cmdStrings, string(bytes.TrimSuffix(bytesCmd, []byte{'\r', '\n'})))
//log.Print("exec >>>> ", string(bytes))
}
switch strings.ToLower(cmdStrings[0]) {
case "set":
return db.Set(cmdStrings[1], cmdStrings[2])
case "get":
return db.Get(cmdStrings[1])
case "delete":
return db.Delete(cmdStrings[1])
case "ping":
return db.Ping()
}
return nil
}

func (db *DB) Ping() redis.Reply {
return protocol.MakeBulkReply([]byte("pong"))
}

func (db *DB) Set(key string, value any) redis.Reply {
//log.Println("exec >>> set", key, value)
result := db.data.Put(key, value)
//todo aof
if result == 1 {
return protocol.MakeStatusReply("SET OK")
} else {
return protocol.MakeStatusReply("SET FAILED")
}
}

func (db *DB) Get(key string) redis.Reply {
val, exists := db.data.Get(key)
//todo aof

if exists {
return protocol.MakeBulkReply([]byte(val.(string)))
} else {
return protocol.MakeBulkReply([]byte("have existed! key:" + key))
}
}

func (db *DB) Delete(key string) redis.Reply {
result := db.data.Delete(key)
//todo aof

if result == 1 {
return protocol.MakeStatusReply("DELETE OK")
} else {
return protocol.MakeStatusReply("THE KEY IS NOT EXISTED")
}
}
11 changes: 11 additions & 0 deletions database/singel_db.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package database

import "time"

func getExpireTask(key string) string {
return "expire:" + key
}

func (db *DB) Expire(key string, expireTime time.Time) {

}
155 changes: 155 additions & 0 deletions datastruct/dict/concurrent.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
package dict

import (
"hash/fnv"
"log"
"math"
"sync"
"sync/atomic"
)

type ConcurrentDict struct {
table []*Shard
count int32
}

type Shard struct {
m map[string]any
mutex sync.RWMutex
}

// todo 控制hashCode映射到table切片的范围内
func (dict *ConcurrentDict) spread(hashCode uint64) uint64 {
if dict == nil {
panic("dict is nil")
}
tableSize := uint64(len(dict.table))
return (tableSize - 1) & hashCode
}

// todo 获取指定index的Shard
func (dict *ConcurrentDict) getShard(index uint64) *Shard {
if dict == nil {
panic("dict is nil")
}
return dict.table[index]
}

// todo 初始化规模,这里一直按位取or,右移是为了获取大于该param的最小的2次幂
func computeCapacity(param int) (size int) {
if param <= 16 {
return 16
}
n := param - 1
n |= n >> 1
n |= n >> 2
n |= n >> 4
n |= n >> 8
n |= n >> 16
if n < 0 {
return math.MaxInt32
} else {
return n + 1
}
}

func MakeConcurrent(shardCount int) *ConcurrentDict {
shardCount = computeCapacity(shardCount)
table := make([]*Shard, shardCount)
for i := 0; i < shardCount; i++ {
table[i] = &Shard{
m: make(map[string]any),
}
}
d := &ConcurrentDict{
table: table,
count: 0,
}
return d
}

func (dict *ConcurrentDict) Get(key string) (val any, exists bool) {
if dict == nil {
panic("dict is nil!")
}
hashCode := hashFnv64(key)
index := dict.spread(hashCode)
shard := dict.getShard(index)
shard.mutex.RLock()
defer shard.mutex.RUnlock()
val, exists = shard.m[key]
return
}

func (dict *ConcurrentDict) Put(key string, val any) (result int) {
if dict == nil {
panic("dict is nil!")
}
hashCode := hashFnv64(key)
index := dict.spread(hashCode)
shard := dict.table[index]
shard.mutex.Lock()
defer shard.mutex.Unlock()

if _, ok := shard.m[key]; ok {
//have existed
shard.m[key] = val
return 0
} else {
//do not exist
shard.m[key] = val
//log.Println("put >>> ", key, shard.m[key])
dict.addCount()
return 1
}
return 0
}

func (dict *ConcurrentDict) Delete(key string) int {
if dict == nil {
panic("dict is nil!")
}
hashCode := hashFnv64(key)
index := dict.spread(hashCode)
shard := dict.table[index]
shard.mutex.Lock()
defer shard.mutex.Unlock()
if _, ok := shard.m[key]; ok {
delete(shard.m, key)
dict.lessCount()
return 1
}
return 0
}

func (dict *ConcurrentDict) Len() int {
if dict == nil {
panic("dict is nil!")
}
//todo atomic.LoadInt32原子操作防止data race
return int(atomic.LoadInt32(&dict.count))
}

func (dict *ConcurrentDict) addCount() {
dict.count++
}

func (dict *ConcurrentDict) lessCount() {
dict.count--
}

func hashFnv64(key string) uint64 {
defer func() {
if r := recover(); r != nil {
log.Println(r)
}
}()

hash64 := fnv.New64a()
_, err := hash64.Write([]byte(key))
if err != nil {
panic(err)
}
hashValue := hash64.Sum64()
return hashValue
}
Loading

0 comments on commit 27869ad

Please sign in to comment.