Skip to content

Commit

Permalink
Feat: add ackMux
Browse files Browse the repository at this point in the history
  • Loading branch information
dwasse committed May 15, 2024
1 parent e3b690f commit 3984a35
Showing 1 changed file with 18 additions and 10 deletions.
28 changes: 18 additions & 10 deletions services/rfq/api/rest/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"context"
"fmt"
"net/http"
"sync"
"time"

"github.com/ipfs/go-log"
Expand Down Expand Up @@ -41,6 +42,8 @@ type QuoterAPIServer struct {
// relayAckCache contains a set of transactionID values that reflect
// transactions that have been acked for relay
relayAckCache *ttlcache.Cache[string, bool]
// ackMux is a mutex used to ensure that only one transaction id can be acked at a time.
ackMux sync.Mutex
}

// NewAPI holds the configuration, database connection, gin engine, RPC client, metrics handler, and fast bridge contracts.
Expand Down Expand Up @@ -85,29 +88,32 @@ func NewAPI(
)
roleCache := roles[chainID]
go roleCache.Start()

// create the relay ack cache
relayAckCache := ttlcache.New[string, bool](
ttlcache.WithTTL[string, bool](cfg.GetRelayAckTimeout()),
ttlcache.WithDisableTouchOnHit[string, bool](),
)
go relayAckCache.Start()

go func() {
<-ctx.Done()
roleCache.Stop()
relayAckCache.Stop()
}()

}

// create the relay ack cache
relayAckCache := ttlcache.New[string, bool](
ttlcache.WithTTL[string, bool](cfg.GetRelayAckTimeout()),
ttlcache.WithDisableTouchOnHit[string, bool](),
)
go relayAckCache.Start()
go func() {
<-ctx.Done()
relayAckCache.Stop()
}()

return &QuoterAPIServer{
cfg: cfg,
db: store,
omnirpcClient: omniRPCClient,
handler: handler,
fastBridgeContracts: bridges,
roleCache: roles,
relayAckCache: relayAckCache,
ackMux: sync.Mutex{},
}, nil
}

Expand Down Expand Up @@ -214,11 +220,13 @@ func (r *QuoterAPIServer) GetRelayAck(c *gin.Context) {

// If the tx id is already in the cache, it should not be relayed.
// Otherwise, insert into the cache.
r.ackMux.Lock()
ack := r.relayAckCache.Get(transactionID)
shouldRelay := ack == nil
if shouldRelay {
r.relayAckCache.Set(transactionID, true, ttlcache.DefaultTTL)
}
r.ackMux.Unlock()

resp := relapi.GetRelayAckResponse{
TxID: transactionID,
Expand Down

0 comments on commit 3984a35

Please sign in to comment.