Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(rfq): active quoting API #3128

Merged
merged 109 commits into from
Oct 2, 2024
Merged
Show file tree
Hide file tree
Changes from 106 commits
Commits
Show all changes
109 commits
Select commit Hold shift + click to select a range
bb18412
WIP: initial websocket wiring
dwasse Sep 13, 2024
31e52d8
WIP: add ws client and handling
dwasse Sep 13, 2024
e8ab231
Fix: receive respsects context
dwasse Sep 13, 2024
782cffd
Cleanup: split into rfq.go
dwasse Sep 13, 2024
6344a37
Fix: build
dwasse Sep 13, 2024
8aa16cb
Feat: add mocked ws client
dwasse Sep 13, 2024
4764926
Fix: build
dwasse Sep 16, 2024
afb2f19
Feat: add SubscribeActiveQuotes() to client
dwasse Sep 16, 2024
e30cd63
Feat: add PutUserQuoteRequest() to api client
dwasse Sep 16, 2024
3c10c02
Fix: build
dwasse Sep 16, 2024
bdae4ca
WIP: rfq tests with ws auth
dwasse Sep 16, 2024
138297d
WIP: test fixes
dwasse Sep 17, 2024
fc1ea97
Feat: working TestHandleActiveRFQ
dwasse Sep 17, 2024
6ae7a71
Feat: add expired request case
dwasse Sep 17, 2024
7cdcade
WIP: functionalize test relayer resps
dwasse Sep 17, 2024
01d83dc
Feat: add runMockRelayer with multiple relayers
dwasse Sep 17, 2024
ee408a9
Feat: add MultipleRelayers case
dwasse Sep 17, 2024
94a8f4d
Feat: add FallbackToPassive case
dwasse Sep 17, 2024
c39d62c
Fix: bigint ptr issue
dwasse Sep 17, 2024
6beb23a
Cleanup: bump expiration window
dwasse Sep 17, 2024
fdf9d12
WIP: logs
dwasse Sep 17, 2024
e23175f
Feat: split into separate tests
dwasse Sep 17, 2024
4b99340
Cleanup: logs
dwasse Sep 17, 2024
c557a28
Feat: add PassiveBestQuote case
dwasse Sep 17, 2024
888ce50
WIP: update db interface with new models
dwasse Sep 17, 2024
3293166
Feat: impl new db funcs
dwasse Sep 17, 2024
63f1a1e
Feat: insert models within api server
dwasse Sep 17, 2024
594d6ea
Feat: update quote request / response statuses
dwasse Sep 17, 2024
7e7c5a1
Fix: db error handling
dwasse Sep 17, 2024
7dcdf59
Fix: api tests
dwasse Sep 17, 2024
8cae8e4
Feat: add initial response validation
dwasse Sep 17, 2024
94ee250
Feat: impl pingpong
dwasse Sep 18, 2024
46d04e2
Fix: register models
dwasse Sep 18, 2024
36701ba
Feat: verify quote request in SingleRelayer case
dwasse Sep 18, 2024
2616b54
Feat: verify more db requests
dwasse Sep 18, 2024
fe7a774
Cleanup: common vars
dwasse Sep 18, 2024
60db841
Cleanup: break down handleActiveRFQ into sub funcs
dwasse Sep 18, 2024
83b7f6d
Cleanup: comments
dwasse Sep 18, 2024
32065ee
Cleanup: remove unused mock
dwasse Sep 18, 2024
c5e9a00
Fix: builds
dwasse Sep 18, 2024
e7d08e7
Feat: make relayer response data optional to signify null resp
dwasse Sep 19, 2024
8e405e5
Fix: response primary key on quote id
dwasse Sep 19, 2024
c6db31f
Fix: build
dwasse Sep 19, 2024
7812573
Feat: update swagger docs
dwasse Sep 19, 2024
a01fb9a
WIP: generic pubsub
dwasse Sep 20, 2024
c27ef32
Feat: add basic PubSubManager
dwasse Sep 20, 2024
b296da8
Feat: implement subscription / unsubscription operations
dwasse Sep 20, 2024
4384fb2
Feat: respond to subscribe operation
dwasse Sep 20, 2024
be695ea
Feat: add runWsListener helper
dwasse Sep 20, 2024
5656bae
Cleanup: reduce chan buffer
dwasse Sep 20, 2024
1c3870c
Cleanup: lints
dwasse Sep 20, 2024
2051b30
Cleanup: break down into smaller funcs
dwasse Sep 20, 2024
f0928c4
Cleanup: refactor ws client
dwasse Sep 20, 2024
4683974
Cleanup: more lints
dwasse Sep 20, 2024
33c24a3
Fix: build
dwasse Sep 20, 2024
7aee229
Cleanup: lints
dwasse Sep 20, 2024
ff0aece
Feat: mark as fulfilled when updating request status
dwasse Sep 20, 2024
91c1bf5
Cleanup: lint
dwasse Sep 20, 2024
cdee6ea
Skip broken test for now
dwasse Sep 20, 2024
8ccbb3f
Cleanup: lint
dwasse Sep 20, 2024
f2920e2
Feat: add open_quote_requests endpoint with test
dwasse Sep 20, 2024
83a3603
Feat: add new open request model
dwasse Sep 20, 2024
f112235
Update swagger
dwasse Sep 20, 2024
292cd37
go mod tidy
trajan0x Sep 23, 2024
dd961c1
fix error
trajan0x Sep 23, 2024
2368313
Fix: respecting context
dwasse Sep 24, 2024
d7948d4
Replace: Fulfilled -> Closed
dwasse Sep 24, 2024
161ea2e
Cleanup: use errors.New()
dwasse Sep 24, 2024
c240cd3
Feat: ReceiveQuoteResponse specifies request id
dwasse Sep 25, 2024
c8b5435
Cleanup: remove logs
dwasse Sep 25, 2024
7fa8003
Feat: add some tracing
dwasse Sep 25, 2024
b05e6b7
Feat: add IntegratorID
dwasse Sep 25, 2024
f2a4be9
Feat: remove repetitive fields from relayer quote response, move requ…
dwasse Sep 25, 2024
f203e7c
Cleanup: use new routes
dwasse Sep 25, 2024
0835aae
Cleanup: migrate req/res struct naming
dwasse Sep 25, 2024
2996aaa
Cleanup: update swagger
dwasse Sep 25, 2024
89c565e
Cleanup: lint
dwasse Sep 25, 2024
0a2b46a
[goreleaser]
dwasse Sep 25, 2024
8850cf0
Feat: run ws endpoint within existing server
dwasse Sep 27, 2024
2bae6b1
[goreleaser]
dwasse Sep 27, 2024
af384d4
Fix: build
dwasse Sep 27, 2024
3ae9552
[goreleaser]
dwasse Sep 27, 2024
d3f839f
Feat: add more tracing
dwasse Sep 27, 2024
925617e
[goreleaser]
dwasse Sep 27, 2024
7ff7c81
feat(rfq-relayer): relayer supports active quoting (#3198)
dwasse Sep 30, 2024
99c9d5c
Fix: build
dwasse Sep 30, 2024
6d6d172
Cleanup: lint
dwasse Sep 30, 2024
c40dada
Cleanup: lint
dwasse Oct 1, 2024
7878364
Cleanup: update swagger
dwasse Oct 1, 2024
2c46bcb
Feat: client sends pings, server sends pongs
dwasse Oct 1, 2024
1025c6c
[goreleaser]
dwasse Oct 1, 2024
65ddc92
Cleanup: remove unused func
dwasse Oct 1, 2024
16b3a5b
WIP: ws error handling
dwasse Oct 1, 2024
d71d686
[goreleaser]
dwasse Oct 1, 2024
a0591d6
Feat: ws client uses errgroup
dwasse Oct 1, 2024
3bc93ab
Cleanup: remove log
dwasse Oct 1, 2024
aa50d07
[goreleaser]
dwasse Oct 1, 2024
b4a25e1
Replace: PutUserQuoteResponse -> PutRFQResponse
dwasse Oct 1, 2024
26c6bbc
Feat: add QuoteID to PutRFQResponse
dwasse Oct 1, 2024
04ff76b
[goreleaser]
dwasse Oct 1, 2024
3324e53
Cleanup: lint
dwasse Oct 2, 2024
cb7dde0
Fix: build
dwasse Oct 2, 2024
cbc6e18
Cleanup: lint
dwasse Oct 2, 2024
5cb6050
[goreleaser]
dwasse Oct 2, 2024
e687ece
Add logs
dwasse Oct 2, 2024
c8a5868
[goreleaser]
dwasse Oct 2, 2024
8bad457
Add logs
dwasse Oct 2, 2024
7e88a97
[goreleaser]
dwasse Oct 2, 2024
526f2af
Cleanup: remove logs
dwasse Oct 2, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions contrib/opbot/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,7 @@ require (
github.com/prometheus/client_model v0.6.1 // indirect
github.com/prometheus/common v0.54.0 // indirect
github.com/prometheus/procfs v0.15.0 // indirect
github.com/puzpuzpuz/xsync v1.5.2 // indirect
github.com/puzpuzpuz/xsync/v2 v2.5.1 // indirect
github.com/richardwilkes/toolbox v1.74.0 // indirect
github.com/rivo/uniseg v0.4.7 // indirect
Expand Down
249 changes: 230 additions & 19 deletions services/rfq/api/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,15 @@

import (
"context"
"encoding/json"
"fmt"
"net/http"
"strconv"
"strings"
"time"

"github.com/ipfs/go-log"
dwasse marked this conversation as resolved.
Show resolved Hide resolved

Comment on lines +14 to +15
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Consider updating the logging package

The current implementation uses github.com/ipfs/go-log for logging. It's recommended to use a more modern and widely adopted logging library or the project's standard logging approach. This would help maintain consistency across the project and potentially avoid issues with outdated dependencies.

Consider replacing github.com/ipfs/go-log with a more standard logging library such as log from the standard library, or popular third-party options like github.com/sirupsen/logrus or go.uber.org/zap.

"github.com/google/uuid"
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"

Expand All @@ -17,17 +21,23 @@

"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/go-resty/resty/v2"
"github.com/gorilla/websocket"
dwasse marked this conversation as resolved.
Show resolved Hide resolved
"github.com/synapsecns/sanguine/ethergo/signer/signer"
"github.com/synapsecns/sanguine/services/rfq/api/model"
"github.com/synapsecns/sanguine/services/rfq/api/rest"
)

var logger = log.Logger("rfq-client")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Consider updating logging package

You're using github.com/ipfs/go-log for logging. It's recommended to use a more modern logging library or the project's standard logging approach to maintain consistency and potentially avoid issues with outdated dependencies.


const pingPeriod = 20 * time.Second

// AuthenticatedClient is an interface for the RFQ API.
// It provides methods for creating, retrieving and updating quotes.
type AuthenticatedClient interface {
PutQuote(ctx context.Context, q *model.PutQuoteRequest) error
PutQuote(ctx context.Context, q *model.PutRelayerQuoteRequest) error
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codebase verification

PutQuote method calls may still use the old parameter type *model.PutQuoteRequest. Please update all instances to use *model.PutRelayerQuoteRequest to ensure type consistency.

  • services/rfq/relayer/quoter/quoter.go:817
  • services/rfq/api/client/client_test.go:20
  • services/rfq/api/client/client_test.go:111
  • services/rfq/api/client/client_test.go:148
🔗 Analysis chain

Verify the function signature change in the codebase.

The PutQuote method signature has been modified to accept a *model.PutRelayerQuoteRequest instead of a *model.PutQuoteRequest. Ensure that all calls to this method have been updated to pass the correct parameter type.

Run the following script to verify the PutQuote method usage:

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Verify all calls to `PutQuote` pass the correct parameter type.

# Search for PutQuote calls that don't use *model.PutRelayerQuoteRequest
rg --type go 'PutQuote\(' | rg -v 'PutRelayerQuoteRequest'

# Also search for any imports of the old PutQuoteRequest type
rg --type go 'model\.PutQuoteRequest'

Length of output: 456


Script:

#!/bin/bash
# Description: Extract the argument types passed to `PutQuote` to ensure correctness.

# Find all PutQuote calls and display the line with the arguments
rg --type go 'PutQuote\(' --pretty

Length of output: 536


Script:

#!/bin/bash
# Description: Verify that 'req' is of type '*model.PutRelayerQuoteRequest' in all PutQuote calls.

# Define an array of files where PutQuote is called
files=(
  "services/rfq/relayer/quoter/quoter.go"
  "services/rfq/api/client/client_test.go"
)

# Iterate over each file and search for the declaration of 'req'
for file in "${files[@]}"; do
  echo "Checking in $file:"
  rg --type go 'var\s+req\s+\*\model\.PutRelayerQuoteRequest' "$file"
done

Length of output: 821


Script:

#!/bin/bash
# Description: Verify that 'req' is of type '*model.PutRelayerQuoteRequest' in all PutQuote calls.

# Define an array of files where PutQuote is called
files=(
  "services/rfq/relayer/quoter/quoter.go"
  "services/rfq/api/client/client_test.go"
)

# Iterate over each file and search for the declaration of 'req'
for file in "${files[@]}"; do
  echo "Checking in $file:"
  rg --type go 'var\s+req\s+\*model\.PutRelayerQuoteRequest' "$file"
done

Length of output: 569


Script:

#!/bin/bash
# Description: Find all declarations of 'req' and display their types to verify correctness.

# Define an array of files where PutQuote is called
files=(
  "services/rfq/relayer/quoter/quoter.go"
  "services/rfq/api/client/client_test.go"
)

# Iterate over each file and search for the declaration of 'req'
for file in "${files[@]}"; do
  echo "Checking declarations in $file:"
  # Search for lines where 'req' is declared and extract the type
  rg --type go 'req\s*[:=]\s*\*?model\.Put(?:Relayer)?QuoteRequest' "$file" -o
done

Length of output: 641

PutBulkQuotes(ctx context.Context, q *model.PutBulkQuotesRequest) error
PutRelayAck(ctx context.Context, req *model.PutAckRequest) (*model.PutRelayAckResponse, error)
SubscribeActiveQuotes(ctx context.Context, req *model.SubscribeActiveRFQRequest, reqChan chan *model.ActiveRFQMessage) (respChan chan *model.ActiveRFQMessage, err error)
trajan0x marked this conversation as resolved.
Show resolved Hide resolved
UnauthenticatedClient
}

Expand All @@ -37,6 +47,7 @@
GetSpecificQuote(ctx context.Context, q *model.GetQuoteSpecificRequest) ([]*model.GetQuoteResponse, error)
GetQuoteByRelayerAddress(ctx context.Context, relayerAddr string) ([]*model.GetQuoteResponse, error)
GetRFQContracts(ctx context.Context) (*model.GetContractsResponse, error)
PutRFQRequest(ctx context.Context, q *model.PutRFQRequest) (*model.PutRFQResponse, error)
resty() *resty.Client
}

Expand All @@ -50,7 +61,8 @@

type clientImpl struct {
UnauthenticatedClient
rClient *resty.Client
rClient *resty.Client
reqSigner signer.Signer
Comment on lines +64 to +65
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Avoid redundancy in client fields.

The clientImpl struct defines its own rClient field while also embedding UnauthenticatedClient, which may already contain an rClient field. This could lead to confusion or unintended behavior due to field shadowing.

Consider removing the rClient field from clientImpl or accessing the embedded rClient through the UnauthenticatedClient interface to maintain clarity.

}

// NewAuthenticatedClient creates a new client for the RFQ quoting API.
Expand All @@ -65,33 +77,40 @@
// to a new variable for clarity.
authedClient := unauthedClient.resty().
OnBeforeRequest(func(client *resty.Client, request *resty.Request) error {
// if request.Method == "PUT" && request.URL == rfqURL+rest.QUOTE_ROUTE {
// i.e. signature (hex encoded) = keccak(bytes.concat("\x19Ethereum Signed Message:\n", len(strconv.Itoa(time.Now().Unix()), strconv.Itoa(time.Now().Unix())))
// so that full auth header string: auth = strconv.Itoa(time.Now().Unix()) + ":" + signature
// Get the current Unix timestamp as a string.
now := strconv.Itoa(int(time.Now().Unix()))

// Prepare the data to be signed.
data := "\x19Ethereum Signed Message:\n" + strconv.Itoa(len(now)) + now

sig, err := reqSigner.SignMessage(request.Context(), []byte(data), true)

authHeader, err := getAuthHeader(request.Context(), reqSigner)
if err != nil {
return fmt.Errorf("failed to sign request: %w", err)
return fmt.Errorf("failed to get auth header: %w", err)

Check warning on line 82 in services/rfq/api/client/client.go

View check run for this annotation

Codecov / codecov/patch

services/rfq/api/client/client.go#L82

Added line #L82 was not covered by tests
}

res := fmt.Sprintf("%s:%s", now, hexutil.Encode(signer.Encode(sig)))
request.SetHeader("Authorization", res)

request.SetHeader(rest.AuthorizationHeader, authHeader)
Comment on lines +80 to +84
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Handle potential error when setting auth header.

In the OnBeforeRequest callback, if getAuthHeader returns an error, it propagates up and might cause the request to fail silently. Make sure to log the error or handle it appropriately to aid in debugging.

Apply this diff to add error logging:

 authHeader, err := getAuthHeader(request.Context(), reqSigner)
 if err != nil {
+    logger.Errorf("Failed to get auth header: %v", err)
     return fmt.Errorf("failed to get auth header: %w", err)
 }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
authHeader, err := getAuthHeader(request.Context(), reqSigner)
if err != nil {
return fmt.Errorf("failed to sign request: %w", err)
return fmt.Errorf("failed to get auth header: %w", err)
}
res := fmt.Sprintf("%s:%s", now, hexutil.Encode(signer.Encode(sig)))
request.SetHeader("Authorization", res)
request.SetHeader(rest.AuthorizationHeader, authHeader)
authHeader, err := getAuthHeader(request.Context(), reqSigner)
if err != nil {
logger.Errorf("Failed to get auth header: %v", err)
return fmt.Errorf("failed to get auth header: %w", err)
}
request.SetHeader(rest.AuthorizationHeader, authHeader)
🧰 Tools
🪛 GitHub Check: codecov/patch

[warning] 82-82: services/rfq/api/client/client.go#L82
Added line #L82 was not covered by tests

return nil
})

return &clientImpl{
UnauthenticatedClient: unauthedClient,
rClient: authedClient,
reqSigner: reqSigner,
}, nil
}

