Skip to content

Commit

Permalink
Requested changes
Browse files Browse the repository at this point in the history
Signed-off-by: Antonio Navarro Perez <antnavper@gmail.com>
  • Loading branch information
ajnavarro committed Sep 2, 2022
1 parent 584edb3 commit d799146
Showing 1 changed file with 37 additions and 24 deletions.
61 changes: 37 additions & 24 deletions compparallel.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,18 +28,19 @@ func NewComposableParallel(routers []*ParallelRouter) *ComposableParallel {
func (r *ComposableParallel) Provide(ctx context.Context, cid cid.Cid, provide bool) error {
var wg sync.WaitGroup
errCh := make(chan error)
closeCh := make(chan bool)
for _, r := range r.routers {
r := r
wg.Add(1)
go func() {
defer wg.Done()
tim := time.NewTimer(r.ExecuteAfter)
defer tim.Stop()
select {
case <-ctx.Done():
if ctx.Err() != nil && !r.IgnoreError {
if !r.IgnoreError {
errCh <- ctx.Err()
}
case <-time.After(r.ExecuteAfter):
case <-tim.C:
ctx, cancel := context.WithTimeout(ctx, r.Timeout)
defer cancel()
err := r.Router.Provide(ctx, cid, provide)
Expand All @@ -48,22 +49,18 @@ func (r *ComposableParallel) Provide(ctx context.Context, cid cid.Cid, provide b
!r.IgnoreError {
errCh <- err
}
case <-closeCh:
return
}
}()
}

go func() {
wg.Wait()
close(closeCh)
close(errCh)
}()

var errOut error
select {
case err := <-errCh:
for err := range errCh {
errOut = multierror.Append(errOut, err)
case <-closeCh:
}

return errOut
Expand All @@ -72,13 +69,18 @@ func (r *ComposableParallel) Provide(ctx context.Context, cid cid.Cid, provide b
func (r *ComposableParallel) FindProvidersAsync(ctx context.Context, cid cid.Cid, count int) <-chan peer.AddrInfo {
addrChanOut := make(chan peer.AddrInfo)
var totalCount int64
var wg sync.WaitGroup
for _, r := range r.routers {
r := r
wg.Add(1)
go func() {
wg.Done()
tim := time.NewTimer(r.ExecuteAfter)
defer tim.Stop()
select {
case <-ctx.Done():
return
case <-time.After(r.ExecuteAfter):
case <-tim.C:
ctx, cancel := context.WithTimeout(ctx, r.Timeout)
defer cancel()
addrChan := r.Router.FindProvidersAsync(ctx, cid, count)
Expand All @@ -95,13 +97,23 @@ func (r *ComposableParallel) FindProvidersAsync(ctx context.Context, cid cid.Cid
return
}

addrChanOut <- addr
select {
case <-ctx.Done():
return
case addrChanOut <- addr:
}

}
}
}
}()
}

go func() {
wg.Wait()
close(addrChanOut)
}()

return addrChanOut
}

Expand All @@ -116,12 +128,14 @@ func (r *ComposableParallel) FindPeer(ctx context.Context, id peer.ID) (peer.Add
wg.Add(1)
go func() {
defer wg.Done()
tim := time.NewTimer(r.ExecuteAfter)
defer tim.Stop()
select {
case <-ctx.Done():
if ctx.Err() != nil && !r.IgnoreError {
if !r.IgnoreError {
errCh <- ctx.Err()
}
case <-time.After(r.ExecuteAfter):
case <-tim.C:
ctx, cancel := context.WithTimeout(ctx, r.Timeout)
defer cancel()
addr, err := r.Router.FindPeer(ctx, id)
Expand Down Expand Up @@ -163,18 +177,19 @@ func (r *ComposableParallel) FindPeer(ctx context.Context, id peer.ID) (peer.Add
func (r *ComposableParallel) PutValue(ctx context.Context, key string, val []byte, opts ...routing.Option) error {
var wg sync.WaitGroup
errCh := make(chan error)
closeCh := make(chan bool)
for _, r := range r.routers {
r := r
wg.Add(1)
go func() {
defer wg.Done()
tim := time.NewTimer(r.ExecuteAfter)
defer tim.Stop()
select {
case <-ctx.Done():
if ctx.Err() != nil && !r.IgnoreError {
if !r.IgnoreError {
errCh <- ctx.Err()
}
case <-time.After(r.ExecuteAfter):
case <-tim.C:
ctx, cancel := context.WithTimeout(ctx, r.Timeout)
defer cancel()
err := r.Router.PutValue(ctx, key, val, opts...)
Expand All @@ -183,22 +198,18 @@ func (r *ComposableParallel) PutValue(ctx context.Context, key string, val []byt
!r.IgnoreError {
errCh <- err
}
case <-closeCh:
return
}
}()
}

go func() {
wg.Wait()
close(closeCh)
close(errCh)
}()

var errOut error
select {
case err := <-errCh:
for err := range errCh {
errOut = multierror.Append(errOut, err)
case <-closeCh:
}

return errOut
Expand All @@ -215,12 +226,14 @@ func (r *ComposableParallel) GetValue(ctx context.Context, key string, opts ...r
wg.Add(1)
go func() {
defer wg.Done()
tim := time.NewTimer(r.ExecuteAfter)
defer tim.Stop()
select {
case <-ctx.Done():
if ctx.Err() != nil && !r.IgnoreError {
if !r.IgnoreError {
errCh <- ctx.Err()
}
case <-time.After(r.ExecuteAfter):
case <-tim.C:
ctx, cancel := context.WithTimeout(ctx, r.Timeout)
defer cancel()
val, err := r.Router.GetValue(ctx, key, opts...)
Expand Down

0 comments on commit d799146

Please sign in to comment.