Skip to content

Commit

Permalink
Refactor parameter handling and interfaces, use sync map for observat…
Browse files Browse the repository at this point in the history
…ions by topic
  • Loading branch information
PhilippMatthes committed Apr 30, 2024
1 parent e39199e commit 6e9a00c
Show file tree
Hide file tree
Showing 6 changed files with 42 additions and 66 deletions.
2 changes: 1 addition & 1 deletion histories/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func TestLoadBestHistory(t *testing.T) {
}

// Override the interface to get the current program
getCurrentProgram = func(thingName string) (observations.Observation, bool) {
getCurrentProgram = func(_ string) (observations.Observation, bool) {
mockProgramObservation := observations.Observation{
Result: 123, // Program ID relevant for selection
}
Expand Down
4 changes: 2 additions & 2 deletions monitor/map.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ import (
)

// Interfaces to other packages.
var getAllThingsForMap = things.Things.Range
var getCurrentPredictionForMap = predictions.GetCurrentPrediction
var getAllThingsForMap = things.Things.Range // pointer ref
var getCurrentPredictionForMap = predictions.GetCurrentPrediction // func ref

// Write geojson data that can be used to visualize the predictions.
// The geojson file is written to the static directory.
Expand Down
2 changes: 1 addition & 1 deletion monitor/map_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func TestWriteGeoJSONMap(t *testing.T) {
ThenQuality: []byte{100, 100, 100, 100, 100, 100, 100, 100, 100, 100},
ReferenceTime: time.Unix(0, 0),
}
getCurrentPredictionForMap = func(thingName string) (predictions.Prediction, bool) {
getCurrentPredictionForMap = func(_ string) (predictions.Prediction, bool) {
return mockPrediction, true
}

Expand Down
60 changes: 19 additions & 41 deletions monitor/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,46 +41,21 @@ type MetricsEntry struct {
var metricsFileLock = &sync.Mutex{}

// Interfaces to other packages.
var getAllThingsForMetrics = things.Things.Range
var getCurrentPrimarySignalForMetrics = observations.GetCurrentPrimarySignal
var getCurrentProgramForMetrics = observations.GetCurrentProgram
var getCurrentPredictionForMetrics = predictions.GetCurrentPrediction
var getLastPredictionTimeForMetrics = predictions.GetLastPredictionTime
var getObservationsReceivedByTopic = func(callback func(dsType string, count uint64)) {
// Lock for async access.
observations.ObservationsReceivedByTopicLock.RLock()
defer observations.ObservationsReceivedByTopicLock.RUnlock()
for dsType, count := range observations.ObservationsReceivedByTopic {
callback(dsType, count)
}
}
var getObservationsReceived = func() uint64 {
return observations.ObservationsReceived
}
var getObservationsProcessed = func() uint64 {
return observations.ObservationsProcessed
}
var getObservationsDiscarded = func() uint64 {
return observations.ObservationsDiscarded
}
var getHistoryUpdatesRequested = func() uint64 {
return histories.HistoryUpdatesRequested
}
var getHistoryUpdatesProcessed = func() uint64 {
return histories.HistoryUpdatesProcessed
}
var getHistoryUpdatesDiscarded = func() uint64 {
return histories.HistoryUpdatesDiscarded
}
var getPredictionsChecked = func() uint64 {
return predictions.PredictionsChecked
}
var getPredictionsPublished = func() uint64 {
return predictions.PredictionsPublished
}
var getPredictionsDiscarded = func() uint64 {
return predictions.PredictionsDiscarded
}
var getAllThingsForMetrics = things.Things.Range // pointer ref
var getCurrentPrimarySignalForMetrics = observations.GetCurrentPrimarySignal // func ref
var getCurrentProgramForMetrics = observations.GetCurrentProgram // func ref
var getCurrentPredictionForMetrics = predictions.GetCurrentPrediction // func ref
var getLastPredictionTimeForMetrics = predictions.GetLastPredictionTime // func ref
var getObservationsReceivedByTopic = observations.ObservationsReceivedByTopic.Range // pointer ref
var getObservationsReceived = func() uint64 { return observations.ObservationsReceived }
var getObservationsProcessed = func() uint64 { return observations.ObservationsProcessed }
var getObservationsDiscarded = func() uint64 { return observations.ObservationsDiscarded }
var getHistoryUpdatesRequested = func() uint64 { return histories.HistoryUpdatesRequested }
var getHistoryUpdatesProcessed = func() uint64 { return histories.HistoryUpdatesProcessed }
var getHistoryUpdatesDiscarded = func() uint64 { return histories.HistoryUpdatesDiscarded }
var getPredictionsChecked = func() uint64 { return predictions.PredictionsChecked }
var getPredictionsPublished = func() uint64 { return predictions.PredictionsPublished }
var getPredictionsDiscarded = func() uint64 { return predictions.PredictionsDiscarded }

func generateMetrics() Metrics {
entries := []MetricsEntry{}
Expand Down Expand Up @@ -226,8 +201,11 @@ func generatePrometheusMetrics(m Metrics) []string {
lines = append(lines, fmt.Sprintf("predictor_observations{action=\"received\"} %d", getObservationsReceived()))
lines = append(lines, fmt.Sprintf("predictor_observations{action=\"processed\"} %d", getObservationsProcessed()))
lines = append(lines, fmt.Sprintf("predictor_observations{action=\"discarded\"} %d", getObservationsDiscarded()))
getObservationsReceivedByTopic(func(dsType string, count uint64) {
getObservationsReceivedByTopic(func(k, v interface{}) bool {
dsType := k.(string)
count := v.(uint64)
lines = append(lines, fmt.Sprintf("predictor_observations_by_topic{topic=\"%s\"} %d", dsType, count))
return true
})

// Add metrics for the histories.
Expand Down
20 changes: 10 additions & 10 deletions monitor/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,21 @@ func prepareMocks() {
getAllThingsForMetrics = func(f func(key, value interface{}) bool) {
f("1337_1", things.Thing{}) // Thing can be empty, only name is needed
}
getCurrentPrimarySignalForMetrics = func(thingName string) (observations.Observation, bool) {
getCurrentPrimarySignalForMetrics = func(_ string) (observations.Observation, bool) {
return observations.Observation{
Result: 4, // RedAmber
ReceivedTime: time.Unix(1, 0),
PhenomenonTime: time.Unix(0, 0), // 1 second delay
}, true
}
getCurrentProgramForMetrics = func(thingName string) (observations.Observation, bool) {
getCurrentProgramForMetrics = func(_ string) (observations.Observation, bool) {
return observations.Observation{
Result: 10, // Program 10
ReceivedTime: time.Unix(1, 0),
PhenomenonTime: time.Unix(0, 0), // 1 second delay
}, true
}
getCurrentPredictionForMetrics = func(thingName string) (predictions.Prediction, bool) {
getCurrentPredictionForMetrics = func(_ string) (predictions.Prediction, bool) {
return predictions.Prediction{
ThingName: "1337_1",
Now: []byte{1, 1, 1, 1, 1, 3, 3, 3, 3, 3},
Expand All @@ -43,15 +43,15 @@ func prepareMocks() {
ReferenceTime: time.Unix(0, 0),
}, true
}
getLastPredictionTimeForMetrics = func(thingName string) (time.Time, bool) {
getLastPredictionTimeForMetrics = func(_ string) (time.Time, bool) {
return time.Unix(0, 0), true
}
getObservationsReceivedByTopic = func(callback func(dsType string, count uint64)) {
callback("primary_signal", 1)
callback("signal_program", 1)
callback("cycle_second", 1)
callback("detector_bike", 0)
callback("detector_car", 0)
getObservationsReceivedByTopic = func(f func(k, v interface{}) bool) {
f("primary_signal", 1)
f("signal_program", 1)
f("cycle_second", 1)
f("detector_bike", 0)
f("detector_car", 0)
}
getObservationsReceived = func() uint64 {
return 1
Expand Down
20 changes: 9 additions & 11 deletions observations/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,7 @@ import (
const observationQoS = 1

// Received messages by their topic.
var ObservationsReceivedByTopic = make(map[string]uint64)

// The lock for the map.
var ObservationsReceivedByTopicLock = &sync.RWMutex{}
var ObservationsReceivedByTopic = &sync.Map{}

// The number of processed messages, for logging purposes.
var ObservationsReceived uint64 = 0
Expand All @@ -49,11 +46,12 @@ func CheckReceivedMessagesPeriodically() {
panic("No messages received in the last 60 seconds")
}
log.Info.Printf("Received %d observations in the last 60 seconds. (%d processed, %d canceled)", dReceived, dProcessed, dCanceled)
ObservationsReceivedByTopicLock.RLock()
for dsType, count := range ObservationsReceivedByTopic {
ObservationsReceivedByTopic.Range(func(k, v interface{}) bool {
count := k.(uint64)
dsType := v.(string)
log.Info.Printf(" - Received %d observations for `%s`.", count, dsType)
}
ObservationsReceivedByTopicLock.RUnlock()
return true
})
}
}

Expand All @@ -72,9 +70,9 @@ func processMessage(msg mqtt.Message) {
}

// Increment the number of received messages.
ObservationsReceivedByTopicLock.Lock()
ObservationsReceivedByTopic[dsType.(string)]++
ObservationsReceivedByTopicLock.Unlock()
val, _ := ObservationsReceivedByTopic.LoadOrStore(dsType.(string), uint64(1))
ptr := val.(*int64)
atomic.AddInt64(ptr, 1)

var observation Observation
if err := json.Unmarshal(msg.Payload(), &observation); err != nil {
Expand Down

0 comments on commit 6e9a00c

Please sign in to comment.