Skip to content

Commit

Permalink
♻️ recycle code
Browse files Browse the repository at this point in the history
  • Loading branch information
sanshengshui committed Jul 3, 2019
1 parent 4f97c76 commit 5f562a2
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ public void processPublish(Channel channel, MqttPublishMessage msg) {

private void sendPublishMessage(String topic, MqttQoS mqttQoS, byte[] messageBytes, boolean retain, boolean dup) {
List<SubscribeStore> subscribeStores = grozaSubscribeStoreService.search(topic);
subscribeStores.forEach(subscribeStore -> {
for (SubscribeStore subscribeStore : subscribeStores) {
if (grozaSessionStoreService.containsKey(subscribeStore.getClientId())) {
// 订阅者收到MQTT消息的QoS级别, 最终取决于发布消息的QoS和主题订阅的QoS
MqttQoS respQoS = mqttQoS.value() > subscribeStore.getMqttQoS() ? MqttQoS.valueOf(subscribeStore.getMqttQoS()) : mqttQoS;
Expand Down Expand Up @@ -140,7 +140,7 @@ private void sendPublishMessage(String topic, MqttQoS mqttQoS, byte[] messageByt
grozaSessionStoreService.get(subscribeStore.getClientId()).getChannel().writeAndFlush(publishMessage);
}
}
});
}
}

private void sendPubAckMessage(Channel channel, int messageId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
import com.sanshengshui.iot.common.message.GrozaMessageIdService;
import com.sanshengshui.iot.common.message.GrozaRetainMessageStoreService;
import com.sanshengshui.iot.common.message.RetainMessageStore;
import com.sanshengshui.iot.common.session.GrozaSessionStoreService;
import com.sanshengshui.iot.common.session.SessionStore;
import com.sanshengshui.iot.common.subscribe.GrozaSubscribeStoreService;
import com.sanshengshui.iot.common.subscribe.SubscribeStore;
import io.netty.buffer.Unpooled;
Expand All @@ -21,6 +23,8 @@
@Slf4j
public class Subscribe {

private GrozaSessionStoreService grozaSessionStoreService;

private GrozaSubscribeStoreService grozaSubscribeStoreService;

private GrozaMessageIdService grozaMessageIdService;
Expand All @@ -40,25 +44,21 @@ public void processSubscribe(Channel channel, MqttSubscribeMessage msg) {
if (this.validTopicFilter(topicSubscriptions)) {
String clientId = (String) channel.attr(AttributeKey.valueOf("clientId")).get();
List<Integer> mqttQoSList = new ArrayList<Integer>();
topicSubscriptions.forEach(topicSubscription -> {
String topicFilter = topicSubscription.topicName();
MqttQoS mqttQoS = topicSubscription.qualityOfService();
for(MqttTopicSubscription mqttTopicSubscription : topicSubscriptions){
String topicFilter = mqttTopicSubscription.topicName();
MqttQoS mqttQoS = mqttTopicSubscription.qualityOfService();
MqttSubAckMessage subAckMessage = (MqttSubAckMessage) MqttMessageFactory.newMessage(
new MqttFixedHeader(MqttMessageType.SUBACK, false, mqttQoS, false, 0),
MqttMessageIdVariableHeader.from(msg.variableHeader().messageId()),
new MqttSubAckPayload(mqttQoSList));
channel.writeAndFlush(subAckMessage);
this.sendRetainMessage(channel, topicFilter, mqttQoS);

SubscribeStore subscribeStore = new SubscribeStore(clientId, topicFilter, mqttQoS.value());
grozaSubscribeStoreService.put(topicFilter, subscribeStore);
mqttQoSList.add(mqttQoS.value());
log.info("SUBSCRIBE - clientId: {}, topFilter: {}, QoS: {}", clientId, topicFilter, mqttQoS.value());
});
MqttSubAckMessage subAckMessage = (MqttSubAckMessage) MqttMessageFactory.newMessage(
new MqttFixedHeader(MqttMessageType.SUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0),
MqttMessageIdVariableHeader.from(msg.variableHeader().messageId()),
new MqttSubAckPayload(mqttQoSList));
channel.writeAndFlush(subAckMessage);
// 发布保留消息
topicSubscriptions.forEach(topicSubscription -> {
String topicFilter = topicSubscription.topicName();
MqttQoS mqttQoS = topicSubscription.qualityOfService();
this.sendRetainMessage(channel, topicFilter, mqttQoS);
});
}
} else {
channel.close();
}
Expand Down

0 comments on commit 5f562a2

Please sign in to comment.