diff --git a/src/CoreIpc.sln b/src/CoreIpc.sln index 12d4a07d..74779ed7 100644 --- a/src/CoreIpc.sln +++ b/src/CoreIpc.sln @@ -1,6 +1,6 @@  Microsoft Visual Studio Solution File, Format Version 12.00 -# 17 +# Visual Studio Version 17 VisualStudioVersion = 17.0.31919.166 MinimumVisualStudioVersion = 10.0.40219.1 Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "UiPath.CoreIpc", "UiPath.CoreIpc\UiPath.CoreIpc.csproj", "{58200319-1F71-4E22-894D-7E69E0CD0B57}" @@ -17,6 +17,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "UiPath.CoreIpc.Http", "UiPa EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "UiPath.Ipc.Tests", "UiPath.Ipc.Tests\UiPath.Ipc.Tests.csproj", "{E238E183-92CF-48A6-890F-C422853D6656}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "UiPath.CoreIpc.Extensions.Abstractions", "UiPath.CoreIpc.Extensions.Abstractions\UiPath.CoreIpc.Extensions.Abstractions.csproj", "{F519AE2B-88A6-482E-A6E2-B525F71F566D}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -39,6 +41,10 @@ Global {E238E183-92CF-48A6-890F-C422853D6656}.Debug|Any CPU.Build.0 = Debug|Any CPU {E238E183-92CF-48A6-890F-C422853D6656}.Release|Any CPU.ActiveCfg = Release|Any CPU {E238E183-92CF-48A6-890F-C422853D6656}.Release|Any CPU.Build.0 = Release|Any CPU + {F519AE2B-88A6-482E-A6E2-B525F71F566D}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {F519AE2B-88A6-482E-A6E2-B525F71F566D}.Debug|Any CPU.Build.0 = Debug|Any CPU + {F519AE2B-88A6-482E-A6E2-B525F71F566D}.Release|Any CPU.ActiveCfg = Release|Any CPU + {F519AE2B-88A6-482E-A6E2-B525F71F566D}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE diff --git a/src/Playground/Program.cs b/src/Playground/Program.cs index 613e8d5a..e373bd18 100644 --- a/src/Playground/Program.cs +++ b/src/Playground/Program.cs @@ -44,7 +44,7 @@ private static async Task Main(string[] args) typeof(Contracts.IServerOperations), // DEVINE new EndpointSettings(typeof(Contracts.IServerOperations)) // ASTALALT { - BeforeCall = async (callInfo, _) => + BeforeIncommingCall = async (callInfo, _) => { Console.WriteLine($"Server: {callInfo.Method.Name}"); } @@ -76,36 +76,30 @@ private static async Task Main(string[] args) var c1 = new IpcClient() { - Config = new() + Callbacks = new() { - Callbacks = new() - { - typeof(Contracts.IClientOperations), - { typeof(Contracts.IClientOperations2), new Impl.Client2() }, - }, - ServiceProvider = clientSP, - Scheduler = clientScheduler, + typeof(Contracts.IClientOperations), + { typeof(Contracts.IClientOperations2), new Impl.Client2() }, }, + ServiceProvider = clientSP, + Scheduler = clientScheduler, Transport = new NamedPipeClientTransport() { PipeName = Contracts.PipeName, ServerName = ".", AllowImpersonation = false, - }, + } }; var c2 = new IpcClient() { - Config = new() + ServiceProvider = clientSP, + Callbacks = new() { - ServiceProvider = clientSP, - Callbacks = new() - { - typeof(Contracts.IClientOperations), - { typeof(Contracts.IClientOperations2), new Impl.Client2() }, - }, - Scheduler = clientScheduler, + typeof(Contracts.IClientOperations), + { typeof(Contracts.IClientOperations2), new Impl.Client2() }, }, + Scheduler = clientScheduler, Transport = new NamedPipeClientTransport() { PipeName = Contracts.PipeName, @@ -116,16 +110,13 @@ private static async Task Main(string[] args) var proxy1 = new IpcClient() { - Config = new() + ServiceProvider = clientSP, + Callbacks = new() { - ServiceProvider = clientSP, - Callbacks = new() - { - typeof(Contracts.IClientOperations), - { typeof(Contracts.IClientOperations2), new Impl.Client2() }, - }, - Scheduler = clientScheduler, + typeof(Contracts.IClientOperations), + { typeof(Contracts.IClientOperations2), new Impl.Client2() }, }, + Scheduler = clientScheduler, Transport = new NamedPipeClientTransport() { PipeName = Contracts.PipeName, diff --git a/src/UiPath.CoreIpc.Extensions.Abstractions/UiPath.CoreIpc.Extensions.Abstractions.csproj b/src/UiPath.CoreIpc.Extensions.Abstractions/UiPath.CoreIpc.Extensions.Abstractions.csproj new file mode 100644 index 00000000..2186c679 --- /dev/null +++ b/src/UiPath.CoreIpc.Extensions.Abstractions/UiPath.CoreIpc.Extensions.Abstractions.csproj @@ -0,0 +1,17 @@ + + + + net6.0;net461;net6.0-windows + enable + enable + preview + true + enable + true + + + + + + + diff --git a/src/UiPath.CoreIpc.Http/BidiHttpListener.cs b/src/UiPath.CoreIpc.Http/BidiHttpListener.cs deleted file mode 100644 index 06076a21..00000000 --- a/src/UiPath.CoreIpc.Http/BidiHttpListener.cs +++ /dev/null @@ -1,285 +0,0 @@ -using Nito.AsyncEx; -using System.Buffers; -using System.Collections.Concurrent; -using System.Diagnostics.CodeAnalysis; -using System.IO.Pipelines; -using System.Net; -using System.Net.Http; -using System.Threading.Channels; - -namespace UiPath.Ipc.Http; - -using static Constants; -using IBidiHttpListenerConfig = IListenerConfig; - -public sealed partial record BidiHttpListener : ServerTransport, IBidiHttpListenerConfig -{ - public required Uri Uri { get; init; } - - BidiHttpListenerState IBidiHttpListenerConfig.CreateListenerState(IpcServer server) - => new(server, this); - - BidiHttpServerConnectionState IBidiHttpListenerConfig.CreateConnectionState(IpcServer server, BidiHttpListenerState listenerState) - => new(server, listenerState); - - async ValueTask IBidiHttpListenerConfig.AwaitConnection(BidiHttpListenerState listenerState, BidiHttpServerConnectionState connectionState, CancellationToken ct) - { - await connectionState.WaitForConnection(ct); - return connectionState; - } - - public IEnumerable Validate() - { - throw new NotImplementedException(); - } -} - -internal sealed class BidiHttpListenerState : IAsyncDisposable -{ - private readonly IpcServer _ipcServer; - private readonly CancellationTokenSource _cts = new(); - private readonly HttpListener _httpListener; - private readonly Task _processing; - private readonly Lazy _disposing; - - private readonly ConcurrentDictionary> _connections = new(); - private readonly Channel<(Guid connectionId, Uri reverseUri)> _newConnections = Channel.CreateUnbounded<(Guid connectionId, Uri reverseUri)>(); - - public ChannelReader<(Guid connectionId, Uri reverseUri)> NewConnections => _newConnections.Reader; - public ChannelReader GetConnectionChannel(Guid connectionId) => _connections[connectionId]; - - public BidiHttpListenerState(IpcServer ipcServer, BidiHttpListener listener) - { - _ipcServer = ipcServer; - _httpListener = new HttpListener() - { - Prefixes = - { - listener.Uri.ToString() - } - }; - _processing = ProcessContexts(); - _disposing = new(DisposeCore); - } - - public ValueTask DisposeAsync() => new(_disposing.Value); - - private async Task DisposeCore() - { - _cts.Cancel(); - try - { - await _processing; - } - catch (OperationCanceledException ex) when (ex.CancellationToken == _cts.Token) - { - } - - foreach (var pair in _connections) - { - pair.Value.Writer.Complete(); - } - _cts.Dispose(); - } - - private async Task ProcessContexts() - { - await foreach (var (context, connectionId, reverseUri) in AwaitContexts()) - { - var connectionChannel = _connections.GetOrAdd(connectionId, _ => - { - _newConnections.Writer.TryWrite((connectionId, reverseUri)); - return Channel.CreateUnbounded(); - }); - - await connectionChannel.Writer.WriteAsync(context, _cts.Token); - } - - async IAsyncEnumerable<(HttpListenerContext context, Guid connectionId, Uri reverseUri)> AwaitContexts() - { - while (!_cts.Token.IsCancellationRequested) - { - var context = await _httpListener.GetContextAsync(); - - if (!TryAcceptContext(context, out var connectionId, out var reverseUri)) - { - context.Response.StatusCode = 400; - context.Response.Close(); - continue; - } - - yield return (context, connectionId, reverseUri); - } - } - - bool TryAcceptContext(HttpListenerContext context, out Guid connectionId, [NotNullWhen(returnValue: true)] out Uri? reverseUri) - { - if (!Guid.TryParse(context.Request.Headers[ConnectionIdHeader], out connectionId) || - !Uri.TryCreate(context.Request.Headers[ReverseUriHeader], UriKind.Absolute, out reverseUri)) - { - connectionId = Guid.Empty; - reverseUri = null; - return false; - } - - return true; - } - } -} - -internal sealed class BidiHttpServerConnectionState : Stream, IAsyncDisposable -{ - private readonly Pipe _pipe = new(); - - private readonly IpcServer _server; - private readonly BidiHttpListenerState _listenerState; - - private readonly CancellationTokenSource _cts = new(); - private readonly AsyncLock _lock = new(); - private (Guid connectionId, Uri reverseUri)? _connection = null; - private HttpClient? _client; - private Task? _processing = null; - private readonly Lazy _disposing; - - public BidiHttpServerConnectionState(IpcServer server, BidiHttpListenerState listenerState) - { - _server = server; - _listenerState = listenerState; - _disposing = new(DisposeCore); - } - - public -#if !NET461 - override -#endif - ValueTask DisposeAsync() => new(_disposing.Value); - - private async Task DisposeCore() - { - _cts.Cancel(); - - _client?.Dispose(); - - try - { - await (_processing ?? Task.CompletedTask); - } - catch (OperationCanceledException ex) when (ex.CancellationToken == _cts.Token) - { - // ignored - } - - _cts.Dispose(); - } - - public async Task WaitForConnection(CancellationToken ct) - { - using (await _lock.LockAsync(ct)) - { - if (_connection is not null) - { - throw new InvalidOperationException(); - } - - _connection = await _listenerState.NewConnections.ReadAsync(ct); - - _client = new() - { - BaseAddress = _connection.Value.reverseUri, - DefaultRequestHeaders = - { - { ConnectionIdHeader, _connection.Value.connectionId.ToString() } - } - }; - - _processing = ProcessContexts(_cts.Token); - } - } - - private async Task ProcessContexts(CancellationToken ct) - { - var reader = _listenerState.GetConnectionChannel(_connection!.Value.connectionId); - - while (await reader.WaitToReadAsync(ct)) - { - if (!reader.TryRead(out var context)) - { - continue; - } - await ProcessContext(context); - } - - async Task ProcessContext(HttpListenerContext context) - { - try - { - while (true) - { - var memory = _pipe.Writer.GetMemory(); - var cbRead = await context.Request.InputStream.ReadAsync(memory, ct); - if (cbRead is 0) - { - break; - } - _pipe.Writer.Advance(cbRead); - var flushResult = await _pipe.Writer.FlushAsync(ct); - if (flushResult.IsCompleted) - { - break; - } - } - } - finally - { - context.Response.StatusCode = 200; - context.Response.Close(); - } - } - } - - public override bool CanRead => true; - public override bool CanSeek => false; - public override bool CanWrite => true; - - public override async Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken ct) - { - var memory = new Memory(buffer, offset, count); - var readResult = await _pipe.Reader.ReadAsync(ct); - - var take = (int)Math.Min(readResult.Buffer.Length, memory.Length); - - readResult.Buffer.Slice(start: 0, length: take).CopyTo(memory.Span); - _pipe.Reader.AdvanceTo(readResult.Buffer.GetPosition(take)); - - return take; - } - - public override async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken ct) - { - var memory = new ReadOnlyMemory(buffer, offset, count); - if (_client is null) - { - throw new InvalidOperationException(); - } - - HttpContent content = -#if NET461 - new ByteArrayContent(memory.ToArray()); -#else - new ReadOnlyMemoryContent(memory); -#endif - - await _client.PostAsync(requestUri: "", content, ct); - } - - public override Task FlushAsync(CancellationToken cancellationToken) - => Task.CompletedTask; - - public override void Flush() => throw new NotImplementedException(); - public override long Seek(long offset, SeekOrigin origin) => throw new NotImplementedException(); - public override void SetLength(long value) => throw new NotImplementedException(); - public override int Read(byte[] buffer, int offset, int count) => throw new NotImplementedException(); - public override void Write(byte[] buffer, int offset, int count) => throw new NotImplementedException(); - public override long Length => throw new NotImplementedException(); - public override long Position { get => throw new NotImplementedException(); set => throw new NotImplementedException(); } -} diff --git a/src/UiPath.CoreIpc.Http/BidiHttpServerTransport.cs b/src/UiPath.CoreIpc.Http/BidiHttpServerTransport.cs new file mode 100644 index 00000000..4e43ce58 --- /dev/null +++ b/src/UiPath.CoreIpc.Http/BidiHttpServerTransport.cs @@ -0,0 +1,278 @@ +using Nito.AsyncEx; +using System.Buffers; +using System.Collections.Concurrent; +using System.Diagnostics.CodeAnalysis; +using System.IO.Pipelines; +using System.Net; +using System.Net.Http; +using System.Threading.Channels; + +namespace UiPath.Ipc.Http; + +using static Constants; + +public sealed partial class BidiHttpServerTransport : ServerTransport +{ + public required Uri Uri { get; init; } + + protected override IServerState CreateServerState() + => new BidiHttpServerState(this); + + protected override IEnumerable ValidateCore() => []; + + private sealed class BidiHttpServerState : IServerState + { + private readonly CancellationTokenSource _cts = new(); + private readonly HttpListener _httpListener; + private readonly Task _processing; + private readonly Lazy _disposing; + + private readonly ConcurrentDictionary> _connections = new(); + private readonly Channel<(Guid connectionId, Uri reverseUri)> _newConnections = Channel.CreateUnbounded<(Guid connectionId, Uri reverseUri)>(); + + public ChannelReader<(Guid connectionId, Uri reverseUri)> NewConnections => _newConnections.Reader; + public ChannelReader GetConnectionChannel(Guid connectionId) => _connections[connectionId]; + + public BidiHttpServerState(BidiHttpServerTransport transport) + { + _httpListener = new HttpListener() + { + Prefixes = + { + transport.Uri.ToString() + } + }; + _processing = ProcessContexts(); + _disposing = new(DisposeCore); + } + + public ValueTask DisposeAsync() => new(_disposing.Value); + + private async Task DisposeCore() + { + _cts.Cancel(); + try + { + await _processing; + } + catch (OperationCanceledException ex) when (ex.CancellationToken == _cts.Token) + { + } + + foreach (var pair in _connections) + { + pair.Value.Writer.Complete(); + } + _cts.Dispose(); + } + + private async Task ProcessContexts() + { + await foreach (var (context, connectionId, reverseUri) in AwaitContexts()) + { + var connectionChannel = _connections.GetOrAdd(connectionId, _ => + { + _newConnections.Writer.TryWrite((connectionId, reverseUri)); + return Channel.CreateUnbounded(); + }); + + await connectionChannel.Writer.WriteAsync(context, _cts.Token); + } + + async IAsyncEnumerable<(HttpListenerContext context, Guid connectionId, Uri reverseUri)> AwaitContexts() + { + while (!_cts.Token.IsCancellationRequested) + { + var context = await _httpListener.GetContextAsync(); + + if (!TryAcceptContext(context, out var connectionId, out var reverseUri)) + { + context.Response.StatusCode = 400; + context.Response.Close(); + continue; + } + + yield return (context, connectionId, reverseUri); + } + } + + bool TryAcceptContext(HttpListenerContext context, out Guid connectionId, [NotNullWhen(returnValue: true)] out Uri? reverseUri) + { + if (!Guid.TryParse(context.Request.Headers[ConnectionIdHeader], out connectionId) || + !Uri.TryCreate(context.Request.Headers[ReverseUriHeader], UriKind.Absolute, out reverseUri)) + { + connectionId = Guid.Empty; + reverseUri = null; + return false; + } + + return true; + } + } + + IServerConnectionSlot IServerState.CreateConnectionSlot() => new BidiHttpServerConnectionSlot(this); + } + + private sealed class BidiHttpServerConnectionSlot : Stream, IServerConnectionSlot, IAsyncDisposable + { + private readonly Pipe _pipe = new(); + + private readonly BidiHttpServerState _listenerState; + + private readonly CancellationTokenSource _cts = new(); + private readonly AsyncLock _lock = new(); + private (Guid connectionId, Uri reverseUri)? _connection = null; + private HttpClient? _client; + private Task? _processing = null; + private readonly Lazy _disposing; + + public BidiHttpServerConnectionSlot(BidiHttpServerState serverState) + { + _listenerState = serverState; + _disposing = new(DisposeCore); + } + + public +#if !NET461 + override +#endif + ValueTask DisposeAsync() => new(_disposing.Value); + + private async Task DisposeCore() + { + _cts.Cancel(); + + _client?.Dispose(); + + try + { + await (_processing ?? Task.CompletedTask); + } + catch (OperationCanceledException ex) when (ex.CancellationToken == _cts.Token) + { + // ignored + } + + _cts.Dispose(); + } + + public async Task WaitForConnection(CancellationToken ct) + { + using (await _lock.LockAsync(ct)) + { + if (_connection is not null) + { + throw new InvalidOperationException(); + } + + _connection = await _listenerState.NewConnections.ReadAsync(ct); + + _client = new() + { + BaseAddress = _connection.Value.reverseUri, + DefaultRequestHeaders = + { + { ConnectionIdHeader, _connection.Value.connectionId.ToString() } + } + }; + + _processing = ProcessContexts(_cts.Token); + } + } + + private async Task ProcessContexts(CancellationToken ct) + { + var reader = _listenerState.GetConnectionChannel(_connection!.Value.connectionId); + + while (await reader.WaitToReadAsync(ct)) + { + if (!reader.TryRead(out var context)) + { + continue; + } + await ProcessContext(context); + } + + async Task ProcessContext(HttpListenerContext context) + { + try + { + while (true) + { + var memory = _pipe.Writer.GetMemory(); + var cbRead = await context.Request.InputStream.ReadAsync(memory, ct); + if (cbRead is 0) + { + break; + } + _pipe.Writer.Advance(cbRead); + var flushResult = await _pipe.Writer.FlushAsync(ct); + if (flushResult.IsCompleted) + { + break; + } + } + } + finally + { + context.Response.StatusCode = 200; + context.Response.Close(); + } + } + } + + ValueTask IServerConnectionSlot.AwaitConnection(CancellationToken ct) + { + throw new NotImplementedException(); + } + + + + public override bool CanRead => true; + public override bool CanSeek => false; + public override bool CanWrite => true; + + public override async Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken ct) + { + var memory = new Memory(buffer, offset, count); + var readResult = await _pipe.Reader.ReadAsync(ct); + + var take = (int)Math.Min(readResult.Buffer.Length, memory.Length); + + readResult.Buffer.Slice(start: 0, length: take).CopyTo(memory.Span); + _pipe.Reader.AdvanceTo(readResult.Buffer.GetPosition(take)); + + return take; + } + + public override async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken ct) + { + var memory = new ReadOnlyMemory(buffer, offset, count); + if (_client is null) + { + throw new InvalidOperationException(); + } + + HttpContent content = +#if NET461 + new ByteArrayContent(memory.ToArray()); +#else + new ReadOnlyMemoryContent(memory); +#endif + + await _client.PostAsync(requestUri: "", content, ct); + } + + public override Task FlushAsync(CancellationToken cancellationToken) + => Task.CompletedTask; + + public override void Flush() => throw new NotImplementedException(); + public override long Seek(long offset, SeekOrigin origin) => throw new NotImplementedException(); + public override void SetLength(long value) => throw new NotImplementedException(); + public override int Read(byte[] buffer, int offset, int count) => throw new NotImplementedException(); + public override void Write(byte[] buffer, int offset, int count) => throw new NotImplementedException(); + + public override long Length => throw new NotImplementedException(); + public override long Position { get => throw new NotImplementedException(); set => throw new NotImplementedException(); } + } +} diff --git a/src/UiPath.CoreIpc.Http/UiPath.CoreIpc.Http.csproj b/src/UiPath.CoreIpc.Http/UiPath.CoreIpc.Http.csproj index 524ffbc1..83f0f21e 100644 --- a/src/UiPath.CoreIpc.Http/UiPath.CoreIpc.Http.csproj +++ b/src/UiPath.CoreIpc.Http/UiPath.CoreIpc.Http.csproj @@ -21,7 +21,7 @@ - + diff --git a/src/UiPath.CoreIpc/Client/ServiceClient.cs b/src/UiPath.CoreIpc/Client/ServiceClient.cs index 4c0edd2f..95ef34c3 100644 --- a/src/UiPath.CoreIpc/Client/ServiceClient.cs +++ b/src/UiPath.CoreIpc/Client/ServiceClient.cs @@ -11,7 +11,7 @@ private static IpcProxy CreateProxy(ServiceClient serviceClient) where T : cl return proxy; } - protected abstract IServiceClientConfig Config { get; } + protected abstract IClientConfig Config { get; } public abstract Stream? Network { get; } public event EventHandler? ConnectionClosed; @@ -66,11 +66,7 @@ async Task Invoke() var (connection, newConnection) = await EnsureConnection(ct); - if (Config.BeforeCall is not null) - { - var callInfo = new CallInfo(newConnection, method, args); - await Config.BeforeCall(callInfo, ct); - } + await (Config.BeforeOutgoingCall?.Invoke(new CallInfo(newConnection, method, args), ct) ?? Task.CompletedTask); var requestId = connection.NewRequestId(); var request = new Request(_interfaceType.Name, requestId, methodName, serializedArguments, messageTimeout.TotalSeconds) @@ -78,18 +74,18 @@ async Task Invoke() UploadStream = uploadStream }; - Config.Logger?.ServiceClient_Calling(methodName, requestId, Config.DebugName); + Config.Logger?.ServiceClient_Calling(methodName, requestId, Config.GetComputedDebugName()); Response response; try { response = await connection.RemoteCall(request, ct); // returns user errors instead of throwing them (could throw for system bugs) - Config.Logger?.ServiceClient_CalledSuccessfully(request.MethodName, requestId, Config.DebugName); + Config.Logger?.ServiceClient_CalledSuccessfully(request.MethodName, requestId, Config.GetComputedDebugName()); } catch (Exception ex) { - Config.Logger?.ServiceClient_FailedToCall(request.MethodName, requestId, Config.DebugName, ex); + Config.Logger?.ServiceClient_FailedToCall(request.MethodName, requestId, Config.GetComputedDebugName(), ex); throw; } @@ -133,7 +129,7 @@ string[] SerializeArguments() public abstract void Dispose(); - public override string ToString() => Config.DebugName; + public override string ToString() => Config.GetComputedDebugName(); #region Generic adapter cache private static readonly MethodInfo GenericDefOf_Invoke = ((Func>)Invoke).Method.GetGenericMethodDefinition(); @@ -227,9 +223,9 @@ public override async ValueTask CloseConnection() var network = await Connect(ct); - LatestConnection = new Connection(network, Config.DebugName, Config.Logger); - var router = new Router(_client.Config.CreateCallbackRouterConfig(), _client.Config.ServiceProvider); - _latestServer = new Server(router, _client.Config.RequestTimeout, LatestConnection); + LatestConnection = new Connection(network, Config.GetComputedDebugName(), Config.Logger); + var router = new Router(_client.CreateCallbackRouterConfig(), _client.ServiceProvider); + _latestServer = new Server(router, _client.RequestTimeout, LatestConnection); _ = Pal(); return (LatestConnection, newlyConnected: true); @@ -242,7 +238,7 @@ async Task Pal() } catch (Exception ex) { - Config.Logger.LogException(ex, Config.DebugName); + Config.Logger.LogException(ex, Config.GetComputedDebugName()); } } } @@ -260,17 +256,17 @@ private async Task Connect(CancellationToken ct) return network; } - protected override IServiceClientConfig Config => _client.Config; + protected override IClientConfig Config => _client; } internal sealed class ServiceClientForCallback : ServiceClient where TInterface : class { private readonly Connection _connection; - private readonly IServiceClientConfig _config; + private readonly IClientConfig _config; public override Stream? Network => _connection.Network; - public ServiceClientForCallback(Connection connection, IServiceClientConfig config) : base(typeof(TInterface)) + public ServiceClientForCallback(Connection connection, IClientConfig config) : base(typeof(TInterface)) { _connection = connection; _config = config; @@ -286,5 +282,5 @@ public override void Dispose() protected override Task<(Connection connection, bool newlyConnected)> EnsureConnection(CancellationToken ct) => Task.FromResult((_connection, newlyConnected: false)); - protected override IServiceClientConfig Config => _config; + protected override IClientConfig Config => _config; } diff --git a/src/UiPath.CoreIpc/Config/ClientConfig.cs b/src/UiPath.CoreIpc/Config/ClientConfig.cs index c6ac4bdb..6c05c192 100644 --- a/src/UiPath.CoreIpc/Config/ClientConfig.cs +++ b/src/UiPath.CoreIpc/Config/ClientConfig.cs @@ -1,18 +1,9 @@ -using System.ComponentModel; - -namespace UiPath.Ipc; +namespace UiPath.Ipc; public sealed class ClientConfig : Peer, IServiceClientConfig { - public EndpointCollection? Callbacks { get; init; } - - public ILogger? Logger { get; init; } - public BeforeConnectHandler? BeforeConnect { get; init; } public BeforeCallHandler? BeforeCall { get; init; } - [EditorBrowsable(EditorBrowsableState.Never)] - public string DebugName { get; set; } = null!; - internal void Validate() { var haveDeferredInjectedCallbacks = Callbacks?.Any(x => x.Service.MaybeGetServiceProvider() is null && x.Service.MaybeGetInstance() is null) ?? false; @@ -22,36 +13,4 @@ internal void Validate() throw new InvalidOperationException("ServiceProvider is required when you register injectable callbacks. Consider registering a callback instance."); } } - - internal ILogger? GetLogger(string name) - { - if (Logger is not null) - { - return Logger; - } - - if (ServiceProvider?.GetService() is not { } loggerFactory) - { - return null; - } - - return loggerFactory.CreateLogger(name); - } - - internal override RouterConfig CreateCallbackRouterConfig() - => RouterConfig.From( - Callbacks.OrDefault(), - endpoint => endpoint with - { - BeforeCall = null, // callbacks don't support BeforeCall - Scheduler = endpoint.Scheduler ?? Scheduler - }); } - -public interface IClientState : IDisposable -{ - Stream? Network { get; } - - bool IsConnected(); - ValueTask Connect(IpcClient client, CancellationToken ct); -} \ No newline at end of file diff --git a/src/UiPath.CoreIpc/Config/IServiceClientConfig.cs b/src/UiPath.CoreIpc/Config/IClientConfig.cs similarity index 58% rename from src/UiPath.CoreIpc/Config/IServiceClientConfig.cs rename to src/UiPath.CoreIpc/Config/IClientConfig.cs index 19ed49fd..9e4741ab 100644 --- a/src/UiPath.CoreIpc/Config/IServiceClientConfig.cs +++ b/src/UiPath.CoreIpc/Config/IClientConfig.cs @@ -1,11 +1,11 @@ namespace UiPath.Ipc; // Maybe decommission -internal interface IServiceClientConfig +internal interface IClientConfig { TimeSpan RequestTimeout { get; } BeforeConnectHandler? BeforeConnect { get; } - BeforeCallHandler? BeforeCall { get; } + BeforeCallHandler? BeforeOutgoingCall { get; } ILogger? Logger { get; } - string DebugName { get; } + string GetComputedDebugName(); } diff --git a/src/UiPath.CoreIpc/Config/IClientState.cs b/src/UiPath.CoreIpc/Config/IClientState.cs new file mode 100644 index 00000000..74ff640c --- /dev/null +++ b/src/UiPath.CoreIpc/Config/IClientState.cs @@ -0,0 +1,9 @@ +namespace UiPath.Ipc; + +public interface IClientState : IDisposable +{ + Stream? Network { get; } + + bool IsConnected(); + ValueTask Connect(IpcClient client, CancellationToken ct); +} diff --git a/src/UiPath.CoreIpc/Config/IpcClient.cs b/src/UiPath.CoreIpc/Config/IpcClient.cs index 24104b16..7bed1c05 100644 --- a/src/UiPath.CoreIpc/Config/IpcClient.cs +++ b/src/UiPath.CoreIpc/Config/IpcClient.cs @@ -1,10 +1,22 @@ -namespace UiPath.Ipc; +using System.ComponentModel; -public sealed class IpcClient : Peer +namespace UiPath.Ipc; + +public sealed class IpcClient : Peer, IClientConfig { - public required ClientConfig Config { get; init; } + public EndpointCollection? Callbacks { get; set; } + + public ILogger? Logger { get; init; } + public BeforeConnectHandler? BeforeConnect { get; set; } + public BeforeCallHandler? BeforeOutgoingCall { get; set; } + + [EditorBrowsable(EditorBrowsableState.Never)] + public string DebugName { get; set; } = null!; + public required ClientTransport Transport { get; init; } + string IClientConfig.GetComputedDebugName() => DebugName ?? Transport.ToString(); + private readonly ConcurrentDictionary _clients = new(); private ServiceClient GetServiceClient(Type proxyType) { @@ -16,18 +28,42 @@ private ServiceClient GetServiceClient(Type proxyType) internal void Validate() { - if (Config is null) + var haveDeferredInjectedCallbacks = Callbacks?.Any(x => x.Service.MaybeGetServiceProvider() is null && x.Service.MaybeGetInstance() is null) ?? false; + + if (haveDeferredInjectedCallbacks && ServiceProvider is null) { - throw new InvalidOperationException($"{Config} is required."); + throw new InvalidOperationException("ServiceProvider is required when you register injectable callbacks. Consider registering a callback instance."); } + if (Transport is null) { throw new InvalidOperationException($"{Transport} is required."); } - Config.Validate(); Transport.Validate(); + } - Config.DebugName ??= Transport.ToString(); + internal ILogger? GetLogger(string name) + { + if (Logger is not null) + { + return Logger; + } + + if (ServiceProvider?.GetService() is not { } loggerFactory) + { + return null; + } + + return loggerFactory.CreateLogger(name); } + + internal RouterConfig CreateCallbackRouterConfig() + => RouterConfig.From( + Callbacks.OrDefault(), + endpoint => endpoint with + { + BeforeIncommingCall = null, // callbacks don't support BeforeCall + Scheduler = endpoint.Scheduler ?? Scheduler + }); } diff --git a/src/UiPath.CoreIpc/Config/IpcServer.cs b/src/UiPath.CoreIpc/Config/IpcServer.cs index d0f58020..e9c3d14d 100644 --- a/src/UiPath.CoreIpc/Config/IpcServer.cs +++ b/src/UiPath.CoreIpc/Config/IpcServer.cs @@ -14,8 +14,16 @@ public sealed class IpcServer : Peer, IAsyncDisposable private bool _disposeStarted; private Accepter? _accepter; + private Lazy _dispose; - public async ValueTask DisposeAsync() + public IpcServer() + { + _dispose = new(DisposeCore); + } + + public ValueTask DisposeAsync() => new(_dispose.Value); + + private async Task DisposeCore() { Accepter? accepter = null; lock (_lock) @@ -77,6 +85,13 @@ private void OnNewConnectionError(Exception ex) _stopped.TrySetException(ex); } + internal RouterConfig CreateRouterConfig(IpcServer server) => RouterConfig.From( + server.Endpoints, + endpoint => endpoint with + { + Scheduler = endpoint.Scheduler ?? server.Scheduler + }); + private sealed class ObserverAdapter : IObserver { public required Action OnNext { get; init; } @@ -95,6 +110,7 @@ private sealed class Accepter : IAsyncDisposable private readonly Task _running; private readonly IObserver _newConnection; private readonly TaskCompletionSource _tcsStartedAccepting = new(); + private readonly Lazy _dispose; public Task StartedAccepting => _tcsStartedAccepting.Task; @@ -103,15 +119,19 @@ public Accepter(ServerTransport transport, IObserver connected) _serverState = transport.CreateServerState(); _newConnection = connected; _running = RunOnThreadPool(LoopAccept, parallelCount: transport.ConcurrentAccepts, _cts.Token); + _dispose = new(DisposeCore); } - public async ValueTask DisposeAsync() - { + public ValueTask DisposeAsync() => new(_dispose.Value); + + private async Task DisposeCore() + { _cts.Cancel(); await _running; _cts.Dispose(); } + private async Task LoopAccept(CancellationToken ct) { try diff --git a/src/UiPath.CoreIpc/Config/Peer.cs b/src/UiPath.CoreIpc/Config/Peer.cs index c7e9ba8a..4d4f7121 100644 --- a/src/UiPath.CoreIpc/Config/Peer.cs +++ b/src/UiPath.CoreIpc/Config/Peer.cs @@ -2,11 +2,7 @@ public abstract class Peer { - public TimeSpan RequestTimeout { get; init; } = Timeout.InfiniteTimeSpan; - public IServiceProvider? ServiceProvider { get; init; } - public TaskScheduler? Scheduler { get; init; } - - internal virtual RouterConfig CreateRouterConfig(IpcServer server) => throw new NotSupportedException(); - - internal virtual RouterConfig CreateCallbackRouterConfig() => throw new NotSupportedException(); + public TimeSpan RequestTimeout { get; set; } = Timeout.InfiniteTimeSpan; + public IServiceProvider? ServiceProvider { get; set; } + public TaskScheduler? Scheduler { get; set; } } diff --git a/src/UiPath.CoreIpc/Config/ServerTransport.cs b/src/UiPath.CoreIpc/Config/ServerTransport.cs index ed49685e..c63bc155 100644 --- a/src/UiPath.CoreIpc/Config/ServerTransport.cs +++ b/src/UiPath.CoreIpc/Config/ServerTransport.cs @@ -5,8 +5,9 @@ namespace UiPath.Ipc; public abstract class ServerTransport { - public int ConcurrentAccepts { get; init; } = 5; - public byte MaxReceivedMessageSizeInMegabytes { get; init; } = 2; + public int ConcurrentAccepts { get; set; } = 5; + public byte MaxReceivedMessageSizeInMegabytes { get; set; } = 2; + public X509Certificate? Certificate { get; init; } internal int MaxMessageSize => MaxReceivedMessageSizeInMegabytes * 1024 * 1024; diff --git a/src/UiPath.CoreIpc/Helpers/Router.cs b/src/UiPath.CoreIpc/Helpers/Router.cs index 45252243..26684819 100644 --- a/src/UiPath.CoreIpc/Helpers/Router.cs +++ b/src/UiPath.CoreIpc/Helpers/Router.cs @@ -131,7 +131,7 @@ public static Route From(IServiceProvider? serviceProvider, EndpointSettings end => new Route() { Service = endpointSettings.Service.WithProvider(serviceProvider), - BeforeCall = endpointSettings.BeforeCall, + BeforeCall = endpointSettings.BeforeIncommingCall, Scheduler = endpointSettings.Scheduler.OrDefault(), LoggerFactory = serviceProvider.MaybeCreateServiceFactory(), }; diff --git a/src/UiPath.CoreIpc/Server/EndpointSettings.cs b/src/UiPath.CoreIpc/Server/EndpointSettings.cs index 26a39d6b..3b00df73 100644 --- a/src/UiPath.CoreIpc/Server/EndpointSettings.cs +++ b/src/UiPath.CoreIpc/Server/EndpointSettings.cs @@ -5,7 +5,7 @@ public record EndpointSettings { public TaskScheduler? Scheduler { get; set; } - public BeforeCallHandler? BeforeCall { get; set; } + public BeforeCallHandler? BeforeIncommingCall { get; set; } public Type ContractType => Service.Type; public object? ServiceInstance => Service.MaybeGetInstance(); public IServiceProvider? ServiceProvider => Service.MaybeGetServiceProvider(); diff --git a/src/UiPath.CoreIpc/Server/ServerConnection.cs b/src/UiPath.CoreIpc/Server/ServerConnection.cs index cde20546..276985b1 100644 --- a/src/UiPath.CoreIpc/Server/ServerConnection.cs +++ b/src/UiPath.CoreIpc/Server/ServerConnection.cs @@ -2,7 +2,7 @@ namespace UiPath.Ipc; -internal sealed class ServerConnection : IClient, IDisposable, IServiceClientConfig +internal sealed class ServerConnection : IClient, IDisposable, IClientConfig { public static void CreateAndListen(IpcServer server, Stream network, CancellationToken ct) { @@ -71,10 +71,10 @@ void IClient.Impersonate(Action action) } #region IServiceClientConfig - TimeSpan IServiceClientConfig.RequestTimeout => _ipcServer.RequestTimeout; - BeforeConnectHandler? IServiceClientConfig.BeforeConnect => null; - BeforeCallHandler? IServiceClientConfig.BeforeCall => null; - ILogger? IServiceClientConfig.Logger => _logger; - string IServiceClientConfig.DebugName => _debugName; + TimeSpan IClientConfig.RequestTimeout => _ipcServer.RequestTimeout; + BeforeConnectHandler? IClientConfig.BeforeConnect => null; + BeforeCallHandler? IClientConfig.BeforeOutgoingCall => null; + ILogger? IClientConfig.Logger => _logger; + string IClientConfig.GetComputedDebugName() => _debugName; #endregion } diff --git a/src/UiPath.CoreIpc/Transport/Tcp/TcpTransport.cs b/src/UiPath.CoreIpc/Transport/Tcp/TcpClientTransport.cs similarity index 89% rename from src/UiPath.CoreIpc/Transport/Tcp/TcpTransport.cs rename to src/UiPath.CoreIpc/Transport/Tcp/TcpClientTransport.cs index c114f4f6..dd7cf5a3 100644 --- a/src/UiPath.CoreIpc/Transport/Tcp/TcpTransport.cs +++ b/src/UiPath.CoreIpc/Transport/Tcp/TcpClientTransport.cs @@ -2,7 +2,7 @@ namespace UiPath.Ipc.Transport.Tcp; -public sealed record TcpTransport : ClientTransport +public sealed record TcpClientTransport : ClientTransport { public required IPEndPoint EndPoint { get; init; } @@ -32,7 +32,7 @@ public bool IsConnected() public async ValueTask Connect(IpcClient client, CancellationToken ct) { - var transport = client.Transport as TcpTransport ?? throw new InvalidOperationException(); + var transport = client.Transport as TcpClientTransport ?? throw new InvalidOperationException(); _tcpClient = new System.Net.Sockets.TcpClient(); #if NET461 diff --git a/src/UiPath.CoreIpc/Transport/WebSocket/WebSocketTransport.cs b/src/UiPath.CoreIpc/Transport/WebSocket/WebSocketClientTransport.cs similarity index 84% rename from src/UiPath.CoreIpc/Transport/WebSocket/WebSocketTransport.cs rename to src/UiPath.CoreIpc/Transport/WebSocket/WebSocketClientTransport.cs index 6558b2c7..e5b36472 100644 --- a/src/UiPath.CoreIpc/Transport/WebSocket/WebSocketTransport.cs +++ b/src/UiPath.CoreIpc/Transport/WebSocket/WebSocketClientTransport.cs @@ -2,7 +2,7 @@ namespace UiPath.Ipc.Transport.WebSocket; -public sealed record WebSocketTransport : ClientTransport +public sealed record WebSocketClientTransport : ClientTransport { public required Uri Uri { get; init; } public override string ToString() => $"WebSocketClient={Uri}"; @@ -28,7 +28,7 @@ internal sealed class WebSocketClientState : IClientState public async ValueTask Connect(IpcClient client, CancellationToken ct) { - var transport = client.Transport as WebSocketTransport ?? throw new InvalidOperationException(); + var transport = client.Transport as WebSocketClientTransport ?? throw new InvalidOperationException(); _clientWebSocket = new(); await _clientWebSocket.ConnectAsync(transport.Uri, ct); diff --git a/src/UiPath.CoreIpc/UiPath.CoreIpc.csproj b/src/UiPath.CoreIpc/UiPath.CoreIpc.csproj index af37d5f6..d2bb74a6 100644 --- a/src/UiPath.CoreIpc/UiPath.CoreIpc.csproj +++ b/src/UiPath.CoreIpc/UiPath.CoreIpc.csproj @@ -27,6 +27,13 @@ + + + + + all + runtime; build; native; contentfiles; analyzers; buildtransitive + @@ -35,6 +42,7 @@ + diff --git a/src/UiPath.Ipc.Tests/ComputingTests.cs b/src/UiPath.Ipc.Tests/ComputingTests.cs index e6858e6b..aeecd313 100644 --- a/src/UiPath.Ipc.Tests/ComputingTests.cs +++ b/src/UiPath.Ipc.Tests/ComputingTests.cs @@ -2,7 +2,6 @@ using Nito.AsyncEx; using Nito.Disposables; using NSubstitute; -using System.Collections.Concurrent; using System.Runtime.InteropServices; using System.Text; using UiPath.Ipc.Transport.NamedPipe; @@ -12,7 +11,7 @@ namespace UiPath.Ipc.Tests; -public abstract class ComputingTests : TestBase +public abstract class ComputingTests : SpyTestBase { #region " Setup " protected readonly ComputingCallback _computingCallback = new(); @@ -25,8 +24,10 @@ public abstract class ComputingTests : TestBase protected sealed override IpcProxy? IpcProxy => Proxy as IpcProxy; protected sealed override Type ContractType => typeof(IComputingService); - - protected readonly ConcurrentBag _clientBeforeCalls = new(); + protected override EndpointCollection? Callbacks => new() + { + { typeof(IComputingCallback), _computingCallback } + }; protected ComputingTests(ITestOutputHelper outputHelper) : base(outputHelper) { @@ -42,24 +43,7 @@ protected override void ConfigureSpecificServices(IServiceCollection services) .AddSingletonAlias() ; - protected override ServerTransport ConfigTransportAgnostic(ServerTransport listener) - => listener with - { - ConcurrentAccepts = 10, - RequestTimeout = Timeouts.DefaultRequest, - MaxReceivedMessageSizeInMegabytes = 1, - }; - protected override ClientConfig CreateClientConfig(EndpointCollection? callbacks = null) - => new() - { - RequestTimeout = Timeouts.DefaultRequest, - Scheduler = GuiScheduler, - Callbacks = callbacks ?? new() - { - { typeof(IComputingCallback), _computingCallback } - }, - BeforeCall = async (callInfo, _) => _clientBeforeCalls.Add(callInfo), - }; + protected override TimeSpan ServerRequestTimeout => Timeouts.DefaultRequest; #endregion [Theory, IpcAutoData] @@ -231,16 +215,13 @@ public async Task BeforeConnect_ShouldStartExternalServerJIT() }); var proxy = new IpcClient { - Config = new() + Scheduler = GuiScheduler, + BeforeConnect = async (_) => { - Scheduler = GuiScheduler, - BeforeConnect = async (_) => - { - serverProcess.Start(); - var time = TimeSpan.FromSeconds(1); - _outputHelper.WriteLine($"Server started. Waiting {time}. PID={serverProcess.Id}"); - await Task.Delay(time); - }, + serverProcess.Start(); + var time = TimeSpan.FromSeconds(1); + _outputHelper.WriteLine($"Server started. Waiting {time}. PID={serverProcess.Id}"); + await Task.Delay(time); }, Transport = externalServerParams.CreateClientTransport() }.GetProxy(); @@ -260,7 +241,7 @@ await Enumerable.Range(1, CParallelism) var mockCallback = Substitute.For(); mockCallback.AddInts(0, 1).Returns(1); - var proxy = CreateClient(callbacks: new() + var proxy = CreateIpcClient(callbacks: new() { { typeof(IComputingCallback), mockCallback } })!.GetProxy(); @@ -310,8 +291,8 @@ public readonly record struct ExternalServerParams(ServerKind Kind, string? Pipe public ClientTransport CreateClientTransport() => Kind switch { ServerKind.NamedPipes => new NamedPipeClientTransport() { PipeName = PipeName! }, - ServerKind.Tcp => new TcpTransport() { EndPoint = new(System.Net.IPAddress.Loopback, Port) }, - ServerKind.WebSockets => new WebSocketTransport() { Uri = new($"ws://localhost:{Port}") }, + ServerKind.Tcp => new TcpClientTransport() { EndPoint = new(System.Net.IPAddress.Loopback, Port) }, + ServerKind.WebSockets => new WebSocketClientTransport() { Uri = new($"ws://localhost:{Port}") }, _ => throw new NotSupportedException($"Kind not supported. Kind was {Kind}") }; } @@ -319,7 +300,7 @@ public enum ServerKind { NamedPipes, Tcp, WebSockets } private sealed class DisableInProcClientServer : OverrideConfig { - public override async Task Override(Func> listener) => null; + public override async Task Override(Func> ipcServerFactory) => null; public override IpcClient? Override(Func client) => null; } } diff --git a/src/UiPath.Ipc.Tests/ComputingTestsOverNamedPipes.cs b/src/UiPath.Ipc.Tests/ComputingTestsOverNamedPipes.cs index 067b5ba7..88ef854f 100644 --- a/src/UiPath.Ipc.Tests/ComputingTestsOverNamedPipes.cs +++ b/src/UiPath.Ipc.Tests/ComputingTestsOverNamedPipes.cs @@ -9,7 +9,7 @@ public sealed class ComputingTestsOverNamedPipes : ComputingTests public ComputingTestsOverNamedPipes(ITestOutputHelper outputHelper) : base(outputHelper) { } - protected override async Task CreateListener() => new NamedPipeServerTransport + protected override async Task CreateServerTransport() => new NamedPipeServerTransport { PipeName = PipeName }; diff --git a/src/UiPath.Ipc.Tests/Config/OverrideConfigAttribute.cs b/src/UiPath.Ipc.Tests/Config/OverrideConfigAttribute.cs index 237c1e40..cecdedde 100644 --- a/src/UiPath.Ipc.Tests/Config/OverrideConfigAttribute.cs +++ b/src/UiPath.Ipc.Tests/Config/OverrideConfigAttribute.cs @@ -34,6 +34,6 @@ public OverrideConfigAttribute(Type overrideConfigType) public abstract class OverrideConfig { - public virtual Task Override(Func> listener) => listener()!; - public virtual IpcClient? Override(Func client) => client(); + public virtual Task Override(Func> ipcServer) => ipcServer()!; + public virtual IpcClient? Override(Func ipcClientFactory) => ipcClientFactory(); } \ No newline at end of file diff --git a/src/UiPath.Ipc.Tests/Helpers/IpcHelpers.cs b/src/UiPath.Ipc.Tests/Helpers/IpcHelpers.cs index 0296354b..3ccc33c9 100644 --- a/src/UiPath.Ipc.Tests/Helpers/IpcHelpers.cs +++ b/src/UiPath.Ipc.Tests/Helpers/IpcHelpers.cs @@ -35,23 +35,27 @@ public static IServiceProvider GetRequired(this IServiceProvider serviceProvi internal static class IpcClientExtensions { public static IpcClient WithRequestTimeout(this IpcClient ipcClient, TimeSpan requestTimeout) - => new() { - Config = ipcClient.Config with { RequestTimeout = requestTimeout }, - Transport = ipcClient.Transport, - }; + ipcClient.RequestTimeout = requestTimeout; + return ipcClient; + } + public static IpcServer WithRequestTimeout(this IpcServer ipcServer, TimeSpan requestTimeout) + { + ipcServer.RequestTimeout = requestTimeout; + return ipcServer; + } + public static async Task WithRequestTimeout(this Task ipcServerTask, TimeSpan requestTimeout) + => (await ipcServerTask).WithRequestTimeout(requestTimeout); public static IpcClient WithCallbacks(this IpcClient ipcClient, EndpointCollection callbacks) - => new() { - Config = ipcClient.Config with { Callbacks = callbacks }, - Transport = ipcClient.Transport, - }; + ipcClient.Callbacks = callbacks; + return ipcClient; + } public static IpcClient WithBeforeConnect(this IpcClient ipcClient, BeforeConnectHandler beforeConnect) - => new() { - Config = ipcClient.Config with { BeforeConnect = beforeConnect }, - Transport = ipcClient.Transport, - }; + ipcClient.BeforeConnect = beforeConnect; + return ipcClient; + } } \ No newline at end of file diff --git a/src/UiPath.Ipc.Tests/NamedPipeSmokeTests.cs b/src/UiPath.Ipc.Tests/NamedPipeSmokeTests.cs index 71fa5d83..e43eee21 100644 --- a/src/UiPath.Ipc.Tests/NamedPipeSmokeTests.cs +++ b/src/UiPath.Ipc.Tests/NamedPipeSmokeTests.cs @@ -26,12 +26,10 @@ public async Task NamedPipesShoulNotLeak() private static IpcServer CreateServer(string pipeName) => new IpcServer { - Transport = [ - new NamedPipeServerTransport - { - PipeName = pipeName, - } - ], + Transport = new NamedPipeServerTransport + { + PipeName = pipeName, + }, Endpoints = new() { typeof(IComputingService) @@ -45,8 +43,7 @@ private static IpcServer CreateServer(string pipeName) private static IpcClient CreateClient(string pipeName) => new() { - Transport = new NamedPipeClientTransport { PipeName = pipeName }, - Config = new() + Transport = new NamedPipeClientTransport { PipeName = pipeName } }; private static Task ListPipes(string pattern) diff --git a/src/UiPath.Ipc.Tests/Program.cs b/src/UiPath.Ipc.Tests/Program.cs index 084096ce..feb7a67e 100644 --- a/src/UiPath.Ipc.Tests/Program.cs +++ b/src/UiPath.Ipc.Tests/Program.cs @@ -10,7 +10,7 @@ return 1; } var externalServerParams = JsonConvert.DeserializeObject(Encoding.UTF8.GetString(Convert.FromBase64String(base64))); -await using var asyncDisposable = externalServerParams.CreateListenerConfig(out var listener); +await using var asyncDisposable = externalServerParams.CreateListenerConfig(out var serverTransport); await using var serviceProvider = new ServiceCollection() .AddLogging(builder => builder.AddConsole()) @@ -25,7 +25,7 @@ { { typeof(IComputingService) }, }, - Transport = [listener], + Transport = serverTransport, }; ipcServer.Start(); await ipcServer.WaitForStop(); diff --git a/src/UiPath.Ipc.Tests/RobotTests.cs b/src/UiPath.Ipc.Tests/RobotTests.cs index c07902d8..627bb566 100644 --- a/src/UiPath.Ipc.Tests/RobotTests.cs +++ b/src/UiPath.Ipc.Tests/RobotTests.cs @@ -17,6 +17,10 @@ public abstract class RobotTests : TestBase protected sealed override IpcProxy? IpcProxy => Proxy as IpcProxy; protected sealed override Type ContractType => typeof(IStudioOperations); + protected override EndpointCollection? Callbacks => new() + { + { typeof(IStudioEvents), _studioEvents } + }; protected readonly ConcurrentBag _clientBeforeCalls = new(); @@ -31,24 +35,7 @@ protected override void ConfigureSpecificServices(IServiceCollection services) .AddSingleton() .AddSingletonAlias(); - protected override ServerTransport ConfigTransportAgnostic(ServerTransport listener) - => listener with - { - ConcurrentAccepts = 10, - RequestTimeout = Timeouts.DefaultRequest, - MaxReceivedMessageSizeInMegabytes = 1, - }; - protected override ClientConfig CreateClientConfig(EndpointCollection? callbacks = null) - => new() - { - RequestTimeout = Timeouts.DefaultRequest, - Scheduler = GuiScheduler, - Callbacks = callbacks ?? new() - { - { typeof(IStudioEvents), _studioEvents } - }, - BeforeCall = async (callInfo, _) => _clientBeforeCalls.Add(callInfo), - }; + protected override TimeSpan ServerRequestTimeout => Timeouts.DefaultRequest; #endregion [Fact] diff --git a/src/UiPath.Ipc.Tests/RobotTestsOverNamedPipes.cs b/src/UiPath.Ipc.Tests/RobotTestsOverNamedPipes.cs index fed8d637..5e9b11eb 100644 --- a/src/UiPath.Ipc.Tests/RobotTestsOverNamedPipes.cs +++ b/src/UiPath.Ipc.Tests/RobotTestsOverNamedPipes.cs @@ -12,7 +12,7 @@ public sealed class RobotTestsOverNamedPipes : RobotTests public RobotTestsOverNamedPipes(ITestOutputHelper outputHelper) : base(outputHelper) { } - protected override async Task CreateListener() => new NamedPipeServerTransport + protected override async Task CreateServerTransport() => new NamedPipeServerTransport { PipeName = PipeName }; @@ -62,13 +62,8 @@ public static void OnConnectingToUserService() public TContract CreateUserServiceProxy(string pipeName) where TContract : class => RobotIpcHelpers.CreateProxy( - listener: new NamedPipeServerTransport() - { - PipeName = pipeName, - AccessControl = security => security.AllowCurrentUser(), - MaxReceivedMessageSizeInMegabytes = 10, - RequestTimeout = TimeSpan.FromSeconds(40), - }, + pipeName, + requestTimeout: TimeSpan.FromSeconds(40), callbacks: new EndpointCollection() { { typeof(TCallback), Instance } @@ -81,16 +76,6 @@ internal static partial class RobotIpcHelpers { private static readonly ConcurrentDictionary PipeClients = new(); - public static TContract CreateProxy( - NamedPipeServerTransport listener, - EndpointCollection? callbacks = null, - IServiceProvider? provider = null, - Action? beforeConnect = null, - BeforeCallHandler? beforeCall = null, - bool allowImpersonation = false, - TaskScheduler? scheduler = null) where TContract : class - => CreateProxy(listener.PipeName, listener.RequestTimeout, callbacks, provider, beforeConnect, beforeCall, allowImpersonation, scheduler); - public static TContract CreateProxy( string pipeName, TimeSpan? requestTimeout = null, @@ -101,6 +86,7 @@ public static TContract CreateProxy( bool allowImpersonation = false, TaskScheduler? scheduler = null) where TContract : class { + // TODO: Fix this // Dirty hack (temporary): different callback sets will result in different connections // Hopefully, the different sets are also disjunctive. @@ -139,20 +125,17 @@ private static ClientAndParams CreateClient(CreateProxyRequest request) => new( Client: new() { - Config = new() + RequestTimeout = request.Params.RequestTimeout ?? Timeout.InfiniteTimeSpan, + ServiceProvider = request.Params.Provider, + Logger = request.Params.Provider?.GetService>(), + Callbacks = request.Callbacks, + BeforeConnect = request.Params.BeforeConnect is null ? null : _ => { - RequestTimeout = request.Params.RequestTimeout ?? Timeout.InfiniteTimeSpan, - ServiceProvider = request.Params.Provider, - Logger = request.Params.Provider?.GetService>(), - Callbacks = request.Callbacks, - BeforeConnect = request.Params.BeforeConnect is null ? null : _ => - { - request.Params.BeforeConnect(); - return Task.CompletedTask; - }, - BeforeCall = request.Params.BeforeCall, - Scheduler = request.Params.Scheduler, + request.Params.BeforeConnect(); + return Task.CompletedTask; }, + BeforeOutgoingCall = request.Params.BeforeCall, + Scheduler = request.Params.Scheduler, Transport = new NamedPipeClientTransport { PipeName = request.ActualKey.Name, diff --git a/src/UiPath.Ipc.Tests/SpyTestBase.cs b/src/UiPath.Ipc.Tests/SpyTestBase.cs new file mode 100644 index 00000000..2fc866f3 --- /dev/null +++ b/src/UiPath.Ipc.Tests/SpyTestBase.cs @@ -0,0 +1,20 @@ +using System.Collections.Concurrent; +using Xunit.Abstractions; + +namespace UiPath.Ipc.Tests; + +public abstract class SpyTestBase : TestBase +{ + protected readonly ConcurrentBag _clientBeforeCalls = new(); + + protected SpyTestBase(ITestOutputHelper outputHelper) : base(outputHelper) + { + } + + protected override void ConfigureClient(IpcClient ipcClient) + { + base.ConfigureClient(ipcClient); + + ipcClient.BeforeOutgoingCall = async (callInfo, _) => _clientBeforeCalls.Add(callInfo); + } +} diff --git a/src/UiPath.Ipc.Tests/SyncOverAsyncTests.cs b/src/UiPath.Ipc.Tests/SyncOverAsyncTests.cs index 01d4ef79..6992eeeb 100644 --- a/src/UiPath.Ipc.Tests/SyncOverAsyncTests.cs +++ b/src/UiPath.Ipc.Tests/SyncOverAsyncTests.cs @@ -47,12 +47,7 @@ public async Task RemoteCallingSyncOverAsync_IpcShouldBeResilient(ScenarioId sce private static IpcServer CreateServer(string pipeName) => new IpcServer { - Transport = [ - new NamedPipeServerTransport - { - PipeName = pipeName, - } - ], + Transport = new NamedPipeServerTransport { PipeName = pipeName }, Endpoints = new() { typeof(IComputingService) @@ -66,8 +61,7 @@ private static IpcServer CreateServer(string pipeName) private static IpcClient CreateClient(string pipeName) => new() { - Transport = new NamedPipeClientTransport { PipeName = pipeName }, - Config = new() + Transport = new NamedPipeClientTransport { PipeName = pipeName } }; diff --git a/src/UiPath.Ipc.Tests/SystemTests.cs b/src/UiPath.Ipc.Tests/SystemTests.cs index 73dc59a3..70db74df 100644 --- a/src/UiPath.Ipc.Tests/SystemTests.cs +++ b/src/UiPath.Ipc.Tests/SystemTests.cs @@ -30,20 +30,7 @@ protected override void ConfigureSpecificServices(IServiceCollection services) .AddSingleton() .AddSingletonAlias(); - protected override ServerTransport ConfigTransportAgnostic(ServerTransport listener) - => listener with - { - ConcurrentAccepts = 10, - RequestTimeout = Timeouts.DefaultRequest, - MaxReceivedMessageSizeInMegabytes = 1, - }; - protected override ClientConfig CreateClientConfig(EndpointCollection? callbacks = null) - => new() - { - RequestTimeout = Timeouts.DefaultRequest, - ServiceProvider = ServiceProvider, - Callbacks = callbacks - }; + protected override TimeSpan ServerRequestTimeout => Timeouts.DefaultRequest; #endregion [Theory, IpcAutoData] @@ -76,7 +63,7 @@ public async Task NotPassingAnOptionalMessage_ShouldWork() .ShouldBeAsync(true); [Fact] - [OverrideConfig(typeof(ServerExecutingTooLongACall_ShouldThrowTimeout_Config))] + [OverrideConfig(typeof(ShortServerLongClientTimeout))] public async Task ServerExecutingTooLongACall_ShouldThrowTimeout() => await Proxy.EchoGuidAfter(Guid.Empty, Timeout.InfiniteTimeSpan) // method takes forever but we have a server side RequestTimeout configured .ShouldThrowAsync() @@ -92,23 +79,24 @@ public async Task ClientWaitingForTooLongACall_ShouldThrowTimeout() => await Proxy.EchoGuidAfter(Guid.Empty, Timeout.InfiniteTimeSpan) // method takes forever but we have a server side RequestTimeout configured .ShouldThrowAsync(); - private sealed class ServerExecutingTooLongACall_ShouldThrowTimeout_Config : OverrideConfig + private sealed class ShortServerLongClientTimeout : OverrideConfig { - public override async Task Override(Func> listener) => await listener() with { RequestTimeout = Timeouts.Short }; - public override IpcClient? Override(Func client) - => client().WithRequestTimeout(Timeout.InfiniteTimeSpan); + public override async Task Override(Func> ipcServerFactory) + { + var ipcServer = await ipcServerFactory(); + ipcServer.RequestTimeout = Timeouts.Short; + return ipcServer; + } + + public override IpcClient? Override(Func client) => client().WithRequestTimeout(Timeout.InfiniteTimeSpan); } private sealed class ClientWaitingForTooLongACall_ShouldThrowTimeout_Config : OverrideConfig { - public override async Task Override(Func> listener) => await listener() with { RequestTimeout = Timeout.InfiniteTimeSpan }; - public override IpcClient? Override(Func client) - => client().WithRequestTimeout(Timeouts.IpcRoundtrip); + public override Task Override(Func> ipcServerFactory) => ipcServerFactory().WithRequestTimeout(Timeout.InfiniteTimeSpan)!; + public override IpcClient? Override(Func client) => client().WithRequestTimeout(Timeouts.IpcRoundtrip); } - private ServerTransport ShortClientTimeout(ServerTransport listener) => listener with { RequestTimeout = TimeSpan.FromMilliseconds(100) }; - private ServerTransport InfiniteServerTimeout(ServerTransport listener) => listener with { RequestTimeout = Timeout.InfiniteTimeSpan }; - [Fact] public async Task FireAndForget_ShouldWork() { diff --git a/src/UiPath.Ipc.Tests/SystemTestsOverNamedPipes.cs b/src/UiPath.Ipc.Tests/SystemTestsOverNamedPipes.cs index d0430161..27f83f70 100644 --- a/src/UiPath.Ipc.Tests/SystemTestsOverNamedPipes.cs +++ b/src/UiPath.Ipc.Tests/SystemTestsOverNamedPipes.cs @@ -9,7 +9,7 @@ public sealed class SystemTestsOverNamedPipes : SystemTests public SystemTestsOverNamedPipes(ITestOutputHelper outputHelper) : base(outputHelper) { } - protected sealed override async Task CreateListener() => new NamedPipeServerTransport + protected sealed override async Task CreateServerTransport() => new NamedPipeServerTransport { PipeName = PipeName }; diff --git a/src/UiPath.Ipc.Tests/SystemTestsOverTcp.cs b/src/UiPath.Ipc.Tests/SystemTestsOverTcp.cs index 265a9134..d9878b51 100644 --- a/src/UiPath.Ipc.Tests/SystemTestsOverTcp.cs +++ b/src/UiPath.Ipc.Tests/SystemTestsOverTcp.cs @@ -10,12 +10,12 @@ public sealed class SystemTestsOverTcp : SystemTests public SystemTestsOverTcp(ITestOutputHelper outputHelper) : base(outputHelper) { } - protected sealed override async Task CreateListener() + protected sealed override async Task CreateServerTransport() => new TcpServerTransport { EndPoint = _endPoint, }; protected override ClientTransport CreateClientTransport() - => new TcpTransport() { EndPoint = _endPoint }; + => new TcpClientTransport() { EndPoint = _endPoint }; } diff --git a/src/UiPath.Ipc.Tests/SystemTestsOverWebSockets.cs b/src/UiPath.Ipc.Tests/SystemTestsOverWebSockets.cs index b8795dd2..bc17f6f6 100644 --- a/src/UiPath.Ipc.Tests/SystemTestsOverWebSockets.cs +++ b/src/UiPath.Ipc.Tests/SystemTestsOverWebSockets.cs @@ -15,7 +15,7 @@ protected override async Task DisposeAsync() await base.DisposeAsync(); } - protected override async Task CreateListener() + protected override async Task CreateServerTransport() { var listener = new WebSocketServerTransport { @@ -27,5 +27,5 @@ protected override async Task CreateListener() } protected override ClientTransport CreateClientTransport() - => new WebSocketTransport() { Uri = _webSocketContext.ClientUri }; + => new WebSocketClientTransport() { Uri = _webSocketContext.ClientUri }; } diff --git a/src/UiPath.Ipc.Tests/TestBase.cs b/src/UiPath.Ipc.Tests/TestBase.cs index 3086c9b3..5652726f 100644 --- a/src/UiPath.Ipc.Tests/TestBase.cs +++ b/src/UiPath.Ipc.Tests/TestBase.cs @@ -45,8 +45,8 @@ public TestBase(ITestOutputHelper outputHelper) _guiThread.SynchronizationContext.Send(() => Thread.CurrentThread.Name = Names.GuiThreadName); _serviceProvider = IpcHelpers.ConfigureServices(_outputHelper, ConfigureSpecificServices); - _ipcServer = new(CreateServer); - _ipcClient = new(() => CreateClient()); + _ipcServer = new(CreateIpcServer); + _ipcClient = new(() => CreateIpcClient()); OverrideConfig? GetOverrideConfig() { @@ -67,78 +67,84 @@ public TestBase(ITestOutputHelper outputHelper) protected abstract void ConfigureSpecificServices(IServiceCollection services); - private Task CreateListenerAndConfigure() - { - var factory = async () => - { - _outputHelper.WriteLine("Creating listener..."); - var listener = await CreateListener(); - listener = ConfigTransportAgnostic(listener); - return listener; - }; + protected virtual EndpointCollection? Callbacks => []; - if (_overrideConfig is null) + private Task CreateIpcServer() + { + if (_overrideConfig is not null) { - return factory(); + return _overrideConfig.Override(Core); } - return _overrideConfig.Override(factory); - } - - protected async Task CreateServer() - { - if (await CreateListenerAndConfigure() is not { } listener) return null; + return Core()!; - return new() + async Task Core() { - Endpoints = new() { - new EndpointSettings(ContractType) + _outputHelper.WriteLine($"Creating {nameof(ServerTransport)}..."); + + var serverTransport = await CreateServerTransport(); + ConfigTransportBase(serverTransport); + + var endpointSettings = new EndpointSettings(ContractType) + { + BeforeIncommingCall = (callInfo, ct) => { - BeforeCall = (callInfo, ct) => - { - _serverBeforeCalls.Add(callInfo); - return _tailBeforeCall?.Invoke(callInfo, ct) ?? Task.CompletedTask; - } + _serverBeforeCalls.Add(callInfo); + return _tailBeforeCall?.Invoke(callInfo, ct) ?? Task.CompletedTask; } - }, - Transport = [listener], - ServiceProvider = _serviceProvider, - Scheduler = GuiScheduler - }; - } + }; - protected IpcClient? CreateClient(EndpointCollection? callbacks = null) + return new() + { + Endpoints = new() { endpointSettings }, + Transport = serverTransport, + ServiceProvider = _serviceProvider, + Scheduler = GuiScheduler, + RequestTimeout = ServerRequestTimeout + }; + } + } + protected IpcClient? CreateIpcClient(EndpointCollection? callbacks = null) { - var factory = () => + if (_overrideConfig is null) + { + return CreateDefaultClient(); + } + + return _overrideConfig.Override(CreateDefaultClient); + + IpcClient CreateDefaultClient() { - var config = CreateClientConfig(callbacks); - var transport = CreateClientTransport(); var client = new IpcClient { - Config = config, - Transport = transport + Callbacks = callbacks ?? Callbacks, + Transport = CreateClientTransport() }; + ConfigureClient(client); return client; - }; - - if (_overrideConfig is null) - { - return factory(); } - - return _overrideConfig.Override(factory); } + protected TContract? GetProxy() where TContract : class => _ipcClient.Value?.GetProxy(); protected void CreateLazyProxy(out Lazy lazy) where TContract : class => lazy = new(GetProxy); - protected abstract Task CreateListener(); + protected abstract Task CreateServerTransport(); + protected abstract TimeSpan ServerRequestTimeout { get; } - protected abstract ClientConfig CreateClientConfig(EndpointCollection? callbacks = null); + protected virtual void ConfigureClient(IpcClient ipcClient) + { + ipcClient.RequestTimeout = Timeouts.DefaultRequest; + ipcClient.Scheduler = GuiScheduler; + } protected abstract ClientTransport CreateClientTransport(); - protected abstract ServerTransport ConfigTransportAgnostic(ServerTransport listener); + protected virtual void ConfigTransportBase(ServerTransport serverTransport) + { + serverTransport.ConcurrentAccepts = 10; + serverTransport.MaxReceivedMessageSizeInMegabytes = 1; + } protected virtual async Task DisposeAsync() {