forked from Restream/reindexer
-
Notifications
You must be signed in to change notification settings - Fork 0
/
reindexer.go
354 lines (301 loc) · 11.8 KB
/
reindexer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
package reindexer
import (
"context"
"github.com/restream/reindexer/bindings"
_ "github.com/restream/reindexer/bindings/cproto"
"github.com/restream/reindexer/dsl"
// _ "github.com/restream/reindexer/bindings/builtinserver"
)
// Condition types.
// Each constant re-exports the bindings constant of the same name.
const (
// Equal '='
EQ = bindings.EQ
// Greater '>'
GT = bindings.GT
// Lower '<'
LT = bindings.LT
// Greater or equal '>=' (GT|EQ)
GE = bindings.GE
// Lower or equal '<='
LE = bindings.LE
// One of set 'IN []'
SET = bindings.SET
// All of set
ALLSET = bindings.ALLSET
// In range
RANGE = bindings.RANGE
// Any value
ANY = bindings.ANY
// Empty value (usually a zero-length array)
EMPTY = bindings.EMPTY
// String like pattern
LIKE = bindings.LIKE
)
// Log levels.
// Each constant re-exports the bindings constant of the same name.
const (
// ERROR Log level
ERROR = bindings.ERROR
// WARNING Log level
WARNING = bindings.WARNING
// INFO Log level
INFO = bindings.INFO
// TRACE Log level
TRACE = bindings.TRACE
)
// Aggregation funcs.
// Each constant re-exports the bindings constant of the same name.
// NOTE(review): the meanings below are inferred from the identifiers only — confirm against bindings.
const (
// AggAvg presumably selects the average aggregation.
AggAvg = bindings.AggAvg
// AggSum presumably selects the sum aggregation.
AggSum = bindings.AggSum
// AggFacet presumably selects the facet aggregation.
AggFacet = bindings.AggFacet
// AggMin presumably selects the minimum aggregation.
AggMin = bindings.AggMin
// AggMax presumably selects the maximum aggregation.
AggMax = bindings.AggMax
)
// Reindexer error codes.
// Each ErrCodeXxx constant re-exports bindings.ErrXxx under the name used by this package
// (for example ErrCodeOK is bindings.ErrOK).
const (
ErrCodeOK = bindings.ErrOK
ErrCodeParseSQL = bindings.ErrParseSQL
ErrCodeQueryExec = bindings.ErrQueryExec
ErrCodeParams = bindings.ErrParams
ErrCodeLogic = bindings.ErrLogic
ErrCodeParseJson = bindings.ErrParseJson
ErrCodeParseDSL = bindings.ErrParseDSL
ErrCodeConflict = bindings.ErrConflict
ErrCodeParseBin = bindings.ErrParseBin
ErrCodeForbidden = bindings.ErrForbidden
ErrCodeWasRelock = bindings.ErrWasRelock
ErrCodeNotValid = bindings.ErrNotValid
ErrCodeNetwork = bindings.ErrNetwork
ErrCodeNotFound = bindings.ErrNotFound
ErrCodeStateInvalidated = bindings.ErrStateInvalidated
ErrCodeTimeout = bindings.ErrTimeout
)
// logger is the package-level Logger. It starts out as a nullLogger, whose Printf does nothing.
var logger Logger = &nullLogger{}
// Reindexer is the reindexer state struct.
// It pairs the underlying implementation with the context that the methods pass to implementation calls.
type Reindexer struct {
// impl is the underlying implementation that the methods of Reindexer delegate to.
impl *reindexerImpl
// ctx is the context handed to impl calls. NewReindex sets it to context.TODO();
// WithContext creates a copy of the instance carrying a caller-supplied context.
ctx context.Context
}
// IndexDef is the index definition struct. It has the same underlying type as bindings.IndexDef.
type IndexDef bindings.IndexDef
// Error is the reindexer error interface: an error message plus a numeric code.
// NOTE(review): Code presumably returns one of the ErrCode* constants — confirm against bindings.
type Error interface {
// Error returns the error message.
Error() string
// Code returns the numeric error code.
Code() int
}
// Joinable is an interface for appending joined items.
type Joinable interface {
// Join receives the join field name, the joined sub-items and a context value.
// NOTE(review): the meaning of the context argument is not visible here — confirm against the caller.
Join(field string, subitems []interface{}, context interface{})
}
// JoinHandler is a function for handling join results.
// It returns a bool that indicates whether the automatic join strategy still needs to be applied.
// If `useAutomaticJoinStrategy` is false, the JoinHandler takes full responsibility for performing the join.
// If `useAutomaticJoinStrategy` is true, the JoinHandler performs only part of the work required during the join, and the rest is done using the automatic join strategy.
// The automatic join strategy is defined as:
// - use the Join method to perform the join (in case the item implements the Joinable interface)
// - use reflection to perform the join otherwise
type JoinHandler func(field string, item interface{}, subitems []interface{}) (useAutomaticJoinStrategy bool)
// DeepCopy is an interface for values that can produce a copy of themselves.
// NOTE(review): ErrDeepCopyType ("DeepCopy() returns wrong type") suggests the returned value is
// expected to have a specific type, presumably the item's own type — confirm against the callers.
type DeepCopy interface {
// DeepCopy returns the copy as an untyped value.
DeepCopy() interface{}
}
// Logger is the interface for reindexer log output.
type Logger interface {
// Printf receives a log level, a format string and the format arguments.
// NOTE(review): level is presumably one of ERROR, WARNING, INFO or TRACE — confirm against the callers.
Printf(level int, fmt string, msg ...interface{})
}
// nullLogger is a Logger implementation that discards everything it is given.
type nullLogger struct{}

