-
Notifications
You must be signed in to change notification settings - Fork 272
/
pipe.lua
645 lines (500 loc) · 19.1 KB
/
pipe.lua
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
-- Copyright (C) by OpenResty Inc.
local base = require "resty.core.base"
base.allows_subsystem("http")
require "resty.core.phase" -- for ngx.get_phase
local assert = assert
local error = error
local ipairs = ipairs
local tonumber = tonumber
local tostring = tostring
local type = type
local str_find = string.find
local table_concat = table.concat
local ffi = require "ffi"
local C = ffi.C
local ffi_new = ffi.new
local ffi_str = ffi.string
local ngx_phase = ngx.get_phase
local get_string_buf = base.get_string_buf
local get_size_ptr = base.get_size_ptr
local get_request = base.get_request
local FFI_AGAIN = base.FFI_AGAIN
local FFI_BAD_CONTEXT = base.FFI_BAD_CONTEXT
local FFI_DECLINED = base.FFI_DECLINED
local FFI_ERROR = base.FFI_ERROR
local FFI_NO_REQ_CTX = base.FFI_NO_REQ_CTX
local FFI_OK = base.FFI_OK
local co_yield = coroutine._yield
-- C declarations for the pipe FFI API called through `C.*` in this module.
-- ngx_http_lua_ffi_pipe_proc_t is the struct later wrapped by ffi.metatype;
-- its `_pid` field backs the `pid()` method and the four ngx_msec_t fields
-- are the ones written by proc_set_timeouts().
-- NOTE(review): ngx_http_request_t is used but not declared here; presumably
-- it is declared by resty.core.base - confirm.
ffi.cdef[[
typedef int ngx_pid_t;
typedef uintptr_t ngx_msec_t;
typedef unsigned char u_char;
typedef struct ngx_http_lua_pipe_s ngx_http_lua_pipe_t;
typedef struct {
ngx_pid_t _pid;
ngx_msec_t write_timeout;
ngx_msec_t stdout_read_timeout;
ngx_msec_t stderr_read_timeout;
ngx_msec_t wait_timeout;
ngx_http_lua_pipe_t *pipe;
} ngx_http_lua_ffi_pipe_proc_t;
int ngx_http_lua_ffi_pipe_spawn(ngx_http_request_t *r,
ngx_http_lua_ffi_pipe_proc_t *proc,
const char *file, const char **argv, int merge_stderr, size_t buffer_size,
const char **environ, u_char *errbuf, size_t *errbuf_size);
int ngx_http_lua_ffi_pipe_proc_read(ngx_http_request_t *r,
ngx_http_lua_ffi_pipe_proc_t *proc, int from_stderr, int reader_type,
size_t length, u_char **buf, size_t *buf_size, u_char *errbuf,
size_t *errbuf_size);
int ngx_http_lua_ffi_pipe_get_read_result(ngx_http_request_t *r,
ngx_http_lua_ffi_pipe_proc_t *proc, int from_stderr, u_char **buf,
size_t *buf_size, u_char *errbuf, size_t *errbuf_size);
ssize_t ngx_http_lua_ffi_pipe_proc_write(ngx_http_request_t *r,
ngx_http_lua_ffi_pipe_proc_t *proc, const u_char *data, size_t len,
u_char *errbuf, size_t *errbuf_size);
ssize_t ngx_http_lua_ffi_pipe_get_write_result(ngx_http_request_t *r,
ngx_http_lua_ffi_pipe_proc_t *proc, u_char *errbuf, size_t *errbuf_size);
int ngx_http_lua_ffi_pipe_proc_shutdown_stdin(
ngx_http_lua_ffi_pipe_proc_t *proc, u_char *errbuf, size_t *errbuf_size);
int ngx_http_lua_ffi_pipe_proc_shutdown_stdout(
ngx_http_lua_ffi_pipe_proc_t *proc, u_char *errbuf, size_t *errbuf_size);
int ngx_http_lua_ffi_pipe_proc_shutdown_stderr(
ngx_http_lua_ffi_pipe_proc_t *proc, u_char *errbuf, size_t *errbuf_size);
int ngx_http_lua_ffi_pipe_proc_wait(ngx_http_request_t *r,
ngx_http_lua_ffi_pipe_proc_t *proc, char **reason, int *status,
u_char *errbuf, size_t *errbuf_size);
int ngx_http_lua_ffi_pipe_proc_kill(ngx_http_lua_ffi_pipe_proc_t *proc,
int signal, u_char *errbuf, size_t *errbuf_size);
void ngx_http_lua_ffi_pipe_proc_destroy(ngx_http_lua_ffi_pipe_proc_t *proc);
]]
-- Probe for the spawn symbol at load time: looking up a missing symbol on the
-- C namespace throws, so a failed pcall means the pipe API is unavailable and
-- loading this module is aborted with an error attributed to the caller.
if not pcall(function() return C.ngx_http_lua_ffi_pipe_spawn end) then
error("pipe API is not supported due to either a platform issue " ..
"or lack of the HAVE_SOCKET_CLOEXEC_PATCH patch", 2)
end
-- Module table returned to callers of require().
local _M = { version = base.version }
-- Capacity requested for every error-message buffer passed to the C API.
local ERR_BUF_SIZE = 256
-- Size of the preallocated buffer that proc_read offers to the C API first.
local VALUE_BUF_SIZE = 512
-- `reader_type` values passed to ngx_http_lua_ffi_pipe_proc_read by the
-- *_read_all, *_read_bytes, *_read_line and *_read_any methods respectively.
local PIPE_READ_ALL = 0
local PIPE_READ_BYTES = 1
local PIPE_READ_LINE = 2
local PIPE_READ_ANY = 3
-- proc_set_timeouts(proc, write_timeout, stdout_read_timeout,
--                   stderr_read_timeout, wait_timeout)
--
-- Stores each non-nil/non-false timeout into the field of the same name on
-- `proc`. A timeout that is negative or greater than MAX_TIMEOUT raises
-- "bad <name> option". Arguments that are nil or false leave the
-- corresponding field untouched. Nothing is returned.
--
-- Errors are raised with level 3, i.e. they are attributed to the caller of
-- the function that called proc_set_timeouts, so this must be invoked
-- directly from a public entry point.
local proc_set_timeouts
do
-- upper bound accepted for any timeout value (2^32 - 1)
local MAX_TIMEOUT = 0xffffffff
function proc_set_timeouts(proc, write_timeout, stdout_read_timeout,
stderr_read_timeout, wait_timeout)
-- the implementation below is straightforward but could not be JIT
-- compiled by the latest LuaJIT. When called in loops, LuaJIT will try
-- to unroll it, and fall back to interpreter after it reaches the
-- unroll limit.
--[[
local function set_timeout(proc, attr, timeout)
if timeout then
if timeout > MAX_TIMEOUT then
error("bad timeout value", 3)
end
proc[attr] = timeout
end
end
set_timeout(...)
]]
-- hand-unrolled replacement for the helper sketched above: one
-- validate-and-store step per timeout field
if write_timeout then
if write_timeout < 0 or MAX_TIMEOUT < write_timeout then
error("bad write_timeout option", 3)
end
proc.write_timeout = write_timeout
end
if stdout_read_timeout then
if stdout_read_timeout < 0 or MAX_TIMEOUT < stdout_read_timeout then
error("bad stdout_read_timeout option", 3)
end
proc.stdout_read_timeout = stdout_read_timeout
end
if stderr_read_timeout then
if stderr_read_timeout < 0 or MAX_TIMEOUT < stderr_read_timeout then
error("bad stderr_read_timeout option", 3)
end
proc.stderr_read_timeout = stderr_read_timeout
end
if wait_timeout then
if wait_timeout < 0 or MAX_TIMEOUT < wait_timeout then
error("bad wait_timeout option", 3)
end
proc.wait_timeout = wait_timeout
end
end
end
-- Guard used by every process method: accepts only FFI cdata values as
-- `proc`. Anything else raises "not a process instance" with level 3, so the
-- error is attributed to the caller of the method that invoked this guard.
local function check_proc_instance(proc)
    if type(proc) == "cdata" then
        return
    end

    error("not a process instance", 3)
