Skip to content

Commit

Permalink
optimize: optimize client batch sending. (apache#4157)
Browse files Browse the repository at this point in the history
  • Loading branch information
objcoding committed Nov 29, 2021
1 parent ea89cec commit 87d773f
Show file tree
Hide file tree
Showing 15 changed files with 111 additions and 12 deletions.
1 change: 1 addition & 0 deletions changes/1.5.0.md
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ Seata 是一款开源的分布式事务解决方案,提供高性能和简单
- [[#4056](https://github.com/seata/seata/pull/4056)] 优化 DurationUtil
- [[#4103](https://github.com/seata/seata/pull/4103)] 减少分支事务注册无需竞争锁时的内存占用
- [[#4144](https://github.com/seata/seata/pull/4144)] 支持默认的事务分组配置
- [[#4157](https://github.com/seata/seata/pull/4157)] 优化客户端批量发送请求
### test:


Expand Down
2 changes: 1 addition & 1 deletion changes/en-us/1.5.0.md
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@
- [[#4056](https://github.com/seata/seata/pull/4056)] optimize the DurationUtil
- [[#4103](https://github.com/seata/seata/pull/4103)] optimize AbstractLockManager#collectRowLocks logic
- [[#4144](https://github.com/seata/seata/pull/4144)] support default configuration of tx-service-group
- [[#4157](https://github.com/seata/seata/pull/4157)] optimize client batch sending.
### test:


Expand Down
3 changes: 3 additions & 0 deletions common/src/main/java/io/seata/common/DefaultValues.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,10 @@ public interface DefaultValues {

String DEFAULT_SELECTOR_THREAD_PREFIX = "NettyClientSelector";
String DEFAULT_WORKER_THREAD_PREFIX = "NettyClientWorkerThread";
@Deprecated
boolean DEFAULT_ENABLE_CLIENT_BATCH_SEND_REQUEST = true;
boolean DEFAULT_ENABLE_TM_CLIENT_BATCH_SEND_REQUEST = false;
boolean DEFAULT_ENABLE_RM_CLIENT_BATCH_SEND_REQUEST = true;


String DEFAULT_BOSS_THREAD_PREFIX = "NettyBoss";
Expand Down
11 changes: 11 additions & 0 deletions core/src/main/java/io/seata/core/constants/ConfigurationKeys.java
Original file line number Diff line number Diff line change
Expand Up @@ -547,8 +547,19 @@ public interface ConfigurationKeys {
/**
* The constant ENABLE_CLIENT_BATCH_SEND_REQUEST
*/
@Deprecated
String ENABLE_CLIENT_BATCH_SEND_REQUEST = TRANSPORT_PREFIX + "enableClientBatchSendRequest";

/**
* The constant ENABLE_TM_CLIENT_BATCH_SEND_REQUEST
*/
String ENABLE_TM_CLIENT_BATCH_SEND_REQUEST = TRANSPORT_PREFIX + "enableTmClientBatchSendRequest";

/**
* The constant ENABLE_RM_CLIENT_BATCH_SEND_REQUEST
*/
String ENABLE_RM_CLIENT_BATCH_SEND_REQUEST = TRANSPORT_PREFIX + "enableRmClientBatchSendRequest";

/**
* The constant DISABLE_GLOBAL_TRANSACTION.
*/
Expand Down
9 changes: 6 additions & 3 deletions core/src/main/java/io/seata/core/rpc/RemotingClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@
import io.netty.channel.Channel;
import io.seata.core.protocol.AbstractMessage;
import io.seata.core.protocol.RpcMessage;
import io.seata.core.rpc.netty.NettyClientConfig;
import io.seata.core.rpc.netty.RmNettyRemotingClient;
import io.seata.core.rpc.netty.TmNettyRemotingClient;
import io.seata.core.rpc.processor.RemotingProcessor;

import java.util.concurrent.ExecutorService;
Expand All @@ -35,8 +36,10 @@ public interface RemotingClient {

/**
* client send sync request.
* In this request, if {@link NettyClientConfig#isEnableClientBatchSendRequest} is enabled,
* the message will be sent in batches.
* In this request, if
* {@link RmNettyRemotingClient#isEnableClientBatchSendRequest()}
* {@link TmNettyRemotingClient#isEnableClientBatchSendRequest()}
* is enabled, the message will be sent in batches.
*
* @param msg transaction message {@link io.seata.core.protocol}
* @return server result message
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ public abstract class AbstractNettyRemotingClient extends AbstractNettyRemoting
/**
* When batch sending is enabled, the message will be stored to basketMap
* Send via asynchronous thread {@link MergedSendRunnable}
* {@link NettyClientConfig#isEnableClientBatchSendRequest}
* {@link this#isEnableClientBatchSendRequest()}
*/
protected final ConcurrentHashMap<String/*serverAddress*/, BlockingQueue<RpcMessage>> basketMap = new ConcurrentHashMap<>();

Expand All @@ -114,7 +114,7 @@ public void run() {
clientChannelManager.reconnect(getTransactionServiceGroup());
}
}, SCHEDULE_DELAY_MILLS, SCHEDULE_INTERVAL_MILLS, TimeUnit.MILLISECONDS);
if (NettyClientConfig.isEnableClientBatchSendRequest()) {
if (this.isEnableClientBatchSendRequest()) {
mergeSendExecutorService = new ThreadPoolExecutor(MAX_MERGE_SEND_THREAD,
MAX_MERGE_SEND_THREAD,
KEEP_ALIVE_TIME, TimeUnit.MILLISECONDS,
Expand Down Expand Up @@ -144,7 +144,7 @@ public Object sendSyncRequest(Object msg) throws TimeoutException {

// send batch message
// put message into basketMap, @see MergedSendRunnable
if (NettyClientConfig.isEnableClientBatchSendRequest()) {
if (this.isEnableClientBatchSendRequest()) {

// send batch message is sync request, needs to create messageFuture and put it in futures.
MessageFuture messageFuture = new MessageFuture();
Expand Down Expand Up @@ -316,6 +316,13 @@ private String getThreadPrefix() {
*/
protected abstract String getTransactionServiceGroup();

/**
* Whether to enable batch sending of requests, hand over to subclass implementation.
*
* @return true:enable, false:disable
*/
protected abstract boolean isEnableClientBatchSendRequest();

/**
* The type Merged send runnable.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -439,6 +439,7 @@ public String getRmDispatchThreadPrefix() {
return RPC_DISPATCH_THREAD_PREFIX + "_" + NettyPoolKey.TransactionRole.RMROLE.name();
}

@Deprecated
public static boolean isEnableClientBatchSendRequest() {
return ENABLE_CLIENT_BATCH_SEND_REQUEST;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,13 @@

import io.netty.channel.Channel;
import io.netty.util.concurrent.EventExecutorGroup;
import io.seata.common.DefaultValues;
import io.seata.common.exception.FrameworkErrorCode;
import io.seata.common.exception.FrameworkException;
import io.seata.common.thread.NamedThreadFactory;
import io.seata.common.util.StringUtils;
import io.seata.config.ConfigurationFactory;
import io.seata.core.constants.ConfigurationKeys;
import io.seata.core.model.Resource;
import io.seata.core.model.ResourceManager;
import io.seata.core.protocol.AbstractMessage;
Expand Down Expand Up @@ -267,6 +270,20 @@ protected String getTransactionServiceGroup() {
return transactionServiceGroup;
}

@Override
public boolean isEnableClientBatchSendRequest() {
// New configuration takes precedence
String newConfig = ConfigurationFactory.getInstance().getConfig(ConfigurationKeys.ENABLE_RM_CLIENT_BATCH_SEND_REQUEST);
if (StringUtils.isNotBlank(newConfig)) {
return Boolean.parseBoolean(newConfig);
}
// Compatible with old configuration
// If the old configuration exists, use the old configuration
// RM client Turns on batch sending by default
return ConfigurationFactory.getInstance().getBoolean(ConfigurationKeys.ENABLE_CLIENT_BATCH_SEND_REQUEST,
DefaultValues.DEFAULT_ENABLE_RM_CLIENT_BATCH_SEND_REQUEST);
}

private void registerProcessor() {
// 1.registry rm client handle branch commit processor
RmBranchCommitProcessor rmBranchCommitProcessor = new RmBranchCommitProcessor(getTransactionMessageHandler(), this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,15 @@

import io.netty.channel.Channel;
import io.netty.util.concurrent.EventExecutorGroup;
import io.seata.common.DefaultValues;
import io.seata.common.exception.FrameworkException;
import io.seata.common.loader.EnhancedServiceLoader;
import io.seata.common.thread.NamedThreadFactory;
import io.seata.common.thread.RejectedPolicies;
import io.seata.common.util.NetUtil;
import io.seata.config.ConfigurationFactory;
import io.seata.core.auth.AuthSigner;
import io.seata.core.constants.ConfigurationKeys;
import io.seata.core.protocol.AbstractMessage;
import io.seata.core.protocol.MessageType;
import io.seata.core.protocol.RegisterTMRequest;
Expand Down Expand Up @@ -186,6 +189,12 @@ public String getTransactionServiceGroup() {
return transactionServiceGroup;
}

@Override
public boolean isEnableClientBatchSendRequest() {
return ConfigurationFactory.getInstance().getBoolean(ConfigurationKeys.ENABLE_TM_CLIENT_BATCH_SEND_REQUEST,
DefaultValues.DEFAULT_ENABLE_TM_CLIENT_BATCH_SEND_REQUEST);
}

@Override
public void onRegisterMsgSuccess(String serverAddress, Channel channel, Object response,
AbstractMessage requestMessage) {
Expand Down
6 changes: 4 additions & 2 deletions script/client/conf/file.conf
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@ transport {
server = "NIO"
#enable heartbeat
heartbeat = true
# the client batch send request enable
enableClientBatchSendRequest = true
# the tm client batch send request enable
enableTmClientBatchSendRequest = false
# the rm client batch send request enable
enableRmClientBatchSendRequest = true
#thread factory for netty
threadFactory {
bossThreadPrefix = "NettyBoss"
Expand Down
3 changes: 2 additions & 1 deletion script/client/spring/application.properties
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@ seata.transport.server=NIO
seata.transport.heartbeat=true
seata.transport.serialization=seata
seata.transport.compressor=none
seata.transport.enable-client-batch-send-request=true
seata.transport.enable-tm-client-batch-send-request=false
seata.transport.enable-rm-client-batch-send-request=false

seata.config.type=file

Expand Down
3 changes: 2 additions & 1 deletion script/client/spring/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@ seata:
heartbeat: true
serialization: seata
compressor: none
enable-client-batch-send-request: true
enable-tm-client-batch-send-request: false
enable-rm-client-batch-send-request: true
config:
type: file
consul:
Expand Down
3 changes: 2 additions & 1 deletion script/config-center/config.txt
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
transport.type=TCP
transport.server=NIO
transport.heartbeat=true
transport.enableClientBatchSendRequest=true
transport.enableTmClientBatchSendRequest=false
transport.enableRmClientBatchSendRequest=true
transport.threadFactory.bossThreadPrefix=NettyBoss
transport.threadFactory.workerThreadPrefix=NettyServerNIOWorker
transport.threadFactory.serverExecutorThreadPrefix=NettyServerBizHandler
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import org.springframework.stereotype.Component;

import static io.seata.common.DefaultValues.DEFAULT_ENABLE_CLIENT_BATCH_SEND_REQUEST;
import static io.seata.common.DefaultValues.DEFAULT_ENABLE_RM_CLIENT_BATCH_SEND_REQUEST;
import static io.seata.common.DefaultValues.DEFAULT_ENABLE_TM_CLIENT_BATCH_SEND_REQUEST;
import static io.seata.common.DefaultValues.DEFAULT_TRANSPORT_HEARTBEAT;
import static io.seata.spring.boot.autoconfigure.StarterConstants.TRANSPORT_PREFIX;

Expand Down Expand Up @@ -54,6 +56,16 @@ public class TransportProperties {
*/
private boolean enableClientBatchSendRequest = DEFAULT_ENABLE_CLIENT_BATCH_SEND_REQUEST;

/**
* enable TM client batch send request
*/
private boolean enableTmClientBatchSendRequest = DEFAULT_ENABLE_TM_CLIENT_BATCH_SEND_REQUEST;

/**
* enable RM client batch send request
*/
private boolean enableRmClientBatchSendRequest = DEFAULT_ENABLE_RM_CLIENT_BATCH_SEND_REQUEST;

public String getType() {
return type;
}
Expand Down Expand Up @@ -107,4 +119,22 @@ public TransportProperties setEnableClientBatchSendRequest(boolean enableClientB
this.enableClientBatchSendRequest = enableClientBatchSendRequest;
return this;
}

public boolean isEnableTmClientBatchSendRequest() {
return enableTmClientBatchSendRequest;
}

public TransportProperties setEnableTmClientBatchSendRequest(boolean enableTmClientBatchSendRequest) {
this.enableTmClientBatchSendRequest = enableTmClientBatchSendRequest;
return this;
}

public boolean isEnableRmClientBatchSendRequest() {
return enableRmClientBatchSendRequest;
}

public TransportProperties setEnableRmClientBatchSendRequest(boolean enableRmClientBatchSendRequest) {
this.enableRmClientBatchSendRequest = enableRmClientBatchSendRequest;
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,18 @@
"sourceType": "io.seata.spring.boot.autoconfigure.properties.TransportProperties",
"defaultValue": true
},
{
"name": "seata.transport.enable-tm-client-batch-send-request",
"type": "java.lang.Boolean",
"sourceType": "io.seata.spring.boot.autoconfigure.properties.TransportProperties",
"defaultValue": false
},
{
"name": "seata.transport.enable-rm-client-batch-send-request",
"type": "java.lang.Boolean",
"sourceType": "io.seata.spring.boot.autoconfigure.properties.TransportProperties",
"defaultValue": true
},
{
"name": "seata.transport.shutdown.wait",
"type": "java.lang.Integer",
Expand Down

0 comments on commit 87d773f

Please sign in to comment.