Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support asynchronous IO.pipe on Windows #13362

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion spec/compiler/crystal/tools/doc/project_info_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ describe Crystal::Doc::ProjectInfo do
File.write("shard.yml", "name: foo\nversion: 1.0")
end

pending_win32 "git missing" do
it "git missing" do
Crystal::Git.executable = "git-missing-executable"

assert_with_defaults(ProjectInfo.new(nil, nil), ProjectInfo.new("foo", "1.0", refname: nil))
Expand Down
2 changes: 1 addition & 1 deletion spec/std/http/client/client_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ module HTTP
end
end

pending_win32 "will retry a broken socket" do
it "will retry a broken socket" do
server = HTTP::Server.new do |context|
context.response.output.print "foo"
context.response.output.close
Expand Down
38 changes: 18 additions & 20 deletions spec/std/http/spec_helper.cr
Original file line number Diff line number Diff line change
Expand Up @@ -61,29 +61,27 @@ end
def run_handler(handler, &)
done = Channel(Exception?).new

begin
IO::Stapled.pipe do |server_io, client_io|
processor = HTTP::Server::RequestProcessor.new(handler)
f = spawn do
processor.process(server_io, server_io)
rescue exc
done.send exc
else
done.send nil
end
IO::Stapled.pipe do |server_io, client_io|
processor = HTTP::Server::RequestProcessor.new(handler)
f = spawn do
processor.process(server_io, server_io)
rescue exc
done.send exc
else
done.send nil
end

client = HTTP::Client.new(client_io)
client = HTTP::Client.new(client_io)

begin
wait_until_blocked f
begin
wait_until_blocked f

yield client
ensure
processor.close
server_io.close
if exc = done.receive
raise exc
end
yield client
ensure
processor.close
server_io.close
if exc = done.receive
raise exc
end
end
end
Expand Down
8 changes: 4 additions & 4 deletions spec/std/io/io_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ end

describe IO do
describe "partial read" do
pending_win32 "doesn't block on first read. blocks on 2nd read" do
it "doesn't block on first read. blocks on 2nd read" do
IO.pipe do |read, write|
write.puts "hello"
slice = Bytes.new 1024
Expand Down Expand Up @@ -920,8 +920,8 @@ describe IO do
end
{% end %}

pending_win32 describe: "#close" do
it "aborts 'read' in a different thread" do
describe "#close" do
it "aborts 'read' in a different fiber" do
ch = Channel(SpecChannelStatus).new(1)

IO.pipe do |read, write|
Expand All @@ -942,7 +942,7 @@ describe IO do
end
end

it "aborts 'write' in a different thread" do
it "aborts 'write' in a different fiber" do
ch = Channel(SpecChannelStatus).new(1)

IO.pipe do |read, write|
Expand Down
2 changes: 1 addition & 1 deletion spec/std/log/io_backend_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ private def io_logger(*, stdout : IO, config = nil, source : String = "")
end

describe Log::IOBackend do
pending_win32 "creates with defaults" do
it "creates with defaults" do
backend = Log::IOBackend.new
backend.io.should eq(STDOUT)
backend.formatter.should eq(Log::ShortFormat)
Expand Down
2 changes: 1 addition & 1 deletion spec/std/oauth2/client_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ describe OAuth2::Client do
end
end

pending_win32 describe: "get_access_token_using_*" do
describe "get_access_token_using_*" do
describe "using HTTP Basic authentication to pass credentials" do
it "#get_access_token_using_authorization_code" do
handler = HTTP::Handler::HandlerProc.new do |context|
Expand Down
8 changes: 5 additions & 3 deletions spec/std/process_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -137,12 +137,12 @@ describe Process do
value.should eq("hello#{newline}")
end

pending_win32 "sends input in IO" do
it "sends input in IO" do
value = Process.run(*stdin_to_stdout_command, input: IO::Memory.new("hello")) do |proc|
proc.input?.should be_nil
proc.output.gets_to_end
end
value.should eq("hello")
value.chomp.should eq("hello")
end

it "sends output to IO" do
Expand Down Expand Up @@ -305,6 +305,8 @@ describe Process do
{% end %}
end

# TODO: this spec gives "WaitForSingleObject: The handle is invalid."
# is this because standard streams on windows aren't async?
pending_win32 "can link processes together" do
buffer = IO::Memory.new
Process.run(*stdin_to_stdout_command) do |cat|
Expand All @@ -313,7 +315,7 @@ describe Process do
cat.close
end
end
buffer.to_s.lines.size.should eq(1000)
buffer.to_s.chomp.lines.size.should eq(1000)
end
end

