Skip to content

Commit

Permalink
Revert "pipe: allow queueing pending handles"
Browse files Browse the repository at this point in the history
The commit is quite broken and must be refactored before going into.

This reverts commit 08aeaf6.
  • Loading branch information
indutny committed Dec 23, 2013
1 parent 08aeaf6 commit 8b6c19e
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 91 deletions.
1 change: 0 additions & 1 deletion include/uv-unix.h
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,6 @@ typedef struct {
uv_connection_cb connection_cb; \
int delayed_error; \
int accepted_fd; \
int* queued_fds; \
UV_STREAM_PRIVATE_PLATFORM_FIELDS \

#define UV_TCP_PRIVATE_FIELDS /* empty */
Expand Down
3 changes: 0 additions & 3 deletions include/uv.h
Original file line number Diff line number Diff line change
Expand Up @@ -376,9 +376,6 @@ typedef void (*uv_read_cb)(uv_stream_t* stream,
* Just like the uv_read_cb except that if the pending parameter is true
* then you can use uv_accept() to pull the new handle into the process.
* If no handle is pending then pending will be UV_UNKNOWN_HANDLE.
*
* NOTE: The buffer may be a null buffer if multiple fds were accepted and
* read2_cb is called for pending ones.
*/
typedef void (*uv_read2_cb)(uv_pipe_t* pipe,
ssize_t nread,
Expand Down
102 changes: 15 additions & 87 deletions src/unix/stream.c
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,6 @@ void uv__stream_init(uv_loop_t* loop,
stream->connect_req = NULL;
stream->shutdown_req = NULL;
stream->accepted_fd = -1;
stream->queued_fds = NULL;
stream->delayed_error = 0;
QUEUE_INIT(&stream->write_queue);
QUEUE_INIT(&stream->write_completed_queue);
Expand Down Expand Up @@ -560,7 +559,6 @@ int uv_accept(uv_stream_t* server, uv_stream_t* client) {
if (server->accepted_fd == -1)
return -EAGAIN;

err = 0;
switch (client->type) {
case UV_NAMED_PIPE:
case UV_TCP:
Expand All @@ -570,47 +568,27 @@ int uv_accept(uv_stream_t* server, uv_stream_t* client) {
if (err) {
/* TODO handle error */
uv__close(server->accepted_fd);
goto done;
server->accepted_fd = -1;
return err;
}
break;

case UV_UDP:
err = uv_udp_open((uv_udp_t*) client, server->accepted_fd);
if (err) {
uv__close(server->accepted_fd);
goto done;
server->accepted_fd = -1;
return err;
}
break;

default:
assert(0);
}

done:
/* Process queued fds */
if (server->queued_fds != NULL) {
/* Read first */
server->accepted_fd = server->queued_fds[2];

/* All read, free */
if (--server->queued_fds[0] == 0) {
free(server->queued_fds);
server->queued_fds = NULL;
} else {
/* Shift rest */
memmove(server->queued_fds + 2,
server->queued_fds + 3,
server->queued_fds[0]);
}

/* Invoke read_cb one more time */
uv__io_feed(server->loop, &server->io_watcher);
} else {
server->accepted_fd = -1;
if (err == 0)
uv__io_start(server->loop, &server->io_watcher, UV__POLLIN);
}
return err;
uv__io_start(server->loop, &server->io_watcher, UV__POLLIN);
server->accepted_fd = -1;
return 0;
}


Expand Down Expand Up @@ -973,58 +951,13 @@ static void uv__stream_eof(uv_stream_t* stream, const uv_buf_t* buf) {
}


static int uv__stream_queue_fd(uv_stream_t* stream, int fd) {
int queue_offset;
int queue_len;

if (stream->queued_fds == NULL) {
queue_offset = 0;
queue_len = 8;
stream->queued_fds = malloc((queue_len + 2) * sizeof(*stream->queued_fds));
if (stream->queued_fds == NULL)
return UV_ENOMEM;
stream->queued_fds[1] = queue_len;
} else {
queue_offset = stream->queued_fds[0];
queue_len = stream->queued_fds[1];

/* Grow */
if (queue_offset == queue_len) {
queue_len += 8;
stream->queued_fds = realloc(stream->queued_fds,
(queue_len + 2) *
sizeof(*stream->queued_fds));
if (stream->queued_fds == NULL)
return UV_ENOMEM;
stream->queued_fds[1] = queue_len;
}
}

/* Put fd in a queue */
stream->queued_fds[0] = queue_offset;
stream->queued_fds[2 + queue_offset++] = fd;

return 0;
}


static void uv__read(uv_stream_t* stream) {
uv_buf_t buf;
ssize_t nread;
struct msghdr msg;
struct cmsghdr* cmsg;
char cmsg_space[64];
int count;
int err;

/* Has queued fds */
if (stream->accepted_fd != -1) {
static uv_buf_t buf = { NULL, 0 };
stream->read2_cb((uv_pipe_t*) stream,
0,
&buf,
uv__handle_type(stream->accepted_fd));
}

stream->flags &= ~UV_STREAM_READ_PARTIAL;

Expand Down Expand Up @@ -1113,20 +1046,17 @@ static void uv__read(uv_stream_t* stream) {
cmsg = CMSG_NXTHDR(&msg, cmsg)) {

if (cmsg->cmsg_type == SCM_RIGHTS) {
/* silence aliasing warning */
void* pv = CMSG_DATA(cmsg);
int* pi = pv;

/* Already has accepted fd, queue now */
if (stream->accepted_fd != -1) {
err = uv__stream_queue_fd(stream, *pi);
if (err != 0) {
uv__stream_read_cb(stream, err, NULL, UV_UNKNOWN_HANDLE);
return;
}
} else {
fprintf(stderr, "(libuv) ignoring extra FD received\n");
}

/* silence aliasing warning */
{
void* pv = CMSG_DATA(cmsg);
int* pi = pv;
stream->accepted_fd = *pi;
}

} else {
fprintf(stderr, "ignoring non-SCM_RIGHTS ancillary data: %d\n",
cmsg->cmsg_type);
Expand Down Expand Up @@ -1563,8 +1493,6 @@ void uv__stream_close(uv_stream_t* handle) {
handle->accepted_fd = -1;
}

free(handle->queued_fds);

assert(!uv__io_active(&handle->io_watcher, UV__POLLIN | UV__POLLOUT));
}

Expand Down

0 comments on commit 8b6c19e

Please sign in to comment.