Skip to content

Commit

Permalink
stream, performance improvements, tasks
Browse files Browse the repository at this point in the history
  • Loading branch information
gencebay committed Feb 25, 2018
1 parent 33c4f70 commit afecdbb
Show file tree
Hide file tree
Showing 12 changed files with 194 additions and 253 deletions.
125 changes: 53 additions & 72 deletions src/NetCoreStack.WebSockets.ProxyClient/ClientWebSocketReceiver.cs
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
using Microsoft.Extensions.Logging;
using NetCoreStack.WebSockets.Internal;
using Newtonsoft.Json;
using System;
using System.Collections.Generic;
using System.IO;
using System.Net.WebSockets;
using System.Threading;
Expand Down Expand Up @@ -32,95 +30,78 @@ public ClientWebSocketReceiver(IServiceProvider serviceProvider,

public async Task ReceiveAsync()
{
try
var buffer = new byte[NCSConstants.ChunkSize];
var result = await _context.WebSocket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);
while (!result.CloseStatus.HasValue)
{
var buffer = new byte[NCSConstants.ChunkSize];
var result = await _context.WebSocket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);
while (!result.CloseStatus.HasValue)
if (result.MessageType == WebSocketMessageType.Text)
{
if (result.MessageType == WebSocketMessageType.Text)
byte[] inputs = null;
using (var ms = new MemoryStream())
{
try
while (!result.EndOfMessage)
{
var context = result.ToContext(buffer);
if (context.Command == WebSocketCommands.Handshake)
{
_context.ConnectionId = context.Value?.ToString();
_handshakeCallback?.Invoke(_context.ConnectionId);
}
await ms.WriteAsync(buffer, 0, result.Count);
result = await _context.WebSocket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);
}

var invocator = _context.GetInvocator(_serviceProvider);
if (invocator != null)
{
await invocator.InvokeAsync(context);
}
await ms.WriteAsync(buffer, 0, result.Count);
inputs = ms.ToArray();
}
try
{
var context = result.ToContext(inputs);
if (context.Command == WebSocketCommands.Handshake)
{
_context.ConnectionId = context.Value?.ToString();
_handshakeCallback?.Invoke(_context.ConnectionId);
}
catch (Exception ex)
var invocator = _context.GetInvocator(_serviceProvider);
if (invocator != null)
{
_logger.LogWarning(ex, "{0} Invocator error occurred for message type: {1}", NCSConstants.WarningSymbol, WebSocketMessageType.Text);
await invocator.InvokeAsync(context);
}
result = await _context.WebSocket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);
}
catch (Exception ex)
{
_logger.LogWarning(ex, "{0} An error occurred for message type: {1}", NCSConstants.WarningSymbol, WebSocketMessageType.Text);
}
result = await _context.WebSocket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);
}

if (result.MessageType == WebSocketMessageType.Binary)
if (result.MessageType == WebSocketMessageType.Binary)
{
byte[] binaryResult = null;
using (var ms = new MemoryStream())
{
byte[] binaryResult = null;
using (var ms = new MemoryStream())
{
while (!result.EndOfMessage)
{
if (!result.CloseStatus.HasValue)
{
await ms.WriteAsync(buffer, 0, result.Count);
}
result = await _context.WebSocket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);
}
if (result.EndOfMessage)
{
if (!result.CloseStatus.HasValue)
{
await ms.WriteAsync(buffer, 0, result.Count);
}
}
binaryResult = ms.ToArray();
}
try
while (!result.EndOfMessage)
{
var context = await result.ToBinaryContextAsync(_context.Compressor, binaryResult);
var invocator = _context.GetInvocator(_serviceProvider);
if (invocator != null)
{
await invocator.InvokeAsync(context);
}
await ms.WriteAsync(buffer, 0, result.Count);
result = await _context.WebSocket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);
}
catch (Exception ex)

await ms.WriteAsync(buffer, 0, result.Count);
binaryResult = ms.ToArray();
}
try
{
var context = await result.ToBinaryContextAsync(_context.Compressor, binaryResult);
var invocator = _context.GetInvocator(_serviceProvider);
if (invocator != null)
{
_logger.LogWarning(ex, "{0} Invocator error occurred for message type: {1}", NCSConstants.WarningSymbol, WebSocketMessageType.Binary);
await invocator.InvokeAsync(context);
}
result = await _context.WebSocket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);
}
catch (Exception ex)
{
_logger.LogWarning(ex, "{0} Invocator error occurred for message type: {1}", NCSConstants.WarningSymbol, WebSocketMessageType.Binary);
}
result = await _context.WebSocket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);
}

await _context.WebSocket.CloseAsync(result.CloseStatus.Value, result.CloseStatusDescription, CancellationToken.None);
_closeCallback?.Invoke(_context);
}
catch (Exception ex)
{
var dictionary = new Dictionary<string, string>();
dictionary.Add(nameof(_context.ConnectionId), _context.ConnectionId);

if (_context.InvocatorContext != null)
{
dictionary.Add(nameof(_context.InvocatorContext.ConnectorName), _context.InvocatorContext.ConnectorName);
dictionary.Add(nameof(_context.InvocatorContext.Uri), Convert.ToString(_context.InvocatorContext.Uri));
}

_logger.LogWarning(ex, "{0} receive exception: {1}", NCSConstants.WarningSymbol, JsonConvert.SerializeObject(dictionary));
}
finally
{
_closeCallback?.Invoke(_context);
}
await _context.WebSocket.CloseAsync(result.CloseStatus.Value, result.CloseStatusDescription, CancellationToken.None);
_closeCallback?.Invoke(_context);
}
}
}
Loading

0 comments on commit afecdbb

Please sign in to comment.