From 0830a60342b30c033c816d25a5518b2123e34ffc Mon Sep 17 00:00:00 2001 From: Nicolas Bigot Date: Sun, 3 Dec 2023 11:39:09 +0100 Subject: [PATCH] feat: handle long pooling for GetRecords request --- stream/streamiterator.go | 53 +++++++++++++++++++++++---------- types/istreamiteratorhandler.go | 11 +++---- 2 files changed, 44 insertions(+), 20 deletions(-) diff --git a/stream/streamiterator.go b/stream/streamiterator.go index 763c006..401d0a8 100644 --- a/stream/streamiterator.go +++ b/stream/streamiterator.go @@ -111,20 +111,23 @@ func (it *StreamIterator) GetRecords(c *fasthttp.RequestCtx, maxRecords uint) (* recordId, record, foundRecord, canContinue, err = it.handler.GetNextRecord() if !foundRecord { - // // TODO: ajouter un mécanisme comme dans aws sqs qui attend x secondes avant de retourner un résultat - // // on ne retourne pas immédiatement une réponse vide, on attend la fin du timeout (max 60 secs par exemple) - // // on peut imaginer utiliser un header http pour spécifier le délai d'attente maximal - // // long pooling - // if maxWaitTimeSeconds > 0 && response.Count == 0 { - // elaspedTime := int(time.Since(startTime).Seconds()) - // if elaspedTime < maxWaitTimeSeconds { - // // has not timed out already - // time.Sleep() - // continue - // } - // } - - // no record found, this is the end of the stream + // No record found, this is the end of the stream. + // Long polling is activated when the 'createIterator' request has + // the 'MaxWaitTimeSeconds' attribute set to a value greater than 0. + // Long polling addresses this issue by allowing the server to hold the response to a client's request + // until new information is available or a timeout occurs. + // Long polling reducing the occurrence of empty responses, + // which occur when there are no messages available for a GetRecords request. + if response.Count == 0 && it.request.MaxWaitTimeSeconds > 0 { + elaspedTime := int(time.Since(startTime).Seconds()) + if elaspedTime < it.request.MaxWaitTimeSeconds { + // request has not reached its timeout yet + time.Sleep(time.Second) + // continue to search again + continue + } + } + err = nil break } @@ -275,6 +278,11 @@ func init() { "type": "string", "minLength": 1, "maxLength": 256 + }, + "maxWaitTimeSeconds": { + "type": "integer", + "minimum": 0, + "maximum": 60 } }, "required": ["iteratorType"], @@ -299,6 +307,11 @@ func init() { "type": "string", "minLength": 1, "maxLength": 256 + }, + "maxWaitTimeSeconds": { + "type": "integer", + "minimum": 0, + "maximum": 60 } }, "required": ["iteratorType", "messageId"], @@ -323,6 +336,11 @@ func init() { "type": "string", "minLength": 1, "maxLength": 256 + }, + "maxWaitTimeSeconds": { + "type": "integer", + "minimum": 0, + "maximum": 60 } }, "required": ["iteratorType", "timestamp"], @@ -368,11 +386,16 @@ Valid iterator request examples: { "iteratorType": "FIRST_MESSAGE" - "Name": "myApp" + "name": "myApp" } { "iteratorType": "FIRST_MESSAGE" "jqFilter": ".[0]" } + +{ + "iteratorType": "FIRST_MESSAGE" + "maxWaitTimeSeconds": "20" +} */ diff --git a/types/istreamiteratorhandler.go b/types/istreamiteratorhandler.go index 8c2a67a..8744f4b 100644 --- a/types/istreamiteratorhandler.go +++ b/types/istreamiteratorhandler.go @@ -3,11 +3,12 @@ package types import "time" type StreamIteratorRequest struct { - IteratorType string `json:"iteratorType" validate:"required,oneof=FIRST_MESSAGE LAST_MESSAGE AFTER_LAST_MESSAGE AT_MESSAGE_ID AFTER_MESSAGE_ID AT_TIMESTAMP"` - MessageId MessageId `json:"messageId"` - Timestamp time.Time `json:"timestamp"` - JqFilter string `json:"jqFilter"` - Name string `json:"name"` + IteratorType string `json:"iteratorType" validate:"required,oneof=FIRST_MESSAGE LAST_MESSAGE AFTER_LAST_MESSAGE AT_MESSAGE_ID AFTER_MESSAGE_ID AT_TIMESTAMP"` + MessageId MessageId `json:"messageId"` + Timestamp time.Time `json:"timestamp"` + JqFilter string `json:"jqFilter"` + MaxWaitTimeSeconds int `json:"maxWaitTimeSeconds"` + Name string `json:"name"` } type IStreamIteratorHandler interface {