Skip to content

Commit

Permalink
rename Streamer to Stream
Browse files Browse the repository at this point in the history
  • Loading branch information
Asim Aslam committed Apr 14, 2018
1 parent c2cfe53 commit 65068e8
Show file tree
Hide file tree
Showing 10 changed files with 21 additions and 146 deletions.
4 changes: 3 additions & 1 deletion client/backoff_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@ import (
func TestBackoff(t *testing.T) {
delta := time.Duration(0)

c := NewClient()

for i := 0; i < 5; i++ {
d, err := exponentialBackoff(context.TODO(), NewRequest("test", "test", nil), i)
d, err := exponentialBackoff(context.TODO(), c.NewRequest("test", "test", nil), i)
if err != nil {
t.Fatal(err)
}
Expand Down
34 changes: 3 additions & 31 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ type Client interface {
NewMessage(topic string, msg interface{}) Message
NewRequest(service, method string, req interface{}, reqOpts ...RequestOption) Request
Call(ctx context.Context, req Request, rsp interface{}, opts ...CallOption) error
Stream(ctx context.Context, req Request, opts ...CallOption) (Streamer, error)
Stream(ctx context.Context, req Request, opts ...CallOption) (Stream, error)
Publish(ctx context.Context, msg Message, opts ...PublishOption) error
String() string
}
Expand All @@ -37,8 +37,8 @@ type Request interface {
Stream() bool
}

// Streamer is the inteface for a bidirectional synchronous stream
type Streamer interface {
// Stream is the inteface for a bidirectional synchronous stream
type Stream interface {
Context() context.Context
Request() Request
Send(interface{}) error
Expand Down Expand Up @@ -76,35 +76,7 @@ var (
DefaultPoolTTL = time.Minute
)

// Makes a synchronous call to a service using the default client
func Call(ctx context.Context, request Request, response interface{}, opts ...CallOption) error {
return DefaultClient.Call(ctx, request, response, opts...)
}

// Creates a streaming connection with a service and returns responses on the
// channel passed in. It's up to the user to close the streamer.
func Stream(ctx context.Context, request Request, opts ...CallOption) (Streamer, error) {
return DefaultClient.Stream(ctx, request, opts...)
}

// Publishes a publication using the default client. Using the underlying broker
// set within the options.
func Publish(ctx context.Context, msg Message) error {
return DefaultClient.Publish(ctx, msg)
}

// Creates a new client with the options passed in
func NewClient(opt ...Option) Client {
return newRpcClient(opt...)
}

// Creates a new message using the default client
func NewMessage(topic string, message interface{}) Message {
return DefaultClient.NewMessage(topic, message)
}

// Creates a new request using the default client. Content Type will
// be set to the default within options and use the appropriate codec
func NewRequest(service, method string, request interface{}, reqOpts ...RequestOption) Request {
return DefaultClient.NewRequest(service, method, request, reqOpts...)
}
2 changes: 1 addition & 1 deletion client/client_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,4 @@ type CallWrapper func(CallFunc) CallFunc
type Wrapper func(Client) Client

// StreamWrapper wraps a Stream and returns the equivalent
type StreamWrapper func(Streamer) Streamer
type StreamWrapper func(Stream) Stream
2 changes: 1 addition & 1 deletion client/mock/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func (m *MockClient) Call(ctx context.Context, req client.Request, rsp interface
return fmt.Errorf("rpc: can't find service %s", req.Method())
}

func (m *MockClient) Stream(ctx context.Context, req client.Request, opts ...client.CallOption) (client.Streamer, error) {
func (m *MockClient) Stream(ctx context.Context, req client.Request, opts ...client.CallOption) (client.Stream, error) {
m.Lock()
defer m.Unlock()

Expand Down
8 changes: 4 additions & 4 deletions client/rpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ func (r *rpcClient) call(ctx context.Context, address string, req Request, resp
}
}

func (r *rpcClient) stream(ctx context.Context, address string, req Request, opts CallOptions) (Streamer, error) {
func (r *rpcClient) stream(ctx context.Context, address string, req Request, opts CallOptions) (Stream, error) {
msg := &transport.Message{
Header: make(map[string]string),
}
Expand Down Expand Up @@ -340,7 +340,7 @@ func (r *rpcClient) Call(ctx context.Context, request Request, response interfac
return gerr
}

func (r *rpcClient) Stream(ctx context.Context, request Request, opts ...CallOption) (Streamer, error) {
func (r *rpcClient) Stream(ctx context.Context, request Request, opts ...CallOption) (Stream, error) {
// make a copy of call opts
callOpts := r.opts.CallOptions
for _, opt := range opts {
Expand Down Expand Up @@ -371,7 +371,7 @@ func (r *rpcClient) Stream(ctx context.Context, request Request, opts ...CallOpt
default:
}

call := func(i int) (Streamer, error) {
call := func(i int) (Stream, error) {
// call backoff first. Someone may want an initial start delay
t, err := callOpts.Backoff(ctx, request, i)
if err != nil {
Expand Down Expand Up @@ -401,7 +401,7 @@ func (r *rpcClient) Stream(ctx context.Context, request Request, opts ...CallOpt
}

type response struct {
stream Streamer
stream Stream
err error
}

Expand Down
2 changes: 1 addition & 1 deletion publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,5 @@ type publisher struct {
}

func (p *publisher) Publish(ctx context.Context, msg interface{}, opts ...client.PublishOption) error {
return p.c.Publish(ctx, p.c.NewPublication(p.topic, msg))
return p.c.Publish(ctx, p.c.NewMessage(p.topic, msg))
}
4 changes: 2 additions & 2 deletions server/rpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,9 +119,9 @@ func prepareMethod(method reflect.Method) *methodType {

if stream {
// check stream type
streamType := reflect.TypeOf((*Streamer)(nil)).Elem()
streamType := reflect.TypeOf((*Stream)(nil)).Elem()
if !argType.Implements(streamType) {
log.Log(mname, "argument does not implement Streamer interface:", argType)
log.Log(mname, "argument does not implement Stream interface:", argType)
return nil
}
} else {
Expand Down
103 changes: 2 additions & 101 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,7 @@ package server

import (
"context"
"os"
"os/signal"
"syscall"

"github.com/micro/go-log"
"github.com/pborman/uuid"
)

Expand Down Expand Up @@ -40,11 +36,11 @@ type Request interface {
Stream() bool
}

// Streamer represents a stream established with a client.
// Stream represents a stream established with a client.
// A stream can be bidirectional which is indicated by the request.
// The last error will be left in Error().
// EOF indicated end of the stream.
type Streamer interface {
type Stream interface {
Context() context.Context
Request() Request
Send(interface{}) error
Expand All @@ -67,102 +63,7 @@ var (
DefaultServer Server = newRpcServer()
)

// DefaultOptions returns config options for the default service
func DefaultOptions() Options {
return DefaultServer.Options()
}

// Init initialises the default server with options passed in
func Init(opt ...Option) {
if DefaultServer == nil {
DefaultServer = newRpcServer(opt...)
}
DefaultServer.Init(opt...)
}

// NewServer returns a new server with options passed in
func NewServer(opt ...Option) Server {
return newRpcServer(opt...)
}

// NewSubscriber creates a new subscriber interface with the given topic
// and handler using the default server
func NewSubscriber(topic string, h interface{}, opts ...SubscriberOption) Subscriber {
return DefaultServer.NewSubscriber(topic, h, opts...)
}

// NewHandler creates a new handler interface using the default server
// Handlers are required to be a public object with public
// methods. Call to a service method such as Foo.Bar expects
// the type:
//
// type Foo struct {}
// func (f *Foo) Bar(ctx, req, rsp) error {
// return nil
// }
//
func NewHandler(h interface{}, opts ...HandlerOption) Handler {
return DefaultServer.NewHandler(h, opts...)
}

// Handle registers a handler interface with the default server to
// handle inbound requests
func Handle(h Handler) error {
return DefaultServer.Handle(h)
}

// Subscribe registers a subscriber interface with the default server
// which subscribes to specified topic with the broker
func Subscribe(s Subscriber) error {
return DefaultServer.Subscribe(s)
}

// Register registers the default server with the discovery system
func Register() error {
return DefaultServer.Register()
}

// Deregister deregisters the default server from the discovery system
func Deregister() error {
return DefaultServer.Deregister()
}

// Run starts the default server and waits for a kill
// signal before exiting. Also registers/deregisters the server
func Run() error {
if err := Start(); err != nil {
return err
}

if err := DefaultServer.Register(); err != nil {
return err
}

ch := make(chan os.Signal, 1)
signal.Notify(ch, syscall.SIGTERM, syscall.SIGINT, syscall.SIGKILL)
log.Logf("Received signal %s", <-ch)

if err := DefaultServer.Deregister(); err != nil {
return err
}

return Stop()
}

// Start starts the default server
func Start() error {
config := DefaultServer.Options()
log.Logf("Starting server %s id %s", config.Name, config.Id)
return DefaultServer.Start()
}

// Stop stops the default server
func Stop() error {
log.Logf("Stopping server")
return DefaultServer.Stop()
}

// String returns name of Server implementation
func String() string {
return DefaultServer.String()
}
4 changes: 2 additions & 2 deletions server/server_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ type HandlerWrapper func(HandlerFunc) HandlerFunc
// SubscriberWrapper wraps the SubscriberFunc and returns the equivalent
type SubscriberWrapper func(SubscriberFunc) SubscriberFunc

// StreamerWrapper wraps a Streamer interface and returns the equivalent.
// StreamWrapper wraps a Stream interface and returns the equivalent.
// Because streams exist for the lifetime of a method invocation this
// is a convenient way to wrap a Stream as its in use for trace, monitoring,
// metrics, etc.
type StreamerWrapper func(Streamer) Streamer
type StreamWrapper func(Stream) Stream
4 changes: 2 additions & 2 deletions wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,12 @@ func (c *clientWrapper) Call(ctx context.Context, req client.Request, rsp interf
return c.Client.Call(ctx, req, rsp, opts...)
}

func (c *clientWrapper) Stream(ctx context.Context, req client.Request, opts ...client.CallOption) (client.Streamer, error) {
func (c *clientWrapper) Stream(ctx context.Context, req client.Request, opts ...client.CallOption) (client.Stream, error) {
ctx = c.setHeaders(ctx)
return c.Client.Stream(ctx, req, opts...)
}

func (c *clientWrapper) Publish(ctx context.Context, p client.Publication, opts ...client.PublishOption) error {
func (c *clientWrapper) Publish(ctx context.Context, p client.Message, opts ...client.PublishOption) error {
ctx = c.setHeaders(ctx)
return c.Client.Publish(ctx, p, opts...)
}

0 comments on commit 65068e8

Please sign in to comment.