Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(rfq-relayer): relayer supports active quoting #3198

Merged
merged 50 commits into from
Sep 30, 2024
Merged
Show file tree
Hide file tree
Changes from 40 commits
Commits
Show all changes
50 commits
Select commit Hold shift + click to select a range
d93e20f
Feat: add active rfq subscription on quoter
dwasse Sep 26, 2024
481f043
Feat: relayer subscribes to active quotes upon starting
dwasse Sep 26, 2024
fdbc865
[goreleaser]
dwasse Sep 26, 2024
77c51e8
Feat: specify ws url in relayer
dwasse Sep 26, 2024
d10d56d
[goreleaser]
dwasse Sep 26, 2024
d2b1701
Merge branch 'feat/active-rfq-api' into feat/active-rfq-relayer
dwasse Sep 27, 2024
f06a64b
[goreleaser]
dwasse Sep 27, 2024
f6300a1
Fix: build
dwasse Sep 27, 2024
2646149
[goreleaser]
dwasse Sep 27, 2024
ea61286
Merge branch 'feat/active-rfq-api' into feat/active-rfq-relayer
dwasse Sep 27, 2024
dcd264a
Feat: relayer tracing
dwasse Sep 27, 2024
02bf53c
[goreleaser]
dwasse Sep 27, 2024
a83253f
Feat: use supports_active_quoting instead of ws url
dwasse Sep 27, 2024
6febf7b
[goreleaser]
dwasse Sep 27, 2024
3ce1bd3
WIP: add logs
dwasse Sep 27, 2024
460f5ac
[goreleaser]
dwasse Sep 27, 2024
9ec49cb
WIP: more logs
dwasse Sep 27, 2024
92b49ec
[goreleaser]
dwasse Sep 30, 2024
e85ff62
More logs
dwasse Sep 30, 2024
f4ed5b5
[goreleaser]
dwasse Sep 30, 2024
0c0b562
More logs
dwasse Sep 30, 2024
1742fe3
[goreleaser]
dwasse Sep 30, 2024
0d7a7c4
More logs
dwasse Sep 30, 2024
4828dfc
[goreleaser]
dwasse Sep 30, 2024
ab9513d
Close conn when encountering write err
dwasse Sep 30, 2024
a2a2079
[goreleaser]
dwasse Sep 30, 2024
69ed171
More logs
dwasse Sep 30, 2024
fbccdf8
[goreleaser]
dwasse Sep 30, 2024
4b098f9
More logs
dwasse Sep 30, 2024
135f1ac
[goreleaser]
dwasse Sep 30, 2024
b4969e9
More logs
dwasse Sep 30, 2024
bf3adaa
[goreleaser]
dwasse Sep 30, 2024
82ed6bb
More logs
dwasse Sep 30, 2024
896d52d
[goreleaser]
dwasse Sep 30, 2024
0018269
Logs with ts
dwasse Sep 30, 2024
c3f0eb3
[goreleaser]
dwasse Sep 30, 2024
50b969e
More tracing
dwasse Sep 30, 2024
64389c4
[goreleaser]
dwasse Sep 30, 2024
0c07fe4
Fix: send to reqChan
dwasse Sep 30, 2024
719361a
[goreleaser]
dwasse Sep 30, 2024
26c9174
Check for zero pong time
dwasse Sep 30, 2024
ccd24b3
Fix: make close_at and closed_quote_id optional
dwasse Sep 30, 2024
9688fa7
[goreleaser]
dwasse Sep 30, 2024
b98ca8c
Feat: remove extra fields from responses
dwasse Sep 30, 2024
739472d
[goreleaser]
dwasse Sep 30, 2024
7d7f6df
Fix: skip passive quote
dwasse Sep 30, 2024
6a5590f
[goreleaser]
dwasse Sep 30, 2024
56afeff
Cleanup: remove logs
dwasse Sep 30, 2024
bdb7539
Fix: use correct span
dwasse Sep 30, 2024
ab15e5d
Cleanup: remove logs
dwasse Sep 30, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions services/rfq/api/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ func (c *clientImpl) PutRelayAck(ctx context.Context, req *model.PutAckRequest)
}

func (c *clientImpl) SubscribeActiveQuotes(ctx context.Context, req *model.SubscribeActiveRFQRequest, reqChan chan *model.ActiveRFQMessage) (respChan chan *model.ActiveRFQMessage, err error) {
fmt.Println("client: subscribing")
conn, err := c.connectWebsocket(ctx, req)
if err != nil {
return nil, fmt.Errorf("failed to connect to websocket: %w", err)
Expand All @@ -190,28 +191,34 @@ func (c *clientImpl) SubscribeActiveQuotes(ctx context.Context, req *model.Subsc
if err != nil {
return respChan, fmt.Errorf("error marshaling subscription params: %w", err)
}
fmt.Printf("%s sub: %v\n", time.Now().String(), sub)
err = conn.WriteJSON(model.ActiveRFQMessage{
Op: rest.SubscribeOp,
Content: json.RawMessage(subJSON),
})
if err != nil {
fmt.Printf("%s error sending subscribe message: %v\n", time.Now().String(), err)
return nil, fmt.Errorf("error sending subscribe message: %w", err)
}

// make sure subscription is successful
fmt.Printf("%s waiting for subscribe response\n", time.Now().String())
var resp model.ActiveRFQMessage
err = conn.ReadJSON(&resp)
if err != nil {
return nil, fmt.Errorf("error reading subscribe response: %w", err)
}
fmt.Printf("%s subscribe response: %v\n", time.Now().String(), resp)
if !resp.Success || resp.Op != rest.SubscribeOp {
fmt.Printf("%s subscription failed: %v\n", time.Now().String(), resp)
return nil, fmt.Errorf("subscription failed")
}

respChan = make(chan *model.ActiveRFQMessage)
go func() {
err = c.processWebsocket(ctx, conn, reqChan, respChan)
if err != nil {
fmt.Printf("%s Error running websocket listener: %s\n", time.Now().String(), err)
logger.Error("Error running websocket listener: %s", err)
}
}()
Expand All @@ -220,6 +227,7 @@ func (c *clientImpl) SubscribeActiveQuotes(ctx context.Context, req *model.Subsc
}

func (c *clientImpl) connectWebsocket(ctx context.Context, req *model.SubscribeActiveRFQRequest) (conn *websocket.Conn, err error) {
fmt.Printf("%s connecting websocket: %v\n", time.Now().String(), req)
if len(req.ChainIDs) == 0 {
return nil, fmt.Errorf("chain IDs are required")
}
Expand All @@ -230,15 +238,19 @@ func (c *clientImpl) connectWebsocket(ctx context.Context, req *model.SubscribeA
}

reqURL := strings.Replace(c.rClient.BaseURL, "http", "ws", 1) + rest.RFQStreamRoute
fmt.Printf("dialing websocket: %s\n", reqURL)
fmt.Printf("headers: %v\n", header)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Improve logging approach for WebSocket connection details

While the added print statements can be helpful for debugging, using fmt.Printf directly is not ideal for production code. Consider the following improvements:

  1. Use the existing logger variable (which is an instance of log.Logger) instead of fmt.Printf. This allows for better log management and potential log level control.
  2. Consider logging at a debug or trace level to avoid cluttering production logs.
  3. Be cautious about logging headers, as they may contain sensitive information.

Here's a suggested refactor:

-	fmt.Printf("dialing websocket: %s\n", reqURL)
-	fmt.Printf("headers: %v\n", header)
+	logger.Debugf("Dialing WebSocket: %s", reqURL)
+	// Log headers selectively or at a lower log level
+	logger.Tracef("WebSocket headers: %v", header)

Also, consider adding a helper function to sanitize headers before logging to ensure sensitive information is not exposed.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
fmt.Printf("dialing websocket: %s\n", reqURL)
fmt.Printf("headers: %v\n", header)
logger.Debugf("Dialing WebSocket: %s", reqURL)
// Log headers selectively or at a lower log level
logger.Tracef("WebSocket headers: %v", header)

conn, httpResp, err := websocket.DefaultDialer.Dial(reqURL, header)
if err != nil {
return nil, fmt.Errorf("failed to connect to websocket: %w", err)
}
err = httpResp.Body.Close()
if err != nil {
fmt.Printf("error closing resp body: %v\n", err)
logger.Warnf("error closing websocket connection: %v", err)
}

fmt.Println("connected to websocket")
return conn, nil
}

Expand All @@ -258,10 +270,12 @@ func (c *clientImpl) getWsHeaders(ctx context.Context, req *model.SubscribeActiv
}

func (c *clientImpl) processWebsocket(ctx context.Context, conn *websocket.Conn, reqChan, respChan chan *model.ActiveRFQMessage) (err error) {
fmt.Printf("%s processing websocket\n", time.Now().String())
defer func() {
close(respChan)
err := conn.Close()
if err != nil {
fmt.Printf("%s Error closing websocket connection: %v\n", time.Now().String(), err)
logger.Warnf("error closing websocket connection: %v", err)
}
}()
Expand All @@ -272,9 +286,11 @@ func (c *clientImpl) processWebsocket(ctx context.Context, conn *websocket.Conn,
for {
select {
case <-ctx.Done():
fmt.Printf("%s context done for processWebsocket\n", time.Now().String())
return nil
case msg, ok := <-reqChan:
if !ok {
fmt.Printf("%s reqChan closed\n", time.Now().String())
return nil
}
err := conn.WriteJSON(msg)
Expand All @@ -283,10 +299,12 @@ func (c *clientImpl) processWebsocket(ctx context.Context, conn *websocket.Conn,
}
case msg, ok := <-readChan:
if !ok {
fmt.Printf("%s readChan closed\n", time.Now().String())
return nil
}
err = c.handleWsMessage(ctx, msg, reqChan, respChan)
if err != nil {
fmt.Printf("%s error handling websocket message: %v\n", time.Now().String(), err)
return fmt.Errorf("error handling websocket message: %w", err)
}
}
Expand All @@ -297,6 +315,8 @@ func (c *clientImpl) listenWsMessages(ctx context.Context, conn *websocket.Conn,
defer close(readChan)
for {
_, message, err := conn.ReadMessage()
fmt.Printf("read message: %v\n", message)
fmt.Printf("read message err: %v\n", err)
if err != nil {
if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
logger.Warnf("websocket connection closed unexpectedly: %v", err)
Expand All @@ -306,15 +326,18 @@ func (c *clientImpl) listenWsMessages(ctx context.Context, conn *websocket.Conn,
select {
case readChan <- message:
case <-ctx.Done():
fmt.Println("context done for listenWsMessages")
return
}
}
}

func (c *clientImpl) handleWsMessage(ctx context.Context, msg []byte, reqChan, respChan chan *model.ActiveRFQMessage) (err error) {
fmt.Printf("%s handling websocket message: %v\n", time.Now().String(), msg)
var rfqMsg model.ActiveRFQMessage
err = json.Unmarshal(msg, &rfqMsg)
if err != nil {
fmt.Printf("%s error unmarshaling message: %v\n", time.Now().String(), err)
return fmt.Errorf("error unmarshaling message: %w", err)
}

Expand All @@ -333,7 +356,9 @@ func (c *clientImpl) handleWsMessage(ctx context.Context, msg []byte, reqChan, r

select {
case respChan <- &rfqMsg:
fmt.Printf("%s sent message to respChan\n", time.Now().String())
case <-ctx.Done():
fmt.Printf("%s context done for handleWsMessage\n", time.Now().String())
return nil
}
return nil
Expand Down
8 changes: 7 additions & 1 deletion services/rfq/api/rest/rfq.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,13 +90,14 @@ func (r *QuoterAPIServer) collectRelayerResponses(ctx context.Context, request *
wg.Add(1)
go func(client WsClient) {
var respStatus db.ActiveQuoteResponseStatus
var err error
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Fix variable shadowing of err to ensure correct error handling

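The use of := in resp, err := client.ReceiveQuoteResponse(collectionCtx, requestID) may shadow the err variable declared earlier at line 93. If that call sits in a nested block, := creates a new err, so the err read by the deferred function stays nil and any error returned by ReceiveQuoteResponse is never passed to metrics.EndSpanWithErr. (If the call is in the same block as var err error, Go's short variable declaration reuses the existing err and there is no shadowing.) To fix the nested-block case, declare resp beforehand and replace := with = so that the call assigns to the existing err variable.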

Apply this diff to fix the issue:

-            resp, err := client.ReceiveQuoteResponse(collectionCtx, requestID)
+            resp, err = client.ReceiveQuoteResponse(collectionCtx, requestID)

Committable suggestion was skipped due to low confidence.

_, clientSpan := r.handler.Tracer().Start(collectionCtx, "collectRelayerResponses", trace.WithAttributes(
attribute.String("relayer_address", relayerAddr),
attribute.String("request_id", requestID),
))
defer func() {
clientSpan.SetAttributes(attribute.String("status", respStatus.String()))
metrics.EndSpan(clientSpan)
metrics.EndSpanWithErr(clientSpan, err)
}()

defer wg.Done()
Expand All @@ -105,6 +106,11 @@ func (r *QuoterAPIServer) collectRelayerResponses(ctx context.Context, request *
logger.Errorf("Error receiving quote response: %v", err)
return
}
span.AddEvent("received quote response", trace.WithAttributes(
attribute.String("relayer_address", relayerAddr),
attribute.String("request_id", requestID),
attribute.String("dest_amount", resp.DestAmount),
))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

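Use clientSpan instead of parent span within goroutine so the event is recorded on the correct span

Adding the event to the parent span from within the goroutine attaches every relayer's response to the shared parent span rather than to the per-relayer clientSpan created for it. (This is an attribution problem rather than a data race: the OpenTelemetry API specification requires Span methods to be safe for concurrent use.) Instead, you should use the clientSpan created within the goroutine to add events.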

Apply this diff to fix the issue:

-        span.AddEvent("received quote response", trace.WithAttributes(
+        clientSpan.AddEvent("received quote response", trace.WithAttributes(
             attribute.String("relayer_address", relayerAddr),
             attribute.String("request_id", requestID),
             attribute.String("dest_amount", resp.DestAmount),
         ))
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
span.AddEvent("received quote response", trace.WithAttributes(
attribute.String("relayer_address", relayerAddr),
attribute.String("request_id", requestID),
attribute.String("dest_amount", resp.DestAmount),
))
clientSpan.AddEvent("received quote response", trace.WithAttributes(
attribute.String("relayer_address", relayerAddr),
attribute.String("request_id", requestID),
attribute.String("dest_amount", resp.DestAmount),
))


// validate the response
respStatus = getQuoteResponseStatus(expireCtx, resp)
Expand Down
31 changes: 31 additions & 0 deletions services/rfq/api/rest/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,7 @@ func (r *QuoterAPIServer) Run(ctx context.Context) error {
wsRoute := engine.Group(RFQStreamRoute)
wsRoute.Use(r.AuthMiddleware())
wsRoute.GET("", func(c *gin.Context) {
fmt.Println("GET /rfq_stream")
r.GetActiveRFQWebsocket(ctx, c)
})

Expand Down Expand Up @@ -275,7 +276,9 @@ func (r *QuoterAPIServer) AuthMiddleware() gin.HandlerFunc {
loggedRequest = &req
}
case RFQRoute, RFQStreamRoute:
fmt.Println("RFQRoute, RFQStreamRoute")
chainsHeader := c.GetHeader(ChainsHeader)
fmt.Printf("got chains header: %s\n", chainsHeader)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Consider using a proper logging framework and removing debug logs.

While adding logs can be helpful for debugging, using fmt.Println and fmt.Printf directly is not ideal for production code. Consider the following improvements:

  1. Use a structured logging framework (e.g., logrus, zap) instead of fmt for better log management and performance.
  2. Use appropriate log levels (e.g., debug, info) instead of print statements.
  3. Consider making these logs conditional based on a debug flag to avoid unnecessary logging in production.

Example using the existing logger:

if debugMode {
    logger.Debug("RFQRoute or RFQStreamRoute hit")
    logger.Debugf("Got chains header: %s", chainsHeader)
}

if chainsHeader != "" {
var chainIDs []int
err = json.Unmarshal([]byte(chainsHeader), &chainIDs)
Expand All @@ -285,10 +288,12 @@ func (r *QuoterAPIServer) AuthMiddleware() gin.HandlerFunc {
}
}
}
fmt.Printf("dest chain ids: %v\n", destChainIDs)
default:
err = fmt.Errorf("unexpected request path: %s", c.Request.URL.Path)
}
if err != nil {
fmt.Printf("error: %v\n", err)
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
c.Abort()
return
Expand All @@ -299,13 +304,16 @@ func (r *QuoterAPIServer) AuthMiddleware() gin.HandlerFunc {
for _, destChainID := range destChainIDs {
addr, err := r.checkRole(c, destChainID)
if err != nil {
fmt.Printf("error checking role: %v\n", err)
c.JSON(http.StatusBadRequest, gin.H{"msg": err.Error()})
c.Abort()
return
}
if addressRecovered == nil {
fmt.Printf("no address recovered, setting to %s\n", addr.Hex())
addressRecovered = &addr
} else if *addressRecovered != addr {
fmt.Printf("relayer address mismatch: %s != %s\n", addressRecovered.Hex(), addr.Hex())
c.JSON(http.StatusBadRequest, gin.H{"msg": "relayer address mismatch"})
c.Abort()
return
Expand All @@ -316,6 +324,7 @@ func (r *QuoterAPIServer) AuthMiddleware() gin.HandlerFunc {
// Store the request in context after binding and validation
c.Set("putRequest", loggedRequest)
c.Set("relayerAddr", addressRecovered.Hex())
fmt.Println("auth succeeded")
c.Next()
}
}
Expand Down Expand Up @@ -434,28 +443,34 @@ func (r *QuoterAPIServer) PutRelayAck(c *gin.Context) {
// @Header 101 {string} X-Api-Version "API Version Number - See docs for more info"
// @Router /quote_requests [get].
func (r *QuoterAPIServer) GetActiveRFQWebsocket(ctx context.Context, c *gin.Context) {
fmt.Println("GetActiveRFQWebsocket")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Enhance logging with proper framework and more context.

The current debug print statement can be improved for better logging practices:

  1. Replace fmt.Println with a proper logging framework, preferably the same one used elsewhere in the codebase (e.g., the logger variable).
  2. Consider adding more context to the log, such as the relayer address or any relevant request parameters.
  3. Use appropriate log levels to allow for easy filtering in different environments.

Example improvement:

logger.WithFields(log.Fields{
    "relayerAddr": relayerAddr,
    "remoteAddr": c.Request.RemoteAddr,
}).Debug("Handling WebSocket connection for active quote requests")

Also, consider wrapping this log in a debug flag check to avoid unnecessary logging in production environments.

ctx, span := r.handler.Tracer().Start(ctx, "GetActiveRFQWebsocket")
defer func() {
metrics.EndSpan(span)
}()

fmt.Println("upgrading websocket")
ws, err := r.upgrader.Upgrade(c.Writer, c.Request, nil)
if err != nil {
logger.Error("Failed to set websocket upgrade", "error", err)
return
}
fmt.Println("upgraded websocket")

// use the relayer address as the ID for the connection
rawRelayerAddr, exists := c.Get("relayerAddr")
if !exists {
fmt.Println("no relayer address recovered from signature")
c.JSON(http.StatusBadRequest, gin.H{"error": "No relayer address recovered from signature"})
return
}
relayerAddr, ok := rawRelayerAddr.(string)
if !ok {
fmt.Println("invalid relayer address type")
c.JSON(http.StatusBadRequest, gin.H{"error": "Invalid relayer address type"})
return
}
fmt.Printf("relayer address: %s\n", relayerAddr)

span.SetAttributes(
attribute.String("relayer_address", relayerAddr),
Expand All @@ -464,6 +479,7 @@ func (r *QuoterAPIServer) GetActiveRFQWebsocket(ctx context.Context, c *gin.Cont
// only one connection per relayer allowed
_, ok = r.wsClients.Load(relayerAddr)
if ok {
fmt.Println("relayer already connected")
c.JSON(http.StatusBadRequest, gin.H{"error": "relayer already connected"})
return
}
Expand All @@ -474,12 +490,16 @@ func (r *QuoterAPIServer) GetActiveRFQWebsocket(ctx context.Context, c *gin.Cont
}()

client := newWsClient(relayerAddr, ws, r.pubSubManager, r.handler)
fmt.Println("registered ws client")
r.wsClients.Store(relayerAddr, client)
span.AddEvent("registered ws client")
fmt.Println("running ws client")
err = client.Run(ctx)
if err != nil {
fmt.Printf("error running ws client: %v\n", err)
logger.Error("Error running websocket client", "error", err)
}
fmt.Println("ws client done")
}

const (
Expand Down Expand Up @@ -533,16 +553,23 @@ func (r *QuoterAPIServer) PutRFQRequest(c *gin.Context) {
var activeQuote *model.QuoteData
if isActiveRFQ {
activeQuote = r.handleActiveRFQ(ctx, &req, requestID)
if activeQuote != nil && activeQuote.DestAmount != nil {
span.SetAttributes(attribute.String("active_quote_dest_amount", *activeQuote.DestAmount))
}
}
passiveQuote, err := r.handlePassiveRFQ(ctx, &req)
if err != nil {
logger.Error("Error handling passive RFQ", "error", err)
}
if passiveQuote != nil && passiveQuote.DestAmount != nil {
span.SetAttributes(attribute.String("passive_quote_dest_amount", *passiveQuote.DestAmount))
}
quote, _ := getBestQuote(activeQuote, passiveQuote)

// construct the response
var resp model.PutUserQuoteResponse
if quote == nil {
span.AddEvent("no quotes found")
resp = model.PutUserQuoteResponse{
Success: false,
Reason: "no quotes found",
Expand All @@ -552,6 +579,10 @@ func (r *QuoterAPIServer) PutRFQRequest(c *gin.Context) {
if activeQuote == nil {
quoteType = quoteTypePassive
}
span.SetAttributes(
attribute.String("quote_type", quoteType),
attribute.String("quote_dest_amount", *quote.DestAmount),
)
resp = model.PutUserQuoteResponse{
Success: true,
Data: *quote,
Expand Down
3 changes: 0 additions & 3 deletions services/rfq/api/rest/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ type ServerSuite struct {
handler metrics.Handler
QuoterAPIServer *rest.QuoterAPIServer
port uint16
wsPort uint16
originChainID int
destChainID int
}
Expand Down Expand Up @@ -75,8 +74,6 @@ func (c *ServerSuite) SetupTest() {
c.True(ok)
port, err := freeport.GetFreePort()
c.port = uint16(port)
wsPort, err := freeport.GetFreePort()
c.wsPort = uint16(wsPort)
c.Require().NoError(err)

testConfig := config.Config{
Expand Down
Loading
Loading