end
-- proc_read(proc, stderr, reader_type, len)
--
-- Shared implementation behind all stdout_*/stderr_* read methods.
--   proc        - process cdata (checked with check_proc_instance)
--   stderr      - passed as `from_stderr` to the C API (0 or 1 at call sites)
--   reader_type - one of the PIPE_READ_* constants
--   len         - length argument forwarded to the C API
--
-- Returns:
--   data                on FFI_OK
--   nil, err            on FFI_ERROR
--   nil, err, partial   on FFI_DECLINED
-- Raises when there is no request / request ctx, or when the C side reports
-- FFI_BAD_CONTEXT. On FFI_AGAIN the current coroutine is yielded and the
-- result is fetched again after it is resumed.
local proc_read
do
    -- Scratch cells shared by all calls: a fixed buffer offered to the C API
    -- first, plus the in/out pointer and size slots.
    local value_buf = ffi_new("char[?]", VALUE_BUF_SIZE)
    local buf = ffi_new("char *[1]")
    local buf_size = ffi_new("size_t[1]")

    -- Converts the finished read into a Lua string. If the C side left
    -- buf[0] pointing at something other than value_buf, a buffer of
    -- buf_size[0] bytes is allocated and the data is fetched into it with a
    -- second get_read_result call (presumably because it did not fit into
    -- value_buf - confirm against the C implementation).
    local function take_result(r, proc, stderr, errbuf, errbuf_size)
        local dst = buf[0]

        if dst ~= value_buf then
            dst = ffi_new("char[?]", buf_size[0])
            buf[0] = dst
            C.ngx_http_lua_ffi_pipe_get_read_result(r, proc, stderr, buf,
                                                    buf_size, errbuf,
                                                    errbuf_size)
            -- the C side must have written into the buffer we supplied
            assert(dst == buf[0])
        end

        return ffi_str(dst, buf_size[0])
    end

    function proc_read(proc, stderr, reader_type, len)
        check_proc_instance(proc)

        local r = get_request()
        if not r then
            error("no request found")
        end

        buf[0] = value_buf
        buf_size[0] = VALUE_BUF_SIZE

        local errbuf = get_string_buf(ERR_BUF_SIZE)
        local errbuf_size = get_size_ptr()
        errbuf_size[0] = ERR_BUF_SIZE

        local rc = C.ngx_http_lua_ffi_pipe_proc_read(r, proc, stderr,
                                                     reader_type, len, buf,
                                                     buf_size, errbuf,
                                                     errbuf_size)

        -- these two codes are only checked for the initial call
        if rc == FFI_NO_REQ_CTX then
            error("no request ctx found")

        elseif rc == FFI_BAD_CONTEXT then
            error(ffi_str(errbuf, errbuf_size[0]), 2)
        end

        while true do
            if rc == FFI_OK then
                local data = take_result(r, proc, stderr, errbuf, errbuf_size)
                return data

            elseif rc == FFI_ERROR then
                return nil, ffi_str(errbuf, errbuf_size[0])

            elseif rc == FFI_DECLINED then
                -- capture the message before errbuf is handed to the C API
                -- again inside take_result
                local err = ffi_str(errbuf, errbuf_size[0])
                local partial = take_result(r, proc, stderr, errbuf,
                                            errbuf_size)
                return nil, err, partial
            end

            assert(rc == FFI_AGAIN)

            co_yield()

            -- the shared scratch cells and string buffer may have been used
            -- by other code while this coroutine was suspended; set them up
            -- again before asking for the result
            buf[0] = value_buf
            buf_size[0] = VALUE_BUF_SIZE
            errbuf = get_string_buf(ERR_BUF_SIZE)
            errbuf_size = get_size_ptr()
            errbuf_size[0] = ERR_BUF_SIZE

            rc = C.ngx_http_lua_ffi_pipe_get_read_result(r, proc, stderr, buf,
                                                         buf_size, errbuf,
                                                         errbuf_size)
        end
    end
