diff --git a/config.go b/config.go index d9694b05..d8e1e9d0 100755 --- a/config.go +++ b/config.go @@ -103,17 +103,17 @@ func initConfig() (err error) { udpArgs.Local = udp.Flag("local", "local ip:port to listen").Short('p').Default(":33080").String() //########mux-server######### - muxServer := app.Command("server", "proxy on mux server mode") + muxServer := app.Command("server", "proxy on mux server mode").Hidden() muxServerArgs.Parent = muxServer.Flag("parent", "parent address, such as: \"23.32.32.19:28008\"").Default("").Short('P').String() muxServerArgs.CertFile = muxServer.Flag("cert", "cert file for tls").Short('C').Default("proxy.crt").String() muxServerArgs.KeyFile = muxServer.Flag("key", "key file for tls").Short('K').Default("proxy.key").String() muxServerArgs.Timeout = muxServer.Flag("timeout", "tcp timeout with milliseconds").Short('t').Default("2000").Int() muxServerArgs.IsUDP = muxServer.Flag("udp", "proxy on udp mux server mode").Default("false").Bool() muxServerArgs.Key = muxServer.Flag("k", "client key").Default("default").String() - muxServerArgs.Route = muxServer.Flag("route", "local route to client's network, such as :PROTOCOL://LOCAL_IP:LOCAL_PORT@[CLIENT_KEY]CLIENT_LOCAL_HOST:CLIENT_LOCAL_PORT").Short('r').Default("").Strings() + muxServerArgs.Route = muxServer.Flag("route", "local route to client's network, such as: PROTOCOL://LOCAL_IP:LOCAL_PORT@[CLIENT_KEY]CLIENT_LOCAL_HOST:CLIENT_LOCAL_PORT").Short('r').Default("").Strings() //########mux-client######### - muxClient := app.Command("client", "proxy on mux client mode") + muxClient := app.Command("client", "proxy on mux client mode").Hidden() muxClientArgs.Parent = muxClient.Flag("parent", "parent address, such as: \"23.32.32.19:28008\"").Default("").Short('P').String() muxClientArgs.CertFile = muxClient.Flag("cert", "cert file for tls").Short('C').Default("proxy.crt").String() muxClientArgs.KeyFile = muxClient.Flag("key", "key file for tls").Short('K').Default("proxy.key").String() @@ -121,7 +121,7 @@ func initConfig() (err error) { muxClientArgs.Key = muxClient.Flag("k", "key same with server").Default("default").String() //########mux-bridge######### - muxBridge := app.Command("bridge", "proxy on mux bridge mode") + muxBridge := app.Command("bridge", "proxy on mux bridge mode").Hidden() muxBridgeArgs.CertFile = muxBridge.Flag("cert", "cert file for tls").Short('C').Default("proxy.crt").String() muxBridgeArgs.KeyFile = muxBridge.Flag("key", "key file for tls").Short('K').Default("proxy.key").String() muxBridgeArgs.Timeout = muxBridge.Flag("timeout", "tcp timeout with milliseconds").Short('t').Default("2000").Int() @@ -135,7 +135,7 @@ func initConfig() (err error) { tunnelServerArgs.Timeout = tunnelServer.Flag("timeout", "tcp timeout with milliseconds").Short('t').Default("2000").Int() tunnelServerArgs.IsUDP = tunnelServer.Flag("udp", "proxy on udp tunnel server mode").Default("false").Bool() tunnelServerArgs.Key = tunnelServer.Flag("k", "client key").Default("default").String() - tunnelServerArgs.Route = tunnelServer.Flag("route", "local route to client's network, such as :PROTOCOL://LOCAL_IP:LOCAL_PORT@[CLIENT_KEY]CLIENT_LOCAL_HOST:CLIENT_LOCAL_PORT").Short('r').Default("").Strings() + tunnelServerArgs.Route = tunnelServer.Flag("route", "local route to client's network, such as: PROTOCOL://LOCAL_IP:LOCAL_PORT@[CLIENT_KEY]CLIENT_LOCAL_HOST:CLIENT_LOCAL_PORT").Short('r').Default("").Strings() //########tunnel-client######### tunnelClient := app.Command("tclient", "proxy on tunnel client mode") @@ -259,7 +259,7 @@ func initConfig() (err error) { log.Printf("ERR:%s,restarting...", err) continue } - log.Printf("%s [PID] %d unexpected exited, restarting...\n", os.Args[0], pid) + log.Printf("worker %s [PID] %d unexpected exited, restarting...\n", os.Args[0], pid) time.Sleep(time.Second * 5) } }() diff --git a/services/mux_bridge.go b/services/mux_bridge.go index 32c4b88c..c81c4699 100644 --- a/services/mux_bridge.go +++ b/services/mux_bridge.go @@ -2,6 +2,7 @@ package services import ( "bufio" + "io" "log" "net" "proxy/utils" @@ -11,20 +12,14 @@ import ( "github.com/xtaci/smux" ) -type MuxServerConn struct { - //ClientLocalAddr string //tcp:2.2.22:333@ID - Conn *net.Conn -} type MuxBridge struct { cfg MuxBridgeArgs - serverConns utils.ConcurrentMap clientControlConns utils.ConcurrentMap } func NewMuxBridge() Service { return &MuxBridge{ cfg: MuxBridgeArgs{}, - serverConns: utils.NewConcurrentMap(), clientControlConns: utils.NewConcurrentMap(), } } @@ -51,31 +46,46 @@ func (s *MuxBridge) Start(args interface{}) (err error) { err = sc.ListenTls(s.cfg.CertBytes, s.cfg.KeyBytes, func(inConn net.Conn) { reader := bufio.NewReader(inConn) + var err error var connType uint8 - err = utils.ReadPacket(reader, &connType) + var key string + err = utils.ReadPacket(reader, &connType, &key) if err != nil { log.Printf("read error,ERR:%s", err) return } switch connType { case CONN_SERVER: + log.Printf("server connection %s", key) session, err := smux.Server(inConn, nil) if err != nil { utils.CloseConn(&inConn) - log.Printf("server underlayer connection error,ERR:%s", err) + log.Printf("server session error,ERR:%s", err) return } - conn, err := session.AcceptStream() + for { + stream, err := session.AcceptStream() + if err != nil { + session.Close() + utils.CloseConn(&inConn) + return + } + go s.callback(stream, key) + } + case CONN_CLIENT: + + log.Printf("client connection %s", key) + session, err := smux.Client(inConn, nil) if err != nil { - session.Close() utils.CloseConn(&inConn) + log.Printf("client session error,ERR:%s", err) return } - log.Printf("server connection %s", conn.RemoteAddr()) - //s.callback(conn) + s.clientControlConns.Set(key, session) + //log.Printf("set client session,key: %s", key) } - s.callback(inConn) + }) if err != nil { return @@ -86,86 +96,48 @@ func (s *MuxBridge) Start(args interface{}) (err error) { func (s *MuxBridge) Clean() { s.StopService() } -func (s *MuxBridge) callback(inConn net.Conn) { +func (s *MuxBridge) callback(inConn net.Conn, key string) { reader := bufio.NewReader(inConn) var err error - var connType uint8 - err = utils.ReadPacket(reader, &connType) + var ID, clientLocalAddr, serverID string + err = utils.ReadPacketData(reader, &ID, &clientLocalAddr, &serverID) if err != nil { log.Printf("read error,ERR:%s", err) return } - switch connType { - case CONN_SERVER: - var key, ID, clientLocalAddr, serverID string - err = utils.ReadPacketData(reader, &key, &ID, &clientLocalAddr, &serverID) - if err != nil { - log.Printf("read error,ERR:%s", err) - return + packet := utils.BuildPacketData(ID, clientLocalAddr, serverID) + try := 20 + for { + try-- + if try == 0 { + break } - packet := utils.BuildPacketData(ID, clientLocalAddr, serverID) - log.Printf("server connection, key: %s , id: %s %s %s", key, ID, clientLocalAddr, serverID) - - //addr := clientLocalAddr + "@" + ID - s.serverConns.Set(ID, MuxServerConn{ - Conn: &inConn, - }) - for { - item, ok := s.clientControlConns.Get(key) - if !ok { - log.Printf("client %s control conn not exists", key) - time.Sleep(time.Second * 3) - continue - } - (*item.(*net.Conn)).SetWriteDeadline(time.Now().Add(time.Second * 3)) - _, err := (*item.(*net.Conn)).Write(packet) - (*item.(*net.Conn)).SetWriteDeadline(time.Time{}) + session, ok := s.clientControlConns.Get(key) + if !ok { + log.Printf("client %s session not exists", key) + time.Sleep(time.Second * 3) + continue + } + stream, err := session.(*smux.Session).OpenStream() + if err != nil { + log.Printf("%s client session open stream fail, err: %s, retrying...", key, err) + time.Sleep(time.Second * 3) + continue + } else { + _, err := stream.Write(packet) if err != nil { - log.Printf("%s client control conn write signal fail, err: %s, retrying...", key, err) + log.Printf("server %s stream write fail, err: %s, retrying...", key, err) time.Sleep(time.Second * 3) continue - } else { - break } - } - case CONN_CLIENT: - var key, ID, serverID string - err = utils.ReadPacketData(reader, &key, &ID, &serverID) - if err != nil { - log.Printf("read error,ERR:%s", err) - return - } - log.Printf("client connection , key: %s , id: %s, server id:%s", key, ID, serverID) - - serverConnItem, ok := s.serverConns.Get(ID) - if !ok { + log.Printf("server stream %s created", ID) + go io.Copy(stream, inConn) + io.Copy(inConn, stream) + stream.Close() inConn.Close() - log.Printf("server conn %s exists", ID) - return + log.Printf("server stream %s released", ID) + break } - serverConn := serverConnItem.(MuxServerConn).Conn - utils.IoBind(*serverConn, inConn, func(err interface{}) { - s.serverConns.Remove(ID) - // s.cmClient.RemoveOne(key, ID) - // s.cmServer.RemoveOne(serverID, ID) - log.Printf("conn %s released", ID) - }) - // s.cmClient.Add(key, ID, &inConn) - log.Printf("conn %s created", ID) - - case CONN_CLIENT_CONTROL: - var key string - err = utils.ReadPacketData(reader, &key) - if err != nil { - log.Printf("read error,ERR:%s", err) - return - } - log.Printf("client control connection, key: %s", key) - if s.clientControlConns.Has(key) { - item, _ := s.clientControlConns.Get(key) - (*item.(*net.Conn)).Close() - } - s.clientControlConns.Set(key, &inConn) - log.Printf("set client %s control conn", key) } + } diff --git a/services/mux_client.go b/services/mux_client.go index 08e05de5..4a8f3245 100644 --- a/services/mux_client.go +++ b/services/mux_client.go @@ -2,89 +2,28 @@ package services import ( "crypto/tls" - "fmt" - "io" "log" "net" "proxy/utils" "time" + + "github.com/xtaci/smux" ) type MuxClient struct { cfg MuxClientArgs - // cm utils.ConnManager - ctrlConn net.Conn } func NewMuxClient() Service { return &MuxClient{ cfg: MuxClientArgs{}, - // cm: utils.NewConnManager(), } } func (s *MuxClient) InitService() { - // s.InitHeartbeatDeamon() -} -// func (s *MuxClient) InitHeartbeatDeamon() { -// log.Printf("heartbeat started") -// go func() { -// var heartbeatConn net.Conn -// var ID = *s.cfg.Key -// for { +} -// //close all connection -// s.cm.RemoveAll() -// if s.ctrlConn != nil { -// s.ctrlConn.Close() -// } -// utils.CloseConn(&heartbeatConn) -// heartbeatConn, err := s.GetInConn(CONN_CLIENT_HEARBEAT, ID) -// if err != nil { -// log.Printf("heartbeat connection err: %s, retrying...", err) -// time.Sleep(time.Second * 3) -// utils.CloseConn(&heartbeatConn) -// continue -// } -// log.Printf("heartbeat connection created,id:%s", ID) -// writeDie := make(chan bool) -// readDie := make(chan bool) -// go func() { -// for { -// heartbeatConn.SetWriteDeadline(time.Now().Add(time.Second * 3)) -// _, err = heartbeatConn.Write([]byte{0x00}) -// heartbeatConn.SetWriteDeadline(time.Time{}) -// if err != nil { -// log.Printf("heartbeat connection write err %s", err) -// break -// } -// time.Sleep(time.Second * 3) -// } -// close(writeDie) -// }() -// go func() { -// for { -// signal := make([]byte, 1) -// heartbeatConn.SetReadDeadline(time.Now().Add(time.Second * 6)) -// _, err := heartbeatConn.Read(signal) -// heartbeatConn.SetReadDeadline(time.Time{}) -// if err != nil { -// log.Printf("heartbeat connection read err: %s", err) -// break -// } else { -// //log.Printf("heartbeat from bridge") -// } -// } -// close(readDie) -// }() -// select { -// case <-readDie: -// case <-writeDie: -// } -// } -// }() -// } func (s *MuxClient) CheckArgs() { if *s.cfg.Parent != "" { log.Printf("use tls parent %s", *s.cfg.Parent) @@ -97,37 +36,47 @@ func (s *MuxClient) CheckArgs() { s.cfg.CertBytes, s.cfg.KeyBytes = utils.TlsBytes(*s.cfg.CertFile, *s.cfg.KeyFile) } func (s *MuxClient) StopService() { - // s.cm.RemoveAll() + } func (s *MuxClient) Start(args interface{}) (err error) { s.cfg = args.(MuxClientArgs) s.CheckArgs() s.InitService() - log.Printf("proxy on tunnel client mode") - + log.Printf("proxy on mux client mode") for { - //close all conn - // s.cm.Remove(*s.cfg.Key) - if s.ctrlConn != nil { - s.ctrlConn.Close() + var _conn tls.Conn + _conn, err = utils.TlsConnectHost(*s.cfg.Parent, *s.cfg.Timeout, s.cfg.CertBytes, s.cfg.KeyBytes) + if err != nil { + log.Printf("connection err: %s, retrying...", err) + time.Sleep(time.Second * 3) + continue } - - s.ctrlConn, err = s.GetInConn(CONN_CLIENT_CONTROL, *s.cfg.Key) + conn := net.Conn(&_conn) + _, err = conn.Write(utils.BuildPacket(CONN_CLIENT, *s.cfg.Key)) if err != nil { - log.Printf("control connection err: %s, retrying...", err) + conn.Close() + log.Printf("connection err: %s, retrying...", err) + time.Sleep(time.Second * 3) + continue + } + session, err := smux.Server(conn, nil) + if err != nil { + log.Printf("session err: %s, retrying...", err) + conn.Close() time.Sleep(time.Second * 3) - if s.ctrlConn != nil { - s.ctrlConn.Close() - } continue } for { + stream, err := session.AcceptStream() + if err != nil { + log.Printf("accept stream err: %s, retrying...", err) + session.Close() + time.Sleep(time.Second * 3) + break + } var ID, clientLocalAddr, serverID string - err = utils.ReadPacketData(s.ctrlConn, &ID, &clientLocalAddr, &serverID) + err = utils.ReadPacketData(stream, &ID, &clientLocalAddr, &serverID) if err != nil { - if s.ctrlConn != nil { - s.ctrlConn.Close() - } log.Printf("read connection signal err: %s, retrying...", err) break } @@ -135,78 +84,40 @@ func (s *MuxClient) Start(args interface{}) (err error) { protocol := clientLocalAddr[:3] localAddr := clientLocalAddr[4:] if protocol == "udp" { - go s.ServeUDP(localAddr, ID, serverID) + go s.ServeUDP(stream, localAddr, ID) } else { - go s.ServeConn(localAddr, ID, serverID) + go s.ServeConn(stream, localAddr, ID) } } } + } func (s *MuxClient) Clean() { s.StopService() } -func (s *MuxClient) GetInConn(typ uint8, data ...string) (outConn net.Conn, err error) { - outConn, err = s.GetConn() - if err != nil { - err = fmt.Errorf("connection err: %s", err) - return - } - _, err = outConn.Write(utils.BuildPacket(typ, data...)) - if err != nil { - err = fmt.Errorf("write connection data err: %s ,retrying...", err) - utils.CloseConn(&outConn) - return - } - return -} -func (s *MuxClient) GetConn() (conn net.Conn, err error) { - var _conn tls.Conn - _conn, err = utils.TlsConnectHost(*s.cfg.Parent, *s.cfg.Timeout, s.cfg.CertBytes, s.cfg.KeyBytes) - if err == nil { - conn = net.Conn(&_conn) - } - return -} -func (s *MuxClient) ServeUDP(localAddr, ID, serverID string) { - var inConn net.Conn - var err error - // for { - for { - // s.cm.RemoveOne(*s.cfg.Key, ID) - inConn, err = s.GetInConn(CONN_CLIENT, *s.cfg.Key, ID, serverID) - if err != nil { - utils.CloseConn(&inConn) - log.Printf("connection err: %s, retrying...", err) - time.Sleep(time.Second * 3) - continue - } else { - break - } - } - // s.cm.Add(*s.cfg.Key, ID, &inConn) - log.Printf("conn %s created", ID) + +func (s *MuxClient) ServeUDP(inConn *smux.Stream, localAddr, ID string) { for { srcAddr, body, err := utils.ReadUDPPacket(inConn) - if err == io.EOF || err == io.ErrUnexpectedEOF { + if err != nil { + log.Printf("udp packet revecived fail, err: %s", err) log.Printf("connection %s released", ID) - utils.CloseConn(&inConn) + inConn.Close() break - } else if err != nil { - log.Printf("udp packet revecived fail, err: %s", err) } else { //log.Printf("udp packet revecived:%s,%v", srcAddr, body) - go s.processUDPPacket(&inConn, srcAddr, localAddr, body) + go s.processUDPPacket(inConn, srcAddr, localAddr, body) } } // } } -func (s *MuxClient) processUDPPacket(inConn *net.Conn, srcAddr, localAddr string, body []byte) { +func (s *MuxClient) processUDPPacket(inConn *smux.Stream, srcAddr, localAddr string, body []byte) { dstAddr, err := net.ResolveUDPAddr("udp", localAddr) if err != nil { log.Printf("can't resolve address: %s", err) - utils.CloseConn(inConn) + inConn.Close() return } clientSrcAddr := &net.UDPAddr{IP: net.IPv4zero, Port: 0} @@ -234,26 +145,14 @@ func (s *MuxClient) processUDPPacket(inConn *net.Conn, srcAddr, localAddr string _, err = (*inConn).Write(bs) if err != nil { log.Printf("send udp response fail ,ERR:%s", err) - utils.CloseConn(inConn) + inConn.Close() return } //log.Printf("send udp response success ,from:%s ,%d ,%v", dstAddr.String(), len(bs), bs) } -func (s *MuxClient) ServeConn(localAddr, ID, serverID string) { - var inConn, outConn net.Conn +func (s *MuxClient) ServeConn(inConn *smux.Stream, localAddr, ID string) { var err error - for { - inConn, err = s.GetInConn(CONN_CLIENT, *s.cfg.Key, ID, serverID) - if err != nil { - utils.CloseConn(&inConn) - log.Printf("connection err: %s, retrying...", err) - time.Sleep(time.Second * 3) - continue - } else { - break - } - } - + var outConn net.Conn i := 0 for { i++ @@ -269,15 +168,13 @@ func (s *MuxClient) ServeConn(localAddr, ID, serverID string) { } } if err != nil { - utils.CloseConn(&inConn) + inConn.Close() utils.CloseConn(&outConn) log.Printf("build connection error, err: %s", err) return } utils.IoBind(inConn, outConn, func(err interface{}) { log.Printf("conn %s released", ID) - // s.cm.RemoveOne(*s.cfg.Key, ID) }) - // s.cm.Add(*s.cfg.Key, ID, &inConn) log.Printf("conn %s created", ID) } diff --git a/services/mux_server.go b/services/mux_server.go index 581ddb6e..4d9ca953 100644 --- a/services/mux_server.go +++ b/services/mux_server.go @@ -3,7 +3,6 @@ package services import ( "crypto/tls" "fmt" - "io" "log" "net" "proxy/utils" @@ -16,11 +15,11 @@ import ( ) type MuxServer struct { - cfg MuxServerArgs - udpChn chan MuxUDPItem - sc utils.ServerChannel - underLayerConn net.Conn - session *smux.Session + cfg MuxServerArgs + udpChn chan MuxUDPItem + sc utils.ServerChannel + session *smux.Session + lockChn chan bool } type MuxServerManager struct { @@ -28,7 +27,6 @@ type MuxServerManager struct { udpChn chan MuxUDPItem sc utils.ServerChannel serverID string - // cm utils.ConnManager } func NewMuxServerManager() Service { @@ -36,7 +34,6 @@ func NewMuxServerManager() Service { cfg: MuxServerArgs{}, udpChn: make(chan MuxUDPItem, 50000), serverID: utils.Uniqueid(), - // cm: utils.NewConnManager(), } } func (s *MuxServerManager) Start(args interface{}) (err error) { @@ -53,6 +50,9 @@ func (s *MuxServerManager) Start(args interface{}) (err error) { log.Printf("server id: %s", s.serverID) //log.Printf("route:%v", *s.cfg.Route) for _, _info := range *s.cfg.Route { + if _info == "" { + continue + } IsUDP := *s.cfg.IsUDP if strings.HasPrefix(_info, "udp://") { IsUDP = true @@ -95,7 +95,6 @@ func (s *MuxServerManager) Clean() { s.StopService() } func (s *MuxServerManager) StopService() { - // s.cm.RemoveAll() } func (s *MuxServerManager) CheckArgs() { if *s.cfg.CertFile == "" || *s.cfg.KeyFile == "" { @@ -104,13 +103,13 @@ func (s *MuxServerManager) CheckArgs() { s.cfg.CertBytes, s.cfg.KeyBytes = utils.TlsBytes(*s.cfg.CertFile, *s.cfg.KeyFile) } func (s *MuxServerManager) InitService() { - } func NewMuxServer() Service { return &MuxServer{ - cfg: MuxServerArgs{}, - udpChn: make(chan MuxUDPItem, 50000), + cfg: MuxServerArgs{}, + udpChn: make(chan MuxUDPItem, 50000), + lockChn: make(chan bool, 1), } } @@ -147,18 +146,18 @@ func (s *MuxServer) Start(args interface{}) (err error) { if err != nil { return } - log.Printf("proxy on udp tunnel server mode %s", (*s.sc.UDPListener).LocalAddr()) + log.Printf("proxy on udp mux server mode %s", (*s.sc.UDPListener).LocalAddr()) } else { err = s.sc.ListenTCP(func(inConn net.Conn) { defer func() { if err := recover(); err != nil { - log.Printf("tserver conn handler crashed with err : %s \nstack: %s", err, string(debug.Stack())) + log.Printf("server conn handler crashed with err : %s \nstack: %s", err, string(debug.Stack())) } }() var outConn net.Conn var ID string for { - outConn, ID, err = s.GetOutConn(CONN_SERVER) + outConn, ID, err = s.GetOutConn() if err != nil { utils.CloseConn(&outConn) log.Printf("connect to %s fail, err: %s, retrying...", *s.cfg.Parent, err) @@ -169,24 +168,22 @@ func (s *MuxServer) Start(args interface{}) (err error) { } } utils.IoBind(inConn, outConn, func(err interface{}) { - // s.cfg.Mgr.cm.RemoveOne(s.cfg.Mgr.serverID, ID) log.Printf("%s conn %s released", *s.cfg.Key, ID) }) //add conn - // s.cfg.Mgr.cm.Add(s.cfg.Mgr.serverID, ID, &inConn) log.Printf("%s conn %s created", *s.cfg.Key, ID) }) if err != nil { return } - log.Printf("proxy on tunnel server mode %s", (*s.sc.Listener).Addr()) + log.Printf("proxy on mux server mode %s", (*s.sc.Listener).Addr()) } return } func (s *MuxServer) Clean() { } -func (s *MuxServer) GetOutConn(typ uint8) (outConn net.Conn, ID string, err error) { +func (s *MuxServer) GetOutConn() (outConn net.Conn, ID string, err error) { outConn, err = s.GetConn() if err != nil { log.Printf("connection err: %s", err) @@ -197,7 +194,7 @@ func (s *MuxServer) GetOutConn(typ uint8) (outConn net.Conn, ID string, err erro remoteAddr = "udp:" + *s.cfg.Remote } ID = utils.Uniqueid() - _, err = outConn.Write(utils.BuildPacket(typ, *s.cfg.Key, ID, remoteAddr, s.cfg.Mgr.serverID)) + _, err = outConn.Write(utils.BuildPacketData(ID, remoteAddr, s.cfg.Mgr.serverID)) if err != nil { log.Printf("write connection data err: %s ,retrying...", err) utils.CloseConn(&outConn) @@ -206,11 +203,43 @@ func (s *MuxServer) GetOutConn(typ uint8) (outConn net.Conn, ID string, err erro return } func (s *MuxServer) GetConn() (conn net.Conn, err error) { - var _conn tls.Conn - _conn, err = utils.TlsConnectHost(*s.cfg.Parent, *s.cfg.Timeout, s.cfg.CertBytes, s.cfg.KeyBytes) - if err == nil { - conn = net.Conn(&_conn) + select { + case s.lockChn <- true: + default: + err = fmt.Errorf("can not connect at same time") + return } + defer func() { + <-s.lockChn + }() + if s.session == nil { + var _conn tls.Conn + _conn, err = utils.TlsConnectHost(*s.cfg.Parent, *s.cfg.Timeout, s.cfg.CertBytes, s.cfg.KeyBytes) + if err != nil { + s.session = nil + return + } + c := net.Conn(&_conn) + _, err = c.Write(utils.BuildPacket(CONN_SERVER, *s.cfg.Key)) + if err != nil { + c.Close() + s.session = nil + return + } + if err == nil { + s.session, err = smux.Client(c, nil) + if err != nil { + s.session = nil + return + } + } + } + conn, err = s.session.OpenStream() + if err != nil { + s.session.Close() + s.session = nil + } + return } func (s *MuxServer) UDPConnDeamon() { @@ -221,18 +250,15 @@ func (s *MuxServer) UDPConnDeamon() { } }() var outConn net.Conn - // var hb utils.HeartbeatReadWriter var ID string - // var cmdChn = make(chan bool, 1000) var err error for { item := <-s.udpChn RETRY: if outConn == nil { for { - outConn, ID, err = s.GetOutConn(CONN_SERVER) + outConn, ID, err = s.GetOutConn() if err != nil { - // cmdChn <- true outConn = nil utils.CloseConn(&outConn) log.Printf("connect to %s fail, err: %s, retrying...", *s.cfg.Parent, err) @@ -241,18 +267,14 @@ func (s *MuxServer) UDPConnDeamon() { } else { go func(outConn net.Conn, ID string) { go func() { - // <-cmdChn // outConn.Close() }() for { srcAddrFromConn, body, err := utils.ReadUDPPacket(outConn) - if err == io.EOF || err == io.ErrUnexpectedEOF { - log.Printf("UDP deamon connection %s exited", ID) - break - } if err != nil { log.Printf("parse revecived udp packet fail, err: %s ,%v", err, body) - continue + log.Printf("UDP deamon connection %s exited", ID) + break } //log.Printf("udp packet revecived over parent , local:%s", srcAddrFromConn) _srcAddr := strings.Split(srcAddrFromConn, ":")