Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(规则引擎): 增加场景分支executeAnyway配置.优化场景条件分支逻辑. #511

Merged
merged 1 commit into from
May 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ private Flux<AlarmInfo> parseAlarmInfo(ExecutionContext context, RuleData data)
//节点所在的条件分支索引
int branchIndex = context
.getJob()
.getConfiguration(SceneRule.ACTION_KEY_BRANCH_INDEX)
.getConfiguration(SceneRule.ACTION_KEY_BRANCH_ID)
.map(idx -> CastUtils.castNumber(idx).intValue())
.orElse(AlarmRuleBindEntity.ANY_BRANCH_INDEX);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,12 @@ public class SceneConditionAction implements Serializable {
@Schema(description = "满足条件时执行的动作")
private List<SceneActions> then;

@Schema(description = "无论如何都尝试执行此分支")
private boolean executeAnyway = false;

@Schema(description = "分支ID")
private Integer branchId;


//仅用于设置到reactQl sql的column中
public List<Term> createContextTerm() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
@Setter
public class SceneRule implements Serializable {

public static final String ACTION_KEY_BRANCH_ID = "_branchId";
public static final String ACTION_KEY_BRANCH_INDEX = "_branchIndex";
public static final String ACTION_KEY_GROUP_INDEX = "_groupIndex";
public static final String ACTION_KEY_ACTION_INDEX = "_actionIndex";
Expand Down Expand Up @@ -331,11 +332,16 @@ public Disposable createBranchHandler(Flux<Map<String, Object>> sourceData,
if (last == null) {
last = handler;
} else {
boolean executeAnyway = branch.isExecuteAnyway();
Function<Map<String, Object>, Mono<Boolean>> _last = last;

last = data -> _last
.apply(data)
.flatMap(match -> {
//无论如何都尝试执行当前分支
if(executeAnyway){
return handler.apply(data);
}
//上一个分支满足了则返回,不执行此分支逻辑
if (match) {
return Reactors.ALWAYS_FALSE;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,26 +1,45 @@
package org.jetlinks.community.rule.engine.service.terms;

import lombok.Getter;
import lombok.Setter;
import org.apache.commons.collections4.CollectionUtils;
import org.hswebframework.ezorm.core.param.Term;
import org.hswebframework.ezorm.rdb.metadata.RDBColumnMetadata;
import org.hswebframework.ezorm.rdb.operator.builder.fragments.PrepareSqlFragments;
import org.hswebframework.ezorm.rdb.operator.builder.fragments.SqlFragments;
import org.hswebframework.ezorm.rdb.operator.builder.fragments.term.AbstractTermFragmentBuilder;
import org.hswebframework.web.bean.FastBeanCopier;
import org.jetlinks.community.utils.ConverterUtils;
import org.jetlinks.community.utils.ObjectMappers;
import org.jetlinks.reactor.ql.utils.CastUtils;
import org.springframework.stereotype.Component;

import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/**
* 根据规则查询告警配置.
*
* <p>
* 例如:查询场景联动ID为rule-id绑定的告警
* <pre>
* <pre>{@code
* {
* "column":"id",
* "termType":"rule-bind-alarm",
* "value":"rule-id"
* }
* </pre>
*
* {
* "column":"id",
* "termType":"rule-bind-alarm",
* "value": {
* "ruleId":["rule-id"],
* "branchId":[-1,2]
* }
* }
* }</pre>
*
* @author zhangji 2022/11/23
*/
Expand All @@ -36,23 +55,80 @@ public SqlFragments createFragments(String columnFullName,
RDBColumnMetadata column,
Term term) {

AlarmRuleBindTerm bindTerm = AlarmRuleBindTerm.of(term.getValue());
if (CollectionUtils.isEmpty(bindTerm.ruleId)) {
throw new IllegalArgumentException("illegal term [rule-bind-alarm] value :" + term);
}
PrepareSqlFragments sqlFragments = PrepareSqlFragments.of();
if (term.getOptions().contains("not")) {
sqlFragments.addSql("not");
}
sqlFragments
.addSql("exists(select 1 from ", getTableName("s_alarm_rule_bind", column), " _bind where _bind.alarm_id =", columnFullName);

List<Object> ruleId = convertList(column, term);
sqlFragments
.addSql(
"and _bind.rule_id in (",
ruleId.stream().map(r -> "?").collect(Collectors.joining(",")),
String.join("?", bindTerm.ruleId),
")")
.addParameter(ruleId);
.addParameter(bindTerm.ruleId);

if (CollectionUtils.isNotEmpty(bindTerm.branchId)) {
sqlFragments
.addSql(
"and _bind.branch_index in (",
bindTerm.branchId.stream().map(r -> "?").collect(Collectors.joining(",")),
")")
.addParameter(bindTerm.branchId);
}

sqlFragments.addSql(")");

return sqlFragments;
}

@Getter
@Setter
public static class AlarmRuleBindTerm {
private List<String> ruleId;

private List<Integer> branchId;

public static AlarmRuleBindTerm of(Object term) {
if(term instanceof AlarmRuleBindTerm){
return ((AlarmRuleBindTerm) term);
}
AlarmRuleBindTerm bindTerm = new AlarmRuleBindTerm();

if (term instanceof String) {
String str = ((String) term);
if (str.startsWith("{")) {
term = ObjectMappers.parseJson(str, Map.class);
} else if (str.startsWith("[")) {
term = ObjectMappers.parseJsonArray(str, Object.class);
} else if (str.contains(":")) {
// ruleId:-1,1,2,3
String[] split = str.split(":");
bindTerm.setRuleId(Collections.singletonList(split[0]));
bindTerm.setBranchId(ConverterUtils.convertToList(split[1], val -> CastUtils
.castNumber(val)
.intValue()));
} else {
bindTerm.setRuleId(Collections.singletonList(str));
}
}

if (term instanceof Collection) {
bindTerm.setRuleId(ConverterUtils.convertToList(term, String::valueOf));
}

if (term instanceof Map) {
FastBeanCopier.copy(term, bindTerm);
}


return bindTerm;
}
}

}
Loading