Skip to content

Commit

Permalink
* cap the number of concurrent requests
Browse files Browse the repository at this point in the history
  • Loading branch information
DisposaBoy committed Mar 2, 2013
1 parent 55aec4b commit c8304a1
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 13 deletions.
40 changes: 32 additions & 8 deletions margo9/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,11 @@ type Response struct {
Data interface{} `json:"data"`
}

type Job struct {
Req *Request
Cl Caller
}

type Broker struct {
served uint
start time.Time
Expand All @@ -32,7 +37,6 @@ type Broker struct {
w io.Writer
in *bufio.Reader
out *json.Encoder
Wg *sync.WaitGroup
}

func NewBroker(r io.Reader, w io.Writer) *Broker {
Expand All @@ -41,7 +45,6 @@ func NewBroker(r io.Reader, w io.Writer) *Broker {
w: w,
in: bufio.NewReader(r),
out: json.NewEncoder(w),
Wg: &sync.WaitGroup{},
}
}

Expand Down Expand Up @@ -85,7 +88,6 @@ func (b *Broker) SendNoLog(resp Response) error {
}

func (b *Broker) call(req *Request, cl Caller) {
defer b.Wg.Done()
b.served++

defer func() {
Expand Down Expand Up @@ -115,7 +117,7 @@ func (b *Broker) call(req *Request, cl Caller) {
})
}

func (b *Broker) accept() (stopLooping bool) {
func (b *Broker) accept(jobsCh chan Job) (stopLooping bool) {
line, err := b.in.ReadBytes('\n')

if err == io.EOF {
Expand Down Expand Up @@ -165,13 +167,22 @@ func (b *Broker) accept() (stopLooping bool) {
return
}

b.Wg.Add(1)
go b.call(req, cl)
jobsCh <- Job{
Req: req,
Cl: cl,
}

return
}

func (b *Broker) Loop(decorate bool) {
func (b *Broker) worker(wg *sync.WaitGroup, jobsCh chan Job) {
defer wg.Done()
for job := range jobsCh {
b.call(job.Req, job.Cl)
}
}

func (b *Broker) Loop(decorate bool, wait bool) {
b.start = time.Now()

if decorate {
Expand All @@ -183,13 +194,26 @@ func (b *Broker) Loop(decorate bool) {
})
}

const workers = 20
wg := &sync.WaitGroup{}
jobsCh := make(chan Job, 1000)
for i := 0; i < workers; i += 1 {
wg.Add(1)
go b.worker(wg, jobsCh)
}

for {
stopLooping := b.accept()
stopLooping := b.accept(jobsCh)
if stopLooping {
break
}
runtime.Gosched()
}
close(jobsCh)

if wait {
wg.Wait()
}

if decorate {
b.SendNoLog(Response{
Expand Down
6 changes: 1 addition & 5 deletions margo9/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,11 +108,7 @@ func main() {
}
}()
}
broker.Loop(!doCall)

if wait || doCall {
broker.Wg.Wait()
}
broker.Loop(!doCall, (wait || doCall))

byeLck.Lock()
defer byeLck.Unlock() // keep this here for the sake of code correctness
Expand Down

0 comments on commit c8304a1

Please sign in to comment.