Skip to content

Commit

Permalink
stream: add an API to make streams do blocking writes
Browse files Browse the repository at this point in the history
This patch adds the `uv_stream_set_blocking` API which makes all
uv_write calls to that stream blocking. It currently only works for
pipes, on windows.
  • Loading branch information
HenryRawas authored and piscisaureus committed Jun 20, 2013
1 parent b1d390e commit 92040eb
Show file tree
Hide file tree
Showing 5 changed files with 121 additions and 9 deletions.
25 changes: 25 additions & 0 deletions include/uv.h
Original file line number Diff line number Diff line change
Expand Up @@ -675,6 +675,31 @@ UV_EXTERN int uv_is_readable(const uv_stream_t* handle);
UV_EXTERN int uv_is_writable(const uv_stream_t* handle);


/*
* Enable or disable blocking mode for a stream.
*
* When blocking mode is enabled all writes complete synchronously. The
* interface remains unchanged otherwise, e.g. completion or failure of the
* operation will still be reported through a callback which is made
* asychronously.
*
* Relying too much on this API is not recommended. It is likely to change
* significantly in the future.
*
* On windows this currently works only for uv_pipe_t instances. On unix it
* works for tcp, pipe and tty instances. Be aware that changing the blocking
* mode on unix sets or clears the O_NONBLOCK bit. If you are sharing a handle
* with another process, the other process is affected by the change too,
* which can lead to unexpected results.
*
* Also libuv currently makes no ordering guarantee when the blocking mode
* is changed after write requests have already been submitted. Therefore it is
* recommended to set the blocking mode immediately after opening or creating
* the stream.
*/
UV_EXTERN int uv_stream_set_blocking(uv_stream_t* handle, int blocking);