Expand Down
4 changes: 4 additions & 0 deletions src/crystal/system/unix/file_descriptor.cr
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ module Crystal::System::FileDescriptor
fcntl(LibC::F_SETFL, new_flags) unless new_flags == current_flags
end

private def system_blocking_init(value)
self.system_blocking = false unless value
end

private def system_close_on_exec?
flags = fcntl(LibC::F_GETFD)
flags.bits_set? LibC::FD_CLOEXEC
Expand Down
3 changes: 3 additions & 0 deletions src/crystal/system/wasi/file_descriptor.cr
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ module Crystal::System::FileDescriptor
raise NotImplementedError.new "Crystal::System::FileDescriptor.pipe"
end

private def system_blocking_init(value)
end

private def system_reopen(other : IO::FileDescriptor)
raise NotImplementedError.new "Crystal::System::FileDescriptor#system_reopen"
end
Expand Down
87 changes: 63 additions & 24 deletions src/crystal/system/win32/file_descriptor.cr
Original file line number Diff line number Diff line change
Expand Up @@ -2,30 +2,50 @@ require "c/io"
require "c/consoleapi"
require "c/consoleapi2"
require "c/winnls"
require "io/overlapped"

module Crystal::System::FileDescriptor
include IO::Overlapped

@volatile_fd : Atomic(LibC::Int)
@system_blocking = true

private def unbuffered_read(slice : Bytes)
bytes_read = LibC._read(fd, slice, slice.size)
if bytes_read == -1
if Errno.value == Errno::EBADF
raise IO::Error.new "File not open for reading"
else
raise IO::Error.from_errno("Error reading file")
if system_blocking?
bytes_read = LibC._read(fd, slice, slice.size)
if bytes_read == -1
if Errno.value == Errno::EBADF
raise IO::Error.new "File not open for reading"
else
raise IO::Error.from_errno("Error reading file")
end
end
bytes_read
else
handle = windows_handle
overlapped_operation(handle, "ReadFile", read_timeout) do |overlapped|
ret = LibC.ReadFile(handle, slice, slice.size, out byte_count, overlapped)
{ret, byte_count}
end
end
bytes_read
end

private def unbuffered_write(slice : Bytes)
until slice.empty?
bytes_written = LibC._write(fd, slice, slice.size)
if bytes_written == -1
if Errno.value == Errno::EBADF
raise IO::Error.new "File not open for writing"
else
raise IO::Error.from_errno("Error writing file")
if system_blocking?
bytes_written = LibC._write(fd, slice, slice.size)
if bytes_written == -1
if Errno.value == Errno::EBADF
raise IO::Error.new "File not open for writing"
else
raise IO::Error.from_errno("Error writing file")
end
end
else
handle = windows_handle
bytes_written = overlapped_operation(handle, "WriteFile", write_timeout, writing: true) do |overlapped|
ret = LibC.WriteFile(handle, slice, slice.size, out byte_count, overlapped)
{ret, byte_count}
end
end

Expand All @@ -34,11 +54,17 @@ module Crystal::System::FileDescriptor
end

private def system_blocking?
true
@system_blocking
end

private def system_blocking=(blocking)
raise NotImplementedError.new("Crystal::System::FileDescriptor#system_blocking=") unless blocking
unless blocking == @system_blocking
raise IO::Error.new("Cannot reconfigure `IO::FileDescriptor#blocking` after creation")
end
end

private def system_blocking_init(value)
@system_blocking = value
end

private def system_close_on_exec?
Expand Down Expand Up @@ -125,11 +151,12 @@ module Crystal::System::FileDescriptor
end

private def system_close
LibC.CancelIoEx(windows_handle, nil) unless system_blocking?

file_descriptor_close
end

def file_descriptor_close
err = nil
if LibC._close(fd) != 0
case Errno.value
when Errno::EINTR
Expand All @@ -140,14 +167,26 @@ module Crystal::System::FileDescriptor
end
end

def self.pipe(read_blocking, write_blocking)
pipe_fds = uninitialized StaticArray(LibC::Int, 2)
if LibC._pipe(pipe_fds, 8192, LibC::O_BINARY | LibC::O_NOINHERIT) != 0
raise IO::Error.from_errno("Could not create pipe")
end
private PIPE_BUFFER_SIZE = 8192

r = IO::FileDescriptor.new(pipe_fds[0], read_blocking)
w = IO::FileDescriptor.new(pipe_fds[1], write_blocking)
def self.pipe(read_blocking, write_blocking)
pipe_name = ::Path.windows(::File.tempname("crystal", nil, dir: %q(\\.\pipe))).normalize.to_s
pipe_mode = 0 # PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT

w_pipe_flags = LibC::PIPE_ACCESS_OUTBOUND | LibC::FILE_FLAG_FIRST_PIPE_INSTANCE
w_pipe_flags |= LibC::FILE_FLAG_OVERLAPPED unless write_blocking
w_pipe = LibC.CreateNamedPipeA(pipe_name, w_pipe_flags, pipe_mode, 1, PIPE_BUFFER_SIZE, PIPE_BUFFER_SIZE, 0, nil)
raise IO::Error.from_winerror("CreateNamedPipeA") if w_pipe == LibC::INVALID_HANDLE_VALUE
Crystal::Scheduler.event_loop.create_completion_port(w_pipe) unless write_blocking

r_pipe_flags = LibC::FILE_FLAG_NO_BUFFERING
r_pipe_flags |= LibC::FILE_FLAG_OVERLAPPED unless read_blocking
r_pipe = LibC.CreateFileW(System.to_wstr(pipe_name), LibC::GENERIC_READ | LibC::FILE_WRITE_ATTRIBUTES, 0, nil, LibC::OPEN_EXISTING, r_pipe_flags, nil)
raise IO::Error.from_winerror("CreateFileW") if r_pipe == LibC::INVALID_HANDLE_VALUE
Crystal::Scheduler.event_loop.create_completion_port(r_pipe) unless read_blocking

r = IO::FileDescriptor.new(LibC._open_osfhandle(r_pipe, 0), read_blocking)
w = IO::FileDescriptor.new(LibC._open_osfhandle(w_pipe, 0), write_blocking)
w.sync = true

{r, w}
Expand Down Expand Up @@ -187,7 +226,7 @@ module Crystal::System::FileDescriptor
end
end

io = IO::FileDescriptor.new(fd)
io = IO::FileDescriptor.new(fd, blocking: true)
# Set sync or flush_on_newline as described in STDOUT and STDERR docs.
# See https://crystal-lang.org/api/toplevel.html#STDERR
if console_handle
Expand Down
53 changes: 52 additions & 1 deletion src/crystal/system/win32/socket.cr
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,26 @@ module Crystal::System::Socket
end
end

private def overlapped_connect(socket, method, &)
OverlappedOperation.run(socket) do |operation|
yield operation.start

schedule_overlapped(read_timeout || 1.seconds)

operation.wsa_result(socket) do |error|
case error
when .wsa_io_incomplete?, .wsaeconnrefused?
return ::Socket::ConnectError.from_os_error(method, error)
when .error_operation_aborted?
# FIXME: Not sure why this is necessary
return ::Socket::ConnectError.from_os_error(method, error)
end
end

nil
end
end

private def system_connect_connectionless(addr, timeout, &)
ret = LibC.connect(fd, addr, addr.size)
if ret == LibC::SOCKET_ERROR
Expand Down Expand Up @@ -194,6 +214,25 @@ module Crystal::System::Socket
true
end

private def overlapped_accept(socket, method, &)
OverlappedOperation.run(socket) do |operation|
yield operation.start

unless schedule_overlapped(read_timeout)
raise IO::TimeoutError.new("accept timed out")
end

operation.wsa_result(socket) do |error|
case error
when .wsa_io_incomplete?, .wsaenotsock?
return false
end
end

true
end
end

private def wsa_buffer(bytes)
wsabuf = LibC::WSABUF.new
wsabuf.len = bytes.size
Expand Down Expand Up @@ -348,7 +387,7 @@ module Crystal::System::Socket
private def unbuffered_read(slice : Bytes)
wsabuf = wsa_buffer(slice)

bytes_read = overlapped_operation(fd, "WSARecv", read_timeout, connreset_is_error: false) do |overlapped|
bytes_read = overlapped_read(fd, "WSARecv", connreset_is_error: false) do |overlapped|
flags = 0_u32
LibC.WSARecv(fd, pointerof(wsabuf), 1, out bytes_received, pointerof(flags), overlapped, nil)
end
Expand All @@ -365,6 +404,18 @@ module Crystal::System::Socket
bytes.to_i32
end

private def overlapped_write(socket, method, &)
wsa_overlapped_operation(socket, method, write_timeout) do |operation|
yield operation
end
end

private def overlapped_read(socket, method, *, connreset_is_error = true, &)
wsa_overlapped_operation(socket, method, read_timeout, connreset_is_error) do |operation|
yield operation
end
end

def system_close
handle = @volatile_fd.swap(LibC::INVALID_SOCKET)

Expand Down
4 changes: 1 addition & 3 deletions src/io/file_descriptor.cr
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,7 @@ class IO::FileDescriptor < IO
end
end

unless blocking || {{ flag?(:win32) || flag?(:wasi) }}
self.blocking = false
end
system_blocking_init(blocking)
end

# :nodoc:
Expand Down
Loading