Skip to content

Commit

Permalink
BroadCast Binary Message to All Peers
Browse files Browse the repository at this point in the history
  • Loading branch information
gedem authored and gedem committed Dec 8, 2016
1 parent 4923761 commit 663b3bb
Show file tree
Hide file tree
Showing 5 changed files with 79 additions and 12 deletions.
66 changes: 61 additions & 5 deletions src/NetCoreStack.WebSockets/Internal/ConnectionManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,20 @@ public ConnectionManager(ILoggerFactory loggerFactory)
Connections = new ConcurrentDictionary<string, WebSocketTransport>();
}

private void PrepareBytes(ref byte[] bytes, JsonObject properties)
{
if (bytes == null)
{
throw new ArgumentNullException(nameof(bytes));
}

var props = JsonConvert.SerializeObject(properties);
var propsBytes = Encoding.UTF8.GetBytes($"{SocketsConstants.Splitter}{props}");

var bytesCount = bytes.Length;
bytes = bytes.Concat(propsBytes).ToArray();
}

private async Task SendAsync(WebSocketTransport transport, WebSocketMessageDescriptor descriptor)
{
if (descriptor == null)
Expand All @@ -40,6 +54,21 @@ await transport.WebSocket.SendAsync(descriptor.Segments,
CancellationToken.None);
}

private async Task SendBinaryAsync(WebSocketTransport transport, byte[] chunkedBytes, bool endOfMessage)
{
if (transport == null)
{
throw new ArgumentNullException(nameof(transport));
}

var segments = new ArraySegment<byte>(chunkedBytes);

await transport.WebSocket.SendAsync(segments,
WebSocketMessageType.Binary,
endOfMessage,
CancellationToken.None);
}

public async Task BroadcastAsync(WebSocketMessageContext context)
{
if (context == null)
Expand All @@ -66,6 +95,37 @@ public async Task BroadcastAsync(WebSocketMessageContext context)
}
}

public async Task BroadcastBinaryAsync(byte[] bytes, JsonObject properties)
{
PrepareBytes(ref bytes, properties);

var buffer = new byte[SocketsConstants.ChunkSize];
using (var ms = new MemoryStream(bytes))
{
using (BinaryReader br = new BinaryReader(ms))
{
byte[] chunkedBytes = null;
do
{
chunkedBytes = br.ReadBytes(SocketsConstants.ChunkSize);
var endOfMessage = false;

if (chunkedBytes.Length < SocketsConstants.ChunkSize)
endOfMessage = true;

foreach (var connection in Connections)
{
await SendBinaryAsync(transport: connection.Value, chunkedBytes: chunkedBytes, endOfMessage: endOfMessage);
}

if (endOfMessage)
break;

} while (chunkedBytes.Length <= SocketsConstants.ChunkSize);
}
}
}

public async Task SendAsync(string connectionId, WebSocketMessageContext context)
{
WebSocketTransport transport = null;
Expand Down Expand Up @@ -93,11 +153,7 @@ public async Task SendBinaryAsync(string connectionId, byte[] bytes, JsonObject
throw new ArgumentOutOfRangeException(nameof(transport));
}

var props = JsonConvert.SerializeObject(properties);
var propsBytes = Encoding.UTF8.GetBytes($"{SocketsConstants.Splitter}{props}");

var bytesCount = bytes.Length;
bytes = bytes.Concat(propsBytes).ToArray();
PrepareBytes(ref bytes, properties);

var buffer = new byte[SocketsConstants.ChunkSize];
using (var ms = new MemoryStream(bytes))
Expand Down
3 changes: 3 additions & 0 deletions src/NetCoreStack.WebSockets/Internal/IConnectionManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,14 @@ public interface IConnectionManager
{
Task BroadcastAsync(WebSocketMessageContext context);

Task BroadcastBinaryAsync(byte[] bytes, JsonObject properties);

Task SendAsync(string connectionId, WebSocketMessageContext context);

Task SendBinaryAsync(string connectionId, byte[] bytes, JsonObject properties);

Task SendAsync(string connectionId, WebSocketMessageContext context, WebSocket webSocket);

void CloseConnection(string connectionId);
}
}
18 changes: 13 additions & 5 deletions test/ServerTestApp/Controllers/DiscoveryController.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,26 +38,34 @@ public IActionResult Get()
[HttpPost(nameof(SendAsync))]
public async Task<IActionResult> SendAsync([FromBody]SimpleModel model)
{
var echo = $"Echo from server '{model.Message}' - {DateTime.Now}";
var echo = $"Echo from server '{model.Key}' - {DateTime.Now}";
var obj = new { message = echo };
var webSocketContext = new WebSocketMessageContext { Command = WebSocketCommands.DataSend, Value = obj };
await _connectionManager.BroadcastAsync(webSocketContext);
return Ok();
}

[HttpPost(nameof(BrodcastBinaryAsync))]
public async Task<IActionResult> BrodcastBinaryAsync([FromBody]SimpleModel model)
{
var bytes = _distrubutedCache.Get(model.Key);
await _connectionManager.BroadcastBinaryAsync(bytes, new SocketObject { Key = model.Key });
return Ok();
}

[HttpPost(nameof(SendBinaryAsync))]
public async Task<IActionResult> SendBinaryAsync([FromBody]SimpleModel model)
{
var bytes = _distrubutedCache.Get(model.Message);
await _connectionManager.SendBinaryAsync(model.ConnectionId, bytes, new SocketObject { Key = model.Message });
var bytes = _distrubutedCache.Get(model.Key);
await _connectionManager.SendBinaryAsync(model.ConnectionId, bytes, new SocketObject { Key = model.Key });
return Ok();
}

[HttpPost(nameof(SendBinaryFromMemoryAsync))]
public async Task<IActionResult> SendBinaryFromMemoryAsync([FromBody]SimpleModel model)
{
var bytes = (byte[])_memoryCache.Get(model.Message);
await _connectionManager.SendBinaryAsync(model.ConnectionId, bytes, new SocketObject { Key = model.Message });
var bytes = (byte[])_memoryCache.Get(model.Key);
await _connectionManager.SendBinaryAsync(model.ConnectionId, bytes, new SocketObject { Key = model.Key });
return Ok();
}
}
Expand Down
2 changes: 1 addition & 1 deletion test/ServerTestApp/Models/SimpleModel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,6 @@
public class SimpleModel
{
public string ConnectionId { get; set; }
public string Message { get; set; }
public string Key { get; set; }
}
}
2 changes: 1 addition & 1 deletion test/ServerTestApp/Startup.cs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public void ConfigureServices(IServiceCollection services)
services.AddDistributedRedisCache(options =>
{
options.Configuration = "localhost";
options.InstanceName = "SocketsInstance";
options.InstanceName = "RedisInstance";
});

services.AddTransient<IHandshakeStateTransport, MyHandshakeStateTransport>();
Expand Down

0 comments on commit 663b3bb

Please sign in to comment.