Skip to content

Commit

Permalink
feat(规则引擎): 增加场景分支executeAnyway配置.优化场景条件分支逻辑. (#511)
Browse files Browse the repository at this point in the history
  • Loading branch information
tancongsir authored May 23, 2024
1 parent 86b073b commit 2b8f571
Show file tree
Hide file tree
Showing 4 changed files with 95 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ private Flux<AlarmInfo> parseAlarmInfo(ExecutionContext context, RuleData data)
//节点所在的条件分支索引
int branchIndex = context
.getJob()
.getConfiguration(SceneRule.ACTION_KEY_BRANCH_INDEX)
.getConfiguration(SceneRule.ACTION_KEY_BRANCH_ID)
.map(idx -> CastUtils.castNumber(idx).intValue())
.orElse(AlarmRuleBindEntity.ANY_BRANCH_INDEX);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,12 @@ public class SceneConditionAction implements Serializable {
@Schema(description = "满足条件时执行的动作")
private List<SceneActions> then;

@Schema(description = "无论如何都尝试执行此分支")
private boolean executeAnyway = false;

@Schema(description = "分支ID")
private Integer branchId;


//仅用于设置到reactQl sql的column中
public List<Term> createContextTerm() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
@Setter
public class SceneRule implements Serializable {

public static final String ACTION_KEY_BRANCH_ID = "_branchId";
public static final String ACTION_KEY_BRANCH_INDEX = "_branchIndex";
public static final String ACTION_KEY_GROUP_INDEX = "_groupIndex";
public static final String ACTION_KEY_ACTION_INDEX = "_actionIndex";
Expand Down Expand Up @@ -331,11 +332,16 @@ public Disposable createBranchHandler(Flux<Map<String, Object>> sourceData,
if (last == null) {
last = handler;
} else {
boolean executeAnyway = branch.isExecuteAnyway();
Function<Map<String, Object>, Mono<Boolean>> _last = last;

last = data -> _last
.apply(data)
.flatMap(match -> {
//无论如何都尝试执行当前分支
if(executeAnyway){
return handler.apply(data);
}
//上一个分支满足了则返回,不执行此分支逻辑
if (match) {
return Reactors.ALWAYS_FALSE;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,26 +1,45 @@
package org.jetlinks.community.rule.engine.service.terms;

import lombok.Getter;
import lombok.Setter;
import org.apache.commons.collections4.CollectionUtils;
import org.hswebframework.ezorm.core.param.Term;
import org.hswebframework.ezorm.rdb.metadata.RDBColumnMetadata;
import org.hswebframework.ezorm.rdb.operator.builder.fragments.PrepareSqlFragments;
import org.hswebframework.ezorm.rdb.operator.builder.fragments.SqlFragments;
import org.hswebframework.ezorm.rdb.operator.builder.fragments.term.AbstractTermFragmentBuilder;
import org.hswebframework.web.bean.FastBeanCopier;
import org.jetlinks.community.utils.ConverterUtils;
import org.jetlinks.community.utils.ObjectMappers;
import org.jetlinks.reactor.ql.utils.CastUtils;
import org.springframework.stereotype.Component;

import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/**
* 根据规则查询告警配置.
*
* <p>
* 例如:查询场景联动ID为rule-id绑定的告警
* <pre>
* <pre>{@code
* {
* "column":"id",
* "termType":"rule-bind-alarm",
* "value":"rule-id"
* }
* </pre>
*
* {
* "column":"id",
* "termType":"rule-bind-alarm",
* "value": {
* "ruleId":["rule-id"],
* "branchId":[-1,2]
* }
* }
* }</pre>
*
* @author zhangji 2022/11/23
*/
Expand All @@ -36,23 +55,80 @@ public SqlFragments createFragments(String columnFullName,
RDBColumnMetadata column,
Term term) {

AlarmRuleBindTerm bindTerm = AlarmRuleBindTerm.of(term.getValue());
if (CollectionUtils.isEmpty(bindTerm.ruleId)) {
throw new IllegalArgumentException("illegal term [rule-bind-alarm] value :" + term);
}
PrepareSqlFragments sqlFragments = PrepareSqlFragments.of();
if (term.getOptions().contains("not")) {
sqlFragments.addSql("not");
}
sqlFragments
.addSql("exists(select 1 from ", getTableName("s_alarm_rule_bind", column), " _bind where _bind.alarm_id =", columnFullName);

List<Object> ruleId = convertList(column, term);
sqlFragments
.addSql(
"and _bind.rule_id in (",
ruleId.stream().map(r -> "?").collect(Collectors.joining(",")),
String.join("?", bindTerm.ruleId),
")")
.addParameter(ruleId);
.addParameter(bindTerm.ruleId);

if (CollectionUtils.isNotEmpty(bindTerm.branchId)) {
sqlFragments
.addSql(
"and _bind.branch_index in (",
bindTerm.branchId.stream().map(r -> "?").collect(Collectors.joining(",")),
")")
.addParameter(bindTerm.branchId);
}

sqlFragments.addSql(")");

return sqlFragments;
}

@Getter
@Setter
public static class AlarmRuleBindTerm {
private List<String> ruleId;

private List<Integer> branchId;

public static AlarmRuleBindTerm of(Object term) {
if(term instanceof AlarmRuleBindTerm){
return ((AlarmRuleBindTerm) term);
}
AlarmRuleBindTerm bindTerm = new AlarmRuleBindTerm();

if (term instanceof String) {
String str = ((String) term);
if (str.startsWith("{")) {
term = ObjectMappers.parseJson(str, Map.class);
} else if (str.startsWith("[")) {
term = ObjectMappers.parseJsonArray(str, Object.class);
} else if (str.contains(":")) {
// ruleId:-1,1,2,3
String[] split = str.split(":");
bindTerm.setRuleId(Collections.singletonList(split[0]));
bindTerm.setBranchId(ConverterUtils.convertToList(split[1], val -> CastUtils
.castNumber(val)
.intValue()));
} else {
bindTerm.setRuleId(Collections.singletonList(str));
}
}

if (term instanceof Collection) {
bindTerm.setRuleId(ConverterUtils.convertToList(term, String::valueOf));
}

if (term instanceof Map) {
FastBeanCopier.copy(term, bindTerm);
}


return bindTerm;
}
}

}

0 comments on commit 2b8f571

Please sign in to comment.