diff --git a/crates/shadowsocks-service/src/config.rs b/crates/shadowsocks-service/src/config.rs
index 87fbc54bab65..47208312e044 100644
--- a/crates/shadowsocks-service/src/config.rs
+++ b/crates/shadowsocks-service/src/config.rs
@@ -1936,15 +1936,6 @@ impl Config {
             local_config.check_integrity()?;
         }
 
-        if self.server.is_empty() {
-            let err = Error::new(
-                ErrorKind::MissingField,
-                "missing `servers` for client configuration",
-                None,
-            );
-            return Err(err);
-        }
-
         // Balancer related checks
         if let Some(rtt) = self.balancer.max_server_rtt {
             if rtt.as_secs() == 0 {
diff --git a/crates/shadowsocks-service/src/local/dns/server.rs b/crates/shadowsocks-service/src/local/dns/server.rs
index 3abe84d16b6d..4040d652a745 100644
--- a/crates/shadowsocks-service/src/local/dns/server.rs
+++ b/crates/shadowsocks-service/src/local/dns/server.rs
@@ -325,6 +325,11 @@ fn check_name_in_proxy_list(acl: &AccessControl, name: &Name) -> Option<bool> {
 
 /// Given the query, determine whether a remote or local query should be used, or inconclusive
 fn should_forward_by_query(context: &ServiceContext, balancer: &PingBalancer, query: &Query) -> Option<bool> {
+    // If no server is configured, always resolve locally
+    if balancer.is_empty() {
+        return Some(false);
+    }
+
     // Check if we are trying to make queries for remote servers
     //
     // This happens normally because VPN or TUN device receives DNS queries from local servers' plugins
diff --git a/crates/shadowsocks-service/src/local/http/dispatcher.rs b/crates/shadowsocks-service/src/local/http/dispatcher.rs
index f8dbd7f82c75..df6984e01f15 100644
--- a/crates/shadowsocks-service/src/local/http/dispatcher.rs
+++ b/crates/shadowsocks-service/src/local/http/dispatcher.rs
@@ -22,8 +22,8 @@ use shadowsocks::relay::socks5::Address;
 use crate::local::{
     context::ServiceContext,
     loadbalancing::PingBalancer,
-    net::AutoProxyClientStream,
-    utils::establish_tcp_tunnel,
+    net::{AutoProxyClientStream, AutoProxyIo},
+    utils::{establish_tcp_tunnel, establish_tcp_tunnel_bypassed},
 };
 
 use super::{
@@ -101,10 +101,24 @@ impl HttpDispatcher {
             // Connect to Shadowsocks' remote
             //
             // FIXME: What STATUS should I return for connection error?
-            let server = self.balancer.best_tcp_server();
-            let mut stream = AutoProxyClientStream::connect(self.context, server.as_ref(), &host).await?;
+            let mut server_opt = None;
+            let mut stream = if self.balancer.is_empty() {
+                AutoProxyClientStream::connect_bypassed(self.context, &host).await?
+            } else {
+                let server = self.balancer.best_tcp_server();
-            debug!("CONNECT relay connected {} <-> {}", self.client_addr, host);
+                let stream = AutoProxyClientStream::connect(self.context, server.as_ref(), &host).await?;
+                server_opt = Some(server);
+
+                stream
+            };
+
+            debug!(
+                "CONNECT relay connected {} <-> {} ({})",
+                self.client_addr,
+                host,
+                if stream.is_bypassed() { "bypassed" } else { "proxied" }
+            );
 
             // Upgrade to a TCP tunnel
             //
@@ -118,14 +132,19 @@ impl HttpDispatcher {
                 Ok(mut upgraded) => {
                     trace!("CONNECT tunnel upgrade success, {} <-> {}", client_addr, host);
 
-                    let _ = establish_tcp_tunnel(
-                        server.server_config(),
-                        &mut upgraded,
-                        &mut stream,
-                        client_addr,
-                        &host,
-                    )
-                    .await;
+                    let _ = match server_opt {
+                        Some(server) => {
+                            establish_tcp_tunnel(
+                                server.server_config(),
+                                &mut upgraded,
+                                &mut stream,
+                                client_addr,
+                                &host,
+                            )
+                            .await
+                        }
+                        None => establish_tcp_tunnel_bypassed(&mut upgraded, &mut stream, client_addr, &host).await,
+                    };
                 }
                 Err(e) => {
                     error!(
@@ -153,7 +172,7 @@ impl HttpDispatcher {
             // Set keep-alive for connection with remote
             set_conn_keep_alive(version, self.req.headers_mut(), conn_keep_alive);
 
-            let client = if self.context.check_target_bypassed(&host).await {
+            let client = if self.balancer.is_empty() || self.context.check_target_bypassed(&host).await {
                 trace!("bypassed {} -> {} {:?}", self.client_addr, host, self.req);
                 HttpClientEnum::Bypass(self.bypass_client)
             } else {
diff --git a/crates/shadowsocks-service/src/local/loadbalancing/ping_balancer.rs b/crates/shadowsocks-service/src/local/loadbalancing/ping_balancer.rs
index a28361d91336..6623b5b92bd7 100644
--- a/crates/shadowsocks-service/src/local/loadbalancing/ping_balancer.rs
+++ b/crates/shadowsocks-service/src/local/loadbalancing/ping_balancer.rs
@@ -104,6 +104,11 @@ impl PingBalancerBuilder {
     }
 
     fn find_best_idx(servers: &[Arc<ServerIdent>], mode: Mode) -> (usize, usize) {
+        if servers.is_empty() {
+            trace!("init without any TCP or UDP servers");
+            return (0, 0);
+        }
+
         let mut best_tcp_idx = 0;
         let mut best_udp_idx = 0;
 
@@ -157,8 +162,6 @@ impl PingBalancerBuilder {
     }
 
     pub async fn build(self) -> io::Result<PingBalancer> {
-        assert!(!self.servers.is_empty(), "build PingBalancer without any servers");
-
         if let Some(intv) = self.check_best_interval {
             if intv > self.check_interval {
                 return Err(io::Error::new(
@@ -215,12 +218,19 @@ struct PingBalancerContext {
 
 impl PingBalancerContext {
     fn best_tcp_server(&self) -> Arc<ServerIdent> {
+        assert!(!self.is_empty(), "no available server");
         self.servers[self.best_tcp_idx.load(Ordering::Relaxed)].clone()
     }
 
     fn best_udp_server(&self) -> Arc<ServerIdent> {
+        assert!(!self.is_empty(), "no available server");
         self.servers[self.best_udp_idx.load(Ordering::Relaxed)].clone()
     }
+
+    #[inline]
+    fn is_empty(&self) -> bool {
+        self.servers.is_empty()
+    }
 }
 
 impl PingBalancerContext {
@@ -328,8 +338,9 @@ impl PingBalancerContext {
     }
 
     async fn init_score(&self) {
-        assert!(!self.servers.is_empty(), "check PingBalancer without any servers");
-
+        if self.servers.is_empty() {
+            return;
+        }
         self.check_once(true).await;
     }
 
@@ -342,6 +353,10 @@ impl PingBalancerContext {
     }
 
     fn probing_required(&self) -> bool {
+        if self.servers.is_empty() {
+            return false;
+        }
+
         let mut tcp_count = 0;
         let mut udp_count = 0;
 
@@ -376,6 +391,9 @@ impl PingBalancerContext {
     /// Check each server's score and update the best server's index
     async fn check_once(&self, first_run: bool) {
         let servers = &self.servers;
+        if servers.is_empty() {
+            return;
+        }
 
         let mut vfut_tcp = Vec::with_capacity(servers.len());
        let mut vfut_udp = Vec::with_capacity(servers.len());
@@ -496,6 +514,9 @@ impl PingBalancerContext {
     /// Check the best server only
     async fn check_best_server(&self) {
         let servers = &self.servers;
+        if servers.is_empty() {
+            return;
+        }
 
         let mut vfut = Vec::new();
@@ -689,6 +710,13 @@ impl PingBalancer {
         context.best_udp_server()
     }
 
+    /// Check if there is no available server
+    #[inline]
+    pub fn is_empty(&self) -> bool {
+        let context = self.inner.context.load();
+        context.is_empty()
+    }
+
     /// Get the server list
     pub fn servers(&self) -> PingServerIter<'_> {
         let context = self.inner.context.load();
diff --git a/crates/shadowsocks-service/src/local/mod.rs b/crates/shadowsocks-service/src/local/mod.rs
index 4ecc435cd6df..053a46559587 100644
--- a/crates/shadowsocks-service/src/local/mod.rs
+++ b/crates/shadowsocks-service/src/local/mod.rs
@@ -106,7 +106,6 @@ impl Server {
     /// Starts a shadowsocks local server
     pub async fn create(config: Config) -> io::Result<Server> {
         assert!(config.config_type == ConfigType::Local && !config.local.is_empty());
-        assert!(!config.server.is_empty());
 
         trace!("{:?}", config);
diff --git a/crates/shadowsocks-service/src/local/net/udp/association.rs b/crates/shadowsocks-service/src/local/net/udp/association.rs
index 661a93fcff4e..d414b0735fe4 100644
--- a/crates/shadowsocks-service/src/local/net/udp/association.rs
+++ b/crates/shadowsocks-service/src/local/net/udp/association.rs
@@ -421,7 +421,7 @@ where
     async fn dispatch_received_packet(&mut self, target_addr: &Address, data: &[u8]) {
         // Check if target should be bypassed. If so, send packets directly.
-        let bypassed = self.context.check_target_bypassed(target_addr).await;
+        let bypassed = self.balancer.is_empty() || self.context.check_target_bypassed(target_addr).await;
 
         trace!(
             "udp relay {} -> {} ({}) with {} bytes",
diff --git a/crates/shadowsocks-service/src/local/redir/tcprelay/mod.rs b/crates/shadowsocks-service/src/local/redir/tcprelay/mod.rs
index 32ba3e19dc78..5fae76d811bb 100644
--- a/crates/shadowsocks-service/src/local/redir/tcprelay/mod.rs
+++ b/crates/shadowsocks-service/src/local/redir/tcprelay/mod.rs
@@ -21,7 +21,7 @@ use crate::{
         loadbalancing::PingBalancer,
         net::AutoProxyClientStream,
         redir::redir_ext::{TcpListenerRedirExt, TcpStreamRedirExt},
-        utils::{establish_tcp_tunnel, to_ipv4_mapped},
+        utils::{establish_tcp_tunnel, establish_tcp_tunnel_bypassed, to_ipv4_mapped},
     },
 };
 
@@ -37,6 +37,11 @@ async fn establish_client_tcp_redir<'a>(
     peer_addr: SocketAddr,
     addr: &Address,
 ) -> io::Result<()> {
+    if balancer.is_empty() {
+        let mut remote = AutoProxyClientStream::connect_bypassed(context, addr).await?;
+        return establish_tcp_tunnel_bypassed(&mut stream, &mut remote, peer_addr, addr).await;
+    }
+
     let server = balancer.best_tcp_server();
     let svr_cfg = server.server_config();
 
diff --git a/crates/shadowsocks-service/src/local/socks/server/socks4/tcprelay.rs b/crates/shadowsocks-service/src/local/socks/server/socks4/tcprelay.rs
index f8c4dc895dec..7a8c973fcf0b 100644
--- a/crates/shadowsocks-service/src/local/socks/server/socks4/tcprelay.rs
+++ b/crates/shadowsocks-service/src/local/socks/server/socks4/tcprelay.rs
@@ -17,7 +17,7 @@ use crate::local::{
     context::ServiceContext,
     loadbalancing::PingBalancer,
     net::AutoProxyClientStream,
-    utils::establish_tcp_tunnel,
+    utils::{establish_tcp_tunnel, establish_tcp_tunnel_bypassed},
 };
 
 use crate::local::socks::socks4::{
@@ -95,11 +95,20 @@ impl Socks4TcpHandler {
             return Ok(());
         }
 
-        let server = self.balancer.best_tcp_server();
-        let svr_cfg = server.server_config();
         let target_addr = target_addr.into();
+        let mut server_opt = None;
+        let server_result = if self.balancer.is_empty() {
+            AutoProxyClientStream::connect_bypassed(self.context, &target_addr).await
+        } else {
+            let server = self.balancer.best_tcp_server();
-        let mut remote = match AutoProxyClientStream::connect(self.context, &server, &target_addr).await {
+            let r = AutoProxyClientStream::connect(self.context, &server, &target_addr).await;
+            server_opt = Some(server);
+
+            r
+        };
+
+        let mut remote = match server_result {
             Ok(remote) => {
                 // Tell the client that we are ready
                 let handshake_rsp = HandshakeResponse::new(ResultCode::RequestGranted);
@@ -132,6 +141,12 @@ impl Socks4TcpHandler {
         // UNWRAP.
         let mut stream = stream.into_inner();
 
-        establish_tcp_tunnel(svr_cfg, &mut stream, &mut remote, peer_addr, &target_addr).await
+        match server_opt {
+            Some(server) => {
+                let svr_cfg = server.server_config();
+                establish_tcp_tunnel(svr_cfg, &mut stream, &mut remote, peer_addr, &target_addr).await
+            }
+            None => establish_tcp_tunnel_bypassed(&mut stream, &mut remote, peer_addr, &target_addr).await,
+        }
     }
 }
diff --git a/crates/shadowsocks-service/src/local/socks/server/socks5/tcprelay.rs b/crates/shadowsocks-service/src/local/socks/server/socks5/tcprelay.rs
index 3552c2280928..993af01febae 100644
--- a/crates/shadowsocks-service/src/local/socks/server/socks5/tcprelay.rs
+++ b/crates/shadowsocks-service/src/local/socks/server/socks5/tcprelay.rs
@@ -33,7 +33,7 @@ use crate::{
         loadbalancing::PingBalancer,
         net::AutoProxyClientStream,
         socks::config::Socks5AuthConfig,
-        utils::establish_tcp_tunnel,
+        utils::{establish_tcp_tunnel, establish_tcp_tunnel_bypassed},
     },
     net::utils::ignore_until_end,
 };
 
@@ -251,10 +251,19 @@ impl Socks5TcpHandler {
             return Ok(());
         }
 
-        let server = self.balancer.best_tcp_server();
-        let svr_cfg = server.server_config();
+        let mut server_opt = None;
+        let remote_result = if self.balancer.is_empty() {
+            AutoProxyClientStream::connect_bypassed(self.context.clone(), &target_addr).await
+        } else {
+            let server = self.balancer.best_tcp_server();
+
+            let r = AutoProxyClientStream::connect(self.context.clone(), &server, &target_addr).await;
+            server_opt = Some(server);
+
+            r
+        };
 
-        let mut remote = match AutoProxyClientStream::connect(self.context.clone(), &server, &target_addr).await {
+        let mut remote = match remote_result {
             Ok(remote) => {
                 // Tell the client that we are ready
                 let header =
@@ -280,7 +289,13 @@ impl Socks5TcpHandler {
             }
         };
 
-        establish_tcp_tunnel(svr_cfg, &mut stream, &mut remote, peer_addr, &target_addr).await
+        match server_opt {
+            Some(server) => {
+                let svr_cfg = server.server_config();
+                establish_tcp_tunnel(svr_cfg, &mut stream, &mut remote, peer_addr, &target_addr).await
+            }
+            None => establish_tcp_tunnel_bypassed(&mut stream, &mut remote, peer_addr, &target_addr).await,
+        }
     }
 
     async fn handle_udp_associate(self, mut stream: TcpStream, client_addr: Address) -> io::Result<()> {
diff --git a/crates/shadowsocks-service/src/local/tun/tcp.rs b/crates/shadowsocks-service/src/local/tun/tcp.rs
index 864ffb05a067..0135a930ddf0 100644
--- a/crates/shadowsocks-service/src/local/tun/tcp.rs
+++ b/crates/shadowsocks-service/src/local/tun/tcp.rs
@@ -33,7 +33,7 @@ use crate::local::{
     context::ServiceContext,
     loadbalancing::PingBalancer,
     net::AutoProxyClientStream,
-    utils::{establish_tcp_tunnel, to_ipv4_mapped},
+    utils::{establish_tcp_tunnel, establish_tcp_tunnel_bypassed, to_ipv4_mapped},
 };
 
 use super::virt_device::VirtTunDevice;
@@ -477,11 +477,15 @@ async fn establish_client_tcp_redir<'a>(
     peer_addr: SocketAddr,
     addr: &Address,
 ) -> io::Result<()> {
+    if balancer.is_empty() {
+        let mut remote = AutoProxyClientStream::connect_bypassed(context, addr).await?;
+        return establish_tcp_tunnel_bypassed(&mut stream, &mut remote, peer_addr, addr).await;
+    }
+
     let server = balancer.best_tcp_server();
     let svr_cfg = server.server_config();
 
     let mut remote = AutoProxyClientStream::connect(context, &server, addr).await?;
-
     establish_tcp_tunnel(svr_cfg, &mut stream, &mut remote, peer_addr, addr).await
 }
diff --git a/crates/shadowsocks-service/src/local/tunnel/tcprelay.rs b/crates/shadowsocks-service/src/local/tunnel/tcprelay.rs
index 88ead2caa395..a0a445f7488b 100644
--- a/crates/shadowsocks-service/src/local/tunnel/tcprelay.rs
+++ b/crates/shadowsocks-service/src/local/tunnel/tcprelay.rs
@@ -10,7 +10,7 @@ use crate::local::{
     context::ServiceContext,
     loadbalancing::PingBalancer,
     net::AutoProxyClientStream,
-    utils::establish_tcp_tunnel,
+    utils::{establish_tcp_tunnel, establish_tcp_tunnel_bypassed},
 };
 
 pub async fn run_tcp_tunnel(
@@ -61,6 +61,13 @@ async fn handle_tcp_client(
     peer_addr: SocketAddr,
     forward_addr: Address,
 ) -> io::Result<()> {
+    if balancer.is_empty() {
+        trace!("establishing tcp tunnel {} <-> {} direct", peer_addr, forward_addr);
+
+        let mut remote = AutoProxyClientStream::connect_bypassed(context, &forward_addr).await?;
+        return establish_tcp_tunnel_bypassed(&mut stream, &mut remote, peer_addr, &forward_addr).await;
+    }
+
     let server = balancer.best_tcp_server();
     let svr_cfg = server.server_config();
     trace!(
@@ -72,6 +79,5 @@ async fn handle_tcp_client(
     );
 
     let mut remote = AutoProxyClientStream::connect_proxied(context, &server, &forward_addr).await?;
-
     establish_tcp_tunnel(svr_cfg, &mut stream, &mut remote, peer_addr, &forward_addr).await
 }
diff --git a/crates/shadowsocks-service/src/local/tunnel/udprelay.rs b/crates/shadowsocks-service/src/local/tunnel/udprelay.rs
index 7987797fe39a..065a68f05b5b 100644
--- a/crates/shadowsocks-service/src/local/tunnel/udprelay.rs
+++ b/crates/shadowsocks-service/src/local/tunnel/udprelay.rs
@@ -1,65 +1,47 @@
 //! UDP Tunnel server
 
-use std::{
-    cell::RefCell,
-    io::{self, ErrorKind},
-    net::SocketAddr,
-    sync::Arc,
-    time::Duration,
-};
+use std::{io, net::SocketAddr, sync::Arc, time::Duration};
 
-use bytes::Bytes;
-use futures::future;
-use log::{debug, error, info, trace, warn};
-use lru_time_cache::LruCache;
-use rand::{rngs::SmallRng, Rng, SeedableRng};
+use async_trait::async_trait;
+use log::{debug, error, info};
 use shadowsocks::{
     lookup_then,
     net::UdpSocket as ShadowUdpSocket,
-    relay::{
-        socks5::Address,
-        udprelay::{options::UdpSocketControlData, ProxySocket, MAXIMUM_UDP_PAYLOAD_SIZE},
-    },
+    relay::{socks5::Address, udprelay::MAXIMUM_UDP_PAYLOAD_SIZE},
     ServerAddr,
 };
-use tokio::{net::UdpSocket, sync::mpsc, task::JoinHandle, time};
+use tokio::{net::UdpSocket, time};
 
-use crate::{
-    local::{context::ServiceContext, loadbalancing::PingBalancer},
-    net::{
-        packet_window::PacketWindowFilter,
-        MonProxySocket,
-        UDP_ASSOCIATION_KEEP_ALIVE_CHANNEL_SIZE,
-        UDP_ASSOCIATION_SEND_CHANNEL_SIZE,
-    },
+use crate::local::{
+    context::ServiceContext,
+    loadbalancing::PingBalancer,
+    net::{UdpAssociationManager, UdpInboundWrite},
 };
 
-type AssociationMap = LruCache<SocketAddr, UdpAssociation>;
+#[derive(Clone)]
+struct TunnelUdpInboundWriter {
+    inbound: Arc<UdpSocket>,
+}
+
+#[async_trait]
+impl UdpInboundWrite for TunnelUdpInboundWriter {
+    async fn send_to(&self, peer_addr: SocketAddr, _remote_addr: &Address, data: &[u8]) -> io::Result<()> {
+        self.inbound.send_to(data, peer_addr).await.map(|_| ())
+    }
+}
 
 pub struct UdpTunnel {
     context: Arc<ServiceContext>,
-    assoc_map: AssociationMap,
-    keepalive_tx: mpsc::Sender<SocketAddr>,
-    keepalive_rx: mpsc::Receiver<SocketAddr>,
-    time_to_live: Duration,
+    time_to_live: Option<Duration>,
+    capacity: Option<usize>,
 }
 
 impl UdpTunnel {
     pub fn new(context: Arc<ServiceContext>, time_to_live: Option<Duration>, capacity: Option<usize>) -> UdpTunnel {
-        let time_to_live = time_to_live.unwrap_or(crate::DEFAULT_UDP_EXPIRY_DURATION);
-        let assoc_map = match capacity {
-            Some(capacity) => LruCache::with_expiry_duration_and_capacity(time_to_live, capacity),
-            None => LruCache::with_expiry_duration(time_to_live),
-        };
-
-        let (keepalive_tx, keepalive_rx) = mpsc::channel(UDP_ASSOCIATION_KEEP_ALIVE_CHANNEL_SIZE);
-
         UdpTunnel {
             context,
-            assoc_map,
-            keepalive_tx,
-            keepalive_rx,
             time_to_live,
+            capacity,
         }
     }
 
@@ -85,20 +67,29 @@ impl UdpTunnel {
         info!("shadowsocks UDP tunnel listening on {}", socket.local_addr()?);
 
         let listener = Arc::new(socket);
+        let (mut manager, cleanup_interval, mut keepalive_rx) = UdpAssociationManager::new(
+            self.context.clone(),
+            TunnelUdpInboundWriter {
+                inbound: listener.clone(),
+            },
+            self.time_to_live,
+            self.capacity,
+            balancer,
+        );
 
         let mut buffer = [0u8; MAXIMUM_UDP_PAYLOAD_SIZE];
-        let mut cleanup_timer = time::interval(self.time_to_live);
+        let mut cleanup_timer = time::interval(cleanup_interval);
 
         loop {
             tokio::select! {
                 _ = cleanup_timer.tick() => {
                     // cleanup expired associations. iter() will remove expired elements
-                    let _ = self.assoc_map.iter();
+                    manager.cleanup_expired().await;
                 }
 
-                peer_addr_opt = self.keepalive_rx.recv() => {
+                peer_addr_opt = keepalive_rx.recv() => {
                     let peer_addr = peer_addr_opt.expect("keep-alive channel closed unexpectedly");
-                    self.assoc_map.get(&peer_addr);
+                    manager.keep_alive(&peer_addr).await;
                 }
 
                 recv_result = listener.recv_from(&mut buffer) => {
@@ -123,8 +114,7 @@ impl UdpTunnel {
                     }
 
                     let data = &buffer[..n];
-                    if let Err(err) = self
-                        .send_packet(&listener, peer_addr, &balancer, forward_addr, data)
+                    if let Err(err) = manager.send_to(peer_addr, forward_addr.clone(), data)
                         .await
                     {
                         debug!(
@@ -139,347 +129,4 @@ impl UdpTunnel {
             }
         }
     }
-
-    async fn send_packet(
-        &mut self,
-        listener: &Arc<UdpSocket>,
-        peer_addr: SocketAddr,
-        balancer: &PingBalancer,
-        forward_addr: &Address,
-        data: &[u8],
-    ) -> io::Result<()> {
-        if let Some(assoc) = self.assoc_map.get(&peer_addr) {
-            return assoc.try_send(Bytes::copy_from_slice(data));
-        }
-
-        let assoc = UdpAssociation::new(
-            self.context.clone(),
-            listener.clone(),
-            peer_addr,
-            forward_addr.clone(),
-            self.keepalive_tx.clone(),
-            balancer.clone(),
-            self.time_to_live,
-        );
-
-        debug!("created udp association for {}", peer_addr);
-
-        assoc.try_send(Bytes::copy_from_slice(data))?;
-        self.assoc_map.insert(peer_addr, assoc);
-
-        Ok(())
-    }
-}
-
-struct UdpAssociation {
-    assoc_handle: JoinHandle<()>,
-    sender: mpsc::Sender<Bytes>,
-}
-
-impl Drop for UdpAssociation {
-    fn drop(&mut self) {
-        self.assoc_handle.abort();
-    }
-}
-
-impl UdpAssociation {
-    fn new(
-        context: Arc<ServiceContext>,
-        inbound: Arc<UdpSocket>,
-        peer_addr: SocketAddr,
-        forward_addr: Address,
-        keepalive_tx: mpsc::Sender<SocketAddr>,
-        balancer: PingBalancer,
-        server_session_expire_duration: Duration,
-    ) -> UdpAssociation {
-        let (assoc_handle, sender) = UdpAssociationContext::create(
-            context,
-            inbound,
-            peer_addr,
-            forward_addr,
-            keepalive_tx,
-            balancer,
-            server_session_expire_duration,
-        );
-        UdpAssociation { assoc_handle, sender }
-    }
-
-    fn try_send(&self, data: Bytes) -> io::Result<()> {
-        if let Err(..) = self.sender.try_send(data) {
-            let err = io::Error::new(ErrorKind::Other, "udp relay channel full");
-            return Err(err);
-        }
-        Ok(())
-    }
-}
-
-#[derive(Debug, Clone)]
-struct ServerContext {
-    packet_window_filter: PacketWindowFilter,
-}
-
-#[derive(Clone)]
-struct ServerSessionContext {
-    server_session_map: LruCache<u64, ServerContext>,
-}
-
-impl ServerSessionContext {
-    fn new(server_session_expire_duration: Duration) -> ServerSessionContext {
-        ServerSessionContext {
-            server_session_map: LruCache::with_expiry_duration(server_session_expire_duration),
-        }
-    }
-}
-
-struct UdpAssociationContext {
-    context: Arc<ServiceContext>,
-    peer_addr: SocketAddr,
-    forward_addr: Address,
-    proxied_socket: Option<MonProxySocket>,
-    keepalive_tx: mpsc::Sender<SocketAddr>,
-    keepalive_flag: bool,
-    balancer: PingBalancer,
-    inbound: Arc<UdpSocket>,
-    client_session_id: u64,
-    client_packet_id: u64,
-    server_session: Option<ServerSessionContext>,
-    server_session_expire_duration: Duration,
-}
-
-impl Drop for UdpAssociationContext {
-    fn drop(&mut self) {
-        debug!("udp association for {} is closed", self.peer_addr);
-    }
-}
-
-thread_local! {
-    static CLIENT_SESSION_RNG: RefCell<SmallRng> = RefCell::new(SmallRng::from_entropy());
-}
-
-#[inline]
-fn generate_client_session_id() -> u64 {
-    CLIENT_SESSION_RNG.with(|rng| rng.borrow_mut().gen())
-}
-
-impl UdpAssociationContext {
-    fn create(
-        context: Arc<ServiceContext>,
-        inbound: Arc<UdpSocket>,
-        peer_addr: SocketAddr,
-        forward_addr: Address,
-        keepalive_tx: mpsc::Sender<SocketAddr>,
-        balancer: PingBalancer,
-        server_session_expire_duration: Duration,
-    ) -> (JoinHandle<()>, mpsc::Sender<Bytes>) {
-        // Buffering up to UDP_ASSOCIATION_SEND_CHANNEL_SIZE pending packets per association should be good enough for a server.
-        // If too many packets get stuck in the channel, dropping the excess is a good way to protect the server from
-        // running out of memory.
-        let (sender, receiver) = mpsc::channel(UDP_ASSOCIATION_SEND_CHANNEL_SIZE);
-
-        let mut assoc = UdpAssociationContext {
-            context,
-            peer_addr,
-            forward_addr,
-            proxied_socket: None,
-            keepalive_tx,
-            keepalive_flag: false,
-            balancer,
-            inbound,
-            // client_session_id must be randomly generated,
-            // servers use this ID to identify each independent client.
-            client_session_id: generate_client_session_id(),
-            client_packet_id: 1,
-            server_session: None,
-            server_session_expire_duration,
-        };
-        let handle = tokio::spawn(async move { assoc.dispatch_packet(receiver).await });
-
-        (handle, sender)
-    }
-
-    async fn dispatch_packet(&mut self, mut receiver: mpsc::Receiver<Bytes>) {
-        let mut proxied_buffer = Vec::new();
-        let mut keepalive_interval = time::interval(Duration::from_secs(1));
-
-        loop {
-            tokio::select! {
-                packet_received_opt = receiver.recv() => {
-                    let data = match packet_received_opt {
-                        Some(d) => d,
-                        None => {
-                            trace!("udp association for {} -> ... channel closed", self.peer_addr);
-                            break;
-                        }
-                    };
-
-                    self.dispatch_received_packet(&data).await;
-                }
-
-                received_opt = receive_from_proxied_opt(&self.proxied_socket, &mut proxied_buffer), if self.proxied_socket.is_some() => {
-                    let (n, addr, control_opt) = match received_opt {
-                        Ok(r) => r,
-                        Err(err) => {
-                            error!("udp relay {} <- ... failed, error: {}", self.peer_addr, err);
-                            // Socket failure. Reset for recreation.
-                            self.proxied_socket = None;
-                            continue;
-                        }
-                    };
-
-                    if let Some(control) = control_opt {
-                        // Check if Packet ID is in the window
-
-                        let session = self.server_session.get_or_insert_with(|| {
-                            ServerSessionContext::new(self.server_session_expire_duration)
-                        });
-
-                        let packet_id = control.packet_id;
-                        let session_context = session
-                            .server_session_map
-                            .entry(control.server_session_id)
-                            .or_insert_with(|| {
-                                trace!(
-                                    "udp server with session {} for {} created",
-                                    control.client_session_id,
-                                    self.peer_addr,
-                                );
-
-                                ServerContext {
-                                    packet_window_filter: PacketWindowFilter::new()
-                                }
-                            });
-
-                        if !session_context.packet_window_filter.validate_packet_id(packet_id, u64::MAX) {
-                            error!("udp {} packet_id {} out of window", self.peer_addr, packet_id);
-                            continue;
-                        }
-                    }
-
-                    self.send_received_respond_packet(&addr, &proxied_buffer[..n]).await;
-                }
-
-                _ = keepalive_interval.tick() => {
-                    if self.keepalive_flag {
-                        if let Err(..) = self.keepalive_tx.try_send(self.peer_addr) {
-                            debug!("udp relay {} keep-alive failed, channel full or closed", self.peer_addr);
-                        } else {
-                            self.keepalive_flag = false;
-                        }
-                    }
-                }
-            }
-        }
-
-        #[inline]
-        async fn receive_from_proxied_opt(
-            socket: &Option<MonProxySocket>,
-            buf: &mut Vec<u8>,
-        ) -> io::Result<(usize, Address, Option<UdpSocketControlData>)> {
-            match *socket {
-                None => future::pending().await,
-                Some(ref s) => {
-                    if buf.is_empty() {
-                        buf.resize(MAXIMUM_UDP_PAYLOAD_SIZE, 0);
-                    }
-                    s.recv_with_ctrl(buf).await
-                }
-            }
-        }
-    }
-
-    async fn dispatch_received_packet(&mut self, data: &[u8]) {
-        trace!(
-            "udp relay {} -> {} with {} bytes",
-            self.peer_addr,
-            self.forward_addr,
-            data.len()
-        );
-
-        if let Err(err) = self.dispatch_received_proxied_packet(data).await {
-            error!(
-                "udp relay {} -> {} with {} bytes, error: {}",
-                self.peer_addr,
-                self.forward_addr,
-                data.len(),
-                err
-            );
-        }
-    }
-
-    async fn dispatch_received_proxied_packet(&mut self, data: &[u8]) -> io::Result<()> {
-        let socket = match self.proxied_socket {
-            Some(ref mut socket) => socket,
-            None => {
-                // Create a new connection to proxy server
-
-                let server = self.balancer.best_udp_server();
-                let svr_cfg = server.server_config();
-
-                let socket =
-                    ProxySocket::connect_with_opts(self.context.context(), svr_cfg, self.context.connect_opts_ref())
-                        .await?;
-                let socket = MonProxySocket::from_socket(socket, self.context.flow_stat());
-
-                self.proxied_socket.insert(socket)
-            }
-        };
-
-        // Increase Packet ID before send
-        self.client_packet_id = match self.client_packet_id.checked_add(1) {
-            Some(i) => i,
-            None => {
-                warn!(
-                    "{} -> {} (proxied) sending {} bytes failed, packet id overflowed",
-                    self.peer_addr,
-                    self.forward_addr,
-                    data.len(),
-                );
-                return Ok(());
-            }
-        };
-
-        let control = UdpSocketControlData {
-            client_session_id: self.client_session_id,
-            server_session_id: 0,
-            packet_id: self.client_packet_id,
-        };
-
-        match socket.send_with_ctrl(&self.forward_addr, &control, data).await {
-            Ok(..) => return Ok(()),
-            Err(err) => {
-                debug!(
-                    "{} -> {} (proxied) sending {} bytes failed, error: {}",
-                    self.peer_addr,
-                    self.forward_addr,
-                    data.len(),
-                    err
-                );
-
-                // Drop the socket and reconnect to another server.
-                self.proxied_socket = None;
-            }
-        }
-
-        Ok(())
-    }
-
-    async fn send_received_respond_packet(&mut self, addr: &Address, data: &[u8]) {
-        trace!("udp relay {} <- {} received {} bytes", self.peer_addr, addr, data.len());
-
-        // Keep association alive in map
-        self.keepalive_flag = true;
-
-        // Send back to client
-        if let Err(err) = self.inbound.send_to(data, self.peer_addr).await {
-            warn!(
-                "udp failed to send back {} bytes to client {}, from target {}, error: {}",
-                data.len(),
-                self.peer_addr,
-                addr,
-                err
-            );
-        } else {
-            trace!("udp relay {} <- {} with {} bytes", self.peer_addr, addr, data.len());
-        }
-    }
-}
diff --git a/crates/shadowsocks-service/src/local/utils.rs b/crates/shadowsocks-service/src/local/utils.rs
index 1d352a95b861..749ab99ba26e 100644
--- a/crates/shadowsocks-service/src/local/utils.rs
+++ b/crates/shadowsocks-service/src/local/utils.rs
@@ -38,7 +38,6 @@ where
             svr_cfg.addr(),
         );
     } else {
-        debug!("established tcp tunnel {} <-> {} bypassed", peer_addr, target_addr);
        return establish_tcp_tunnel_bypassed(plain, shadow, peer_addr, target_addr).await;
    }
 
@@ -95,7 +94,7 @@ where
     Ok(())
 }
 
-async fn establish_tcp_tunnel_bypassed<P, S>(
+pub(crate) async fn establish_tcp_tunnel_bypassed<P, S>(
     plain: &mut P,
     shadow: &mut S,
     peer_addr: SocketAddr,
@@ -105,6 +104,8 @@ where
     P: AsyncRead + AsyncWrite + Unpin,
     S: AsyncRead + AsyncWrite + Unpin,
 {
+    debug!("established tcp tunnel {} <-> {} bypassed", peer_addr, target_addr);
+
     match copy_bidirectional(plain, shadow).await {
         Ok((rn, wn)) => {
             trace!(
diff --git a/src/service/local.rs b/src/service/local.rs
index 8395422d7acd..853736d01c24 100644
--- a/src/service/local.rs
+++ b/src/service/local.rs
@@ -738,16 +738,6 @@ pub fn main(matches: &ArgMatches) {
         return;
     }
 
-    if config.server.is_empty() {
-        eprintln!(
-            "missing proxy servers, consider specifying it by \
-            --server-addr, --encrypt-method, --password command line option, \
-            or --server-url command line option, \
-            or configuration file, check more details in https://shadowsocks.org/en/config/quick-guide.html"
-        );
-        return;
-    }
-
    if let Err(err) = config.check_integrity() {
        eprintln!("config integrity check failed, {}", err);
        return;
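
Note for reviewers: every TCP inbound handler touched by this patch follows the same convention once the balancer may be empty. A minimal sketch of that calling pattern is below; the function name `relay_tcp` and this exact parameter list are invented for illustration, but the body mirrors `establish_client_tcp_redir` in `redir/tcprelay/mod.rs` above.

// Illustrative only: `relay_tcp` is a hypothetical handler showing the
// empty-balancer convention this patch establishes across local servers.
async fn relay_tcp(
    context: Arc<ServiceContext>,
    balancer: PingBalancer,
    mut stream: TcpStream,
    peer_addr: SocketAddr,
    target_addr: Address,
) -> io::Result<()> {
    if balancer.is_empty() {
        // No remote server configured: connect to the target directly
        // and relay in the clear, skipping shadowsocks entirely.
        let mut remote = AutoProxyClientStream::connect_bypassed(context, &target_addr).await?;
        return establish_tcp_tunnel_bypassed(&mut stream, &mut remote, peer_addr, &target_addr).await;
    }

    // At least one server is configured: pick the best one and proxy through it,
    // exactly as before this patch.
    let server = balancer.best_tcp_server();
    let mut remote = AutoProxyClientStream::connect(context, &server, &target_addr).await?;
    establish_tcp_tunnel(server.server_config(), &mut stream, &mut remote, peer_addr, &target_addr).await
}

UDP takes the same branch via `PingBalancer::is_empty()` inside `dispatch_received_packet` (see the `local/net/udp/association.rs` hunk), so an empty server list now means "bypass everything" instead of a startup error.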