本插件为Turbo提供“并行网关”和“包容网关”的多分支并行处理能力,使开发者可以在工作流中灵活处理分支流程。
🌟🌟🌟注意:并行网关与包容网关均不支持跨网关的节点回滚操作
# JDBC config
turbo.plugin.jdbc.url=jdbc:mysql://127.0.0.1:3306/t_engine?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull&transformedBitIsBoolean=true&autoReconnect=true
turbo.plugin.jdbc.username=username
turbo.plugin.jdbc.password=password
turbo.plugin.jdbc.driver=com.mysql.jdbc.Driver
turbo.plugin.jdbc.maximumPoolSize=10
# 自定义设置并行网关与包容网关NodeType。并行网关默认为9,包容网关默认为10。如非覆盖Turbo原有执行器插件,请不要设置为1-8
turbo.plugin.element_type.ParallelGatewayElementPlugin=9
turbo.plugin.element_type.InclusiveGatewayElementPlugin=10
# 并行网关与包容网关的开关配置。默认为true开启
turbo.plugin.support.ParallelGatewayElementPlugin=true
turbo.plugin.support.InclusiveGatewayElementPlugin=true
# 并行分支执行超时时间,单位:毫秒
turbo.plugin.parallelGateway.threadPool.timeout=3000
并行网关与包容网关都支持指定分支汇聚策略,目前支持的策略有:
com.didiglobal.turbo.plugin.executor.BranchMergeCustom
类,重写joinFirst
、joinMerge
方法,并在该类上添加@Primary
注解。
并行网关与包容网关都支持指定分支数据合并策略,目前支持的策略有:
com.didiglobal.turbo.plugin.executor.DataMergeCustom
类,重写merge
方法,并在该类上添加@Primary
注解。
{
ParallelGateway parallelGateway = new ParallelGateway();
// 设置节点key, 节点唯一标识
parallelGateway.setKey("ParallelGateway_38ad233");
// 设置节点类型, 默认为9
parallelGateway.setType(ExtendFlowElementType.PARALLEL_GATEWAY);
List<String> egIncomings = new ArrayList<>();
egIncomings.add("SequenceFlow_2gugjee");
parallelGateway.setIncoming(egIncomings);
// 设置多个分支出口
List<String> egOutgoings = new ArrayList<>();
egOutgoings.add("SequenceFlow_12rbl6u");
egOutgoings.add("SequenceFlow_3ih7eta");
parallelGateway.setOutgoing(egOutgoings);
Map<String, Object> properties = new HashMap<>();
Map<String, String> forkJoinMatch = new HashMap<>();
// 记录分支Fork节点
forkJoinMatch.put(com.didiglobal.turbo.plugin.common.Constants.ELEMENT_PROPERTIES.FORK, "ParallelGateway_38ad233");
// 记录分支Join节点
forkJoinMatch.put(com.didiglobal.turbo.plugin.common.Constants.ELEMENT_PROPERTIES.JOIN, "ParallelGateway_10lo44j");
properties.put(com.didiglobal.turbo.plugin.common.Constants.ELEMENT_PROPERTIES.FORK_JOIN_MATCH, JSONArray.toJSON(forkJoinMatch));
parallelGateway.setProperties(properties);
}
{
InclusiveGateway inclusiveGateway = new InclusiveGateway();
// 设置节点key, 节点唯一标识
inclusiveGateway.setKey("InclusiveGateway_3a1nn9f");
// 设置节点类型, 默认为10
inclusiveGateway.setType(ExtendFlowElementType.INCLUSIVE_GATEWAY);
// 多个分支入口
List<String> egIncomings = new ArrayList<>();
egIncomings.add("SequenceFlow_1h65e8t");
egIncomings.add("SequenceFlow_25kdv36");
inclusiveGateway.setIncoming(egIncomings);
List<String> egOutgoings = new ArrayList<>();
egOutgoings.add("SequenceFlow_3jkd63g");
inclusiveGateway.setOutgoing(egOutgoings);
Map<String, Object> properties = new HashMap<>();
Map<String, String> forkJoinMatch = new HashMap<>();
// 记录分支Fork节点
forkJoinMatch.put(com.didiglobal.turbo.plugin.common.Constants.ELEMENT_PROPERTIES.FORK, "InclusiveGateway_1djgrgp");
// 记录分支Join节点
forkJoinMatch.put(com.didiglobal.turbo.plugin.common.Constants.ELEMENT_PROPERTIES.JOIN, "InclusiveGateway_3a1nn9f");
properties.put(com.didiglobal.turbo.plugin.common.Constants.ELEMENT_PROPERTIES.FORK_JOIN_MATCH, JSONArray.toJSON(forkJoinMatch));
// 设置分支汇聚策略(在汇聚节点设置)
properties.put(com.didiglobal.turbo.plugin.common.Constants.ELEMENT_PROPERTIES.BRANCH_MERGE, MergeStrategy.BRANCH_MERGE.ANY_ONE);
// 设置分支数据合并策略(在汇聚节点设置)
properties.put(com.didiglobal.turbo.plugin.common.Constants.ELEMENT_PROPERTIES.DATA_MERGE, MergeStrategy.DATA_MERGE.NONE);
inclusiveGateway.setProperties(properties);
}