diff --git a/CHANGELOG.md b/CHANGELOG.md index fd067bb7..d859ca28 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,17 @@ Starting from v2.2.5, all notable changes to this project will be documented in this file. +## v3.0.5 + +### New Features + +- Update network time at 00:00:00 every day to avoid time drift. + +### Bug Fixes + +- Alleviated the problem of frequent jitter in sampling rate. +- Fixed the time offset of up to several hours caused by external compensation of timestamp. + ## v3.0.4 ### Bug Fixes diff --git a/VERSION b/VERSION index 238df4b4..29b317f0 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -v3.0.4 +v3.0.5 diff --git a/api/v1/history/sac.go b/api/v1/history/sac.go index 2c5645e2..bfde743c 100644 --- a/api/v1/history/sac.go +++ b/api/v1/history/sac.go @@ -20,17 +20,19 @@ func (h *History) getSACBytes(data []explorer.ExplorerData, legacyMode bool, sta var channelBuffer []int32 for index, record := range data { - // Make sure timestamp is continuous - if math.Abs(float64(record.Timestamp-startTimestamp-int64(index*1000))) != 0 { - return "", nil, fmt.Errorf("timestamp is not continuous") + if math.Abs(float64(record.Timestamp-startTimestamp-int64(index*1000))) >= explorer.EXPLORER_ALLOWED_JITTER_MS { + return "", nil, fmt.Errorf( + "timestamp is not within allowed jitter %d ms, expected %d, got %d", + explorer.EXPLORER_ALLOWED_JITTER_MS, + startTimestamp+int64(index*1000), + record.Timestamp, + ) } - if !legacyMode { - // Make sure sample rate is the same - if record.SampleRate != startSampleRate { - return "", nil, fmt.Errorf("sample rate is not the same") - } + // Make sure sample rate is the same + if record.SampleRate != startSampleRate { + return "", nil, fmt.Errorf("sample rate is not the same, expected %d, got %d", startSampleRate, record.SampleRate) } switch channelCode { diff --git a/api/v1/station/explorer.go b/api/v1/station/explorer.go index 0a997160..f8a10ae1 100644 --- a/api/v1/station/explorer.go +++ b/api/v1/station/explorer.go @@ -5,14 +5,14 @@ import ( "github.com/anyshake/observer/utils/timesource" ) -func (e *explorerInfo) get(timeSource timesource.Source, explorerDeps *explorer.ExplorerDependency) error { +func (e *explorerInfo) get(timeSource *timesource.Source, explorerDeps *explorer.ExplorerDependency) error { e.DeviceId = explorerDeps.Config.DeviceId e.Elevation = explorerDeps.Config.Elevation e.Errors = explorerDeps.Health.Errors e.Received = explorerDeps.Health.Received e.SampleRate = explorerDeps.Health.SampleRate - currentTime, err := timeSource.GetTime() + currentTime, err := timeSource.Get() if err != nil { return err } diff --git a/api/v1/station/os.go b/api/v1/station/os.go index 3f7a666a..9a97e786 100644 --- a/api/v1/station/os.go +++ b/api/v1/station/os.go @@ -9,7 +9,7 @@ import ( "github.com/wille/osutil" ) -func (o *osInfo) get(timeSource timesource.Source) error { +func (o *osInfo) get(timeSource *timesource.Source) error { hostname, err := os.Hostname() if err != nil { return err @@ -20,7 +20,7 @@ func (o *osInfo) get(timeSource timesource.Source) error { return err } - timestamp, err := timeSource.GetTime() + timestamp, err := timeSource.Get() if err != nil { return err } diff --git a/build/assets/config.json b/build/assets/config.json index 9fcf9a59..833cdb32 100644 --- a/build/assets/config.json +++ b/build/assets/config.json @@ -14,7 +14,7 @@ "explorer_settings": { "dsn": "transport:///dev/ttyUSB0?baudrate=115200", "engine": "serial", - "legacy": false + "legacy": true }, "sensor_settings": { "frequency": 4.5, diff --git a/cleaners/types.go b/cleaners/types.go index 2561bc73..02183bd6 100644 --- a/cleaners/types.go +++ b/cleaners/types.go @@ -11,7 +11,7 @@ type Options struct { Config *config.Config Database *gorm.DB Dependency *dig.Container - TimeSource timesource.Source + TimeSource *timesource.Source } type CleanerTask interface { diff --git a/cmd/main.go b/cmd/main.go index d56f29a9..6418d559 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -22,12 +22,12 @@ import ( service_archiver "github.com/anyshake/observer/services/archiver" service_miniseed "github.com/anyshake/observer/services/miniseed" + service_timesync "github.com/anyshake/observer/services/timesync" service_watchdog "github.com/anyshake/observer/services/watchdog" "github.com/anyshake/observer/startups" startup_explorer "github.com/anyshake/observer/startups/explorer" "github.com/anyshake/observer/utils/logger" "github.com/anyshake/observer/utils/timesource" - "github.com/beevik/ntp" "github.com/common-nighthawk/go-figure" ) @@ -95,13 +95,10 @@ func main() { // Create time source with NTP server logger.GetLogger(main).Infof("querying NTP server at %s:%d", conf.NtpClient.Host, conf.NtpClient.Port) - res, err := ntp.QueryWithOptions(conf.NtpClient.Host, ntp.QueryOptions{ - Port: conf.NtpClient.Port, Timeout: 10 * time.Second, - }) + timeSource, err := timesource.New(conf.NtpClient.Host, conf.NtpClient.Port, 10*time.Second) if err != nil { logger.GetLogger(main).Fatalln(err) } - timeSource := timesource.New(time.Now(), res.Time) logger.GetLogger(main).Info("time source has been created") // Connect to database @@ -138,7 +135,7 @@ func main() { Config: &conf, Database: databaseConn, Dependency: depsContainer, - TimeSource: timeSource, + TimeSource: &timeSource, } runCleanerTasks := func() { for _, t := range cleanerTasks { @@ -156,7 +153,7 @@ func main() { startupOptions := &startups.Options{ Config: &conf, Database: databaseConn, - TimeSource: timeSource, + TimeSource: &timeSource, } for _, t := range startupTasks { taskName := t.GetTaskName() @@ -179,12 +176,13 @@ func main() { &service_watchdog.WatchdogService{}, &service_archiver.ArchiverService{}, &service_miniseed.MiniSeedService{}, + &service_timesync.TimeSyncService{}, } serviceOptions := &services.Options{ Config: &conf, Database: databaseConn, Dependency: depsContainer, - TimeSource: timeSource, + TimeSource: &timeSource, CancelToken: cancelToken, } var waitGroup sync.WaitGroup diff --git a/drivers/explorer/impl.go b/drivers/explorer/impl.go index 5bedf56a..e6a891e2 100644 --- a/drivers/explorer/impl.go +++ b/drivers/explorer/impl.go @@ -218,27 +218,30 @@ func (e *ExplorerDriverImpl) handleReadLegacyPacket(deps *ExplorerDependency) { } }() + // reference: https://stackoverflow.com/a/51424566 + // Calculate the duration to the next whole second to allivate the drift + calcDuration := func(currentTime time.Time, duration time.Duration) time.Duration { + return currentTime.Round(duration).Add(duration).Sub(currentTime) + } + // Read data from the FIFO buffer continuously var ( - dataBuffer = []legacyPacket{} - prevTime, _ = deps.FallbackTime.GetTime() - ticker = time.NewTicker(1 * time.Second) + dataBuffer = []legacyPacket{} + ticker = time.NewTimer(calcDuration(time.Now(), time.Second)) ) for { select { case <-deps.CancelToken.Done(): return - case <-ticker.C: + case currentTick := <-ticker.C: if len(dataBuffer) > 0 { - deps.Health.UpdatedAt = time.Now() - deps.Health.Received++ - - // Fix jitter in the timestamp - t, _ := deps.FallbackTime.GetTime() - if time.Duration(math.Abs(float64(t.Sub(prevTime).Milliseconds()))) <= EXPLORER_GENERAL_JITTER { - t = deps.FallbackTime.Fix(t, prevTime, time.Second) + currentTime, err := deps.FallbackTime.Get() + if err != nil { + continue } - prevTime = t + + deps.Health.UpdatedAt = currentTime + deps.Health.Received++ var ( z_axis_count []int32 @@ -258,13 +261,14 @@ func (e *ExplorerDriverImpl) handleReadLegacyPacket(deps *ExplorerDependency) { Z_Axis: z_axis_count, E_Axis: e_axis_count, N_Axis: n_axis_count, - Timestamp: t.UTC().UnixMilli(), + Timestamp: currentTime.UTC().UnixMilli(), } deps.messageBus.Publish("explorer", &finalPacket) - dataBuffer = []legacyPacket{} + + ticker.Reset(calcDuration(currentTick, time.Second)) } - default: + case <-time.After(2 * time.Millisecond): for { dat, err := fifoBuffer.Read(legacy_packet_frame_header, len(legacy_packet_frame_header)+e.legacyPacket.length()) if err != nil { @@ -284,8 +288,6 @@ func (e *ExplorerDriverImpl) handleReadLegacyPacket(deps *ExplorerDependency) { } func (e *ExplorerDriverImpl) handleReadMainlinePacket(deps *ExplorerDependency) { - prevTime, _ := deps.FallbackTime.GetTime() - for { select { case <-deps.CancelToken.Done(): @@ -344,27 +346,27 @@ func (e *ExplorerDriverImpl) handleReadMainlinePacket(deps *ExplorerDependency) continue } + // Get current timestamp + if e.mainlinePacketHeader.timestamp == 0 { + t, err := deps.FallbackTime.Get() + if err != nil { + continue + } + e.mainlinePacketHeader.timestamp = t.UnixMilli() + } + // Publish the data to the message bus deps.Health.SampleRate = sampleRate finalPacket := ExplorerData{ SampleRate: sampleRate, + Timestamp: e.mainlinePacketHeader.timestamp, Z_Axis: e.mainlinePacketChannel.z_axis, E_Axis: e.mainlinePacketChannel.e_axis, N_Axis: e.mainlinePacketChannel.n_axis, } - if e.mainlinePacketHeader.timestamp != 0 { - finalPacket.Timestamp = e.mainlinePacketHeader.timestamp - } else { - // Fix jitter in the timestamp - t, _ := deps.FallbackTime.GetTime() - if time.Duration(math.Abs(float64(t.Sub(prevTime).Milliseconds()))) <= EXPLORER_GENERAL_JITTER { - t = deps.FallbackTime.Fix(t, prevTime, time.Second) - } - finalPacket.Timestamp = t.UnixMilli() - prevTime = t - } deps.messageBus.Publish("explorer", &finalPacket) - deps.Health.UpdatedAt = time.Now() + + deps.Health.UpdatedAt = time.UnixMilli(e.mainlinePacketHeader.timestamp) deps.Health.Received++ } } @@ -385,8 +387,12 @@ func (e *ExplorerDriverImpl) IsAvailable(deps *ExplorerDependency) bool { } func (e *ExplorerDriverImpl) Init(deps *ExplorerDependency) error { - deps.Health.StartTime, _ = deps.FallbackTime.GetTime() + currentTime, err := deps.FallbackTime.Get() + if err != nil { + return err + } + deps.Health.StartTime = currentTime deps.subscribers = cmap.New[ExplorerEventHandler]() deps.messageBus = messagebus.New(1024) deps.Config.DeviceId = math.MaxUint32 diff --git a/drivers/explorer/types.go b/drivers/explorer/types.go index 71ac2f05..dde4cf10 100644 --- a/drivers/explorer/types.go +++ b/drivers/explorer/types.go @@ -10,7 +10,7 @@ import ( messagebus "github.com/vardius/message-bus" ) -const EXPLORER_GENERAL_JITTER = 10 * time.Millisecond +const EXPLORER_ALLOWED_JITTER_MS = 2 const ( EXPLORER_CHANNEL_CODE_Z = "Z" @@ -36,9 +36,9 @@ type ExplorerConfig struct { } type ExplorerDependency struct { + FallbackTime *timesource.Source Health ExplorerHealth Config ExplorerConfig - FallbackTime timesource.Source CancelToken context.Context Transport transport.TransportDriver messageBus messagebus.MessageBus diff --git a/frontend/src/.env b/frontend/src/.env index f3d85b9a..eaf4f8f1 100644 --- a/frontend/src/.env +++ b/frontend/src/.env @@ -1,2 +1,2 @@ -REACT_APP_VERSION=v3.0.4 -REACT_APP_RELEASE=a60171a8-20240808222943 +REACT_APP_VERSION=v3.0.5 +REACT_APP_RELEASE=0e50a2dc-20240822174555 diff --git a/frontend/src/package-lock.json b/frontend/src/package-lock.json index 6eacecc6..e1a42c2e 100644 --- a/frontend/src/package-lock.json +++ b/frontend/src/package-lock.json @@ -6601,9 +6601,9 @@ } }, "node_modules/caniuse-lite": { - "version": "1.0.30001589", - "resolved": "https://registry.npmjs.org/caniuse-lite/-/caniuse-lite-1.0.30001589.tgz", - "integrity": "sha512-vNQWS6kI+q6sBlHbh71IIeC+sRwK2N3EDySc/updIGhIee2x5z00J4c1242/5/d6EpEMdOnk/m+6tuk4/tcsqg==", + "version": "1.0.30001651", + "resolved": "https://registry.npmmirror.com/caniuse-lite/-/caniuse-lite-1.0.30001651.tgz", + "integrity": "sha512-9Cf+Xv1jJNe1xPZLGuUXLNkE1BoDkqRqYyFJ9TDYSqhduqA4hu4oR9HluGoWYQC/aj8WHjsGVV+bwkh0+tegRg==", "dev": true, "funding": [ { diff --git a/services/miniseed/start.go b/services/miniseed/start.go index 721a6674..b1f57c86 100644 --- a/services/miniseed/start.go +++ b/services/miniseed/start.go @@ -41,7 +41,7 @@ func (m *MiniSeedService) Start(options *services.Options, waitGroup *sync.WaitG m.writeBufferCountDown = MINISEED_WRITE_INTERVAL // Get sequence number if file exists - currentTime, _ := options.TimeSource.GetTime() + currentTime, _ := options.TimeSource.Get() for _, channelCode := range []string{ explorer.EXPLORER_CHANNEL_CODE_Z, explorer.EXPLORER_CHANNEL_CODE_E, diff --git a/services/miniseed/types.go b/services/miniseed/types.go index 8fef9ac1..0106fc2e 100644 --- a/services/miniseed/types.go +++ b/services/miniseed/types.go @@ -8,7 +8,7 @@ import ( const ( MINISEED_BIT_ORDER = mseedio.MSBFIRST MINISEED_ENCODE_TYPE = mseedio.STEIM2 - MINISEED_WRITE_INTERVAL = 5 + MINISEED_WRITE_INTERVAL = 3 MINISEED_CLEANUP_INTERVAL = 60 ) diff --git a/services/miniseed/write.go b/services/miniseed/write.go index 7a9f056a..88cafa95 100644 --- a/services/miniseed/write.go +++ b/services/miniseed/write.go @@ -17,15 +17,18 @@ func (m *MiniSeedService) handleWrite() error { for i := 1; i < len(m.miniseedBuffer); i++ { // Make sure timestamp is continuous - if math.Abs(float64(m.miniseedBuffer[i].Timestamp-startTimestamp-int64(i*1000))) != 0 { - return fmt.Errorf("timestamp is not continuous, expected %d, got %d", startTimestamp+int64(i*1000), m.miniseedBuffer[i].Timestamp) + if math.Abs(float64(m.miniseedBuffer[i].Timestamp-startTimestamp-int64(i*1000))) >= explorer.EXPLORER_ALLOWED_JITTER_MS { + return fmt.Errorf( + "timestamp is not within allowed jitter %d ms, expected %d, got %d", + explorer.EXPLORER_ALLOWED_JITTER_MS, + startTimestamp+int64(i*1000), + m.miniseedBuffer[i].Timestamp, + ) } - if !m.legacyMode { - // Make sure sample rate is the same - if m.miniseedBuffer[i].SampleRate != startSampleRate { - return fmt.Errorf("sample rate is not the same, expected %d, got %d", startSampleRate, m.miniseedBuffer[i].SampleRate) - } + // Make sure sample rate is the same + if m.miniseedBuffer[i].SampleRate != startSampleRate { + return fmt.Errorf("sample rate is not the same, expected %d, got %d", startSampleRate, m.miniseedBuffer[i].SampleRate) } } diff --git a/services/timesync/name.go b/services/timesync/name.go new file mode 100644 index 00000000..f539c3c5 --- /dev/null +++ b/services/timesync/name.go @@ -0,0 +1,5 @@ +package timesync + +func (w *TimeSyncService) GetServiceName() string { + return "timesync" +} diff --git a/services/timesync/start.go b/services/timesync/start.go new file mode 100644 index 00000000..70ed0d45 --- /dev/null +++ b/services/timesync/start.go @@ -0,0 +1,39 @@ +package timesync + +import ( + "sync" + "time" + + "github.com/anyshake/observer/services" + "github.com/anyshake/observer/utils/logger" +) + +func (s *TimeSyncService) Start(options *services.Options, waitGroup *sync.WaitGroup) { + defer waitGroup.Done() + logger.GetLogger(s.GetServiceName()).Infoln("service has been started") + + ticker := time.NewTicker(time.Second) + + for { + select { + case <-options.CancelToken.Done(): + logger.GetLogger(s.GetServiceName()).Infoln("service has been stopped") + return + case <-ticker.C: + currentTime, err := options.TimeSource.Get() + if err != nil { + logger.GetLogger(s.GetServiceName()).Errorln(err) + continue + } + + // Update time source at 00:00:00 UTC every day + if currentTime.Unix()%86400 == 0 { + if err = options.TimeSource.Update(); err != nil { + logger.GetLogger(s.GetServiceName()).Errorln(err) + } else { + logger.GetLogger(s.GetServiceName()).Info("time source has been updated") + } + } + } + } +} diff --git a/services/timesync/types.go b/services/timesync/types.go new file mode 100644 index 00000000..d79104ec --- /dev/null +++ b/services/timesync/types.go @@ -0,0 +1,3 @@ +package timesync + +type TimeSyncService struct{} diff --git a/services/types.go b/services/types.go index 7e98122c..8e76b097 100644 --- a/services/types.go +++ b/services/types.go @@ -14,7 +14,7 @@ type Options struct { Config *config.Config Dependency *dig.Container Database *gorm.DB - TimeSource timesource.Source + TimeSource *timesource.Source CancelToken context.Context } diff --git a/startups/types.go b/startups/types.go index 4013d222..f444366a 100644 --- a/startups/types.go +++ b/startups/types.go @@ -10,7 +10,7 @@ import ( type Options struct { Config *config.Config Database *gorm.DB - TimeSource timesource.Source + TimeSource *timesource.Source } type StartupTask interface { diff --git a/utils/timesource/time.go b/utils/timesource/get.go similarity index 76% rename from utils/timesource/time.go rename to utils/timesource/get.go index de84c7e1..ade31ccd 100644 --- a/utils/timesource/time.go +++ b/utils/timesource/get.go @@ -5,7 +5,10 @@ import ( "time" ) -func (g *Source) GetTime() (time.Time, error) { +func (g *Source) Get() (time.Time, error) { + g.rwMutex.RLock() + defer g.rwMutex.RUnlock() + if g.LocalBaseTime.IsZero() || g.ReferenceTime.IsZero() { return time.Now().UTC(), errors.New("empty BaseTime or RefTime is not allowed") } diff --git a/utils/timesource/jitter.go b/utils/timesource/jitter.go deleted file mode 100644 index 4e060666..00000000 --- a/utils/timesource/jitter.go +++ /dev/null @@ -1,12 +0,0 @@ -package timesource - -import ( - "time" -) - -func (s *Source) Fix(currentTime, prevTime time.Time, span time.Duration) time.Time { - expectedTime := prevTime.Add(span) - discrepancy := expectedTime.Sub(currentTime) - - return currentTime.Add(discrepancy) -} diff --git a/utils/timesource/new.go b/utils/timesource/new.go index 148bb59b..561c69fb 100644 --- a/utils/timesource/new.go +++ b/utils/timesource/new.go @@ -1,10 +1,24 @@ package timesource -import "time" +import ( + "time" -func New(baseTime, refTime time.Time) Source { - return Source{ - LocalBaseTime: baseTime, - ReferenceTime: refTime, + "github.com/beevik/ntp" +) + +func New(ntpHost string, ntpPort int, queryTimeout time.Duration) (Source, error) { + res, err := ntp.QueryWithOptions(ntpHost, ntp.QueryOptions{ + Port: ntpPort, Timeout: queryTimeout, + }) + if err != nil { + return Source{}, err } + + return Source{ + ntpHost: ntpHost, + ntpPort: ntpPort, + queryTimeout: queryTimeout, + LocalBaseTime: time.Now().UTC(), + ReferenceTime: res.Time, + }, nil } diff --git a/utils/timesource/types.go b/utils/timesource/types.go index 9c2ef221..846d5be0 100644 --- a/utils/timesource/types.go +++ b/utils/timesource/types.go @@ -1,8 +1,15 @@ package timesource -import "time" +import ( + "sync" + "time" +) type Source struct { + rwMutex sync.RWMutex + ntpHost string + ntpPort int + queryTimeout time.Duration LocalBaseTime time.Time ReferenceTime time.Time } diff --git a/utils/timesource/update.go b/utils/timesource/update.go new file mode 100644 index 00000000..9c04d110 --- /dev/null +++ b/utils/timesource/update.go @@ -0,0 +1,23 @@ +package timesource + +import ( + "time" + + "github.com/beevik/ntp" +) + +func (s *Source) Update() error { + s.rwMutex.Lock() + defer s.rwMutex.Unlock() + + res, err := ntp.QueryWithOptions(s.ntpHost, ntp.QueryOptions{ + Port: s.ntpPort, Timeout: s.queryTimeout, + }) + if err != nil { + return err + } + + s.LocalBaseTime = time.Now().UTC() + s.ReferenceTime = res.Time + return nil +}