turbo

并行网关&包容网关插件

当前版本: 1.1.1
发布日期: 2025-11-14
最低要求: Turbo Engine 1.3.0+

📚 文档导航


1. 概述

本插件为Turbo提供”并行网关”和”包容网关”的多分支并行处理能力,使开发者可以在工作流中灵活处理分支流程。

核心特性

🌟🌟🌟重要提示:并行网关与包容网关均不支持跨网关的节点回滚操作

2. 功能介绍

2.1 并行网关

4. 快速开始

4.1 添加依赖

<dependency>
    <groupId>com.didiglobal.turbo</groupId>
    <artifactId>parallel-plugin</artifactId>
    <version>1.1.1</version>
</dependency>
<dependency>
    <groupId>com.didiglobal.turbo</groupId>
    <artifactId>engine</artifactId>
    <version>1.3.0</version>
</dependency>

4.2 基础配置

application.propertiesplugin.properties 中添加以下配置:

# 数据库连接配置
turbo.plugin.jdbc.url=jdbc:mysql://127.0.0.1:3306/t_engine?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull&transformedBitIsBoolean=true&autoReconnect=true
turbo.plugin.jdbc.username=username
turbo.plugin.jdbc.password=password
turbo.plugin.jdbc.driver=com.mysql.jdbc.Driver
turbo.plugin.jdbc.maximumPoolSize=10

# 节点类型配置(可选)
# 自定义设置并行网关与包容网关NodeType。并行网关默认为9,包容网关默认为10。如非覆盖Turbo原有执行器插件,请不要设置为1-8
turbo.plugin.element_type.ParallelGatewayElementPlugin=9
turbo.plugin.element_type.InclusiveGatewayElementPlugin=10

# 功能开关配置(可选)
# 并行网关与包容网关的开关配置。默认为true开启
turbo.plugin.support.ParallelGatewayElementPlugin=true
turbo.plugin.support.InclusiveGatewayElementPlugin=true

4.3 线程池配置(可选)

并行网关使用线程池来执行并行分支任务。插件提供了合理的默认配置,一般情况下无需调整。

🚀 推荐配置(JDK 21+ 虚拟线程):

# 启用虚拟线程,轻松支持大规模并行(推荐)
turbo.plugin.parallelGateway.threadPool.useVirtualThreads=true

传统配置(平台线程):

默认配置:

自定义配置:

# 平台线程池配置
turbo.plugin.parallelGateway.threadPool.corePoolSize=10
turbo.plugin.parallelGateway.threadPool.maxPoolSize=20
turbo.plugin.parallelGateway.threadPool.queueCapacity=100
turbo.plugin.parallelGateway.threadPool.timeout=3000

📖 详细的线程池配置、虚拟线程性能对比和调优建议,请参考:线程池配置指南

4.4 并发锁配置(可选)

并行网关在分支合并时使用锁机制来防止并发分支覆盖问题。插件提供了默认的单机锁实现,一般情况下无需配置。

默认配置(单机锁):

自定义配置:

# 重试间隔(毫秒)
turbo.plugin.parallelGateway.lock.retryIntervalMs=50
# 最大重试次数
turbo.plugin.parallelGateway.lock.maxRetryTimes=10

分布式锁扩展(Redis 示例):

@Configuration
public class CustomLockConfig {
    @Bean
    public ParallelMergeLock parallelMergeLock() {
        return new RedisParallelMergeLock(); // 用户自定义实现
    }
}

📖 详细的并发安全机制、锁扩展指南和最佳实践,请参考:并发安全机制

5. 插件使用

5.1 并发安全机制(1.1.1 新增)

从 1.1.1 版本开始,parallel-plugin 提供了并发安全机制,防止分支合并时的数据覆盖问题。

工作原理

默认配置

多机部署: 如果您的应用部署在多台服务器上,需要实现自定义的分布式锁(如 Redis)。详细说明请参考:并发安全机制

5.2 嵌套并行网关(1.1.1 新增)

从 1.1.1 版本开始,parallel-plugin 支持嵌套并行网关,即在并行网关的某个分支内部再创建并行网关。

使用场景示例

主流程并行分支:
  ├─ 分支1:审批流程(内部嵌套并行网关)
  │   ├─ 内部分支1-1:财务审批
  │   └─ 内部分支1-2:法务审批
  ├─ 分支2:数据处理
  └─ 分支3:通知发送

所有分支(包括嵌套的子分支)完成后,主流程继续执行。

流程结构

                                  |---> 二级ParallelFork ---> Task1-1 --|
                 |---> ExclusiveGW ----|                                  |---> 二级ParallelJoin --|