func getAuthHeader(ctx context.Context, reqSigner signer.Signer) (string, error) {
// if request.Method == "PUT" && request.URL == rfqURL+rest.QUOTE_ROUTE {
// i.e. signature (hex encoded) = keccak(bytes.concat("\x19Ethereum Signed Message:\n", len(strconv.Itoa(time.Now().Unix()), strconv.Itoa(time.Now().Unix())))
// so that full auth header string: auth = strconv.Itoa(time.Now().Unix()) + ":" + signature
// Get the current Unix timestamp as a string.
now := strconv.Itoa(int(time.Now().Unix()))

// Prepare the data to be signed.
data := "\x19Ethereum Signed Message:\n" + strconv.Itoa(len(now)) + now

sig, err := reqSigner.SignMessage(ctx, []byte(data), true)

if err != nil {
return "", fmt.Errorf("failed to sign request: %w", err)
}

Check warning on line 109 in services/rfq/api/client/client.go

View check run for this annotation

Codecov / codecov/patch

services/rfq/api/client/client.go#L108-L109

Added lines #L108 - L109 were not covered by tests

return fmt.Sprintf("%s:%s", now, hexutil.Encode(signer.Encode(sig))), nil
}

// NewUnauthenticatedClient creates a new client for the RFQ quoting API.
func NewUnauthenticatedClient(metricHandler metrics.Handler, rfqURL string) (UnauthenticatedClient, error) {
client := resty.New().
Expand All @@ -115,7 +134,7 @@
}