// Printf intentionally does nothing: all arguments are ignored.
func (nullLogger) Printf(_ int, _ string, _ ...interface{}) {}
// Predefined errors of the package.
// Each one is built with bindings.NewError from a message and one of the ErrCode* constants.
var (
// Unexported errors.
errNsNotFound = bindings.NewError("rq: Namespace is not found", ErrCodeNotFound)
errNsExists = bindings.NewError("rq: Namespace is already exists", ErrCodeParams)
errInvalidReflection = bindings.NewError("rq: Invalid reflection type of index", ErrCodeParams)
errStorageNotEnabled = bindings.NewError("rq: Storage is not enabled, can't save", ErrCodeLogic)
errIteratorNotReady = bindings.NewError("rq: Iterator not ready. Next() must be called before", ErrCodeLogic)
errJoinUnexpectedField = bindings.NewError("rq: Unexpected join field", ErrCodeParams)
// Exported errors, available to callers of the package.
ErrEmptyNamespace = bindings.NewError("rq: empty namespace name", ErrCodeParams)
ErrEmptyFieldName = bindings.NewError("rq: empty field name in filter", ErrCodeParams)
ErrCondType = bindings.NewError("rq: cond type not found", ErrCodeParams)
ErrOpInvalid = bindings.NewError("rq: op is invalid", ErrCodeParams)
ErrNoPK = bindings.NewError("rq: No pk field in struct", ErrCodeParams)
ErrWrongType = bindings.NewError("rq: Wrong type of item", ErrCodeParams)
ErrMustBePointer = bindings.NewError("rq: Argument must be a pointer to element, not element", ErrCodeParams)
ErrNotFound = bindings.NewError("rq: Not found", ErrCodeNotFound)
ErrDeepCopyType = bindings.NewError("rq: DeepCopy() returns wrong type", ErrCodeParams)
)
// AggregationResult describes the result of one aggregation.
// The json tags give the key each field is mapped to when the value is encoded to or decoded from JSON.
// NOTE(review): the field meanings below are inferred from names and tags only — confirm against the producer of this struct.
type AggregationResult struct {
// Fields is presumably the list of field names the aggregation was computed over.
Fields []string `json:"fields"`
// Type is presumably the name of the aggregation function.
Type string `json:"type"`
// Value is presumably the numeric result for non-facet aggregations.
Value float64 `json:"value"`
// Facets presumably lists the facet entries: a set of values and the count for that set.
Facets []struct {
Values []string `json:"values"`
Count int `json:"count"`
} `json:"facets"`
}
// NewReindex creates a new instance of the Reindexer DB.
// The dsn and options are forwarded unchanged to newReindexImpl, and the new
// instance starts with context.TODO() as its context.
// It returns a pointer to the created instance.
func NewReindex(dsn string, options ...interface{}) *Reindexer {
	return &Reindexer{
		impl: newReindexImpl(dsn, options...),
		ctx:  context.TODO(),
	}
}
// Status returns the current DB status as reported by the underlying implementation.
func (db *Reindexer) Status() bindings.Status {
	status := db.impl.getStatus()
	return status
}
// SetLogger sets the logger interface used for reindexer log output.
// The logger is handed to the underlying implementation.
func (db *Reindexer) SetLogger(log Logger) {
	impl := db.impl
	impl.setLogger(log)
}
// Ping checks the connection with reindexer, using the instance context.
// It returns the error reported by the underlying implementation, if any.
func (db *Reindexer) Ping() error {
	err := db.impl.ping(db.ctx)
	return err
}
// Close calls close on the underlying implementation.
// NOTE(review): presumably this shuts down the connection to reindexer — confirm against reindexerImpl.
func (db *Reindexer) Close() {
	impl := db.impl
	impl.close()
}
// NamespaceOptions holds the options for a namespace.
// Start from DefaultNamespaceOptions and adjust the result with the chainable
// NoStorage, DropOnIndexesConflict and DropOnFileFormatError methods.
type NamespaceOptions struct {
	// Storage is enabled for the namespace; false means an in-memory only namespace
	enableStorage bool
	// Drop the namespace on an index mismatch error
	dropOnIndexesConflict bool
	// Drop the namespace on file errors
	dropOnFileFormatError bool
}

