Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cockroach store feature completion #1358

Merged
merged 3 commits into from
Mar 17, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 98 additions & 5 deletions store/cockroach/cockroach.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,12 @@ type sqlStore struct {
database string
table string

list *sql.Stmt
readOne *sql.Stmt
write *sql.Stmt
delete *sql.Stmt
list *sql.Stmt
readOne *sql.Stmt
readMany *sql.Stmt
readOffset *sql.Stmt
write *sql.Stmt
delete *sql.Stmt

options store.Options
}
Expand Down Expand Up @@ -92,7 +94,9 @@ func (s *sqlStore) Read(key string, opts ...store.ReadOption) ([]*store.Record,
o(&options)
}

// TODO: make use of options.Prefix using WHERE key LIKE = ?
if options.Prefix || options.Suffix {
return s.read(key, options)
}

var records []*store.Record
var timehelper pq.NullTime
Expand Down Expand Up @@ -120,6 +124,61 @@ func (s *sqlStore) Read(key string, opts ...store.ReadOption) ([]*store.Record,
return records, nil
}

// Read Many records
func (s *sqlStore) read(key string, options store.ReadOptions) ([]*store.Record, error) {
pattern := "%"
if options.Prefix {
pattern = key + pattern
}
if options.Suffix {
pattern = pattern + key
}
var rows *sql.Rows
var err error
if options.Limit != 0 {
rows, err = s.readOffset.Query(pattern, options.Limit, options.Offset)
} else {
rows, err = s.readMany.Query(pattern)
}
if err != nil {
if err == sql.ErrNoRows {
return []*store.Record{}, nil
}
return []*store.Record{}, errors.Wrap(err, "sqlStore.read failed")
}
defer rows.Close()
var records []*store.Record
var timehelper pq.NullTime

for rows.Next() {
record := &store.Record{}
if err := rows.Scan(&record.Key, &record.Value, &timehelper); err != nil {
return records, err
}
if timehelper.Valid {
if timehelper.Time.Before(time.Now()) {
// record has expired
go s.Delete(record.Key)
} else {
record.Expiry = time.Until(timehelper.Time)
records = append(records, record)
}
} else {
records = append(records, record)
}
}
rowErr := rows.Close()
if rowErr != nil {
// transaction rollback or something
return records, rowErr
}
if err := rows.Err(); err != nil {
return records, err
}

return records, nil
}