end
-- proc:write(data)
--
-- Writes `data` via ngx_http_lua_ffi_pipe_proc_write. `data` may be a
-- string, a number (converted with tostring) or a table (joined with
-- table.concat using an empty separator); any other type raises an error.
--
-- Returns the number of bytes sent on success, or nil plus an error message
-- on FFI_ERROR. Raises when there is no request / request ctx or when the C
-- side reports FFI_BAD_CONTEXT. On FFI_AGAIN the coroutine is yielded and
-- the result is fetched after it is resumed.
local function proc_write(proc, data)
    check_proc_instance(proc)

    local r = get_request()
    if not r then
        error("no request found", 2)
    end

    local kind = type(data)
    if kind == "table" then
        data = table_concat(data)

    elseif kind == "number" then
        data = tostring(data)

    elseif kind ~= "string" then
        error("bad data arg: string, number, or table expected, got "
              .. kind, 2)
    end

    local errbuf = get_string_buf(ERR_BUF_SIZE)
    local errbuf_size = get_size_ptr()
    errbuf_size[0] = ERR_BUF_SIZE

    local rc = C.ngx_http_lua_ffi_pipe_proc_write(r, proc, data, #data, errbuf,
                                                  errbuf_size)

    -- these two codes are only checked for the initial call
    if rc == FFI_NO_REQ_CTX then
        error("no request ctx found", 2)

    elseif rc == FFI_BAD_CONTEXT then
        error(ffi_str(errbuf, errbuf_size[0]), 2)
    end

    while true do
        if rc >= 0 then
            -- a non-negative rc is the number of bytes sent
            return tonumber(rc)
        end

        if rc == FFI_ERROR then
            return nil, ffi_str(errbuf, errbuf_size[0])
        end

        assert(rc == FFI_AGAIN)

        co_yield()

        -- re-acquire the error buffer after being resumed
        errbuf = get_string_buf(ERR_BUF_SIZE)
        errbuf_size = get_size_ptr()
        errbuf_size[0] = ERR_BUF_SIZE

        rc = C.ngx_http_lua_ffi_pipe_get_write_result(r, proc, errbuf,
                                                      errbuf_size)
    end