// DefaultNamespaceOptions returns the default namespace options:
// storage enabled and both drop flags off.
func DefaultNamespaceOptions() *NamespaceOptions {
	defaults := new(NamespaceOptions)
	defaults.enableStorage = true
	return defaults
}

// NoStorage turns storage off and returns the same options for chaining.
func (o *NamespaceOptions) NoStorage() *NamespaceOptions {
	o.enableStorage = false
	return o
}

// DropOnIndexesConflict turns on dropping the namespace on an index mismatch
// error and returns the same options for chaining.
func (o *NamespaceOptions) DropOnIndexesConflict() *NamespaceOptions {
	o.dropOnIndexesConflict = true
	return o
}

// DropOnFileFormatError turns on dropping the namespace on file errors and
// returns the same options for chaining.
func (o *NamespaceOptions) DropOnFileFormatError() *NamespaceOptions {
	o.dropOnFileFormatError = true
	return o
}
// OpenNamespace opens or creates a new namespace and its indexes based on the passed struct.
// IndexDef fields of the struct are marked by the `reindex:` tag.
// The call uses the instance context.
func (db *Reindexer) OpenNamespace(namespace string, opts *NamespaceOptions, s interface{}) (err error) {
	err = db.impl.openNamespace(db.ctx, namespace, opts, s)
	return err
}
// RegisterNamespace registers a Go type against the namespace.
// No data or index changes are performed.
// Unlike OpenNamespace, the instance context is not passed to the implementation.
func (db *Reindexer) RegisterNamespace(namespace string, opts *NamespaceOptions, s interface{}) (err error) {
	err = db.impl.registerNamespace(namespace, opts, s)
	return err
}
// DropNamespace drops the whole namespace from the DB, using the instance context.
func (db *Reindexer) DropNamespace(namespace string) error {
	err := db.impl.dropNamespace(db.ctx, namespace)
	return err
}
// CloseNamespace closes the namespace but keeps its storage, using the instance context.
func (db *Reindexer) CloseNamespace(namespace string) error {
	err := db.impl.closeNamespace(db.ctx, namespace)
	return err
}
// Upsert inserts the item into the namespace, or updates it (insert or update).
// The item must be of the same type as the item passed to OpenNamespace, or a []byte with JSON.
// If precepts are provided and the item is a pointer, the value pointed to by item will be updated.
func (db *Reindexer) Upsert(namespace string, item interface{}, precepts ...string) error {
	err := db.impl.upsert(db.ctx, namespace, item, precepts...)
	return err
}
// Insert inserts the item into the namespace by PK.
// The item must be of the same type as the item passed to OpenNamespace, or a []byte with JSON data.
// It returns 0 if no item was inserted and 1 if the item was inserted.
// If precepts are provided and the item is a pointer, the value pointed to by item will be updated.
func (db *Reindexer) Insert(namespace string, item interface{}, precepts ...string) (int, error) {
	inserted, err := db.impl.insert(db.ctx, namespace, item, precepts...)
	return inserted, err
}
// Update updates the item in the namespace by PK.
// The item must be of the same type as the item passed to OpenNamespace, or a []byte with JSON data.
// It returns 0 if no item was updated and 1 if the item was updated.
// If precepts are provided and the item is a pointer, the value pointed to by item will be updated.
func (db *Reindexer) Update(namespace string, item interface{}, precepts ...string) (int, error) {
	updated, err := db.impl.update(db.ctx, namespace, item, precepts...)
	return updated, err
}
// Delete removes a single item from the namespace by PK.
// The item must be of the same type as the item passed to OpenNamespace, or a []byte with JSON data.
// If precepts are provided and the item is a pointer, the value pointed to by item will be updated.
func (db *Reindexer) Delete(namespace string, item interface{}, precepts ...string) error {
	err := db.impl.delete(db.ctx, namespace, item, precepts...)
	return err
}
// ConfigureIndex configures an index.
// The config argument must be a struct with the index configuration.
//
// Deprecated: Use UpdateIndex instead.
func (db *Reindexer) ConfigureIndex(namespace, index string, config interface{}) error {
	err := db.impl.configureIndex(db.ctx, namespace, index, config)
	return err
}
// AddIndex adds the given index definitions to the namespace, using the instance context.
func (db *Reindexer) AddIndex(namespace string, indexDef ...IndexDef) error {
	err := db.impl.addIndex(db.ctx, namespace, indexDef...)
	return err
}
// UpdateIndex updates an index of the namespace from the given definition, using the instance context.
func (db *Reindexer) UpdateIndex(namespace string, indexDef IndexDef) error {
	err := db.impl.updateIndex(db.ctx, namespace, indexDef)
	return err
}
// DropIndex drops the named index from the namespace, using the instance context.
func (db *Reindexer) DropIndex(namespace, index string) error {
	err := db.impl.dropIndex(db.ctx, namespace, index)
	return err
}
// SetDefaultQueryDebug sets the default debug level for queries to the namespace.
func (db *Reindexer) SetDefaultQueryDebug(namespace string, level int) error {
	err := db.impl.setDefaultQueryDebug(db.ctx, namespace, level)
	return err
}
// Query creates a new Query for building a request against the namespace.
// The instance context is not passed to the implementation here.
func (db *Reindexer) Query(namespace string) *Query {
	q := db.impl.query(namespace)
	return q
}
// ExecSQL makes a query to the database. The query is an SQL statement.
// It returns an Iterator.
func (db *Reindexer) ExecSQL(query string) *Iterator {
	it := db.impl.execSQL(db.ctx, query)
	return it
}
// ExecSQLToJSON makes a query to the database. The query is an SQL statement.
// It returns a JSONIterator.
func (db *Reindexer) ExecSQLToJSON(query string) *JSONIterator {
	it := db.impl.execSQLToJSON(db.ctx, query)
	return it
}
// BeginTx starts an update transaction on the namespace, using the instance context.
func (db *Reindexer) BeginTx(namespace string) (*Tx, error) {
	tx, err := db.impl.beginTx(db.ctx, namespace)
	return tx, err
}
// MustBeginTx starts an update transaction on the namespace and panics on error.
func (db *Reindexer) MustBeginTx(namespace string) *Tx {
	tx := db.impl.mustBeginTx(db.ctx, namespace)
	return tx
}
// QueryFrom creates a query from the given DSL.
// NOTE(review): the earlier comment said the query is also executed, but this
// method returns a *Query and does not pass the instance context — confirm against queryFrom.
func (db *Reindexer) QueryFrom(d dsl.DSL) (*Query, error) {
	q, err := db.impl.queryFrom(d)
	return q, err
}
// GetStats gets the local thread reindexer usage stats.
//
// Deprecated: Use SELECT * FROM '#perfstats' to get performance statistics.
func (db *Reindexer) GetStats() bindings.Stats {
	stats := db.impl.getStats()
	return stats
}
// ResetStats resets the local thread reindexer usage stats.
//
// Deprecated: no longer used.
func (db *Reindexer) ResetStats() {
	impl := db.impl
	impl.resetStats()
}
// EnableStorage enables persistent storage of data.
//
// Deprecated: the storage path should be passed as part of the DSN to reindexer.NewReindex, e.g. reindexer.NewReindex("builtin:///tmp/reindex").
func (db *Reindexer) EnableStorage(storagePath string) error {
	err := db.impl.enableStorage(db.ctx, storagePath)
	return err
}
// PutMeta passes the key and data for the namespace to the implementation's putMeta, using the instance context.
// NOTE(review): presumably this stores data as namespace metadata under key — confirm against reindexerImpl.
func (db *Reindexer) PutMeta(namespace, key string, data []byte) error {
	err := db.impl.putMeta(db.ctx, namespace, key, data)
	return err
}
// GetMeta asks the implementation's getMeta for the given key of the namespace, using the instance context.
// NOTE(review): presumably this returns the metadata stored under key by PutMeta — confirm against reindexerImpl.
func (db *Reindexer) GetMeta(namespace, key string) ([]byte, error) {
	data, err := db.impl.getMeta(db.ctx, namespace, key)
	return data, err
}
// WithContext returns a new Reindexer that shares this instance's
// implementation but passes ctx to the implementation calls made through it.
// The receiver itself is not modified.
func (db *Reindexer) WithContext(ctx context.Context) *Reindexer {
	return &Reindexer{
		impl: db.impl,
		ctx:  ctx,
	}
}