StartEvent --->  |                     |---> 二级ParallelFork ---> Task1-2 --|                       |
  一级ParallelFork|---> ExclusiveGW ---> Task2 ---------------------------------------------------|---> 一级ParallelJoin ---> EndEvent
                 |---> ExclusiveGW ---> Task3 ---------------------------------------------------|

重要说明

线程池配置建议

推荐方案(虚拟线程):

# 启用虚拟线程,无需担心线程数限制
turbo.plugin.parallelGateway.threadPool.useVirtualThreads=true

传统方案(平台线程):

如果使用平台线程,核心线程数应该 >= 最大并行分支总数。例如上述场景:

turbo.plugin.parallelGateway.threadPool.corePoolSize=5
turbo.plugin.parallelGateway.threadPool.maxPoolSize=10

💡 性能提示:JDK 21+ 环境下,虚拟线程在嵌套并行网关场景性能显著优于平台线程。

更多线程池配置详情,请参考:线程池配置指南

5.3 分支汇聚策略

并行网关与包容网关都支持指定分支汇聚策略,目前支持的策略有:

5.4 数据汇聚策略

并行网关与包容网关都支持指定分支数据合并策略,目前支持的策略有:

5.5 并行网关节点示例

{
    ParallelGateway parallelGateway = new ParallelGateway();
    // 设置节点key, 节点唯一标识
    parallelGateway.setKey("ParallelGateway_38ad233");
    // 设置节点类型, 默认为9
    parallelGateway.setType(ExtendFlowElementType.PARALLEL_GATEWAY);

    List<String> egIncomings = new ArrayList<>();
    egIncomings.add("SequenceFlow_2gugjee");
    parallelGateway.setIncoming(egIncomings);

    // 设置多个分支出口
    List<String> egOutgoings = new ArrayList<>();
    egOutgoings.add("SequenceFlow_12rbl6u");
    egOutgoings.add("SequenceFlow_3ih7eta");
    parallelGateway.setOutgoing(egOutgoings);

    Map<String, Object> properties = new HashMap<>();
    Map<String, String> forkJoinMatch = new HashMap<>();
    // 记录分支Fork节点
    forkJoinMatch.put(com.didiglobal.turbo.plugin.common.Constants.ELEMENT_PROPERTIES.FORK, "ParallelGateway_38ad233");
    // 记录分支Join节点
    forkJoinMatch.put(com.didiglobal.turbo.plugin.common.Constants.ELEMENT_PROPERTIES.JOIN, "ParallelGateway_10lo44j");
    properties.put(com.didiglobal.turbo.plugin.common.Constants.ELEMENT_PROPERTIES.FORK_JOIN_MATCH, JSONArray.toJSON(forkJoinMatch));
    parallelGateway.setProperties(properties);
}

5.6 包容网关节点示例

{
    InclusiveGateway inclusiveGateway = new InclusiveGateway();
    // 设置节点key, 节点唯一标识
    inclusiveGateway.setKey("InclusiveGateway_3a1nn9f");
    // 设置节点类型, 默认为10
    inclusiveGateway.setType(ExtendFlowElementType.INCLUSIVE_GATEWAY);

    // 多个分支入口
    List<String> egIncomings = new ArrayList<>();
    egIncomings.add("SequenceFlow_1h65e8t");
    egIncomings.add("SequenceFlow_25kdv36");
    inclusiveGateway.setIncoming(egIncomings);

    List<String> egOutgoings = new ArrayList<>();
    egOutgoings.add("SequenceFlow_3jkd63g");
    inclusiveGateway.setOutgoing(egOutgoings);

    Map<String, Object> properties = new HashMap<>();
    Map<String, String> forkJoinMatch = new HashMap<>();
    // 记录分支Fork节点
    forkJoinMatch.put(com.didiglobal.turbo.plugin.common.Constants.ELEMENT_PROPERTIES.FORK, "InclusiveGateway_1djgrgp");
    // 记录分支Join节点
    forkJoinMatch.put(com.didiglobal.turbo.plugin.common.Constants.ELEMENT_PROPERTIES.JOIN, "InclusiveGateway_3a1nn9f");
    properties.put(com.didiglobal.turbo.plugin.common.Constants.ELEMENT_PROPERTIES.FORK_JOIN_MATCH, JSONArray.toJSON(forkJoinMatch));
    // 设置分支汇聚策略(在汇聚节点设置)
    properties.put(com.didiglobal.turbo.plugin.common.Constants.ELEMENT_PROPERTIES.BRANCH_MERGE, MergeStrategy.BRANCH_MERGE.ANY_ONE);
    // 设置分支数据合并策略(在汇聚节点设置)
    properties.put(com.didiglobal.turbo.plugin.common.Constants.ELEMENT_PROPERTIES.DATA_MERGE, MergeStrategy.DATA_MERGE.NONE);
    inclusiveGateway.setProperties(properties);
}