end
-- proc:shutdown(direction)
--
-- Shuts down one of the process streams. `direction` must be the string
-- "stdin", "stdout" or "stderr"; it selects the matching
-- ngx_http_lua_ffi_pipe_proc_shutdown_* C function.
--
-- Returns true unless the C call reports FFI_ERROR, in which case nil plus
-- the error message is returned. Any other `direction` value (including nil
-- and non-string values) raises "bad shutdown arg: <value>" attributed to
-- the caller.
local function proc_shutdown(proc, direction)
    check_proc_instance(proc)

    local rc
    local errbuf = get_string_buf(ERR_BUF_SIZE)
    local errbuf_size = get_size_ptr()
    errbuf_size[0] = ERR_BUF_SIZE

    if direction == "stdin" then
        rc = C.ngx_http_lua_ffi_pipe_proc_shutdown_stdin(proc, errbuf,
                                                         errbuf_size)

    elseif direction == "stdout" then
        rc = C.ngx_http_lua_ffi_pipe_proc_shutdown_stdout(proc, errbuf,
                                                          errbuf_size)

    elseif direction == "stderr" then
        rc = C.ngx_http_lua_ffi_pipe_proc_shutdown_stderr(proc, errbuf,
                                                          errbuf_size)

    else
        -- tostring() keeps the intended message for nil, booleans and
        -- tables; a bare concatenation would fail with an unrelated
        -- "attempt to concatenate" error for those values
        error("bad shutdown arg: " .. tostring(direction), 2)
    end

    if rc == FFI_ERROR then
        return nil, ffi_str(errbuf, errbuf_size[0])
    end

    return true
