diff --git a/find/client/dhash_client.go b/find/client/dhash_client.go index af69d3d..a57c935 100644 --- a/find/client/dhash_client.go +++ b/find/client/dhash_client.go @@ -71,9 +71,11 @@ func NewDHashClient(dhstoreURL, stiURL string, options ...Option) (*DHashClient, // Find launches FindAsync in a separate go routine and assembles the result into FindResponse as if it was a synchronous invocation. func (c *DHashClient) Find(ctx context.Context, mh multihash.Multihash) (*model.FindResponse, error) { resChan := make(chan model.ProviderResult) - errChan := make(chan error, 2) + errChan := make(chan error, 1) - go c.FindAsync(ctx, mh, resChan, errChan) + go func() { + errChan <- c.FindAsync(ctx, mh, resChan) + }() mhr := model.MultihashResult{ Multihash: mh, @@ -90,59 +92,47 @@ func (c *DHashClient) Find(ctx context.Context, mh multihash.Multihash) (*model. }, nil } -// FindAsync implements double hashed lookup workflow. It submits results as they get decrypted and assembled into resChan. If an error occurs it is sent to errChan. -// Once the workflow is finished both channels are closed. -func (c *DHashClient) FindAsync(ctx context.Context, mh multihash.Multihash, resChan chan<- model.ProviderResult, errChan chan<- error) { - defer func() { - close(resChan) - close(errChan) - }() +// FindAsync implements double hashed lookup workflow. FindAsync returns +// results on resChan until there are no more results or error. When finished, +// resChan is closed and the error or nil is returned. +func (c *DHashClient) FindAsync(ctx context.Context, mh multihash.Multihash, resChan chan<- model.ProviderResult) error { + defer close(resChan) smh, err := dhash.SecondMultihash(mh) if err != nil { - errChan <- err - return + return err } u := c.dhFindURL.JoinPath(smh.B58String()) req, err := http.NewRequestWithContext(ctx, http.MethodGet, u.String(), nil) if err != nil { - errChan <- err - return + return err } req.Header.Add("Accept", "application/json") resp, err := c.c.Do(req) if err != nil { - errChan <- err - return + return err } body, err := io.ReadAll(resp.Body) defer resp.Body.Close() if err != nil { - errChan <- err - return + return err } if resp.StatusCode != http.StatusOK { - errChan <- apierror.FromResponse(resp.StatusCode, body) - return + return apierror.FromResponse(resp.StatusCode, body) } encResponse := &model.FindResponse{} err = json.Unmarshal(body, encResponse) if err != nil { - errChan <- err - return + return err } for _, emhrs := range encResponse.EncryptedMultihashResults { for _, evk := range emhrs.EncryptedValueKeys { - if ctx.Err() != nil { - errChan <- ctx.Err() - return - } vk, err := dhash.DecryptValueKey(evk, mh) // skip errors as we don't want to fail the whole query, warn instead. Same applies to the rest of the loop. if err != nil { @@ -170,10 +160,15 @@ func (c *DHashClient) FindAsync(ctx context.Context, mh multihash.Multihash, res } for _, pr := range prs { - resChan <- pr + select { + case resChan <- pr: + case <-ctx.Done(): + return ctx.Err() + } } } } + return nil } // fetchMetadata fetches and decrypts metadata from a remote server.