-
Notifications
You must be signed in to change notification settings - Fork 30
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
RFQ API: add GET /ack endpoint #2643
Changes from 6 commits
fde6560
e3b690f
3984a35
f5155e6
f56104a
b201299
26fcd00
71651df
43b58f8
b9fe8ca
1941b30
3c47bfb
a682021
b56e825
c8cf473
7e85d35
d9037b4
e50f170
7e9975d
86ff8ee
8265f20
2327ba5
81c7791
85b3dc0
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -5,6 +5,7 @@ import ( | |||||
"fmt" | ||||||
"os" | ||||||
"path/filepath" | ||||||
"time" | ||||||
|
||||||
"github.com/jftuga/ellipsis" | ||||||
"gopkg.in/yaml.v2" | ||||||
|
@@ -21,8 +22,19 @@ type Config struct { | |||||
Database DatabaseConfig `yaml:"database"` | ||||||
OmniRPCURL string `yaml:"omnirpc_url"` | ||||||
// bridges is a map of chainid->address | ||||||
Bridges map[uint32]string `yaml:"bridges"` | ||||||
Port string `yaml:"port"` | ||||||
Bridges map[uint32]string `yaml:"bridges"` | ||||||
Port string `yaml:"port"` | ||||||
RelayAckTimeout time.Duration `yaml:"relay_ack_timeout"` | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Add a YAML tag for - RelayAckTimeout time.Duration `yaml:"relay_ack_timeout"`
+ RelayAckTimeout time.Duration `yaml:"relay_ack_timeout" yaml:"relay_ack_timeout"` Committable suggestion
Suggested change
|
||||||
} | ||||||
|
||||||
const defaultRelayAckTimeout = 5 * time.Second | ||||||
dwasse marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
|
||||||
// GetRelayAckTimeout returns the relay ack timeout. | ||||||
func (c Config) GetRelayAckTimeout() time.Duration { | ||||||
if c.RelayAckTimeout == 0 { | ||||||
return defaultRelayAckTimeout | ||||||
} | ||||||
return c.RelayAckTimeout | ||||||
} | ||||||
|
||||||
// LoadConfig loads the config from the given path. | ||||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -5,6 +5,7 @@ | |
"context" | ||
"fmt" | ||
"net/http" | ||
"sync" | ||
"time" | ||
|
||
"github.com/ipfs/go-log" | ||
|
@@ -25,6 +26,7 @@ | |
"github.com/synapsecns/sanguine/services/rfq/api/docs" | ||
"github.com/synapsecns/sanguine/services/rfq/api/model" | ||
"github.com/synapsecns/sanguine/services/rfq/contracts/fastbridge" | ||
"github.com/synapsecns/sanguine/services/rfq/relayer/relapi" | ||
) | ||
|
||
// QuoterAPIServer is a struct that holds the configuration, database connection, gin engine, RPC client, metrics handler, and fast bridge contracts. | ||
|
@@ -37,6 +39,11 @@ | |
handler metrics.Handler | ||
fastBridgeContracts map[uint32]*fastbridge.FastBridge | ||
roleCache map[uint32]*ttlcache.Cache[string, bool] | ||
// relayAckCache contains a set of transactionID values that reflect | ||
// transactions that have been acked for relay | ||
relayAckCache *ttlcache.Cache[string, bool] | ||
// ackMux is a mutex used to ensure that only one transaction id can be acked at a time. | ||
ackMux sync.Mutex | ||
} | ||
|
||
// NewAPI holds the configuration, database connection, gin engine, RPC client, metrics handler, and fast bridge contracts. | ||
|
@@ -80,27 +87,40 @@ | |
ttlcache.WithTTL[string, bool](cacheInterval), | ||
) | ||
roleCache := roles[chainID] | ||
|
||
go roleCache.Start() | ||
go func() { | ||
<-ctx.Done() | ||
roleCache.Stop() | ||
}() | ||
} | ||
|
||
// create the relay ack cache | ||
relayAckCache := ttlcache.New[string, bool]( | ||
dwasse marked this conversation as resolved.
Show resolved
Hide resolved
|
||
ttlcache.WithTTL[string, bool](cfg.GetRelayAckTimeout()), | ||
ttlcache.WithDisableTouchOnHit[string, bool](), | ||
) | ||
go relayAckCache.Start() | ||
go func() { | ||
<-ctx.Done() | ||
relayAckCache.Stop() | ||
}() | ||
|
||
return &QuoterAPIServer{ | ||
cfg: cfg, | ||
db: store, | ||
omnirpcClient: omniRPCClient, | ||
handler: handler, | ||
fastBridgeContracts: bridges, | ||
roleCache: roles, | ||
relayAckCache: relayAckCache, | ||
ackMux: sync.Mutex{}, | ||
}, nil | ||
} | ||
|
||
// QuoteRoute is the API endpoint for handling quote related requests. | ||
const ( | ||
QuoteRoute = "/quotes" | ||
AckRoute = "/ack" | ||
cacheInterval = time.Minute | ||
) | ||
|
||
|
@@ -120,6 +140,7 @@ | |
// GET routes without the AuthMiddleware | ||
// engine.PUT("/quotes", h.ModifyQuote) | ||
engine.GET(QuoteRoute, h.GetQuotes) | ||
engine.GET(AckRoute, r.GetRelayAck) | ||
dwasse marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
r.engine = engine | ||
|
||
|
@@ -188,3 +209,28 @@ | |
c.Next() | ||
} | ||
} | ||
|
||
// GetRelayAck checks if a relay is pending or not. | ||
func (r *QuoterAPIServer) GetRelayAck(c *gin.Context) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we update this w/ swagger docs? You should see this in handler.go and be able to regenerate. Just consists of copying the comments over from anything in Would also be good to add the endpoint here and document the step here |
||
transactionID := c.Query("id") | ||
if transactionID == "" { | ||
c.JSON(http.StatusBadRequest, gin.H{"error": "Must specify 'id'"}) | ||
return | ||
} | ||
|
||
// If the tx id is already in the cache, it should not be relayed. | ||
// Otherwise, insert into the cache. | ||
r.ackMux.Lock() | ||
ack := r.relayAckCache.Get(transactionID) | ||
shouldRelay := ack == nil | ||
if shouldRelay { | ||
r.relayAckCache.Set(transactionID, true, ttlcache.DefaultTTL) | ||
} | ||
r.ackMux.Unlock() | ||
|
||
resp := relapi.GetRelayAckResponse{ | ||
TxID: transactionID, | ||
ShouldRelay: shouldRelay, | ||
} | ||
c.JSON(http.StatusOK, resp) | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -138,14 +138,25 @@ | |
return fmt.Errorf("could not get committable balance: %w", err) | ||
} | ||
|
||
// if committableBalance > destAmount | ||
// check if we have enough inventory to handle the request | ||
if committableBalance.Cmp(request.Transaction.DestAmount) < 0 { | ||
err = q.db.UpdateQuoteRequestStatus(ctx, request.TransactionID, reldb.NotEnoughInventory) | ||
if err != nil { | ||
return fmt.Errorf("could not update request status: %w", err) | ||
} | ||
return nil | ||
} | ||
|
||
Comment on lines
+142
to
+150
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Add logging for better traceability and debugging. // check if we have enough inventory to handle the request
if committableBalance.Cmp(request.Transaction.DestAmount) < 0 {
logger.Infof("Not enough inventory for transaction ID: %s", hexutil.Encode(request.TransactionID[:]))
err = q.db.UpdateQuoteRequestStatus(ctx, request.TransactionID, reldb.NotEnoughInventory)
if err != nil {
return fmt.Errorf("could not update request status: %w", err)
}
return nil
} |
||
// get ack from API to synchronize calls with other relayers and avoid reverts | ||
resp, err := q.apiClient.GetRelayAck(ctx, hexutil.Encode(request.TransactionID[:])) | ||
if err != nil { | ||
return fmt.Errorf("could not get relay ack: %w", err) | ||
} | ||
if !resp.ShouldRelay { | ||
span.SetAttributes(attribute.Bool("should_relay", false)) | ||
return nil | ||
} | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ensure proper error handling and logging for API call to The added code correctly calls the // get ack from API to synchronize calls with other relayers and avoid reverts
resp, err := q.apiClient.GetRelayAck(ctx, hexutil.Encode(request.TransactionID[:]))
if err != nil {
logger.Errorf("Failed to get relay ack for transaction ID: %s, error: %v", hexutil.Encode(request.TransactionID[:]), err)
return fmt.Errorf("could not get relay ack: %w", err)
}
if !resp.ShouldRelay {
span.SetAttributes(attribute.Bool("should_relay", false))
logger.Infof("Relay not required for transaction ID: %s", hexutil.Encode(request.TransactionID[:]))
return nil
} |
||
err = q.db.UpdateQuoteRequestStatus(ctx, request.TransactionID, reldb.CommittedPending) | ||
if err != nil { | ||
return fmt.Errorf("could not update request status: %w", err) | ||
|
Original file line number | Diff line number | Diff line change | ||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -21,6 +21,7 @@ | |||||||||||||||||
cctpSql "github.com/synapsecns/sanguine/services/cctp-relayer/db/sql" | ||||||||||||||||||
"github.com/synapsecns/sanguine/services/cctp-relayer/relayer" | ||||||||||||||||||
omniClient "github.com/synapsecns/sanguine/services/omnirpc/client" | ||||||||||||||||||
rfqAPIClient "github.com/synapsecns/sanguine/services/rfq/api/client" | ||||||||||||||||||
"github.com/synapsecns/sanguine/services/rfq/contracts/fastbridge" | ||||||||||||||||||
"github.com/synapsecns/sanguine/services/rfq/relayer/inventory" | ||||||||||||||||||
"github.com/synapsecns/sanguine/services/rfq/relayer/pricer" | ||||||||||||||||||
|
@@ -40,6 +41,7 @@ | |||||||||||||||||
client omniClient.RPCClient | ||||||||||||||||||
chainListeners map[int]listener.ContractListener | ||||||||||||||||||
apiServer *relapi.RelayerAPIServer | ||||||||||||||||||
apiClient rfqAPIClient.UnauthenticatedClient | ||||||||||||||||||
inventory inventory.Manager | ||||||||||||||||||
quoter quoter.Quoter | ||||||||||||||||||
submitter submitter.TransactionSubmitter | ||||||||||||||||||
|
@@ -120,6 +122,11 @@ | |||||||||||||||||
return nil, fmt.Errorf("could not get api server: %w", err) | ||||||||||||||||||
} | ||||||||||||||||||
|
||||||||||||||||||
apiClient, err := rfqAPIClient.NewUnauthenticatedClient(metricHandler, cfg.GetRfqAPIURL()) | ||||||||||||||||||
dwasse marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||||||||||
if err != nil { | ||||||||||||||||||
return nil, fmt.Errorf("error creating RFQ API client: %w", err) | ||||||||||||||||||
} | ||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Improve the error message for consistency in the - return nil, fmt.Errorf("error creating RFQ API client: %w", err)
+ return nil, fmt.Errorf("could not create RFQ API client: %w", err) Committable suggestion
Suggested change
|
||||||||||||||||||
|
||||||||||||||||||
cache := ttlcache.New[common.Hash, bool](ttlcache.WithTTL[common.Hash, bool](time.Second * 30)) | ||||||||||||||||||
rel := Relayer{ | ||||||||||||||||||
db: store, | ||||||||||||||||||
|
@@ -134,6 +141,7 @@ | |||||||||||||||||
signer: sg, | ||||||||||||||||||
chainListeners: chainListeners, | ||||||||||||||||||
apiServer: apiServer, | ||||||||||||||||||
apiClient: apiClient, | ||||||||||||||||||
} | ||||||||||||||||||
return &rel, nil | ||||||||||||||||||
} | ||||||||||||||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Improve error handling for clarity in the
GetRelayAck
method.Committable suggestion