-
Notifications
You must be signed in to change notification settings - Fork 9
/
server.go
141 lines (126 loc) · 3.26 KB
/
server.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
package server
import (
"github.com/haxii/fastproxy/servertime"
"github.com/haxii/log/v2"
"errors"
"io"
"net"
"strings"
"sync"
"time"
)
// Server is a simple connection server: ListenAndServe accepts connections
// from Listener and offers them to a WorkerPool that runs ConnHandler.
type Server struct {
	// Concurrency is the maximum number of workers handed to the WorkerPool.
	// ListenAndServe replaces a non-positive value with DefaultConcurrency.
	Concurrency int
	// OnConcurrencyLimitExceeded, when non-nil, is called with a connection
	// that could not be served because the concurrency limit was reached,
	// just before that connection is force closed.
	OnConcurrencyLimitExceeded func(net.Conn)
	// Listener is the listener connections are accepted from. It is required:
	// ListenAndServe returns an error when it is nil.
	Listener net.Listener
	// ConnHandler is the handler for accepted connections. It is required:
	// ListenAndServe returns an error when it is nil.
	ConnHandler ConnHandler
	// ServiceName is the server's service name, used for logging.
	// ListenAndServe sets it to "fastproxy.server" when it is empty.
	ServiceName string
	// activeConn is the set of connections registered through trackConn;
	// Close closes and removes every connection in it.
	activeConn map[net.Conn]struct{}
	// mu guards activeConn.
	mu sync.Mutex
}
// DefaultConcurrency is the maximum number of concurrent connections that
// ListenAndServe applies when Server.Concurrency is not positive.
const DefaultConcurrency = 256 * 1024
// ListenAndServe accepts connections from s.Listener and offers each one to a
// WorkerPool built around s.ConnHandler.
//
// It fails immediately when Listener or ConnHandler is nil. A non-positive
// Concurrency is replaced with DefaultConcurrency and an empty ServiceName
// with "fastproxy.server".
//
// ListenAndServe blocks until accepting fails. The pool is stopped before
// returning; nil is returned when acceptConn reports io.EOF (the listener was
// closed), and any other error is returned unchanged.
func (s *Server) ListenAndServe() error {
	switch {
	case s.Listener == nil:
		return errors.New("No net.listener provided")
	case s.ConnHandler == nil:
		return errors.New("No connection handler provided")
	}
	if s.Concurrency <= 0 {
		s.Concurrency = DefaultConcurrency
	}
	if s.ServiceName == "" {
		s.ServiceName = "fastproxy.server"
	}

	var (
		overflowLoggedAt time.Time // last time the overflow error was logged
		perIPErrorAt     time.Time // only handed through to acceptConn
	)

	pool := &WorkerPool{
		WorkerFunc:      s.ConnHandler,
		Tracker:         s.trackConn,
		MaxWorkersCount: s.Concurrency,
	}
	pool.Start()

	for {
		conn, acceptErr := s.acceptConn(s.Listener, &perIPErrorAt)
		if acceptErr != nil {
			pool.Stop()
			if acceptErr == io.EOF {
				return nil
			}
			return acceptErr
		}

		if pool.Serve(conn) {
			continue
		}

		// The pool refused the connection: notify the callback, drop the
		// connection, and back off briefly before accepting again.
		if s.OnConcurrencyLimitExceeded != nil {
			s.OnConcurrencyLimitExceeded(conn)
		}
		conn.Close()
		// Rate-limit the overflow log entry to at most one per minute.
		if time.Since(overflowLoggedAt) > time.Minute {
			log.Errorf(errors.New("concurrency exceeded"), "The incoming connection cannot be served, "+
				"because %d concurrent connections are served. Try increasing server's concurrency",
				s.Concurrency)
			overflowLoggedAt = servertime.CoarseTimeNow()
		}
		time.Sleep(100 * time.Millisecond)
	}
}
// acceptConn blocks until ln yields a connection or fails permanently.
//
// Temporary errors are logged and retried after one second. io.EOF and
// "use of closed network connection" errors are reported as io.EOF; any other
// error is logged and returned unchanged. lastPerIPErrorTime is not used by
// this function.
//
// It panics when ln.Accept returns a connection together with an error, or
// returns neither.
func (s *Server) acceptConn(ln net.Listener, lastPerIPErrorTime *time.Time) (net.Conn, error) {
	for {
		conn, acceptErr := ln.Accept()
		if acceptErr == nil {
			if conn == nil {
				panic("BUG: net.Listener returned (nil, nil)")
			}
			return conn, nil
		}

		if conn != nil {
			panic("BUG: net.Listener returned non-nil conn and non-nil error")
		}
		if tempErr, isNetErr := acceptErr.(net.Error); isNetErr && tempErr.Temporary() {
			log.Errorf(tempErr, "temporary error when accepting new connections")
			time.Sleep(time.Second)
			continue
		}
		// A closed listener is the normal shutdown path, not a failure.
		if acceptErr == io.EOF || strings.Contains(acceptErr.Error(), "use of closed network connection") {
			return nil, io.EOF
		}
		log.Errorf(acceptErr, "permanent error when accepting new connections")
		return nil, acceptErr
	}
}
// Close shuts the server down: it closes the listener, if one is set, and then
// closes and forgets every connection currently held in activeConn.
//
// The mutex is held for the whole operation, so concurrent trackConn calls
// block until the teardown has finished. Errors returned by the individual
// Close calls are discarded; Close has no return value to report them through.
func (s *Server) Close() {
	s.mu.Lock()
	defer s.mu.Unlock()

	// A Server whose Listener was never set must still be closable: calling
	// Close on a nil net.Listener interface value would panic.
	if s.Listener != nil {
		s.Listener.Close()
	}
	for c := range s.activeConn {
		c.Close()
		delete(s.activeConn, c)
	}
}
// trackConn registers c in the active-connection set when add is true and
// removes it otherwise. The set is created on first use, and the whole
// operation runs under s.mu.
func (s *Server) trackConn(c net.Conn, add bool) {
	s.mu.Lock()
	defer s.mu.Unlock()

	if s.activeConn == nil {
		s.activeConn = make(map[net.Conn]struct{})
	}
	if !add {
		delete(s.activeConn, c)
		return
	}
	s.activeConn[c] = struct{}{}
}