Skip to content

Commit

Permalink
1.更新README.
Browse files Browse the repository at this point in the history
  • Loading branch information
654894017 committed Oct 27, 2021
1 parent ed240de commit 894a898
Show file tree
Hide file tree
Showing 7 changed files with 92 additions and 40 deletions.
104 changes: 78 additions & 26 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,16 @@

#### 介绍

**RMQ**(reliable-message-queue)是**基于可靠消息的最终一致性**的分布式事务解决方案。
**RMQ**(reliable-message-queue)是**基于可靠消息的最终一致性**的分布式事务解决方案。同时基于事务消息半提交原理,结合消息的回查机制,实现类似TCC的事务模型。


- RMQ不同于seata、tcc-transaction、Hmily登录类似框架,需要在相同的协议下比如都是dubbo、spring cloud下才能够使用。RMQ给与用户最灵活的选择,不局限于dubbo、spring cloud,对方接口可以是grpc、thrift、php语言等类似接口。只要业务方接口提供类似Try、Commit、Cancel接口,或Commit、Cancel接口。我们在业务层面通过硬编码的形式实现类型TCC的效果。


## 框架定位
- RMQ本身不生产消息队列,只是消息的搬运工。
- RMQ框架提供消息预发送、消息发送、消息确认、消息恢复、消息管理等功能,结合成熟的消息中间件,解决分布式事务,达到数据最终一致性。
- RMQ基于事务消息半提交原理,结合消息的回查机制,实现类似TCC的事务模型(业务硬编码)。

------------

Expand All @@ -29,52 +33,100 @@

#### 在业务代码中引入RMQ的Dubbo服务
```
import org.apache.dubbo.config.annotation.Reference;
import com.cn.rmq.api.service.IRmqService;
import org.apache.dubbo.config.annotation.DubboReference;
import com.cn.rmq.api.service.IReliableMessageService;
@Reference
private IRmqService rmqService;
@DubboReference
private IReliableMessageService reliableMessageService;
```

#### 编写消息发送方业务方法
结合事务消息实现TCC效果,如果不需要使用MQ传递领域消息到其他业务模块,可以在完成业务后删除事务消息,不需要confirm它。

```
public void doBusiness() {
// 自定义消息队列名称
String queue = "test_queue";
// 消息内容, 如果传输对象,建议转换成json字符串
String messageContent = "......";
// 调用RMQ,预发送消息
String messageId = rmqService.createPreMessage(queue, messageContent);
String messageId = reliableMessageService.createPreMessage(queue, messageContent);
try{
// 执行业务1(业务层面需要做好幂等、悬挂)
// 执行业务2(业务层面需要做好幂等、悬挂)
// 异步调用RMQ,确认发送消息(如果是当做分布式事务框架使用,不需要对外发送消息,则不需要进行消息confirm操作,直接调用deleteMessage删除事务消息即可)
RpcContext.getContext().asyncCall(() -> reliableMessageService.confirmAndSendMessage(queue, messageId));
}catch(Exception e){
// 回滚业务1(业务层面需要做好幂等、悬挂、空回滚问题)
// 回滚业务2(业务层面需要做好幂等、悬挂、空回滚问题)
// 删除预发送消息
RpcContext.getContext().asyncCall(() -> reliableMessageService.deleteMessage(queue, messageId));
}
}
```

// 执行业务
...
...
#### 编写业务回调check方法
当执行doBusiness异常回滚业务时,系统奔溃,消息确认子系统定时发起消息确认

