Skip to content

Commit

Permalink
扫描库表补偿发货单MQ消息
Browse files Browse the repository at this point in the history
  • Loading branch information
Casflawed committed Sep 5, 2023
1 parent ce41fd1 commit 1548bd5
Show file tree
Hide file tree
Showing 16 changed files with 188 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ public class LotteryInvoiceListener {

@Autowired
private IDBRouterStrategy dbRouter;
@Autowired
private SendGoodsFactory goodsFactory;

@KafkaListener(topics = "lottery_invoice", groupId = "lottery")
public void onMessage(ConsumerRecord<?, ?> record, Acknowledgment ack, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
Expand All @@ -47,9 +49,9 @@ public void onMessage(ConsumerRecord<?, ?> record, Acknowledgment ack, @Header(K
InvoiceVO invoiceVO = JSON.parseObject((String) message.get(), InvoiceVO.class);
dbRouter.doRouter(invoiceVO.getuId());
// 2. 获取发送奖品工厂,执行发奖
ISendGoods distributionGoodsService = SendGoodsFactory.getSendGoods(invoiceVO.getAwardType());
ISendGoods distributionGoodsService = goodsFactory.getSendGoods(invoiceVO.getAwardType());
AwardSenderRes distributionRes = distributionGoodsService.doSend(new GoodsReq(invoiceVO.getuId(), invoiceVO.getOrderId(), invoiceVO.getAwardId(), invoiceVO.getAwardName(), invoiceVO.getAwardContent()));

// TODO: 2023/9/4 如果为false,ack.acknowledge没有执行,那么消费这会走哪个方法呢?或者根本不会走消费者方法
Assert.isTrue(Constants.AwardState.SUCCESS.getCode().equals(distributionRes.getCode()), distributionRes.getInfo());

// 3. 打印日志
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public class KafkaProducer {
* 创建topic:bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic lottery_invoice
*
* windows
* 启动zk:D:\environment\kafka_2.12-3.5.1>bin\windows\zookeeper-server-start.bat -daemon config/zookeeper.properties
* 启动zk:D:\environment\kafka_2.12-3.5.1>bin\windows\zookeeper-server-start.bat config/zookeeper.properties
* 启动kafaka:D:\environment\kafka_2.12-3.5.1>bin\windows\kafka-server-start.bat config/server.properties
* 创建topic:bin\windows\kafka-topics.bat --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic lottery_invoice
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -1,17 +1,25 @@
package com.flameking.lottery.application.worker;

import com.alibaba.fastjson.JSON;
import com.flameking.lottery.application.mq.producer.KafkaProducer;
import com.flameking.lottery.common.Constants;
import com.flameking.lottery.common.Result;
import com.flameking.lottery.domain.activity.model.vo.ActivityVO;
import com.flameking.lottery.domain.activity.model.vo.InvoiceVO;
import com.flameking.lottery.domain.activity.service.deploy.IActivityDeploy;
import com.flameking.lottery.domain.activity.service.partake.IActivityPartake;
import com.flameking.lottery.domain.activity.service.workflow.machine.ActivityStateMachine;
import com.flameking.lottery.domain.activity.service.workflow.state.support.ActivityStateSupport;
import com.flameking.middleware.db.router.strategy.IDBRouterStrategy;
import com.xxl.job.core.context.XxlJobHelper;
import com.xxl.job.core.handler.annotation.XxlJob;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

import java.util.Date;
import java.util.List;
Expand All @@ -28,6 +36,12 @@ public class LotteryXxlJob {
private IActivityDeploy activityDeploy;
@Autowired
private ActivityStateMachine stateHandler;
@Autowired
private IDBRouterStrategy dbRouter;
@Autowired
private IActivityPartake activityPartake;
@Autowired
private KafkaProducer kafkaProducer;

@XxlJob("lotteryActivityStateJobHandler")
public void lotteryActivityStateJobHandler() throws Exception {
Expand Down Expand Up @@ -76,4 +90,71 @@ public void lotteryActivityStateJobHandler() throws Exception {

}

@XxlJob("lotteryOrderMQStateJobHandler")
public void lotteryOrderMQStateJobHandler() throws Exception {
// 验证参数
String jobParam = XxlJobHelper.getJobParam();
if (null == jobParam) {
logger.info("扫描用户抽奖奖品发放MQ状态[Table = 2*4] 错误 params is null");
return;
}

// 获取分布式任务配置参数信息 参数配置格式:1,2,3 也可以是指定扫描一个,也可以配置多个库,按照部署的任务集群进行数量配置,均摊分别扫描效率更高
String[] params = jobParam.split(",");
logger.info("扫描用户抽奖奖品发放MQ状态[Table = 2*4] 开始 params:{}", JSON.toJSONString(params));

if (params.length == 0) {
logger.info("扫描用户抽奖奖品发放MQ状态[Table = 2*4] 结束 params is null");
return;
}

// 获取分库分表配置下的分表数
int tbCount = dbRouter.tbCount();

// 循环获取指定扫描库
for (String param : params) {
// 获取当前任务扫描的指定分库
int dbCount = Integer.parseInt(param);

// 判断配置指定扫描库数,是否存在
if (dbCount > dbRouter.dbCount()) {
logger.info("扫描用户抽奖奖品发放MQ状态[Table = 2*4] 结束 dbCount not exist");
continue;
}

// 循环扫描对应表
for (int i = 0; i < tbCount; i++) {

// 扫描库表数据
List<InvoiceVO> invoiceVOList = activityPartake.scanInvoiceMqState(dbCount, i);
logger.info("扫描用户抽奖奖品发放MQ状态[Table = 2*4] 扫描库:{} 扫描表:{} 扫描数:{}", dbCount, i, invoiceVOList.size());

// 补偿 MQ 消息
for (InvoiceVO invoiceVO : invoiceVOList) {

ListenableFuture<SendResult<String, Object>> future = kafkaProducer.sendLotteryInvoice(invoiceVO);
future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {

@Override
public void onSuccess(SendResult<String, Object> stringObjectSendResult) {
// MQ 消息发送完成,更新数据库表 user_strategy_export.mq_state = 1
activityPartake.updateInvoiceMqState(invoiceVO.getuId(), invoiceVO.getOrderId(), Constants.MQState.COMPLETE.getCode());
}

@Override
public void onFailure(Throwable throwable) {
// MQ 消息发送失败,更新数据库表 user_strategy_export.mq_state = 2 【等待定时任务扫码补偿MQ消息】
activityPartake.updateInvoiceMqState(invoiceVO.getuId(), invoiceVO.getOrderId(), Constants.MQState.FAIL.getCode());
}

});
}
}

}

logger.info("扫描用户抽奖奖品发放MQ状态[Table = 2*4] 完成 param:{}", JSON.toJSONString(params));

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,11 @@
import com.flameking.lottery.domain.activity.model.aggregates.PartakeReq;
import com.flameking.lottery.domain.activity.model.vo.ActivityBillVO;
import com.flameking.lottery.domain.activity.model.vo.DrawOrderVO;
import com.flameking.lottery.domain.activity.model.vo.InvoiceVO;
import com.flameking.lottery.domain.activity.model.vo.UserTakeActivityVO;

import java.util.List;

public interface IUserTakeActivityRepository {
boolean takeActivity(PartakeReq partake, ActivityBillVO bill, Long takeId);

Expand All @@ -30,4 +33,11 @@ public interface IUserTakeActivityRepository {
*/
UserTakeActivityVO queryNoConsumedTakeActivityOrder(Long activityId, String uId);

/**
* 扫描发货单 MQ 状态,把超时30分钟未发送 MQ 和发送 MQ 失败的单子扫描出来,做补偿
*
* @return 发货单
*/
List<InvoiceVO> scanInvoiceMqState();

}
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,11 @@
import com.flameking.lottery.domain.activity.model.aggregates.PartakeReq;
import com.flameking.lottery.domain.activity.model.res.PartakeResult;
import com.flameking.lottery.domain.activity.model.vo.DrawOrderVO;
import com.flameking.lottery.domain.activity.model.vo.InvoiceVO;
import com.flameking.lottery.domain.activity.model.vo.UserTakeActivityVO;

import java.util.List;

public interface IActivityPartake {
/**
* 参与活动
Expand All @@ -25,4 +28,13 @@ public interface IActivityPartake {
* @param mqState MQ 发送状态
*/
void updateInvoiceMqState(String uId, Long orderId, Integer mqState);

/**
* 扫描发货单 MQ 状态,把超时30分钟未发送 MQ 和发送 MQ 失败的单子扫描出来,做补偿
*
* @param dbCount 指定分库
* @param tbCount 指定分表
* @return 发货单
*/
List<InvoiceVO> scanInvoiceMqState(int dbCount, int tbCount);
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import com.flameking.lottery.domain.activity.model.aggregates.PartakeReq;
import com.flameking.lottery.domain.activity.model.vo.ActivityBillVO;
import com.flameking.lottery.domain.activity.model.vo.DrawOrderVO;
import com.flameking.lottery.domain.activity.model.vo.InvoiceVO;
import com.flameking.lottery.domain.activity.model.vo.UserTakeActivityVO;
import com.flameking.lottery.domain.activity.repository.IActivityRepository;
import com.flameking.lottery.domain.activity.repository.IUserTakeActivityCountRepository;
Expand All @@ -18,6 +19,8 @@
import org.springframework.stereotype.Component;
import org.springframework.transaction.support.TransactionTemplate;

import java.util.List;

@Slf4j
@Component
public class ActivityPartakeImpl extends BaseActivityPartake {
Expand Down Expand Up @@ -145,4 +148,17 @@ public void updateInvoiceMqState(String uId, Long orderId, Integer mqState) {
userTakeActivityRepository.updateInvoiceMqState(uId, orderId, mqState);
}

@Override
public List<InvoiceVO> scanInvoiceMqState(int dbCount, int tbCount) {
try {
// 设置路由
dbRouter.setDbKey(dbCount);
dbRouter.setTbKey(tbCount);

// 查询数据
return userTakeActivityRepository.scanInvoiceMqState();
} finally {
DataSourceContextHolder.clear();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,25 +6,40 @@
import com.flameking.lottery.domain.award.goods.impl.DescGoods;
import com.flameking.lottery.domain.award.goods.impl.PhysicalGoods;
import com.flameking.lottery.domain.award.goods.impl.RedeemCodeGoods;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.HashMap;
import java.util.Map;

/**
* 分发商品静态工厂类
*/
@Component
public class SendGoodsFactory {
public final static Map<Integer, ISendGoods> sendGoodsMap = new HashMap<>();

static {
sendGoodsMap.put(Constants.AwardType.DESC.getCode(), new DescGoods());
@Autowired
private DescGoods descGoods;
@Autowired
private RedeemCodeGoods redeemCodeGoods;
@Autowired
private CouponGoods couponGoods;
@Autowired
private PhysicalGoods physicalGoods;

@PostConstruct
public void init() {
sendGoodsMap.put(Constants.AwardType.DESC.getCode(), descGoods);
sendGoodsMap.put(Constants.AwardType.RedeemCodeGoods.getCode(), new RedeemCodeGoods());
sendGoodsMap.put(Constants.AwardType.CouponGoods.getCode(), new CouponGoods());
sendGoodsMap.put(Constants.AwardType.PhysicalGoods.getCode(), new PhysicalGoods());

}

public static ISendGoods getSendGoods(int strategyMode){
public ISendGoods getSendGoods(int strategyMode){
return sendGoodsMap.get(strategyMode);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.flameking.lottery.domain.award.repository.IAwardRepository;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;

import javax.annotation.Resource;

Expand All @@ -13,7 +14,7 @@ public class AwardSenderBase {

protected Logger logger = LoggerFactory.getLogger(AwardSenderBase.class);

@Resource
@Autowired
private IAwardRepository awardRepository;

protected void updateUserAwardState(String uId, Long orderId, Long awardId, Integer grantState) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ public AwardSenderRes doSend(GoodsReq req) {
logger.info("模拟调用优惠券发放接口 uId:{} awardContent:{}", req.getUId(), req.getAwardContent());

// 更新用户领奖结果
super.updateUserAwardState(req.getUId(), req.getOrderId(), req.getAwardId(), Constants.GrantState.COMPLETE.getCode());
this.updateUserAwardState(req.getUId(), req.getOrderId(), req.getAwardId(), Constants.GrantState.COMPLETE.getCode());

return new AwardSenderRes(req.getUId(), Constants.AwardState.SUCCESS.getCode(), Constants.AwardState.SUCCESS.getInfo());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ public class DescGoods extends AwardSenderBase implements ISendGoods {
@Override
public AwardSenderRes doSend(GoodsReq req) {

super.updateUserAwardState(req.getUId(), req.getOrderId(), req.getAwardId(), Constants.GrantState.COMPLETE.getCode());
this.updateUserAwardState(req.getUId(), req.getOrderId(), req.getAwardId(), Constants.GrantState.COMPLETE.getCode());

return new AwardSenderRes(req.getUId(), Constants.AwardState.SUCCESS.getCode(), Constants.AwardState.SUCCESS.getInfo());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ public AwardSenderRes doSend(GoodsReq req) {
logger.info("模拟调用实物发奖 uId:{} awardContent:{}", req.getUId(), req.getAwardContent());

// 更新用户领奖结果
super.updateUserAwardState(req.getUId(), req.getOrderId(), req.getAwardId(), Constants.GrantState.COMPLETE.getCode());
this.updateUserAwardState(req.getUId(), req.getOrderId(), req.getAwardId(), Constants.GrantState.COMPLETE.getCode());

return new AwardSenderRes(req.getUId(), Constants.AwardState.SUCCESS.getCode(), Constants.AwardState.SUCCESS.getInfo());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ public AwardSenderRes doSend(GoodsReq req) {
logger.info("模拟调用兑换码 uId:{} awardContent:{}", req.getUId(), req.getAwardContent());

// 更新用户领奖结果
super.updateUserAwardState(req.getUId(), req.getOrderId(), req.getAwardId(), Constants.GrantState.COMPLETE.getCode());
this.updateUserAwardState(req.getUId(), req.getOrderId(), req.getAwardId(), Constants.GrantState.COMPLETE.getCode());

return new AwardSenderRes(req.getUId(), Constants.AwardState.SUCCESS.getCode(), Constants.AwardState.SUCCESS.getInfo());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
public class AwardSenderTemplateImpl implements IAwardSenderTemplate {
@Autowired
private IDrawTemplate drawTemplate;
@Autowired
private SendGoodsFactory goodsFactory;

@Override
public AwardSenderRes sendAward(String uId, Long strategyId, Long orderId) {
Expand All @@ -33,7 +35,7 @@ public AwardSenderRes sendAward(String uId, Long strategyId, Long orderId) {

//根据奖品类型执行不同的发奖(配送)方式
Integer awardType = drawAwardInfo.getAwardType();
ISendGoods sendGoods = SendGoodsFactory.getSendGoods(awardType);
ISendGoods sendGoods = goodsFactory.getSendGoods(awardType);

//orderId 使用用户参与活动时生成
GoodsReq req = new GoodsReq(uId, orderId, drawAwardInfo.getAwardId(), drawAwardInfo.getAwardName(), drawAwardInfo.getAwardContent());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import com.flameking.lottery.domain.activity.model.aggregates.PartakeReq;
import com.flameking.lottery.domain.activity.model.vo.ActivityBillVO;
import com.flameking.lottery.domain.activity.model.vo.DrawOrderVO;
import com.flameking.lottery.domain.activity.model.vo.InvoiceVO;
import com.flameking.lottery.domain.activity.model.vo.UserTakeActivityVO;
import com.flameking.lottery.domain.activity.repository.IUserTakeActivityRepository;
import com.flameking.lottery.infrastructure.entity.UserStrategyExport;
Expand All @@ -15,6 +16,9 @@
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Repository;

import java.util.ArrayList;
import java.util.List;

@Repository
public class UserTakeActivityRepository implements IUserTakeActivityRepository {
@Autowired
Expand Down Expand Up @@ -79,4 +83,12 @@ public UserTakeActivityVO queryNoConsumedTakeActivityOrder(Long activityId, Stri

return userTakeActivityVO;
}

@Override
public List<InvoiceVO> scanInvoiceMqState() {
// 查询发送MQ失败和超时30分钟,未发送MQ的数据
List<UserStrategyExport> userStrategyExportList = userStrategyExportService.scanInvoiceMqState();
// 转换对象
return EntityUtils.toList(userStrategyExportList, InvoiceVO::new);
}
}
Loading

0 comments on commit 1548bd5

Please sign in to comment.