Skip to content

Commit

Permalink
Updated to version 1.0.3, standalone mode added (alpha)
Browse files Browse the repository at this point in the history
  • Loading branch information
olegator77 committed Feb 13, 2018
1 parent dca02ac commit dadc029
Show file tree
Hide file tree
Showing 161 changed files with 10,178 additions and 5,018 deletions.
3 changes: 3 additions & 0 deletions .clang-format
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ PointerAlignment: Left
DisableFormat: false
ExperimentalAutoDetectBinPacking: false
ForEachMacros: [ foreach, Q_FOREACH, BOOST_FOREACH ]
FixNamespaceComments: true
IncludeCategories:
- Regex: '^<.*\.h>'
Priority: 1
Expand Down Expand Up @@ -80,6 +81,7 @@ PointerAlignment: Left
ReflowComments: true
SortIncludes: true
SpaceAfterCStyleCast: false
SpaceAfterTemplateKeyword: true
SpaceBeforeAssignmentOperators: true
SpaceBeforeParens: ControlStatements
SpaceInEmptyParentheses: false
Expand All @@ -89,6 +91,7 @@ SpacesInContainerLiterals: true
SpacesInCStyleCastParentheses: false
SpacesInParentheses: false
SpacesInSquareBrackets: false
SortUsingDeclarations: false
Standard: Auto
TabWidth: 4
UseTab: Always
Expand Down
3 changes: 2 additions & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
cmake_minimum_required(VERSION 2.8)
add_subdirectory(cpp_src)
enable_testing()
add_subdirectory(cpp_src)
2 changes: 1 addition & 1 deletion LICENSE
Original file line number Diff line number Diff line change
Expand Up @@ -199,4 +199,4 @@
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
limitations under the License.
119 changes: 52 additions & 67 deletions bindings.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
//go:generate make -j4 -C cpp_src
package reindexer

import (
Expand All @@ -13,8 +12,6 @@ import (
"github.com/restream/reindexer/cjson"
)

type CInt bindings.CInt

const (
modeInsert = bindings.ModeInsert
modeUpdate = bindings.ModeUpdate
Expand All @@ -40,10 +37,10 @@ func (db *Reindexer) modifyItem(namespace string, ns *reindexerNamespace, item i

out, err := db.binding.ModifyItem(ser.Bytes(), mode)

defer out.Free()
if err != nil {
return 0, err
}
defer out.Free()

rdSer := newSerializer(out.GetBuf())
rawQueryParams := rdSer.readRawQueryParams()
Expand All @@ -63,7 +60,7 @@ func (db *Reindexer) modifyItem(namespace string, ns *reindexerNamespace, item i

func packItem(ns *reindexerNamespace, item interface{}, json []byte, ser *cjson.Serializer, precepts ...string) error {

ser.PutString(ns.name)
ser.PutVString(ns.name)

if item != nil {
json, _ = item.([]byte)
Expand All @@ -78,27 +75,27 @@ func packItem(ns *reindexerNamespace, item interface{}, json []byte, ser *cjson.
panic(ErrWrongType)
}

ser.PutCInt(bindings.FormatCJson)
ser.PutVarCUInt(bindings.FormatCJson)

// reserve int for save len
pos := len(ser.Bytes())
ser.PutCInt(0)
ser.PutUInt32(0)
pos1 := len(ser.Bytes())

enc := ns.cjsonState.NewEncoder()
if err := enc.Encode(item, ser); err != nil {
return err
}

*(*CInt)(unsafe.Pointer(&ser.Bytes()[pos])) = CInt(len(ser.Bytes()) - pos1)
*(*uint32)(unsafe.Pointer(&ser.Bytes()[pos])) = uint32(len(ser.Bytes()) - pos1)
} else {
ser.PutCInt(bindings.FormatJson)
ser.PutVarCUInt(bindings.FormatJson)
ser.PutBytes(json)
}

ser.PutCInt(len(precepts))
ser.PutVarCUInt(len(precepts))
for _, precept := range precepts {
ser.PutString(precept)
ser.PutVString(precept)
}

return nil
Expand All @@ -115,33 +112,20 @@ func (db *Reindexer) getNS(namespace string) (*reindexerNamespace, error) {
}

func (db *Reindexer) putMeta(namespace, key string, data []byte) error {

var ser cjson.Serializer
ser.PutString(namespace)
ser.PutString(key)
ser.PutBytes(data)

return db.binding.PutMeta(ser.Bytes())
return db.binding.PutMeta(namespace, key, string(data))
}

func (db *Reindexer) getMeta(namespace, key string) ([]byte, error) {

var ser cjson.Serializer
ser.PutString(namespace)
ser.PutString(key)

out, err := db.binding.GetMeta(ser.Bytes())
defer out.Free()

out, err := db.binding.GetMeta(namespace, key)
if err != nil {
return nil, err
}
defer out.Free()

rdSer := newSerializer(out.GetBuf())
b := rdSer.GetBytes()
bb := make([]byte, len(b), cap(b))
copy(bb, b)
return bb, nil
s := rdSer.GetVString()
return []byte(s), nil
}

func unpackItem(ns *reindexerNamespace, params rawResultItemParams, allowUnsafe bool) (item interface{}, err error) {
Expand All @@ -164,7 +148,14 @@ func unpackItem(ns *reindexerNamespace, params rawResultItemParams, allowUnsafe
} else {
item = reflect.New(ns.rtype).Interface()
dec := ns.cjsonState.NewDecoder()
if err = dec.Decode(params.cptr, item); err != nil {
if params.cptr != 0 {
err = dec.DecodeCPtr(params.cptr, item)
} else if params.data != nil {
err = dec.Decode(params.data, item)
} else {
panic(fmt.Errorf("Internal error while decoding item id %d from ns %s: cptr and data are both null", params.id, ns.name))
}
if err != nil {
ns.cacheLock.Unlock()
return
}
Expand All @@ -174,7 +165,14 @@ func unpackItem(ns *reindexerNamespace, params rawResultItemParams, allowUnsafe
} else {
item = reflect.New(ns.rtype).Interface()
dec := ns.cjsonState.NewDecoder()
if err = dec.Decode(params.cptr, item); err != nil {
if params.cptr != 0 {
err = dec.DecodeCPtr(params.cptr, item)
} else if params.data != nil {
err = dec.Decode(params.data, item)
} else {
panic(fmt.Errorf("Internal error while decoding item id %d from ns %s: cptr and data are both null", params.id, ns.name))
}
if err != nil {
return
}
// Reset needCopy, because item already separate
Expand All @@ -193,28 +191,6 @@ func unpackItem(ns *reindexerNamespace, params rawResultItemParams, allowUnsafe
return
}

func (db *Reindexer) updatePayloadTypes(ns []*reindexerNamespace, ser *resultSerializer, rawQueryParams rawResultQueryParams) {
if rawQueryParams.nsCount != len(ns) {
panic(fmt.Errorf("Internal error: wrong namespaces count. Expected %d, got %d", len(ns), rawQueryParams.nsCount))
}
for i := 0; i < rawQueryParams.nsCount; i++ {
if ns[i].cjsonState.PayloadTypeVersion() != rawQueryParams.nsVersions[i] {
b, err := db.binding.GetPayloadType(ser.Bytes(), i)
if err != nil {
panic(err)
}
ns[i].cjsonState.ReadPayloadType(b.GetBuf())
if ns[i].cjsonState.PayloadTypeVersion() != rawQueryParams.nsVersions[i] {
panic(fmt.Errorf(
"Internal error: wrong tagsMatcher version. Expected %d, got %d",
rawQueryParams.nsVersions[i],
ns[i].cjsonState.PayloadTypeVersion()),
)
}
}
}
}

func (db *Reindexer) rawResultToJson(rawResult []byte, jsonName string, totalName string, initJson []byte, initOffsets []int) (json []byte, offsets []int, err error) {

ser := newSerializer(rawResult)
Expand Down Expand Up @@ -247,14 +223,14 @@ func (db *Reindexer) rawResultToJson(rawResult []byte, jsonName string, totalNam
jsonBuf.WriteString("\":[")

for i := 0; i < rawQueryParams.count; i++ {
_ = ser.readRawtItemParams()
item := ser.readRawtItemParams()
if i != 0 {
jsonBuf.WriteString(",")
}
offsets = append(offsets, len(jsonBuf.Bytes()))
jsonBuf.Write(ser.GetBytes())
jsonBuf.Write(item.data)

if ser.GetCInt() != 0 {
if ser.GetVarUInt() != 0 {
panic("Sorry, not implemented: Can't return join query results as json")
}
}
Expand All @@ -275,29 +251,34 @@ func (db *Reindexer) prepareQuery(q *Query, asJson bool) (result bindings.RawBuf
}

ser := q.ser
ser.PutCInt(queryEnd)
ser.PutVarCUInt(queryEnd)
for _, sq := range q.joinQueries {
ser.PutCInt(sq.joinType)
ser.PutVarCUInt(sq.joinType)
ser.Append(sq.ser)
ser.PutCInt(queryEnd)
ser.PutVarCUInt(queryEnd)
if ns, err := db.getNS(sq.Namespace); err == nil {
q.nsArray = append(q.nsArray, ns)
} else {
return nil, err
}
}
for _, sq := range q.mergedQueries {
ser.PutCInt(merge)
ser.PutVarCUInt(merge)
ser.Append(sq.ser)
ser.PutCInt(queryEnd)
ser.PutVarCUInt(queryEnd)
if ns, err := db.getNS(sq.Namespace); err == nil {
q.nsArray = append(q.nsArray, ns)
} else {
return nil, err
}
}

result, err = db.binding.SelectQuery(asJson, ser.Bytes())
ptVersions := make([]int32, 0, 16)
for _, ns := range q.nsArray {
ptVersions = append(ptVersions, ns.cjsonState.PayloadTypeVersion())
}

result, err = db.binding.SelectQuery(ser.Bytes(), asJson, ptVersions)

if err == nil && result.GetBuf() == nil {
panic(fmt.Errorf("result.Buffer is nil"))
Expand All @@ -312,7 +293,6 @@ func (db *Reindexer) execQuery(q *Query) *Iterator {
return errIterator(err)
}
iter := newIterator(q, result, q.nsArray, q.joinToFields, q.joinHandlers, q.context)
db.updatePayloadTypes(q.nsArray, &iter.ser, iter.rawQueryParams)
return iter
}

Expand Down Expand Up @@ -345,7 +325,13 @@ func (db *Reindexer) prepareSQL(namespace, query string, asJson bool) (result bi
return
}
}
result, err = db.binding.Select(query, asJson)

ptVersions := make([]int32, 0, 16)
for _, ns := range nsArray {
ptVersions = append(ptVersions, ns.cjsonState.PayloadTypeVersion())
}

result, err = db.binding.Select(query, asJson, ptVersions)
return
}

Expand All @@ -355,7 +341,6 @@ func (db *Reindexer) execSQL(namespace, query string) *Iterator {
return errIterator(err)
}
iter := newIterator(nil, result, nsArray, nil, nil, nil)
db.updatePayloadTypes(nsArray, &iter.ser, iter.rawQueryParams)
return iter
}

Expand All @@ -381,10 +366,10 @@ func (db *Reindexer) deleteQuery(q *Query) (int, error) {
}

result, err := db.binding.DeleteQuery(q.ser.Bytes())
defer result.Free()
if err != nil {
return 0, err
}
defer result.Free()

ser := newSerializer(result.GetBuf())
// skip total count
Expand All @@ -393,7 +378,7 @@ func (db *Reindexer) deleteQuery(q *Query) (int, error) {
ns.cacheLock.Lock()
for i := 0; i < rawQueryParams.count; i++ {
params := ser.readRawtItemParams()
jres := ser.GetCInt()
jres := ser.GetVarUInt()
if jres != 0 {
panic("Internal error: joined items in delete query result")
}
Expand Down
Loading

0 comments on commit dadc029

Please sign in to comment.