diff --git a/src/NetCoreStack.WebSockets/Internal/ConnectionManager.cs b/src/NetCoreStack.WebSockets/Internal/ConnectionManager.cs index 94f75cd..06ce845 100644 --- a/src/NetCoreStack.WebSockets/Internal/ConnectionManager.cs +++ b/src/NetCoreStack.WebSockets/Internal/ConnectionManager.cs @@ -22,6 +22,20 @@ public ConnectionManager(ILoggerFactory loggerFactory) Connections = new ConcurrentDictionary(); } + private void PrepareBytes(ref byte[] bytes, JsonObject properties) + { + if (bytes == null) + { + throw new ArgumentNullException(nameof(bytes)); + } + + var props = JsonConvert.SerializeObject(properties); + var propsBytes = Encoding.UTF8.GetBytes($"{SocketsConstants.Splitter}{props}"); + + var bytesCount = bytes.Length; + bytes = bytes.Concat(propsBytes).ToArray(); + } + private async Task SendAsync(WebSocketTransport transport, WebSocketMessageDescriptor descriptor) { if (descriptor == null) @@ -40,6 +54,21 @@ await transport.WebSocket.SendAsync(descriptor.Segments, CancellationToken.None); } + private async Task SendBinaryAsync(WebSocketTransport transport, byte[] chunkedBytes, bool endOfMessage) + { + if (transport == null) + { + throw new ArgumentNullException(nameof(transport)); + } + + var segments = new ArraySegment(chunkedBytes); + + await transport.WebSocket.SendAsync(segments, + WebSocketMessageType.Binary, + endOfMessage, + CancellationToken.None); + } + public async Task BroadcastAsync(WebSocketMessageContext context) { if (context == null) @@ -66,6 +95,37 @@ public async Task BroadcastAsync(WebSocketMessageContext context) } } + public async Task BroadcastBinaryAsync(byte[] bytes, JsonObject properties) + { + PrepareBytes(ref bytes, properties); + + var buffer = new byte[SocketsConstants.ChunkSize]; + using (var ms = new MemoryStream(bytes)) + { + using (BinaryReader br = new BinaryReader(ms)) + { + byte[] chunkedBytes = null; + do + { + chunkedBytes = br.ReadBytes(SocketsConstants.ChunkSize); + var endOfMessage = false; + + if (chunkedBytes.Length < SocketsConstants.ChunkSize) + endOfMessage = true; + + foreach (var connection in Connections) + { + await SendBinaryAsync(transport: connection.Value, chunkedBytes: chunkedBytes, endOfMessage: endOfMessage); + } + + if (endOfMessage) + break; + + } while (chunkedBytes.Length <= SocketsConstants.ChunkSize); + } + } + } + public async Task SendAsync(string connectionId, WebSocketMessageContext context) { WebSocketTransport transport = null; @@ -93,11 +153,7 @@ public async Task SendBinaryAsync(string connectionId, byte[] bytes, JsonObject throw new ArgumentOutOfRangeException(nameof(transport)); } - var props = JsonConvert.SerializeObject(properties); - var propsBytes = Encoding.UTF8.GetBytes($"{SocketsConstants.Splitter}{props}"); - - var bytesCount = bytes.Length; - bytes = bytes.Concat(propsBytes).ToArray(); + PrepareBytes(ref bytes, properties); var buffer = new byte[SocketsConstants.ChunkSize]; using (var ms = new MemoryStream(bytes)) diff --git a/src/NetCoreStack.WebSockets/Internal/IConnectionManager.cs b/src/NetCoreStack.WebSockets/Internal/IConnectionManager.cs index b8639f2..e79eecd 100644 --- a/src/NetCoreStack.WebSockets/Internal/IConnectionManager.cs +++ b/src/NetCoreStack.WebSockets/Internal/IConnectionManager.cs @@ -7,11 +7,14 @@ public interface IConnectionManager { Task BroadcastAsync(WebSocketMessageContext context); + Task BroadcastBinaryAsync(byte[] bytes, JsonObject properties); + Task SendAsync(string connectionId, WebSocketMessageContext context); Task SendBinaryAsync(string connectionId, byte[] bytes, JsonObject properties); Task SendAsync(string connectionId, WebSocketMessageContext context, WebSocket webSocket); + void CloseConnection(string connectionId); } } diff --git a/test/ServerTestApp/Controllers/DiscoveryController.cs b/test/ServerTestApp/Controllers/DiscoveryController.cs index d5b434a..2349d8e 100644 --- a/test/ServerTestApp/Controllers/DiscoveryController.cs +++ b/test/ServerTestApp/Controllers/DiscoveryController.cs @@ -38,26 +38,34 @@ public IActionResult Get() [HttpPost(nameof(SendAsync))] public async Task SendAsync([FromBody]SimpleModel model) { - var echo = $"Echo from server '{model.Message}' - {DateTime.Now}"; + var echo = $"Echo from server '{model.Key}' - {DateTime.Now}"; var obj = new { message = echo }; var webSocketContext = new WebSocketMessageContext { Command = WebSocketCommands.DataSend, Value = obj }; await _connectionManager.BroadcastAsync(webSocketContext); return Ok(); } + [HttpPost(nameof(BrodcastBinaryAsync))] + public async Task BrodcastBinaryAsync([FromBody]SimpleModel model) + { + var bytes = _distrubutedCache.Get(model.Key); + await _connectionManager.BroadcastBinaryAsync(bytes, new SocketObject { Key = model.Key }); + return Ok(); + } + [HttpPost(nameof(SendBinaryAsync))] public async Task SendBinaryAsync([FromBody]SimpleModel model) { - var bytes = _distrubutedCache.Get(model.Message); - await _connectionManager.SendBinaryAsync(model.ConnectionId, bytes, new SocketObject { Key = model.Message }); + var bytes = _distrubutedCache.Get(model.Key); + await _connectionManager.SendBinaryAsync(model.ConnectionId, bytes, new SocketObject { Key = model.Key }); return Ok(); } [HttpPost(nameof(SendBinaryFromMemoryAsync))] public async Task SendBinaryFromMemoryAsync([FromBody]SimpleModel model) { - var bytes = (byte[])_memoryCache.Get(model.Message); - await _connectionManager.SendBinaryAsync(model.ConnectionId, bytes, new SocketObject { Key = model.Message }); + var bytes = (byte[])_memoryCache.Get(model.Key); + await _connectionManager.SendBinaryAsync(model.ConnectionId, bytes, new SocketObject { Key = model.Key }); return Ok(); } } diff --git a/test/ServerTestApp/Models/SimpleModel.cs b/test/ServerTestApp/Models/SimpleModel.cs index 2b77494..adc1588 100644 --- a/test/ServerTestApp/Models/SimpleModel.cs +++ b/test/ServerTestApp/Models/SimpleModel.cs @@ -3,6 +3,6 @@ public class SimpleModel { public string ConnectionId { get; set; } - public string Message { get; set; } + public string Key { get; set; } } } diff --git a/test/ServerTestApp/Startup.cs b/test/ServerTestApp/Startup.cs index e3308b9..0077636 100644 --- a/test/ServerTestApp/Startup.cs +++ b/test/ServerTestApp/Startup.cs @@ -44,7 +44,7 @@ public void ConfigureServices(IServiceCollection services) services.AddDistributedRedisCache(options => { options.Configuration = "localhost"; - options.InstanceName = "SocketsInstance"; + options.InstanceName = "RedisInstance"; }); services.AddTransient();