forked from bloomberg/comdb2
-
Notifications
You must be signed in to change notification settings - Fork 0
/
default_consumer_v1.1.lua
231 lines (213 loc) · 5.88 KB
/
default_consumer_v1.1.lua
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
local function bad_options(err)
return -200, err
end
local function register_failed()
return -201, [[failed 'register']]
end
local function begin_failed()
return -202, [[failed 'begin']]
end
local function commit_failed()
return -202, [[failed 'commit']]
end
local function define_emit_columns(opt)
local col
local cols = {}
col = {}
col.name = [[comdb2_event]]
col.type = [[text]]
table.insert(cols, col)
if opt.with_id then
col = {}
col.name = [[comdb2_id]]
col.type = [[blob]]
table.insert(cols, col)
end
if opt.with_tid then
col = {}
col.name = [[comdb2_tid]]
col.type = [[integer]]
table.insert(cols, col)
end
if opt.with_sequence then
col = {}
col.name = [[comdb2_sequence]]
col.type = [[integer]]
table.insert(cols, col)
end
if opt.with_epoch then
col = {}
col.name = [[comdb2_epoch]]
col.type = [[integer]]
table.insert(cols, col)
end
local sql =
[[SELECT DISTINCT c.columnname name, c.type type ]]..
[[FROM comdb2_columns c ]]..
[[JOIN comdb2_triggers t WHERE ]]..
[[c.tablename = t.tbl_name AND ]]..
[[c.columnname = t.col AND ]]..
[[t.name = @name]]
local stmt = db:prepare(sql)
stmt:bind([[name]], db:spname())
local row = stmt:fetch()
while row do
table.insert(cols, row)
row = stmt:fetch()
end
stmt:close()
db:num_columns(#cols)
for i, row in ipairs(cols) do
db:column_name(row.name, i)
db:column_type(row.type, i)
end
end
local function get_event(consumer, opt)
if opt.poll_timeout == nil then
return consumer:get()
end
local e = consumer:poll(opt.poll_timeout)
while e == nil do
consumer:emit({comdb2_event = [[poll_timeout]]})
e = consumer:poll(opt.poll_timeout)
end
return e
end
local function emit(event, opt, obj, out, type)
out.comdb2_event = type
if opt.with_id then
out.comdb2_id = event.id
end
if opt.with_tid then
out.comdb2_tid = event.tid
end
if opt.with_sequence then
out.comdb2_sequence = event.sequence
end
if opt.with_epoch then
out.comdb2_epoch = event.epoch
end
obj:emit(out)
end
local function emit_value(opt, event, obj)
local type = event.type
if type == [[upd]] then
emit(event, opt, db, event.old, [[old]])
emit(event, opt, obj, event.new, [[new]])
elseif type == [[del]] then
emit(event, opt, obj, event.old, type)
elseif type == [[add]] then
emit(event, opt, obj, event.new, type)
end
end
local function validate_options(opt)
local valid_options = {}
valid_options.batch_consume = true
valid_options.consume_count = true --undocumented
valid_options.emit_timeout = true
valid_options.poll_timeout = true
valid_options.register_timeout = true
valid_options.with_epoch = true
valid_options.with_id = true --undocumented
valid_options.with_sequence = true
valid_options.with_tid = true
valid_options.with_txn_sentinel = true
for k, _ in pairs(opt) do
if valid_options[k] == nil then
return [[invalid option ']] .. k .. [[']]
end
end
end
local function get_options(json)
local opt
if json then
opt = db:json_to_table(json)
local err = validate_options(opt)
if err then return nil, err end
else
opt = {}
end
if opt.with_id == nil then
opt.with_id = true
end
if opt.emit_timeout == nil then
opt.emit_timeout = 10 * 1000
end
return opt, nil
end
local function get_consumer(opt)
local consumer = db:consumer(opt)
while consumer == nil do
if opt.register_timeout == nil then
return register_failed()
end
emit({}, {}, db, {}, [[register_timeout]])
consumer = db:consumer(opt)
end
consumer:emit_timeout(opt.emit_timeout)
return consumer
end
local function count_consumer(opt, consumer)
local counter = 0
while counter < opt.consume_count do
emit_value(opt, get_event(consumer, opt), consumer)
consumer:consume()
counter = counter + 1
end
end
local function batch_consumer(opt, consumer)
local rc = db:begin()
if rc ~= 0 then
return begin_failed()
end
local event_tid, last_tid
local last = get_event(consumer, opt)
if last then last_tid = db:get_event_tid(last) end
consumer:next()
while true do
local event = consumer:poll(0)
if event then event_tid = db:get_event_tid(event) end
if event == nil or event_tid ~= last_tid then
if opt.with_txn_sentinel then
emit_value(opt, last, db)
consumer:emit({comdb2_event = [[txn]]})
else
emit_value(opt, last, consumer)
end
rc = db:commit()
if rc ~= 0 then
return commit_failed()
end
rc = db:begin()
last = get_event(consumer, opt)
if last then last_tid = db:get_event_tid(last) end
if rc ~= 0 then
return begin_failed()
end
else
emit_value(opt, last, db)
last = event
last_tid = event_tid
end
consumer:next()
end
end
local function simple_consumer(opt, consumer)
while true do
emit_value(opt, get_event(consumer, opt), consumer)
consumer:consume()
end
end
local function main(json)
local opt, err = get_options(json)
if err then return bad_options(err) end
define_emit_columns(opt)
local consumer = get_consumer(opt)
if opt.consume_count then
count_consumer(opt, consumer)
elseif opt.batch_consume then
batch_consumer(opt, consumer)
else
simple_consumer(opt, consumer)
end
end