end
-- proc:wait()
--
-- Asks the C API for the exit state of the process.
--
-- Returns:
--   true,  reason, status   when the C call returns FFI_OK
--   false, reason, status   when the C call returns FFI_DECLINED
--   nil, err                when the C call returns FFI_ERROR
-- For any other return code the coroutine is yielded and whatever three
-- values it is resumed with are returned as-is.
-- Raises when there is no request / request ctx or when the C side reports
-- FFI_BAD_CONTEXT.
local proc_wait
do
    -- out-parameters filled in by ngx_http_lua_ffi_pipe_proc_wait
    local reason = ffi_new("char *[1]")
    local status = ffi_new("int[1]")

    function proc_wait(proc)
        check_proc_instance(proc)

        local r = get_request()
        if not r then
            error("no request found", 2)
        end

        local errbuf = get_string_buf(ERR_BUF_SIZE)
        local errbuf_size = get_size_ptr()
        errbuf_size[0] = ERR_BUF_SIZE

        local rc = C.ngx_http_lua_ffi_pipe_proc_wait(r, proc, reason, status,
                                                     errbuf, errbuf_size)

        if rc == FFI_NO_REQ_CTX then
            error("no request ctx found", 2)

        elseif rc == FFI_BAD_CONTEXT then
            error(ffi_str(errbuf, errbuf_size[0]), 2)

        elseif rc == FFI_ERROR then
            return nil, ffi_str(errbuf, errbuf_size[0])
        end

        if rc == FFI_OK or rc == FFI_DECLINED then
            -- the result is already available; FFI_OK maps to true and
            -- FFI_DECLINED to false
            return rc == FFI_OK, ffi_str(reason[0]), tonumber(status[0])
        end

        -- result not available yet: suspend and hand back exactly the three
        -- values supplied on resume
        local ok, exit_reason, exit_status = co_yield()
        return ok, exit_reason, exit_status
    end
end
-- proc:kill(signal)
--
-- Sends `signal` (must be a Lua number) to the process through
-- ngx_http_lua_ffi_pipe_proc_kill.
--
-- Returns true unless the C call reports FFI_ERROR, in which case nil plus
-- the error message is returned. A non-number `signal` raises an error
-- attributed to the caller.
local function proc_kill(proc, signal)
    check_proc_instance(proc)

    if type(signal) ~= "number" then
        error("bad signal arg: number expected, got " .. tostring(signal), 2)
    end

    local errbuf = get_string_buf(ERR_BUF_SIZE)
    local errbuf_size = get_size_ptr()
    errbuf_size[0] = ERR_BUF_SIZE

    local rc = C.ngx_http_lua_ffi_pipe_proc_kill(proc, signal, errbuf,
                                                 errbuf_size)
    if rc ~= FFI_ERROR then
        return true
    end

    return nil, ffi_str(errbuf, errbuf_size[0])