// PutQuote puts a new quote in the RFQ quoting API.
func (c *clientImpl) PutQuote(ctx context.Context, q *model.PutQuoteRequest) error {
func (c *clientImpl) PutQuote(ctx context.Context, q *model.PutRelayerQuoteRequest) error {
res, err := c.rClient.R().
SetContext(ctx).
SetBody(q).
Expand Down Expand Up @@ -159,6 +178,179 @@
return ack, nil
}

func (c *clientImpl) SubscribeActiveQuotes(ctx context.Context, req *model.SubscribeActiveRFQRequest, reqChan chan *model.ActiveRFQMessage) (respChan chan *model.ActiveRFQMessage, err error) {

Check warning on line 181 in services/rfq/api/client/client.go

View check run for this annotation

Codecov / codecov/patch

services/rfq/api/client/client.go#L181

Added line #L181 was not covered by tests
fmt.Println("SubscribeActiveQuotes - starting")
conn, err := c.connectWebsocket(ctx, req)
if err != nil {

Check warning on line 184 in services/rfq/api/client/client.go

View check run for this annotation

Codecov / codecov/patch

services/rfq/api/client/client.go#L183-L184

Added lines #L183 - L184 were not covered by tests
fmt.Printf("SubscribeActiveQuotes - failed to connect to websocket: %s\n", err)
return nil, fmt.Errorf("failed to connect to websocket: %w", err)
}

Check warning on line 187 in services/rfq/api/client/client.go

View check run for this annotation

Codecov / codecov/patch

services/rfq/api/client/client.go#L186-L187

Added lines #L186 - L187 were not covered by tests
fmt.Println("SubscribeActiveQuotes - connected to websocket")
// first, subscrbe to the given chains
sub := model.SubscriptionParams{
Chains: req.ChainIDs,
}

Check warning on line 192 in services/rfq/api/client/client.go

View check run for this annotation

Codecov / codecov/patch

services/rfq/api/client/client.go#L190-L192

Added lines #L190 - L192 were not covered by tests
fmt.Printf("SubscribeActiveQuotes - sub: %v\n", sub)
subJSON, err := json.Marshal(sub)
if err != nil {
return respChan, fmt.Errorf("error marshaling subscription params: %w", err)
}
err = conn.WriteJSON(model.ActiveRFQMessage{
Op: rest.SubscribeOp,
Content: json.RawMessage(subJSON),
})
if err != nil {

Check warning on line 202 in services/rfq/api/client/client.go

View check run for this annotation

Codecov / codecov/patch

services/rfq/api/client/client.go#L194-L202

Added lines #L194 - L202 were not covered by tests
fmt.Printf("SubscribeActiveQuotes - error sending subscribe message: %s\n", err)
return nil, fmt.Errorf("error sending subscribe message: %w", err)
}

Check warning on line 205 in services/rfq/api/client/client.go

View check run for this annotation

Codecov / codecov/patch

services/rfq/api/client/client.go#L204-L205

Added lines #L204 - L205 were not covered by tests
fmt.Println("SubscribeActiveQuotes - subscribed to chains")
// make sure subscription is successful
var resp model.ActiveRFQMessage
err = conn.ReadJSON(&resp)
if err != nil {

Check warning on line 210 in services/rfq/api/client/client.go

View check run for this annotation

Codecov / codecov/patch

services/rfq/api/client/client.go#L208-L210

Added lines #L208 - L210 were not covered by tests
fmt.Printf("SubscribeActiveQuotes - error reading subscribe response: %s\n", err)
return nil, fmt.Errorf("error reading subscribe response: %w", err)
}

Check warning on line 213 in services/rfq/api/client/client.go

View check run for this annotation

Codecov / codecov/patch

services/rfq/api/client/client.go#L212-L213

Added lines #L212 - L213 were not covered by tests
fmt.Printf("SubscribeActiveQuotes - resp: %v\n", resp)
if !resp.Success || resp.Op != rest.SubscribeOp {
return nil, fmt.Errorf("subscription failed")
}

Check warning on line 217 in services/rfq/api/client/client.go

View check run for this annotation

Codecov / codecov/patch

services/rfq/api/client/client.go#L215-L217

Added lines #L215 - L217 were not covered by tests

respChan = make(chan *model.ActiveRFQMessage)
go func() {

Check warning on line 220 in services/rfq/api/client/client.go

View check run for this annotation

Codecov / codecov/patch

services/rfq/api/client/client.go#L219-L220

Added lines #L219 - L220 were not covered by tests
wsErr := c.processWebsocket(ctx, conn, reqChan, respChan)
if wsErr != nil {
logger.Error("Error running websocket listener: %s", wsErr)
}

Check warning on line 224 in services/rfq/api/client/client.go

View check run for this annotation

Codecov / codecov/patch

services/rfq/api/client/client.go#L224

Added line #L224 was not covered by tests
}()

return respChan, nil

Check warning on line 227 in services/rfq/api/client/client.go

View check run for this annotation

Codecov / codecov/patch

services/rfq/api/client/client.go#L227

Added line #L227 was not covered by tests
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Improve SubscribeActiveQuotes implementation

The SubscribeActiveQuotes method is a good addition for real-time quote updates. However, there are several areas that could be improved:

  1. Replace fmt.Println calls with proper logging using the logger variable.
  2. Add a timeout for the initial subscription message to prevent hanging.
  3. Ensure proper cleanup to prevent goroutine leaks.

Here's a suggested improvement:

 func (c *clientImpl) SubscribeActiveQuotes(ctx context.Context, req *model.SubscribeActiveRFQRequest, reqChan chan *model.ActiveRFQMessage) (respChan chan *model.ActiveRFQMessage, err error) {
-    fmt.Println("SubscribeActiveQuotes - starting")
+    logger.Info("SubscribeActiveQuotes - starting")
     conn, err := c.connectWebsocket(ctx, req)
     if err != nil {
-        fmt.Printf("SubscribeActiveQuotes - failed to connect to websocket: %s\n", err)
+        logger.Errorf("SubscribeActiveQuotes - failed to connect to websocket: %s", err)
         return nil, fmt.Errorf("failed to connect to websocket: %w", err)
     }
-    fmt.Println("SubscribeActiveQuotes - connected to websocket")
+    logger.Info("SubscribeActiveQuotes - connected to websocket")
     
+    // Add cleanup in case of early return
+    defer func() {
+        if err != nil {
+            conn.Close()
+        }
+    }()

     // ... (rest of the code)

-    err = conn.ReadJSON(&resp)
+    // Add timeout for initial subscription
+    err = conn.SetReadDeadline(time.Now().Add(10 * time.Second))
+    if err != nil {
+        return nil, fmt.Errorf("failed to set read deadline: %w", err)
+    }
+    err = conn.ReadJSON(&resp)
+    conn.SetReadDeadline(time.Time{}) // Reset deadline
     if err != nil {
-        fmt.Printf("SubscribeActiveQuotes - error reading subscribe response: %s\n", err)
+        logger.Errorf("SubscribeActiveQuotes - error reading subscribe response: %s", err)
         return nil, fmt.Errorf("error reading subscribe response: %w", err)
     }
-    fmt.Printf("SubscribeActiveQuotes - resp: %v\n", resp)
+    logger.Infof("SubscribeActiveQuotes - resp: %v", resp)
     // ... (rest of the code)
 }

These changes improve logging, add a timeout for the initial subscription, and ensure proper cleanup in case of errors.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
func (c *clientImpl) SubscribeActiveQuotes(ctx context.Context, req *model.SubscribeActiveRFQRequest, reqChan chan *model.ActiveRFQMessage) (respChan chan *model.ActiveRFQMessage, err error) {
fmt.Println("SubscribeActiveQuotes - starting")
conn, err := c.connectWebsocket(ctx, req)
if err != nil {
fmt.Printf("SubscribeActiveQuotes - failed to connect to websocket: %s\n", err)
return nil, fmt.Errorf("failed to connect to websocket: %w", err)
}
fmt.Println("SubscribeActiveQuotes - connected to websocket")
// first, subscrbe to the given chains
sub := model.SubscriptionParams{
Chains: req.ChainIDs,
}
fmt.Printf("SubscribeActiveQuotes - sub: %v\n", sub)
subJSON, err := json.Marshal(sub)
if err != nil {
return respChan, fmt.Errorf("error marshaling subscription params: %w", err)
}
err = conn.WriteJSON(model.ActiveRFQMessage{
Op: rest.SubscribeOp,
Content: json.RawMessage(subJSON),
})
if err != nil {
fmt.Printf("SubscribeActiveQuotes - error sending subscribe message: %s\n", err)
return nil, fmt.Errorf("error sending subscribe message: %w", err)
}
fmt.Println("SubscribeActiveQuotes - subscribed to chains")
// make sure subscription is successful
var resp model.ActiveRFQMessage
err = conn.ReadJSON(&resp)
if err != nil {
fmt.Printf("SubscribeActiveQuotes - error reading subscribe response: %s\n", err)
return nil, fmt.Errorf("error reading subscribe response: %w", err)
}
fmt.Printf("SubscribeActiveQuotes - resp: %v\n", resp)
if !resp.Success || resp.Op != rest.SubscribeOp {
return nil, fmt.Errorf("subscription failed")
}
respChan = make(chan *model.ActiveRFQMessage)
go func() {
wsErr := c.processWebsocket(ctx, conn, reqChan, respChan)
if wsErr != nil {
logger.Error("Error running websocket listener: %s", wsErr)
}
}()
return respChan, nil
}
func (c *clientImpl) SubscribeActiveQuotes(ctx context.Context, req *model.SubscribeActiveRFQRequest, reqChan chan *model.ActiveRFQMessage) (respChan chan *model.ActiveRFQMessage, err error) {
logger.Info("SubscribeActiveQuotes - starting")
conn, err := c.connectWebsocket(ctx, req)
if err != nil {
logger.Errorf("SubscribeActiveQuotes - failed to connect to websocket: %s", err)
return nil, fmt.Errorf("failed to connect to websocket: %w", err)
}
logger.Info("SubscribeActiveQuotes - connected to websocket")
// Add cleanup in case of early return
defer func() {
if err != nil {
conn.Close()
}
}()
// first, subscrbe to the given chains
sub := model.SubscriptionParams{
Chains: req.ChainIDs,
}
fmt.Printf("SubscribeActiveQuotes - sub: %v\n", sub)
subJSON, err := json.Marshal(sub)
if err != nil {
return respChan, fmt.Errorf("error marshaling subscription params: %w", err)
}
err = conn.WriteJSON(model.ActiveRFQMessage{
Op: rest.SubscribeOp,
Content: json.RawMessage(subJSON),
})
if err != nil {
fmt.Printf("SubscribeActiveQuotes - error sending subscribe message: %s\n", err)
return nil, fmt.Errorf("error sending subscribe message: %w", err)
}
fmt.Println("SubscribeActiveQuotes - subscribed to chains")
// make sure subscription is successful
var resp model.ActiveRFQMessage
// Add timeout for initial subscription
err = conn.SetReadDeadline(time.Now().Add(10 * time.Second))
if err != nil {
return nil, fmt.Errorf("failed to set read deadline: %w", err)
}
err = conn.ReadJSON(&resp)
conn.SetReadDeadline(time.Time{}) // Reset deadline
if err != nil {
logger.Errorf("SubscribeActiveQuotes - error reading subscribe response: %s", err)
return nil, fmt.Errorf("error reading subscribe response: %w", err)
}
logger.Infof("SubscribeActiveQuotes - resp: %v", resp)
if !resp.Success || resp.Op != rest.SubscribeOp {
return nil, fmt.Errorf("subscription failed")
}
respChan = make(chan *model.ActiveRFQMessage)
go func() {
wsErr := c.processWebsocket(ctx, conn, reqChan, respChan)
if wsErr != nil {
logger.Error("Error running websocket listener: %s", wsErr)
}
}()
return respChan, nil
}
🧰 Tools
🪛 GitHub Check: codecov/patch

[warning] 181-181: services/rfq/api/client/client.go#L181
Added line #L181 was not covered by tests


[warning] 183-184: services/rfq/api/client/client.go#L183-L184
Added lines #L183 - L184 were not covered by tests


[warning] 186-187: services/rfq/api/client/client.go#L186-L187
Added lines #L186 - L187 were not covered by tests


[warning] 190-192: services/rfq/api/client/client.go#L190-L192
Added lines #L190 - L192 were not covered by tests


[warning] 194-202: services/rfq/api/client/client.go#L194-L202
Added lines #L194 - L202 were not covered by tests


[warning] 204-205: services/rfq/api/client/client.go#L204-L205
Added lines #L204 - L205 were not covered by tests


[warning] 208-210: services/rfq/api/client/client.go#L208-L210
Added lines #L208 - L210 were not covered by tests


[warning] 212-213: services/rfq/api/client/client.go#L212-L213
Added lines #L212 - L213 were not covered by tests


[warning] 215-217: services/rfq/api/client/client.go#L215-L217
Added lines #L215 - L217 were not covered by tests


[warning] 219-220: services/rfq/api/client/client.go#L219-L220
Added lines #L219 - L220 were not covered by tests


[warning] 224-224: services/rfq/api/client/client.go#L224
Added line #L224 was not covered by tests


[warning] 227-227: services/rfq/api/client/client.go#L227
Added line #L227 was not covered by tests


func (c *clientImpl) connectWebsocket(ctx context.Context, req *model.SubscribeActiveRFQRequest) (conn *websocket.Conn, err error) {
if len(req.ChainIDs) == 0 {
return nil, fmt.Errorf("chain IDs are required")
}

Check warning on line 233 in services/rfq/api/client/client.go

View check run for this annotation

Codecov / codecov/patch

services/rfq/api/client/client.go#L230-L233

Added lines #L230 - L233 were not covered by tests

header, err := c.getWsHeaders(ctx, req)
if err != nil {
return nil, fmt.Errorf("failed to get auth header: %w", err)
}

Check warning on line 238 in services/rfq/api/client/client.go

View check run for this annotation

Codecov / codecov/patch

services/rfq/api/client/client.go#L235-L238

Added lines #L235 - L238 were not covered by tests

reqURL := strings.Replace(c.rClient.BaseURL, "http", "ws", 1) + rest.RFQStreamRoute
conn, httpResp, err := websocket.DefaultDialer.Dial(reqURL, header)
Comment on lines +232 to +233
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Correct the WebSocket URL scheme handling

The current method of replacing "http" with "ws" in the base URL can produce incorrect WebSocket URLs when using "https". For instance, "https://example.com" would become "ws://example.com", which is not secure. The correct secure WebSocket URL should be "wss://example.com".

Apply this diff to properly handle the URL scheme:

-import "strings"
+import (
+    "net/url"
+    "strings"
+)

...

 reqURL := strings.Replace(c.rClient.BaseURL, "http", "ws", 1) + rest.RFQStreamRoute
+parsedURL, err := url.Parse(c.rClient.BaseURL)
+if err != nil {
+    return nil, fmt.Errorf("failed to parse base URL: %w", err)
+}
+switch parsedURL.Scheme {
+case "http":
+    parsedURL.Scheme = "ws"
+case "https":
+    parsedURL.Scheme = "wss"
+default:
+    return nil, fmt.Errorf("unsupported URL scheme: %s", parsedURL.Scheme)
+}
+parsedURL.Path = rest.RFQStreamRoute
+reqURL := parsedURL.String()

This change ensures that the WebSocket URL uses the correct scheme based on the original URL, supporting both http to ws and https to wss conversions.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
reqURL := strings.Replace(c.rClient.BaseURL, "http", "ws", 1) + rest.RFQStreamRoute
conn, httpResp, err := websocket.DefaultDialer.Dial(reqURL, header)
parsedURL, err := url.Parse(c.rClient.BaseURL)
if err != nil {
return nil, fmt.Errorf("failed to parse base URL: %w", err)
}
switch parsedURL.Scheme {
case "http":
parsedURL.Scheme = "ws"
case "https":
parsedURL.Scheme = "wss"
default:
return nil, fmt.Errorf("unsupported URL scheme: %s", parsedURL.Scheme)
}
parsedURL.Path = rest.RFQStreamRoute
reqURL := parsedURL.String()
conn, httpResp, err := websocket.DefaultDialer.Dial(reqURL, header)
```
Note: The import statement changes suggested in the diff are not included in the code suggestion as they are typically placed at the beginning of the file, which is not shown in the original code snippet. Make sure to add the following import if not already present:
```go
import (
"net/url"
"fmt"
)

if err != nil {
return nil, fmt.Errorf("failed to connect to websocket: %w", err)
}
err = httpResp.Body.Close()
if err != nil {
logger.Warnf("error closing websocket connection: %v", err)
}

Check warning on line 248 in services/rfq/api/client/client.go

View check run for this annotation

Codecov / codecov/patch

services/rfq/api/client/client.go#L240-L248

Added lines #L240 - L248 were not covered by tests
Comment on lines +237 to +240
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Prevent nil pointer dereference when closing httpResp.Body.

In the connectWebsocket function, httpResp may be nil upon a successful connection, as per the websocket.Dial documentation. Attempting to close httpResp.Body without checking if httpResp is not nil could lead to a nil pointer dereference.

Apply this diff to check for nil before closing:

-err = httpResp.Body.Close()
-if err != nil {
-    logger.Warnf("error closing websocket connection: %v", err)
-}
+if httpResp != nil && httpResp.Body != nil {
+    err = httpResp.Body.Close()
+    if err != nil {
+        logger.Warnf("error closing websocket response body: %v", err)
+    }
+}
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
err = httpResp.Body.Close()
if err != nil {
logger.Warnf("error closing websocket connection: %v", err)
}
if httpResp != nil && httpResp.Body != nil {
err = httpResp.Body.Close()
if err != nil {
logger.Warnf("error closing websocket response body: %v", err)
}
}
🧰 Tools
🪛 GitHub Check: codecov/patch

[warning] 234-242: services/rfq/api/client/client.go#L234-L242
Added lines #L234 - L242 were not covered by tests


return conn, nil

Check warning on line 250 in services/rfq/api/client/client.go

View check run for this annotation

Codecov / codecov/patch

services/rfq/api/client/client.go#L250

Added line #L250 was not covered by tests
}
Comment on lines +222 to +243
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Improve WebSocket connection handling

The connectWebsocket method establishes a WebSocket connection with proper headers. However, there are several areas that could be improved:

  1. The URL scheme conversion doesn't correctly handle HTTPS to WSS.
  2. The error from httpResp.Body.Close() is logged but not returned.
  3. There's no context timeout for the dial operation.

Here's a suggested improvement:

+import "net/url"

 func (c *clientImpl) connectWebsocket(ctx context.Context, req *model.SubscribeActiveRFQRequest) (conn *websocket.Conn, err error) {
     if len(req.ChainIDs) == 0 {
         return nil, fmt.Errorf("chain IDs are required")
     }

     header, err := c.getWsHeaders(ctx, req)
     if err != nil {
         return nil, fmt.Errorf("failed to get auth header: %w", err)
     }

-    reqURL := strings.Replace(c.rClient.BaseURL, "http", "ws", 1) + rest.RFQStreamRoute
+    baseURL, err := url.Parse(c.rClient.BaseURL)
+    if err != nil {
+        return nil, fmt.Errorf("invalid base URL: %w", err)
+    }
+    switch baseURL.Scheme {
+    case "http":
+        baseURL.Scheme = "ws"
+    case "https":
+        baseURL.Scheme = "wss"
+    default:
+        return nil, fmt.Errorf("unsupported URL scheme: %s", baseURL.Scheme)
+    }
+    baseURL.Path = path.Join(baseURL.Path, rest.RFQStreamRoute)
+    reqURL := baseURL.String()

+    ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
+    defer cancel()
-    conn, httpResp, err := websocket.DefaultDialer.Dial(reqURL, header)
+    conn, httpResp, err := websocket.DefaultDialer.DialContext(ctx, reqURL, header)
     if err != nil {
         return nil, fmt.Errorf("failed to connect to websocket: %w", err)
     }
-    err = httpResp.Body.Close()
-    if err != nil {
-        logger.Warnf("error closing websocket connection: %v", err)
+    if httpResp != nil && httpResp.Body != nil {
+        if closeErr := httpResp.Body.Close(); closeErr != nil {
+            return conn, fmt.Errorf("error closing response body: %w", closeErr)
+        }
     }

     return conn, nil
 }

These changes ensure proper handling of HTTPS URLs, add a timeout for the dial operation, and improve error handling when closing the response body.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
func (c *clientImpl) connectWebsocket(ctx context.Context, req *model.SubscribeActiveRFQRequest) (conn *websocket.Conn, err error) {
if len(req.ChainIDs) == 0 {
return nil, fmt.Errorf("chain IDs are required")
}
header, err := c.getWsHeaders(ctx, req)
if err != nil {
return nil, fmt.Errorf("failed to get auth header: %w", err)
}
reqURL := strings.Replace(c.rClient.BaseURL, "http", "ws", 1) + rest.RFQStreamRoute
conn, httpResp, err := websocket.DefaultDialer.Dial(reqURL, header)
if err != nil {
return nil, fmt.Errorf("failed to connect to websocket: %w", err)
}
err = httpResp.Body.Close()
if err != nil {
logger.Warnf("error closing websocket connection: %v", err)
}
return conn, nil
}
import (
"net/url"
"time"
)
func (c *clientImpl) connectWebsocket(ctx context.Context, req *model.SubscribeActiveRFQRequest) (conn *websocket.Conn, err error) {
if len(req.ChainIDs) == 0 {
return nil, fmt.Errorf("chain IDs are required")
}
header, err := c.getWsHeaders(ctx, req)
if err != nil {
return nil, fmt.Errorf("failed to get auth header: %w", err)
}
baseURL, err := url.Parse(c.rClient.BaseURL)
if err != nil {
return nil, fmt.Errorf("invalid base URL: %w", err)
}
switch baseURL.Scheme {
case "http":
baseURL.Scheme = "ws"
case "https":
baseURL.Scheme = "wss"
default:
return nil, fmt.Errorf("unsupported URL scheme: %s", baseURL.Scheme)
}
baseURL.Path = path.Join(baseURL.Path, rest.RFQStreamRoute)
reqURL := baseURL.String()
ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()
conn, httpResp, err := websocket.DefaultDialer.DialContext(ctx, reqURL, header)
if err != nil {
return nil, fmt.Errorf("failed to connect to websocket: %w", err)
}
if httpResp != nil && httpResp.Body != nil {
if closeErr := httpResp.Body.Close(); closeErr != nil {
return conn, fmt.Errorf("error closing response body: %w", closeErr)
}
}
return conn, nil
}
🧰 Tools
🪛 GitHub Check: codecov/patch

[warning] 230-233: services/rfq/api/client/client.go#L230-L233
Added lines #L230 - L233 were not covered by tests


[warning] 235-238: services/rfq/api/client/client.go#L235-L238
Added lines #L235 - L238 were not covered by tests


[warning] 240-248: services/rfq/api/client/client.go#L240-L248
Added lines #L240 - L248 were not covered by tests


[warning] 250-250: services/rfq/api/client/client.go#L250
Added line #L250 was not covered by tests


func (c *clientImpl) getWsHeaders(ctx context.Context, req *model.SubscribeActiveRFQRequest) (header http.Header, err error) {
header = http.Header{}
chainIDsJSON, err := json.Marshal(req.ChainIDs)
if err != nil {
return header, fmt.Errorf("failed to marshal chain IDs: %w", err)
}
header.Set(rest.ChainsHeader, string(chainIDsJSON))
authHeader, err := getAuthHeader(ctx, c.reqSigner)
if err != nil {
return header, fmt.Errorf("failed to get auth header: %w", err)
}
header.Set(rest.AuthorizationHeader, authHeader)
return header, nil

Check warning on line 265 in services/rfq/api/client/client.go

View check run for this annotation

Codecov / codecov/patch

services/rfq/api/client/client.go#L253-L265

Added lines #L253 - L265 were not covered by tests
}

func (c *clientImpl) processWebsocket(ctx context.Context, conn *websocket.Conn, reqChan, respChan chan *model.ActiveRFQMessage) (err error) {
defer func() {
close(respChan)
err := conn.Close()
if err != nil {
logger.Warnf("error closing websocket connection: %v", err)
}

Check warning on line 274 in services/rfq/api/client/client.go

View check run for this annotation

Codecov / codecov/patch

services/rfq/api/client/client.go#L268-L274

Added lines #L268 - L274 were not covered by tests
Comment on lines +264 to +266
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Avoid variable shadowing when closing the connection

In the deferred function within processWebsocket, the variable err is redeclared, which shadows the named return parameter err. This can lead to confusion or unintended behavior. Assign the result of conn.Close() to a new variable instead.

Apply this diff to fix the variable shadowing:

 func (c *clientImpl) processWebsocket(ctx context.Context, conn *websocket.Conn, reqChan, respChan chan *model.ActiveRFQMessage) (err error) {
     defer func() {
         close(respChan)
-        err := conn.Close()
+        cerr := conn.Close()
         if cerr != nil {
             logger.Warnf("error closing websocket connection: %v", cerr)
         }
     }()

Committable suggestion was skipped due to low confidence.

🧰 Tools
GitHub Check: codecov/patch

[warning] 264-270: services/rfq/api/client/client.go#L264-L270
Added lines #L264 - L270 were not covered by tests

Comment on lines +260 to +266
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Respect context cancellation in processWebsocket.

The processWebsocket function should respect the context cancellation and terminate gracefully when the context is done. Currently, it only checks for context cancellation in the main loop but not in the goroutines it spawns.

Modify the listenWsMessages and sendPings functions to accept the context and check for its cancellation:

-go c.listenWsMessages(ctx, conn, readChan)
+go func() {
+    err := c.listenWsMessages(ctx, conn, readChan)
+    if err != nil {
+        logger.Errorf("Error in listenWsMessages: %v", err)
+    }
+}()
-go c.sendPings(ctx, reqChan)
+go func() {
+    err := c.sendPings(ctx, reqChan)
+    if err != nil {
+        logger.Errorf("Error in sendPings: %v", err)
+    }
+}()

Then, update the listenWsMessages and sendPings functions to check for context cancellation and return an error if the context is done.

Committable suggestion was skipped due to low confidence.

}()

readChan := make(chan []byte)
go c.listenWsMessages(ctx, conn, readChan)
go c.sendPings(ctx, reqChan)

for {
select {
case <-ctx.Done():
return nil
case msg, ok := <-reqChan:
if !ok {
return fmt.Errorf("error reading from reqChan: %w", ctx.Err())
}
err := conn.WriteJSON(msg)
if err != nil {
return fmt.Errorf("error sending message to websocket: %w", err)
}
Comment on lines +282 to +284
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Handle error when writing to websocket

When sending a message using conn.WriteJSON(msg), if an error occurs, the loop continues without handling it appropriately. This could lead to silent failures or infinite loops on write errors.

Consider returning the error to terminate the function or handle it as needed:

 err := conn.WriteJSON(msg)
 if err != nil {
     return fmt.Errorf("error sending message to websocket: %w", err)
+    // Optionally, you might want to break or continue based on the error type.
 }

Committable suggestion was skipped due to low confidence.

case msg, ok := <-readChan:
if !ok {
return nil
}
err = c.handleWsMessage(ctx, msg, respChan)
if err != nil {
return fmt.Errorf("error handling websocket message: %w", err)
}

Check warning on line 300 in services/rfq/api/client/client.go

View check run for this annotation

Codecov / codecov/patch

services/rfq/api/client/client.go#L277-L300

Added lines #L277 - L300 were not covered by tests
}
}
}
Comment on lines +260 to +295
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Improve processWebsocket implementation

The processWebsocket method handles the main WebSocket communication loop effectively. However, there are several areas that could be improved:

  1. The error returned from the deferred function is not handled.
  2. There's no mechanism to stop the goroutines when the function returns.
  3. The error handling in the select statement could be more robust.

Here's a suggested improvement:

 func (c *clientImpl) processWebsocket(ctx context.Context, conn *websocket.Conn, reqChan, respChan chan *model.ActiveRFQMessage) (err error) {
+    ctx, cancel := context.WithCancel(ctx)
+    defer cancel()
     defer func() {
         close(respChan)
-        err := conn.Close()
-        if err != nil {
+        if closeErr := conn.Close(); closeErr != nil {
             logger.Warnf("error closing websocket connection: %v", err)
+            if err == nil {
+                err = closeErr
+            }
         }
     }()

     readChan := make(chan []byte)
-    go c.listenWsMessages(ctx, conn, readChan)
-    go c.sendPings(ctx, reqChan)
+    go func() {
+        c.listenWsMessages(ctx, conn, readChan)
+        cancel()
+    }()
+    go func() {
+        c.sendPings(ctx, reqChan)
+        cancel()
+    }()

     for {
         select {
         case <-ctx.Done():
-            return nil
+            return ctx.Err()
         case msg, ok := <-reqChan:
             if !ok {
                 return fmt.Errorf("error reading from reqChan: %w", ctx.Err())
             }
             err := conn.WriteJSON(msg)
             if err != nil {
-                return fmt.Errorf("error sending message to websocket: %w", err)
+                return fmt.Errorf("error sending message to websocket: %w", err)
             }
         case msg, ok := <-readChan:
             if !ok {
-                return nil
+                return fmt.Errorf("websocket read channel closed unexpectedly")
             }
             err = c.handleWsMessage(ctx, msg, respChan)
             if err != nil {
                 return fmt.Errorf("error handling websocket message: %w", err)
             }
         }
     }
 }

These changes improve error handling, ensure proper cleanup of goroutines, and provide more context in error messages.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
func (c *clientImpl) processWebsocket(ctx context.Context, conn *websocket.Conn, reqChan, respChan chan *model.ActiveRFQMessage) (err error) {
defer func() {
close(respChan)
err := conn.Close()
if err != nil {
logger.Warnf("error closing websocket connection: %v", err)
}
}()
readChan := make(chan []byte)
go c.listenWsMessages(ctx, conn, readChan)
go c.sendPings(ctx, reqChan)
for {
select {
case <-ctx.Done():
return nil
case msg, ok := <-reqChan:
if !ok {
return fmt.Errorf("error reading from reqChan: %w", ctx.Err())
}
err := conn.WriteJSON(msg)
if err != nil {
return fmt.Errorf("error sending message to websocket: %w", err)
}
case msg, ok := <-readChan:
if !ok {
return nil
}
err = c.handleWsMessage(ctx, msg, respChan)
if err != nil {
return fmt.Errorf("error handling websocket message: %w", err)
}
}
}
}
func (c *clientImpl) processWebsocket(ctx context.Context, conn *websocket.Conn, reqChan, respChan chan *model.ActiveRFQMessage) (err error) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
defer func() {
close(respChan)
if closeErr := conn.Close(); closeErr != nil {
logger.Warnf("error closing websocket connection: %v", err)
if err == nil {
err = closeErr
}
}
}()
readChan := make(chan []byte)
go func() {
c.listenWsMessages(ctx, conn, readChan)
cancel()
}()
go func() {
c.sendPings(ctx, reqChan)
cancel()
}()
for {
select {
case <-ctx.Done():
return ctx.Err()
case msg, ok := <-reqChan:
if !ok {
return fmt.Errorf("error reading from reqChan: %w", ctx.Err())
}
err := conn.WriteJSON(msg)
if err != nil {
return fmt.Errorf("error sending message to websocket: %w", err)
}
case msg, ok := <-readChan:
if !ok {
return fmt.Errorf("websocket read channel closed unexpectedly")
}
err = c.handleWsMessage(ctx, msg, respChan)
if err != nil {
return fmt.Errorf("error handling websocket message: %w", err)
}
}
}
}
🧰 Tools
🪛 GitHub Check: codecov/patch

[warning] 268-274: services/rfq/api/client/client.go#L268-L274
Added lines #L268 - L274 were not covered by tests


[warning] 277-300: services/rfq/api/client/client.go#L277-L300
Added lines #L277 - L300 were not covered by tests


func (c *clientImpl) sendPings(ctx context.Context, reqChan chan *model.ActiveRFQMessage) {
pingTicker := time.NewTicker(pingPeriod)
defer pingTicker.Stop()

for {
select {
case <-pingTicker.C:
pingMsg := model.ActiveRFQMessage{
Op: rest.PingOp,
}
reqChan <- &pingMsg
case <-ctx.Done():
return

Check warning on line 317 in services/rfq/api/client/client.go

View check run for this annotation

Codecov / codecov/patch

services/rfq/api/client/client.go#L305-L317

Added lines #L305 - L317 were not covered by tests
}
}
}
func (c *clientImpl) listenWsMessages(ctx context.Context, conn *websocket.Conn, readChan chan []byte) {
defer close(readChan)
for {
_, message, err := conn.ReadMessage()
if err != nil {
if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
logger.Warnf("websocket connection closed unexpectedly: %v", err)
}
return

Check warning on line 329 in services/rfq/api/client/client.go

View check run for this annotation

Codecov / codecov/patch

services/rfq/api/client/client.go#L321-L329

Added lines #L321 - L329 were not covered by tests
}
Comment on lines +314 to +322
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Respect context cancellation in listenWsMessages

In the listenWsMessages function, if conn.ReadMessage() returns an error, the function exits without checking if the context has been canceled. Ensure that the function respects the context cancellation to prevent goroutine leaks.

Modify the error handling to check for context cancellation:

 if err != nil {
+    select {
+    case <-ctx.Done():
+        return
+    default:
+        // handle unexpected error
+    }
     if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
         logger.Warnf("websocket connection closed unexpectedly: %v", err)
     }
     return
 }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
defer close(readChan)
for {
_, message, err := conn.ReadMessage()
if err != nil {
if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
logger.Warnf("websocket connection closed unexpectedly: %v", err)
}
return
}
defer close(readChan)
for {
_, message, err := conn.ReadMessage()
if err != nil {
select {
case <-ctx.Done():
return
default:
// handle unexpected error
}
if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
logger.Warnf("websocket connection closed unexpectedly: %v", err)
}
return
}

select {
case readChan <- message:
case <-ctx.Done():
return

Check warning on line 334 in services/rfq/api/client/client.go

View check run for this annotation

Codecov / codecov/patch

services/rfq/api/client/client.go#L331-L334

Added lines #L331 - L334 were not covered by tests
}
}
}
Comment on lines +313 to +329
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Improve listenWsMessages implementation

The listenWsMessages method effectively listens for incoming WebSocket messages. However, there are several areas that could be improved:

  1. Add a timeout for reading messages to prevent indefinite blocking.
  2. Enhance error handling with more informative messages.
  3. Respect context cancellation when writing to the channel.

Here's a suggested improvement:

 func (c *clientImpl) listenWsMessages(ctx context.Context, conn *websocket.Conn, readChan chan []byte) {
     defer close(readChan)
     for {
+        err := conn.SetReadDeadline(time.Now().Add(60 * time.Second))
+        if err != nil {
+            logger.Errorf("Failed to set read deadline: %v", err)
+            return
+        }
         _, message, err := conn.ReadMessage()
         if err != nil {
             if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
-                logger.Warnf("websocket connection closed unexpectedly: %v", err)
+                logger.Errorf("WebSocket connection closed unexpectedly: %v", err)
+            } else if err == io.EOF {
+                logger.Info("WebSocket connection closed")
+            } else {
+                logger.Errorf("Error reading WebSocket message: %v", err)
             }
             return
         }
         select {
         case readChan <- message:
         case <-ctx.Done():
+            logger.Info("Context cancelled, stopping WebSocket listener")
             return
+        default:
+            logger.Warn("Read channel full, discarding message")
         }
     }
 }

These changes add a read timeout, provide more detailed error logging, and ensure the method respects context cancellation. The default case in the select statement prevents blocking if the readChan is full.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
func (c *clientImpl) listenWsMessages(ctx context.Context, conn *websocket.Conn, readChan chan []byte) {
defer close(readChan)
for {
_, message, err := conn.ReadMessage()
if err != nil {
if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
logger.Warnf("websocket connection closed unexpectedly: %v", err)
}
return
}
select {
case readChan <- message:
case <-ctx.Done():
return
}
}
}
func (c *clientImpl) listenWsMessages(ctx context.Context, conn *websocket.Conn, readChan chan []byte) {
defer close(readChan)
for {
err := conn.SetReadDeadline(time.Now().Add(60 * time.Second))
if err != nil {
logger.Errorf("Failed to set read deadline: %v", err)
return
}
_, message, err := conn.ReadMessage()
if err != nil {
if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
logger.Errorf("WebSocket connection closed unexpectedly: %v", err)
} else if err == io.EOF {
logger.Info("WebSocket connection closed")
} else {
logger.Errorf("Error reading WebSocket message: %v", err)
}
return
}
select {
case readChan <- message:
case <-ctx.Done():
logger.Info("Context cancelled, stopping WebSocket listener")
return
default:
logger.Warn("Read channel full, discarding message")
}
}
}
🧰 Tools
🪛 GitHub Check: codecov/patch

