Skip to content

Commit

Permalink
feat: handle long pooling for GetRecords request
Browse files Browse the repository at this point in the history
  • Loading branch information
nbigot committed Dec 3, 2023
1 parent c2db026 commit 0830a60
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 20 deletions.
53 changes: 38 additions & 15 deletions stream/streamiterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,20 +111,23 @@ func (it *StreamIterator) GetRecords(c *fasthttp.RequestCtx, maxRecords uint) (*
recordId, record, foundRecord, canContinue, err = it.handler.GetNextRecord()

if !foundRecord {
// // TODO: ajouter un mécanisme comme dans aws sqs qui attend x secondes avant de retourner un résultat
// // on ne retourne pas immédiatement une réponse vide, on attend la fin du timeout (max 60 secs par exemple)
// // on peut imaginer utiliser un header http pour spécifier le délai d'attente maximal
// // long pooling
// if maxWaitTimeSeconds > 0 && response.Count == 0 {
// elaspedTime := int(time.Since(startTime).Seconds())
// if elaspedTime < maxWaitTimeSeconds {
// // has not timed out already
// time.Sleep()
// continue
// }
// }

// no record found, this is the end of the stream
// No record found, this is the end of the stream.
// Long polling is activated when the 'createIterator' request has
// the 'MaxWaitTimeSeconds' attribute set to a value greater than 0.
// Long polling addresses this issue by allowing the server to hold the response to a client's request
// until new information is available or a timeout occurs.
// Long polling reducing the occurrence of empty responses,
// which occur when there are no messages available for a GetRecords request.
if response.Count == 0 && it.request.MaxWaitTimeSeconds > 0 {
elaspedTime := int(time.Since(startTime).Seconds())
if elaspedTime < it.request.MaxWaitTimeSeconds {
// request has not reached its timeout yet
time.Sleep(time.Second)
// continue to search again
continue
}
}

err = nil
break
}
Expand Down Expand Up @@ -275,6 +278,11 @@ func init() {
"type": "string",
"minLength": 1,
"maxLength": 256
},
"maxWaitTimeSeconds": {
"type": "integer",
"minimum": 0,
"maximum": 60
}
},
"required": ["iteratorType"],
Expand All @@ -299,6 +307,11 @@ func init() {
"type": "string",
"minLength": 1,
"maxLength": 256
},
"maxWaitTimeSeconds": {
"type": "integer",
"minimum": 0,
"maximum": 60
}
},
"required": ["iteratorType", "messageId"],
Expand All @@ -323,6 +336,11 @@ func init() {
"type": "string",
"minLength": 1,
"maxLength": 256
},
"maxWaitTimeSeconds": {
"type": "integer",
"minimum": 0,
"maximum": 60
}
},
"required": ["iteratorType", "timestamp"],
Expand Down Expand Up @@ -368,11 +386,16 @@ Valid iterator request examples:
{
"iteratorType": "FIRST_MESSAGE"
"Name": "myApp"
"name": "myApp"
}
{
"iteratorType": "FIRST_MESSAGE"
"jqFilter": ".[0]"
}
{
"iteratorType": "FIRST_MESSAGE"
"maxWaitTimeSeconds": "20"
}
*/
11 changes: 6 additions & 5 deletions types/istreamiteratorhandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@ package types
import "time"

type StreamIteratorRequest struct {
IteratorType string `json:"iteratorType" validate:"required,oneof=FIRST_MESSAGE LAST_MESSAGE AFTER_LAST_MESSAGE AT_MESSAGE_ID AFTER_MESSAGE_ID AT_TIMESTAMP"`
MessageId MessageId `json:"messageId"`
Timestamp time.Time `json:"timestamp"`
JqFilter string `json:"jqFilter"`
Name string `json:"name"`
IteratorType string `json:"iteratorType" validate:"required,oneof=FIRST_MESSAGE LAST_MESSAGE AFTER_LAST_MESSAGE AT_MESSAGE_ID AFTER_MESSAGE_ID AT_TIMESTAMP"`
MessageId MessageId `json:"messageId"`
Timestamp time.Time `json:"timestamp"`
JqFilter string `json:"jqFilter"`
MaxWaitTimeSeconds int `json:"maxWaitTimeSeconds"`
Name string `json:"name"`
}

type IStreamIteratorHandler interface {
Expand Down

0 comments on commit 0830a60

Please sign in to comment.