Skip to content

Commit

Permalink
Updated to version 1.9.6
Browse files Browse the repository at this point in the history
  • Loading branch information
olegator77 committed Sep 5, 2018
1 parent 861f0c4 commit 076baef
Show file tree
Hide file tree
Showing 165 changed files with 3,896 additions and 1,683 deletions.
2 changes: 1 addition & 1 deletion LICENSE
Original file line number Diff line number Diff line change
Expand Up @@ -199,4 +199,4 @@
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
limitations under the License.
52 changes: 39 additions & 13 deletions bindings.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,12 @@ import (
"fmt"
"reflect"
"strconv"
"strings"
"time"
"unsafe"

"github.com/restream/reindexer/bindings"
"github.com/restream/reindexer/bindings/builtinserver/config"
"github.com/restream/reindexer/cjson"
)

Expand Down Expand Up @@ -104,7 +107,7 @@ func packItem(ns *reindexerNamespace, item interface{}, json []byte, ser *cjson.
func (db *Reindexer) getNS(namespace string) (*reindexerNamespace, error) {
db.lock.RLock()
defer db.lock.RUnlock()
ns, ok := db.ns[namespace]
ns, ok := db.ns[strings.ToLower(namespace)]
if !ok {
return nil, errNsNotFound
}
Expand Down Expand Up @@ -240,9 +243,6 @@ func (db *Reindexer) rawResultToJson(rawResult []byte, jsonName string, totalNam
}

func (db *Reindexer) prepareQuery(q *Query, asJson bool) (result bindings.RawBuffer, err error) {
if len(q.joinQueries) != 0 && len(q.mergedQueries) != 0 {
return nil, ErrMergeAndJoinInOneQuery
}

if ns, err := db.getNS(q.Namespace); err == nil {
q.nsArray = append(q.nsArray, nsArrayEntry{ns, ns.cjsonState.Copy()})
Expand All @@ -251,28 +251,50 @@ func (db *Reindexer) prepareQuery(q *Query, asJson bool) (result bindings.RawBuf
}

ser := q.ser
ser.PutVarCUInt(queryEnd)
for _, sq := range q.joinQueries {
ser.PutVarCUInt(sq.joinType)
ser.Append(sq.ser)
ser.PutVarCUInt(queryEnd)
for _, sq := range q.mergedQueries {
if ns, err := db.getNS(sq.Namespace); err == nil {
q.nsArray = append(q.nsArray, nsArrayEntry{ns, ns.cjsonState.Copy()})
} else {
return nil, err
}
}
for _, sq := range q.mergedQueries {
ser.PutVarCUInt(merge)
ser.Append(sq.ser)
ser.PutVarCUInt(queryEnd)

for _, sq := range q.joinQueries {
if ns, err := db.getNS(sq.Namespace); err == nil {
q.nsArray = append(q.nsArray, nsArrayEntry{ns, ns.cjsonState.Copy()})
} else {
return nil, err
}
}

for _, mq := range q.mergedQueries {
for _, sq := range mq.joinQueries {
if ns, err := db.getNS(sq.Namespace); err == nil {
q.nsArray = append(q.nsArray, nsArrayEntry{ns, ns.cjsonState.Copy()})
} else {
return nil, err
}
}
}

ser.PutVarCUInt(queryEnd)
for _, sq := range q.joinQueries {
ser.PutVarCUInt(sq.joinType)
ser.Append(sq.ser)
ser.PutVarCUInt(queryEnd)
}

for _, mq := range q.mergedQueries {
ser.PutVarCUInt(merge)
ser.Append(mq.ser)
ser.PutVarCUInt(queryEnd)
for _, sq := range mq.joinQueries {
ser.PutVarCUInt(sq.joinType)
ser.Append(sq.ser)
ser.PutVarCUInt(queryEnd)
}
}

ptVersions := make([]int32, 0, 16)
for _, ns := range q.nsArray {
ptVersions = append(ptVersions, ns.localCjsonState.Version^ns.localCjsonState.CacheToken)
Expand Down Expand Up @@ -422,3 +444,7 @@ func WithConnPoolSize(connPoolSize int) interface{} {
func WithRetryAttempts(read int, write int) interface{} {
return bindings.OptionRetryAttempts{read, write}
}

func WithServerConfig(startupTimeout time.Duration, serverConfig *config.ServerConfig) interface{} {
return bindings.OptionBuiltinWithServer{ServerConfig: serverConfig, StartupTimeout: startupTimeout}
}
9 changes: 7 additions & 2 deletions bindings/builtin/builtin.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ func str2c(str string) C.reindexer_string {
hdr := (*reflect.StringHeader)(unsafe.Pointer(&str))
return C.reindexer_string{p: unsafe.Pointer(hdr.Data), n: C.int(hdr.Len)}
}

func err2go(ret C.reindexer_error) error {
if ret.what != nil {
defer C.free(unsafe.Pointer(ret.what))
Expand Down Expand Up @@ -114,20 +115,24 @@ func bool2cint(v bool) C.int {
return 0
}

func (binding *Builtin) SetReindexerInstance(p unsafe.Pointer) {
C.change_reindexer_instance(p)
}

func (binding *Builtin) Init(u *url.URL, options ...interface{}) error {

cgoLimit := defCgoLimit
for _, option := range options {
switch v := option.(type) {
case bindings.OptionCgoLimit:
cgoLimit = v.CgoLimit
case bindings.OptionBuiltinWithServer:
default:
fmt.Printf("Unknown builtin option: %v\n", option)
}
}

binding.cgoLimiter = make(chan struct{}, cgoLimit)

if len(u.Path) != 0 && u.Path != "/" {
err := binding.EnableStorage(u.Path)
if err != nil {
Expand Down Expand Up @@ -232,8 +237,8 @@ func CGoLogger(level int, msg string) {
func (binding *Builtin) EnableLogger(log bindings.Logger) {
logger = log
C.reindexer_enable_go_logger()

}

func (binding *Builtin) DisableLogger() {
C.reindexer_disable_go_logger()
logger = nil
Expand Down
169 changes: 169 additions & 0 deletions bindings/builtinserver/builtinserver.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
package builtinserver

// #include "server/cbinding/server_c.h"
// #include <stdlib.h>
import "C"
import (
"fmt"
"net/url"
"reflect"
"time"
"unsafe"

"github.com/restream/reindexer/bindings"
"github.com/restream/reindexer/bindings/builtin"
"github.com/restream/reindexer/bindings/builtinserver/config"
)

var defaultStartupTimeout time.Duration = time.Minute * 3

func init() {
C.init_reindexer_server()
bindings.RegisterBinding("builtinserver", new(BuiltinServer))
}

func err2go(ret C.reindexer_error) error {
if ret.what != nil {
defer C.free(unsafe.Pointer(ret.what))
return bindings.NewError("rq:"+C.GoString(ret.what), int(ret.code))
}
return nil
}

func str2c(str string) C.reindexer_string {
hdr := (*reflect.StringHeader)(unsafe.Pointer(&str))
return C.reindexer_string{p: unsafe.Pointer(hdr.Data), n: C.int(hdr.Len)}
}

func checkStorageReady() bool {
return int(C.check_server_ready()) == 1
}

type BuiltinServer struct {
builtin bindings.RawBinding
ready chan bool
}

func (server *BuiltinServer) Init(u *url.URL, options ...interface{}) error {
server.builtin = &builtin.Builtin{}

startupTimeout := defaultStartupTimeout
serverCfg := config.DefaultServerConfig()

for _, option := range options {
switch v := option.(type) {
case bindings.OptionCgoLimit:
case bindings.OptionBuiltinWithServer:
if v.StartupTimeout != 0 {
startupTimeout = v.StartupTimeout
}
if v.ServerConfig != nil {
serverCfg = v.ServerConfig
}
default:
fmt.Printf("Unknown builtinserver option: %v\n", option)
}
}

yamlStr, err := serverCfg.GetYamlString()
if err != nil {
return err
}

go func() {
err := err2go(C.start_reindexer_server(str2c(yamlStr)))
if err != nil {
panic(err)
}
}()

tTimeout := time.Now().Add(startupTimeout)
for !checkStorageReady() {
if time.Now().After(tTimeout) {
panic(bindings.NewError("Server startup timeout expired.", bindings.ErrLogic))
}
time.Sleep(time.Second)
}

pass, _ := u.User.Password()
server.builtin.(*builtin.Builtin).SetReindexerInstance(
unsafe.Pointer(C.get_reindexer_instance(str2c(u.Host), str2c(u.User.Username()), str2c(pass))),
)

url := *u
url.Path = ""

if err := server.builtin.Init(&url, options...); err != nil {
return err
}

return nil
}

func (server *BuiltinServer) OpenNamespace(namespace string, enableStorage, dropOnFileFormatError bool, cacheMode uint8) error {
return server.builtin.OpenNamespace(namespace, enableStorage, dropOnFileFormatError, cacheMode)
}

func (server *BuiltinServer) CloseNamespace(namespace string) error {
return server.builtin.CloseNamespace(namespace)
}

func (server *BuiltinServer) DropNamespace(namespace string) error {
return server.builtin.DropNamespace(namespace)
}

func (server *BuiltinServer) EnableStorage(namespace string) error {
return server.builtin.EnableStorage(namespace)
}

func (server *BuiltinServer) AddIndex(namespace, index, jsonPath, indexType, fieldType string, opts bindings.IndexOptions, collateMode int, sortOrderStr string) error {
return server.builtin.AddIndex(namespace, index, jsonPath, indexType, fieldType, opts, collateMode, sortOrderStr)
}

func (server *BuiltinServer) DropIndex(namespace, index string) error {
return server.builtin.DropIndex(namespace, index)
}

func (server *BuiltinServer) ConfigureIndex(namespace, index, config string) error {
return server.ConfigureIndex(namespace, index, config)
}

func (server *BuiltinServer) PutMeta(namespace, key, data string) error {
return server.builtin.PutMeta(namespace, key, data)
}

func (server *BuiltinServer) GetMeta(namespace, key string) (bindings.RawBuffer, error) {
return server.builtin.GetMeta(namespace, key)
}

func (server *BuiltinServer) ModifyItem(nsHash int, data []byte, mode int) (bindings.RawBuffer, error) {
return server.builtin.ModifyItem(nsHash, data, mode)
}

func (server *BuiltinServer) Select(query string, withItems bool, ptVersions []int32, fetchCount int) (bindings.RawBuffer, error) {
return server.builtin.Select(query, withItems, ptVersions, fetchCount)
}

func (server *BuiltinServer) SelectQuery(rawQuery []byte, withItems bool, ptVersions []int32, fetchCount int) (bindings.RawBuffer, error) {
return server.builtin.SelectQuery(rawQuery, withItems, ptVersions, fetchCount)
}

func (server *BuiltinServer) DeleteQuery(nsHash int, rawQuery []byte) (bindings.RawBuffer, error) {
return server.builtin.DeleteQuery(nsHash, rawQuery)
}

func (server *BuiltinServer) Commit(namespace string) error {
return server.builtin.Commit(namespace)
}

func (server *BuiltinServer) EnableLogger(logger bindings.Logger) {
server.builtin.EnableLogger(logger)
}

func (server *BuiltinServer) DisableLogger() {
server.builtin.DisableLogger()
}

func (server *BuiltinServer) Ping() error {
return server.builtin.Ping()
}
9 changes: 9 additions & 0 deletions bindings/builtinserver/builtinserver_posix.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
// +build !windows
//go:generate sh -c "cd ../.. && mkdir -p build && cd build && cmake -DLINK_RESOURCES=On -DWITH_GPERF=Off .. && make reindexer_server_library -j4"

package builtinserver

// #cgo CXXFLAGS: -std=c++11 -g -O2 -Wall -Wpedantic -Wextra -I../../cpp_src
// #cgo CFLAGS: -std=c99 -g -O2 -Wall -Wpedantic -Wno-unused-variable -I../../cpp_src
// #cgo LDFLAGS: -L${SRCDIR}/../../build/cpp_src/ -L${SRCDIR}/../../build/cpp_src/server/ -lreindexer_server_library -lresources -lreindexer -lleveldb -lsnappy -lstdc++ -lm -g
import "C"
9 changes: 9 additions & 0 deletions bindings/builtinserver/builtinserver_windows.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
// +build windows
//go:generate cmd /c cd ..\.. && mkdir build & cd build && cmake -G "MinGW Makefiles" -DLINK_RESOURCES=On -DWITH_GPERF=Off -DCMAKE_BUILD_TYPE=Release .. && cmake --build . --target reindexer reindexer_server_library -- -j4

package builtin

// #cgo CXXFLAGS: -std=c++11 -g -O2 -Wall -Wpedantic -Wextra -I../../cpp_src
// #cgo CFLAGS: -std=c99 -g -O2 -Wall -Wpedantic -Wno-unused-variable -I../../cpp_src
// #cgo LDFLAGS: -L${SRCDIR}/../../build/cpp_src/ -L${SRCDIR}/../../build/cpp_src/server/ -lreindexer -lreindexer_server_library -lresources -lleveldb -lsnappy -lstdc++ -g -lshlwapi -ldbghelp
import "C"
Loading

0 comments on commit 076baef

Please sign in to comment.