[warning] 321-329: services/rfq/api/client/client.go#L321-L329
Added lines #L321 - L329 were not covered by tests


[warning] 331-334: services/rfq/api/client/client.go#L331-L334
Added lines #L331 - L334 were not covered by tests


func (c *clientImpl) handleWsMessage(ctx context.Context, msg []byte, respChan chan *model.ActiveRFQMessage) (err error) {
var rfqMsg model.ActiveRFQMessage
err = json.Unmarshal(msg, &rfqMsg)
if err != nil {
return fmt.Errorf("error unmarshaling message: %w", err)
}

Check warning on line 344 in services/rfq/api/client/client.go

View check run for this annotation

Codecov / codecov/patch

services/rfq/api/client/client.go#L339-L344

Added lines #L339 - L344 were not covered by tests

select {
case respChan <- &rfqMsg:
case <-ctx.Done():
return nil

Check warning on line 349 in services/rfq/api/client/client.go

View check run for this annotation

Codecov / codecov/patch

services/rfq/api/client/client.go#L346-L349

Added lines #L346 - L349 were not covered by tests
}
return nil

Check warning on line 351 in services/rfq/api/client/client.go

View check run for this annotation

Codecov / codecov/patch

services/rfq/api/client/client.go#L351

Added line #L351 was not covered by tests
}
Comment on lines +331 to +344
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Enhance handleWsMessage implementation