// 异步调用RMQ,确认发送消息
RpcContext.getContext().asyncCall(() -> rmqService.confirmAndSendMessage(messageId));
}
```
@RequestMapping("check")
@ResponseBody
public CheckStatus checkBusStatus(BusReq req) {
//如果业务执行成功
//return new CheckStats(0,1||2)
//如果业务执行失败
//回滚业务(业务层面需要做好幂等、悬挂、空回滚问题)
//return new CheckStats(0,0)
}
CheckStatus 格式
{
"code": 0, // 0 成功 1 失败
"data": 1 // 0 业务处理失败,删除半提交消息 1 业务处理成功,RMQ发送半消息到MQ中间件 2 业务处理成功,RMQ删除半提交消息
}
#### 编写消息消费方业务方法
```
public void handleMsg(RmqMessage msg) {
try {
String messageContent = msg.getMessageBody();

// 执行业务
...
...

// 通知RMQ消息消费成功
// 如果使用的是RMQ的directSendMessage,则无需通知
if (StringUtils.isNotBlank(msg.getMessageId())) {
rmqService.deleteMessageById(msg.getMessageId());
#### 编写消息消费方业务方法(RocketMQ)
```
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("xxxxxx_consumer_group");
consumer.setNamesrvAddr("localhost:9876");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
consumer.setConsumeThreadMax(1);
consumer.setConsumeThreadMin(1);
consumer.setMaxReconsumeTimes(1);
consumer.subscribe("xxxxxxx", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
try {
for (MessageExt message : msgs) {
String body = new String(message.getBody(), Charset.forName("UTF-8"));
//业务处理
//删除事务消息
reliableMessageService.deleteMessage(message.getTopic(), msg.getMessageId());
log.info("xxxxxx-处理消息成功");
}
} catch (Exception e) {
...
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
} catch (Throwable e) {
log.info("xxxxx-处理消息失败", e);
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
};
consumer.start();
System.out.printf("xxxxx-consumer started.");
```

------------
6 changes: 3 additions & 3 deletions rmq-admin/src/main/resources/webapp/templates/queue/add.ftl
Original file line number Diff line number Diff line change
Expand Up @@ -65,12 +65,12 @@
</br>
注:确认消息返回规范</br>
{</br>
&nbsp;&nbsp;"code":"0",</br>
&nbsp;&nbsp;"data":"1" </br>
&nbsp;&nbsp;"code":0,</br>
&nbsp;&nbsp;"data":1 </br>
}
</br>
code: 0 成功 1 失败 </br>
data: 0 失败重试 1 业务处理成功,RMQ发送半消息到MQ中间件 2 业务处理成功,RMQ删除半提交消息 </br>
data: 0 业务处理失败,删除半提交消息 1 业务处理成功,RMQ发送半消息到MQ中间件 2 业务处理成功,RMQ删除半提交消息 </br>
</td>
</tr>
<tr>
Expand Down
6 changes: 3 additions & 3 deletions rmq-admin/src/main/resources/webapp/templates/queue/edit.ftl
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,12 @@
</br>
注:确认消息返回规范</br>
{</br>
&nbsp;&nbsp;"code":"0",</br>
&nbsp;&nbsp;"data":"1" </br>
&nbsp;&nbsp;"code":0,</br>
&nbsp;&nbsp;"data":1 </br>
}
</br>
code: 0 成功 1 失败 </br>
data: 0 失败重试 1 业务处理成功,RMQ发送半消息到MQ中间件 2 业务处理成功,RMQ删除半提交消息 </br>
data: 0 业务处理失败,删除半提交消息 1 业务处理成功,RMQ发送半消息到MQ中间件 2 业务处理成功,RMQ删除半提交消息 </br>
</td>
</tr>
<tr>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
*/
public interface MessageCheckStatusConstant {
/**
* 业务处理失败,需要RMQ重新发起重试
* 业务处理失败,RMQ删除半提交消息
*/
public final static int FAILED = 0;
/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@ public class CheckTask {

@Scheduled(cron = "0 0/1 * * * ? ")
public void task() {
log.info("【CheckTask】start");
log.info("check task start");

checkMessageService.checkWaitingMessage();

log.info("【CheckTask】end");
log.info("check task end");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,14 @@ public class DeadTask {
@Scheduled(cron = "0 0/1 * * * ? ")
public void task() {

log.info("【DeadTask】start");
log.info("dead task start");

short maxResendTimes = (short) config.getInterval().size();

int updateCount = messageService.updateMessageDead(maxResendTimes);

log.info("【DeadTask】maxResendTimes={}, updateCount={}", maxResendTimes, updateCount);
log.info("dead task maxResendTimes={}, updateCount={}", maxResendTimes, updateCount);

log.info("【DeadTask】end");
log.info("dead task end");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@ public class RecoverTask {

@Scheduled(cron = "0 0/1 * * * ? ")
public void task() {
log.info("【RecoverTask】start");
log.info("recover task start");

recoverMessageService.recoverSendingMessage();

log.info("【RecoverTask】end");
log.info("recover task end");
}
}

0 comments on commit 894a898

Please sign in to comment.