Skip to content

Commit

Permalink
transport memory: add Send/Recv Timeout
Browse files Browse the repository at this point in the history
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
  • Loading branch information
vtolstov committed Aug 3, 2019
1 parent d250ac7 commit e170902
Showing 1 changed file with 60 additions and 21 deletions.
81 changes: 60 additions & 21 deletions transport/memory/memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
package memory

import (
"context"
"errors"
"fmt"
"math/rand"
Expand All @@ -24,6 +25,10 @@ type memorySocket struct {

local string
remote string

// for send/recv transport.Timeout
timeout time.Duration
ctx context.Context
sync.RWMutex
}

Expand All @@ -33,11 +38,13 @@ type memoryClient struct {
}

type memoryListener struct {
addr string
exit chan bool
conn chan *memorySocket
opts transport.ListenOptions
addr string
exit chan bool
conn chan *memorySocket
lopts transport.ListenOptions
topts transport.Options
sync.RWMutex
ctx context.Context
}

type memoryTransport struct {
Expand All @@ -49,7 +56,17 @@ type memoryTransport struct {
func (ms *memorySocket) Recv(m *transport.Message) error {
ms.RLock()
defer ms.RUnlock()

ctx := ms.ctx
if ms.timeout > 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ms.ctx, ms.timeout)
defer cancel()
}

select {
case <-ctx.Done():
return ctx.Err()
case <-ms.exit:
return errors.New("connection closed")
case <-ms.lexit:
Expand All @@ -71,7 +88,17 @@ func (ms *memorySocket) Remote() string {
func (ms *memorySocket) Send(m *transport.Message) error {
ms.RLock()
defer ms.RUnlock()

ctx := ms.ctx
if ms.timeout > 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ms.ctx, ms.timeout)
defer cancel()
}

select {
case <-ctx.Done():
return ctx.Err()
case <-ms.exit:
return errors.New("connection closed")
case <-ms.lexit:
Expand Down Expand Up @@ -116,12 +143,14 @@ func (m *memoryListener) Accept(fn func(transport.Socket)) error {
return nil
case c := <-m.conn:
go fn(&memorySocket{
lexit: c.lexit,
exit: c.exit,
send: c.recv,
recv: c.send,
local: c.Remote(),
remote: c.Local(),
lexit: c.lexit,
exit: c.exit,
send: c.recv,
recv: c.send,
local: c.Remote(),
remote: c.Local(),
timeout: m.topts.Timeout,
ctx: m.topts.Context,
})
}
}
Expand All @@ -143,12 +172,14 @@ func (m *memoryTransport) Dial(addr string, opts ...transport.DialOption) (trans

client := &memoryClient{
&memorySocket{
send: make(chan *transport.Message),
recv: make(chan *transport.Message),
exit: make(chan bool),
lexit: listener.exit,
local: addr,
remote: addr,
send: make(chan *transport.Message),
recv: make(chan *transport.Message),
exit: make(chan bool),
lexit: listener.exit,
local: addr,
remote: addr,
timeout: m.opts.Timeout,
ctx: m.opts.Context,
},
options,
}
Expand Down Expand Up @@ -196,10 +227,12 @@ func (m *memoryTransport) Listen(addr string, opts ...transport.ListenOption) (t
}

listener := &memoryListener{
opts: options,
addr: addr,
conn: make(chan *memorySocket),
exit: make(chan bool),
lopts: options,
topts: m.opts,
addr: addr,
conn: make(chan *memorySocket),
exit: make(chan bool),
ctx: m.opts.Context,
}

m.listeners[addr] = listener
Expand All @@ -223,12 +256,18 @@ func (m *memoryTransport) String() string {
}

func NewTransport(opts ...transport.Option) transport.Transport {
rand.Seed(time.Now().UnixNano())
var options transport.Options

rand.Seed(time.Now().UnixNano())

for _, o := range opts {
o(&options)
}

if options.Context == nil {
options.Context = context.Background()
}

return &memoryTransport{
opts: options,
listeners: make(map[string]*memoryListener),
Expand Down

0 comments on commit e170902

Please sign in to comment.