diff --git a/ntcore/doc/time-sync-protocol.adoc b/ntcore/doc/time-sync-protocol.adoc new file mode 100644 index 00000000000..46407ee3b2f --- /dev/null +++ b/ntcore/doc/time-sync-protocol.adoc @@ -0,0 +1,51 @@ += Time Syncronization Protocol Specification, Version 1.0 +WPILib Developers +Protocol Revision 1.0, 08/25/2024 +:toc: +:toc-placement: preamble +:sectanchors: + +[[roles]] +=== Roles + +Time Syncronization Protocol (TSP) participants can assume either a server role or a client role. The server role is responsible for listening for incoming time synchronization requests from clients and replying approriately. The client role is responsible for sending "Ping" messages to the server and listening for "Pong" replies to estimate the offset between the server and client time bases. + +All time values shall use units of microseconds. The epoch of the time base this is measured against is unspecified. + +Clients shall periodically (e.g. every few seconds) send, in a manner that minimizes transmission delays, a **TSP Ping Message** that contains the client's current local time. + +When the server recieves a **TSP Ping Message** from any client, it shall respond to the client, in a manner that minimizes transmission delays, with a **TSP Pong message** encoding a timestamp of its (the server's) current local time (in microseconds), and the client-provided data value. + +When the client receives a **TSP Pong Message** from the server, it shall compute the round trip time (RTT) from the delta between the message's data value and the current local time. If the RTT is less than that from previous measurements, the client shall use the timestamp in the message plus ½ the RTT as the server time equivalent to the current local time, and use this equivalence to compute server time base timestamps from local time for future messages. + +[[transport]] +=== Transport + +Communication between server and clients shall occur over the User Datagram Protocol (UDP) Port 5810. + +[[format]] +=== Message Format + +TODO: Should I have an arbitrary magic value? do i need checksums? should we include a sequence number or other way to check if a message was replied to besides checking if we got the timestamp we expected back? +also need to spefify endienness + +**TSP Ping** and **TSP Pong** messages shall be endoded in a manor compatible with a WPILib packed struct (todo: just use struct syntax instead of a table.) with respect to byte alignment and endienness. + +TSP Ping + +|==== +| Offset | Format | Data | Notes +| 0 | uint8 | Protocol version | This field shall always set to 1 (0b1) for TSP Version 1. +| 1 | uint8 | Message ID | This field shall always be set to 1 (0b1). +| 2 | uint64 | Client Network Time | The client's local time value, at the time this Ping message was sent. +|==== + +TSP Pong + +|==== +| Offset | Format | Data | Notes +| 0 | uint8 | Protocol version | This field shall always set to 1 (0b1) for TSP Version 1. +| 1 | uint8 | Message ID | This field shall always be set to 2 (0b2). +| 2 | uint64 | Client Local Time | The client's local time value from the Ping message that this Pong is generated in response to. +| 10 | uint64 | Server Local Time | The current time at the server, at the time this Pong message was sent. +|==== diff --git a/ntcore/src/main/native/cpp/net/TimeSyncClientServer.cpp b/ntcore/src/main/native/cpp/net/TimeSyncClientServer.cpp new file mode 100644 index 00000000000..f60b45043ce --- /dev/null +++ b/ntcore/src/main/native/cpp/net/TimeSyncClientServer.cpp @@ -0,0 +1,308 @@ +// Copyright (c) FIRST and other WPILib contributors. +// Open Source Software; you can modify and/or share it under the terms of +// the WPILib BSD license file in the root directory of this project. + +#include "net/TimeSyncClientServer.h" + +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include + +#include "ntcore_cpp.h" + +template <> +struct wpi::Struct { + static constexpr std::string_view GetTypeName() { return "TspPing"; } + static constexpr size_t GetSize() { return 10; } + static constexpr std::string_view GetSchema() { + return "uint8 version;uint8 message_id;uint64 client_time"; + } + + static TspPing Unpack(std::span data) { + return TspPing{ + wpi::UnpackStruct(data), + wpi::UnpackStruct(data), + wpi::UnpackStruct(data), + }; + } + static void Pack(std::span data, const TspPing& value) { + wpi::PackStruct<0>(data, value.version); + wpi::PackStruct<1>(data, value.message_id); + wpi::PackStruct<2>(data, value.client_time); + } +}; + +template <> +struct wpi::Struct { + static constexpr std::string_view GetTypeName() { return "TspPong"; } + static constexpr size_t GetSize() { return 18; } + static constexpr std::string_view GetSchema() { + return "uint8 version;uint8 message_id;uint64 client_time;uint64_t " + "server_time"; + } + + static TspPong Unpack(std::span data) { + return TspPong{ + TspPing{ + wpi::UnpackStruct(data), + wpi::UnpackStruct(data), + wpi::UnpackStruct(data), + }, + wpi::UnpackStruct(data), + }; + } + static void Pack(std::span data, const TspPong& value) { + wpi::PackStruct<0>(data, value.version); + wpi::PackStruct<1>(data, value.message_id); + wpi::PackStruct<2>(data, value.client_time); + wpi::PackStruct<10>(data, value.server_time); + } +}; + +static_assert(wpi::StructSerializable); +static_assert(wpi::StructSerializable); + +static void ClientLoggerFunc(unsigned int level, const char* file, + unsigned int line, const char* msg) { + if (level == 20) { + wpi::print(stderr, "TimeSyncClient: {}\n", msg); + return; + } + + std::string_view levelmsg; + if (level >= 50) { + levelmsg = "CRITICAL"; + } else if (level >= 40) { + levelmsg = "ERROR"; + } else if (level >= 30) { + levelmsg = "WARNING"; + } else { + return; + } + wpi::print(stderr, "TimeSyncClient: {}: {} ({}:{})\n", levelmsg, msg, file, + line); +} + +static void ServerLoggerFunc(unsigned int level, const char* file, + unsigned int line, const char* msg) { + if (level == 20) { + wpi::print(stderr, "TimeSyncServer: {}\n", msg); + return; + } + + std::string_view levelmsg; + if (level >= 50) { + levelmsg = "CRITICAL"; + } else if (level >= 40) { + levelmsg = "ERROR"; + } else if (level >= 30) { + levelmsg = "WARNING"; + } else { + return; + } + wpi::print(stderr, "TimeSyncServer: {}: {} ({}:{})\n", levelmsg, msg, file, + line); +} + +void wpi::TimeSyncServer::UdpCallback(uv::Buffer& data, size_t n, + const sockaddr& sender, unsigned flags) { + wpi::println("TimeSyncServer got ping!"); + + TspPing ping{wpi::UnpackStruct(data.bytes())}; + + if (ping.version != 1) { + WPI_ERROR(m_logger, "Bad version from client?"); + return; + } + if (ping.message_id != 1) { + WPI_ERROR(m_logger, "Bad message id from client?"); + return; + } + + uint64_t current_time = m_timeProvider(); + + TspPong pong{ping, current_time}; + pong.message_id = 2; + + wpi::SmallVector::GetSize()> pongData( + wpi::Struct::GetSize()); + wpi::PackStruct(pongData, pong); + + // Wrap our buffer - pongData should free itself for free + wpi::uv::Buffer pongBuf{pongData}; + int sent = + m_udp->TrySend(sender, wpi::SmallVector{pongBuf}); + wpi::println("Pong ret: {}", sent); + if (static_cast(sent) != wpi::Struct::GetSize()) { + WPI_ERROR(m_logger, "Didn't send the whole pong back?"); + return; + } + + WPI_INFO(m_logger, "Got ping: {} {} {}", ping.version, ping.message_id, + ping.client_time); + WPI_INFO(m_logger, "Sent pong: {} {} {} {}", pong.version, pong.message_id, + pong.client_time, pong.server_time); +} + +wpi::TimeSyncServer::TimeSyncServer(int port, + std::function timeProvider) + : m_logger{::ServerLoggerFunc}, + m_timeProvider{timeProvider}, + m_udp{wpi::uv::Udp::Create(m_loopRunner.GetLoop(), AF_INET)} { + m_loopRunner.ExecSync( + [this, port](uv::Loop&) { m_udp->Bind("0.0.0.0", port); }); +} + +void wpi::TimeSyncServer::Start() { + m_loopRunner.ExecSync([this](uv::Loop&) { + m_udp->received.connect(&wpi::TimeSyncServer::UdpCallback, this); + m_udp->StartRecv(); + }); +} + +void wpi::TimeSyncServer::Stop() { + m_loopRunner.Stop(); +} + +void wpi::TimeSyncClient::Tick() { + wpi::println("wpi::TimeSyncClient::Tick"); + // Regardless of if we've gotten a pong back yet, we'll ping again. this is + // pretty naive but should be "fine" for now? + + uint64_t ping_local_time{m_timeProvider()}; + + TspPing ping{.version = 1, .message_id = 1, .client_time = ping_local_time}; + + wpi::SmallVector::GetSize()> pingData( + wpi::Struct::GetSize()); + wpi::PackStruct(pingData, ping); + + // Wrap our buffer - pingData should free itself + wpi::uv::Buffer pingBuf{pingData}; + int sent = m_udp->TrySend(wpi::SmallVector{pingBuf}); + + if (static_cast(sent) != wpi::Struct::GetSize()) { + WPI_ERROR(m_logger, "Didn't send the whole ping out?"); + return; + } + + { + std::lock_guard lock{m_offsetMutex}; + m_metadata.pingsSent++; + } + + m_lastPing = ping; +} + +void wpi::TimeSyncClient::UdpCallback(uv::Buffer& buf, size_t nbytes, + const sockaddr& sender, unsigned flags) { + uint64_t pong_local_time = m_timeProvider(); + + if (static_cast(nbytes) != wpi::Struct::GetSize()) { + WPI_ERROR(m_logger, "Got {} bytes for pong?", nbytes); + return; + } + + TspPong pong{ + wpi::UnpackStruct(buf.bytes()), + }; + + fmt::println("->[client] Got pong: {} {} {} {}", pong.version, + pong.message_id, pong.client_time, pong.server_time); + + if (pong.version != 1) { + fmt::println("Bad version from server?"); + return; + } + if (pong.message_id != 2) { + fmt::println("Bad message id from server?"); + return; + } + + TspPing ping = m_lastPing; + + if (pong.client_time != ping.client_time) { + WPI_WARNING(m_logger, + "Pong was not a reply to our ping? Got ping {} vs pong {}", + ping.client_time, pong.client_time); + return; + } + + // when time = send_time+rtt2/2, server time = server time + // server time = local time + offset + // offset = (server time - local time) = (server time) - (send_time + + // rtt2/2) + auto rtt2 = pong_local_time - ping.client_time; + int64_t serverTimeOffsetUs = pong.server_time - rtt2 / 2 - ping.client_time; + + { + std::lock_guard lock{m_offsetMutex}; + m_metadata.offset = serverTimeOffsetUs; + m_metadata.pongsRecieved++; + m_metadata.lastPongTime = pong_local_time; + } + + using std::cout; + fmt::println("Ping-ponged! RTT2 {} uS, offset {} uS", rtt2, + serverTimeOffsetUs); + fmt::println("Estimated server time {} s", + (m_timeProvider() + serverTimeOffsetUs) / 1000000.0); +} + +wpi::TimeSyncClient::TimeSyncClient(std::string_view server, int remote_port, + std::chrono::milliseconds ping_delay, + std::function timeProvider) + : m_logger(::ClientLoggerFunc), + m_timeProvider(timeProvider), + m_udp{wpi::uv::Udp::Create(m_loopRunner.GetLoop(), AF_INET)}, + m_pingTimer{wpi::uv::Timer::Create(m_loopRunner.GetLoop())}, + m_serverIP{server}, + m_serverPort{remote_port}, + m_loopDelay(ping_delay) { + struct sockaddr_in serverAddr; + uv::NameToAddr(m_serverIP, m_serverPort, &serverAddr); + + m_loopRunner.ExecSync( + [this, serverAddr](uv::Loop&) { m_udp->Connect(serverAddr); }); +} + +void wpi::TimeSyncClient::Start() { + wpi::println("Connecting recieved"); + + m_loopRunner.ExecSync([this](uv::Loop&) { + m_udp->received.connect(&wpi::TimeSyncClient::UdpCallback, this); + m_udp->StartRecv(); + }); + + wpi::println("Starting pinger"); + using namespace std::chrono_literals; + m_pingTimer->timeout.connect(&wpi::TimeSyncClient::Tick, this); + + m_loopRunner.ExecSync( + [this](uv::Loop&) { m_pingTimer->Start(m_loopDelay, m_loopDelay); }); +} + +void wpi::TimeSyncClient::Stop() { + m_loopRunner.Stop(); +} + +int64_t wpi::TimeSyncClient::GetOffset() { + std::lock_guard lock{m_offsetMutex}; + return m_metadata.offset; +} + +wpi::TimeSyncClient::Metadata wpi::TimeSyncClient::GetMetadata() { + std::lock_guard lock{m_offsetMutex}; + return m_metadata; +} diff --git a/ntcore/src/main/native/include/net/TimeSyncClientServer.h b/ntcore/src/main/native/include/net/TimeSyncClientServer.h new file mode 100644 index 00000000000..70aba802251 --- /dev/null +++ b/ntcore/src/main/native/include/net/TimeSyncClientServer.h @@ -0,0 +1,121 @@ +// Copyright (c) FIRST and other WPILib contributors. +// Open Source Software; you can modify and/or share it under the terms of +// the WPILib BSD license file in the root directory of this project. + +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include + +#include "ntcore_cpp.h" + +struct TspPing { + uint8_t version; + uint8_t message_id; + uint64_t client_time; +}; + +struct TspPong : public TspPing { + TspPong(TspPing ping, uint64_t servertime) + : TspPing{ping}, server_time{servertime} {} + uint64_t server_time; +}; + +namespace wpi { + +class TimeSyncServer { + using SharedUdpPtr = std::shared_ptr; + + EventLoopRunner m_loopRunner{}; + + wpi::Logger m_logger; + std::function m_timeProvider; + SharedUdpPtr m_udp; + + std::thread m_listener; + + private: + void UdpCallback(uv::Buffer& buf, size_t nbytes, const sockaddr& sender, + unsigned flags); + + public: + explicit TimeSyncServer(int port = 5810, + std::function timeProvider = nt::Now); + + /** + * Start listening for pings + */ + void Start(); + /** + * Stop our loop runner. After stopping, we cannot restart. + */ + void Stop(); +}; + +class TimeSyncClient { + public: + struct Metadata { + int64_t offset{0}; + size_t pingsSent; + size_t pongsRecieved; + uint64_t lastPongTime; + }; + + private: + using SharedUdpPtr = std::shared_ptr; + using SharedTimerPtr = std::shared_ptr; + + EventLoopRunner m_loopRunner{}; + + wpi::Logger m_logger; + std::function m_timeProvider; + + SharedUdpPtr m_udp; + SharedTimerPtr m_pingTimer; + + std::string m_serverIP; + int m_serverPort; + + std::chrono::milliseconds m_loopDelay; + + std::mutex m_offsetMutex; + Metadata m_metadata; + + // We only allow the most recent ping to stay alive, so only keep track of it + TspPing m_lastPing; + + void Tick(); + + void UdpCallback(uv::Buffer& buf, size_t nbytes, const sockaddr& sender, + unsigned flags); + + public: + TimeSyncClient(std::string_view server, int remote_port, + std::chrono::milliseconds ping_delay, + std::function timeProvider = nt::Now); + + void Start(); + void Stop(); + int64_t GetOffset(); + Metadata GetMetadata(); +}; + +} // namespace wpi diff --git a/ntcore/src/test/native/cpp/TimeSyncProtocolTest.cpp b/ntcore/src/test/native/cpp/TimeSyncProtocolTest.cpp new file mode 100644 index 00000000000..23329dce9d3 --- /dev/null +++ b/ntcore/src/test/native/cpp/TimeSyncProtocolTest.cpp @@ -0,0 +1,30 @@ +// Copyright (c) FIRST and other WPILib contributors. +// Open Source Software; you can modify and/or share it under the terms of +// the WPILib BSD license file in the root directory of this project. + +#include + +#include + +TEST(TimeSyncProtocolTest, TestClient) { + using namespace wpi; + using namespace std::chrono_literals; + + static auto server_bogus_offset = -nt::Now(); + TimeSyncServer server{5810, []() { return nt::Now() + server_bogus_offset; }}; + TimeSyncClient client{"127.0.0.1", 5810, 1s}; + + server.Start(); + std::this_thread::sleep_for(0.5s); + client.Start(); + + for (int i = 0; i < 10; i++) { + auto off = client.GetOffset(); + wpi::println("Unit Test: current offset = {} uS, error = {} uS", off, + off - static_cast(server_bogus_offset)); + std::this_thread::sleep_for(1s); + } + + server.Stop(); + client.Stop(); +}