Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP]feature:[loom] replace the usages of synchronized with ReentrantLock at saga module #7174

Open
wants to merge 10 commits into
base: 2.x
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions changes/en-us/2.x.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ Add changes here for all PR submitted to the 2.x branch.
- [[#7114](https://github.com/apache/incubator-seata/pull/7114)] support raft mode registry to namingserver
- [[#7133](https://github.com/apache/incubator-seata/pull/7133)] Implement scheduled handling for end status transaction
- [[#7171](https://github.com/apache/incubator-seata/pull/7171)] support EpollEventLoopGroup in client
- [[#7174](https://github.com/apache/incubator-seata/pull/7174)] replace the usages of synchronized with ReentrantLock at saga module
- [[#7182](https://github.com/apache/incubator-seata/pull/7182)] use the ip of the peerId as the host of the raft node
- [[#7181](https://github.com/apache/incubator-seata/pull/7181)] raft implements domain name resolution and selects peerId

Expand Down Expand Up @@ -89,5 +90,6 @@ Thanks to these contributors for their code commits. Please report an unintended
- [xingfudeshi](https://github.com/xingfudeshi)
- [YongGoose](https://github.com/YongGoose)
- [Monilnarang](https://github.com/Monilnarang)
- [lightClouds917](https://github.com/lightClouds917)

Also, we receive many valuable issues, questions and advices from our community. Thanks for you all.
2 changes: 2 additions & 0 deletions changes/zh-cn/2.x.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
- [[#7114](https://github.com/apache/incubator-seata/pull/7114)] 支持raft集群注册至namingserver
- [[#7133](https://github.com/apache/incubator-seata/pull/7133)] 实现对残留的end状态事务定时处理
- [[#7171](https://github.com/apache/incubator-seata/pull/7171)] 客户端支持 EpollEventLoopGroup
- [[#7174](https://github.com/apache/incubator-seata/pull/7174)] saga模块synchronized替换为ReentrantLock
- [[#7182](https://github.com/apache/incubator-seata/pull/7182)] 采用peerId的ip作为raft节点的host
- [[#7181](https://github.com/apache/incubator-seata/pull/7181)] raft实现域名解析并选择peerId

Expand Down Expand Up @@ -90,5 +91,6 @@
- [xingfudeshi](https://github.com/xingfudeshi)
- [YongGoose](https://github.com/YongGoose)
- [Monilnarang](https://github.com/Monilnarang)
- [lightClouds917](https://github.com/lightClouds917)

同时,我们收到了社区反馈的很多有价值的issue和建议,非常感谢大家。
Original file line number Diff line number Diff line change
Expand Up @@ -97,4 +97,5 @@ public interface ProcessContext extends org.apache.seata.saga.proctrl.ProcessCon
* @return the get instruction
*/
<T extends Instruction> T getInstruction(Class<T> clazz);

}
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,6 @@ public void setParent(ProcessContext parent) {
public String toString() {
return actual.toString();
}

public static ProcessContextImpl wrap(org.apache.seata.saga.proctrl.HierarchicalProcessContext target) {
return new ProcessContextImpl(target);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.Map;

import org.apache.seata.common.exception.FrameworkErrorCode;
import org.apache.seata.common.lock.ResourceLock;
import org.apache.seata.common.util.StringUtils;
import org.apache.seata.saga.engine.StateMachineConfig;
import org.apache.seata.saga.engine.exception.EngineExecutionException;
Expand All @@ -42,6 +43,8 @@
*/
public class ChoiceStateHandler implements StateHandler {

private final ResourceLock choiceStateLock = new ResourceLock();

@Override
public void process(ProcessContext context) throws EngineExecutionException {

Expand All @@ -50,7 +53,7 @@ public void process(ProcessContext context) throws EngineExecutionException {

Map<Object, String> choiceEvaluators = choiceState.getChoiceEvaluators();
if (choiceEvaluators == null) {
synchronized (choiceState) {
try (ResourceLock ignored = choiceStateLock.obtain()) {
choiceEvaluators = choiceState.getChoiceEvaluators();
if (choiceEvaluators == null) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

import org.apache.seata.common.exception.FrameworkErrorCode;
import org.apache.seata.common.loader.LoadLevel;
import org.apache.seata.common.lock.ResourceLock;
import org.apache.seata.common.util.CollectionUtils;
import org.apache.seata.common.util.StringUtils;
import org.apache.seata.saga.engine.StateMachineConfig;
Expand Down Expand Up @@ -312,7 +313,7 @@ private void decideExecutionStatus(ProcessContext context, StateInstance stateIn

Map<Object, String> statusEvaluators = state.getStatusEvaluators();
if (statusEvaluators == null) {
synchronized (state) {
try (ResourceLock ignored = state.getResourceLock().obtain()) {
statusEvaluators = state.getStatusEvaluators();
if (statusEvaluators == null) {
statusEvaluators = new LinkedHashMap<>(statusMatchList.size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.concurrent.ConcurrentHashMap;

import org.apache.seata.common.exception.FrameworkErrorCode;
import org.apache.seata.common.lock.ResourceLock;
import org.apache.seata.common.util.CollectionUtils;
import org.apache.seata.common.util.StringUtils;
import org.apache.seata.saga.engine.exception.EngineExecutionException;
Expand Down Expand Up @@ -60,19 +61,22 @@ public class CompensationHolder {
*/
private Stack<StateInstance> stateStackNeedCompensation = new Stack<>();

private static final ConcurrentHashMap<ProcessContext,ResourceLock> LOCK_MAP = new ConcurrentHashMap<>();

public static CompensationHolder getCurrent(ProcessContext context, boolean forceCreate) {

CompensationHolder compensationholder = (CompensationHolder)context.getVariable(
DomainConstants.VAR_NAME_CURRENT_COMPENSATION_HOLDER);
if (compensationholder == null && forceCreate) {
synchronized (context) {

try (ResourceLock ignored = LOCK_MAP.computeIfAbsent(context, k -> new ResourceLock()).obtain()) {
compensationholder = (CompensationHolder)context.getVariable(
DomainConstants.VAR_NAME_CURRENT_COMPENSATION_HOLDER);
if (compensationholder == null) {
compensationholder = new CompensationHolder();
context.setVariable(DomainConstants.VAR_NAME_CURRENT_COMPENSATION_HOLDER, compensationholder);
}
} finally {
LOCK_MAP.remove(context);
}
}
return compensationholder;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@

import java.util.Collection;
import java.util.Stack;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.seata.common.lock.ResourceLock;
import org.apache.seata.saga.proctrl.ProcessContext;
import org.apache.seata.saga.statelang.domain.DomainConstants;

Expand All @@ -37,19 +39,23 @@ public class LoopContextHolder {
private final Stack<Integer> loopCounterStack = new Stack<>();
private final Stack<Integer> forwardCounterStack = new Stack<>();
private Collection collection;
private final ResourceLock lock = new ResourceLock();
private static final ConcurrentHashMap<ProcessContext,ResourceLock> LOCK_MAP = new ConcurrentHashMap<>();

public static LoopContextHolder getCurrent(ProcessContext context, boolean forceCreate) {
LoopContextHolder loopContextHolder = (LoopContextHolder)context.getVariable(
DomainConstants.VAR_NAME_CURRENT_LOOP_CONTEXT_HOLDER);

if (null == loopContextHolder && forceCreate) {
synchronized (context) {
try (ResourceLock ignored = LOCK_MAP.computeIfAbsent(context, k -> new ResourceLock()).obtain()) {
loopContextHolder = (LoopContextHolder)context.getVariable(
DomainConstants.VAR_NAME_CURRENT_LOOP_CONTEXT_HOLDER);
if (null == loopContextHolder) {
loopContextHolder = new LoopContextHolder();
context.setVariable(DomainConstants.VAR_NAME_CURRENT_LOOP_CONTEXT_HOLDER, loopContextHolder);
}
} finally {
LOCK_MAP.remove(context);
}
}
return loopContextHolder;
Expand Down Expand Up @@ -102,4 +108,8 @@ public Collection getCollection() {
public void setCollection(Collection collection) {
this.collection = collection;
}

public ResourceLock getLock() {
return lock;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.stream.Collectors;

import org.apache.seata.common.exception.FrameworkErrorCode;
import org.apache.seata.common.lock.ResourceLock;
import org.apache.seata.common.util.CollectionUtils;
import org.apache.seata.common.util.NumberUtils;
import org.apache.seata.common.util.StringUtils;
Expand Down Expand Up @@ -225,7 +226,7 @@ public static boolean isCompletionConditionSatisfied(ProcessContext context) {
int nrOfCompletedInstances = currentLoopContext.getNrOfCompletedInstances().get();

if (!currentLoopContext.isCompletionConditionSatisfied()) {
synchronized (currentLoopContext) {
try (ResourceLock ignored = currentLoopContext.getLock().obtain()) {
if (!currentLoopContext.isCompletionConditionSatisfied()) {
Map<String, Object> stateMachineContext = (Map<String, Object>)context.getVariable(
DomainConstants.VAR_NAME_STATEMACHINE_CONTEXT);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.seata.common.lock.ResourceLock;
import org.apache.seata.common.util.CollectionUtils;
import org.apache.seata.common.util.StringUtils;
import org.apache.seata.saga.engine.repo.StateMachineRepository;
Expand Down Expand Up @@ -57,7 +58,7 @@ public StateMachine getStateMachineById(String stateMachineId) {
Item item = CollectionUtils.computeIfAbsent(stateMachineMapById, stateMachineId,
key -> new Item());
if (item.getValue() == null && stateLangStore != null) {
synchronized (item) {
try (ResourceLock ignored = item.getLock().obtain()) {
if (item.getValue() == null) {
StateMachine stateMachine = stateLangStore.getStateMachineById(stateMachineId);
if (stateMachine != null) {
Expand Down Expand Up @@ -85,7 +86,7 @@ public StateMachine getStateMachine(String stateMachineName, String tenantId) {
Item item = CollectionUtils.computeIfAbsent(stateMachineMapByNameAndTenant, stateMachineName + "_" + tenantId,
key -> new Item());
if (item.getValue() == null && stateLangStore != null) {
synchronized (item) {
try (ResourceLock ignored = item.getLock().obtain()) {
if (item.getValue() == null) {
StateMachine stateMachine = stateLangStore.getLastVersionStateMachine(stateMachineName, tenantId);
if (stateMachine != null) {
Expand Down Expand Up @@ -218,6 +219,7 @@ public void setJsonParserName(String jsonParserName) {
private static class Item {

private StateMachine value;
private final ResourceLock lock = new ResourceLock();

private Item() {
}
Expand All @@ -233,5 +235,9 @@ public StateMachine getValue() {
public void setValue(StateMachine value) {
this.value = value;
}

public ResourceLock getLock() {
return lock;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -94,4 +94,5 @@ public interface ProcessContext {
* @return the get instruction
*/
<T extends Instruction> T getInstruction(Class<T> clazz);

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.Stack;

import org.apache.seata.common.exception.FrameworkException;
import org.apache.seata.common.lock.ResourceLock;
import org.apache.seata.common.util.CollectionUtils;
import org.apache.seata.saga.proctrl.ProcessContext;
import org.apache.seata.saga.proctrl.eventing.EventConsumer;
Expand All @@ -36,6 +37,8 @@ public class DirectEventBus extends AbstractEventBus<ProcessContext> {

private static final String VAR_NAME_SYNC_EXE_STACK = "_sync_execution_stack_";

private final ResourceLock contextLock = new ResourceLock();

@Override
public boolean offer(ProcessContext context) throws FrameworkException {
List<EventConsumer> eventHandlers = getEventConsumers(context.getClass());
Expand All @@ -49,7 +52,7 @@ public boolean offer(ProcessContext context) throws FrameworkException {
boolean isFirstEvent = false;
Stack<ProcessContext> currentStack = (Stack<ProcessContext>)context.getVariable(VAR_NAME_SYNC_EXE_STACK);
if (currentStack == null) {
synchronized (context) {
try (ResourceLock ignored = contextLock.obtain()) {
currentStack = (Stack<ProcessContext>)context.getVariable(VAR_NAME_SYNC_EXE_STACK);
if (currentStack == null) {
currentStack = new Stack<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.seata.common.exception.FrameworkErrorCode;
import org.apache.seata.common.lock.ResourceLock;
import org.apache.seata.common.util.CollectionUtils;
import org.apache.seata.saga.engine.exception.EngineExecutionException;
import org.apache.seata.saga.engine.invoker.ServiceInvoker;
Expand Down Expand Up @@ -56,6 +57,7 @@ public class SpringBeanServiceInvoker implements ServiceInvoker, ApplicationCont
private ApplicationContext applicationContext;
private ThreadPoolExecutor threadPoolExecutor;
private String sagaJsonParser;
private final ResourceLock stateLock = new ResourceLock();

@Override
public Object invoke(ServiceTaskState serviceTaskState, Object... input) throws Throwable {
Expand Down Expand Up @@ -92,12 +94,11 @@ public void run() {
}

protected Object doInvoke(ServiceTaskStateImpl state, Object[] input) throws Throwable {

Object bean = applicationContext.getBean(state.getServiceName());

Method method = state.getMethod();
if (method == null) {
synchronized (state) {
try (ResourceLock ignored = stateLock.obtain()) {
method = state.getMethod();
if (method == null) {
method = findMethod(bean.getClass(), state.getServiceMethod(), state.getParameterTypes());
Expand All @@ -112,7 +113,6 @@ protected Object doInvoke(ServiceTaskStateImpl state, Object[] input) throws Thr
throw new EngineExecutionException(
"No such method[" + state.getServiceMethod() + "] on BeanClass[" + bean.getClass() + "]",
FrameworkErrorCode.NoSuchMethod);

}

Object[] args = new Object[method.getParameterCount()];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.List;
import java.util.Map;

import org.apache.seata.common.lock.ResourceLock;
import org.apache.seata.saga.statelang.domain.StateType;
import org.apache.seata.saga.statelang.domain.ServiceTaskState;

Expand All @@ -36,6 +37,7 @@ public class ServiceTaskStateImpl extends AbstractTaskState implements ServiceTa
private Method method;
private Map<Object, String> statusEvaluators;
private boolean isAsync;
private final ResourceLock resourceLock = new ResourceLock();

public ServiceTaskStateImpl() {
setType(StateType.SERVICE_TASK);
Expand Down Expand Up @@ -100,4 +102,13 @@ public boolean isAsync() {
public void setAsync(boolean async) {
isAsync = async;
}

/**
* Get the ResourceLock for statusEvaluators
*
* @return the ResourceLock instance
*/
public ResourceLock getResourceLock() {
return resourceLock;
}
}
Loading