Skip to content

Commit

Permalink
Merge pull request #78 from th2-net/release-3.8.0
Browse files Browse the repository at this point in the history
Release 3.8.0
  • Loading branch information
OptimumCode authored Oct 13, 2021
2 parents ace0c00 + 10c0765 commit e9aa09b
Show file tree
Hide file tree
Showing 17 changed files with 900 additions and 128 deletions.
33 changes: 32 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# th2 check1 (3.7.2)
# th2 check1 (3.8.0)

## Overview

Expand Down Expand Up @@ -108,8 +108,39 @@ spec:
- "parsed"
```
## Prometheus metrics
The Check1 component publishes Prometheus metrics to observe the actual state of it
* `th2_check1_actual_cache_number` - actual number of messages in caches
* `th2_check1_active_tasks_number` - actual number of currently working rules

The `th2_check1_actual_cache_number` metric separate messages with two labels:
* `session_alias` - session alias of received message
* `direction` - direction of received message

The `th2_check1_active_tasks_number` metric separate rules with label `rule_type`

## Release Notes

### 3.8.0

#### Added:
+ Added check for positive timeout
+ Added mechanism for handling exceptions when creating and executing rules which publishes events about an error that has occurred
+ Added metric for monitoring active rules and messages count
+ Added check for required message type in the message filter
+ Provided more detailed logging in comparable messages
+ Provided the ability to attach verification description to event
+ Provided the ability to verify repeating groups according to defined filters via `check_repeating_group_order` parameter in the `RootComparisonSettings` message

#### Changed:
+ Migrated `common` version from `3.25.0` to `3.26.4`
+ Added support for converting SimpleList to readable payload body
+ Added the new `description` parameter to `RootMessageFilter` message
+ Migrated `grpc-check1` version from `3.2.0` to `3.4.2`
+ Migrated sailfish-utils from `3.7.0` to `3.8.1`
+ Now Check1 keep the order of repeating result groups by default
+ Fix IN, NOT_IN FilterOperation interaction

### 3.7.2

#### Changed:
Expand Down
12 changes: 8 additions & 4 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -165,9 +165,9 @@ signing {

dependencies {
api platform('com.exactpro.th2:bom:3.0.0')
implementation 'com.exactpro.th2:grpc-check1:3.2.0'
implementation 'com.exactpro.th2:common:3.25.0'
implementation 'com.exactpro.th2:sailfish-utils:3.7.0'
implementation 'com.exactpro.th2:grpc-check1:3.4.2'
implementation 'com.exactpro.th2:common:3.26.4'
implementation 'com.exactpro.th2:sailfish-utils:3.9.1'
implementation "org.slf4j:slf4j-log4j12"
implementation "org.slf4j:slf4j-api"

Expand All @@ -177,6 +177,10 @@ dependencies {

implementation "io.reactivex.rxjava2:rxjava:2.2.19" // https://github.com/salesforce/reactive-grpc/issues/202

implementation('io.prometheus:simpleclient') {
because('metrics from messages and rules')
}

testImplementation 'org.junit.jupiter:junit-jupiter:5.6.2'
testImplementation 'org.jetbrains.kotlin:kotlin-test-junit'
testImplementation "com.nhaarman.mockitokotlin2:mockito-kotlin:2.2.0"
Expand All @@ -202,4 +206,4 @@ dockerPrepare {

docker {
copySpec.from(tarTree("$buildDir/distributions/${applicationName}.tar"))
}
}
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
release_version = 3.7.2
release_version = 3.8.0

description = 'th2 check1 box'

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020-2020 Exactpro (Exactpro Systems Limited)
* Copyright 2020-2021 Exactpro (Exactpro Systems Limited)
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
Expand All @@ -16,6 +16,7 @@
import java.util.Map.Entry;

import com.exactpro.th2.check1.event.bean.CheckSequenceRow;
import com.exactpro.th2.common.grpc.FilterOperation;
import com.exactpro.th2.common.grpc.MessageFilter;
import com.exactpro.th2.common.grpc.MessageMetadata;
import com.exactpro.th2.common.grpc.MetadataFilter;
Expand Down Expand Up @@ -134,6 +135,8 @@ private static String getKeyFields(String name, ValueFilter valueFilter) {
messageFilter.getFieldsMap().forEach((childName, filter) -> result.append(getKeyFields(childName, filter)));
} else if (valueFilter.hasListFilter()) {
valueFilter.getListFilter().getValuesList().forEach(filter -> result.append(getKeyFields(name, filter)));
} else if (valueFilter.hasSimpleList() && (valueFilter.getOperation() == FilterOperation.IN || valueFilter.getOperation() == FilterOperation.NOT_IN)) {
result.append(", ").append(name).append(' ').append(valueFilter.getOperation()).append(' ').append(valueFilter.getSimpleList().getSimpleValuesList());
} else if (valueFilter.getKey()) {
result.append(", ").append(name).append('=').append(valueFilter.getSimpleFilter());
}
Expand All @@ -142,6 +145,9 @@ private static String getKeyFields(String name, ValueFilter valueFilter) {

private static String getKeyFields(String name, SimpleFilter valueFilter) {
if (valueFilter.getKey()) {
if (valueFilter.hasSimpleList() && (valueFilter.getOperation() == FilterOperation.IN || valueFilter.getOperation() == FilterOperation.NOT_IN)) {
return ", " + name + ' ' + valueFilter.getOperation() + ' ' + valueFilter.getSimpleList().getSimpleValuesList();
}
return ", " + name + '=' + valueFilter.getValue();
}
return "";
Expand Down
78 changes: 15 additions & 63 deletions src/main/kotlin/com/exactpro/th2/check1/CollectorService.kt
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,16 @@ import com.exactpro.th2.check1.grpc.ChainID
import com.exactpro.th2.check1.grpc.CheckRuleRequest
import com.exactpro.th2.check1.grpc.CheckSequenceRuleRequest
import com.exactpro.th2.check1.grpc.CheckpointRequestOrBuilder
import com.exactpro.th2.check1.metrics.BufferMetric
import com.exactpro.th2.check1.rule.AbstractCheckTask
import com.exactpro.th2.check1.rule.check.CheckRuleTask
import com.exactpro.th2.check1.rule.sequence.SequenceCheckRuleTask
import com.exactpro.th2.check1.rule.RuleFactory
import com.exactpro.th2.common.event.Event
import com.exactpro.th2.common.event.EventUtils
import com.exactpro.th2.common.grpc.ComparisonSettings
import com.exactpro.th2.common.grpc.ConnectionID
import com.exactpro.th2.common.grpc.Direction
import com.exactpro.th2.common.grpc.EventBatch
import com.exactpro.th2.common.grpc.EventID
import com.exactpro.th2.common.grpc.MessageBatch
import com.exactpro.th2.common.grpc.MessageFilter
import com.exactpro.th2.common.grpc.MessageID
import com.exactpro.th2.common.grpc.RootComparisonSettings
import com.exactpro.th2.common.grpc.RootMessageFilter
import com.exactpro.th2.common.schema.message.MessageListener
import com.exactpro.th2.common.schema.message.MessageRouter
import com.exactpro.th2.common.schema.message.SubscriberMonitor
Expand Down Expand Up @@ -64,41 +59,34 @@ class CollectorService(
private val olderThanDelta = configuration.cleanupOlderThan
private val olderThanTimeUnit = configuration.cleanupTimeUnit
private val maxEventBatchContentSize = configuration.maxEventBatchContentSize

private var ruleFactory: RuleFactory

init {
BufferMetric.configure(configuration)

val limitSize = configuration.messageCacheSize
mqSubject = PublishSubject.create()

subscriberMonitor = subscribe(MessageListener { _: String, batch: MessageBatch -> mqSubject.onNext(batch) })
streamObservable = mqSubject.flatMapIterable(MessageBatch::getMessagesList)
.groupBy { message -> message.metadata.id.run { SessionKey(connectionId.sessionAlias, direction) } }
.groupBy { message ->
message.metadata.id.run {
SessionKey(connectionId.sessionAlias, direction)
}.also(BufferMetric::processMessage)
}
.map { group -> StreamContainer(group.key!!, limitSize, group) }
.replay().apply { connect() }

checkpointSubscriber = streamObservable.subscribeWith(CheckpointSubscriber())

ruleFactory = RuleFactory(maxEventBatchContentSize, streamObservable, eventBatchRouter)
}

@Throws(InterruptedException::class)
fun verifyCheckRule(request: CheckRuleRequest): ChainID {
check(request.hasParentEventId()) { "Parent event id can't be null" }
val parentEventID: EventID = request.parentEventId
check(request.connectivityId.sessionAlias.isNotEmpty()) { "Session alias cannot be empty" }
val sessionAlias: String = request.connectivityId.sessionAlias

check(request.kindCase != CheckRuleRequest.KindCase.KIND_NOT_SET) {
"Either old filter or root filter must be set"
}
val filter: RootMessageFilter = if (request.hasRootFilter()) {
request.rootFilter
} else {
request.filter.toRootMessageFilter()
}
val direction = directionOrDefault(request.direction)

val chainID = request.getChainIdOrGenerate()

val task = CheckRuleTask(request.description, Instant.now(), SessionKey(sessionAlias, direction), request.timeout, maxEventBatchContentSize,
filter, parentEventID, streamObservable, eventBatchRouter)
val task = ruleFactory.createCheckRule(request)

cleanupTasksOlderThan(olderThanDelta, olderThanTimeUnit)

Expand All @@ -110,26 +98,8 @@ class CollectorService(

@Throws(InterruptedException::class)
fun verifyCheckSequenceRule(request: CheckSequenceRuleRequest): ChainID {
check(request.hasParentEventId()) { "Parent event id can't be null" }
val parentEventID: EventID = request.parentEventId
check(request.connectivityId.sessionAlias.isNotEmpty()) { "Session alias cannot be empty" }
val sessionAlias: String = request.connectivityId.sessionAlias
val direction = directionOrDefault(request.direction)

check((request.messageFiltersList.isEmpty() && request.rootMessageFiltersList.isNotEmpty())
|| (request.messageFiltersList.isNotEmpty() && request.rootMessageFiltersList.isEmpty())) {
"Either messageFilters or rootMessageFilters must be set but not both"
}

val chainID = request.getChainIdOrGenerate()

val protoMessageFilters: List<RootMessageFilter> = if (request.rootMessageFiltersList.isNotEmpty()) {
request.rootMessageFiltersList
} else {
request.messageFiltersList.map { it.toRootMessageFilter() }
}
val task = SequenceCheckRuleTask(request.description, Instant.now(), SessionKey(sessionAlias, direction), request.timeout, maxEventBatchContentSize,
request.preFilter, protoMessageFilters, request.checkOrder, parentEventID, streamObservable, eventBatchRouter)
val task = ruleFactory.createSequenceCheckRule(request)

cleanupTasksOlderThan(olderThanDelta, olderThanTimeUnit)

Expand All @@ -139,24 +109,6 @@ class CollectorService(
return chainID
}

private fun MessageFilter.toRootMessageFilter(): RootMessageFilter {
return RootMessageFilter.newBuilder()
.setMessageType(this.messageType)
.setComparisonSettings(this.comparisonSettings.toRootComparisonSettings())
.setMessageFilter(this)
.build()
}

private fun ComparisonSettings.toRootComparisonSettings(): RootComparisonSettings {
return RootComparisonSettings.newBuilder()
.addAllIgnoreFields(this.ignoreFieldsList)
.build()
}


private fun directionOrDefault(direction: Direction) =
if (direction == Direction.UNRECOGNIZED) Direction.FIRST else direction

private fun AbstractCheckTask.addToChainOrBegin(
value: AbstractCheckTask?,
checkpoint: com.exactpro.th2.common.grpc.Checkpoint
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* Copyright 2021-2021 Exactpro (Exactpro Systems Limited)
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.exactpro.th2.check1.exception

open class RuleException @JvmOverloads constructor(message: String? = null, cause: Throwable? = null) : Exception(message, cause)

class RuleCreationException @JvmOverloads constructor(message: String? = null, cause: Throwable? = null) : RuleException(message, cause)

class RuleInternalException @JvmOverloads constructor(message: String? = null, cause: Throwable? = null) : RuleException(message, cause)
49 changes: 49 additions & 0 deletions src/main/kotlin/com/exactpro/th2/check1/metrics/BufferMetric.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright 2021-2021 Exactpro (Exactpro Systems Limited)
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.exactpro.th2.check1.metrics

import com.exactpro.th2.check1.SessionKey
import com.exactpro.th2.check1.configuration.Check1Configuration
import com.exactpro.th2.common.metrics.DEFAULT_DIRECTION_LABEL_NAME
import com.exactpro.th2.common.metrics.DEFAULT_SESSION_ALIAS_LABEL_NAME
import io.prometheus.client.Counter
import java.util.concurrent.ConcurrentHashMap
import kotlin.math.min

object BufferMetric {

private val actualBufferCountMetric: Counter = Counter
.build("th2_check1_actual_cache_number", "The actual number of messages in caches")
.labelNames(DEFAULT_SESSION_ALIAS_LABEL_NAME, DEFAULT_DIRECTION_LABEL_NAME)
.register()

private val bufferMessagesSizeBySessionKey: MutableMap<SessionKey, Int> = ConcurrentHashMap()
private var maxBufferSize: Int = -1

fun configure(configuration: Check1Configuration) {
this.maxBufferSize = configuration.messageCacheSize
}

fun processMessage(sessionKey: SessionKey) {
val labels = arrayOf(sessionKey.sessionAlias, sessionKey.direction.name)

bufferMessagesSizeBySessionKey.compute(sessionKey) { _, old ->
min(maxBufferSize, (old ?: 0) + 1).also {
if (it != old) {
actualBufferCountMetric.labels(*labels).inc()
}
}
}
}
}
31 changes: 31 additions & 0 deletions src/main/kotlin/com/exactpro/th2/check1/metrics/RuleMetric.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright 2021-2021 Exactpro (Exactpro Systems Limited)
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.exactpro.th2.check1.metrics

import io.prometheus.client.Gauge

object RuleMetric {
private val ACTIVE_TASK_COUNTER = Gauge
.build("th2_check1_active_tasks_number", "The number of currently working rules")
.labelNames("rule_type")
.register()

fun incrementActiveRule(ruleType: String) {
ACTIVE_TASK_COUNTER.labels(ruleType).inc()
}

fun decrementActiveRule(ruleType: String) {
ACTIVE_TASK_COUNTER.labels(ruleType).dec()
}
}
Loading

0 comments on commit e9aa09b

Please sign in to comment.