From 7cd66eaad41292be8fb0306dc8285105649d49e6 Mon Sep 17 00:00:00 2001 From: Markus Ast Date: Fri, 30 Jun 2023 14:58:29 +0200 Subject: [PATCH] Add events and methods to track SRS clients --- CHANGELOG.md | 7 +- Cargo.lock | 68 +++ Cargo.toml | 3 +- Makefile | 2 +- lua/DCS-gRPC/methods/unit.lua | 9 + lua/lua_files.rs | 0 protos/dcs/dcs.proto | 4 +- protos/dcs/mission/v0/mission.proto | 29 +- .../{tts/v0/tts.proto => srs/v0/srs.proto} | 26 +- src/lib.rs | 1 + src/rpc.rs | 9 +- src/rpc/{tts.rs => srs.rs} | 130 +++-- src/server.rs | 113 ++-- src/srs.rs | 313 +++++++++++ srs/Cargo.toml | 3 + srs/src/client.rs | 8 +- srs/src/lib.rs | 7 +- srs/src/message.rs | 506 +++++++++++------- srs/src/messages_codec.rs | 6 +- srs/src/stream.rs | 272 ++++++++++ srs/src/voice_codec.rs | 20 +- srs/src/voice_stream.rs | 357 ------------ stubs/src/lib.rs | 2 +- stubs/src/srs.rs | 3 + stubs/src/tts.rs | 3 - 25 files changed, 1231 insertions(+), 670 deletions(-) delete mode 100644 lua/lua_files.rs rename protos/dcs/{tts/v0/tts.proto => srs/v0/srs.proto} (84%) rename src/rpc/{tts.rs => srs.rs} (75%) create mode 100644 src/srs.rs create mode 100644 srs/src/stream.rs delete mode 100644 srs/src/voice_stream.rs create mode 100644 stubs/src/srs.rs delete mode 100644 stubs/src/tts.rs diff --git a/CHANGELOG.md b/CHANGELOG.md index 9140120f..8419bddd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,10 +6,15 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Breaking Changes +- Renamed `TtsService` to `SrsService` + ### Added - Added `ActivateGroup` API which allows to activate groups with late activation. - Added `DestroyGroup` API which removes the entire group from the game world. -- `DestroyUnit` API +- Added `DestroyUnit` API +- Added `GetClients` to `SrsService`, which retrieves a list of units that are connected to SRS and the frequencies they are connected to. +- Added `SrsConnectEvent` and `SrsDisconnectEvent` events ### Fixed - Fixed `MarkAddEvent`, `MarkChangeEvent` and `MarkRemoveEvent` position diff --git a/Cargo.lock b/Cargo.lock index 1efbf4f7..6b82918a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -167,6 +167,20 @@ dependencies = [ "tower-service", ] +[[package]] +name = "backoff" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b62ddb9cb1ec0a098ad4bbf9344d0713fa193ae1a80af55febcff2627b6a00c1" +dependencies = [ + "futures-core", + "getrandom", + "instant", + "pin-project-lite", + "rand", + "tokio", +] + [[package]] name = "backtrace" version = "0.3.69" @@ -356,10 +370,21 @@ dependencies = [ "subtle", ] +[[package]] +name = "ctor" +version = "0.1.26" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6d2301688392eb071b0bf1a37be05c469d3cc4dbbd95df672fe28ab021e6a096" +dependencies = [ + "quote", + "syn 1.0.109", +] + [[package]] name = "dcs-grpc" version = "0.7.1" dependencies = [ + "backoff", "dcs-grpc-srs", "dcs-grpc-stubs", "dcs-grpc-tts", @@ -403,6 +428,7 @@ dependencies = [ "bytes", "futures-util", "log", + "pretty_assertions", "serde", "serde_json", "serde_repr", @@ -486,6 +512,12 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3c877555693c14d2f84191cfd3ad8582790fc52b5e2274b40b59cf5f5cea25c7" +[[package]] +name = "diff" +version = "0.1.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "56254986775e3233ffa9c4d7d3faaf6d36a2c09d30b20687e9f88bc8bafc16c8" + [[package]] name = "digest" version = "0.9.0" @@ -944,6 +976,15 @@ dependencies = [ "hashbrown 0.14.1", ] +[[package]] +name = "instant" +version = "0.1.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7a5bbe824c507c5da5956355e86a746d82e0e1464f65d862cc5e71da70e94b2c" +dependencies = [ + "cfg-if", +] + [[package]] name = "ipnet" version = "2.8.0" @@ -1223,6 +1264,15 @@ version = "6.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e2355d85b9a3786f481747ced0e0ff2ba35213a1f9bd406ed906554d7af805a1" +[[package]] +name = "output_vt100" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "628223faebab4e3e40667ee0b2336d34a5b960ff60ea743ddfdbcf7770bcfb66" +dependencies = [ + "winapi", +] + [[package]] name = "parking_lot" version = "0.12.1" @@ -1312,6 +1362,18 @@ version = "0.2.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de" +[[package]] +name = "pretty_assertions" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a25e9bcb20aa780fd0bb16b72403a9064d6b3f22f026946029acb941a50af755" +dependencies = [ + "ctor", + "diff", + "output_vt100", + "yansi", +] + [[package]] name = "prettyplease" version = "0.1.25" @@ -2762,6 +2824,12 @@ dependencies = [ "linked-hash-map", ] +[[package]] +name = "yansi" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09041cd90cf85f7f8b2df60c646f853b7f535ce68f85244eb6731cf89fa498ec" + [[package]] name = "zeroize" version = "1.6.0" diff --git a/Cargo.toml b/Cargo.toml index 6ee1fd9b..f3420097 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,7 +21,7 @@ log = "0.4" serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" thiserror = "1.0" -tokio = { version = "1.24", features = ["rt-multi-thread", "io-util", "net", "sync", "time", "parking_lot"] } +tokio = { version = "1.24", features = ["rt-multi-thread", "io-util", "net", "sync", "time", "parking_lot", "macros"] } tokio-stream = { version = "0.1", features = ["sync"] } tonic = "0.8" @@ -37,6 +37,7 @@ edition.workspace = true crate-type = ["cdylib"] [dependencies] +backoff = { version = "0.4", features = ["tokio"] } dcs-module-ipc = "0.8" futures-util.workspace = true igrf = "0.2" diff --git a/Makefile b/Makefile index 5501c86f..9dc2d99b 100644 --- a/Makefile +++ b/Makefile @@ -3,7 +3,7 @@ build: powershell copy target/debug/dcs_grpc.dll target/debug/dcs_grpc_hot_reload.dll watch: - cargo watch -x "check --features hot-reload" + cargo watch --ignore version.lua -x "check --features hot-reload" test: cargo test diff --git a/lua/DCS-gRPC/methods/unit.lua b/lua/DCS-gRPC/methods/unit.lua index 06b3d3b1..c2ad7d5c 100644 --- a/lua/DCS-gRPC/methods/unit.lua +++ b/lua/DCS-gRPC/methods/unit.lua @@ -119,6 +119,15 @@ GRPC.methods.getUnit = function(params) return GRPC.success({unit = GRPC.exporters.unit(unit)}) end +GRPC.methods.getUnitById = function(params) + local unit = Unit.getByName(Unit.getName({ id_ = params.id })) + if unit == nil then + return GRPC.errorNotFound("unit with id `" .. tostring(params.id) .. "` does not exist") + end + + return GRPC.success({unit = GRPC.exporters.unit(unit)}) +end + GRPC.methods.unitDestroy = function(params) local unit = Unit.getByName(params.name) if unit == nil then diff --git a/lua/lua_files.rs b/lua/lua_files.rs deleted file mode 100644 index e69de29b..00000000 diff --git a/protos/dcs/dcs.proto b/protos/dcs/dcs.proto index a05f4e5a..87d45cb0 100644 --- a/protos/dcs/dcs.proto +++ b/protos/dcs/dcs.proto @@ -11,8 +11,8 @@ import "dcs/group/v0/group.proto"; import "dcs/hook/v0/hook.proto"; import "dcs/mission/v0/mission.proto"; import "dcs/net/v0/net.proto"; +import "dcs/srs/v0/srs.proto"; import "dcs/timer/v0/timer.proto"; import "dcs/trigger/v0/trigger.proto"; -import "dcs/tts/v0/tts.proto"; import "dcs/unit/v0/unit.proto"; -import "dcs/world/v0/world.proto"; \ No newline at end of file +import "dcs/world/v0/world.proto"; diff --git a/protos/dcs/mission/v0/mission.proto b/protos/dcs/mission/v0/mission.proto index 1e58b7ae..d4385417 100644 --- a/protos/dcs/mission/v0/mission.proto +++ b/protos/dcs/mission/v0/mission.proto @@ -458,8 +458,8 @@ message StreamEventsResponse { } /** - * Fired for every TTS request that contains the `text_plain` field, for other clients to use e.g. - * for accessibility use-cases. + * Fired for every TTS request that contains the `text_plain` field, for other + * clients to use e.g. for accessibility use-cases. */ message TtsEvent { // The plain text that got transmitted. @@ -475,6 +475,29 @@ message StreamEventsResponse { optional string srs_client_name = 4; } + /** + * Fired every time a player occuping a unit connects to a frequency on SRS. + */ + message SrsConnectEvent { + // The unit that connected to a frequency in SRS. + dcs.common.v0.Unit unit = 1; + + // The radio frequency in Hz the unit connected to. + uint64 frequency = 2; + } + + /** + * Fired every time a player occuping a unit disconnects from a frequency on + * SRS. It is not fired when the player leaves the unit or the unit dies. + */ + message SrsDisconnectEvent { + // The unit that disconnected from a frequency in SRS. + dcs.common.v0.Unit unit = 1; + + // The radio frequency in Hz the unit disconnected from. + uint64 frequency = 2; + } + // The event's mission time. double time = 1; oneof event { @@ -528,6 +551,8 @@ message StreamEventsResponse { GroupCommandEvent group_command = 8198; SimulationFpsEvent simulation_fps = 8199; TtsEvent tts = 8200; + SrsConnectEvent srs_connect = 8201; + SrsDisconnectEvent srs_disconnect = 8202; } } diff --git a/protos/dcs/tts/v0/tts.proto b/protos/dcs/srs/v0/srs.proto similarity index 84% rename from protos/dcs/tts/v0/tts.proto rename to protos/dcs/srs/v0/srs.proto index 16be94ad..c813e7b0 100644 --- a/protos/dcs/tts/v0/tts.proto +++ b/protos/dcs/srs/v0/srs.proto @@ -1,10 +1,10 @@ syntax = "proto3"; -package dcs.tts.v0; +package dcs.srs.v0; import "dcs/common/v0/common.proto"; -option csharp_namespace = "RurouniJones.Dcs.Grpc.V0.Tts"; -option go_package = "github.com/DCS-gRPC/go-bindings/dcs/v0/tts"; +option csharp_namespace = "RurouniJones.Dcs.Grpc.V0.Srs"; +option go_package = "github.com/DCS-gRPC/go-bindings/dcs/v0/srs"; -service TtsService { +service SrsService { // Synthesize text to speech and transmit it over SRS. By default, this blocks until a // transmission completed (unless `async` is set to `true`). This can be used to prevent // transmission to overlap each other, by not sending another transmission on the same frequency @@ -12,6 +12,9 @@ service TtsService { // it does not block or prevent any other client from transmitting over the same frequency at the // same time. rpc Transmit(TransmitRequest) returns (TransmitResponse) {} + + // Retrieve a list of units (players) and their active frequencies that are connected to SRS. + rpc GetClients(GetClientsRequest) returns (GetClientsResponse) {} } message TransmitRequest { @@ -86,3 +89,18 @@ message TransmitResponse { // The duration in milliseconds it roughly takes to speak the transmission. uint32 duration_ms = 1; } + +message GetClientsRequest { +} + +message GetClientsResponse { + message Client { + // The unit that is connected to SRS. + dcs.common.v0.Unit unit = 1; + + // The radio frequencies in Hz the unit is connected to. + repeated uint64 frequencies = 2; + } + + repeated Client clients = 1; +} diff --git a/src/lib.rs b/src/lib.rs index d5b85b98..fa518a1b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -9,6 +9,7 @@ mod integrity; pub mod rpc; mod server; mod shutdown; +mod srs; mod stats; mod stream; diff --git a/src/rpc.rs b/src/rpc.rs index 2fbc0ae9..4572d2e1 100644 --- a/src/rpc.rs +++ b/src/rpc.rs @@ -6,7 +6,7 @@ use stubs::mission::v0::StreamEventsResponse; use tokio::sync::RwLock; use tonic::{Request, Status}; -pub use self::tts::Tts; +pub use self::srs::Srs; use crate::shutdown::ShutdownHandle; use crate::stats::Stats; @@ -18,9 +18,9 @@ mod group; mod hook; mod mission; mod net; +mod srs; mod timer; mod trigger; -mod tts; mod unit; mod world; @@ -80,6 +80,11 @@ impl MissionRpc { pub async fn events(&self) -> impl Stream { self.ipc.events().await } + + pub async fn event(&self, event: StreamEventsResponse) { + log::debug!("Received event: {:#?}", event); + self.ipc.event(event).await + } } impl HookRpc { diff --git a/src/rpc/tts.rs b/src/rpc/srs.rs similarity index 75% rename from src/rpc/tts.rs rename to src/rpc/srs.rs index 67917d67..a6177cf5 100644 --- a/src/rpc/tts.rs +++ b/src/rpc/srs.rs @@ -4,68 +4,75 @@ use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use std::str::FromStr; use std::time::{Duration, Instant}; +use ::srs::Sender; #[cfg(target_os = "windows")] use ::tts::WinConfig; use ::tts::{AwsConfig, AwsRegion, AzureConfig, GCloudConfig, TtsConfig}; -use dcs_module_ipc::IPC; -use futures_util::stream::{SplitSink, StreamExt}; -use futures_util::SinkExt; -use srs::VoiceStream; -use stubs::common::v0::Coalition; +use futures_util::FutureExt; +use stubs::common::v0::{Coalition, Unit}; use stubs::mission::v0::stream_events_response::{Event, TtsEvent}; use stubs::mission::v0::StreamEventsResponse; -use stubs::tts; -use stubs::tts::v0::transmit_request; -use stubs::tts::v0::tts_service_server::TtsService; +use stubs::srs; +use stubs::srs::v0::srs_service_server::SrsService; +use stubs::srs::v0::transmit_request; use tokio::time::sleep; use tonic::{Request, Response, Status}; +use super::MissionRpc; use crate::config::TtsProvider; use crate::fps::event_time; use crate::shutdown::ShutdownHandle; +use crate::srs::SrsClients; -pub struct Tts { +pub struct Srs { tts_config: crate::config::TtsConfig, srs_config: crate::config::SrsConfig, - ipc: IPC, + rpc: MissionRpc, + srs_clients: SrsClients, shutdown_signal: ShutdownHandle, } -impl Tts { +impl Srs { pub fn new( tts_config: crate::config::TtsConfig, srs_config: crate::config::SrsConfig, - ipc: IPC, + rpc: MissionRpc, + srs_clients: SrsClients, shutdown_signal: ShutdownHandle, ) -> Self { Self { tts_config, srs_config, - ipc, + rpc, + srs_clients, shutdown_signal, } } + + pub fn clients(&self) -> SrsClients { + self.srs_clients.clone() + } } #[tonic::async_trait] -impl TtsService for Tts { +impl SrsService for Srs { async fn transmit( &self, - request: Request, - ) -> Result, Status> { + request: Request, + ) -> Result, Status> { let request = request.into_inner(); let name = request.srs_client_name.as_deref().unwrap_or("DCS-gRPC"); - let mut client = srs::Client::new( + let mut client = ::srs::Client::new( name, request.frequency, match Coalition::from_i32(request.coalition) { - Some(Coalition::Red) => srs::Coalition::Red, - _ => srs::Coalition::Blue, + Some(Coalition::Red) => ::srs::Coalition::Red, + _ => ::srs::Coalition::Blue, }, ); let position = request.position.unwrap_or_default(); client - .set_position(srs::Position { + .set_position(::srs::Position { lat: position.lat, lon: position.lon, alt: position.alt, @@ -76,7 +83,7 @@ impl TtsService for Tts { .srs_config .addr .unwrap_or_else(|| SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 5002)); - let stream = client + let (tx, _) = client .start(addr, self.shutdown_signal.signal()) .await .map_err(|err| { @@ -216,55 +223,84 @@ impl TtsService for Tts { let duration_ms = Duration::from_millis(frames.len() as u64 * 20); // ~20m per frame count if let Some(text) = request.plaintext { - self.ipc - .event(StreamEventsResponse { - time: event_time(), - event: Some(Event::Tts(TtsEvent { - text, - frequency: request.frequency, - coalition: request.coalition, - srs_client_name: request.srs_client_name, - })), - }) - .await; + let event = StreamEventsResponse { + time: event_time(), + event: Some(Event::Tts(TtsEvent { + text, + frequency: request.frequency, + coalition: request.coalition, + srs_client_name: request.srs_client_name, + })), + }; + self.rpc.event(event).await; } if request.r#async { let signal = self.shutdown_signal.signal(); tokio::task::spawn(async move { - if let Err(err) = transmit(frames, stream, signal).await { + if let Err(err) = transmit(frames, tx, signal).await { log::error!("TTS transmission failed: {}", err); } }); } else { - transmit(frames, stream, self.shutdown_signal.signal()) + transmit(frames, tx, self.shutdown_signal.signal()) .await .map_err(|err| Status::internal(err.to_string()))?; } - Ok(Response::new(tts::v0::TransmitResponse { + Ok(Response::new(srs::v0::TransmitResponse { duration_ms: duration_ms.as_millis() as u32, })) } + + async fn get_clients( + &self, + _request: Request, + ) -> Result, Status> { + #[derive(serde::Serialize)] + struct GetUnitByIdRequest { + id: u32, + } + #[derive(Debug, serde::Deserialize)] + struct GetUnitByIdResponse { + unit: Unit, + } + + let clients = + futures_util::future::join_all(self.srs_clients.clients.read().await.iter().map( + |(id, frequencies)| { + let frequencies = Vec::from_iter(frequencies.iter().copied()); + self.rpc + .request::<_, GetUnitByIdResponse>( + "getUnitById", + tonic::Request::new(GetUnitByIdRequest { id: *id }), + ) + .map(|unit| { + unit.ok().map(|unit| srs::v0::get_clients_response::Client { + unit: Some(unit.unit), + frequencies, + }) + }) + }, + )) + .await + .into_iter() + .flatten() + .collect(); + + Ok(Response::new(srs::v0::GetClientsResponse { clients })) + } } async fn transmit( frames: Vec>, - stream: VoiceStream, + tx: Sender, mut shutdown_signal: impl Future + Unpin, ) -> Result<(), Box> { - let (sink, mut stream) = stream.split::>(); - let mut transmission = Box::pin(transmit_frames(frames, sink)); + let mut transmission = Box::pin(transmit_frames(frames, tx)); loop { tokio::select! { - packet = stream.next() => { - if let Some(packet) = packet { - packet?; - // Not interested in the received voice packets, so simply discard them - } - } - result = &mut transmission => { return result; } @@ -280,11 +316,11 @@ async fn transmit( async fn transmit_frames( frames: Vec>, - mut sink: SplitSink>, + tx: Sender, ) -> Result<(), Box> { let start = Instant::now(); for (i, frame) in frames.into_iter().enumerate() { - sink.send(frame).await?; + tx.send(frame).await?; // wait for the current ~playtime before sending the next package let playtime = Duration::from_millis((i as u64 + 1) * 20); // 20m per frame count diff --git a/src/server.rs b/src/server.rs index 3bcf0df7..c70b8acf 100644 --- a/src/server.rs +++ b/src/server.rs @@ -14,19 +14,22 @@ use stubs::hook::v0::hook_service_server::HookServiceServer; use stubs::mission::v0::mission_service_server::MissionServiceServer; use stubs::mission::v0::StreamEventsResponse; use stubs::net::v0::net_service_server::NetServiceServer; +use stubs::srs::v0::srs_service_server::{SrsService, SrsServiceServer}; +pub use stubs::srs::v0::TransmitRequest; use stubs::timer::v0::timer_service_server::TimerServiceServer; use stubs::trigger::v0::trigger_service_server::TriggerServiceServer; -use stubs::tts::v0::tts_service_server::{TtsService, TtsServiceServer}; use stubs::unit::v0::unit_service_server::UnitServiceServer; use stubs::world::v0::world_service_server::WorldServiceServer; -use tokio::runtime::Runtime; +use tokio::runtime::{Handle, Runtime}; use tokio::sync::oneshot::{self, Receiver}; +use tokio::sync::{mpsc, Mutex}; use tokio::time::sleep; use tonic::transport; use crate::config::{Config, SrsConfig, TtsConfig}; -use crate::rpc::{HookRpc, MissionRpc, Tts}; +use crate::rpc::{HookRpc, MissionRpc, Srs}; use crate::shutdown::{Shutdown, ShutdownHandle}; +use crate::srs::SrsClients; use crate::stats::Stats; pub struct Server { @@ -34,7 +37,7 @@ pub struct Server { shutdown: Shutdown, after_shutdown: Option>, state: ServerState, - tts: Arc, + srs_transmit: mpsc::Sender, } #[derive(Clone)] @@ -46,6 +49,7 @@ struct ServerState { stats: Stats, tts_config: TtsConfig, srs_config: SrsConfig, + srs_transmit: Arc>>, } impl Server { @@ -54,24 +58,21 @@ impl Server { let ipc_hook = IPC::default(); let runtime = Runtime::new()?; let shutdown = Shutdown::new(); + let (tx, rx) = mpsc::channel(128); Ok(Self { runtime, after_shutdown: None, state: ServerState { addr: format!("{}:{}", config.host, config.port).parse()?, eval_enabled: config.eval_enabled, - ipc_mission: ipc_mission.clone(), + ipc_mission, ipc_hook, stats: Stats::new(shutdown.handle()), tts_config: config.tts.clone().unwrap_or_default(), srs_config: config.srs.clone().unwrap_or_default(), + srs_transmit: Arc::new(Mutex::new(rx)), }, - tts: Arc::new(Tts::new( - config.tts.clone().unwrap_or_default(), - config.srs.clone().unwrap_or_default(), - ipc_mission, - shutdown.handle(), - )), + srs_transmit: tx, shutdown, }) } @@ -86,6 +87,7 @@ impl Server { self.after_shutdown = Some(tx); self.runtime.spawn(crate::server::run( + self.runtime.handle().clone(), self.state.clone(), self.shutdown.handle(), rx, @@ -129,33 +131,23 @@ impl Server { } pub fn tts(&self, ssml: String, frequency: u64, opts: Option) { - let tts = self.tts.clone(); let opts = opts.unwrap_or_default(); log::debug!("TTS from Lua: `{}` @ {} ({:?})", ssml, frequency, opts); - - self.runtime.spawn(async move { - let result = tts - .transmit(tonic::Request::new(stubs::tts::v0::TransmitRequest { - ssml, - plaintext: opts.plaintext, - frequency, - srs_client_name: opts.srs_client_name, - position: opts.position, - coalition: opts - .coalition - .unwrap_or(stubs::common::v0::Coalition::Neutral) - .into(), - r#async: false, - provider: opts.provider, - })) - .await; - match result { - Ok(_) => {} - Err(err) => { - log::error!("Error in TTS transmission from Lua: {}", err); - } - } - }); + self.srs_transmit + .try_send(TransmitRequest { + ssml, + plaintext: opts.plaintext, + frequency, + srs_client_name: opts.srs_client_name, + position: opts.position, + coalition: opts + .coalition + .unwrap_or(stubs::common::v0::Coalition::Neutral) + .into(), + r#async: false, + provider: opts.provider, + }) + .ok(); } } @@ -166,20 +158,28 @@ pub struct TtsOptions { srs_client_name: Option, position: Option, coalition: Option, - provider: Option, + provider: Option, } async fn run( + runtime: Handle, state: ServerState, shutdown_signal: ShutdownHandle, mut after_shutdown: Receiver<()>, ) { loop { - match try_run(state.clone(), shutdown_signal.clone(), &mut after_shutdown).await { + match try_run( + runtime.clone(), + state.clone(), + shutdown_signal.clone(), + &mut after_shutdown, + ) + .await + { Ok(_) => break, Err(err) => { log::error!("{}", err); - log::info!("Restarting gIPC Server in 10 seconds ..."); + log::info!("Restarting gRPC Server in 10 seconds ..."); sleep(Duration::from_secs(10)).await; } } @@ -187,6 +187,7 @@ async fn run( } async fn try_run( + runtime: Handle, state: ServerState, shutdown_signal: ShutdownHandle, after_shutdown: &mut Receiver<()>, @@ -201,6 +202,7 @@ async fn try_run( stats, tts_config, srs_config, + srs_transmit, } = state; let mut mission_rpc = @@ -212,6 +214,34 @@ async fn try_run( hook_rpc.enable_eval(); } + let srs_clients = SrsClients::default(); + runtime.spawn(crate::srs::run_in_background( + mission_rpc.clone(), + srs_clients.clone(), + srs_config.clone(), + shutdown_signal.clone(), + )); + + let srs = Srs::new( + tts_config.clone(), + srs_config.clone(), + mission_rpc.clone(), + srs_clients.clone(), + shutdown_signal.clone(), + ); + runtime.spawn(async move { + let mut srs_transmit = srs_transmit.lock().await; + while let Some(req) = srs_transmit.recv().await { + let result = srs.transmit(tonic::Request::new(req)).await; + match result { + Ok(_) => {} + Err(err) => { + log::error!("Error in TTS transmission from Lua: {}", err); + } + } + } + }); + transport::Server::builder() .add_service(AtmosphereServiceServer::new(mission_rpc.clone())) .add_service(CoalitionServiceServer::new(mission_rpc.clone())) @@ -223,10 +253,11 @@ async fn try_run( .add_service(NetServiceServer::new(mission_rpc.clone())) .add_service(TimerServiceServer::new(mission_rpc.clone())) .add_service(TriggerServiceServer::new(mission_rpc.clone())) - .add_service(TtsServiceServer::new(Tts::new( + .add_service(SrsServiceServer::new(Srs::new( tts_config, srs_config, - ipc_mission, + mission_rpc.clone(), + srs_clients, shutdown_signal.clone(), ))) .add_service(UnitServiceServer::new(mission_rpc.clone())) diff --git a/src/srs.rs b/src/srs.rs new file mode 100644 index 00000000..a1941ebd --- /dev/null +++ b/src/srs.rs @@ -0,0 +1,313 @@ +use std::collections::{HashMap, HashSet}; +use std::net::{IpAddr, Ipv4Addr, SocketAddr}; +use std::sync::Arc; +use std::time::Duration; + +use backoff::ExponentialBackoff; +use futures_util::future::select; +use srs::{ + ClientDisconnectMessage, Message, Modulation, Packet, RadioUpdateMessage, + ServerSettingsMessage, StreamError, SyncMessage, UpdateMessage, +}; +use stubs::common::v0::Unit; +use stubs::mission::v0::stream_events_response::{Event, SrsConnectEvent, SrsDisconnectEvent}; +use stubs::mission::v0::StreamEventsResponse; +use tokio::sync::RwLock; +use tonic::{Code, Status}; + +use crate::fps::event_time; +use crate::rpc::MissionRpc; +use crate::shutdown::ShutdownHandle; + +#[derive(Clone, Default)] +pub struct SrsClients { + pub clients: Arc>>, +} + +pub type Frequencies = HashSet; + +pub async fn run_in_background( + rpc: MissionRpc, + clients: SrsClients, + config: crate::config::SrsConfig, + shutdown_handle: ShutdownHandle, +) { + let client = srs::Client::new( + "DCS-gRPC", + 256_000_000, // freq doesn't really matter here + srs::Coalition::Spectator, + ); + + let addr = config + .addr + .unwrap_or_else(|| SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 5002)); + + let backoff = ExponentialBackoff { + initial_interval: Duration::from_secs(30), + max_interval: Duration::from_secs(5 * 60), + max_elapsed_time: None, // never stop trying + ..Default::default() + }; + + select( + Box::pin(backoff::future::retry_notify( + backoff, + // on each try, run the program and consider every error as transient (ie. worth + // retrying) + || async { + run( + rpc.clone(), + clients.clone(), + addr, + client.clone(), + shutdown_handle.clone(), + ) + .await + .map_err(backoff::Error::transient) + }, + // error hook: + |err, backoff: Duration| { + log::debug!( + "retrying with backoff {:.2}s after error: {err}", + backoff.as_secs_f64(), + ); + }, + )), + shutdown_handle.signal(), + ) + .await; +} + +async fn run( + rpc: MissionRpc, + clients: SrsClients, + addr: SocketAddr, + client: srs::Client, + shutdown_handle: ShutdownHandle, +) -> Result<(), StreamError> { + let (_, mut rx) = client.start(addr, shutdown_handle.signal()).await?; + while let Some(p) = rx.recv().await { + let Packet::Control(msg) = p? + else { + continue; + }; + log::info!("srs msg: {msg:#?}"); + + match msg { + Message::Sync(SyncMessage { + clients: sync_clients, + .. + }) => { + let mut clients = clients.clients.write().await; + let mut before = + std::mem::replace(&mut *clients, HashMap::with_capacity(sync_clients.len())); + + for c in sync_clients { + let Some(radio) = c.radio_info + else { + continue; + }; + + if radio.unit_id == 0 || radio.unit == "CA" { + continue; + } + + let before = before.remove(&radio.unit_id); + let after = radio + .radios + .into_iter() + .filter_map(|r| { + matches!(r.modulation, Modulation::Am | Modulation::Fm) + .then_some(r.freq as u64) + }) + .collect::>(); + + let mut unit = None; + if let Some(mut before) = before { + for freq in &after { + if !before.remove(freq) { + unit = connected(&rpc, unit.take(), radio.unit_id, *freq).await; + } + } + for freq in before { + unit = disconnected(&rpc, unit.take(), radio.unit_id, freq).await; + } + } else { + for freq in &after { + unit = connected(&rpc, unit.take(), radio.unit_id, *freq).await; + } + } + + if unit.is_some() { + clients.insert(radio.unit_id, after); + } + } + } + Message::Update(UpdateMessage { client, .. }) + | Message::RadioUpdate(RadioUpdateMessage { client, .. }) => { + let Some(radio) = client.radio_info + else { + continue; + }; + + if radio.unit_id == 0 || radio.unit == "CA" { + continue; + } + + let mut clients = clients.clients.write().await; + let before = clients.remove(&radio.unit_id); + let after = radio + .radios + .into_iter() + .filter_map(|r| { + matches!(r.modulation, Modulation::Am | Modulation::Fm) + .then_some(r.freq as u64) + }) + .collect::>(); + + let mut unit = None; + if let Some(mut before) = before { + for freq in &after { + if !before.remove(freq) { + unit = connected(&rpc, unit.take(), radio.unit_id, *freq).await; + } + } + for freq in before { + unit = disconnected(&rpc, unit.take(), radio.unit_id, freq).await; + } + } else { + for freq in &after { + unit = connected(&rpc, unit.take(), radio.unit_id, *freq).await; + } + } + + if unit.is_some() { + clients.insert(radio.unit_id, after); + } + } + Message::ClientDisconnect(ClientDisconnectMessage { client, .. }) => { + let Some(radio) = client.radio_info + else { + continue; + }; + + if radio.unit_id == 0 || radio.unit == "CA" { + continue; + } + + let mut clients = clients.clients.write().await; + if let Some(freqs) = clients.remove(&radio.unit_id) { + let mut unit = None; + for freq in freqs { + unit = disconnected(&rpc, unit.take(), radio.unit_id, freq).await; + } + } + } + Message::ServerSettings(ServerSettingsMessage { + server_settings, .. + }) => { + if !server_settings + .get("SHOW_TUNED_COUNT") + .is_some_and(|s| s == "True") + { + log::warn!( + "`Show Tuned/Client Count` is disabled on your SRS server. \ + Enable it if you want to receive the frequencies your SRS clients are on." + ) + } + } + Message::Ping(_) | Message::VersionMismatch(_) => {} + } + } + + Ok(()) +} + +async fn connected( + rpc: &MissionRpc, + unit: Option, + unit_id: u32, + frequency: u64, +) -> Option { + let unit = if let Some(unit) = unit { + unit + } else { + match get_unit_by_id(rpc, unit_id).await { + Ok(unit) => unit, + Err(err) => { + if err.code() != Code::NotFound { + log::error!( + "failed to get unit by id for srs connect event: {}", + err + ); + } + return None; + } + } + }; + + rpc.event(StreamEventsResponse { + time: event_time(), + event: Some(Event::SrsConnect(SrsConnectEvent { + unit: Some(unit.clone()), + frequency, + })), + }) + .await; + + Some(unit) +} + +async fn disconnected( + rpc: &MissionRpc, + unit: Option, + unit_id: u32, + frequency: u64, +) -> Option { + let unit = if let Some(unit) = unit { + unit + } else { + match get_unit_by_id(rpc, unit_id).await { + Ok(unit) => unit, + Err(err) => { + if err.code() != Code::NotFound { + log::error!( + "failed to get unit by id for srs disconnect event: {}", + err + ); + } + return None; + } + } + }; + + rpc.event(StreamEventsResponse { + time: event_time(), + event: Some(Event::SrsDisconnect(SrsDisconnectEvent { + unit: Some(unit.clone()), + frequency, + })), + }) + .await; + + Some(unit) +} + +async fn get_unit_by_id(rpc: &MissionRpc, id: u32) -> Result { + #[derive(serde::Serialize)] + struct GetUnitByIdRequest { + id: u32, + } + #[derive(Debug, serde::Deserialize)] + struct GetUnitByIdResponse { + unit: Unit, + } + + let res: GetUnitByIdResponse = rpc + .request( + "getUnitById", + tonic::Request::new(GetUnitByIdRequest { id }), + ) + .await?; + Ok(res.unit) +} diff --git a/srs/Cargo.toml b/srs/Cargo.toml index 0c9a93c8..5e3c72af 100644 --- a/srs/Cargo.toml +++ b/srs/Cargo.toml @@ -20,3 +20,6 @@ tokio-stream.workspace = true tokio-util = { version = "0.7", features = ["codec", "net"] } tokio.workspace = true uuid = { version = "1.1", features = ["v4"] } + +[dev-dependencies] +pretty_assertions = "1.3" diff --git a/srs/src/client.rs b/srs/src/client.rs index 60e68d94..099eb6b5 100644 --- a/srs/src/client.rs +++ b/srs/src/client.rs @@ -5,7 +5,8 @@ use std::sync::Arc; use tokio::sync::RwLock; use crate::message::{create_sguid, Coalition, Position}; -use crate::voice_stream::{VoiceStream, VoiceStreamError}; +use crate::stream::{Receiver, Sender}; +use crate::StreamError; #[derive(Debug, Clone)] pub struct UnitInfo { @@ -76,8 +77,7 @@ impl Client { self, addr: SocketAddr, shutdown_signal: impl Future + Unpin + Send + 'static, - ) -> Result { - let stream = VoiceStream::new(self, addr, shutdown_signal).await?; - Ok(stream) + ) -> Result<(Sender, Receiver), StreamError> { + crate::stream::stream(self, addr, shutdown_signal).await } } diff --git a/srs/src/lib.rs b/srs/src/lib.rs index f303d232..d6db183a 100644 --- a/srs/src/lib.rs +++ b/srs/src/lib.rs @@ -1,10 +1,9 @@ mod client; mod message; mod messages_codec; +mod stream; mod voice_codec; -mod voice_stream; pub use client::Client; -pub use message::{Coalition, Position}; -pub use voice_codec::{Encryption, Frequency, Modulation, VoicePacket}; -pub use voice_stream::VoiceStream; +pub use message::*; +pub use stream::{Packet, Receiver, Sender, StreamError}; diff --git a/srs/src/message.rs b/srs/src/message.rs index eb35513e..cd440cb9 100644 --- a/srs/src/message.rs +++ b/srs/src/message.rs @@ -5,15 +5,87 @@ use serde::{Deserialize, Serialize}; use serde_repr::{Deserialize_repr, Serialize_repr}; use uuid::Uuid; -#[derive(Clone, Copy, Debug, Eq, PartialEq)] -pub enum MsgType { - Update, - Ping, - Sync, - RadioUpdate, - ServerSettings, - ClientDisconnect, - VersionMismatch, +#[derive(Debug, PartialEq, Serialize, Deserialize)] +#[serde(rename_all = "PascalCase", untagged)] +pub enum Message { + Update(UpdateMessage), + Ping(PingMessage), + Sync(SyncMessage), + RadioUpdate(RadioUpdateMessage), + ServerSettings(ServerSettingsMessage), + ClientDisconnect(ClientDisconnectMessage), + VersionMismatch(VersionMismatchMessage), +} + +#[derive(Debug, PartialEq, Serialize, Deserialize)] +#[serde(rename_all = "PascalCase", untagged)] +pub enum MessageRequest { + Update(UpdateMessage), + Sync(SyncMessageRequest), + RadioUpdate(RadioUpdateMessage), +} + +#[derive(Debug, PartialEq, Serialize, Deserialize)] +#[serde(rename_all = "PascalCase")] +pub struct UpdateMessage { + pub msg_type: MsgType<0>, + pub client: Client, + pub version: String, +} + +#[derive(Debug, PartialEq, Serialize, Deserialize)] +#[serde(rename_all = "PascalCase")] +pub struct PingMessage { + pub msg_type: MsgType<1>, + pub version: String, +} + +#[derive(Debug, PartialEq, Serialize, Deserialize)] +#[serde(rename_all = "PascalCase")] +pub struct SyncMessage { + pub msg_type: MsgType<2>, + pub clients: Vec, + pub server_settings: HashMap, + pub version: String, +} + +#[derive(Debug, PartialEq, Serialize, Deserialize)] +#[serde(rename_all = "PascalCase")] +pub struct SyncMessageRequest { + pub msg_type: MsgType<2>, + pub client: Client, + pub version: String, +} + +#[derive(Debug, PartialEq, Serialize, Deserialize)] +#[serde(rename_all = "PascalCase")] +pub struct RadioUpdateMessage { + pub msg_type: MsgType<3>, + pub client: Client, + pub version: String, +} + +#[derive(Debug, PartialEq, Serialize, Deserialize)] +#[serde(rename_all = "PascalCase")] +pub struct ServerSettingsMessage { + pub msg_type: MsgType<4>, + pub server_settings: HashMap, + pub version: String, +} + +#[derive(Debug, PartialEq, Serialize, Deserialize)] +#[serde(rename_all = "PascalCase")] +pub struct ClientDisconnectMessage { + pub msg_type: MsgType<5>, + pub client: Client, + pub version: String, +} + +#[derive(Debug, PartialEq, Serialize, Deserialize)] +#[serde(rename_all = "PascalCase")] +pub struct VersionMismatchMessage { + pub msg_type: MsgType<6>, + pub version: String, } #[derive(Clone, Copy, Debug, Eq, PartialEq)] @@ -23,53 +95,15 @@ pub enum Coalition { Red, } -#[derive(Serialize, Deserialize, Debug, Clone)] +#[derive(Debug, PartialEq, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] pub struct Radio { - #[serde(default)] pub enc: bool, - #[serde(default)] pub enc_key: u8, - #[serde(default)] - pub enc_mode: EncryptionMode, - #[serde(default = "default_freq")] - pub freq_max: f64, - #[serde(default = "default_freq")] - pub freq_min: f64, - #[serde(default = "default_freq")] pub freq: f64, - #[serde(default)] pub modulation: Modulation, - #[serde(default)] - pub name: String, - #[serde(default = "default_freq")] pub sec_freq: f64, - #[serde(default = "default_volume")] - pub volume: f32, - #[serde(default)] - pub freq_mode: FreqMode, - #[serde(default)] - pub guard_freq_mode: FreqMode, - #[serde(default)] - pub vol_mode: VolumeMode, - #[serde(default)] - pub expansion: bool, - #[serde(default = "default_channel")] - pub channel: i32, - #[serde(default)] - pub simul: bool, -} - -fn default_freq() -> f64 { - 1.0 -} - -fn default_volume() -> f32 { - 1.0 -} - -fn default_channel() -> i32 { - -1 + pub retransmit: bool, } impl Default for Radio { @@ -77,58 +111,15 @@ impl Default for Radio { Radio { enc: false, enc_key: 0, - enc_mode: EncryptionMode::NoEncryption, - freq_max: 1.0, - freq_min: 1.0, freq: 1.0, modulation: Modulation::Disabled, - name: "".to_string(), sec_freq: 1.0, - volume: 1.0, - freq_mode: FreqMode::Cockpit, - guard_freq_mode: FreqMode::Cockpit, - vol_mode: VolumeMode::Cockpit, - expansion: false, - channel: -1, - simul: false, + retransmit: false, } } } -#[derive(Serialize_repr, Deserialize_repr, Debug, Clone, Copy)] -#[repr(u8)] -#[derive(Default)] -pub enum EncryptionMode { - /// no control - #[default] - NoEncryption = 0, - /// FC3 Gui Toggle + Gui Enc key setting - EncryptionJustOverlay = 1, - /// InCockpit toggle + Incockpit Enc setting - EncryptionFull = 2, - /// Incockpit toggle + Gui Enc Key setting - EncryptionCockpitToggleOverlayCode = 3, -} - -#[derive(Serialize_repr, Deserialize_repr, Debug, Clone, Copy)] -#[repr(u8)] -#[derive(Default)] -pub enum VolumeMode { - #[default] - Cockpit = 0, - Overlay = 1, -} - -#[derive(Serialize_repr, Deserialize_repr, Debug, Clone, Copy)] -#[repr(u8)] -#[derive(Default)] -pub enum FreqMode { - #[default] - Cockpit = 0, - Overlay = 1, -} - -#[derive(Serialize_repr, Deserialize_repr, Debug, Clone, Copy)] +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize_repr, Deserialize_repr)] #[repr(u8)] #[derive(Default)] pub enum Modulation { @@ -142,45 +133,37 @@ pub enum Modulation { Mids = 6, } -#[derive(Serialize, Deserialize, Debug)] +#[derive(Debug, PartialEq, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] pub struct RadioInfo { - #[serde(default)] - pub name: String, - #[serde(default)] - pub ptt: bool, pub radios: Vec, - #[serde(default)] - pub control: RadioSwitchControls, - #[serde(default)] - pub selected: i16, - #[serde(default)] pub unit: String, pub unit_id: u32, - #[serde(default)] - pub simultaneous_transmission: bool, + pub iff: Transponder, } -#[derive(Serialize_repr, Deserialize_repr, Debug, Clone, Copy)] +#[derive(Debug, PartialEq, Default, Clone, Copy, Serialize_repr, Deserialize_repr)] #[repr(u8)] -#[derive(Default)] pub enum RadioSwitchControls { #[default] Hotas = 0, InCockpit = 1, } -#[derive(Serialize, Deserialize, Debug)] +#[derive(Debug, PartialEq, Serialize, Deserialize)] #[serde(rename_all = "PascalCase")] pub struct Client { pub client_guid: String, - pub name: Option, - pub radio_info: Option, + pub name: String, + pub seat: u32, pub coalition: Coalition, - pub lat_lng_position: Option, + pub allow_record: bool, + #[serde(skip_serializing_if = "Option::is_none")] + pub radio_info: Option, + pub lat_lng_position: Position, } -#[derive(Serialize, Deserialize, Debug, Clone, Default, PartialEq)] +#[derive(Debug, PartialEq, Default, Clone, Serialize, Deserialize)] pub struct Position { pub lat: f64, #[serde(rename = "lng")] @@ -188,19 +171,7 @@ pub struct Position { pub alt: f64, } -#[derive(Serialize, Deserialize, Debug)] -#[serde(rename_all = "PascalCase")] -pub struct Message { - pub client: Option, - pub msg_type: MsgType, - pub server_settings: Option>, - // Clients - // ServerSettings - // ExternalAWACSModePassword - pub version: String, -} - -#[derive(Serialize, Deserialize, Debug, Clone)] +#[derive(Debug, PartialEq, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] pub struct Transponder { control: IffControlMode, @@ -211,7 +182,7 @@ pub struct Transponder { status: IffStatus, } -#[derive(Serialize_repr, Deserialize_repr, Debug, Clone, Copy)] +#[derive(Debug, Serialize_repr, Deserialize_repr, Clone, Copy, PartialEq, Eq)] #[repr(u8)] pub enum IffControlMode { Cockpit = 0, @@ -219,7 +190,7 @@ pub enum IffControlMode { Disabled = 2, } -#[derive(Serialize_repr, Deserialize_repr, Debug, Clone, Copy)] +#[derive(Debug, Serialize_repr, Deserialize_repr, Clone, Copy, PartialEq, Eq)] #[repr(u8)] pub enum IffStatus { Off = 0, @@ -227,76 +198,46 @@ pub enum IffStatus { Ident = 2, } -impl Default for Transponder { - fn default() -> Self { - Transponder { - control: IffControlMode::Disabled, - mode1: -1, - mode3: -1, - mode4: false, - mic: -1, - status: IffStatus::Off, - } - } -} +#[derive(Debug, PartialEq, Eq, Clone, Copy)] +pub struct MsgType; -impl ::serde::Serialize for MsgType { +#[derive(Debug, thiserror::Error)] +#[error("Invalid message type")] +struct MsgTypeError; + +impl Serialize for MsgType { fn serialize(&self, serializer: S) -> Result where - S: ::serde::Serializer, + S: serde::Serializer, { - // Serialize the enum as a u64. - serializer.serialize_u64(match *self { - MsgType::Update => 0, - MsgType::Ping => 1, - MsgType::Sync => 2, - MsgType::RadioUpdate => 3, - MsgType::ServerSettings => 4, - MsgType::ClientDisconnect => 5, - MsgType::VersionMismatch => 6, - }) + serializer.serialize_u8(V) } } -impl<'de> ::serde::Deserialize<'de> for MsgType { +impl<'de, const V: u8> Deserialize<'de> for MsgType { fn deserialize(deserializer: D) -> Result where - D: ::serde::Deserializer<'de>, + D: serde::Deserializer<'de>, { - struct Visitor; - - impl<'de> ::serde::de::Visitor<'de> for Visitor { - type Value = MsgType; - - fn expecting(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result { - formatter.write_str("positive integer") - } - - fn visit_u64(self, value: u64) -> Result - where - E: ::serde::de::Error, - { - // Rust does not come with a simple way of converting a - // number to an enum, so use a big `match`. - match value { - 0 => Ok(MsgType::Update), - 1 => Ok(MsgType::Ping), - 2 => Ok(MsgType::Sync), - 3 => Ok(MsgType::RadioUpdate), - 4 => Ok(MsgType::ServerSettings), - 5 => Ok(MsgType::ClientDisconnect), - 6 => Ok(MsgType::VersionMismatch), - _ => Err(E::custom(format!( - "unknown {} value: {}", - stringify!(MsgType), - value - ))), - } - } + let value = u8::deserialize(deserializer)?; + if value == V { + Ok(MsgType::) + } else { + Err(serde::de::Error::custom(MsgTypeError)) } + } +} - // Deserialize the enum from a u64. - deserializer.deserialize_u64(Visitor) +impl Default for Transponder { + fn default() -> Self { + Transponder { + control: IffControlMode::Disabled, + mode1: -1, + mode3: -1, + mode4: false, + mic: -1, + status: IffStatus::Off, + } } } @@ -358,3 +299,196 @@ pub fn create_sguid() -> String { assert_eq!(sguid.len(), 22); sguid } + +#[cfg(test)] +mod tests { + use pretty_assertions::assert_eq; + + use super::*; + + #[test] + fn test_serde_message_update() { + let expected = r#"{"MsgType":0,"Client":{"ClientGuid":"BCZSXySXT4WL9zlxNkJwkQ","Name":"DCS-gRPC","Seat":0,"Coalition":2,"AllowRecord":false,"LatLngPosition":{"lat":0.0,"lng":0.0,"alt":0.0}},"Version":"2.0.8.6"}"#; + let msg: Message = serde_json::from_str(expected).unwrap(); + assert_eq!( + msg, + Message::Update(UpdateMessage { + msg_type: MsgType, + client: Client { + client_guid: "BCZSXySXT4WL9zlxNkJwkQ".to_string(), + name: "DCS-gRPC".to_string(), + seat: 0, + coalition: Coalition::Blue, + allow_record: false, + radio_info: None, + lat_lng_position: Position::default() + }, + version: "2.0.8.6".to_string() + }) + ); + assert_eq!(serde_json::to_string(&msg).unwrap(), expected); + } + + #[test] + fn test_serde_message_sync() { + let expected = r#"{"MsgType":2,"Clients":[{"ClientGuid":"BCZSXySXT4WL9zlxNkJwkQ","Name":"DCS-gRPC","Seat":0,"Coalition":2,"AllowRecord":false,"LatLngPosition":{"lat":0.0,"lng":0.0,"alt":0.0}},{"ClientGuid":"7WKyf-Wc5E2xofl7IOc0wg","Name":"PILOT_220624","Seat":0,"Coalition":0,"AllowRecord":false,"RadioInfo":{"radios":[{"enc":false,"encKey":0,"freq":305000000.0,"modulation":0,"secFreq":0.0,"retransmit":false}],"unit":"FA-18C_hornet","unitId":16777472,"iff":{"control":0,"mode1":-1,"mode3":-1,"mode4":true,"mic":-1,"status":1}},"LatLngPosition":{"lat":0.0,"lng":0.0,"alt":0.0}}],"ServerSettings":{"COALITION_AUDIO_SECURITY":"False"},"Version":"2.0.8.6"}"#; + let msg: Message = serde_json::from_str(expected).unwrap(); + assert_eq!( + msg, + Message::Sync(SyncMessage { + msg_type: MsgType, + clients: vec![ + Client { + client_guid: "BCZSXySXT4WL9zlxNkJwkQ".to_string(), + name: "DCS-gRPC".to_string(), + seat: 0, + coalition: Coalition::Blue, + allow_record: false, + radio_info: None, + lat_lng_position: Position { + lat: 0.0, + lon: 0.0, + alt: 0.0 + } + }, + Client { + client_guid: "7WKyf-Wc5E2xofl7IOc0wg".to_string(), + name: "PILOT_220624".to_string(), + seat: 0, + coalition: Coalition::Spectator, + allow_record: false, + radio_info: Some(RadioInfo { + radios: vec![Radio { + enc: false, + enc_key: 0, + freq: 305000000.0, + modulation: Modulation::Am, + sec_freq: 0.0, + retransmit: false + }], + unit: "FA-18C_hornet".to_string(), + unit_id: 16777472, + iff: Transponder { + control: IffControlMode::Cockpit, + mode1: -1, + mode3: -1, + mode4: true, + mic: -1, + status: IffStatus::Normal + }, + }), + lat_lng_position: Position::default(), + } + ], + server_settings: HashMap::from([( + "COALITION_AUDIO_SECURITY".to_string(), + "False".to_string() + )]), + version: "2.0.8.6".to_string() + }) + ); + assert_eq!(serde_json::to_string(&msg).unwrap(), expected); + } + + #[test] + fn test_serde_message_radio_update() { + let expected = r#"{"MsgType":3,"Client":{"ClientGuid":"BCZSXySXT4WL9zlxNkJwkQ","Name":"DCS-gRPC","Seat":0,"Coalition":2,"AllowRecord":false,"RadioInfo":{"radios":[{"enc":false,"encKey":0,"freq":1.0,"modulation":3,"secFreq":1.0,"retransmit":false}],"unit":"DCS-gRPC","unitId":0,"iff":{"control":2,"mode1":-1,"mode3":-1,"mode4":false,"mic":-1,"status":0}},"LatLngPosition":{"lat":0.0,"lng":0.0,"alt":0.0}},"Version":"2.0.8.6"}"#; + let msg: Message = serde_json::from_str(expected).unwrap(); + assert_eq!( + msg, + Message::RadioUpdate(RadioUpdateMessage { + msg_type: MsgType, + client: Client { + client_guid: "BCZSXySXT4WL9zlxNkJwkQ".to_string(), + name: "DCS-gRPC".to_string(), + seat: 0, + coalition: Coalition::Blue, + allow_record: false, + radio_info: Some(RadioInfo { + radios: vec![Radio { + enc: false, + enc_key: 0, + freq: 1.0, + modulation: Modulation::Disabled, + sec_freq: 1.0, + retransmit: false + }], + unit: "DCS-gRPC".to_string(), + unit_id: 0, + iff: Transponder { + control: IffControlMode::Disabled, + mode1: -1, + mode3: -1, + mode4: false, + mic: -1, + status: IffStatus::Off + }, + }), + lat_lng_position: Position::default(), + }, + version: "2.0.8.6".to_string() + }) + ); + assert_eq!(serde_json::to_string(&msg).unwrap(), expected); + } + + #[test] + fn test_serde_message_server_settings() { + let expected = r#"{"MsgType":4,"ServerSettings":{"COALITION_AUDIO_SECURITY":"False"},"Version":"2.0.8.6"}"#; + let msg: Message = serde_json::from_str(expected).unwrap(); + assert_eq!( + msg, + Message::ServerSettings(ServerSettingsMessage { + msg_type: MsgType, + server_settings: HashMap::from([( + "COALITION_AUDIO_SECURITY".to_string(), + "False".to_string() + )]), + version: "2.0.8.6".to_string() + }) + ); + assert_eq!(serde_json::to_string(&msg).unwrap(), expected); + } + + #[test] + fn test_serde_message_client_disconnect() { + let expected = r#"{"MsgType":5,"Client":{"ClientGuid":"OYOrf4yJdUex5tNuBYnaMQ","Name":"PILOT_220624","Seat":0,"Coalition":0,"AllowRecord":false,"RadioInfo":{"radios":[{"enc":false,"encKey":0,"freq":100.0,"modulation":2,"secFreq":0.0,"retransmit":false}],"unit":"CA","unitId":100000001,"iff":{"control":0,"mode1":0,"mode3":0,"mode4":false,"mic":-1,"status":0}},"LatLngPosition":{"lat":0.0,"lng":0.0,"alt":0.0}},"Version":"2.0.8.6"}"#; + let msg: Message = serde_json::from_str(expected).unwrap(); + assert_eq!( + msg, + Message::ClientDisconnect(ClientDisconnectMessage { + msg_type: MsgType, + client: Client { + client_guid: "OYOrf4yJdUex5tNuBYnaMQ".to_string(), + name: "PILOT_220624".to_string(), + seat: 0, + coalition: Coalition::Spectator, + allow_record: false, + radio_info: Some(RadioInfo { + radios: vec![Radio { + enc: false, + enc_key: 0, + freq: 100.0, + modulation: Modulation::Intercom, + sec_freq: 0.0, + retransmit: false + }], + unit: "CA".to_string(), + unit_id: 100000001, + iff: Transponder { + control: IffControlMode::Cockpit, + mode1: 0, + mode3: 0, + mode4: false, + mic: -1, + status: IffStatus::Off + }, + }), + lat_lng_position: Position::default(), + }, + version: "2.0.8.6".to_string() + }) + ); + assert_eq!(serde_json::to_string(&msg).unwrap(), expected); + } +} diff --git a/srs/src/messages_codec.rs b/srs/src/messages_codec.rs index 30c848b8..61d40f5c 100644 --- a/srs/src/messages_codec.rs +++ b/srs/src/messages_codec.rs @@ -3,7 +3,7 @@ use std::{error, fmt, io}; use bytes::BytesMut; use tokio_util::codec::{Decoder, Encoder, LinesCodec, LinesCodecError}; -use crate::message::Message; +use crate::message::{Message, MessageRequest}; pub struct MessagesCodec { lines_codec: LinesCodec, @@ -44,10 +44,10 @@ impl Decoder for MessagesCodec { } } -impl Encoder for MessagesCodec { +impl Encoder for MessagesCodec { type Error = MessagesCodecError; - fn encode(&mut self, msg: Message, buf: &mut BytesMut) -> Result<(), Self::Error> { + fn encode(&mut self, msg: MessageRequest, buf: &mut BytesMut) -> Result<(), Self::Error> { let json = serde_json::to_string(&msg).map_err(MessagesCodecError::JsonEncode)?; self.lines_codec.encode(json, buf)?; Ok(()) diff --git a/srs/src/stream.rs b/srs/src/stream.rs new file mode 100644 index 00000000..e62533d7 --- /dev/null +++ b/srs/src/stream.rs @@ -0,0 +1,272 @@ +use std::future::Future; +use std::io; +use std::net::SocketAddr; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::time::Duration; + +use futures_util::{FutureExt, SinkExt, StreamExt, TryFutureExt}; +use tokio::io::AsyncWriteExt; +use tokio::net::{TcpStream, UdpSocket}; +use tokio::sync::mpsc; +use tokio::time; +use tokio_stream::pending; +use tokio_stream::wrappers::ReceiverStream; +use tokio_util::codec::{FramedRead, FramedWrite}; +use tokio_util::udp::UdpFramed; + +use crate::message::{ + Client, Message, MessageRequest, MsgType, RadioInfo, RadioUpdateMessage, ServerSettingsMessage, + SyncMessageRequest, UpdateMessage, VersionMismatchMessage, +}; +use crate::messages_codec::{self, MessagesCodec, MessagesCodecError}; +use crate::voice_codec::{self, Encryption, Frequency, Modulation, VoiceCodec, VoicePacket}; + +const SRS_VERSION: &str = "1.9.0.0"; + +pub type Sender = mpsc::Sender>; +pub type Receiver = mpsc::Receiver>; + +#[allow(clippy::large_enum_variant)] +pub enum Packet { + Control(Message), + Voice(Vec), +} + +pub async fn stream( + client: crate::Client, + addr: SocketAddr, + shutdown_signal: impl Future + Unpin + Send + 'static, +) -> Result<(Sender, Receiver), StreamError> { + let tcp = TcpStream::connect(addr).await?; + let (tcp_stream, tcp_sink) = tcp.into_split(); + let mut messages_sink = FramedWrite::new(tcp_sink, MessagesCodec::new()); + let mut messages_stream = FramedRead::new(tcp_stream, MessagesCodec::new()); + + let udp = UdpSocket::bind(SocketAddr::from(([0, 0, 0, 0], 0))).await?; + udp.connect(addr).await?; + let (mut voice_sink, mut voice_stream) = UdpFramed::new(udp, VoiceCodec::new()).split(); + + let (tx_inner, rx) = mpsc::channel::>(128); + let (tx, rx_inner) = mpsc::channel::>(128); + let tx_err = tx_inner.clone(); + + tokio::task::spawn( + async move { + // send sync message to receive server settings + messages_sink + .send(create_sync_message(&client).await) + .await?; + + // send initial Update message + messages_sink + .send(create_radio_update_message(&client).await) + .await?; + + let los_enabled = AtomicBool::new(false); + let distance_enabled = AtomicBool::new(false); + let mut old_pos = client.position().await; + let mut position_update_interval = time::interval(Duration::from_secs(60)); + let mut voice_ping_interval = time::interval(Duration::from_secs(5)); + let mut shutdown_signal = shutdown_signal.fuse(); + let mut packet_id = 1; + + let mut sguid = [0; 22]; + sguid.clone_from_slice(client.sguid().as_bytes()); + + enum Select { + Control(Option>), + Voice(Option>), + Send(Option>), + PositionUpdate, + VoicePing, + Shutdown, + } + + // Never resolve once sender gets dropped. + let mut rx = ReceiverStream::new(rx_inner).chain(pending()); + + loop { + let select = tokio::select! { + msg = messages_stream.next() => Select::Control(msg), + packet = voice_stream.next() => Select::Voice(packet), + data = rx.next() => Select::Send(data), + _ = position_update_interval.tick() => Select::PositionUpdate, + _ = voice_ping_interval.tick() => Select::VoicePing, + _ = &mut shutdown_signal => Select::Shutdown, + }; + + match select { + Select::Control(Some(msg)) => { + let msg = msg?; + + // handle message + if let Message::VersionMismatch(VersionMismatchMessage { + version, .. + }) = &msg + { + return Err(StreamError::VersionMismatch { + expected: SRS_VERSION.to_string(), + encountered: version.to_string(), + }); + } + + // update server settings + if let Message::ServerSettings(ServerSettingsMessage { + ref server_settings, + .. + }) = &msg + { + los_enabled.store( + server_settings.get("LOS_ENABLED").map(|s| s.as_str()) + == Some("True"), + Ordering::Relaxed, + ); + distance_enabled.store( + server_settings.get("DISTANCE_ENABLED").map(|s| s.as_str()) + == Some("true"), + Ordering::Relaxed, + ); + } + + tx_inner.try_send(Ok(Packet::Control(msg))).ok(); + } + Select::Voice(Some(packet)) => { + // Not completely implemented, so might never be called for now + let (packet, _) = packet?; + tx_inner.try_send(Ok(Packet::Voice(packet.audio_part))).ok(); + } + Select::Send(Some(data)) => { + let packet = VoicePacket { + audio_part: data, + frequencies: vec![Frequency { + freq: client.freq() as f64, + modulation: if client.freq() <= 87_995_000 { + Modulation::Fm + } else { + Modulation::Am + }, + encryption: Encryption::None, + }], + unit_id: client.unit().map(|u| u.id).unwrap_or(0), + packet_id, + hop_count: 0, + transmission_sguid: sguid, + client_sguid: sguid, + }; + packet_id = packet_id.wrapping_add(1); + voice_sink.send((packet.into(), addr)).await?; + } + Select::Control(None) | Select::Voice(None) | Select::Send(None) => { + return Err(StreamError::Closed); + } + Select::PositionUpdate => { + // keep the position of the station updated + let new_pos = client.position().await; + let los_enabled = los_enabled.load(Ordering::Relaxed); + let distance_enabled = distance_enabled.load(Ordering::Relaxed); + if (los_enabled || distance_enabled) && new_pos != old_pos { + log::debug!( + "Position of {} changed, sending a new update message", + client.name() + ); + messages_sink + .send(create_update_message(&client).await) + .await?; + old_pos = new_pos; + } + } + Select::VoicePing => { + voice_sink + .send((voice_codec::Packet::Ping(sguid), addr)) + .await?; + } + Select::Shutdown => { + messages_sink.into_inner().shutdown().await?; + break; + } + } + } + + Ok(()) + } + .map_err(move |err| tx_err.try_send(Err(err)).ok()), + ); + + Ok((tx, rx)) +} + +#[derive(Debug, thiserror::Error)] +pub enum StreamError { + #[error(transparent)] + Net(#[from] io::Error), + #[error(transparent)] + MessagesCodec(#[from] messages_codec::MessagesCodecError), + #[error("Unsupported SRS server version {encountered} (expected {expected})")] + VersionMismatch { + expected: String, + encountered: String, + }, + #[error("Stream was closed unexpectedly")] + Closed, +} + +async fn create_radio_update_message(client: &crate::Client) -> MessageRequest { + let pos = client.position().await; + MessageRequest::RadioUpdate(RadioUpdateMessage { + msg_type: MsgType, + client: Client { + client_guid: client.sguid().to_string(), + name: client.name().to_string(), + seat: 0, + coalition: client.coalition, + allow_record: true, + radio_info: Some(RadioInfo { + // add a radio to receive voice + radios: Vec::new(), + unit: client + .unit() + .map(|u| u.name.clone()) + .unwrap_or_else(|| client.name().to_string()), + unit_id: client.unit().as_ref().map(|u| u.id).unwrap_or(0), + iff: Default::default(), + }), + lat_lng_position: pos, + }, + version: SRS_VERSION.to_string(), + }) +} + +async fn create_update_message(client: &crate::Client) -> MessageRequest { + let pos = client.position().await; + MessageRequest::Update(UpdateMessage { + msg_type: MsgType, + client: Client { + client_guid: client.sguid().to_string(), + name: client.name().to_string(), + seat: 0, + coalition: client.coalition, + allow_record: true, + radio_info: None, + lat_lng_position: pos, + }, + version: SRS_VERSION.to_string(), + }) +} + +async fn create_sync_message(client: &crate::Client) -> MessageRequest { + let pos = client.position().await; + + MessageRequest::Sync(SyncMessageRequest { + msg_type: MsgType, + client: Client { + client_guid: client.sguid().to_string(), + name: client.name().to_string(), + seat: 0, + coalition: client.coalition, + allow_record: true, + radio_info: None, + lat_lng_position: pos, + }, + version: SRS_VERSION.to_string(), + }) +} diff --git a/srs/src/voice_codec.rs b/srs/src/voice_codec.rs index 1e26209e..2872952d 100644 --- a/srs/src/voice_codec.rs +++ b/srs/src/voice_codec.rs @@ -65,19 +65,13 @@ pub struct VoicePacket { } impl Decoder for VoiceCodec { - // UdpFramed, what VoiceCodec is used with, has a strange behavior in Tokio currently. If the - // codec would return `None`, which is actually an indication for that the voice codec needs - // more data to produce a valid item, the UdpFramed would yield the `None` as well. Though, - // a `None` from a stream means the stream is closed. This is planned to be fixed in tokio - // 0.2.0. Until then, we are using an option item here instead, so the stream would return - // `Some(None)` instead. - type Item = Option; + type Item = VoicePacket; type Error = io::Error; fn decode(&mut self, buf: &mut BytesMut) -> Result, Self::Error> { // discard ping messages if self.is_head && buf.len() == 22 { - return Ok(Some(None)); + return Ok(None); } if let Some(bytes) = self.inner.decode(buf)? { @@ -134,7 +128,7 @@ impl Decoder for VoiceCodec { assert_eq!(rd.position(), len); - Ok(Some(Some(VoicePacket { + Ok(Some(VoicePacket { audio_part, frequencies, unit_id, @@ -142,12 +136,16 @@ impl Decoder for VoiceCodec { hop_count, transmission_sguid, client_sguid, - }))) + })) } else { self.is_head = false; - Ok(Some(None)) + Ok(None) } } + + fn decode_eof(&mut self, buf: &mut BytesMut) -> Result, Self::Error> { + self.decode(buf) + } } impl Encoder for VoiceCodec { diff --git a/srs/src/voice_stream.rs b/srs/src/voice_stream.rs deleted file mode 100644 index c773a27e..00000000 --- a/srs/src/voice_stream.rs +++ /dev/null @@ -1,357 +0,0 @@ -use std::future::Future; -use std::io; -use std::net::SocketAddr; -use std::pin::Pin; -use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::Arc; -use std::task::{Context, Poll}; -use std::time::Duration; - -use futures_util::future::FutureExt; -use futures_util::sink::{Sink, SinkExt}; -use futures_util::stream::{SplitStream, Stream, StreamExt}; -use tokio::io::AsyncWriteExt; -use tokio::net::{TcpStream, UdpSocket}; -use tokio::sync::mpsc; -use tokio::time; -use tokio_stream::wrappers::ReceiverStream; -use tokio_util::codec::{FramedRead, FramedWrite}; -use tokio_util::sync::{PollSendError, PollSender}; -use tokio_util::udp::UdpFramed; - -use crate::client::Client; -use crate::message::{Client as MsgClient, Message, MsgType, Radio, RadioInfo}; -use crate::messages_codec::{self, MessagesCodec}; -use crate::voice_codec::*; - -const SRS_VERSION: &str = "1.9.0.0"; - -pub struct VoiceStream { - voice_sink: Pin>>, - voice_stream: SplitStream>, - heartbeat: Pin>>>, - client: Client, - packet_id: u64, -} - -#[derive(Clone)] -struct ServerSettings(Arc); - -struct ServerSettingsInner { - los_enabled: AtomicBool, - distance_enabled: AtomicBool, -} - -#[derive(Debug, thiserror::Error)] -pub enum VoiceStreamError { - #[error(transparent)] - Io(#[from] io::Error), - #[error(transparent)] - MessagesCodec(#[from] messages_codec::MessagesCodecError), - #[error(transparent)] - ChannelSend(#[from] tokio::sync::mpsc::error::SendError), - #[error("Version mismatch between DATIS ({expected}) and the SRS server ({encountered})")] - VersionMismatch { - expected: String, - encountered: String, - }, - #[error("Voice stream was closed unexpectedly")] - Closed, - #[error("TCP connection was closed unexpectedly")] - ConnectionClosed, -} - -impl VoiceStream { - pub async fn new( - client: Client, - addr: SocketAddr, - shutdown_signal: impl Future + Unpin + Send + 'static, - ) -> Result { - let tcp = TcpStream::connect(addr).await?; - let (stream, sink) = tcp.into_split(); - let mut messages_sink = FramedWrite::new(sink, MessagesCodec::new()); - let messages_stream = FramedRead::new(stream, MessagesCodec::new()); - - let server_settings = ServerSettings(Arc::new(ServerSettingsInner { - los_enabled: AtomicBool::new(false), - distance_enabled: AtomicBool::new(false), - })); - - let local_addr: SocketAddr = "0.0.0.0:0".parse().unwrap(); - let udp = UdpSocket::bind(local_addr).await?; - udp.connect(addr).await?; - let (mut voice_sink, voice_stream) = UdpFramed::new(udp, VoiceCodec::new()).split(); - let (tx, rx) = mpsc::channel(32); - let mut rx = ReceiverStream::new(rx); - let tx2 = tx.clone(); - - let client2 = client.clone(); - let heartbeat = async move { - let recv_voice = false; // future option to enable listening to SRS - let mut messages_stream = messages_stream.fuse(); - - // send sync message to receive server settings - messages_sink - .send(create_sync_message(&client).await) - .await?; - - // send initial Update message - messages_sink - .send(create_radio_update_message(&client).await) - .await?; - - let mut old_pos = client.position().await; - let mut position_update_interval = time::interval(Duration::from_secs(60)); - let mut voice_ping_interval = time::interval(Duration::from_secs(5)); - let mut shutdown_signal = shutdown_signal.fuse(); - - let mut sguid = [0; 22]; - sguid.clone_from_slice(client.sguid().as_bytes()); - - loop { - tokio::select! { - // receive control messages - msg = messages_stream.next() => { - if let Some(msg) = msg { - let msg = msg?; - - // update server settings - if let Some(settings) = msg.server_settings { - server_settings.0.los_enabled.store( - settings.get("LOS_ENABLED").map(|s| s.as_str()) == Some("True"), - Ordering::Relaxed, - ); - server_settings.0.distance_enabled.store( - settings.get("DISTANCE_ENABLED").map(|s| s.as_str()) == Some("true"), - Ordering::Relaxed, - ); - } - - // handle message - if msg.msg_type == MsgType::VersionMismatch { - return Err(VoiceStreamError::VersionMismatch { - expected: SRS_VERSION.to_string(), - encountered: msg.version, - }) - } - } else { - log::debug!("Messages stream was closed, closing voice stream"); - break; - } - } - - // Sends updates about the client to the server. - _ = position_update_interval.tick().fuse() => { - // keep the position of the station updated - let new_pos = client.position().await; - let los_enabled = server_settings.0.los_enabled.load(Ordering::Relaxed); - let distance_enabled = server_settings.0.distance_enabled.load(Ordering::Relaxed); - if (los_enabled || distance_enabled) && new_pos != old_pos { - log::debug!( - "Position of {} changed, sending a new update message", - client.name() - ); - messages_sink.send(create_update_message(&client).await).await?; - old_pos = new_pos; - } - } - - _ = voice_ping_interval.tick().fuse() => { - if recv_voice { - tx.send(Packet::Ping(sguid)).await?; - } - } - - packet = rx.next() => { - if let Some(p) = packet { - voice_sink.send((p, addr)).await?; - } - } - - _ = &mut shutdown_signal => { - messages_sink.into_inner().shutdown().await?; - break; - } - } - } - - Ok(()) - }; - - Ok(VoiceStream { - voice_stream, - voice_sink: Box::pin(PollSender::new(tx2)), - heartbeat: Box::pin(heartbeat), - client: client2, - packet_id: 1, - }) - } -} - -impl Stream for VoiceStream { - type Item = Result; - - fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - let s = self.get_mut(); - - match s.voice_stream.poll_next_unpin(cx) { - Poll::Pending => {} - Poll::Ready(None) => return Poll::Ready(Some(Err(VoiceStreamError::Closed))), - Poll::Ready(Some(Ok((None, _)))) => { - // not enough data for the codec to create a new item - } - Poll::Ready(Some(Ok((Some(p), _)))) => { - return Poll::Ready(Some(Ok(p))); - } - Poll::Ready(Some(Err(err))) => return Poll::Ready(Some(Err(err.into()))), - } - - match s.heartbeat.poll_unpin(cx) { - Poll::Pending => {} - Poll::Ready(Err(err)) => return Poll::Ready(Some(Err(err))), - Poll::Ready(Ok(_)) => { - return Poll::Ready(Some(Err(VoiceStreamError::ConnectionClosed))); - } - } - - Poll::Pending - } -} - -impl Sink> for VoiceStream { - type Error = PollSendError; - - fn poll_ready(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - let s = self.get_mut(); - s.voice_sink.as_mut().poll_ready(cx) - } - - fn start_send(self: Pin<&mut Self>, item: Vec) -> Result<(), Self::Error> { - let mut sguid = [0; 22]; - sguid.clone_from_slice(self.client.sguid().as_bytes()); - - let packet = VoicePacket { - audio_part: item, - frequencies: vec![Frequency { - freq: self.client.freq() as f64, - modulation: if self.client.freq() <= 87_995_000 { - Modulation::Fm - } else { - Modulation::Am - }, - encryption: Encryption::None, - }], - unit_id: self.client.unit().map(|u| u.id).unwrap_or(0), - packet_id: self.packet_id, - hop_count: 0, - transmission_sguid: sguid, - client_sguid: sguid, - }; - - let s = self.get_mut(); - s.packet_id = s.packet_id.wrapping_add(1); - - s.voice_sink.as_mut().start_send(packet.into()) - } - - fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - let s = self.get_mut(); - s.voice_sink.as_mut().poll_flush(cx) - } - - fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - let s = self.get_mut(); - s.voice_sink.as_mut().poll_close(cx) - } -} - -impl Sink for VoiceStream { - type Error = PollSendError; - - fn poll_ready(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - let s = self.get_mut(); - s.voice_sink.as_mut().poll_ready(cx) - } - - fn start_send(self: Pin<&mut Self>, mut packet: VoicePacket) -> Result<(), Self::Error> { - let mut sguid = [0; 22]; - sguid.clone_from_slice(self.client.sguid().as_bytes()); - packet.client_sguid = sguid; - - let s = self.get_mut(); - s.packet_id = s.packet_id.wrapping_add(1); - - s.voice_sink.as_mut().start_send(packet.into()) - } - - fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - let s = self.get_mut(); - s.voice_sink.as_mut().poll_flush(cx) - } - - fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - let s = self.get_mut(); - s.voice_sink.as_mut().poll_close(cx) - } -} - -async fn create_radio_update_message(client: &Client) -> Message { - let pos = client.position().await; - Message { - client: Some(MsgClient { - client_guid: client.sguid().to_string(), - name: Some(client.name().to_string()), - coalition: client.coalition, - radio_info: Some(RadioInfo { - name: "DATIS Radios".to_string(), - ptt: false, - // TODO: enable one of the radios to receive voice - radios: std::iter::repeat_with(Radio::default).take(10).collect(), - control: crate::message::RadioSwitchControls::Hotas, - selected: 0, - unit: client - .unit() - .map(|u| u.name.clone()) - .unwrap_or_else(|| client.name().to_string()), - unit_id: client.unit().as_ref().map(|u| u.id).unwrap_or(0), - simultaneous_transmission: true, - }), - lat_lng_position: Some(pos), - }), - msg_type: MsgType::RadioUpdate, - server_settings: None, - version: SRS_VERSION.to_string(), - } -} - -async fn create_update_message(client: &Client) -> Message { - let pos = client.position().await; - Message { - client: Some(MsgClient { - client_guid: client.sguid().to_string(), - name: Some(client.name().to_string()), - coalition: client.coalition, - radio_info: None, - lat_lng_position: Some(pos), - }), - msg_type: MsgType::Update, - server_settings: None, - version: SRS_VERSION.to_string(), - } -} - -async fn create_sync_message(client: &Client) -> Message { - let pos = client.position().await; - Message { - client: Some(MsgClient { - client_guid: client.sguid().to_string(), - name: Some(client.name().to_string()), - coalition: client.coalition, - radio_info: None, - lat_lng_position: Some(pos), - }), - msg_type: MsgType::Sync, - server_settings: None, - version: SRS_VERSION.to_string(), - } -} diff --git a/stubs/src/lib.rs b/stubs/src/lib.rs index 3e9fa71e..bd60b350 100644 --- a/stubs/src/lib.rs +++ b/stubs/src/lib.rs @@ -12,9 +12,9 @@ pub mod group; pub mod hook; pub mod mission; pub mod net; +pub mod srs; pub mod timer; pub mod trigger; -pub mod tts; pub mod unit; mod utils; pub mod world; diff --git a/stubs/src/srs.rs b/stubs/src/srs.rs new file mode 100644 index 00000000..0c0e0b57 --- /dev/null +++ b/stubs/src/srs.rs @@ -0,0 +1,3 @@ +pub mod v0 { + tonic::include_proto!("dcs.srs.v0"); +} diff --git a/stubs/src/tts.rs b/stubs/src/tts.rs deleted file mode 100644 index 69464eaf..00000000 --- a/stubs/src/tts.rs +++ /dev/null @@ -1,3 +0,0 @@ -pub mod v0 { - tonic::include_proto!("dcs.tts.v0"); -}