diff --git a/net/net-connections.c b/net/net-connections.c index a0d0553..16a79f1 100644 --- a/net/net-connections.c +++ b/net/net-connections.c @@ -493,14 +493,16 @@ int cpu_server_close_connection (connection_job_t C, int who) /* {{{ */ { assert (c->io_conn); job_signal (JOB_REF_PASS (c->io_conn), JS_ABORT); - if (c->target) { + if (c->basic_type == ct_outbound) { MODULE_STAT->outbound_connections --; if (connection_is_active (c->flags)) { MODULE_STAT->active_outbound_connections --; } - job_signal (JOB_REF_PASS (c->target), JS_RUN); + if (c->target) { + job_signal (JOB_REF_PASS (c->target), JS_RUN); + } } else { MODULE_STAT->inbound_connections --; @@ -544,7 +546,9 @@ int do_connection_job (job_t job, int op, struct job_thread *JT) /* {{{ */ { __sync_fetch_and_and (&c->flags, ~C_READY_PENDING); MODULE_STAT->active_outbound_connections ++; MODULE_STAT->active_connections ++; - __sync_fetch_and_add (&CONN_TARGET_INFO(c->target)->active_outbound_connections, 1); + if (c->target) { + __sync_fetch_and_add (&CONN_TARGET_INFO(c->target)->active_outbound_connections, 1); + } if (c->status == conn_connecting) { if (!__sync_bool_compare_and_swap (&c->status, conn_connecting, conn_working)) { assert (c->status == conn_error); @@ -587,7 +591,7 @@ int do_connection_job (job_t job, int op, struct job_thread *JT) /* {{{ */ { updates stats creates socket_connection */ -connection_job_t alloc_new_connection (int cfd, conn_target_job_t CTJ, listening_connection_job_t LCJ, unsigned peer, unsigned char peer_ipv6[16], int peer_port) /* {{{ */ { +connection_job_t alloc_new_connection (int cfd, conn_target_job_t CTJ, listening_connection_job_t LCJ, int basic_type, conn_type_t *conn_type, void *conn_extra, unsigned peer, unsigned char peer_ipv6[16], int peer_port) /* {{{ */ { if (cfd < 0) { return NULL; } @@ -648,12 +652,12 @@ connection_job_t alloc_new_connection (int cfd, conn_target_job_t CTJ, listening assert (0); } - c->type = CT ? CT->type : LC->type; - c->extra = CT ? CT->extra : LC->extra; + c->type = conn_type; + c->extra = conn_extra; assert (c->type); - c->basic_type = CT ? ct_outbound : ct_inbound; - c->status = CT ? conn_connecting : conn_working; + c->basic_type = basic_type; + c->status = (basic_type == ct_outbound) ? conn_connecting : conn_working; c->flags |= c->type->flags & C_EXTERNAL; if (LC) { @@ -692,41 +696,58 @@ connection_job_t alloc_new_connection (int cfd, conn_target_job_t CTJ, listening c->out_queue = alloc_mp_queue_w (); //c->out_packet_queue = alloc_mp_queue_w (); - if (CT) { + if (basic_type == ct_outbound) { vkprintf (1, "New connection %s:%d -> %s:%d\n", show_our_ip (C), c->our_port, show_remote_ip (C), c->remote_port); } else { vkprintf (1, "New connection %s:%d -> %s:%d\n", show_remote_ip (C), c->remote_port, show_our_ip (C), c->our_port); } - int (*func)(connection_job_t) = CT ? CT->type->init_outbound : LC->type->init_accepted; + int (*func)(connection_job_t) = (basic_type == ct_outbound) ? c->type->init_outbound : c->type->init_accepted; vkprintf (3, "func = %p\n", func); if (func (C) >= 0) { - if (CT) { - job_incref (CTJ); + if (basic_type == ct_outbound) { MODULE_STAT->outbound_connections ++; MODULE_STAT->allocated_outbound_connections ++; MODULE_STAT->outbound_connections_created ++; - CT->outbound_connections ++; + if (CTJ) { + job_incref (CTJ); + CT->outbound_connections ++; + } } else { MODULE_STAT->inbound_connections_accepted ++; MODULE_STAT->allocated_inbound_connections ++; MODULE_STAT->inbound_connections ++; MODULE_STAT->active_inbound_connections ++; MODULE_STAT->active_connections ++; + + if (LCJ) { + c->listening = LC->fd; + c->listening_generation = LC->generation; + if (LC->flags & C_NOQACK) { + c->flags |= C_NOQACK; + } - c->listening = LC->fd; - c->listening_generation = LC->generation; - if (LC->flags & C_NOQACK) { - c->flags |= C_NOQACK; + c->window_clamp = LC->window_clamp; + + if (LC->flags & C_SPECIAL) { + c->flags |= C_SPECIAL; + __sync_fetch_and_add (&active_special_connections, 1); + + if (active_special_connections > max_special_connections) { + vkprintf (active_special_connections >= max_special_connections + 16 ? 0 : 1, "ERROR: forced to accept connection when special connections limit was reached (%d of %d)\n", active_special_connections, max_special_connections); + } + if (active_special_connections >= max_special_connections) { + vkprintf (2, "**Invoking epoll_remove(%d)\n", LC->fd); + epoll_remove (LC->fd); + } + } } - - c->window_clamp = LC->window_clamp; if (c->window_clamp) { if (setsockopt (cfd, IPPROTO_TCP, TCP_WINDOW_CLAMP, &c->window_clamp, 4) < 0) { vkprintf (0, "error while setting window size for socket %d to %d: %m\n", cfd, c->window_clamp); @@ -739,18 +760,6 @@ connection_job_t alloc_new_connection (int cfd, conn_target_job_t CTJ, listening } } - if (LC->flags & C_SPECIAL) { - c->flags |= C_SPECIAL; - __sync_fetch_and_add (&active_special_connections, 1); - - if (active_special_connections > max_special_connections) { - vkprintf (active_special_connections >= max_special_connections + 16 ? 0 : 1, "ERROR: forced to accept connection when special connections limit was reached (%d of %d)\n", active_special_connections, max_special_connections); - } - if (active_special_connections >= max_special_connections) { - vkprintf (2, "**Invoking epoll_remove(%d)\n", LC->fd); - epoll_remove (LC->fd); - } - } } alloc_new_socket_connection (C); @@ -1279,10 +1288,10 @@ int net_accept_new_connections (listening_connection_job_t LCJ) /* {{{ */ { connection_job_t C; if (peer.a4.sin_family == AF_INET) { - C = alloc_new_connection (cfd, NULL, LCJ, + C = alloc_new_connection (cfd, NULL, LCJ, ct_inbound, LC->type, LC->extra, ntohl (peer.a4.sin_addr.s_addr), NULL, ntohs (peer.a4.sin_port)); } else { - C = alloc_new_connection (cfd, NULL, LCJ, + C = alloc_new_connection (cfd, NULL, LCJ, ct_inbound, LC->type, LC->extra, 0, peer.a6.sin6_addr.s6_addr, ntohs (peer.a6.sin6_port)); } if (C) { @@ -1726,7 +1735,7 @@ int create_new_connections (conn_target_job_t CTJ) /* {{{ */ { break; } - connection_job_t C = alloc_new_connection (cfd, CTJ, NULL, + connection_job_t C = alloc_new_connection (cfd, CTJ, NULL, ct_outbound, CT->type, CT->extra, ntohl (CT->target.s_addr), CT->target_ipv6, CT->port); if (C) { diff --git a/net/net-connections.h b/net/net-connections.h index d22fe7f..5a70335 100644 --- a/net/net-connections.h +++ b/net/net-connections.h @@ -195,6 +195,24 @@ struct conn_target_info { int global_refcnt; }; +struct pseudo_conn_target_info { + struct event_timer timer; + int pad1; + int pad2; + + void *pad3; + conn_type_t *type; + void *extra; + struct in_addr target; + unsigned char target_ipv6[16]; + int port; + int active_outbound_connections, outbound_connections; + int ready_outbound_connections; + + connection_job_t in_conn; + connection_job_t out_conn; +}; + struct connection_info { struct event_timer timer; int fd; @@ -429,4 +447,4 @@ extern unsigned nat_info[MAX_NAT_INFO_RULES][2]; int net_add_nat_info (char *str); unsigned nat_translate_ip (unsigned local_ip); -connection_job_t alloc_new_connection (int cfd, conn_target_job_t SS, connection_job_t LL, unsigned peer, unsigned char peer_ipv6[16], int peer_port); +connection_job_t alloc_new_connection (int cfd, conn_target_job_t CTJ, listening_connection_job_t LCJ, int basic_type, conn_type_t *conn_type, void *conn_extra, unsigned peer, unsigned char peer_ipv6[16], int peer_port); diff --git a/net/net-tcp-rpc-ext-server.c b/net/net-tcp-rpc-ext-server.c index 7a3859a..8b2a198 100644 --- a/net/net-tcp-rpc-ext-server.c +++ b/net/net-tcp-rpc-ext-server.c @@ -85,6 +85,52 @@ conn_type_t ct_tcp_rpc_ext_server = { .crypto_needed_output_bytes = cpu_tcp_aes_crypto_ctr128_needed_output_bytes, }; +int tcp_proxy_pass_parse_execute (connection_job_t C); +int tcp_proxy_pass_close (connection_job_t C, int who); +int tcp_proxy_pass_write_packet (connection_job_t c, struct raw_message *raw); + +conn_type_t ct_proxy_pass = { + .magic = CONN_FUNC_MAGIC, + .flags = C_RAWMSG, + .title = "proxypass", + .init_accepted = server_failed, + .parse_execute = tcp_proxy_pass_parse_execute, + .close = tcp_proxy_pass_close, + .write_packet = tcp_proxy_pass_write_packet, + .connected = server_noop, +}; + +int tcp_proxy_pass_parse_execute (connection_job_t C) { + struct connection_info *c = CONN_INFO(C); + if (!c->extra) { + fail_connection (C, -1); + return 0; + } + job_t E = job_incref (c->extra); + struct connection_info *e = CONN_INFO(E); + + struct raw_message *r = malloc (sizeof (*r)); + rwm_move (r, &c->in); + mpq_push_w (e->out_queue, PTR_MOVE(r), 0); + job_signal (JOB_REF_PASS (E), JS_RUN); + return 0; +} + +int tcp_proxy_pass_close (connection_job_t C, int who) { + struct connection_info *c = CONN_INFO(C); + if (c->extra) { + job_t E = PTR_MOVE (c->extra); + fail_connection (E, -23); + job_decref (JOB_REF_PASS (E)); + } + return cpu_server_close_connection (C, who); +} + +int tcp_proxy_pass_write_packet (connection_job_t C, struct raw_message *raw) { + rwm_union (&CONN_INFO(C)->out, raw); + return 0; +} + int tcp_rpcs_default_execute (connection_job_t c, int op, struct raw_message *msg); static unsigned char ext_secret[16][16]; @@ -794,20 +840,46 @@ static int is_allowed_timestamp (int timestamp) { return 0; } -static void proxy_connection (connection_job_t C, const struct domain_info *info) { +static int proxy_connection (connection_job_t C, const struct domain_info *info) { const char zero[16] = {}; if (info->target.s_addr == 0 && !memcmp (info->target_ipv6, zero, 16)) { vkprintf (0, "failed to proxy request to %s\n", info->domain); - return; + fail_connection (C, -17); + return 0; + } + + int cfd = -1; + if (info->target.s_addr) { + cfd = client_socket (info->target.s_addr, 443, 0); + } else { + cfd = client_socket_ipv6 (info->target_ipv6, 443, 0); + } + + if (cfd < 0) { + fail_connection (C, -27); + return 0; } - // TODO proxy the connection to info->target.s_addr / info->target_ipv6 + struct connection_info *c = CONN_INFO(C); + c->type->crypto_free (C); + job_incref (C); + job_t EJ = alloc_new_connection (cfd, NULL, NULL, ct_outbound, &ct_proxy_pass, C, ntohl (*(int *)&info->target.s_addr), (void *)info->target_ipv6, 443); + + if (!EJ) { + job_decref_f (C); + fail_connection (C, -37); + return 0; + } + + c->type = &ct_proxy_pass; + c->extra = PTR_MOVE(EJ); + + return c->type->parse_execute (C); } int tcp_rpcs_compact_parse_execute (connection_job_t C) { #define RETURN_TLS_ERROR(info) \ - proxy_connection (C, info); \ - return (-1 << 28); + return proxy_connection (C, info); struct tcp_rpc_data *D = TCP_RPC_DATA (C); if (D->crypto_flags & RPCF_COMPACT_OFF) {