The handleWsMessage method effectively unmarshals and handles incoming WebSocket messages. However, there are several areas that could be improved:

  1. Provide more informative error handling.
  2. Handle potential blocking when sending to the channel.
  3. Add validation for the unmarshaled message structure.

Here's a suggested improvement:

 func (c *clientImpl) handleWsMessage(ctx context.Context, msg []byte, respChan chan *model.ActiveRFQMessage) (err error) {
     var rfqMsg model.ActiveRFQMessage
     err = json.Unmarshal(msg, &rfqMsg)
     if err != nil {
-        return fmt.Errorf("error unmarshaling message: %w", err)
+        return fmt.Errorf("failed to unmarshal WebSocket message: %w", err)
     }

+    // Validate the message structure
+    if err := rfqMsg.Validate(); err != nil {
+        return fmt.Errorf("invalid message structure: %w", err)
+    }
+
     select {
     case respChan <- &rfqMsg:
+        logger.Debugf("Successfully sent message to response channel: %v", rfqMsg.Op)
     case <-ctx.Done():
-        return nil
+        return ctx.Err()
+    default:
+        // If the channel is full, log a warning and continue
+        logger.Warn("Response channel full, discarding message")
     }
     return nil
 }

These changes provide more detailed error messages, add message structure validation (assuming a Validate method exists on ActiveRFQMessage), and handle the case where the response channel might be full. The default case in the select statement prevents blocking if respChan is full, logging a warning instead.