end
-- Metatable attached to ngx_http_lua_ffi_pipe_proc_t via ffi.metatype.
--
-- __gc    : ngx_http_lua_ffi_pipe_proc_destroy is called when a process
--           cdata is garbage collected.
-- __index : the methods available on a process object:
--   pid()                          -> the `_pid` field
--   set_timeouts(w, out, err, wt)  -> see proc_set_timeouts
--   stdout_* / stderr_* readers    -> data | nil, err [, partial]
--                                     (see proc_read)
--   write / shutdown / wait / kill -> proc_write, proc_shutdown, proc_wait,
--                                     proc_kill
--
-- The read wrappers deliberately store the results of proc_read in locals
-- before returning instead of using `return proc_read(...)`: this keeps the
-- wrapper's stack frame in place (so level-based error attribution inside
-- proc_read / check_proc_instance stays valid) and always returns exactly
-- three values.
--
-- *_read_bytes(len): `len` is coerced with tonumber() when it is not already
-- a number, mirroring *_read_any. A `len` that cannot be converted or is
-- negative raises "bad len argument"; `len == 0` returns "" without touching
-- the pipe.
--
-- *_read_any(max): `max` is coerced with tonumber(); a value that cannot be
-- converted or is not positive raises "bad max argument".
local mt = {
    __gc = C.ngx_http_lua_ffi_pipe_proc_destroy,
    __index = {
        pid = function (proc)
            return proc._pid
        end,

        set_timeouts = function (proc, write_timeout, stdout_read_timeout,
                                 stderr_read_timeout, wait_timeout)
            proc_set_timeouts(proc, write_timeout, stdout_read_timeout,
                              stderr_read_timeout, wait_timeout)
        end,

        stdout_read_all = function (proc)
            local data, err, partial = proc_read(proc, 0, PIPE_READ_ALL, 0)
            return data, err, partial
        end,

        stdout_read_bytes = function (proc, len)
            if type(len) ~= "number" then
                len = tonumber(len)
            end

            if not len or len < 0 then
                error("bad len argument", 2)
            end

            if len == 0 then
                return ""
            end

            local data, err, partial = proc_read(proc, 0, PIPE_READ_BYTES, len)
            return data, err, partial
        end,

        stdout_read_line = function (proc)
            local data, err, partial = proc_read(proc, 0, PIPE_READ_LINE, 0)
            return data, err, partial
        end,

        stdout_read_any = function (proc, max)
            if type(max) ~= "number" then
                max = tonumber(max)
            end

            if not max or max <= 0 then
                error("bad max argument", 2)
            end

            local data, err, partial = proc_read(proc, 0, PIPE_READ_ANY, max)
            return data, err, partial
        end,

        stderr_read_all = function (proc)
            local data, err, partial = proc_read(proc, 1, PIPE_READ_ALL, 0)
            return data, err, partial
        end,

        stderr_read_bytes = function (proc, len)
            if type(len) ~= "number" then
                len = tonumber(len)
            end

            if not len or len < 0 then
                error("bad len argument", 2)
            end

            if len == 0 then
                return ""
            end

            local data, err, partial = proc_read(proc, 1, PIPE_READ_BYTES, len)
            return data, err, partial
        end,

        stderr_read_line = function (proc)
            local data, err, partial = proc_read(proc, 1, PIPE_READ_LINE, 0)
            return data, err, partial
        end,

        stderr_read_any = function (proc, max)
            if type(max) ~= "number" then
                max = tonumber(max)
            end

            if not max or max <= 0 then
                error("bad max argument", 2)
            end

            local data, err, partial = proc_read(proc, 1, PIPE_READ_ANY, max)
            return data, err, partial
        end,

        write = proc_write,
        shutdown = proc_shutdown,
        wait = proc_wait,
        kill = proc_kill,
    }
}

