Skip to content

Commit

Permalink
feat: add 'prefetch' test cases.
Browse files Browse the repository at this point in the history
  • Loading branch information
sprainkle committed Jun 22, 2019
1 parent 599772f commit de010ea
Show file tree
Hide file tree
Showing 3 changed files with 136 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ spring:
expires: 10000000 # 当queue空闲多长时间后会被删除。
bindingRoutingKey: '#' # 将queue绑定到exchange时使用的routing key。默认'#'
queueNameGroupOnly: false # 默认为false。当为true时,从queue名称与属性group的值相等的队列消费消息,如果不是则为destination.group。该属性还可以用在从已存在的queue消费消息。
prefetch: 1 # 限制consumer在消费消息时,一次能同时获取的消息数量,默认:1。这个需要多注意, 因为会破坏队列的顺序性。不过一般都设置1。
prefetch: 1 # 限制consumer在消费消息时,一次能同时获取的消息数量,默认:1。
ttl: 100000 # 默认不做限制,即无限。消息在队列中最大的存活时间。当消息滞留超过ttl时,会被当成消费失败消息,即会被转发到死信队列或丢弃
txSize: 1 # 感觉像是批量确认的意思. 原文: The number of deliveries between acks.
declareExchange: true # 是否声明目标exchange。默认true
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@

spring:
cloud:
stream:
bindings:
packetUplinkOutput:
destination: packetUplinkTopic
content-type: application/json
binder: rabbit

packetUplinkInput:
destination: packetUplinkTopic
content-type: application/json
group: ${spring.application.name}
binder: rabbit
consumer:
concurrency: 1 # 初始/最少/空闲时 消费者数量。默认1

rabbit:
bindings:
packetUplinkInput:
consumer:
prefetch: 5 # 限制consumer在消费消息时,一次能同时获取的消息数量,默认:1。
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
package com.sprainkle;

import com.sprainkle.event.model.PacketModel;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.stereotype.Component;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.junit4.SpringRunner;

import javax.annotation.PostConstruct;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.UUID;

/**
* <pre>
*
* </pre>
*
* @author sprainkle
* @date 2019/6/2
*/
@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest
@EnableBinding({ScasPrefetchTest.MessageSink.class, ScasPrefetchTest.MessageSource.class})
@ActiveProfiles("prefetch")
public class ScasPrefetchTest {

@Autowired
private PacketUplinkProducer packetUplinkProducer;

private Random random = new Random();
private List<String> devEuis = new ArrayList<>(10);

@PostConstruct
private void initDevEuis() {
devEuis.add("10001");
devEuis.add("10002");
devEuis.add("10003");
devEuis.add("10004");
devEuis.add("10005");
devEuis.add("10006");
devEuis.add("10007");
devEuis.add("10008");
devEuis.add("10009");
devEuis.add("10010");
}

@Test
public void test() throws InterruptedException {
for (int i = 0; i < 500000; i++) {
String devEui = getDevEuis();
packetUplinkProducer.publish(new PacketModel(devEui, UUID.randomUUID().toString()));
}

Thread.sleep(1000000);
}

private String getDevEuis() {
return devEuis.get(random.nextInt(10));
}

@Component
public static class PacketUplinkProducer {

@Autowired
private MessageSource messageSource;

public void publish(PacketModel model) {
log.info("发布上行数据包消息. model: [{}].", model);
messageSource.packetUplinkOutput().send(MessageBuilder.withPayload(model).build());
}

}

@Component
public static class PacketUplinkHandler {

@StreamListener("packetUplinkInput")
public void handle(PacketModel model) throws InterruptedException {
log.info("消费上行数据包消息. model: [{}].", model);
}

}

public interface MessageSink {

@Input("packetUplinkInput")
SubscribableChannel packetUplinkInput();

}

public interface MessageSource {

@Output("packetUplinkOutput")
MessageChannel packetUplinkOutput();

}

}

0 comments on commit de010ea

Please sign in to comment.