Consider adding a Validate method to the ActiveRFQMessage struct if it doesn't already exist:

func (m *ActiveRFQMessage) Validate() error {
    if m.Op == "" {
        return errors.New("message operation is empty")
    }
    // Add more validation as needed
    return nil
}
🧰 Tools
🪛 GitHub Check: codecov/patch

[warning] 339-344: services/rfq/api/client/client.go#L339-L344
Added lines #L339 - L344 were not covered by tests


[warning] 346-349: services/rfq/api/client/client.go#L346-L349
Added lines #L346 - L349 were not covered by tests


[warning] 351-351: services/rfq/api/client/client.go#L351
Added line #L351 was not covered by tests


// GetAllQuotes retrieves all quotes from the RFQ quoting API.
func (c *unauthenticatedClient) GetAllQuotes(ctx context.Context) ([]*model.GetQuoteResponse, error) {
var quotes []*model.GetQuoteResponse
Expand Down Expand Up @@ -242,6 +434,25 @@
return contracts, nil
}

func (c unauthenticatedClient) PutRFQRequest(ctx context.Context, q *model.PutRFQRequest) (*model.PutRFQResponse, error) {
var response model.PutRFQResponse
resp, err := c.rClient.R().
SetContext(ctx).
SetBody(q).
SetResult(&response).
Put(rest.RFQRoute)

if err != nil {
return nil, fmt.Errorf("error from server: %s: %w", getStatus(resp), err)
}

Check warning on line 447 in services/rfq/api/client/client.go

View check run for this annotation

Codecov / codecov/patch

services/rfq/api/client/client.go#L437-L447

Added lines #L437 - L447 were not covered by tests

if resp.IsError() {
return nil, fmt.Errorf("error from server: %s", getStatus(resp))
}

Check warning on line 451 in services/rfq/api/client/client.go

View check run for this annotation

Codecov / codecov/patch

services/rfq/api/client/client.go#L449-L451

Added lines #L449 - L451 were not covered by tests

return &response, nil

Check warning on line 453 in services/rfq/api/client/client.go

View check run for this annotation

Codecov / codecov/patch

services/rfq/api/client/client.go#L453

Added line #L453 was not covered by tests
}
Comment on lines +429 to +446
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Improve PutRFQRequest implementation