-- Constructor for process objects: Proc() yields a new
-- ngx_http_lua_ffi_pipe_proc_t carrying the methods above.
local Proc = ffi.metatype("ngx_http_lua_ffi_pipe_proc_t", mt)
-- pipe_spawn(args, opts)
--
-- Spawns a child process through ngx_http_lua_ffi_pipe_spawn.
--
--   args : either a non-empty array-like table (args[1] is the executable,
--          the whole sequence becomes argv) or a string, which is run as
--          `/bin/sh -c <string>`. Non-string table elements are converted
--          in place with tostring(), i.e. the caller's table is modified;
--          the table is also what keeps the strings alive while the C array
--          points at them.
--   opts : optional table with
--            merge_stderr        - truthy to pass merge_stderr = 1
--            buffer_size         - number >= 1 (default 4096)
--            environ             - array of "name=value" strings
--            write_timeout, stdout_read_timeout, stderr_read_timeout,
--            wait_timeout        - each defaults to 10000
--                                  (presumably milliseconds, the fields are
--                                  ngx_msec_t - confirm)
--
-- Returns the process object, or nil plus an error message when the C call
-- reports FFI_ERROR. Raises (attributed to the caller) when called in the
-- "init" phase or when an argument/option is malformed.
local pipe_spawn
do
    local sh_exe = "/bin/sh"
    local opt_c = "-c"

    -- Reusable argv for the string form of `args`:
    -- { "/bin/sh", "-c", <command>, NULL }; slot 2 is filled in per call.
    local shell_args = ffi_new("const char* [?]", 4)
    shell_args[0] = sh_exe
    shell_args[1] = opt_c
    shell_args[3] = nil

    -- default timeouts applied when `opts` does not override them
    local write_timeout = 10000
    local stdout_read_timeout = 10000
    local stderr_read_timeout = 10000
    local wait_timeout = 10000

    -- reference shell cmd's constant strings here to prevent them from getting
    -- collected by the Lua GC.
    _M._gc_ref_c_opt = opt_c

    -- Builds a NULL-terminated `const char *` array from list[1..n].
    -- The elements are copied by explicit 1-based index instead of using an
    -- FFI table initializer: the initializer treats a table as zero-based
    -- whenever list[0] is non-nil, which would shift every element by one
    -- and drop the last one. The returned array only points into the Lua
    -- strings, so `list` must stay alive for as long as the array is used.
    local function new_cstr_array(list, n)
        local arr = ffi_new("const char* [?]", n + 1)
        for i = 1, n do
            arr[i - 1] = list[i]
        end
        arr[n] = nil
        return arr
    end

    function pipe_spawn(args, opts)
        if ngx_phase() == "init" then
            error("API disabled in the current context", 2)
        end

        local exe
        local proc_args
        local proc_envs

        local args_type = type(args)
        if args_type == "table" then
            local nargs = 0

            for i, arg in ipairs(args) do
                nargs = nargs + 1

                if type(arg) ~= "string" then
                    -- stored back into `args` so the converted string stays
                    -- referenced while the C array points at it
                    args[i] = tostring(arg)
                end
            end

            if nargs == 0 then
                error("bad args arg: non-empty table expected", 2)
            end

            exe = args[1]
            proc_args = new_cstr_array(args, nargs)

        elseif args_type == "string" then
            exe = sh_exe
            shell_args[2] = args
            proc_args = shell_args

        else
            error("bad args arg: table expected, got " .. args_type, 2)
        end

        local merge_stderr = 0
        local buffer_size = 4096
        local proc = Proc()

        if opts then
            merge_stderr = opts.merge_stderr and 1 or 0

            if opts.buffer_size then
                buffer_size = tonumber(opts.buffer_size)
                if not buffer_size or buffer_size < 1 then
                    error("bad buffer_size option", 2)
                end
            end

            if opts.environ then
                local environ = opts.environ
                local environ_type = type(environ)
                if environ_type ~= "table" then
                    error("bad environ option: table expected, got " ..
                          environ_type, 2)
                end

                local nenv = 0

                for i, env in ipairs(environ) do
                    nenv = nenv + 1

                    local env_type = type(env)
                    if env_type ~= "string" then
                        error("bad value at index " .. i .. " of environ " ..
                              "option: string expected, got " .. env_type, 2)
                    end

                    -- the plain search starts at position 2, so a value with
                    -- an empty name ("=value") is rejected as well
                    if not str_find(env, "=", 2, true) then
                        error("bad value at index " .. i .. " of environ " ..
                              "option: 'name=[value]' format expected, got '" ..
                              env .. "'", 2)
                    end
                end

                -- an empty environ table leaves proc_envs nil
                if nenv > 0 then
                    proc_envs = new_cstr_array(environ, nenv)
                end
            end

            -- must be called directly from here: proc_set_timeouts raises
            -- its errors with level 3, i.e. at the caller of pipe_spawn
            proc_set_timeouts(proc,
                              opts.write_timeout or write_timeout,
                              opts.stdout_read_timeout or stdout_read_timeout,
                              opts.stderr_read_timeout or stderr_read_timeout,
                              opts.wait_timeout or wait_timeout)

        else
            proc_set_timeouts(proc,
                              write_timeout,
                              stdout_read_timeout,
                              stderr_read_timeout,
                              wait_timeout)
        end

        local errbuf = get_string_buf(ERR_BUF_SIZE)
        local errbuf_size = get_size_ptr()
        -- note: the request is passed on without a nil check here
        local r = get_request()

        errbuf_size[0] = ERR_BUF_SIZE

        local rc = C.ngx_http_lua_ffi_pipe_spawn(r, proc, exe, proc_args,
                                                 merge_stderr, buffer_size,
                                                 proc_envs, errbuf, errbuf_size)
        if rc == FFI_ERROR then
            return nil, ffi_str(errbuf, errbuf_size[0])
        end

        return proc
    end
end  -- do
-- Public API: expose pipe_spawn as `spawn` on the module table and return
-- the module.
_M.spawn = pipe_spawn
return _M