// Write records
func (s *sqlStore) Write(r *store.Record, opts ...store.WriteOption) error {
var err error
Expand Down Expand Up @@ -174,16 +233,44 @@ func (s *sqlStore) initDB() error {
return errors.Wrap(err, "Couldn't create table")
}

// Create Index
_, err = s.db.Exec(fmt.Sprintf(`CREATE INDEX IF NOT EXISTS "%s" ON %s.%s USING btree ("key")`, "key_index_"+s.table, s.database, s.table))
if err != nil {
return err
}

list, err := s.db.Prepare(fmt.Sprintf("SELECT key, value, expiry FROM %s.%s;", s.database, s.table))
if err != nil {
return errors.Wrap(err, "List statement couldn't be prepared")
}
if s.list != nil {
s.list.Close()
}
s.list = list
readOne, err := s.db.Prepare(fmt.Sprintf("SELECT key, value, expiry FROM %s.%s WHERE key = $1;", s.database, s.table))
if err != nil {
return errors.Wrap(err, "ReadOne statement couldn't be prepared")
}
if s.readOne != nil {
s.readOne.Close()
}
s.readOne = readOne
readMany, err := s.db.Prepare(fmt.Sprintf("SELECT key, value, expiry FROM %s.%s WHERE key LIKE $1;", s.database, s.table))
if err != nil {
return errors.Wrap(err, "ReadMany statement couldn't be prepared")
}
if s.readMany != nil {
s.readMany.Close()
}
s.readMany = readMany
readOffset, err := s.db.Prepare(fmt.Sprintf("SELECT key, value, expiry FROM %s.%s WHERE key LIKE $1 ORDER BY key DESC LIMIT $2 OFFSET $3;", s.database, s.table))
if err != nil {
return errors.Wrap(err, "ReadOffset statement couldn't be prepared")
}
if s.readOffset != nil {
s.readOffset.Close()
}
s.readOffset = readOffset
write, err := s.db.Prepare(fmt.Sprintf(`INSERT INTO %s.%s(key, value, expiry)
VALUES ($1, $2::bytea, $3)
ON CONFLICT (key)
Expand All @@ -192,11 +279,17 @@ func (s *sqlStore) initDB() error {
if err != nil {
return errors.Wrap(err, "Write statement couldn't be prepared")
}
if s.write != nil {
s.write.Close()
}
s.write = write
delete, err := s.db.Prepare(fmt.Sprintf("DELETE FROM %s.%s WHERE key = $1;", s.database, s.table))
if err != nil {
return errors.Wrap(err, "Delete statement couldn't be prepared")
}
if s.delete != nil {
s.delete.Close()
}
s.delete = delete

return nil
Expand Down
25 changes: 21 additions & 4 deletions store/cockroach/cockroach_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ func TestSQL(t *testing.T) {
connection := fmt.Sprintf(
"host=%s port=%d user=%s sslmode=disable dbname=%s",
"localhost",
5432,
"jake",
26257,
"root",
"test",
)
db, err := sql.Open("postgres", connection)
Expand All @@ -32,6 +32,10 @@ func TestSQL(t *testing.T) {
store.Nodes(connection),
)

if err := sqlStore.Init(); err != nil {
t.Fatal(err)
}

keys, err := sqlStore.List()
if err != nil {
t.Error(err)
Expand Down Expand Up @@ -74,7 +78,7 @@ func TestSQL(t *testing.T) {
err = sqlStore.Write(&store.Record{
Key: "test",
Value: []byte("bar"),
Expiry: time.Minute,
Expiry: time.Second * 10,
})
if err != nil {
t.Error(err)
Expand All @@ -89,7 +93,7 @@ func TestSQL(t *testing.T) {
t.Error("Expected bar, got ", string(records[0].Value))
}

time.Sleep(61 * time.Second)
time.Sleep(11 * time.Second)
_, err = sqlStore.Read("test")
switch err {
case nil:
Expand All @@ -99,4 +103,17 @@ func TestSQL(t *testing.T) {
case store.ErrNotFound:
break
}
sqlStore.Delete("bar")
sqlStore.Write(&store.Record{Key: "aaa", Value: []byte("bbb"), Expiry: 5 * time.Second})
sqlStore.Write(&store.Record{Key: "aaaa", Value: []byte("bbb"), Expiry: 5 * time.Second})
sqlStore.Write(&store.Record{Key: "aaaaa", Value: []byte("bbb"), Expiry: 5 * time.Second})
results, err := sqlStore.Read("a", store.ReadPrefix())
if len(results) != 3 {
t.Fatal("Results should have returned 3 records")
}
time.Sleep(6 * time.Second)
results, err = sqlStore.Read("a", store.ReadPrefix())
if len(results) != 0 {
t.Fatal("Results should have returned 0 records")
}
}
10 changes: 10 additions & 0 deletions store/memory/memory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,16 @@ func basictest(s store.Store, t *testing.T) {
t.Error("Expiry options were not effective")
}
}
s.Write(&store.Record{Key: "a", Value: []byte("a")})
s.Write(&store.Record{Key: "aa", Value: []byte("aa")})
s.Write(&store.Record{Key: "aaa", Value: []byte("aaa")})
if results, err := s.Read("b", store.ReadPrefix()); err != nil {
t.Error(err)
} else {
if len(results) != 0 {
t.Errorf("Expected 0 results, got %d", len(results))
}
}

s.Init()
for i := 0; i < 10; i++ {
Expand Down