Skip to content

Commit

Permalink
Merge branch '2.x' into fix_#7140
Browse files Browse the repository at this point in the history
  • Loading branch information
xingfudeshi authored Mar 7, 2025
2 parents 4ed1620 + 117e57a commit 5b1ed0b
Show file tree
Hide file tree
Showing 16 changed files with 177 additions and 17 deletions.
5 changes: 4 additions & 1 deletion changes/en-us/2.x.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ Add changes here for all PR submitted to the 2.x branch.
- [[#7135](https://github.com/apache/incubator-seata/pull/7135)] treating a unique index conflict during rollback as a dirty write
- [[#7150](https://github.com/apache/incubator-seata/pull/7150)] The time difference between the raft node and the follower node cannot synchronize data
- [[#7102](https://github.com/apache/incubator-seata/pull/7150)] bugfix: modify XA mode pre commit transaction from commit phase to before close phase
- [[#7188](https://github.com/apache/incubator-seata/pull/7188)] bugfix: Fix missing branchType in BusinessActionContext

### optimize:

Expand All @@ -47,7 +48,8 @@ Add changes here for all PR submitted to the 2.x branch.
- [[#7149](https://github.com/apache/incubator-seata/pull/7149)] Fix abnormal character display issues in ./distribution/NOTICE.md
- [[#7170](https://github.com/apache/incubator-seata/pull/7170)] Optimize seata client I/O processing by adjusting thread count
- [[#7187](https://github.com/apache/incubator-seata/pull/7187)] Add dependency-check-maven plugin to detect potential vulnerabilities

- [[#7179](https://github.com/apache/incubator-seata/pull/7179)] Use shared EventLoop for TM and RM clients to reduce thread overhead and improve performance
- [[#7194](https://github.com/apache/incubator-seata/pull/7194)] automatically skipping proxy for datasource of type AbstractRoutingDataSource

### security:
- [[#6069](https://github.com/apache/incubator-seata/pull/6069)] Upgrade Guava dependencies to fix security vulnerabilities
Expand Down Expand Up @@ -88,5 +90,6 @@ Thanks to these contributors for their code commits. Please report an unintended
- [xingfudeshi](https://github.com/xingfudeshi)
- [YongGoose](https://github.com/YongGoose)
- [Monilnarang](https://github.com/Monilnarang)
- [iAmClever](https://github.com/iAmClever)

Also, we receive many valuable issues, questions and advices from our community. Thanks for you all.
6 changes: 5 additions & 1 deletion changes/zh-cn/2.x.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
- [[#7135](https://github.com/apache/incubator-seata/pull/7135)] 回滚时遇到唯一索引冲突视为脏写
- [[#7150](https://github.com/apache/incubator-seata/pull/7150)] raft节点之前时间差,follower节点无法同步数据
- [[#7102](https://github.com/apache/incubator-seata/pull/7150)] 将XA模式预提交事务从提交阶段修改为关闭前阶段
- [[#7188](https://github.com/apache/incubator-seata/pull/7188)] 修复 BusinessActionContext 中缺少的 branchType


### optimize:

Expand All @@ -47,7 +49,8 @@
- [[#7149](https://github.com/apache/incubator-seata/pull/7149)] 修复./distribution/NOTICE.md文件中的异常字符串显示问题
- [[#7170](https://github.com/apache/incubator-seata/pull/7170)] 通过调整线程数优化 Seata 客户端 I/O 处理
- [[#7187](https://github.com/apache/incubator-seata/pull/7187)] 增加dependency-check-maven 插件来检测潜在的漏洞

- [[#7179](https://github.com/apache/incubator-seata/pull/7179)] 使用共享的 EventLoop 来减少 TM 和 RM 客户端的线程开销并提高性能
- [[#7194](https://github.com/apache/incubator-seata/pull/7194)] 自动跳过对AbstractRoutingDataSource类型数据源的代理

### security:
- [[#6069](https://github.com/apache/incubator-seata/pull/6069)] 升级Guava依赖版本,修复安全漏洞
Expand Down Expand Up @@ -88,5 +91,6 @@
- [xingfudeshi](https://github.com/xingfudeshi)
- [YongGoose](https://github.com/YongGoose)
- [Monilnarang](https://github.com/Monilnarang)
- [iAmClever](https://github.com/iAmClever)

同时,我们收到了社区反馈的很多有价值的issue和建议,非常感谢大家。
Original file line number Diff line number Diff line change
Expand Up @@ -709,6 +709,11 @@ public interface ConfigurationKeys {
*/
String WORKER_THREAD_SIZE = THREAD_FACTORY_PREFIX + "workerThreadSize";

/**
* The constant ENABLE_SHARED_EVENTLOOP
*/
String ENABLE_CLIENT_SHARED_EVENTLOOP = TRANSPORT_PREFIX + "enableClientSharedEventLoopGroup";

/**
* The constant SHUTDOWN_PREFIX
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,10 @@ public interface DefaultValues {
*/
@Deprecated
boolean DEFAULT_ENABLE_CLIENT_BATCH_SEND_REQUEST = true;
/**
* The constant DEFAULT_ENABLE_CLIENT_USE_SHARED_EVENT_LOOP.
*/
boolean DEFAULT_ENABLE_CLIENT_USE_SHARED_EVENT_LOOP = false;
/**
* The constant DEFAULT_ENABLE_TM_CLIENT_BATCH_SEND_REQUEST.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,18 +56,21 @@

/**
* Rpc client.
*
*/
public class NettyClientBootstrap implements RemotingBootstrap {

private static final Logger LOGGER = LoggerFactory.getLogger(NettyClientBootstrap.class);
private static final String THREAD_PREFIX_SPLIT_CHAR = "_";

private static EventLoopGroup sharedEventLoopGroupWorker = null;

private final NettyClientConfig nettyClientConfig;
private final Bootstrap bootstrap = new Bootstrap();
private final EventLoopGroup eventLoopGroupWorker;
private EventExecutorGroup defaultEventExecutorGroup;
private final AtomicBoolean initialized = new AtomicBoolean(false);
private static final String THREAD_PREFIX_SPLIT_CHAR = "_";
private final NettyPoolKey.TransactionRole transactionRole;
private final EventLoopGroup eventLoopGroupWorker;

private EventExecutorGroup defaultEventExecutorGroup;
private ChannelHandler[] channelHandlers;

public NettyClientBootstrap(NettyClientConfig nettyClientConfig, final EventExecutorGroup eventExecutorGroup,
Expand All @@ -81,14 +84,15 @@ public NettyClientBootstrap(NettyClientConfig nettyClientConfig, final EventExec
this.nettyClientConfig = nettyClientConfig;
int selectorThreadSizeThreadSize = this.nettyClientConfig.getClientSelectorThreadSize();
this.transactionRole = transactionRole;
if (NettyServerConfig.enableEpoll()) {
this.eventLoopGroupWorker = new EpollEventLoopGroup(selectorThreadSizeThreadSize,
new NamedThreadFactory(getThreadPrefix(this.nettyClientConfig.getClientSelectorThreadPrefix()),
selectorThreadSizeThreadSize));

boolean enableClientSharedEventLoop = this.nettyClientConfig.getEnableClientSharedEventLoop();
if (enableClientSharedEventLoop) {
if (sharedEventLoopGroupWorker == null) {
sharedEventLoopGroupWorker = getOrCreateEventLoopGroupWorker(selectorThreadSizeThreadSize);
}
eventLoopGroupWorker = sharedEventLoopGroupWorker;
} else {
this.eventLoopGroupWorker = new NioEventLoopGroup(selectorThreadSizeThreadSize,
new NamedThreadFactory(getThreadPrefix(this.nettyClientConfig.getClientSelectorThreadPrefix()),
selectorThreadSizeThreadSize));
eventLoopGroupWorker = createEventLoopGroupWorker(selectorThreadSizeThreadSize);
}
this.defaultEventExecutorGroup = eventExecutorGroup;
}
Expand Down Expand Up @@ -123,7 +127,7 @@ public void start() {
new NamedThreadFactory(getThreadPrefix(nettyClientConfig.getClientWorkerThreadPrefix()),
nettyClientConfig.getClientWorkerThreads()));
}
this.bootstrap.group(this.eventLoopGroupWorker).channel(
this.bootstrap.group(eventLoopGroupWorker).channel(
nettyClientConfig.getClientChannelClazz()).option(
ChannelOption.TCP_NODELAY, true).option(ChannelOption.SO_KEEPALIVE, true).option(
ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis()).option(
Expand Down Expand Up @@ -170,7 +174,7 @@ public void initChannel(SocketChannel ch) {
@Override
public void shutdown() {
try {
this.eventLoopGroupWorker.shutdownGracefully();
eventLoopGroupWorker.shutdownGracefully();
if (this.defaultEventExecutorGroup != null) {
this.defaultEventExecutorGroup.shutdownGracefully();
}
Expand Down Expand Up @@ -233,4 +237,23 @@ public void handlerAdded(ChannelHandlerContext ctx) {
private String getThreadPrefix(String threadPrefix) {
return threadPrefix + THREAD_PREFIX_SPLIT_CHAR + transactionRole.name();
}

private EventLoopGroup getOrCreateEventLoopGroupWorker(int selectorThreadSizeThreadSize) {
if (eventLoopGroupWorker == null) {
return createEventLoopGroupWorker(selectorThreadSizeThreadSize);
}
return eventLoopGroupWorker;
}

private EventLoopGroup createEventLoopGroupWorker(int selectorThreadSizeThreadSize) {
if (NettyServerConfig.enableEpoll()) {
return new EpollEventLoopGroup(selectorThreadSizeThreadSize,
new NamedThreadFactory(getThreadPrefix(this.nettyClientConfig.getClientSelectorThreadPrefix()),
selectorThreadSizeThreadSize));
}

return new NioEventLoopGroup(selectorThreadSizeThreadSize,
new NamedThreadFactory(getThreadPrefix(this.nettyClientConfig.getClientSelectorThreadPrefix()),
selectorThreadSizeThreadSize));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,15 @@
package org.apache.seata.core.rpc.netty;

import io.netty.channel.Channel;
import org.apache.seata.core.constants.ConfigurationKeys;
import org.apache.seata.common.ConfigurationKeys;
import org.apache.seata.core.rpc.TransportServerType;

import static org.apache.seata.common.DefaultValues.DEFAULT_ENABLE_CLIENT_BATCH_SEND_REQUEST;
import static org.apache.seata.common.DefaultValues.DEFAULT_PROTOCOL;
import static org.apache.seata.common.DefaultValues.DEFAULT_RPC_RM_REQUEST_TIMEOUT;
import static org.apache.seata.common.DefaultValues.DEFAULT_RPC_TM_REQUEST_TIMEOUT;
import static org.apache.seata.common.DefaultValues.DEFAULT_SELECTOR_THREAD_PREFIX;
import static org.apache.seata.common.DefaultValues.DEFAULT_ENABLE_CLIENT_USE_SHARED_EVENT_LOOP;
import static org.apache.seata.common.DefaultValues.DEFAULT_WORKER_THREAD_PREFIX;

/**
Expand Down Expand Up @@ -354,6 +355,10 @@ public int getClientSelectorThreadSize() {
return threadSize > 0 ? threadSize : WorkThreadMode.Default.getValue();
}

public boolean getEnableClientSharedEventLoop() {
return CONFIG.getBoolean(ConfigurationKeys.ENABLE_CLIENT_SHARED_EVENTLOOP, DEFAULT_ENABLE_CLIENT_USE_SHARED_EVENT_LOOP);
}

/**
* Get max acquire conn mills long.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seata.core.rpc.netty;

import io.netty.channel.EventLoopGroup;
import io.netty.util.concurrent.DefaultEventExecutorGroup;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;

import static org.mockito.Mockito.when;

@ExtendWith(MockitoExtension.class)
class NettyClientBootstrapTest {

@Mock
private NettyClientConfig nettyClientConfig;
private DefaultEventExecutorGroup eventExecutorGroup;

@BeforeEach
void init() {
eventExecutorGroup = new DefaultEventExecutorGroup(1);
}

@Test
void testSharedEventLoopGroupEnabled() {
when(nettyClientConfig.getEnableClientSharedEventLoop()).thenReturn(true);
NettyClientBootstrap tmNettyClientBootstrap = new NettyClientBootstrap(nettyClientConfig, eventExecutorGroup, NettyPoolKey.TransactionRole.TMROLE);
EventLoopGroup tmEventLoopGroupWorker = getEventLoopGroupWorker(tmNettyClientBootstrap);

NettyClientBootstrap rmNettyClientBootstrap = new NettyClientBootstrap(nettyClientConfig, eventExecutorGroup, NettyPoolKey.TransactionRole.RMROLE);
EventLoopGroup rmEventLoopGroupWorker = getEventLoopGroupWorker(rmNettyClientBootstrap);

Assertions.assertEquals(tmEventLoopGroupWorker, rmEventLoopGroupWorker);
}

@Test
void testSharedEventLoopGroupDisabled() {
when(nettyClientConfig.getEnableClientSharedEventLoop()).thenReturn(false);
NettyClientBootstrap tmNettyClientBootstrap = new NettyClientBootstrap(nettyClientConfig, eventExecutorGroup, NettyPoolKey.TransactionRole.TMROLE);
EventLoopGroup tmEventLoopGroupWorker = getEventLoopGroupWorker(tmNettyClientBootstrap);

NettyClientBootstrap rmNettyClientBootstrap = new NettyClientBootstrap(nettyClientConfig, eventExecutorGroup, NettyPoolKey.TransactionRole.RMROLE);
EventLoopGroup rmEventLoopGroupWorker = getEventLoopGroupWorker(rmNettyClientBootstrap);

Assertions.assertNotEquals(tmEventLoopGroupWorker, rmEventLoopGroupWorker);
}

private EventLoopGroup getEventLoopGroupWorker(NettyClientBootstrap bootstrap) {
try {
java.lang.reflect.Field field = NettyClientBootstrap.class.getDeclaredField("eventLoopGroupWorker");
field.setAccessible(true);
return (EventLoopGroup) field.get(bootstrap);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
2 changes: 2 additions & 0 deletions script/client/conf/file.conf
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ transport {
rpcTmRequestTimeout = 30000
# the rm client rpc request timeout
rpcRmRequestTimeout = 15000
# the shared event loop group enable
enableClientSharedEventLoopGroup = false
#thread factory for netty
threadFactory {
bossThreadPrefix = "NettyBoss"
Expand Down
1 change: 1 addition & 0 deletions script/client/spring/application.properties
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ seata.transport.enable-tm-client-batch-send-request=false
seata.transport.enable-rm-client-batch-send-request=true
seata.transport.rpc-rm-request-timeout=15000
seata.transport.rpc-tm-request-timeout=30000
seata.transport.enable-client-shared-event-loop-group=false

seata.config.type=file

Expand Down
1 change: 1 addition & 0 deletions script/client/spring/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ seata:
enable-rm-client-batch-send-request: true
rpc-rm-request-timeout: 15000
rpc-tm-request-timeout: 30000
enable-client-shared-event-loop-group: false
config:
type: file
consul:
Expand Down
1 change: 1 addition & 0 deletions script/config-center/config.txt
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ transport.enableTcServerBatchSendResponse=false
transport.rpcRmRequestTimeout=30000
transport.rpcTmRequestTimeout=30000
transport.rpcTcRequestTimeout=30000
transport.enableClientSharedEventLoopGroup=false
transport.threadFactory.bossThreadPrefix=NettyBoss
transport.threadFactory.workerThreadPrefix=NettyServerNIOWorker
transport.threadFactory.serverExecutorThreadPrefix=NettyServerBizHandler
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import static org.apache.seata.common.DefaultValues.DEFAULT_RPC_TC_REQUEST_TIMEOUT;
import static org.apache.seata.common.DefaultValues.DEFAULT_RPC_TM_REQUEST_TIMEOUT;
import static org.apache.seata.common.DefaultValues.DEFAULT_TRANSPORT_HEARTBEAT;
import static org.apache.seata.common.DefaultValues.DEFAULT_ENABLE_CLIENT_USE_SHARED_EVENT_LOOP;
import static org.apache.seata.spring.boot.autoconfigure.StarterConstants.TRANSPORT_PREFIX;


Expand Down Expand Up @@ -92,6 +93,11 @@ public class TransportProperties {
*/
private long rpcTcRequestTimeout = DEFAULT_RPC_TC_REQUEST_TIMEOUT;

/**
* use shared event loop group
*/
private boolean enableClientSharedEventLoop = DEFAULT_ENABLE_CLIENT_USE_SHARED_EVENT_LOOP;


public String getType() {
return type;
Expand Down Expand Up @@ -193,10 +199,18 @@ public long getRpcTcRequestTimeout() {
return rpcTcRequestTimeout;
}

public boolean isEnableClientSharedEventLoop() {
return enableClientSharedEventLoop;
}

public void setRpcTcRequestTimeout(long rpcTcRequestTimeout) {
this.rpcTcRequestTimeout = rpcTcRequestTimeout;
}

public void setEnableClientSharedEventLoop(boolean useSharedEventLoop) {
this.enableClientSharedEventLoop = useSharedEventLoop;
}

public String getProtocol() {
return protocol;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ public void testTransportProperties() {
assertEquals("seata", context.getBean(TransportProperties.class).getSerialization());
assertEquals("none", context.getBean(TransportProperties.class).getCompressor());
assertTrue(context.getBean(TransportProperties.class).isEnableClientBatchSendRequest());
assertFalse(context.getBean(TransportProperties.class).isEnableClientSharedEventLoop());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ public void testTransportProperties() {
transportProperties.setRpcRmRequestTimeout(1);
transportProperties.setRpcTmRequestTimeout(1);
transportProperties.setRpcTcRequestTimeout(1);
transportProperties.setEnableClientSharedEventLoop(true);

Assertions.assertEquals("server", transportProperties.getServer());
Assertions.assertEquals("type", transportProperties.getType());
Expand All @@ -49,5 +50,6 @@ public void testTransportProperties() {
Assertions.assertEquals(1, transportProperties.getRpcRmRequestTimeout());
Assertions.assertEquals(1, transportProperties.getRpcTmRequestTimeout());
Assertions.assertEquals(1, transportProperties.getRpcTcRequestTimeout());
Assertions.assertTrue(transportProperties.isEnableClientSharedEventLoop());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ protected boolean shouldSkip(Class<?> beanClass, String beanName) {
@Override
protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {
// we only care DataSource bean
if (!(bean instanceof DataSource)) {
if (!(bean instanceof DataSource) || isAbstractRoutingDataSource(bean)) {
return bean;
}

Expand Down Expand Up @@ -108,6 +108,22 @@ protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey)
return originEnhancer;
}

/**
* Checks if the given bean is an instance of AbstractRoutingDataSource.
*
* @param bean the object to check
* @return true if the bean is an instance of AbstractRoutingDataSource, false otherwise
*/
private boolean isAbstractRoutingDataSource(Object bean) {
try {
Class<?> clazz = Class.forName("org.springframework.jdbc.datasource.lookup.AbstractRoutingDataSource");
return clazz.isAssignableFrom(bean.getClass());
} catch (ClassNotFoundException e) {
// AbstractRoutingDataSource not found
return false;
}
}

SeataDataSourceProxy buildProxy(DataSource origin, String proxyMode) {
if (BranchType.AT.name().equalsIgnoreCase(proxyMode)) {
return new DataSourceProxy(origin);
Expand Down
Loading

0 comments on commit 5b1ed0b

Please sign in to comment.