The PutRFQRequest method correctly implements the functionality for unauthenticated quote requests. However, there are several areas that could be improved:

  1. Provide more descriptive error messages.
  2. Add a nil check for the response before dereferencing.
  3. Handle potential empty responses for successful requests.

Here's a suggested improvement:

 func (c unauthenticatedClient) PutRFQRequest(ctx context.Context, q *model.PutRFQRequest) (*model.PutRFQResponse, error) {
     var response model.PutRFQResponse
     resp, err := c.rClient.R().
         SetContext(ctx).
         SetBody(q).
         SetResult(&response).
         Put(rest.RFQRoute)

     if err != nil {
-        return nil, fmt.Errorf("error from server: %s: %w", getStatus(resp), err)
+        return nil, fmt.Errorf("failed to send RFQ request: %s: %w", getStatus(resp), err)
     }

     if resp.IsError() {
-        return nil, fmt.Errorf("error from server: %s", getStatus(resp))
+        return nil, fmt.Errorf("server returned error for RFQ request: %s", getStatus(resp))
     }

+    // Check for empty response on successful requests
+    if resp.IsSuccess() && resp.StatusCode() != http.StatusNoContent {
+        if response == (model.PutRFQResponse{}) {
+            return nil, fmt.Errorf("received empty response for successful RFQ request")
+        }
+    }

     return &response, nil
 }

