From 540f1487d9992b96bf443b245aecc8e458dc7d92 Mon Sep 17 00:00:00 2001 From: Chengxun Lee <24319042+bclswl0827@users.noreply.github.com> Date: Sun, 1 Sep 2024 03:46:42 +0800 Subject: [PATCH] Update AnyShake Explorer mainline data protocol --- .github/workflows/release.yml | 18 +- CHANGELOG.md | 18 ++ Makefile | 11 +- VERSION | 2 +- api/v1/history/sac.go | 4 +- cleaners/database/name.go | 2 +- cleaners/explorer/name.go | 2 +- drivers/dao/sqlite_windows_amd64.go | 12 +- drivers/dao/sqlite_windows_arm.go | 19 ++ drivers/dao/sqlite_windows_arm64.go | 4 +- drivers/explorer/impl.go | 456 ++++++++++++++-------------- drivers/explorer/types.go | 9 +- frontend/src/.env | 4 +- services/archiver/event.go | 4 +- services/miniseed/write.go | 6 +- services/seedlink/start.go | 2 +- startups/explorer/execute.go | 9 +- startups/explorer/logger.go | 19 ++ startups/explorer/name.go | 2 +- utils/fifo/peek.go | 38 +++ utils/fifo/read.go | 29 +- utils/fifo/types.go | 2 +- 22 files changed, 380 insertions(+), 292 deletions(-) create mode 100644 drivers/dao/sqlite_windows_arm.go create mode 100644 startups/explorer/logger.go create mode 100644 utils/fifo/peek.go diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 47f7781b..ad1f992b 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -58,11 +58,7 @@ jobs: mkdir -p ./build/release for i in ./build/dist/*; do arch=$(basename $i) - if [ "x$arch" != "xwin32" ] && [ "x$arch" != "xwin64" ]; then - echo "Packaging Linux $arch ..." - filename=./build/release/linux_$arch.zip - zip -rj $filename $i - elif [ "x$arch" == "xwin32" ]; then + if [ "x$arch" == "xwin32" ]; then echo "Packaging Windows 32-bit ..." filename=./build/release/windows_386.zip zip -rj $filename $i @@ -70,6 +66,18 @@ jobs: echo "Packaging Windows 64-bit ..." filename=./build/release/windows_amd64.zip zip -rj $filename $i + elif [ "x$arch" == "xwinarm" ]; then + echo "Packaging Windows arm ..." + filename=./build/release/windows_arm.zip + zip -rj $filename $i + elif [ "x$arch" == "xwinarm64" ]; then + echo "Packaging Windows arm64 ..." + filename=./build/release/windows_arm64.zip + zip -rj $filename $i + else + echo "Packaging Linux $arch ..." + filename=./build/release/linux_$arch.zip + zip -rj $filename $i fi done echo "::set-output name=status::success" diff --git a/CHANGELOG.md b/CHANGELOG.md index a47c7e2c..df857598 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,24 @@ Starting from v2.2.5, all notable changes to this project will be documented in this file. +## v3.2.1 + +### Breaking Changes + +- **Data Protocol**: The AnyShake Explorer mainline data protocol has been updated again. **If you are using AnyShake Explorer implemented by STM32, please rebuild and burn the firmware to the latest version, if you are using AnyShake Explorer implemented by ESP8266, just ignore this message.** + +### New Features + +- Print log messages from AnyShake Explorer driver. +- Improved the stability of the legacy data protocol. +- Support Windows ARM32 architecture. + +### Bug Fixes + +- Fix SQLite "out of memory" issue on Windows release build. +- Fix release names in the GitHub Actions workflow. +- Use `time.NewTimer` instead of `time.After` to avoid memory leak. + ## v3.2.0 ### New Features diff --git a/Makefile b/Makefile index 1da0fae6..511c3c38 100644 --- a/Makefile +++ b/Makefile @@ -25,18 +25,21 @@ $(BUILD_ARCH): @cp -r $(ASSETS_DIR) $(DIST_DIR)/$@ windows: - @echo "Building Windows 32-bit, 64-bit, ARM64 ..." - @mkdir -p $(DIST_DIR)/win32 $(DIST_DIR)/win64 $(DIST_DIR)/winarm - @rm -rf $(DIST_DIR)/win32/* $(DIST_DIR)/win64/* $(DIST_DIR)/winarm/* + @echo "Building Windows 32-bit, 64-bit, arm, arm64 ..." + @mkdir -p $(DIST_DIR)/win32 $(DIST_DIR)/win64 $(DIST_DIR)/winarm $(DIST_DIR)/winarm64 + @rm -rf $(DIST_DIR)/win32/* $(DIST_DIR)/win64/* $(DIST_DIR)/winarm/* $(DIST_DIR)/winarm64/* @CGO_ENABLED=0 GOOS=windows GOARCH=386 go build -ldflags="$(BUILD_FLAGS)" \ $(BUILD_ARGS) -o $(DIST_DIR)/win32/$(BINARY).exe $(SRC_DIR)/*.go @CGO_ENABLED=0 GOOS=windows GOARCH=amd64 go build -ldflags="$(BUILD_FLAGS)" \ $(BUILD_ARGS) -o $(DIST_DIR)/win64/$(BINARY).exe $(SRC_DIR)/*.go - @CGO_ENABLED=0 GOOS=windows GOARCH=arm64 go build -ldflags="$(BUILD_FLAGS)" \ + @CGO_ENABLED=0 GOOS=windows GOARCH=arm go build -ldflags="$(BUILD_FLAGS)" \ $(BUILD_ARGS) -o $(DIST_DIR)/winarm/$(BINARY).exe $(SRC_DIR)/*.go + @CGO_ENABLED=0 GOOS=windows GOARCH=arm64 go build -ldflags="$(BUILD_FLAGS)" \ + $(BUILD_ARGS) -o $(DIST_DIR)/winarm64/$(BINARY).exe $(SRC_DIR)/*.go @cp -r $(ASSETS_DIR) $(DIST_DIR)/win32 @cp -r $(ASSETS_DIR) $(DIST_DIR)/win64 @cp -r $(ASSETS_DIR) $(DIST_DIR)/winarm + @cp -r $(ASSETS_DIR) $(DIST_DIR)/winarm64 gen: ifeq ($(shell command -v gqlgen 2> /dev/null),) diff --git a/VERSION b/VERSION index 6d260c3a..040943e5 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -v3.2.0 +v3.2.1 diff --git a/api/v1/history/sac.go b/api/v1/history/sac.go index dd20319b..ef246140 100644 --- a/api/v1/history/sac.go +++ b/api/v1/history/sac.go @@ -21,11 +21,11 @@ func (h *History) getSACBytes(data []explorer.ExplorerData, stationCode, network var channelBuffer []int32 for index, record := range data { // Make sure timestamp is continuous - if math.Abs(float64(record.Timestamp-startTimestamp-int64(index*1000))) >= explorer.EXPLORER_ALLOWED_JITTER_MS { + if math.Abs(float64(record.Timestamp-startTimestamp-int64(index*int(time.Second.Milliseconds())))) >= explorer.EXPLORER_ALLOWED_JITTER_MS { return "", nil, fmt.Errorf( "timestamp is not within allowed jitter %d ms, expected %d, got %d", explorer.EXPLORER_ALLOWED_JITTER_MS, - startTimestamp+int64(index*1000), + startTimestamp+int64(index*int(time.Second.Milliseconds())), record.Timestamp, ) } diff --git a/cleaners/database/name.go b/cleaners/database/name.go index 48adfc1b..5d762408 100644 --- a/cleaners/database/name.go +++ b/cleaners/database/name.go @@ -1,5 +1,5 @@ package database func (t *DatabaseCleanerTask) GetTaskName() string { - return "database" + return "cleaner_database" } diff --git a/cleaners/explorer/name.go b/cleaners/explorer/name.go index 06bca43a..e8d1297f 100644 --- a/cleaners/explorer/name.go +++ b/cleaners/explorer/name.go @@ -1,5 +1,5 @@ package explorer func (t *ExplorerCleanerTask) GetTaskName() string { - return "explorer" + return "cleaner_explorer" } diff --git a/drivers/dao/sqlite_windows_amd64.go b/drivers/dao/sqlite_windows_amd64.go index 2aa89403..bc64c2e0 100644 --- a/drivers/dao/sqlite_windows_amd64.go +++ b/drivers/dao/sqlite_windows_amd64.go @@ -1,11 +1,11 @@ package dao import ( - "fmt" - "runtime" "time" + "github.com/bclswl0827/sqlite" "gorm.io/gorm" + "gorm.io/gorm/logger" ) type _SQLite struct{} @@ -15,5 +15,11 @@ func (s *_SQLite) match(engine string) bool { } func (s *_SQLite) open(host string, port int, username, password, database string, timeout time.Duration) (*gorm.DB, error) { - return nil, fmt.Errorf("current platform %s/%s does not support SQLite", runtime.GOOS, runtime.GOARCH) + db, err := gorm.Open(sqlite.Open(database), &gorm.Config{ + Logger: logger.Default.LogMode(logger.Silent), + }) + sqlDB, _ := db.DB() + sqlDB.SetMaxOpenConns(1) + + return db, err } diff --git a/drivers/dao/sqlite_windows_arm.go b/drivers/dao/sqlite_windows_arm.go new file mode 100644 index 00000000..2aa89403 --- /dev/null +++ b/drivers/dao/sqlite_windows_arm.go @@ -0,0 +1,19 @@ +package dao + +import ( + "fmt" + "runtime" + "time" + + "gorm.io/gorm" +) + +type _SQLite struct{} + +func (s *_SQLite) match(engine string) bool { + return engine == "sqlite3" || engine == "sqlite" +} + +func (s *_SQLite) open(host string, port int, username, password, database string, timeout time.Duration) (*gorm.DB, error) { + return nil, fmt.Errorf("current platform %s/%s does not support SQLite", runtime.GOOS, runtime.GOARCH) +} diff --git a/drivers/dao/sqlite_windows_arm64.go b/drivers/dao/sqlite_windows_arm64.go index ef4291ad..bc64c2e0 100644 --- a/drivers/dao/sqlite_windows_arm64.go +++ b/drivers/dao/sqlite_windows_arm64.go @@ -1,7 +1,6 @@ package dao import ( - "fmt" "time" "github.com/bclswl0827/sqlite" @@ -16,8 +15,7 @@ func (s *_SQLite) match(engine string) bool { } func (s *_SQLite) open(host string, port int, username, password, database string, timeout time.Duration) (*gorm.DB, error) { - dsn := fmt.Sprintf("file://%s?cache=shared&mode=rwc&_pragma=busy_timeout(%d)", database, int(timeout.Seconds())) - db, err := gorm.Open(sqlite.Open(dsn), &gorm.Config{ + db, err := gorm.Open(sqlite.Open(database), &gorm.Config{ Logger: logger.Default.LogMode(logger.Silent), }) sqlDB, _ := db.DB() diff --git a/drivers/explorer/impl.go b/drivers/explorer/impl.go index 0381b4ca..14b08d3f 100644 --- a/drivers/explorer/impl.go +++ b/drivers/explorer/impl.go @@ -15,22 +15,21 @@ import ( ) var ( - legacy_packet_frame_header = []byte{0xFC, 0x1B} - mainline_packet_frame_header = []byte{0xF1, 0xD9} - mainline_packet_frame_tail = []byte{0xD9, 0xF1} + LEGACY_PACKET_FRAME_HEADER = []byte{0xFC, 0x1B} + MAINLINE_PACKET_FRAME_HEADER = []byte{0xFA, 0xDE} ) // In legacy mode, each packet contains 3 channels, n samples per channel. // The packet is sent at an interval of (1000 / sample rate) milliseconds. // Set n = 5 (also in Explorer) fits the common sample rates (25, 50, 100, 125 Hz). -const legacy_packet_channel_size = 5 +const LEGACY_PACKET_CHANNEL_SIZE = 5 // Legacy packet structure, fixed size. // Each channel has a checksum, which is the XOR of all bytes in the channel. type legacyPacket struct { - Z_Axis [legacy_packet_channel_size]int32 - E_Axis [legacy_packet_channel_size]int32 - N_Axis [legacy_packet_channel_size]int32 + Z_Axis [LEGACY_PACKET_CHANNEL_SIZE]int32 + E_Axis [LEGACY_PACKET_CHANNEL_SIZE]int32 + N_Axis [LEGACY_PACKET_CHANNEL_SIZE]int32 Checksum [3]uint8 } @@ -45,174 +44,120 @@ func (g *legacyPacket) decode(data []byte) error { } // Using XOR algorithm - calc_checksum := [3]uint8{0, 0, 0} - z_axis_offset := int(unsafe.Sizeof(g.Z_Axis)) - for i := 0; i < z_axis_offset; i++ { - calc_checksum[0] ^= data[i] + calcChecksum := [3]uint8{0, 0, 0} + zAxisOffset := int(unsafe.Sizeof(g.Z_Axis)) + for i := 0; i < zAxisOffset; i++ { + calcChecksum[0] ^= data[i] } - e_axis_offset := z_axis_offset + int(unsafe.Sizeof(g.E_Axis)) - for i := z_axis_offset; i < e_axis_offset; i++ { - calc_checksum[1] ^= data[i] + eAxisOffset := zAxisOffset + int(unsafe.Sizeof(g.E_Axis)) + for i := zAxisOffset; i < eAxisOffset; i++ { + calcChecksum[1] ^= data[i] } - n_axis_offset := e_axis_offset + int(unsafe.Sizeof(g.N_Axis)) - for i := e_axis_offset; i < n_axis_offset; i++ { - calc_checksum[2] ^= data[i] + nAxisOffset := eAxisOffset + int(unsafe.Sizeof(g.N_Axis)) + for i := eAxisOffset; i < nAxisOffset; i++ { + calcChecksum[2] ^= data[i] } - for i := 0; i < len(calc_checksum); i++ { - if calc_checksum[i] != g.Checksum[i] { - return fmt.Errorf("checksum mismatch, expected %v, got %v", g.Checksum, calc_checksum) + for i := 0; i < len(calcChecksum); i++ { + if calcChecksum[i] != g.Checksum[i] { + return fmt.Errorf("checksum mismatch, expected %v, got %v", g.Checksum, calcChecksum) } } return nil } +// In mainline mode, each packet contains 3 channels, n samples per channel. +// The packet is sent at an interval of (1000 / sample rate) milliseconds. +// Set n = 5 (also in Explorer) fits the common sample rates (25, 50, 100, 125 Hz). +const MAINLINE_PACKET_CHANNEL_SIZE = 5 + // Mainline packet header structure, fixed size. -// 34 bytes of header data without the frame header bytes. -type mainlinePacketHeader struct { - sampleRate uint16 - timestamp int64 - deviceId uint32 - latitude float32 - longitude float32 - elevation float32 - reserved uint64 - checksum uint8 +// The VariableData be Device ID, Latitude, Longitude, Elevation in int32 / float32 format. +type mainlinePacket struct { + Timestamp int64 + VariableData [4]byte // Can be int32 or float32 + VariableName string // Exclude from length calculation + Z_axis [MAINLINE_PACKET_CHANNEL_SIZE]int32 + E_axis [MAINLINE_PACKET_CHANNEL_SIZE]int32 + N_axis [MAINLINE_PACKET_CHANNEL_SIZE]int32 + Checksum uint8 } -func (g *mainlinePacketHeader) length() int { - return int(unsafe.Sizeof(g.sampleRate) + - unsafe.Sizeof(g.timestamp) + - unsafe.Sizeof(g.deviceId) + - unsafe.Sizeof(g.latitude) + - unsafe.Sizeof(g.longitude) + - unsafe.Sizeof(g.elevation) + - unsafe.Sizeof(g.reserved) + - unsafe.Sizeof(g.checksum)) +func (g *mainlinePacket) length() int { + return int(unsafe.Sizeof(g.Timestamp) + + unsafe.Sizeof(g.VariableData) + + unsafe.Sizeof(g.Z_axis) + + unsafe.Sizeof(g.E_axis) + + unsafe.Sizeof(g.N_axis) + + unsafe.Sizeof(g.Checksum)) } -func (g *mainlinePacketHeader) decode(data []byte) error { - g.checksum = data[len(data)-1] +func (g *mainlinePacket) decode(data []byte) error { + // Restore header checksum, note that the byte order is little-endian + checksumIndex := len(data) - int(unsafe.Sizeof(g.Checksum)) + g.Checksum = data[checksumIndex] - // Using XOR algorithm - calc_checksum := uint8(0) - for i := 0; i < len(data[:34]); i++ { - calc_checksum ^= data[i] + // Using XOR algorithm to calculate the header checksum + calcHeaderChecksum := uint8(0) + for i := 0; i < checksumIndex; i++ { + calcHeaderChecksum ^= data[i] } - if calc_checksum != g.checksum { - return fmt.Errorf("checksum mismatch, expected %d, got %d", g.checksum, calc_checksum) + if calcHeaderChecksum != g.Checksum { + return fmt.Errorf("header checksum mismatch, expected %d, got %d", g.Checksum, calcHeaderChecksum) } - g.sampleRate = binary.LittleEndian.Uint16(data[:2]) - g.timestamp = int64(binary.LittleEndian.Uint64(data[2:10])) - g.deviceId = binary.LittleEndian.Uint32(data[10:14]) - g.latitude = math.Float32frombits(binary.LittleEndian.Uint32(data[14:18])) - g.longitude = math.Float32frombits(binary.LittleEndian.Uint32(data[18:22])) - g.elevation = math.Float32frombits(binary.LittleEndian.Uint32(data[22:26])) - g.reserved = binary.LittleEndian.Uint64(data[26:34]) - - return nil -} - -// Mainline packet channel structure, variable number of samples. -// Flexibly sized packet channel depending on the sample rate. -type mainlinePacketChannel struct { - z_axis []int32 - e_axis []int32 - n_axis []int32 - checksum uint32 -} - -func (g *mainlinePacketChannel) length(sampleRate int) int { - return 3*sampleRate*int(unsafe.Sizeof(int32(0))) + // Z, E, N axis data - int(unsafe.Sizeof(g.checksum)) // Checksum of Z, E, N axis -} - -func (g *mainlinePacketChannel) decode(data []byte, sampleRate int) error { - g.checksum = binary.LittleEndian.Uint32(data[len(data)-4:]) - - // Convert little-endian to big-endian for checksum calculation - for i := 0; i < len(data)-4; i += 4 { - data[i], data[i+1], data[i+2], data[i+3] = data[i+3], data[i+2], data[i+1], data[i] + // Restore the header data, note that the byte order is little-endian + switch (g.Timestamp / time.Second.Milliseconds()) % 4 { + case 0: + g.VariableName = "device_id" + case 1: + g.VariableName = "latitude" + case 2: + g.VariableName = "longitude" + case 3: + g.VariableName = "elevation" } + variableDataIndex := int(unsafe.Sizeof(g.Timestamp) + unsafe.Sizeof(g.VariableData)) + copy(g.VariableData[:], data[unsafe.Sizeof(g.Timestamp):variableDataIndex]) + g.Timestamp = int64(binary.LittleEndian.Uint64(data[:unsafe.Sizeof(g.Timestamp)])) - // Using CRC-32/MPEG-2 algorithm - calc_checksum := uint32(0xFFFFFFFF) - for _, v := range data[:len(data)-4] { - calc_checksum ^= uint32(v) << 24 - for i := 0; i < 8; i++ { - if (calc_checksum & 0x80000000) != 0 { - calc_checksum = (calc_checksum << 1) ^ 0x04C11DB7 - } else { - calc_checksum <<= 1 - } - } - } - if calc_checksum != g.checksum { - return fmt.Errorf("checksum mismatch, expected %d, got %d", g.checksum, calc_checksum) - } - - // Restore the original data, note that the byte order is big-endian - g.z_axis = make([]int32, sampleRate) - binary.Read(bytes.NewReader(data[:sampleRate*int(unsafe.Sizeof(int32(0)))]), binary.BigEndian, g.z_axis) - g.e_axis = make([]int32, sampleRate) - binary.Read(bytes.NewReader(data[sampleRate*int(unsafe.Sizeof(int32(0))):2*sampleRate*int(unsafe.Sizeof(int32(0)))]), binary.BigEndian, g.e_axis) - g.n_axis = make([]int32, sampleRate) - binary.Read(bytes.NewReader(data[2*sampleRate*int(unsafe.Sizeof(int32(0))):3*sampleRate*int(unsafe.Sizeof(int32(0)))]), binary.BigEndian, g.n_axis) - - return nil -} - -// Mainline packet tail structure, fixed size -// 9 bytes of tail data without the frame tail bytes -type mainlinePacketTail struct { - reserved uint64 - checksum uint8 -} - -func (g *mainlinePacketTail) length() int { - return int(unsafe.Sizeof(g.reserved) + unsafe.Sizeof(g.checksum)) -} - -func (g *mainlinePacketTail) decode(data []byte) error { - g.checksum = data[8] - - // Using XOR algorithm - calc_checksum := uint8(0) - for i := 0; i < len(data); i++ { - calc_checksum ^= data[i] + // Restore the channel data, note that the byte order is little-endian + zAxisOffset := variableDataIndex + int(unsafe.Sizeof(g.Z_axis)) + err := binary.Read(bytes.NewReader(data[variableDataIndex:zAxisOffset]), binary.LittleEndian, g.Z_axis[:]) + if err != nil { + return err } - if calc_checksum != g.checksum { - return fmt.Errorf("checksum mismatch, expected %d, got %d", g.checksum, calc_checksum) + eAxisOffset := zAxisOffset + int(unsafe.Sizeof(g.E_axis)) + err = binary.Read(bytes.NewReader(data[zAxisOffset:eAxisOffset]), binary.LittleEndian, g.E_axis[:]) + if err != nil { + return err } - - g.reserved = binary.LittleEndian.Uint64(data[:8]) - return nil + nAxisOffset := eAxisOffset + int(unsafe.Sizeof(g.N_axis)) + return binary.Read(bytes.NewReader(data[eAxisOffset:nAxisOffset]), binary.LittleEndian, g.N_axis[:]) } type ExplorerDriverImpl struct { - // Dependencies for legacy mode - legacyPacket legacyPacket - // Dependencies for mainline mode - mainlinePacketHeader mainlinePacketHeader - mainlinePacketChannel mainlinePacketChannel - mainlinePacketTail mainlinePacketTail + logger ExplorerLogger + legacyPacket legacyPacket + mainlinePacket mainlinePacket } -func (e *ExplorerDriverImpl) handleReadLegacyPacket(deps *ExplorerDependency) { - fifoBuffer := fifo.New((len(legacy_packet_frame_header) + e.legacyPacket.length()) * 4096) +func (e *ExplorerDriverImpl) handleReadLegacyPacket(deps *ExplorerDependency, fifoBuffer *fifo.Buffer) { + recvSize := len(LEGACY_PACKET_FRAME_HEADER) + e.legacyPacket.length() // Read data from the transport continuously go func() { - buf := make([]byte, e.legacyPacket.length()) + buf := make([]byte, recvSize/2) for { select { case <-deps.CancelToken.Done(): + e.logger.Infof("cancelling read data from transport") return default: - n, err := deps.Transport.Read(buf, 10*time.Millisecond, false) + n, err := deps.Transport.Read(buf, 100*time.Millisecond, false) if err != nil { - return + e.logger.Errorf("failed to read data from transport: %v", err) + continue } fifoBuffer.Write(buf[:n]) @@ -228,16 +173,19 @@ func (e *ExplorerDriverImpl) handleReadLegacyPacket(deps *ExplorerDependency) { // Read data from the FIFO buffer continuously var ( - dataBuffer = []legacyPacket{} - ticker = time.NewTimer(calcDuration(time.Now(), time.Second)) + packetBuffer = []legacyPacket{} + ticker = time.NewTimer(calcDuration(time.Now(), time.Second)) + timer = time.NewTimer(time.Millisecond) ) for { + timer.Reset(time.Millisecond) + select { case <-deps.CancelToken.Done(): ticker.Stop() return case currentTick := <-ticker.C: - if len(dataBuffer) > 0 { + if len(packetBuffer) > 0 { currentTime, err := deps.FallbackTime.Get() if err != nil { continue @@ -251,141 +199,177 @@ func (e *ExplorerDriverImpl) handleReadLegacyPacket(deps *ExplorerDependency) { e_axis_count []int32 n_axis_count []int32 ) - for _, packet := range dataBuffer { + for _, packet := range packetBuffer { z_axis_count = append(z_axis_count, packet.Z_Axis[:]...) e_axis_count = append(e_axis_count, packet.E_Axis[:]...) n_axis_count = append(n_axis_count, packet.N_Axis[:]...) } - sampleRate := len(dataBuffer) * legacy_packet_channel_size + sampleRate := len(packetBuffer) * LEGACY_PACKET_CHANNEL_SIZE deps.Health.SetSampleRate(sampleRate) finalPacket := ExplorerData{ SampleRate: sampleRate, Z_Axis: z_axis_count, E_Axis: e_axis_count, N_Axis: n_axis_count, - Timestamp: currentTime.UTC().UnixMilli(), + Timestamp: currentTime.UTC().Add(-time.Second).UnixMilli(), } deps.messageBus.Publish("explorer", &finalPacket) - dataBuffer = []legacyPacket{} + packetBuffer = []legacyPacket{} ticker.Reset(calcDuration(currentTick, time.Second)) } - case <-time.After(500 * time.Microsecond): - dat, err := fifoBuffer.Read(legacy_packet_frame_header, len(legacy_packet_frame_header)+e.legacyPacket.length()) - if err == nil { - // Read the packet data - err = e.legacyPacket.decode(dat[len(legacy_packet_frame_header):]) - if err != nil { - deps.Health.SetErrors(deps.Health.GetErrors() + 1) - } else { - dataBuffer = append(dataBuffer, e.legacyPacket) - } + case <-timer.C: + dat, err := fifoBuffer.Peek(LEGACY_PACKET_FRAME_HEADER, recvSize) + if err != nil { + continue + } + + // Read the packet data + err = e.legacyPacket.decode(dat[len(LEGACY_PACKET_FRAME_HEADER):]) + if err != nil { + e.logger.Warnf("failed to decode legacy packet: %v", err) + deps.Health.SetErrors(deps.Health.GetErrors() + 1) + } else { + packetBuffer = append(packetBuffer, e.legacyPacket) } } } } -func (e *ExplorerDriverImpl) handleReadMainlinePacket(deps *ExplorerDependency) { +func (e *ExplorerDriverImpl) handleReadMainlinePacket(deps *ExplorerDependency, fifoBuffer *fifo.Buffer) { + recvSize := len(MAINLINE_PACKET_FRAME_HEADER) + e.mainlinePacket.length() + + // Read data from the transport continuously + go func() { + buf := make([]byte, recvSize/2) + for { + select { + case <-deps.CancelToken.Done(): + e.logger.Infof("cancelling read data from transport") + return + default: + n, err := deps.Transport.Read(buf, 100*time.Millisecond, false) + if err != nil { + e.logger.Errorf("failed to read data from transport: %v", err) + continue + } + + fifoBuffer.Write(buf[:n]) + } + } + }() + + // Read data from the FIFO buffer continuously + var ( + packetBuffer = []mainlinePacket{} + nextTick = int64(0) + timer = time.NewTimer(time.Millisecond) + ) for { + timer.Reset(time.Millisecond) + select { case <-deps.CancelToken.Done(): return - default: - // Find the header sync bytes - ok, _ := deps.Transport.Filter(mainline_packet_frame_header, 2*time.Second) - if !ok { - continue - } - - // Read header section and update dependency data - headerBuf := make([]byte, e.mainlinePacketHeader.length()) - _, err := deps.Transport.Read(headerBuf, time.Second, false) + case <-timer.C: + dat, err := fifoBuffer.Peek(MAINLINE_PACKET_FRAME_HEADER, recvSize) if err != nil { continue } - err = e.mainlinePacketHeader.decode(headerBuf) + err = e.mainlinePacket.decode(dat[len(MAINLINE_PACKET_FRAME_HEADER):]) if err != nil { + e.logger.Warnf("failed to decode mainline packet: %v", err) deps.Health.SetErrors(deps.Health.GetErrors() + 1) continue } - if e.mainlinePacketHeader.latitude != 0 && e.mainlinePacketHeader.longitude != 0 && e.mainlinePacketHeader.elevation != 0 { - deps.Config.SetLatitude(float64(e.mainlinePacketHeader.latitude)) - deps.Config.SetLongitude(float64(e.mainlinePacketHeader.longitude)) - deps.Config.SetElevation(float64(e.mainlinePacketHeader.elevation)) - } - // Get data section packet size and read the channel data - sampleRate := int(e.mainlinePacketHeader.sampleRate) - dataBuf := make([]byte, e.mainlinePacketChannel.length(sampleRate)) - _, err = deps.Transport.Read(dataBuf, time.Second, false) - if err != nil { - continue - } - err = e.mainlinePacketChannel.decode(dataBuf, sampleRate) - if err != nil { - deps.Health.SetErrors(deps.Health.GetErrors() + 1) - continue + // Update the device ID, latitude, longitude, elevation + switch e.mainlinePacket.VariableName { + case "device_id": + deps.Config.SetDeviceId(binary.LittleEndian.Uint32(e.mainlinePacket.VariableData[:])) + case "latitude": + latitude := math.Float32frombits(binary.LittleEndian.Uint32(e.mainlinePacket.VariableData[:])) + if latitude >= -90 && latitude <= 90 { + deps.Config.SetLatitude(float64(latitude)) + } + case "longitude": + longitude := math.Float32frombits(binary.LittleEndian.Uint32(e.mainlinePacket.VariableData[:])) + if longitude >= -180 && longitude <= 180 { + deps.Config.SetLongitude(float64(longitude)) + } + case "elevation": + elevation := math.Float32frombits(binary.LittleEndian.Uint32(e.mainlinePacket.VariableData[:])) + if elevation >= 0 { + deps.Config.SetElevation(float64(elevation)) + } } - // Get tail section data, check tail bytes of the packet - tailBuf := make([]byte, e.mainlinePacketTail.length()+len(mainline_packet_frame_tail)) - _, err = deps.Transport.Read(tailBuf, time.Second, false) - if err != nil { - continue - } - frameTailSliceIndex := len(tailBuf) - len(mainline_packet_frame_tail) - if !bytes.Equal(tailBuf[frameTailSliceIndex:], mainline_packet_frame_tail) { - deps.Health.SetErrors(deps.Health.GetErrors() + 1) - continue - } - err = e.mainlinePacketTail.decode(tailBuf[:frameTailSliceIndex]) - if err != nil { - deps.Health.SetErrors(deps.Health.GetErrors() + 1) - continue + // Append the packet to the buffer + if nextTick == 0 { + nextTick = e.mainlinePacket.Timestamp + } else { + packetBuffer = append(packetBuffer, e.mainlinePacket) } - // Get current timestamp - if e.mainlinePacketHeader.timestamp == 0 { - t, err := deps.FallbackTime.Get() - if err != nil { + if math.Abs(float64(e.mainlinePacket.Timestamp-nextTick)) <= EXPLORER_ALLOWED_JITTER_MS { + // Update the next tick even if the buffer is empty + nextTick = e.mainlinePacket.Timestamp + time.Second.Milliseconds() + if len(packetBuffer) == 0 { continue } - e.mainlinePacketHeader.timestamp = t.UnixMilli() - } - // Publish the data to the message bus - deps.Health.SetSampleRate(sampleRate) - finalPacket := ExplorerData{ - SampleRate: sampleRate, - Timestamp: e.mainlinePacketHeader.timestamp, - Z_Axis: e.mainlinePacketChannel.z_axis, - E_Axis: e.mainlinePacketChannel.e_axis, - N_Axis: e.mainlinePacketChannel.n_axis, - } - deps.messageBus.Publish("explorer", &finalPacket) + // Merge the packet buffer into a single packet + var ( + z_axis_count []int32 + e_axis_count []int32 + n_axis_count []int32 + ) + for _, packet := range packetBuffer { + z_axis_count = append(z_axis_count, packet.Z_axis[:]...) + e_axis_count = append(e_axis_count, packet.E_axis[:]...) + n_axis_count = append(n_axis_count, packet.N_axis[:]...) + } - deps.Health.SetUpdatedAt(time.UnixMilli(e.mainlinePacketHeader.timestamp)) - deps.Health.SetReceived(deps.Health.GetReceived() + 1) + // Publish the final packet + sampleRate := len(packetBuffer) * MAINLINE_PACKET_CHANNEL_SIZE + finalPacket := ExplorerData{ + SampleRate: sampleRate, + Z_Axis: z_axis_count, + E_Axis: e_axis_count, + N_Axis: n_axis_count, + Timestamp: e.mainlinePacket.Timestamp - time.Second.Milliseconds(), + } + deps.messageBus.Publish("explorer", &finalPacket) + + // Update the health status + deps.Health.SetSampleRate(sampleRate) + deps.Health.SetReceived(deps.Health.GetReceived() + 1) + deps.Health.SetUpdatedAt(time.UnixMilli(e.mainlinePacket.Timestamp).UTC()) + + packetBuffer = []mainlinePacket{} + } else if e.mainlinePacket.Timestamp-nextTick > EXPLORER_ALLOWED_JITTER_MS { + // Update the next tick, clear the buffer if the jitter exceeds the threshold + nextTick = e.mainlinePacket.Timestamp + time.Second.Milliseconds() + packetBuffer = []mainlinePacket{} + } } } } func (e *ExplorerDriverImpl) readerDaemon(deps *ExplorerDependency) { + fifoBuffer := fifo.New(65536) + if deps.Config.GetLegacyMode() { - e.handleReadLegacyPacket(deps) + e.handleReadLegacyPacket(deps, &fifoBuffer) } else { - e.handleReadMainlinePacket(deps) + e.handleReadMainlinePacket(deps, &fifoBuffer) } } -func (e *ExplorerDriverImpl) IsAvailable(deps *ExplorerDependency) bool { - buf := make([]byte, 128) - _, err := deps.Transport.Read(buf, 2*time.Second, true) - return err == nil -} +func (e *ExplorerDriverImpl) Init(deps *ExplorerDependency, logger ExplorerLogger) error { + e.logger = logger -func (e *ExplorerDriverImpl) Init(deps *ExplorerDependency) error { currentTime, err := deps.FallbackTime.Get() if err != nil { return err @@ -398,26 +382,30 @@ func (e *ExplorerDriverImpl) Init(deps *ExplorerDependency) error { // Get device ID in EEPROM if !deps.Config.GetLegacyMode() { - readTimeout := 5 * time.Second - startTime := time.Now() - for time.Since(startTime) < readTimeout { - ok, _ := deps.Transport.Filter(mainline_packet_frame_header, 2*time.Second) + var ( + startTime = time.Now() + readTimeout = 5 * time.Second + ) + for time.Since(startTime) <= readTimeout { + ok, _ := deps.Transport.Filter(MAINLINE_PACKET_FRAME_HEADER, time.Second) if !ok { continue } - headerBuf := make([]byte, e.mainlinePacketHeader.length()) + headerBuf := make([]byte, e.mainlinePacket.length()) _, err := deps.Transport.Read(headerBuf, time.Second, false) if err != nil { continue } - err = e.mainlinePacketHeader.decode(headerBuf) + err = e.mainlinePacket.decode(headerBuf) if err != nil { continue } - deps.Config.SetDeviceId(e.mainlinePacketHeader.deviceId) - break + if e.mainlinePacket.VariableName == "device_id" { + deps.Config.SetDeviceId(binary.LittleEndian.Uint32(e.mainlinePacket.VariableData[:])) + break + } } - if time.Since(startTime) >= readTimeout { + if time.Since(startTime) > readTimeout { return errors.New("failed to get device ID, please check the device") } } diff --git a/drivers/explorer/types.go b/drivers/explorer/types.go index 00855689..bc307541 100644 --- a/drivers/explorer/types.go +++ b/drivers/explorer/types.go @@ -57,10 +57,15 @@ type ExplorerData struct { type ExplorerEventHandler = func(data *ExplorerData) +type ExplorerLogger interface { + Infof(format string, args ...any) + Warnf(format string, args ...any) + Errorf(format string, args ...any) +} + type ExplorerDriver interface { readerDaemon(deps *ExplorerDependency) - IsAvailable(deps *ExplorerDependency) bool - Init(deps *ExplorerDependency) error + Init(deps *ExplorerDependency, logger ExplorerLogger) error Subscribe(deps *ExplorerDependency, clientId string, handler ExplorerEventHandler) error Unsubscribe(deps *ExplorerDependency, clientId string) error } diff --git a/frontend/src/.env b/frontend/src/.env index 33794e13..064879d3 100644 --- a/frontend/src/.env +++ b/frontend/src/.env @@ -1,2 +1,2 @@ -REACT_APP_VERSION=v3.2.0 -REACT_APP_RELEASE=659cecb8-20240830121249 +REACT_APP_VERSION=v3.2.1 +REACT_APP_RELEASE=59b9250b-20240901015842 diff --git a/services/archiver/event.go b/services/archiver/event.go index 4e927441..2d0d7b26 100644 --- a/services/archiver/event.go +++ b/services/archiver/event.go @@ -1,6 +1,8 @@ package archiver import ( + "time" + "github.com/anyshake/observer/drivers/dao/tables" "github.com/anyshake/observer/drivers/explorer" "github.com/anyshake/observer/utils/logger" @@ -39,7 +41,7 @@ func (a *ArchiverService) handleExplorerEvent(data *explorer.ExplorerData) { if a.cleanupCountDown == 0 { err := a.databaseConn. Table(adcCountModel.GetName()). - Where("timestamp < ?", data.Timestamp-int64(a.lifeCycle*86400*1000)). + Where("timestamp < ?", data.Timestamp-int64(a.lifeCycle*int(time.Hour.Milliseconds())*24)). Delete(&tables.AdcCount{}). Error if err != nil { diff --git a/services/miniseed/write.go b/services/miniseed/write.go index cda289df..022246b8 100644 --- a/services/miniseed/write.go +++ b/services/miniseed/write.go @@ -15,13 +15,13 @@ func (m *MiniSeedService) handleWrite() error { startSampleRate = m.miniseedBuffer[0].SampleRate ) - for i := 1; i < len(m.miniseedBuffer); i++ { + for i := int64(1); i < int64(len(m.miniseedBuffer)); i++ { // Make sure timestamp is increasing by 1000 ms with allowed jitter - if math.Abs(float64(m.miniseedBuffer[i].Timestamp-startTimestamp-int64(i*1000))) >= explorer.EXPLORER_ALLOWED_JITTER_MS { + if math.Abs(float64(m.miniseedBuffer[i].Timestamp-startTimestamp-(i*time.Second.Milliseconds()))) >= explorer.EXPLORER_ALLOWED_JITTER_MS { return fmt.Errorf( "timestamp is not within allowed jitter %d ms, expected %d, got %d", explorer.EXPLORER_ALLOWED_JITTER_MS, - startTimestamp+int64(i*1000), + startTimestamp+(i*time.Second.Milliseconds()), m.miniseedBuffer[i].Timestamp, ) } diff --git a/services/seedlink/start.go b/services/seedlink/start.go index 817675e4..29083cd0 100644 --- a/services/seedlink/start.go +++ b/services/seedlink/start.go @@ -59,7 +59,7 @@ func (s *SeedLinkService) Start(options *services.Options, waitGroup *sync.WaitG if s.prevSampleRate == data.SampleRate { messageBus.Publish(s.GetServiceName(), data) } else { - logger.GetLogger(s.GetServiceName()).Warnf("sample rate is not the same, expected %d, got %d", s.prevSampleRate, data.SampleRate) + logger.GetLogger(s.GetServiceName()).Warnf("sample rate is not the same, previous %d, current %d", s.prevSampleRate, data.SampleRate) } s.prevSampleRate = data.SampleRate }, diff --git a/startups/explorer/execute.go b/startups/explorer/execute.go index a7fd6d1c..09e0d2b1 100644 --- a/startups/explorer/execute.go +++ b/startups/explorer/execute.go @@ -1,8 +1,6 @@ package explorer import ( - "errors" - "github.com/anyshake/observer/drivers/explorer" "github.com/anyshake/observer/startups" "github.com/anyshake/observer/utils/logger" @@ -20,13 +18,8 @@ func (t *ExplorerStartupTask) Execute(depsContainer *dig.Container, options *sta } explorerDriver := explorer.ExplorerDriver(&explorer.ExplorerDriverImpl{}) - logger.GetLogger(t.GetTaskName()).Infoln("checking availability of opened device") - if !explorerDriver.IsAvailable(explorerDeps) { - return errors.New("opened device is not working, check the connection or modes") - } - logger.GetLogger(t.GetTaskName()).Infoln("device is being initialized, please wait") - err = explorerDriver.Init(explorerDeps) + err = explorerDriver.Init(explorerDeps, &explorerLoggerImpl{moduleName: "explorer_driver"}) if err != nil { return err } diff --git a/startups/explorer/logger.go b/startups/explorer/logger.go new file mode 100644 index 00000000..b80b7ce4 --- /dev/null +++ b/startups/explorer/logger.go @@ -0,0 +1,19 @@ +package explorer + +import "github.com/anyshake/observer/utils/logger" + +type explorerLoggerImpl struct { + moduleName string +} + +func (e *explorerLoggerImpl) Infof(format string, args ...any) { + logger.GetLogger(e.moduleName).Infof(format, args...) +} + +func (e *explorerLoggerImpl) Warnf(format string, args ...any) { + logger.GetLogger(e.moduleName).Warnf(format, args...) +} + +func (e *explorerLoggerImpl) Errorf(format string, args ...any) { + logger.GetLogger(e.moduleName).Errorf(format, args...) +} diff --git a/startups/explorer/name.go b/startups/explorer/name.go index 06e611cc..28db1a06 100644 --- a/startups/explorer/name.go +++ b/startups/explorer/name.go @@ -1,5 +1,5 @@ package explorer func (t *ExplorerStartupTask) GetTaskName() string { - return "explorer" + return "startup_explorer" } diff --git a/utils/fifo/peek.go b/utils/fifo/peek.go new file mode 100644 index 00000000..463e8b86 --- /dev/null +++ b/utils/fifo/peek.go @@ -0,0 +1,38 @@ +package fifo + +import ( + "fmt" +) + +func (b *Buffer) Peek(header []byte, size int) ([]byte, error) { + b.mutex.RLock() + defer b.mutex.RUnlock() + + for { + if (b.writeIndex-b.readIndex+b.capacity)%b.capacity < size { + return nil, fmt.Errorf("not enough data") + } + + isHeaderFind := true + for i := 0; i < len(header); i++ { + if b.data[(b.readIndex+i)%b.capacity] != header[i] { + isHeaderFind = false + break + } + } + + if isHeaderFind { + break + } + + b.readIndex = (b.readIndex + 1) % b.capacity + } + + packet := make([]byte, size) + for i := 0; i < size; i++ { + packet[i] = b.data[(b.readIndex+i)%b.capacity] + } + + b.readIndex = (b.readIndex + size) % b.capacity + return packet, nil +} diff --git a/utils/fifo/read.go b/utils/fifo/read.go index 35e66fdf..e1a4628a 100644 --- a/utils/fifo/read.go +++ b/utils/fifo/read.go @@ -2,32 +2,23 @@ package fifo import ( "fmt" + "time" ) -func (b *Buffer) Read(header []byte, size int) ([]byte, error) { - b.mutex.Lock() - defer b.mutex.Unlock() - - for { +func (b *Buffer) Read(size int, wait bool) ([]byte, error) { + if wait { + for (b.writeIndex-b.readIndex+b.capacity)%b.capacity < size { + time.Sleep(time.Millisecond) + } + } else { if (b.writeIndex-b.readIndex+b.capacity)%b.capacity < size { return nil, fmt.Errorf("not enough data") } - - isHeaderFind := true - for i := 0; i < len(header); i++ { - if b.data[(b.readIndex+i)%b.capacity] != header[i] { - isHeaderFind = false - break - } - } - - if isHeaderFind { - break - } - - b.readIndex = (b.readIndex + 1) % b.capacity } + b.mutex.RLock() + defer b.mutex.RUnlock() + packet := make([]byte, size) for i := 0; i < size; i++ { packet[i] = b.data[(b.readIndex+i)%b.capacity] diff --git a/utils/fifo/types.go b/utils/fifo/types.go index 57ebe06d..195164b9 100644 --- a/utils/fifo/types.go +++ b/utils/fifo/types.go @@ -7,5 +7,5 @@ type Buffer struct { readIndex int writeIndex int capacity int - mutex sync.Mutex + mutex sync.RWMutex }