/*
* Used to determine whether a stream is closing or closed.
*
Expand Down
5 changes: 5 additions & 0 deletions src/unix/stream.c
Original file line number Diff line number Diff line change
Expand Up @@ -1430,3 +1430,8 @@ void uv__stream_close(uv_stream_t* handle) {

assert(!uv__io_active(&handle->io_watcher, UV__POLLIN | UV__POLLOUT));
}


int uv_stream_set_blocking(uv_stream_t* handle, int blocking) {
assert(0 && "implement me");
}
1 change: 1 addition & 0 deletions src/win/internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
#define UV_HANDLE_SYNC_BYPASS_IOCP 0x00040000
#define UV_HANDLE_ZERO_READ 0x00080000
#define UV_HANDLE_EMULATE_IOCP 0x00100000
#define UV_HANDLE_BLOCKING_WRITES 0x00200000

/* Only used by uv_tcp_t handles. */
#define UV_HANDLE_IPV6 0x01000000
Expand Down
89 changes: 80 additions & 9 deletions src/win/pipe.c
Original file line number Diff line number Diff line change
Expand Up @@ -1128,6 +1128,13 @@ static int uv_pipe_write_impl(uv_loop_t* loop, uv_write_t* req,
/* Write the header or the whole frame. */
memset(&ipc_header_req->overlapped, 0, sizeof(ipc_header_req->overlapped));

/* Using overlapped IO, but wait for completion before returning.
This write is blocking because ipc_frame is on stack. */
ipc_header_req->overlapped.hEvent = CreateEvent(NULL, 1, 0, NULL);
if (!ipc_header_req->overlapped.hEvent) {
uv_fatal_error(GetLastError(), "CreateEvent");
}

result = WriteFile(handle->handle,
&ipc_frame,
ipc_frame.header.flags & UV_IPC_TCP_SERVER ?
Expand All @@ -1136,18 +1143,22 @@ static int uv_pipe_write_impl(uv_loop_t* loop, uv_write_t* req,
&ipc_header_req->overlapped);
if (!result && GetLastError() != ERROR_IO_PENDING) {
uv__set_sys_error(loop, GetLastError());
CloseHandle(ipc_header_req->overlapped.hEvent);
return -1;
}

if (result) {
/* Request completed immediately. */
ipc_header_req->queued_bytes = 0;
} else {
/* Request queued by the kernel. */
ipc_header_req->queued_bytes = ipc_frame.header.flags & UV_IPC_TCP_SERVER ?
sizeof(ipc_frame) : sizeof(ipc_frame.header);
handle->write_queue_size += ipc_header_req->queued_bytes;
if (!result) {
/* Request not completed immediately. Wait for it.*/
if (WaitForSingleObject(ipc_header_req->overlapped.hEvent, INFINITE) !=
WAIT_OBJECT_0) {
uv__set_sys_error(loop, GetLastError());
CloseHandle(ipc_header_req->overlapped.hEvent);
return -1;
}
}
ipc_header_req->queued_bytes = 0;
CloseHandle(ipc_header_req->overlapped.hEvent);
ipc_header_req->overlapped.hEvent = NULL;

REGISTER_HANDLE_REQ(loop, handle, ipc_header_req);
handle->reqs_pending++;
Expand All @@ -1159,7 +1170,29 @@ static int uv_pipe_write_impl(uv_loop_t* loop, uv_write_t* req,
}
}

if (handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE) {
if ((handle->flags &
(UV_HANDLE_BLOCKING_WRITES | UV_HANDLE_NON_OVERLAPPED_PIPE)) ==
(UV_HANDLE_BLOCKING_WRITES | UV_HANDLE_NON_OVERLAPPED_PIPE)) {
DWORD bytes;
result = WriteFile(handle->handle,
bufs[0].base,
bufs[0].len,
&bytes,
NULL);

if (!result) {
return uv__set_sys_error(loop, GetLastError());
} else {
/* Request completed immediately. */
req->queued_bytes = 0;
}

REGISTER_HANDLE_REQ(loop, handle, req);
handle->reqs_pending++;
handle->write_reqs_pending++;
POST_COMPLETION_FOR_REQ(loop, req);
return 0;
} else if (handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE) {
req->write_buffer = bufs[0];
uv_insert_non_overlapped_write_req(handle, req);
if (handle->write_reqs_pending == 0) {
Expand All @@ -1169,6 +1202,44 @@ static int uv_pipe_write_impl(uv_loop_t* loop, uv_write_t* req,
/* Request queued by the kernel. */
req->queued_bytes = uv_count_bufs(bufs, bufcnt);
handle->write_queue_size += req->queued_bytes;
} else if (handle->flags & UV_HANDLE_BLOCKING_WRITES) {
/* Using overlapped IO, but wait for completion before returning */
req->overlapped.hEvent = CreateEvent(NULL, 1, 0, NULL);
if (!req->overlapped.hEvent) {
uv_fatal_error(GetLastError(), "CreateEvent");
}

result = WriteFile(handle->handle,
bufs[0].base,
bufs[0].len,
NULL,
&req->overlapped);

if (!result && GetLastError() != ERROR_IO_PENDING) {
uv__set_sys_error(loop, GetLastError());
CloseHandle(req->overlapped.hEvent);
return -1;
}

if (result) {
/* Request completed immediately. */
req->queued_bytes = 0;
} else {
/* Request queued by the kernel. */
if (WaitForSingleObject(ipc_header_req->overlapped.hEvent, INFINITE) !=
WAIT_OBJECT_0) {
uv__set_sys_error(loop, GetLastError());
CloseHandle(ipc_header_req->overlapped.hEvent);
return -1;
}
}
CloseHandle(req->overlapped.hEvent);

REGISTER_HANDLE_REQ(loop, handle, req);
handle->reqs_pending++;
handle->write_reqs_pending++;
POST_COMPLETION_FOR_REQ(loop, req);
return 0;
} else {
result = WriteFile(handle->handle,
bufs[0].base,
Expand Down
10 changes: 10 additions & 0 deletions src/win/stream.c
Original file line number Diff line number Diff line change
Expand Up @@ -196,3 +196,13 @@ int uv_is_readable(const uv_stream_t* handle) {
int uv_is_writable(const uv_stream_t* handle) {
return !!(handle->flags & UV_HANDLE_WRITABLE);
}


int uv_stream_set_blocking(uv_stream_t* handle, int blocking) {
if (blocking != 0)
handle->flags |= UV_HANDLE_BLOCKING_WRITES;
else
handle->flags &= ~UV_HANDLE_BLOCKING_WRITES;

return 0;
}

0 comments on commit 92040eb

Please sign in to comment.