These changes provide more descriptive error messages and add a check for empty responses on successful requests, improving the robustness of the method.

Consider adding a validation step for the input q *model.PutRFQRequest to ensure all required fields are present before sending the request:

if err := q.Validate(); err != nil {
    return nil, fmt.Errorf("invalid RFQ request: %w", err)
}

This assumes a Validate method exists on the PutRFQRequest struct. If it doesn't, consider adding one to perform input validation.

🧰 Tools
🪛 GitHub Check: codecov/patch

[warning] 437-447: services/rfq/api/client/client.go#L437-L447
Added lines #L437 - L447 were not covered by tests


[warning] 449-451: services/rfq/api/client/client.go#L449-L451
Added lines #L449 - L451 were not covered by tests


[warning] 453-453: services/rfq/api/client/client.go#L453
Added line #L453 was not covered by tests


func getStatus(resp *resty.Response) string {
if resp == nil {
return "http status unavailable"
Expand Down
8 changes: 4 additions & 4 deletions services/rfq/api/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
// TODO: @aurelius tese tests make a lot less sesnes w/ a composite index

func (c *ClientSuite) TestPutAndGetQuote() {
req := model.PutQuoteRequest{
req := model.PutRelayerQuoteRequest{
OriginChainID: 1,
OriginTokenAddr: "0xOriginTokenAddr",
DestChainID: 42161,
Expand Down Expand Up @@ -40,7 +40,7 @@ func (c *ClientSuite) TestPutAndGetQuote() {

func (c *ClientSuite) TestPutAndGetBulkQuotes() {
req := model.PutBulkQuotesRequest{
Quotes: []model.PutQuoteRequest{
Quotes: []model.PutRelayerQuoteRequest{
{
OriginChainID: 1,
OriginTokenAddr: "0xOriginTokenAddr",
Expand Down Expand Up @@ -98,7 +98,7 @@ func (c *ClientSuite) TestPutAndGetBulkQuotes() {
}

func (c *ClientSuite) TestGetSpecificQuote() {
req := model.PutQuoteRequest{
req := model.PutRelayerQuoteRequest{
OriginChainID: 1,
OriginTokenAddr: "0xOriginTokenAddr",
DestChainID: 42161,
Expand Down Expand Up @@ -135,7 +135,7 @@ func (c *ClientSuite) TestGetSpecificQuote() {
}

func (c *ClientSuite) TestGetQuoteByRelayerAddress() {
req := model.PutQuoteRequest{
req := model.PutRelayerQuoteRequest{
OriginChainID: 1,
OriginTokenAddr: "0xOriginTokenAddr",
DestChainID: 42161,
Expand Down
Loading
Loading