optimize execution of workflow consisting of bucket-level followed by doc-level monitors #1729

Open · wants to merge 2 commits into base: main
@@ -58,6 +58,7 @@ import org.opensearch.search.aggregations.AggregatorFactories
 import org.opensearch.search.aggregations.bucket.composite.CompositeAggregationBuilder
 import org.opensearch.search.aggregations.bucket.terms.TermsAggregationBuilder
 import org.opensearch.search.builder.SearchSourceBuilder
+import org.opensearch.search.sort.SortOrder
 import org.opensearch.transport.TransportService
 import java.time.Instant
 import java.util.UUID
@@ -479,7 +480,7 @@ object BucketLevelMonitorRunner : MonitorRunner() {
     val queryBuilder = if (input.query.query() == null) BoolQueryBuilder()
     else QueryBuilders.boolQuery().must(source.query())
     queryBuilder.filter(QueryBuilders.termsQuery(fieldName, bucketValues))
-    sr.source().query(queryBuilder)
+    sr.source().query(queryBuilder).sort("_seq_no", SortOrder.DESC)

Member: why are we sorting based on _seq_no? There is already a range query based on the period_end variable; this seems incorrect.

Collaborator (Author): We need the sort because, without it, we get an arbitrary 10 docs (the default page size) from the last 15 minutes for a workflow running every minute, while the aggregation may have grouped 1000 docs. Those 10 docs may not be the latest ones; we pass them to the delegated doc-level monitor, whose seq_no checkpoint has already moved past them, so no alert is generated.

Sorting by _seq_no ensures we always get the latest 10 of the 1000 docs considered for the aggregation. When the doc-level monitor runs next, it gets the latest 10 docs and goes on to generate an alert.
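
Editor's note: a minimal sketch of the behavior described above, using the real OpenSearch query-builder APIs; the function name chainedFindingsSource is hypothetical, and the size of 10 is the default page size referenced in this thread, not a value from the PR.

import org.opensearch.index.query.QueryBuilders
import org.opensearch.search.builder.SearchSourceBuilder
import org.opensearch.search.sort.SortOrder

// Of all docs an aggregation bucket covers, only `size` hits (10 by default)
// are handed to the delegate doc-level monitor. Sorting by _seq_no descending
// makes those hits the newest ones, i.e. the docs the delegate's seq_no
// checkpoint has not yet moved past, so it can still alert on them.
fun chainedFindingsSource(fieldName: String, bucketValues: List<String>): SearchSourceBuilder =
    SearchSourceBuilder()
        .query(QueryBuilders.boolQuery().filter(QueryBuilders.termsQuery(fieldName, bucketValues)))
        .sort("_seq_no", SortOrder.DESC) // without this, 10 arbitrary docs are returned
        .size(10)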

 }
 sr.cancelAfterTimeInterval = TimeValue.timeValueMinutes(
     getCancelAfterTimeInterval()
@@ -297,19 +297,33 @@ class TransportDocLevelMonitorFanOutAction
                 createFindings(monitor, docsToQueries, idQueryMap, true)
             }
         } else {
-            monitor.triggers.forEach {
-                triggerResults[it.id] = runForEachDocTrigger(
-                    monitorResult,
-                    it as DocumentLevelTrigger,
-                    monitor,
-                    idQueryMap,
-                    docsToQueries,
-                    queryToDocIds,
-                    dryrun,
-                    executionId = executionId,
-                    findingIdToDocSource,
-                    workflowRunContext = workflowRunContext
-                )
-            }
+            if (monitor.ignoreFindingsAndAlerts == null || monitor.ignoreFindingsAndAlerts == false) {
+                monitor.triggers.forEach {
+                    triggerResults[it.id] = runForEachDocTrigger(
+                        monitorResult,
+                        it as DocumentLevelTrigger,
+                        monitor,
+                        idQueryMap,
+                        docsToQueries,
+                        queryToDocIds,
+                        dryrun,
+                        executionId = executionId,
+                        findingIdToDocSource,
+                        workflowRunContext = workflowRunContext
+                    )
+                }
+            } else if (monitor.ignoreFindingsAndAlerts == true) {
+                monitor.triggers.forEach {
+                    triggerResults[it.id] = runForEachDocTriggerIgnoringFindingsAndAlerts(
+                        monitorResult,
+                        it as DocumentLevelTrigger,
+                        monitor,
+                        queryToDocIds,
+                        dryrun,
+                        executionId,
+                        workflowRunContext
+                    )
+                }
+            }
         }

@@ -349,6 +363,50 @@ class TransportDocLevelMonitorFanOutAction
         }
     }

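Editor's note: the reviewers below ask for code comments on this new method; a possible KDoc follows. The wording is this editor's sketch of what the code does, not text from the PR.

/**
 * Evaluates the doc-level trigger for a monitor created with
 * ignoreFindingsAndAlerts = true: runs the trigger against the matched docs,
 * executes the trigger's actions, and composes a single summary alert
 * (persisted unless this is a dry run or the monitor has no id), but
 * deliberately writes no findings and no per-document alerts. Returns a run
 * result with an empty triggered-docs list.
 */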
+    private suspend fun runForEachDocTriggerIgnoringFindingsAndAlerts(

Member: plz add code comments

Member: ?

Member: we seem to be creating one alert; this method name is misleading.

+        monitorResult: MonitorRunResult<DocumentLevelTriggerRunResult>,
+        trigger: DocumentLevelTrigger,
+        monitor: Monitor,
+        queryToDocIds: Map<DocLevelQuery, Set<String>>,
+        dryrun: Boolean,
+        executionId: String,
+        workflowRunContext: WorkflowRunContext?
+    ): DocumentLevelTriggerRunResult {
+        val triggerResult = triggerService.runDocLevelTrigger(monitor, trigger, queryToDocIds)
+        if (triggerResult.triggeredDocs.isNotEmpty()) {
+            val triggerCtx = DocumentLevelTriggerExecutionContext(monitor, trigger)
+            val alert = alertService.composeDocLevelAlert(
+                listOf(),
+                triggerResult.triggeredDocs,
+                triggerCtx,
+                monitorResult.alertError() ?: triggerResult.alertError(),
+                executionId = executionId,
+                workflorwRunContext = workflowRunContext
+            )
+            for (action in trigger.actions) {
+                this.runAction(action, triggerCtx.copy(alerts = listOf(AlertContext(alert))), monitor, dryrun)
+            }
+
+            if (!dryrun && monitor.id != Monitor.NO_ID) {
+                val actionResults = triggerResult.actionResultsMap.getOrDefault(alert.id, emptyMap())
+                val actionExecutionResults = actionResults.values.map { actionRunResult ->
+                    ActionExecutionResult(actionRunResult.actionId, actionRunResult.executionTime, if (actionRunResult.throttled) 1 else 0)
+                }
+                val updatedAlert = alert.copy(actionExecutionResults = actionExecutionResults)
+
+                retryPolicy.let {
+                    alertService.saveAlerts(
+                        monitor.dataSources,
+                        listOf(updatedAlert),
+                        it,
+                        routingId = monitor.id
+                    )
+                }
+            }
+        }
+        return DocumentLevelTriggerRunResult(trigger.name, listOf(), monitorResult.error)
+    }
+
     private suspend fun runForEachDocTrigger(
         monitorResult: MonitorRunResult<DocumentLevelTriggerRunResult>,
         trigger: DocumentLevelTrigger,
@@ -512,7 +570,7 @@ class TransportDocLevelMonitorFanOutAction
             .string()
         log.debug("Findings: $findingStr")

-        if (shouldCreateFinding) {
+        if (shouldCreateFinding and (monitor.ignoreFindingsAndAlerts == null || monitor.ignoreFindingsAndAlerts == false)) {
             indexRequests += IndexRequest(monitor.dataSources.findingsIndex)
                 .source(findingStr, XContentType.JSON)
                 .id(finding.id)
@@ -524,13 +582,15 @@
             bulkIndexFindings(monitor, indexRequests)
         }

-        try {
-            findings.forEach { finding ->
-                publishFinding(monitor, finding)
-            }
-        } catch (e: Exception) {
-            // suppress exception
-            log.error("Optional finding callback failed", e)
-        }
+        if (monitor.ignoreFindingsAndAlerts == null || monitor.ignoreFindingsAndAlerts == false) {
+            try {
+                findings.forEach { finding ->
+                    publishFinding(monitor, finding)
+                }
+            } catch (e: Exception) {
+                // suppress exception
+                log.error("Optional finding callback failed", e)
+            }
+        }
         this.findingsToTriggeredQueries += findingsToTriggeredQueries

@@ -688,6 +748,7 @@ class TransportDocLevelMonitorFanOutAction
         var to: Long = Long.MAX_VALUE
         while (to >= from) {
             val hits: SearchHits = searchShard(
+                monitor,
                 indexExecutionCtx.concreteIndexName,
                 shard,
                 from,
@@ -870,6 +931,7 @@ class TransportDocLevelMonitorFanOutAction
      * This method hence fetches only docs from shard which haven't been queried before
      */
     private suspend fun searchShard(
+        monitor: Monitor,
         index: String,
         shard: String,
         prevSeqNo: Long?,
@@ -883,8 +945,16 @@
         val boolQueryBuilder = BoolQueryBuilder()
         boolQueryBuilder.filter(QueryBuilders.rangeQuery("_seq_no").gt(prevSeqNo).lte(maxSeqNo))

-        if (!docIds.isNullOrEmpty()) {
-            boolQueryBuilder.filter(QueryBuilders.termsQuery("_id", docIds))
-        }
+        if (monitor.ignoreFindingsAndAlerts == null || monitor.ignoreFindingsAndAlerts == false) {
+            if (!docIds.isNullOrEmpty()) {
+                boolQueryBuilder.filter(QueryBuilders.termsQuery("_id", docIds))
+            }
+        } else if (monitor.ignoreFindingsAndAlerts == true) {
+            val docIdsParam = mutableListOf<String>()
+            if (docIds != null) {
+                docIdsParam.addAll(docIds)
+            }
+            boolQueryBuilder.filter(QueryBuilders.termsQuery("_id", docIdsParam))
+        }

         val request: SearchRequest = SearchRequest()
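Editor's note: a trimmed, hypothetical restatement of the new branching above (shardQuery is an illustrative name, not from the PR) to make the behavioral difference explicit: with ignoreFindingsAndAlerts set, a null or empty chained-findings doc-id list still produces a terms filter, and an empty terms filter matches no documents, whereas the default branch skips the filter and scans the whole seq_no window.

import org.opensearch.index.query.BoolQueryBuilder
import org.opensearch.index.query.QueryBuilders

fun shardQuery(prevSeqNo: Long?, maxSeqNo: Long, docIds: List<String>?, ignoreFindingsAndAlerts: Boolean?): BoolQueryBuilder {
    // Only fetch docs in the (prevSeqNo, maxSeqNo] window that haven't been queried before.
    val query = BoolQueryBuilder()
        .filter(QueryBuilders.rangeQuery("_seq_no").gt(prevSeqNo).lte(maxSeqNo))
    if (ignoreFindingsAndAlerts == true) {
        // Always filter on the chained doc ids; an empty list matches nothing.
        query.filter(QueryBuilders.termsQuery("_id", docIds.orEmpty()))
    } else if (!docIds.isNullOrEmpty()) {
        // Default behavior: filter only when chained doc ids are present.
        query.filter(QueryBuilders.termsQuery("_id", docIds))
    }
    return query
}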
@@ -6233,4 +6233,120 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() {
             )
         )
     }
+
+    fun `test execute workflow with custom alerts and finding index when bucket monitor is used in chained finding of ignored doc monitor`() {
+        val query = QueryBuilders.rangeQuery("test_strict_date_time")
+            .gt("{{period_end}}||-10d")
+            .lte("{{period_end}}")
+            .format("epoch_millis")
+        val compositeSources = listOf(
+            TermsValuesSourceBuilder("test_field_1").field("test_field_1")
+        )
+        val compositeAgg = CompositeAggregationBuilder("composite_agg", compositeSources)
+        val input = SearchInput(indices = listOf(index), query = SearchSourceBuilder().size(0).query(query).aggregation(compositeAgg))
+        // Bucket level monitor will reduce the size of matched doc ids on those that belong
+        // to a bucket that contains more than 1 document after term grouping
+        val triggerScript = """
+            params.docCount > 1
+        """.trimIndent()
+
+        var trigger = randomBucketLevelTrigger()
+        trigger = trigger.copy(
+            bucketSelector = BucketSelectorExtAggregationBuilder(
+                name = trigger.id,
+                bucketsPathsMap = mapOf("docCount" to "_count"),
+                script = Script(triggerScript),
+                parentBucketPath = "composite_agg",
+                filter = null,
+            )
+        )
+        val bucketCustomAlertsIndex = "custom_alerts_index"
+        val bucketCustomFindingsIndex = "custom_findings_index"
+        val bucketCustomFindingsIndexPattern = "custom_findings_index-1"
+
+        val bucketLevelMonitorResponse = createMonitor(
+            randomBucketLevelMonitor(
+                inputs = listOf(input),
+                enabled = false,
+                triggers = listOf(trigger),
+                dataSources = DataSources(
+                    findingsEnabled = true,
+                    alertsIndex = bucketCustomAlertsIndex,
+                    findingsIndex = bucketCustomFindingsIndex,
+                    findingsIndexPattern = bucketCustomFindingsIndexPattern
+                )
+            )
+        )!!
+
+        val docQuery1 = DocLevelQuery(query = "test_field_1:\"test_value_2\"", name = "1", fields = listOf())
+        val docQuery2 = DocLevelQuery(query = "test_field_1:\"test_value_1\"", name = "2", fields = listOf())
+        val docQuery3 = DocLevelQuery(query = "test_field_1:\"test_value_3\"", name = "3", fields = listOf())
+        val docLevelInput = DocLevelMonitorInput("description", listOf(index), listOf(docQuery1, docQuery2, docQuery3))
+        val docTrigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN)
+        val docCustomAlertsIndex = "custom_alerts_index"
+        val docCustomFindingsIndex = "custom_findings_index"
+        val docCustomFindingsIndexPattern = "custom_findings_index-1"
+        var docLevelMonitor = randomDocumentLevelMonitor(
+            inputs = listOf(docLevelInput),
+            triggers = listOf(docTrigger),
+            dataSources = DataSources(
+                alertsIndex = docCustomAlertsIndex,
+                findingsIndex = docCustomFindingsIndex,
+                findingsIndexPattern = docCustomFindingsIndexPattern
+            ),
+            ignoreFindingsAndAlerts = true
+        )
+
+        val docLevelMonitorResponse = createMonitor(docLevelMonitor)!!
+        // 1. bucketMonitor (chainedFinding = null) 2. docMonitor (chainedFinding = bucketMonitor)
+        var workflow = randomWorkflow(
+            monitorIds = listOf(bucketLevelMonitorResponse.id, docLevelMonitorResponse.id),
+            enabled = false,
+            auditDelegateMonitorAlerts = false
+        )
+        val workflowResponse = upsertWorkflow(workflow)!!
+        val workflowById = searchWorkflow(workflowResponse.id)
+        assertNotNull(workflowById)
+
+        // Creates 5 documents
+        insertSampleTimeSerializedData(
+            index,
+            listOf(
+                "test_value_1",
+                "test_value_1", // adding duplicate to verify aggregation
+                "test_value_2",
+                "test_value_2",
+                "test_value_3"
+            )
+        )
+
+        val workflowId = workflowResponse.id
+        // 1. bucket level monitor should reduce the doc findings to 4 (1, 2, 3, 4)
+        // 2. Doc level monitor will match those 4 documents although it contains rules for matching all 5 documents (docQuery3 matches the fifth)
+        val executeWorkflowResponse = executeWorkflow(workflowById, workflowId, false)!!
+        assertNotNull(executeWorkflowResponse)
+
+        for (monitorRunResults in executeWorkflowResponse.workflowRunResult.monitorRunResults) {
+            if (bucketLevelMonitorResponse.monitor.name == monitorRunResults.monitorName) {
+                val searchResult = monitorRunResults.inputResults.results.first()
+
+                @Suppress("UNCHECKED_CAST")
+                val buckets = searchResult.stringMap("aggregations")?.stringMap("composite_agg")
+                    ?.get("buckets") as List<kotlin.collections.Map<String, Any>>
+                assertEquals("Incorrect search result", 3, buckets.size)
+
+                val getAlertsResponse = assertAlerts(bucketLevelMonitorResponse.id, bucketCustomAlertsIndex, 2, workflowId)
+                assertAcknowledges(getAlertsResponse.alerts, bucketLevelMonitorResponse.id, 2)
+                assertFindings(bucketLevelMonitorResponse.id, bucketCustomFindingsIndex, 1, 4, listOf("1", "2", "3", "4"))
+            } else {
+                assertEquals(1, monitorRunResults.inputResults.results.size)
+                val values = monitorRunResults.triggerResults.values
+                assertEquals(1, values.size)
+
+                val getAlertsResponse = assertAlerts(docLevelMonitorResponse.id, docCustomAlertsIndex, 1, workflowId)
+                assertAcknowledges(getAlertsResponse.alerts, docLevelMonitorResponse.id, 1)
+                assertFindings(docLevelMonitorResponse.id, docCustomFindingsIndex, 0, 0, listOf("1", "2", "3", "4"))
+            }
+        }
+    }
 }
@@ -216,12 +216,14 @@ fun randomDocumentLevelMonitor(
     lastUpdateTime: Instant = Instant.now().truncatedTo(ChronoUnit.MILLIS),
     withMetadata: Boolean = false,
     dataSources: DataSources,
+    ignoreFindingsAndAlerts: Boolean? = false,
     owner: String? = null
 ): Monitor {
     return Monitor(
         name = name, monitorType = Monitor.MonitorType.DOC_LEVEL_MONITOR.value, enabled = enabled, inputs = inputs,
         schedule = schedule, triggers = triggers, enabledTime = enabledTime, lastUpdateTime = lastUpdateTime, user = user,
-        uiMetadata = if (withMetadata) mapOf("foo" to "bar") else mapOf(), dataSources = dataSources, owner = owner
+        uiMetadata = if (withMetadata) mapOf("foo" to "bar") else mapOf(), dataSources = dataSources,
+        ignoreFindingsAndAlerts = ignoreFindingsAndAlerts, owner = owner
     )
 }