Skip to content

Commit

Permalink
Update network time at 00:00:00 every day to avoid time drift
Browse files Browse the repository at this point in the history
  • Loading branch information
bclswl0827 committed Aug 22, 2024
1 parent 0e50a2d commit 01429d8
Show file tree
Hide file tree
Showing 25 changed files with 192 additions and 90 deletions.
11 changes: 11 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,17 @@

Starting from v2.2.5, all notable changes to this project will be documented in this file.

## v3.0.5

### New Features

- Update network time at 00:00:00 every day to avoid time drift.

### Bug Fixes

- Alleviated the problem of frequent jitter in sampling rate.
- Fixed the time offset of up to several hours caused by external compensation of timestamp.

## v3.0.4

### Bug Fixes
Expand Down
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
v3.0.4
v3.0.5
18 changes: 10 additions & 8 deletions api/v1/history/sac.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,19 @@ func (h *History) getSACBytes(data []explorer.ExplorerData, legacyMode bool, sta

var channelBuffer []int32
for index, record := range data {

// Make sure timestamp is continuous
if math.Abs(float64(record.Timestamp-startTimestamp-int64(index*1000))) != 0 {
return "", nil, fmt.Errorf("timestamp is not continuous")
if math.Abs(float64(record.Timestamp-startTimestamp-int64(index*1000))) >= explorer.EXPLORER_ALLOWED_JITTER_MS {
return "", nil, fmt.Errorf(
"timestamp is not within allowed jitter %d ms, expected %d, got %d",
explorer.EXPLORER_ALLOWED_JITTER_MS,
startTimestamp+int64(index*1000),
record.Timestamp,
)
}

if !legacyMode {
// Make sure sample rate is the same
if record.SampleRate != startSampleRate {
return "", nil, fmt.Errorf("sample rate is not the same")
}
// Make sure sample rate is the same
if record.SampleRate != startSampleRate {
return "", nil, fmt.Errorf("sample rate is not the same, expected %d, got %d", startSampleRate, record.SampleRate)
}

switch channelCode {
Expand Down
4 changes: 2 additions & 2 deletions api/v1/station/explorer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,14 @@ import (
"github.com/anyshake/observer/utils/timesource"
)

func (e *explorerInfo) get(timeSource timesource.Source, explorerDeps *explorer.ExplorerDependency) error {
func (e *explorerInfo) get(timeSource *timesource.Source, explorerDeps *explorer.ExplorerDependency) error {
e.DeviceId = explorerDeps.Config.DeviceId
e.Elevation = explorerDeps.Config.Elevation
e.Errors = explorerDeps.Health.Errors
e.Received = explorerDeps.Health.Received
e.SampleRate = explorerDeps.Health.SampleRate

currentTime, err := timeSource.GetTime()
currentTime, err := timeSource.Get()
if err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions api/v1/station/os.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"github.com/wille/osutil"
)

func (o *osInfo) get(timeSource timesource.Source) error {
func (o *osInfo) get(timeSource *timesource.Source) error {
hostname, err := os.Hostname()
if err != nil {
return err
Expand All @@ -20,7 +20,7 @@ func (o *osInfo) get(timeSource timesource.Source) error {
return err
}

timestamp, err := timeSource.GetTime()
timestamp, err := timeSource.Get()
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion build/assets/config.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
"explorer_settings": {
"dsn": "transport:///dev/ttyUSB0?baudrate=115200",
"engine": "serial",
"legacy": false
"legacy": true
},
"sensor_settings": {
"frequency": 4.5,
Expand Down
2 changes: 1 addition & 1 deletion cleaners/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ type Options struct {
Config *config.Config
Database *gorm.DB
Dependency *dig.Container
TimeSource timesource.Source
TimeSource *timesource.Source
}

type CleanerTask interface {
Expand Down
14 changes: 6 additions & 8 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,12 @@ import (
service_archiver "github.com/anyshake/observer/services/archiver"

service_miniseed "github.com/anyshake/observer/services/miniseed"
service_timesync "github.com/anyshake/observer/services/timesync"
service_watchdog "github.com/anyshake/observer/services/watchdog"
"github.com/anyshake/observer/startups"
startup_explorer "github.com/anyshake/observer/startups/explorer"
"github.com/anyshake/observer/utils/logger"
"github.com/anyshake/observer/utils/timesource"
"github.com/beevik/ntp"
"github.com/common-nighthawk/go-figure"
)

Expand Down Expand Up @@ -95,13 +95,10 @@ func main() {

// Create time source with NTP server
logger.GetLogger(main).Infof("querying NTP server at %s:%d", conf.NtpClient.Host, conf.NtpClient.Port)
res, err := ntp.QueryWithOptions(conf.NtpClient.Host, ntp.QueryOptions{
Port: conf.NtpClient.Port, Timeout: 10 * time.Second,
})
timeSource, err := timesource.New(conf.NtpClient.Host, conf.NtpClient.Port, 10*time.Second)
if err != nil {
logger.GetLogger(main).Fatalln(err)
}
timeSource := timesource.New(time.Now(), res.Time)
logger.GetLogger(main).Info("time source has been created")

// Connect to database
Expand Down Expand Up @@ -138,7 +135,7 @@ func main() {
Config: &conf,
Database: databaseConn,
Dependency: depsContainer,
TimeSource: timeSource,
TimeSource: &timeSource,
}
runCleanerTasks := func() {
for _, t := range cleanerTasks {
Expand All @@ -156,7 +153,7 @@ func main() {
startupOptions := &startups.Options{
Config: &conf,
Database: databaseConn,
TimeSource: timeSource,
TimeSource: &timeSource,
}
for _, t := range startupTasks {
taskName := t.GetTaskName()
Expand All @@ -179,12 +176,13 @@ func main() {
&service_watchdog.WatchdogService{},
&service_archiver.ArchiverService{},
&service_miniseed.MiniSeedService{},
&service_timesync.TimeSyncService{},
}
serviceOptions := &services.Options{
Config: &conf,
Database: databaseConn,
Dependency: depsContainer,
TimeSource: timeSource,
TimeSource: &timeSource,
CancelToken: cancelToken,
}
var waitGroup sync.WaitGroup
Expand Down
66 changes: 36 additions & 30 deletions drivers/explorer/impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,27 +218,30 @@ func (e *ExplorerDriverImpl) handleReadLegacyPacket(deps *ExplorerDependency) {
}
}()

// reference: https://stackoverflow.com/a/51424566
// Calculate the duration to the next whole second to allivate the drift
calcDuration := func(currentTime time.Time, duration time.Duration) time.Duration {
return currentTime.Round(duration).Add(duration).Sub(currentTime)
}

// Read data from the FIFO buffer continuously
var (
dataBuffer = []legacyPacket{}
prevTime, _ = deps.FallbackTime.GetTime()
ticker = time.NewTicker(1 * time.Second)
dataBuffer = []legacyPacket{}
ticker = time.NewTimer(calcDuration(time.Now(), time.Second))
)
for {
select {
case <-deps.CancelToken.Done():
return
case <-ticker.C:
case currentTick := <-ticker.C:
if len(dataBuffer) > 0 {
deps.Health.UpdatedAt = time.Now()
deps.Health.Received++

// Fix jitter in the timestamp
t, _ := deps.FallbackTime.GetTime()
if time.Duration(math.Abs(float64(t.Sub(prevTime).Milliseconds()))) <= EXPLORER_GENERAL_JITTER {
t = deps.FallbackTime.Fix(t, prevTime, time.Second)
currentTime, err := deps.FallbackTime.Get()
if err != nil {
continue
}
prevTime = t

deps.Health.UpdatedAt = currentTime
deps.Health.Received++

var (
z_axis_count []int32
Expand All @@ -258,13 +261,14 @@ func (e *ExplorerDriverImpl) handleReadLegacyPacket(deps *ExplorerDependency) {
Z_Axis: z_axis_count,
E_Axis: e_axis_count,
N_Axis: n_axis_count,
Timestamp: t.UTC().UnixMilli(),
Timestamp: currentTime.UTC().UnixMilli(),
}
deps.messageBus.Publish("explorer", &finalPacket)

dataBuffer = []legacyPacket{}

ticker.Reset(calcDuration(currentTick, time.Second))
}
default:
case <-time.After(2 * time.Millisecond):
for {
dat, err := fifoBuffer.Read(legacy_packet_frame_header, len(legacy_packet_frame_header)+e.legacyPacket.length())
if err != nil {
Expand All @@ -284,8 +288,6 @@ func (e *ExplorerDriverImpl) handleReadLegacyPacket(deps *ExplorerDependency) {
}

func (e *ExplorerDriverImpl) handleReadMainlinePacket(deps *ExplorerDependency) {
prevTime, _ := deps.FallbackTime.GetTime()

for {
select {
case <-deps.CancelToken.Done():
Expand Down Expand Up @@ -344,27 +346,27 @@ func (e *ExplorerDriverImpl) handleReadMainlinePacket(deps *ExplorerDependency)
continue
}

// Get current timestamp
if e.mainlinePacketHeader.timestamp == 0 {
t, err := deps.FallbackTime.Get()
if err != nil {
continue
}
e.mainlinePacketHeader.timestamp = t.UnixMilli()
}

// Publish the data to the message bus
deps.Health.SampleRate = sampleRate
finalPacket := ExplorerData{
SampleRate: sampleRate,
Timestamp: e.mainlinePacketHeader.timestamp,
Z_Axis: e.mainlinePacketChannel.z_axis,
E_Axis: e.mainlinePacketChannel.e_axis,
N_Axis: e.mainlinePacketChannel.n_axis,
}
if e.mainlinePacketHeader.timestamp != 0 {
finalPacket.Timestamp = e.mainlinePacketHeader.timestamp
} else {
// Fix jitter in the timestamp
t, _ := deps.FallbackTime.GetTime()
if time.Duration(math.Abs(float64(t.Sub(prevTime).Milliseconds()))) <= EXPLORER_GENERAL_JITTER {
t = deps.FallbackTime.Fix(t, prevTime, time.Second)
}
finalPacket.Timestamp = t.UnixMilli()
prevTime = t
}
deps.messageBus.Publish("explorer", &finalPacket)
deps.Health.UpdatedAt = time.Now()

deps.Health.UpdatedAt = time.UnixMilli(e.mainlinePacketHeader.timestamp)
deps.Health.Received++
}
}
Expand All @@ -385,8 +387,12 @@ func (e *ExplorerDriverImpl) IsAvailable(deps *ExplorerDependency) bool {
}

func (e *ExplorerDriverImpl) Init(deps *ExplorerDependency) error {
deps.Health.StartTime, _ = deps.FallbackTime.GetTime()
currentTime, err := deps.FallbackTime.Get()
if err != nil {
return err
}

deps.Health.StartTime = currentTime
deps.subscribers = cmap.New[ExplorerEventHandler]()
deps.messageBus = messagebus.New(1024)
deps.Config.DeviceId = math.MaxUint32
Expand Down
4 changes: 2 additions & 2 deletions drivers/explorer/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
messagebus "github.com/vardius/message-bus"
)

const EXPLORER_GENERAL_JITTER = 10 * time.Millisecond
const EXPLORER_ALLOWED_JITTER_MS = 2

const (
EXPLORER_CHANNEL_CODE_Z = "Z"
Expand All @@ -36,9 +36,9 @@ type ExplorerConfig struct {
}

type ExplorerDependency struct {
FallbackTime *timesource.Source
Health ExplorerHealth
Config ExplorerConfig
FallbackTime timesource.Source
CancelToken context.Context
Transport transport.TransportDriver
messageBus messagebus.MessageBus
Expand Down
4 changes: 2 additions & 2 deletions frontend/src/.env
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
REACT_APP_VERSION=v3.0.4
REACT_APP_RELEASE=a60171a8-20240808222943
REACT_APP_VERSION=v3.0.5
REACT_APP_RELEASE=0e50a2dc-20240822174555
6 changes: 3 additions & 3 deletions frontend/src/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion services/miniseed/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func (m *MiniSeedService) Start(options *services.Options, waitGroup *sync.WaitG
m.writeBufferCountDown = MINISEED_WRITE_INTERVAL

// Get sequence number if file exists
currentTime, _ := options.TimeSource.GetTime()
currentTime, _ := options.TimeSource.Get()
for _, channelCode := range []string{
explorer.EXPLORER_CHANNEL_CODE_Z,
explorer.EXPLORER_CHANNEL_CODE_E,
Expand Down
2 changes: 1 addition & 1 deletion services/miniseed/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
const (
MINISEED_BIT_ORDER = mseedio.MSBFIRST
MINISEED_ENCODE_TYPE = mseedio.STEIM2
MINISEED_WRITE_INTERVAL = 5
MINISEED_WRITE_INTERVAL = 3
MINISEED_CLEANUP_INTERVAL = 60
)

Expand Down
17 changes: 10 additions & 7 deletions services/miniseed/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,18 @@ func (m *MiniSeedService) handleWrite() error {

for i := 1; i < len(m.miniseedBuffer); i++ {
// Make sure timestamp is continuous
if math.Abs(float64(m.miniseedBuffer[i].Timestamp-startTimestamp-int64(i*1000))) != 0 {
return fmt.Errorf("timestamp is not continuous, expected %d, got %d", startTimestamp+int64(i*1000), m.miniseedBuffer[i].Timestamp)
if math.Abs(float64(m.miniseedBuffer[i].Timestamp-startTimestamp-int64(i*1000))) >= explorer.EXPLORER_ALLOWED_JITTER_MS {
return fmt.Errorf(
"timestamp is not within allowed jitter %d ms, expected %d, got %d",
explorer.EXPLORER_ALLOWED_JITTER_MS,
startTimestamp+int64(i*1000),
m.miniseedBuffer[i].Timestamp,
)
}

if !m.legacyMode {
// Make sure sample rate is the same
if m.miniseedBuffer[i].SampleRate != startSampleRate {
return fmt.Errorf("sample rate is not the same, expected %d, got %d", startSampleRate, m.miniseedBuffer[i].SampleRate)
}
// Make sure sample rate is the same
if m.miniseedBuffer[i].SampleRate != startSampleRate {
return fmt.Errorf("sample rate is not the same, expected %d, got %d", startSampleRate, m.miniseedBuffer[i].SampleRate)
}
}

Expand Down
5 changes: 5 additions & 0 deletions services/timesync/name.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package timesync

func (w *TimeSyncService) GetServiceName() string {
return "timesync"
}
Loading

0 comments on commit 01429d8

Please sign in to comment.