Skip to content

Commit

Permalink
Update version to 2.1.0
Browse files Browse the repository at this point in the history
  • Loading branch information
olegator77 committed Jun 8, 2019
1 parent 3de128b commit c6c5c68
Show file tree
Hide file tree
Showing 307 changed files with 13,707 additions and 4,609 deletions.
159 changes: 76 additions & 83 deletions bindings.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package reindexer

import (
"context"
"fmt"
"reflect"
"strconv"
Expand All @@ -19,7 +20,7 @@ const (
modeDelete = bindings.ModeDelete
)

func (db *Reindexer) modifyItem(namespace string, ns *reindexerNamespace, item interface{}, json []byte, mode int, precepts ...string) (count int, err error) {
func (db *reindexerImpl) modifyItem(ctx context.Context, namespace string, ns *reindexerNamespace, item interface{}, json []byte, mode int, precepts ...string) (count int, err error) {

if ns == nil {
ns, err = db.getNS(namespace)
Expand All @@ -39,12 +40,12 @@ func (db *Reindexer) modifyItem(namespace string, ns *reindexerNamespace, item i
return
}

out, err := db.binding.ModifyItem(ns.nsHash, ns.name, format, ser.Bytes(), mode, precepts, stateToken)
out, err := db.binding.ModifyItem(ctx, ns.nsHash, ns.name, format, ser.Bytes(), mode, precepts, stateToken)

if err != nil {
rerr, ok := err.(bindings.Error)
if ok && rerr.Code() == bindings.ErrStateInvalidated {
db.Query(ns.name).Limit(0).Exec().Close()
db.query(ns.name).Limit(0).ExecCtx(ctx).Close()
err = rerr
continue
}
Expand All @@ -67,6 +68,14 @@ func (db *Reindexer) modifyItem(namespace string, ns *reindexerNamespace, item i
ns.cacheLock.Lock()
delete(ns.cacheItems, resultp.id)
ns.cacheLock.Unlock()

if len(precepts) > 0 && (resultp.cptr != 0 || resultp.data != nil) && reflect.TypeOf(item).Kind() == reflect.Ptr {
nsArrEntry := nsArrayEntry{ns, ns.cjsonState.Copy()}
if _, err := unpackItem(&nsArrEntry, &resultp, false, true, item); err != nil {
return 0, err
}
}

return rawQueryParams.count, err
}
return 0, err
Expand Down Expand Up @@ -102,7 +111,7 @@ func packItem(ns *reindexerNamespace, item interface{}, json []byte, ser *cjson.
return
}

func (db *Reindexer) getNS(namespace string) (*reindexerNamespace, error) {
func (db *reindexerImpl) getNS(namespace string) (*reindexerNamespace, error) {
db.lock.RLock()
defer db.lock.RUnlock()
ns, ok := db.ns[strings.ToLower(namespace)]
Expand All @@ -112,13 +121,13 @@ func (db *Reindexer) getNS(namespace string) (*reindexerNamespace, error) {
return ns, nil
}

func (db *Reindexer) PutMeta(namespace, key string, data []byte) error {
return db.binding.PutMeta(namespace, key, string(data))
func (db *reindexerImpl) putMeta(ctx context.Context, namespace, key string, data []byte) error {
return db.binding.PutMeta(ctx, namespace, key, string(data))
}

func (db *Reindexer) GetMeta(namespace, key string) ([]byte, error) {
func (db *reindexerImpl) getMeta(ctx context.Context, namespace, key string) ([]byte, error) {

out, err := db.binding.GetMeta(namespace, key)
out, err := db.binding.GetMeta(ctx, namespace, key)
if err != nil {
return nil, err
}
Expand All @@ -128,41 +137,18 @@ func (db *Reindexer) GetMeta(namespace, key string) ([]byte, error) {
return ret, nil
}

func unpackItem(ns *nsArrayEntry, params *rawResultItemParams, allowUnsafe bool, nonCacheableData bool) (item interface{}, err error) {
useCache := (ns.deepCopyIface || allowUnsafe) && !nonCacheableData && ns.cacheItems != nil
func unpackItem(ns *nsArrayEntry, params *rawResultItemParams, allowUnsafe bool, nonCacheableData bool, item interface{}) (interface{}, error) {
useCache := item == nil && (ns.deepCopyIface || allowUnsafe) && !nonCacheableData && ns.cacheItems != nil
needCopy := ns.deepCopyIface && !allowUnsafe
var err error

if useCache {
ns.cacheLock.RLock()
if citem, ok := ns.cacheItems[params.id]; ok && citem.version == params.version {
item = citem.item
}
ns.cacheLock.RUnlock()
}

if item == nil {
if useCache {
ns.cacheLock.Lock()
if citem, ok := ns.cacheItems[params.id]; ok && citem.version == params.version {
item = citem.item
} else {
item = reflect.New(ns.rtype).Interface()
dec := ns.localCjsonState.NewDecoder()
if params.cptr != 0 {
err = dec.DecodeCPtr(params.cptr, item)
} else if params.data != nil {
err = dec.Decode(params.data, item)
} else {
panic(fmt.Errorf("Internal error while decoding item id %d from ns %s: cptr and data are both null", params.id, ns.name))
}
if err != nil {
ns.cacheLock.Unlock()
return
}
ns.cacheItems[params.id] = cacheItem{item: item, version: params.version}
}
ns.cacheLock.Unlock()
ns.cacheLock.RUnlock()
} else {
ns.cacheLock.RUnlock()
item = reflect.New(ns.rtype).Interface()
dec := ns.localCjsonState.NewDecoder()
if params.cptr != 0 {
Expand All @@ -173,11 +159,29 @@ func unpackItem(ns *nsArrayEntry, params *rawResultItemParams, allowUnsafe bool,
panic(fmt.Errorf("Internal error while decoding item id %d from ns %s: cptr and data are both null", params.id, ns.name))
}
if err != nil {
return
return item, err
}
// Reset needCopy, because item already separate
needCopy = false
ns.cacheLock.Lock()
ns.cacheItems[params.id] = cacheItem{item: item, version: params.version}
ns.cacheLock.Unlock()
}
} else {
if item == nil {
item = reflect.New(ns.rtype).Interface()
}
dec := ns.localCjsonState.NewDecoder()
if params.cptr != 0 {
err = dec.DecodeCPtr(params.cptr, item)
} else if params.data != nil {
err = dec.Decode(params.data, item)
} else {
panic(fmt.Errorf("Internal error while decoding item id %d from ns %s: cptr and data are both null", params.id, ns.name))
}
if err != nil {
return item, err
}
// Reset needCopy, because item already separate
needCopy = false
}

if needCopy {
Expand All @@ -188,10 +192,10 @@ func unpackItem(ns *nsArrayEntry, params *rawResultItemParams, allowUnsafe bool,
}
}

return
return item, err
}

func (db *Reindexer) rawResultToJson(rawResult []byte, jsonName string, totalName string, initJson []byte, initOffsets []int) (json []byte, offsets []int, explain []byte, err error) {
func (db *reindexerImpl) rawResultToJson(rawResult []byte, jsonName string, totalName string, initJson []byte, initOffsets []int) (json []byte, offsets []int, explain []byte, err error) {

ser := newSerializer(rawResult)
rawQueryParams := ser.readRawQueryParams()
Expand Down Expand Up @@ -240,7 +244,7 @@ func (db *Reindexer) rawResultToJson(rawResult []byte, jsonName string, totalNam
return jsonBuf.Bytes(), offsets, explain, nil
}

func (db *Reindexer) prepareQuery(q *Query, asJson bool) (result bindings.RawBuffer, err error) {
func (db *reindexerImpl) prepareQuery(ctx context.Context, q *Query, asJson bool) (result bindings.RawBuffer, err error) {

if ns, err := db.getNS(q.Namespace); err == nil {
q.nsArray = append(q.nsArray, nsArrayEntry{ns, ns.cjsonState.Copy()})
Expand Down Expand Up @@ -301,7 +305,7 @@ func (db *Reindexer) prepareQuery(q *Query, asJson bool) (result bindings.RawBuf
// json iterator not support fetch queries
fetchCount = -1
}
result, err = db.binding.SelectQuery(ser.Bytes(), asJson, q.ptVersions, fetchCount)
result, err = db.binding.SelectQuery(ctx, ser.Bytes(), asJson, q.ptVersions, fetchCount)

if err == nil && result.GetBuf() == nil {
panic(fmt.Errorf("result.Buffer is nil"))
Expand All @@ -310,17 +314,17 @@ func (db *Reindexer) prepareQuery(q *Query, asJson bool) (result bindings.RawBuf
}

// Execute query
func (db *Reindexer) execQuery(q *Query) *Iterator {
result, err := db.prepareQuery(q, false)
func (db *reindexerImpl) execQuery(ctx context.Context, q *Query) *Iterator {
result, err := db.prepareQuery(ctx, q, false)
if err != nil {
return errIterator(err)
}
iter := newIterator(q, result, q.nsArray, q.joinToFields, q.joinHandlers, q.context)
iter := newIterator(ctx, q, result, q.nsArray, q.joinToFields, q.joinHandlers, q.context)
return iter
}

func (db *Reindexer) execJSONQuery(q *Query, jsonRoot string) *JSONIterator {
result, err := db.prepareQuery(q, true)
func (db *reindexerImpl) execJSONQuery(ctx context.Context, q *Query, jsonRoot string) *JSONIterator {
result, err := db.prepareQuery(ctx, q, true)
if err != nil {
return errJSONIterator(err)
}
Expand All @@ -330,10 +334,10 @@ func (db *Reindexer) execJSONQuery(q *Query, jsonRoot string) *JSONIterator {
if err != nil {
return errJSONIterator(err)
}
return newJSONIterator(q, q.json, q.jsonOffsets, explain)
return newJSONIterator(ctx, q, q.json, q.jsonOffsets, explain)
}

func (db *Reindexer) prepareSQL(namespace, query string, asJson bool) (result bindings.RawBuffer, nsArray []nsArrayEntry, err error) {
func (db *reindexerImpl) prepareSQL(ctx context.Context, namespace, query string, asJson bool) (result bindings.RawBuffer, nsArray []nsArrayEntry, err error) {
nsArray = make([]nsArrayEntry, 0, 3)
var ns *reindexerNamespace

Expand All @@ -348,41 +352,19 @@ func (db *Reindexer) prepareSQL(namespace, query string, asJson bool) (result bi
ptVersions = append(ptVersions, ns.localCjsonState.Version^ns.localCjsonState.StateToken)
}

result, err = db.binding.Select(query, asJson, ptVersions, defaultFetchCount)
result, err = db.binding.Select(ctx, query, asJson, ptVersions, defaultFetchCount)
return
}

func (db *Reindexer) execSQL(namespace, query string) *Iterator {
result, nsArray, err := db.prepareSQL(namespace, query, false)
if err != nil {
return errIterator(err)
}
iter := newIterator(nil, result, nsArray, nil, nil, nil)
return iter
}

func (db *Reindexer) execSQLAsJSON(namespace string, query string) *JSONIterator {
result, _, err := db.prepareSQL(namespace, query, true)
if err != nil {
return errJSONIterator(err)
}
defer result.Free()
json, jsonOffsets, explain, err := db.rawResultToJson(result.GetBuf(), namespace, "total", nil, nil)
if err != nil {
return errJSONIterator(err)
}
return newJSONIterator(nil, json, jsonOffsets, explain)
}

// Execute query
func (db *Reindexer) deleteQuery(q *Query) (int, error) {
func (db *reindexerImpl) deleteQuery(ctx context.Context, q *Query) (int, error) {

ns, err := db.getNS(q.Namespace)
if err != nil {
return 0, err
}

result, err := db.binding.DeleteQuery(ns.nsHash, q.ser.Bytes())
result, err := db.binding.DeleteQuery(ctx, ns.nsHash, q.ser.Bytes())
if err != nil {
return 0, err
}
Expand Down Expand Up @@ -413,18 +395,17 @@ func (db *Reindexer) deleteQuery(q *Query) (int, error) {
}

// Execute query
func (db *Reindexer) updateQuery(q *Query) (int, error) {
func (db *reindexerImpl) updateQuery(ctx context.Context, q *Query) *Iterator {

ns, err := db.getNS(q.Namespace)
if err != nil {
return 0, err
return errIterator(err)
}

result, err := db.binding.UpdateQuery(ns.nsHash, q.ser.Bytes())
result, err := db.binding.UpdateQuery(ctx, ns.nsHash, q.ser.Bytes())
if err != nil {
return 0, err
return errIterator(err)
}
defer result.Free()

ser := newSerializer(result.GetBuf())
// skip total count
Expand All @@ -447,10 +428,14 @@ func (db *Reindexer) updateQuery(q *Query) (int, error) {
panic("Internal error: data after end of update query result")
}

return rawQueryParams.count, err
return newIterator(ctx, q, result, q.nsArray, nil, nil, nil)
}

func (db *Reindexer) resetCaches() {
func (db *Reindexer) ResetCaches() {
db.impl.resetCachesCtx(db.ctx)
}

func (db *reindexerImpl) resetCachesCtx(ctx context.Context) {
db.lock.RLock()
nsArray := make([]*reindexerNamespace, 0, len(db.ns))
for _, ns := range db.ns {
Expand All @@ -464,10 +449,14 @@ func (db *Reindexer) resetCaches() {
}
ns.cacheLock.Unlock()
ns.cjsonState.Reset()
db.Query(ns.name).Limit(0).Exec().Close()
db.query(ns.name).Limit(0).ExecCtx(ctx).Close()
}
}

func (db *reindexerImpl) resetCaches() {
db.resetCachesCtx(context.Background())
}

func WithCgoLimit(cgoLimit int) interface{} {
return bindings.OptionCgoLimit{cgoLimit}
}
Expand All @@ -483,3 +472,7 @@ func WithRetryAttempts(read int, write int) interface{} {
func WithServerConfig(startupTimeout time.Duration, serverConfig *config.ServerConfig) interface{} {
return bindings.OptionBuiltinWithServer{ServerConfig: serverConfig, StartupTimeout: startupTimeout}
}

func WithTimeouts(loginTimeout time.Duration, requestTimeout time.Duration) interface{} {
return bindings.OptionTimeouts{loginTimeout, requestTimeout}
}
Loading

0 comments on commit c6c5c68

Please sign in to comment.