From 6e9a00cbb4e4d26dee161f084aea682868d558d4 Mon Sep 17 00:00:00 2001 From: Philipp Matthes Date: Tue, 30 Apr 2024 10:05:30 +0200 Subject: [PATCH] Refactor parameter handling and interfaces, use sync map for observations by topic --- histories/cache_test.go | 2 +- monitor/map.go | 4 +-- monitor/map_test.go | 2 +- monitor/metrics.go | 60 +++++++++++++---------------------------- monitor/metrics_test.go | 20 +++++++------- observations/client.go | 20 +++++++------- 6 files changed, 42 insertions(+), 66 deletions(-) diff --git a/histories/cache_test.go b/histories/cache_test.go index 1d565c1..85b74a4 100644 --- a/histories/cache_test.go +++ b/histories/cache_test.go @@ -86,7 +86,7 @@ func TestLoadBestHistory(t *testing.T) { } // Override the interface to get the current program - getCurrentProgram = func(thingName string) (observations.Observation, bool) { + getCurrentProgram = func(_ string) (observations.Observation, bool) { mockProgramObservation := observations.Observation{ Result: 123, // Program ID relevant for selection } diff --git a/monitor/map.go b/monitor/map.go index 4c62ba1..804911e 100644 --- a/monitor/map.go +++ b/monitor/map.go @@ -15,8 +15,8 @@ import ( ) // Interfaces to other packages. -var getAllThingsForMap = things.Things.Range -var getCurrentPredictionForMap = predictions.GetCurrentPrediction +var getAllThingsForMap = things.Things.Range // pointer ref +var getCurrentPredictionForMap = predictions.GetCurrentPrediction // func ref // Write geojson data that can be used to visualize the predictions. // The geojson file is written to the static directory. diff --git a/monitor/map_test.go b/monitor/map_test.go index a75692f..00291c7 100644 --- a/monitor/map_test.go +++ b/monitor/map_test.go @@ -71,7 +71,7 @@ func TestWriteGeoJSONMap(t *testing.T) { ThenQuality: []byte{100, 100, 100, 100, 100, 100, 100, 100, 100, 100}, ReferenceTime: time.Unix(0, 0), } - getCurrentPredictionForMap = func(thingName string) (predictions.Prediction, bool) { + getCurrentPredictionForMap = func(_ string) (predictions.Prediction, bool) { return mockPrediction, true } diff --git a/monitor/metrics.go b/monitor/metrics.go index 40f1a1c..8f209d2 100644 --- a/monitor/metrics.go +++ b/monitor/metrics.go @@ -41,46 +41,21 @@ type MetricsEntry struct { var metricsFileLock = &sync.Mutex{} // Interfaces to other packages. -var getAllThingsForMetrics = things.Things.Range -var getCurrentPrimarySignalForMetrics = observations.GetCurrentPrimarySignal -var getCurrentProgramForMetrics = observations.GetCurrentProgram -var getCurrentPredictionForMetrics = predictions.GetCurrentPrediction -var getLastPredictionTimeForMetrics = predictions.GetLastPredictionTime -var getObservationsReceivedByTopic = func(callback func(dsType string, count uint64)) { - // Lock for async access. - observations.ObservationsReceivedByTopicLock.RLock() - defer observations.ObservationsReceivedByTopicLock.RUnlock() - for dsType, count := range observations.ObservationsReceivedByTopic { - callback(dsType, count) - } -} -var getObservationsReceived = func() uint64 { - return observations.ObservationsReceived -} -var getObservationsProcessed = func() uint64 { - return observations.ObservationsProcessed -} -var getObservationsDiscarded = func() uint64 { - return observations.ObservationsDiscarded -} -var getHistoryUpdatesRequested = func() uint64 { - return histories.HistoryUpdatesRequested -} -var getHistoryUpdatesProcessed = func() uint64 { - return histories.HistoryUpdatesProcessed -} -var getHistoryUpdatesDiscarded = func() uint64 { - return histories.HistoryUpdatesDiscarded -} -var getPredictionsChecked = func() uint64 { - return predictions.PredictionsChecked -} -var getPredictionsPublished = func() uint64 { - return predictions.PredictionsPublished -} -var getPredictionsDiscarded = func() uint64 { - return predictions.PredictionsDiscarded -} +var getAllThingsForMetrics = things.Things.Range // pointer ref +var getCurrentPrimarySignalForMetrics = observations.GetCurrentPrimarySignal // func ref +var getCurrentProgramForMetrics = observations.GetCurrentProgram // func ref +var getCurrentPredictionForMetrics = predictions.GetCurrentPrediction // func ref +var getLastPredictionTimeForMetrics = predictions.GetLastPredictionTime // func ref +var getObservationsReceivedByTopic = observations.ObservationsReceivedByTopic.Range // pointer ref +var getObservationsReceived = func() uint64 { return observations.ObservationsReceived } +var getObservationsProcessed = func() uint64 { return observations.ObservationsProcessed } +var getObservationsDiscarded = func() uint64 { return observations.ObservationsDiscarded } +var getHistoryUpdatesRequested = func() uint64 { return histories.HistoryUpdatesRequested } +var getHistoryUpdatesProcessed = func() uint64 { return histories.HistoryUpdatesProcessed } +var getHistoryUpdatesDiscarded = func() uint64 { return histories.HistoryUpdatesDiscarded } +var getPredictionsChecked = func() uint64 { return predictions.PredictionsChecked } +var getPredictionsPublished = func() uint64 { return predictions.PredictionsPublished } +var getPredictionsDiscarded = func() uint64 { return predictions.PredictionsDiscarded } func generateMetrics() Metrics { entries := []MetricsEntry{} @@ -226,8 +201,11 @@ func generatePrometheusMetrics(m Metrics) []string { lines = append(lines, fmt.Sprintf("predictor_observations{action=\"received\"} %d", getObservationsReceived())) lines = append(lines, fmt.Sprintf("predictor_observations{action=\"processed\"} %d", getObservationsProcessed())) lines = append(lines, fmt.Sprintf("predictor_observations{action=\"discarded\"} %d", getObservationsDiscarded())) - getObservationsReceivedByTopic(func(dsType string, count uint64) { + getObservationsReceivedByTopic(func(k, v interface{}) bool { + dsType := k.(string) + count := v.(uint64) lines = append(lines, fmt.Sprintf("predictor_observations_by_topic{topic=\"%s\"} %d", dsType, count)) + return true }) // Add metrics for the histories. diff --git a/monitor/metrics_test.go b/monitor/metrics_test.go index 7377ae8..f6b13ad 100644 --- a/monitor/metrics_test.go +++ b/monitor/metrics_test.go @@ -17,21 +17,21 @@ func prepareMocks() { getAllThingsForMetrics = func(f func(key, value interface{}) bool) { f("1337_1", things.Thing{}) // Thing can be empty, only name is needed } - getCurrentPrimarySignalForMetrics = func(thingName string) (observations.Observation, bool) { + getCurrentPrimarySignalForMetrics = func(_ string) (observations.Observation, bool) { return observations.Observation{ Result: 4, // RedAmber ReceivedTime: time.Unix(1, 0), PhenomenonTime: time.Unix(0, 0), // 1 second delay }, true } - getCurrentProgramForMetrics = func(thingName string) (observations.Observation, bool) { + getCurrentProgramForMetrics = func(_ string) (observations.Observation, bool) { return observations.Observation{ Result: 10, // Program 10 ReceivedTime: time.Unix(1, 0), PhenomenonTime: time.Unix(0, 0), // 1 second delay }, true } - getCurrentPredictionForMetrics = func(thingName string) (predictions.Prediction, bool) { + getCurrentPredictionForMetrics = func(_ string) (predictions.Prediction, bool) { return predictions.Prediction{ ThingName: "1337_1", Now: []byte{1, 1, 1, 1, 1, 3, 3, 3, 3, 3}, @@ -43,15 +43,15 @@ func prepareMocks() { ReferenceTime: time.Unix(0, 0), }, true } - getLastPredictionTimeForMetrics = func(thingName string) (time.Time, bool) { + getLastPredictionTimeForMetrics = func(_ string) (time.Time, bool) { return time.Unix(0, 0), true } - getObservationsReceivedByTopic = func(callback func(dsType string, count uint64)) { - callback("primary_signal", 1) - callback("signal_program", 1) - callback("cycle_second", 1) - callback("detector_bike", 0) - callback("detector_car", 0) + getObservationsReceivedByTopic = func(f func(k, v interface{}) bool) { + f("primary_signal", 1) + f("signal_program", 1) + f("cycle_second", 1) + f("detector_bike", 0) + f("detector_car", 0) } getObservationsReceived = func() uint64 { return 1 diff --git a/observations/client.go b/observations/client.go index cc663d1..5564bae 100644 --- a/observations/client.go +++ b/observations/client.go @@ -21,10 +21,7 @@ import ( const observationQoS = 1 // Received messages by their topic. -var ObservationsReceivedByTopic = make(map[string]uint64) - -// The lock for the map. -var ObservationsReceivedByTopicLock = &sync.RWMutex{} +var ObservationsReceivedByTopic = &sync.Map{} // The number of processed messages, for logging purposes. var ObservationsReceived uint64 = 0 @@ -49,11 +46,12 @@ func CheckReceivedMessagesPeriodically() { panic("No messages received in the last 60 seconds") } log.Info.Printf("Received %d observations in the last 60 seconds. (%d processed, %d canceled)", dReceived, dProcessed, dCanceled) - ObservationsReceivedByTopicLock.RLock() - for dsType, count := range ObservationsReceivedByTopic { + ObservationsReceivedByTopic.Range(func(k, v interface{}) bool { + count := k.(uint64) + dsType := v.(string) log.Info.Printf(" - Received %d observations for `%s`.", count, dsType) - } - ObservationsReceivedByTopicLock.RUnlock() + return true + }) } } @@ -72,9 +70,9 @@ func processMessage(msg mqtt.Message) { } // Increment the number of received messages. - ObservationsReceivedByTopicLock.Lock() - ObservationsReceivedByTopic[dsType.(string)]++ - ObservationsReceivedByTopicLock.Unlock() + val, _ := ObservationsReceivedByTopic.LoadOrStore(dsType.(string), uint64(1)) + ptr := val.(*int64) + atomic.AddInt64(ptr, 1) var observation Observation if err := json.Unmarshal(msg.Payload(), &observation); err != nil {