Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Sequential and Parallel Routers #58

Merged
merged 15 commits into from
Sep 16, 2022
44 changes: 16 additions & 28 deletions compsequential.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,16 +69,14 @@ func (r *composableSequential) Ready() bool {
// If count is set, the channel will return up to count results, stopping routers iteration.
func (r *composableSequential) FindProvidersAsync(ctx context.Context, cid cid.Cid, count int) <-chan peer.AddrInfo {
var totalCount int64
ch, _ := getChannelOrErrorSequential(ctx, r.routers,
return getChannelOrErrorSequential(ctx, r.routers,
func(ctx context.Context, r routing.Routing) (<-chan peer.AddrInfo, error) {
return r.FindProvidersAsync(ctx, cid, count), nil
},
func() bool {
return atomic.AddInt64(&totalCount, 1) > int64(count) && count != 0
},
)

return ch
}

// FindPeer calls FindPeer per each router sequentially.
Expand Down Expand Up @@ -116,13 +114,15 @@ func (r *composableSequential) GetValue(ctx context.Context, key string, opts ..
// If some router fails and the IgnoreError flag is true, we continue to the next router.
// Context timeout error will be also ignored if the flag is set.
func (r *composableSequential) SearchValue(ctx context.Context, key string, opts ...routing.Option) (<-chan []byte, error) {
return getChannelOrErrorSequential(ctx, r.routers,
ch := getChannelOrErrorSequential(ctx, r.routers,
func(ctx context.Context, r routing.Routing) (<-chan []byte, error) {
return r.SearchValue(ctx, key, opts...)
},
func() bool { return false },
)

return ch, nil

}

// If some router fails and the IgnoreError flag is true, we continue to the next router.
Expand Down Expand Up @@ -184,50 +184,38 @@ func getChannelOrErrorSequential[T any](
routers []*SequentialRouter,
f func(context.Context, routing.Routing) (<-chan T, error),
shouldStop func() bool,
) (chan T, error) {
) chan T {
chanOut := make(chan T)
var chans []<-chan T
var cancels []context.CancelFunc

for _, router := range routers {
ctx, cancel := context.WithTimeout(ctx, router.Timeout)
rch, err := f(ctx, router.Router)
if err != nil &&
!errors.Is(err, routing.ErrNotFound) &&
!router.IgnoreError {
cancel()
return nil, err
}

cancels = append(cancels, cancel)
chans = append(chans, rch)
}

go func() {
for i := 0; i < len(chans); i++ {
if chans[i] == nil {
cancels[i]()
continue
for _, router := range routers {
ctx, cancel := context.WithTimeout(ctx, router.Timeout)
ajnavarro marked this conversation as resolved.
Show resolved Hide resolved
rch, err := f(ctx, router.Router)
if err != nil &&
!errors.Is(err, routing.ErrNotFound) &&
!router.IgnoreError {
cancel()
break
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you think it makes sense to log these attempts and errors at info/debug level so that someone can debug what's happening? (if so, we can follow up with this in a separate PR)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yep, it would make sense

}

f:
for {
select {
case <-ctx.Done():
break f
case v, ok := <-chans[i]:
case v, ok := <-rch:
if !ok {
break f
}
chanOut <- v
ajnavarro marked this conversation as resolved.
Show resolved Hide resolved
}
}

cancels[i]()
cancel()
}

close(chanOut)
}()

return chanOut, nil
return chanOut
}