版本: 1.1.1+
适用场景: 并行网关和包容网关的分支合并
在并行网关和包容网关的分支合并场景中,当多个分支同时到达汇聚节点时,可能会出现数据覆盖问题。例如:
为了解决这个问题,parallel-plugin 提供了可扩展的并发锁机制。
✅ 默认单机锁:开箱即用,无需额外配置
✅ 可扩展设计:支持自定义锁实现(如 Redis 分布式锁)
✅ 自动重试:获取锁失败时自动等待并重试,不会直接失败
✅ 线程安全:使用 ReentrantLock 保证并发安全
✅ 资源管理:自动释放锁,确保不会出现死锁
默认使用 LocalParallelMergeLock 实现,基于 ReentrantLock:
ConcurrentHashMap 存储锁,key 格式:{flowInstanceId}:{nodeKey}ReentrantLock(true)),确保先到先得在 application.properties 或 plugin.properties 中配置:
# 获取锁失败后的重试间隔(毫秒),默认:50
turbo.plugin.parallelGateway.lock.retryIntervalMs=50
# 最大重试次数,默认:10
turbo.plugin.parallelGateway.lock.maxRetryTimes=10
✅ 单机部署:默认实现完全满足需求
✅ 开发测试:无需额外配置即可使用
❌ 多机部署:需要使用分布式锁(见下方扩展指南)
实现 ParallelMergeLock 接口:
package com.example.plugin;
import com.didiglobal.turbo.plugin.lock.ParallelMergeLock;
import org.springframework.stereotype.Component;
@Component
public class RedisParallelMergeLock implements ParallelMergeLock {
@Autowired
private RedisTemplate<String, String> redisTemplate;
private static final String LOCK_PREFIX = "parallel:merge:lock:";
private static final long DEFAULT_LOCK_TIMEOUT_MS = 30000; // 30秒
@Override
public boolean tryLock(String flowInstanceId, String nodeKey, long waitTimeMs) {
String lockKey = LOCK_PREFIX + flowInstanceId + ":" + nodeKey;
String lockValue = Thread.currentThread().getName() + ":" + System.currentTimeMillis();
if (waitTimeMs <= 0) {
// 立即尝试获取锁
Boolean result = redisTemplate.opsForValue()
.setIfAbsent(lockKey, lockValue, Duration.ofMillis(DEFAULT_LOCK_TIMEOUT_MS));
return Boolean.TRUE.equals(result);
} else {
// 等待指定时间后尝试获取
long endTime = System.currentTimeMillis() + waitTimeMs;
while (System.currentTimeMillis() < endTime) {
Boolean result = redisTemplate.opsForValue()
.setIfAbsent(lockKey, lockValue, Duration.ofMillis(DEFAULT_LOCK_TIMEOUT_MS));
if (Boolean.TRUE.equals(result)) {
return true;
}
try {
Thread.sleep(50); // 短暂等待后重试
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return false;
}
}
return false;
}
}
@Override
public void unlock(String flowInstanceId, String nodeKey) {
String lockKey = LOCK_PREFIX + flowInstanceId + ":" + nodeKey;
// 使用 Lua 脚本确保只删除当前线程持有的锁
String script =
"if redis.call('get', KEYS[1]) == ARGV[1] then " +
" return redis.call('del', KEYS[1]) " +
"else " +
" return 0 " +
"end";
redisTemplate.execute(
new DefaultRedisScript<>(script, Long.class),
Collections.singletonList(lockKey),
Thread.currentThread().getName()
);
}
}
通过 @Component 注解或配置类注册:
方式一:使用 @Component(推荐)
@Component
public class RedisParallelMergeLock implements ParallelMergeLock {
// ... 实现代码
}
方式二:使用配置类
@Configuration
public class CustomLockConfig {
@Bean
public ParallelMergeLock parallelMergeLock() {
return new RedisParallelMergeLock();
}
}
插件会自动检测到自定义的 ParallelMergeLock Bean,并使用它替代默认实现。
@Component
public class RedissonParallelMergeLock implements ParallelMergeLock {
@Autowired
private RedissonClient redissonClient;
@Override
public boolean tryLock(String flowInstanceId, String nodeKey, long waitTimeMs) {
String lockKey = "parallel:merge:lock:" + flowInstanceId + ":" + nodeKey;
RLock lock = redissonClient.getLock(lockKey);
try {
if (waitTimeMs <= 0) {
return lock.tryLock();
} else {
return lock.tryLock(waitTimeMs, TimeUnit.MILLISECONDS);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return false;
}
}
@Override
public void unlock(String flowInstanceId, String nodeKey) {
String lockKey = "parallel:merge:lock:" + flowInstanceId + ":" + nodeKey;
RLock lock = redissonClient.getLock(lockKey);
if (lock.isHeldByCurrentThread()) {
lock.unlock();
}
}
}
参考上面的 Redis 示例代码,注意以下几点:
InterruptedException分支到达汇聚节点
↓
尝试获取锁(立即返回)
↓
成功? → 是 → 执行分支合并逻辑 → 释放锁
↓
否
↓
等待重试间隔(默认 50ms)
↓
再次尝试获取锁
↓
成功? → 是 → 执行分支合并逻辑 → 释放锁
↓
否
↓
重复重试(最多 10 次)
↓
达到最大重试次数 → 抛出异常
{flowInstanceId}:{nodeKey}根据实际场景调整重试参数:
高并发场景:
# 缩短重试间隔,加快响应
turbo.plugin.parallelGateway.lock.retryIntervalMs=20
# 增加重试次数,提高成功率
turbo.plugin.parallelGateway.lock.maxRetryTimes=20
低并发场景:
# 默认配置即可
turbo.plugin.parallelGateway.lock.retryIntervalMs=50
turbo.plugin.parallelGateway.lock.maxRetryTimes=10
A: 当您的应用部署在多台服务器上时,必须使用分布式锁。单机部署使用默认的单机锁即可。
A: 插件会自动重试(默认最多 10 次,每次间隔 50ms)。如果所有重试都失败,会抛出异常,建议在业务层进行重试。
A:
A:
A: