Messages are not further received after 2046 unaccepted messages #425

Open
radoslawlandowski opened this issue Sep 2, 2024 · 3 comments

Comments

@radoslawlandowski

Hello,

(First of all: I'm not sure whether I'm misusing the rhea library or my broker setup is incorrect.)

One ActiveMQ broker, one consumer, durable subscription.
We have large groups of messages, up to 10000 messages. Each message is around 1000 bytes long.
We accept all messages at once only when the last message in the group has arrived.
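
A minimal sketch of that accept-on-last-message pattern, written against a stand-in interface rather than rhea itself. `Settleable`, `GroupBuffer`, and the `isLast` flag are hypothetical names introduced for illustration. The only assumption about rhea is that a delivery exposes `accept()`.

```typescript
// Stand-in for the part of a delivery used here (assumption: only accept()).
interface Settleable {
    accept(): void;
}

// Hypothetical helper: holds unsettled deliveries per group and accepts
// them together once the last message of the group has arrived.
class GroupBuffer {
    private pending: Record<string, Settleable[]> = {};

    // Returns true when this delivery closed the group and all were accepted.
    add(groupId: string, delivery: Settleable, isLast: boolean): boolean {
        const held = this.pending[groupId] ?? [];
        held.push(delivery);
        if (!isLast) {
            this.pending[groupId] = held;
            return false;
        }
        // Last message of the group: settle everything in arrival order.
        for (const d of held) d.accept();
        delete this.pending[groupId];
        return true;
    }

    // Number of deliveries currently held unsettled across all groups.
    unsettledCount(): number {
        return Object.keys(this.pending)
            .reduce((n, id) => n + this.pending[id].length, 0);
    }
}
```

In a rhea `message` handler, the delivery passed to `add` would be `context.delivery`. With this pattern every message of a group stays unsettled until the group is complete, so a 10000-message group means up to 10000 unsettled deliveries at once.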

I set up the broker locally, and when sending 10000 messages in the same group, no message is received after 2047 messages (or the broker stops sending them).
I've been experimenting with broker settings, but none of them change the situation, so I'd like to understand whether this is a rhea library issue.

We'd like to receive 10000 messages within a single group.

  • Is there any limitation in rhea which causes this behaviour, or is it broker specific?
  • Would you consider it good practice to receive 10000 messages and accept all of them at once?

Receiver setup code:

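        // Receiver on a durable subscription; deliveries are settled manually.
        this.connection!.open_receiver({
            name: listenerName,
            autoaccept: false, // accept explicitly once the whole group has arrived
            // durable: 2 is the AMQP 'unsettled-state' terminus durability
            source: { address: address, durable: 2, expiry_policy: 'never' },
        })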

Broker debug statement at the moment message delivery freezes:

DEBUG | queue://queue, subscriptions=1, memory=0%, size=10001, pending=0 toPageIn: 200, force:false, Inflight: 2400, pagedInMessages.size 2400, pagedInPendingDispatch.size 0, enqueueCount: 0, dequeueCount: 0, memUsage:4725600, maxPageSize:200

The last rhea events around the freeze:

rhea:frames [connection-2]:0 <- transfer#14 {"delivery_id":2046,"delivery_tag":{"type":"Buffer","data":[7,254]}} <Buffer 00 53 70 d0 00 00 00 05 00 00 00 01 41 00 53 73 d0 00 00 00 4c 00 00 00 03 a1 24 61 32 61 36 30 66 66 37 2d 62 33 39 64 2d 34 31 34 34 2d 62 37 66 32 ... 811 more bytes> +33ms
rhea:message decoding section: Typed { type: TypeDesc { name: 'List32', typecode: 208, width: 4, category: 3, create: [Function (anonymous)] { typecode: 208 } }, value: [ Typed { type: [TypeDesc], value: true } ], descriptor: Typed { type: TypeDesc { name: 'SmallUlong', typecode: 83, width: 1, category: 1, read: [Function: read], write: [Function: write], create: [Function] }, value: 112 } } of type: Typed { type: TypeDesc { name: 'SmallUlong', typecode: 83, width: 1, category: 1, read: [Function: read], write: [Function: write], create: [Function (anonymous)] { typecode: 83 } }, value: 112 } +30ms
rhea:message decoding section: Typed { type: TypeDesc { name: 'List32', typecode: 208, width: 4, category: 3, create: [Function (anonymous)] { typecode: 208 } }, value: [ Typed { type: [TypeDesc], value: 'a2a60ff7-b39d-4144-b7f2-0571a149c7b5' }, Typed { type: [TypeDesc], value: null }, Typed { type: [TypeDesc], value: 'queue' } ], descriptor: Typed { type: TypeDesc { name: 'SmallUlong', typecode: 83, width: 1, category: 1, read: [Function: read], write: [Function: write], create: [Function] }, value: 115 } } of type: Typed { type: TypeDesc { name: 'SmallUlong', typecode: 83, width: 1, category: 1, read: [Function: read], write: [Function: write], create: [Function (anonymous)] { typecode: 83 } }, value: 115 } +1ms
rhea:message decoding section: Typed { type: TypeDesc { name: 'Map32', typecode: 209, width: 4, category: 3, create: [Function (anonymous)] { typecode: 209 } }, value: [ Typed { type: [TypeDesc], value: 'identifier' }, Typed { type: [TypeDesc], value: 'abc' } ], descriptor: Typed { type: TypeDesc { name: 'SmallUlong', typecode: 83, width: 1, category: 1, read: [Function: read], write: [Function: write], create: [Function] }, value: 116 } } of type: Typed { type: TypeDesc { name: 'SmallUlong', typecode: 83, width: 1, category: 1, read: [Function: read], write: [Function: write], create: [Function (anonymous)] { typecode: 83 } }, value: 116 } +1ms
rhea:message decoding section: Typed { type: TypeDesc { name: 'Str32', typecode: 177, width: 4, category: 2, encoding: 'utf8', create: [Function (anonymous)] { typecode: 177 } }, value: '{"mail":"123@123.it","code":"0000002047"}', descriptor: Typed { type: TypeDesc { name: 'SmallUlong', typecode: 83, width: 1, category: 1, read: [Function: read], write: [Function: write], create: [Function] }, value: 119 } } of type: Typed { type: TypeDesc { name: 'SmallUlong', typecode: 83, width: 1, category: 1, read: [Function: read], write: [Function: write], create: [Function (anonymous)] { typecode: 83 } }, value: 119 } +2ms
rhea:events [connection-2] Link got event: message +58ms
rhea:events [connection-2] Session got event: message +1ms
rhea:events [connection-2] Connection got event: message +0ms
rhea:frames [connection-1]:0 -> empty +3s
rhea:raw [connection-1] SENT: 8 0000000802000000 +3s
rhea:frames [connection-2]:0 -> empty +2s
rhea:raw [connection-2] SENT: 8 0000000802000000 +2s
rhea:frames [connection-1]:0 -> empty +5s
rhea:raw [connection-1] SENT: 8 0000000802000000 +5s
rhea:frames [connection-2]:0 -> empty +2s
rhea:raw [connection-2] SENT: 8 0000000802000000 +2s
rhea:frames [connection-1]:0 -> empty +5s
rhea:raw [connection-1] SENT: 8 0000000802000000 +5s
rhea:frames [connection-2]:0 -> empty +2s
rhea:raw [connection-2] SENT: 8 0000000802000000 +2s

Best regards, thank you for that great lib!

@grs
Member

grs commented Sep 2, 2024

That is broker-specific behaviour, which ActiveMQ terms 'prefetch': https://activemq.apache.org/components/classic/documentation/what-is-the-prefetch-limit-for

https://activemq.apache.org/components/classic/documentation/amqp may also be relevant, but you may need to ask on the ActiveMQ forum for details on how to control the prefetch per client over AMQP.

@radoslawlandowski
Author

Thank you for your help!

I've been experimenting with the configuration you mentioned, and also consulted an AWS advisor, but broker-side configuration does not solve the problem for me :( . The advisor noticed that, apart from broker-side configuration, one can modify the client-side connection URL in the following way:

"ssl://b-995b1978-0142-4749-9f59-92e9e0623b46-1.mq.eu-west-1.amazonaws.com:61617?jms.prefetchPolicy.all=5000"

But I understand that rhea has no notion of a parameter such as jms.prefetchPolicy.all?
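
To make concrete what such a URL carries: the prefetch value is an ordinary query parameter read by the client library that understands it. The sketch below only extracts that number with the standard `URL` class. `prefetchFromUrl` is a hypothetical helper, and nothing here passes the value to rhea or to the broker.

```typescript
// Hypothetical helper: reads a numeric query parameter from a broker URL.
// Returns undefined when the parameter is absent or not a number.
function prefetchFromUrl(
    brokerUrl: string,
    key: string = "jms.prefetchPolicy.all",
): number | undefined {
    const raw = new URL(brokerUrl).searchParams.get(key);
    if (raw === null) return undefined;
    const value = parseInt(raw, 10);
    return isNaN(value) ? undefined : value;
}
```

On the rhea side, the receiver options include `credit_window`, which controls how much link credit the receiver issues. Whether setting it changes the behaviour reported in this thread is not established here.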

@grs
Member

grs commented Sep 18, 2024

Rhea does not have a parameter jms.prefetchPolicy.all because it is a library for the AMQP protocol. However, I suspect that there is a way to represent that over the AMQP protocol, and if you can determine what it is (my advice would be to ask the ActiveMQ user group at Apache), then rhea should be able to comply with it.


2 participants