diff --git a/Directory.Build.props b/Directory.Build.props index f2fa751a..33b4fe2f 100644 --- a/Directory.Build.props +++ b/Directory.Build.props @@ -4,13 +4,13 @@ $(GitVersion_NuGetVersion) Totalsoft Totalsoft - totalsoft charisma + totalsoft nbb true true snupkg - + \ No newline at end of file diff --git a/NBB.sln b/NBB.sln index 0a935c57..9877fa38 100644 --- a/NBB.sln +++ b/NBB.sln @@ -292,7 +292,17 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = ".github", ".github", "{F577 .github\release-drafter.yml = .github\release-drafter.yml EndProjectSection EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "NBB.Correlation.Serilog.SqlServer", "src\Correlation\NBB.Correlation.Serilog.SqlServer\NBB.Correlation.Serilog.SqlServer.csproj", "{F442DC3C-D2E9-468F-A6E3-D639E6F5D168}" +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "NBB.Correlation.Serilog.SqlServer", "src\Correlation\NBB.Correlation.Serilog.SqlServer\NBB.Correlation.Serilog.SqlServer.csproj", "{F442DC3C-D2E9-468F-A6E3-D639E6F5D168}" +EndProject +Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Orchestration", "Orchestration", "{7D20C931-B59B-4227-A916-3DE892F7A78A}" +EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "NBB.ProcessManager.Definition", "src\Orchestration\NBB.ProcessManager.Definition\NBB.ProcessManager.Definition.csproj", "{501EF886-EF0C-4067-BBF8-1768F2747E67}" +EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "NBB.ProcessManager.Runtime", "src\Orchestration\NBB.ProcessManager.Runtime\NBB.ProcessManager.Runtime.csproj", "{D11FE59A-F3FE-4A7D-85B3-252294B6447C}" +EndProject +Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Orchestration", "Orchestration", "{FE39B11B-26E0-4EDA-AA33-18E1D5EB63D8}" +EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "NBB.ProcessManager.Tests", "test\UnitTests\Orchestration\NBB.ProcessManager.Tests\NBB.ProcessManager.Tests.csproj", "{DB6CACB0-CDC1-47CD-B89E-BE24D9238472}" EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution @@ -670,6 +680,18 @@ Global {F442DC3C-D2E9-468F-A6E3-D639E6F5D168}.Debug|Any CPU.Build.0 = Debug|Any CPU {F442DC3C-D2E9-468F-A6E3-D639E6F5D168}.Release|Any CPU.ActiveCfg = Release|Any CPU {F442DC3C-D2E9-468F-A6E3-D639E6F5D168}.Release|Any CPU.Build.0 = Release|Any CPU + {501EF886-EF0C-4067-BBF8-1768F2747E67}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {501EF886-EF0C-4067-BBF8-1768F2747E67}.Debug|Any CPU.Build.0 = Debug|Any CPU + {501EF886-EF0C-4067-BBF8-1768F2747E67}.Release|Any CPU.ActiveCfg = Release|Any CPU + {501EF886-EF0C-4067-BBF8-1768F2747E67}.Release|Any CPU.Build.0 = Release|Any CPU + {D11FE59A-F3FE-4A7D-85B3-252294B6447C}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {D11FE59A-F3FE-4A7D-85B3-252294B6447C}.Debug|Any CPU.Build.0 = Debug|Any CPU + {D11FE59A-F3FE-4A7D-85B3-252294B6447C}.Release|Any CPU.ActiveCfg = Release|Any CPU + {D11FE59A-F3FE-4A7D-85B3-252294B6447C}.Release|Any CPU.Build.0 = Release|Any CPU + {DB6CACB0-CDC1-47CD-B89E-BE24D9238472}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {DB6CACB0-CDC1-47CD-B89E-BE24D9238472}.Debug|Any CPU.Build.0 = Debug|Any CPU + {DB6CACB0-CDC1-47CD-B89E-BE24D9238472}.Release|Any CPU.ActiveCfg = Release|Any CPU + {DB6CACB0-CDC1-47CD-B89E-BE24D9238472}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -803,6 +825,11 @@ Global {9DDF53B1-7728-4CF3-BEBE-2057A6956F59} = {48E5BC6A-5846-4AD7-BFFB-C1D04C5C1BC2} {F57779EB-9107-427D-A65C-5AA262C348E1} = {EB7B0468-DE25-4350-80AD-50E7C4D09120} {F442DC3C-D2E9-468F-A6E3-D639E6F5D168} = {307E1650-6128-4A36-9C98-297F2BF71C83} + {7D20C931-B59B-4227-A916-3DE892F7A78A} = {7311E32F-C1B0-41C9-B5F1-DE9EBB6ABB55} + {501EF886-EF0C-4067-BBF8-1768F2747E67} = {7D20C931-B59B-4227-A916-3DE892F7A78A} + {D11FE59A-F3FE-4A7D-85B3-252294B6447C} = {7D20C931-B59B-4227-A916-3DE892F7A78A} + {FE39B11B-26E0-4EDA-AA33-18E1D5EB63D8} = {83E383BB-C9D9-4F5F-BAAF-FF1ADFF0B151} + {DB6CACB0-CDC1-47CD-B89E-BE24D9238472} = {FE39B11B-26E0-4EDA-AA33-18E1D5EB63D8} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {23A42379-616A-43EF-99BC-803DF151F54E} diff --git a/dependencies.props b/dependencies.props index c1e6b274..c625e534 100644 --- a/dependencies.props +++ b/dependencies.props @@ -2,16 +2,16 @@ 11.0.2 4.1.0 - 3.0.1 + 3.1.0 0.10.14 4.8.2 2.3.1 2.4.0-beta.1.build3958 5.3.2 - 2.1.1 + 2.2.0 2.1.4 2.1.5 - 4.5.0 + 4.6.0 2.7.1 2.0.2 5.1.2 diff --git a/src/Orchestration/NBB.ProcessManager.Definition/Builder/AbstractDefinition.cs b/src/Orchestration/NBB.ProcessManager.Definition/Builder/AbstractDefinition.cs new file mode 100644 index 00000000..56b68547 --- /dev/null +++ b/src/Orchestration/NBB.ProcessManager.Definition/Builder/AbstractDefinition.cs @@ -0,0 +1,99 @@ +using NBB.Core.Abstractions; +using System; +using System.Collections.Generic; +using System.Linq; + +namespace NBB.ProcessManager.Definition.Builder +{ + public abstract class AbstractDefinition : IDefinition + where TData : struct + { + private readonly List> _eventActivities = new List>(); + private readonly Dictionary> _eventCorrelations = new Dictionary>(); + + + public EventActivitySetBuilder StartWith(EventPredicate predicate = null) + where TEvent : IEvent + { + var ea = new EventActivitySet(true, predicate); + _eventActivities.Add(ea); + return new EventActivitySetBuilder(ea); + } + + public EventActivitySetBuilder When(EventPredicate predicate = null) + where TEvent : IEvent + { + var ea = new EventActivitySet(false, predicate); + _eventActivities.Add(ea); + return new EventActivitySetBuilder(ea); + } + + public void Event(Action> configureEventCorrelation) + { + Preconditions.NotNull(configureEventCorrelation, nameof(configureEventCorrelation)); + + var configurator = new EventCorrelationBuilder(); + configureEventCorrelation(configurator); + var correl = configurator.Build(); + _eventCorrelations.Add(typeof(TEvent), new EventCorrelation(@event => correl.CorrelationFilter((TEvent) @event))); + } + + public Func GetCorrelationFilter() where TEvent : IEvent + { + if (_eventCorrelations.ContainsKey(typeof(TEvent))) + { + var func = _eventCorrelations[typeof(TEvent)]; + return @event => func.CorrelationFilter(@event); + } + + return null; + } + + public IEnumerable GetEventTypes() => _eventActivities.Select(x => x.EventType).Distinct(); + + public IEnumerable, IEnumerable>>> GetEffectHandlers(Type eventType) + { + return _eventActivities + .Where(x => x.EventType == eventType) + .Select(x => (x.WhenPredicate, x.GetEffectHandlers())); + } + + public IEnumerable, IEnumerable>>> GetStateHandlers(Type eventType) + { + return _eventActivities + .Where(x => x.EventType == eventType) + .Select(x => (x.WhenPredicate, x.GetStateHandlers())); + } + + public EventPredicate GetStarterPredicate() where TEvent : IEvent + { + var act = _eventActivities + .SingleOrDefault(x => x.EventType == typeof(TEvent) && x.StartsProcess); + return (@event, data) => + { + if (act == null) + return false; + return act.WhenPredicate?.Invoke(@event, data) ?? true; + }; + } + + public IEnumerable> GetCompletionPredicates() where TEvent : IEvent + { + foreach (var x in _eventActivities) + { + if (x.EventType == typeof(TEvent) && x.CompletesProcess) + yield return (@event, data) => + { + if (x.CompletionPredicate != null && x.WhenPredicate != null) + return x.CompletionPredicate(@event, data) && x.WhenPredicate(@event, data); + if (x.CompletionPredicate == null && x.WhenPredicate != null) + return x.WhenPredicate(@event, data); + if (x.CompletionPredicate != null && x.WhenPredicate == null) + return x.CompletionPredicate(@event, data); + + return true; + }; + } + } + } +} \ No newline at end of file diff --git a/src/Orchestration/NBB.ProcessManager.Definition/Builder/EventActivitySet.cs b/src/Orchestration/NBB.ProcessManager.Definition/Builder/EventActivitySet.cs new file mode 100644 index 00000000..042f8918 --- /dev/null +++ b/src/Orchestration/NBB.ProcessManager.Definition/Builder/EventActivitySet.cs @@ -0,0 +1,85 @@ +using NBB.Core.Abstractions; +using System; +using System.Collections.Generic; + +namespace NBB.ProcessManager.Definition.Builder +{ + + public class EventActivitySet : IEventActivitySet + where TEvent : IEvent + where TData : struct + { + private readonly List> _handlers = new List>(); + private readonly List> _setStateHandlers = new List>(); + private readonly EventPredicate _whenPredicate; + private EventPredicate _completionPredicate; + public bool StartsProcess { get; set; } + public bool CompletesProcess { get; set; } + + public EventActivitySet(bool startsProcess, EventPredicate whenPredicate = null) + { + StartsProcess = startsProcess; + _whenPredicate = whenPredicate; + } + + public Type EventType => typeof(TEvent); + + public IEnumerable> GetEffectHandlers() + { + return _handlers; + } + + public IEnumerable> GetStateHandlers() + { + return _setStateHandlers; + } + + public void AddEffectHandler(EffectHandler handler) + { + _handlers.Add((@event, data) => handler((TEvent) @event, data)); + } + + public void AddSetStateHandler(StateHandler handler) + { + _setStateHandlers.Add((@event, data) => handler((TEvent) @event, data)); + } + + public EventPredicate WhenPredicate + { + get + { + if (_whenPredicate == null) + return null; + return (@event, data) => _whenPredicate((TEvent) @event, data); + } + } + + public EventPredicate CompletionPredicate + { + get + { + if (_completionPredicate == null) + return null; + return (@event, data) => _completionPredicate((TEvent) @event, data); + } + } + + public void UseForCompletion(EventPredicate predicate) + { + _completionPredicate = predicate; + CompletesProcess = true; + } + } + + public interface IEventActivitySet + where TData : struct + { + Type EventType { get; } + bool CompletesProcess { get; } + bool StartsProcess { get; } + EventPredicate WhenPredicate { get; } + EventPredicate CompletionPredicate { get; } + IEnumerable> GetEffectHandlers(); + IEnumerable> GetStateHandlers(); + } +} \ No newline at end of file diff --git a/src/Orchestration/NBB.ProcessManager.Definition/Builder/EventActivitySetBuilder.cs b/src/Orchestration/NBB.ProcessManager.Definition/Builder/EventActivitySetBuilder.cs new file mode 100644 index 00000000..5a4caaeb --- /dev/null +++ b/src/Orchestration/NBB.ProcessManager.Definition/Builder/EventActivitySetBuilder.cs @@ -0,0 +1,81 @@ +using NBB.Core.Abstractions; +using NBB.ProcessManager.Definition.Effects; +using System; + +namespace NBB.ProcessManager.Definition.Builder +{ + public class EventActivitySetBuilder + where TEvent : IEvent + where TData : struct + { + private readonly EventActivitySet _eventActivitySet; + + public EventActivitySetBuilder(EventActivitySet eventActivitySet) + { + _eventActivitySet = eventActivitySet; + } + + public EventActivitySetBuilder Then(EffectHandler handler, EventPredicate predicate = null) + { + _eventActivitySet.AddEffectHandler((whenEvent, data) => + { + if (predicate != null && !predicate(whenEvent, data)) + return NoEffect.Instance; + return handler(whenEvent, data); + }); + return this; + } + + public EventActivitySetBuilder SetState(StateHandler handler, EventPredicate predicate = null) + { + _eventActivitySet.AddSetStateHandler((whenEvent, data) => + { + if (predicate != null && !predicate(whenEvent, data)) + return data.Data; + return handler(whenEvent, data); + }); + return this; + } + + public EventActivitySetBuilder SendCommand(Func, T> handler, EventPredicate predicate = null) + where T : ICommand + { + Then((whenEvent, state) => + { + var command = handler(whenEvent, state); + return new SendCommand(command); + }, predicate); + return this; + } + + public EventActivitySetBuilder RequestTimeout(TimeSpan timeSpan, EventPredicate predicate = null) + where T : IEvent, new() + { + RequestTimeout(timeSpan, new T(), predicate); + return this; + } + + public EventActivitySetBuilder RequestTimeout(TimeSpan timeSpan, T message, EventPredicate predicate = null) + where T : IEvent + { + Then((whenEvent, state) => new RequestTimeout(state.CorrelationId.ToString(), timeSpan, message, typeof(T)), predicate); + return this; + } + + public EventActivitySetBuilder PublishEvent(Func, T> handler, EventPredicate predicate = null) + where T : IEvent + { + Then((whenEvent, state) => + { + var @event = handler(whenEvent, state); + return new PublishEvent(@event); + }, predicate); + return this; + } + + public void Complete(EventPredicate predicate = null) + { + _eventActivitySet.UseForCompletion(predicate); + } + } +} \ No newline at end of file diff --git a/src/Orchestration/NBB.ProcessManager.Definition/Builder/EventCorrelation.cs b/src/Orchestration/NBB.ProcessManager.Definition/Builder/EventCorrelation.cs new file mode 100644 index 00000000..3cd956f1 --- /dev/null +++ b/src/Orchestration/NBB.ProcessManager.Definition/Builder/EventCorrelation.cs @@ -0,0 +1,16 @@ +using System; +using System.Linq.Expressions; +using NBB.Core.Abstractions; + +namespace NBB.ProcessManager.Definition.Builder +{ + public class EventCorrelation + { + public Func CorrelationFilter { get; set; } + + public EventCorrelation(Func correlationFilter) + { + CorrelationFilter = correlationFilter; + } + } +} \ No newline at end of file diff --git a/src/Orchestration/NBB.ProcessManager.Definition/Builder/EventCorrelationBuilder.cs b/src/Orchestration/NBB.ProcessManager.Definition/Builder/EventCorrelationBuilder.cs new file mode 100644 index 00000000..0de2758a --- /dev/null +++ b/src/Orchestration/NBB.ProcessManager.Definition/Builder/EventCorrelationBuilder.cs @@ -0,0 +1,34 @@ +using System; +using System.Linq.Expressions; + +namespace NBB.ProcessManager.Definition.Builder +{ + public class EventCorrelationBuilder + where TData : struct + { + private Func _correlationFilter; + + public EventCorrelationBuilder CorrelateById(Func selector) + { + _correlationFilter = @event => selector(@event); + return this; + } + + public EventCorrelationBuilder CorrelateById(Func selector) where T : struct + { + _correlationFilter = @event => selector(@event); + return this; + } + + public EventCorrelationBuilder CorrelateBy(Func selector) where T : class + { + _correlationFilter = selector; + return this; + } + + public EventCorrelation Build() + { + return new EventCorrelation(_correlationFilter); + } + } +} \ No newline at end of file diff --git a/src/Orchestration/NBB.ProcessManager.Definition/DependencyInjectionExtensions.cs b/src/Orchestration/NBB.ProcessManager.Definition/DependencyInjectionExtensions.cs new file mode 100644 index 00000000..3cb0e66d --- /dev/null +++ b/src/Orchestration/NBB.ProcessManager.Definition/DependencyInjectionExtensions.cs @@ -0,0 +1,50 @@ +using MediatR; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.DependencyInjection.Extensions; +using System; +using System.Collections.Generic; +using System.Linq; + +namespace NBB.ProcessManager.Definition +{ + public static class DependencyInjectionExtensions + { + public static void AddProcessManagerDefinition(this IServiceCollection services) + { + //scan for pm definitions + services.Scan(scan => scan + .FromEntryAssembly() + .AddClasses(classes => classes.AssignableTo(typeof(IDefinition<>))) + .AsImplementedInterfaces() + .WithSingletonLifetime() + ); + } + + public static void AddNotificationHandlers(this IServiceCollection services, Type processManagerNotificationHandlerImplementationType) + { + if (!processManagerNotificationHandlerImplementationType.IsGenericType + || processManagerNotificationHandlerImplementationType.GetGenericTypeDefinition().GetGenericArguments().Length != 3) + throw new Exception("Invalid definition handler"); + + var tempServiceCollection = new ServiceCollection(); + foreach (var serviceDesc in services) + tempServiceCollection.Add(serviceDesc); + + var sp = tempServiceCollection.BuildServiceProvider(); + var defs = sp.GetRequiredService>(); + + foreach (var def in defs) + { + var dataType = def.GetType().BaseType?.GenericTypeArguments.FirstOrDefault(); + if (dataType == null) + throw new Exception("Cannot determine process manager definition data type"); + + foreach (var eventType in def.GetEventTypes()) + { + services.AddScoped(typeof(INotificationHandler<>).MakeGenericType(eventType), + processManagerNotificationHandlerImplementationType.MakeGenericType(def.GetType(), dataType, eventType)); + } + } + } + } +} \ No newline at end of file diff --git a/src/Orchestration/NBB.ProcessManager.Definition/Effects/NoEffect.cs b/src/Orchestration/NBB.ProcessManager.Definition/Effects/NoEffect.cs new file mode 100644 index 00000000..d4b77b60 --- /dev/null +++ b/src/Orchestration/NBB.ProcessManager.Definition/Effects/NoEffect.cs @@ -0,0 +1,9 @@ +namespace NBB.ProcessManager.Definition.Effects +{ + + public class NoEffect : IEffect + { + public static readonly IEffect Instance = new NoEffect(); + + } +} \ No newline at end of file diff --git a/src/Orchestration/NBB.ProcessManager.Definition/Effects/PublishEvent.cs b/src/Orchestration/NBB.ProcessManager.Definition/Effects/PublishEvent.cs new file mode 100644 index 00000000..625d60fd --- /dev/null +++ b/src/Orchestration/NBB.ProcessManager.Definition/Effects/PublishEvent.cs @@ -0,0 +1,12 @@ +namespace NBB.ProcessManager.Definition.Effects +{ + public class PublishEvent : IEffect + { + public object Event { get; } + + public PublishEvent(object @event) + { + Event = @event; + } + } +} \ No newline at end of file diff --git a/src/Orchestration/NBB.ProcessManager.Definition/Effects/RequestTimeout.cs b/src/Orchestration/NBB.ProcessManager.Definition/Effects/RequestTimeout.cs new file mode 100644 index 00000000..715896d0 --- /dev/null +++ b/src/Orchestration/NBB.ProcessManager.Definition/Effects/RequestTimeout.cs @@ -0,0 +1,20 @@ +using System; + +namespace NBB.ProcessManager.Definition.Effects +{ + public class RequestTimeout : IEffect + { + public Type MessageType { get; } + public object Message { get; } + public string InstanceId { get; } + public TimeSpan TimeSpan { get; } + + public RequestTimeout(string instanceId, TimeSpan timeSpan, object message, Type messageType) + { + InstanceId = instanceId; + TimeSpan = timeSpan; + Message = message; + MessageType = messageType; + } + } +} \ No newline at end of file diff --git a/src/Orchestration/NBB.ProcessManager.Definition/Effects/SendCommand.cs b/src/Orchestration/NBB.ProcessManager.Definition/Effects/SendCommand.cs new file mode 100644 index 00000000..968779dc --- /dev/null +++ b/src/Orchestration/NBB.ProcessManager.Definition/Effects/SendCommand.cs @@ -0,0 +1,12 @@ +namespace NBB.ProcessManager.Definition.Effects +{ + public class SendCommand : IEffect + { + public object Command { get; } + + public SendCommand(object command) + { + Command = command; + } + } +} \ No newline at end of file diff --git a/src/Orchestration/NBB.ProcessManager.Definition/IDefinition.cs b/src/Orchestration/NBB.ProcessManager.Definition/IDefinition.cs new file mode 100644 index 00000000..3e51e8cf --- /dev/null +++ b/src/Orchestration/NBB.ProcessManager.Definition/IDefinition.cs @@ -0,0 +1,21 @@ +using System; +using System.Collections.Generic; +using NBB.Core.Abstractions; + +namespace NBB.ProcessManager.Definition +{ + public interface IDefinition : IDefinition + where TData : struct + { + IEnumerable, IEnumerable>>> GetEffectHandlers(Type eventType); + IEnumerable, IEnumerable>>> GetStateHandlers(Type eventType); + Func GetCorrelationFilter() where TEvent : IEvent; + EventPredicate GetStarterPredicate() where TEvent : IEvent; + IEnumerable> GetCompletionPredicates() where TEvent : IEvent; + } + + public interface IDefinition + { + IEnumerable GetEventTypes(); + } +} \ No newline at end of file diff --git a/src/Orchestration/NBB.ProcessManager.Definition/IEffect.cs b/src/Orchestration/NBB.ProcessManager.Definition/IEffect.cs new file mode 100644 index 00000000..7085f95f --- /dev/null +++ b/src/Orchestration/NBB.ProcessManager.Definition/IEffect.cs @@ -0,0 +1,11 @@ +namespace NBB.ProcessManager.Definition +{ + public delegate IEffect EffectHandler(TEvent @event, InstanceData data) where TData : struct; + public delegate TData StateHandler(TEvent @event, InstanceData data) where TData : struct; + public delegate bool EventPredicate(TEvent @event, InstanceData data) where TData : struct; + public delegate T EventPredicate(TEvent @event, InstanceData data) where TData : struct; + + public interface IEffect + { + } +} \ No newline at end of file diff --git a/src/Orchestration/NBB.ProcessManager.Definition/InstanceData.cs b/src/Orchestration/NBB.ProcessManager.Definition/InstanceData.cs new file mode 100644 index 00000000..ddae7939 --- /dev/null +++ b/src/Orchestration/NBB.ProcessManager.Definition/InstanceData.cs @@ -0,0 +1,12 @@ +using System; + +namespace NBB.ProcessManager.Definition +{ + public class InstanceData + where TData : struct + { + //TO REMOVE CorrelationId setter + public object CorrelationId { get; set; } + public TData Data { get; set; } + } +} \ No newline at end of file diff --git a/src/Orchestration/NBB.ProcessManager.Definition/NBB.ProcessManager.Definition.csproj b/src/Orchestration/NBB.ProcessManager.Definition/NBB.ProcessManager.Definition.csproj new file mode 100644 index 00000000..0948a638 --- /dev/null +++ b/src/Orchestration/NBB.ProcessManager.Definition/NBB.ProcessManager.Definition.csproj @@ -0,0 +1,23 @@ + + + + netstandard2.0 + Process manager definition builder. + latest + + + + + + + + + + + + + + + + + diff --git a/src/Orchestration/NBB.ProcessManager.Definition/Preconditions.cs b/src/Orchestration/NBB.ProcessManager.Definition/Preconditions.cs new file mode 100644 index 00000000..03be685b --- /dev/null +++ b/src/Orchestration/NBB.ProcessManager.Definition/Preconditions.cs @@ -0,0 +1,56 @@ +using System; +using System.Diagnostics; +using JetBrains.Annotations; + +namespace NBB.ProcessManager.Definition +{ + [DebuggerStepThrough] + public static class Preconditions + { + [ContractAnnotation("value:null => halt")] + public static T NotNull([NoEnumeration] T value, [InvokerParameterName, NotNull] string parameterName) + where T : class + { + if (ReferenceEquals(value, null)) + { + NotEmpty(parameterName, nameof(parameterName)); + + throw new ArgumentNullException(parameterName); + } + + return value; + } + + [ContractAnnotation("value:null => halt")] + public static string NotEmpty(string value, [InvokerParameterName, NotNull] string parameterName) + { + if (ReferenceEquals(value, null)) + { + NotEmpty(parameterName, nameof(parameterName)); + + throw new ArgumentNullException(parameterName); + } + + if (value.Length == 0) + { + NotEmpty(parameterName, nameof(parameterName)); + + throw new ArgumentException("String value cannot be null.", parameterName); + } + + return value; + } + + public static TEnum IsDefined(TEnum value, [InvokerParameterName, NotNull] string parameterName) where TEnum : struct + { + if (!Enum.IsDefined(typeof(TEnum), value)) + { + NotEmpty(parameterName, nameof(parameterName)); + + throw new ArgumentOutOfRangeException(parameterName); + } + + return value; + } + } +} diff --git a/src/Orchestration/NBB.ProcessManager.Definition/ReflectionExtensions.cs b/src/Orchestration/NBB.ProcessManager.Definition/ReflectionExtensions.cs new file mode 100644 index 00000000..d09ce428 --- /dev/null +++ b/src/Orchestration/NBB.ProcessManager.Definition/ReflectionExtensions.cs @@ -0,0 +1,239 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Reflection; +using System.Runtime.CompilerServices; + +namespace NBB.ProcessManager.Definition +{ + public static class ReflectionExtensions + { + public static bool IsNonAbstractClass(this Type type, bool publicOnly) + { + var typeInfo = type.GetTypeInfo(); + + if (typeInfo.IsClass && !typeInfo.IsAbstract) + { + if (typeInfo.IsDefined(typeof(CompilerGeneratedAttribute), inherit: true)) + { + return false; + } + + if (publicOnly) + { + return typeInfo.IsPublic || typeInfo.IsNestedPublic; + } + + return true; + } + + return false; + } + + public static IEnumerable GetBaseTypes(this Type type) + { + var typeInfo = type.GetTypeInfo(); + + foreach (var implementedInterface in typeInfo.ImplementedInterfaces) + { + yield return implementedInterface; + } + + var baseType = typeInfo.BaseType; + + while (baseType != null) + { + var baseTypeInfo = baseType.GetTypeInfo(); + + yield return baseType; + + baseType = baseTypeInfo.BaseType; + } + } + + public static bool IsInNamespace(this Type type, string @namespace) + { + var typeNamespace = type.Namespace ?? string.Empty; + + if (@namespace.Length > typeNamespace.Length) + { + return false; + } + + var typeSubNamespace = typeNamespace.Substring(0, @namespace.Length); + + if (typeSubNamespace.Equals(@namespace, StringComparison.Ordinal)) + { + if (typeNamespace.Length == @namespace.Length) + { + //exactly the same + return true; + } + + //is a subnamespace? + return typeNamespace[@namespace.Length] == '.'; + } + + return false; + } + + public static bool IsInExactNamespace(this Type type, string @namespace) + { + return string.Equals(type.Namespace, @namespace, StringComparison.Ordinal); + } + + public static bool HasAttribute(this Type type, Type attributeType) + { + return type.GetTypeInfo().IsDefined(attributeType, inherit: true); + } + + public static bool HasAttribute(this Type type, Func predicate) where T : Attribute + { + return type.GetTypeInfo().GetCustomAttributes(inherit: true).Any(predicate); + } + + public static bool IsAssignableTo(this Type type, Type otherType) + { + var typeInfo = type.GetTypeInfo(); + var otherTypeInfo = otherType.GetTypeInfo(); + + if (otherTypeInfo.IsGenericTypeDefinition) + { + return typeInfo.IsAssignableToGenericTypeDefinition(otherTypeInfo); + } + + return otherTypeInfo.IsAssignableFrom(typeInfo); + } + + private static bool IsAssignableToGenericTypeDefinition(this TypeInfo typeInfo, TypeInfo genericTypeInfo) + { + var interfaceTypes = typeInfo.ImplementedInterfaces.Select(t => t.GetTypeInfo()); + + foreach (var interfaceType in interfaceTypes) + { + if (interfaceType.IsGenericType) + { + var typeDefinitionTypeInfo = interfaceType + .GetGenericTypeDefinition() + .GetTypeInfo(); + + if (typeDefinitionTypeInfo.Equals(genericTypeInfo)) + { + return true; + } + } + } + + if (typeInfo.IsGenericType) + { + var typeDefinitionTypeInfo = typeInfo + .GetGenericTypeDefinition() + .GetTypeInfo(); + + if (typeDefinitionTypeInfo.Equals(genericTypeInfo)) + { + return true; + } + } + + var baseTypeInfo = typeInfo.BaseType?.GetTypeInfo(); + + if (baseTypeInfo is null) + { + return false; + } + + return baseTypeInfo.IsAssignableToGenericTypeDefinition(genericTypeInfo); + } + + private static IEnumerable GetImplementedInterfacesToMap(TypeInfo typeInfo) + { + if (!typeInfo.IsGenericType) + { + return typeInfo.ImplementedInterfaces; + } + + if (!typeInfo.IsGenericTypeDefinition) + { + return typeInfo.ImplementedInterfaces; + } + + return FilterMatchingGenericInterfaces(typeInfo); + } + + private static IEnumerable FilterMatchingGenericInterfaces(TypeInfo typeInfo) + { + var genericTypeParameters = typeInfo.GenericTypeParameters; + + foreach (var current in typeInfo.ImplementedInterfaces) + { + var currentTypeInfo = current.GetTypeInfo(); + + if (currentTypeInfo.IsGenericType && currentTypeInfo.ContainsGenericParameters + && GenericParametersMatch(genericTypeParameters, currentTypeInfo.GenericTypeArguments)) + { + yield return currentTypeInfo.GetGenericTypeDefinition(); + } + } + } + + private static bool GenericParametersMatch(IReadOnlyList parameters, IReadOnlyList interfaceArguments) + { + if (parameters.Count != interfaceArguments.Count) + { + return false; + } + + for (var i = 0; i < parameters.Count; i++) + { + if (parameters[i] != interfaceArguments[i]) + { + return false; + } + } + + return true; + } + + public static bool IsOpenGeneric(this Type type) + { + return type.GetTypeInfo().IsGenericTypeDefinition; + } + + public static bool HasMatchingGenericArity(this Type interfaceType, TypeInfo typeInfo) + { + if (typeInfo.IsGenericType) + { + var interfaceTypeInfo = interfaceType.GetTypeInfo(); + + if (interfaceTypeInfo.IsGenericType) + { + var argumentCount = interfaceType.GenericTypeArguments.Length; + var parameterCount = typeInfo.GenericTypeParameters.Length; + + return argumentCount == parameterCount; + } + + return false; + } + + return true; + } + + public static Type GetRegistrationType(this Type interfaceType, TypeInfo typeInfo) + { + if (typeInfo.IsGenericTypeDefinition) + { + var interfaceTypeInfo = interfaceType.GetTypeInfo(); + + if (interfaceTypeInfo.IsGenericType) + { + return interfaceType.GetGenericTypeDefinition(); + } + } + + return interfaceType; + } + + } +} \ No newline at end of file diff --git a/src/Orchestration/NBB.ProcessManager.Runtime/DependencyInjectionExtensions.cs b/src/Orchestration/NBB.ProcessManager.Runtime/DependencyInjectionExtensions.cs new file mode 100644 index 00000000..a6e508ec --- /dev/null +++ b/src/Orchestration/NBB.ProcessManager.Runtime/DependencyInjectionExtensions.cs @@ -0,0 +1,33 @@ +using Microsoft.Extensions.DependencyInjection; +using NBB.ProcessManager.Definition.Effects; +using NBB.ProcessManager.Runtime.EffectRunners; +using NBB.ProcessManager.Runtime.Persistence; +using NBB.ProcessManager.Runtime.Timeouts; +using System; + +namespace NBB.ProcessManager.Runtime +{ + public static class DependencyInjectionExtensions + { + public static void AddProcessManagerRuntime(this IServiceCollection services) + { + services.AddSingleton(); + services.AddSingleton(); + services.AddSingleton(Functions.EffectRunnerFactory()); + services.AddSingleton(typeof(IEffectRunnerMarker), EffectRunners.EffectRunners.PublishEventEffectRunner()); + services.AddSingleton(typeof(IEffectRunnerMarker), EffectRunners.EffectRunners.SendCommandEffectRunner()); + services.AddSingleton(typeof(IEffectRunnerMarker), EffectRunners.EffectRunners.NoOpEffect()); + services.AddSingleton(typeof(IEffectRunnerMarker), EffectRunners.EffectRunners.RequestTimeoutEffectRunner()); + + services.AddTimeoutManager(); + } + + public static void AddTimeoutManager(this IServiceCollection services) + { + services.AddHostedService(); + services.AddSingleton(); + services.AddSingleton(); + services.AddSingleton>(provider => () => DateTime.UtcNow); + } + } +} \ No newline at end of file diff --git a/src/Orchestration/NBB.ProcessManager.Runtime/EffectRunners/EffectRunners.cs b/src/Orchestration/NBB.ProcessManager.Runtime/EffectRunners/EffectRunners.cs new file mode 100644 index 00000000..2e63c8e0 --- /dev/null +++ b/src/Orchestration/NBB.ProcessManager.Runtime/EffectRunners/EffectRunners.cs @@ -0,0 +1,65 @@ +using Microsoft.Extensions.DependencyInjection; +using NBB.Messaging.Abstractions; +using NBB.ProcessManager.Definition.Effects; +using NBB.ProcessManager.Runtime.Timeouts; +using System; +using System.Threading.Tasks; + +namespace NBB.ProcessManager.Runtime.EffectRunners +{ + public class EffectRunners + { + public static Func PublishEventEffectRunner() + { + return serviceProvider => + { + var messageBusPublisher = serviceProvider.GetRequiredService(); + return effect => + { + if (effect is PublishEvent publishEvent) + return messageBusPublisher.PublishAsync(publishEvent.Event); + return Task.CompletedTask; + }; + }; + } + + public static Func SendCommandEffectRunner() + { + return serviceProvider => + { + var messageBusPublisher = serviceProvider.GetRequiredService(); + return effect => + { + if (effect is SendCommand sendCommand) + return messageBusPublisher.PublishAsync(sendCommand.Command); + return Task.CompletedTask; + }; + }; + } + + public static Func RequestTimeoutEffectRunner() + { + return serviceProvider => + { + var timeoutsManager = serviceProvider.GetRequiredService(); + var timeoutsRepository = serviceProvider.GetRequiredService(); + var currentTimeProvider = serviceProvider.GetRequiredService>(); + + return async effect => + { + if (effect is RequestTimeout requestTimeout) + { + var dueDate = currentTimeProvider().Add(requestTimeout.TimeSpan); + await timeoutsRepository.Add(new TimeoutRecord(requestTimeout.InstanceId, dueDate, requestTimeout.Message, requestTimeout.MessageType)); + timeoutsManager.NewTimeoutRegistered(dueDate); + } + }; + }; + } + + public static Func NoOpEffect() + { + return serviceProvider => { return effect => Task.CompletedTask; }; + } + } +} \ No newline at end of file diff --git a/src/Orchestration/NBB.ProcessManager.Runtime/EffectRunners/IEffectRunnerFactory.cs b/src/Orchestration/NBB.ProcessManager.Runtime/EffectRunners/IEffectRunnerFactory.cs new file mode 100644 index 00000000..abce1dca --- /dev/null +++ b/src/Orchestration/NBB.ProcessManager.Runtime/EffectRunners/IEffectRunnerFactory.cs @@ -0,0 +1,31 @@ +using System; +using System.Threading.Tasks; +using Microsoft.Extensions.DependencyInjection; +using NBB.ProcessManager.Definition; + +namespace NBB.ProcessManager.Runtime.EffectRunners +{ + + public interface IEffectRunnerMarker + { + } + + + + public static class Functions + { + public static Func EffectRunnerFactory() + { + return serviceProvider => + { + return effectType => (EffectRunner) serviceProvider.GetRequiredService(typeof(IEffectRunnerMarker<>).MakeGenericType(effectType)); + + }; + } + } + + public delegate Task EffectRunner(IEffect effect); + + public delegate EffectRunner EffectRunnerFactory(Type effectType); + +} \ No newline at end of file diff --git a/src/Orchestration/NBB.ProcessManager.Runtime/Events/EventReceived.cs b/src/Orchestration/NBB.ProcessManager.Runtime/Events/EventReceived.cs new file mode 100644 index 00000000..e3796d04 --- /dev/null +++ b/src/Orchestration/NBB.ProcessManager.Runtime/Events/EventReceived.cs @@ -0,0 +1,17 @@ +using NBB.Core.Abstractions; +using System; + +namespace NBB.ProcessManager.Runtime.Events +{ + public class EventReceived : IEvent + { + public TEvent ReceivedEvent { get; } + public Guid EventId { get; } + + public EventReceived(TEvent receivedEvent, Guid? eventId = null) + { + ReceivedEvent = receivedEvent; + EventId = eventId ?? Guid.NewGuid(); + } + } +} \ No newline at end of file diff --git a/src/Orchestration/NBB.ProcessManager.Runtime/Events/ProcessAborted.cs b/src/Orchestration/NBB.ProcessManager.Runtime/Events/ProcessAborted.cs new file mode 100644 index 00000000..dc265e8b --- /dev/null +++ b/src/Orchestration/NBB.ProcessManager.Runtime/Events/ProcessAborted.cs @@ -0,0 +1,15 @@ +using NBB.Core.Abstractions; +using System; + +namespace NBB.ProcessManager.Runtime.Events +{ + public class ProcessAborted : IEvent + { + public Guid EventId { get; } + + public ProcessAborted(Guid? eventId = null) + { + EventId = eventId ?? Guid.NewGuid(); + } + } +} \ No newline at end of file diff --git a/src/Orchestration/NBB.ProcessManager.Runtime/Events/ProcessCompleted.cs b/src/Orchestration/NBB.ProcessManager.Runtime/Events/ProcessCompleted.cs new file mode 100644 index 00000000..bc79383d --- /dev/null +++ b/src/Orchestration/NBB.ProcessManager.Runtime/Events/ProcessCompleted.cs @@ -0,0 +1,17 @@ +using NBB.Core.Abstractions; +using System; + +namespace NBB.ProcessManager.Runtime.Events +{ + public class ProcessCompleted : IEvent + { + public TEvent ReceivedEvent { get; } + public Guid EventId { get; } + + public ProcessCompleted(TEvent receivedEvent, Guid? eventId = null) + { + ReceivedEvent = receivedEvent; + EventId = eventId ?? Guid.NewGuid(); + } + } +} \ No newline at end of file diff --git a/src/Orchestration/NBB.ProcessManager.Runtime/Events/ProcessStarted.cs b/src/Orchestration/NBB.ProcessManager.Runtime/Events/ProcessStarted.cs new file mode 100644 index 00000000..452fde7e --- /dev/null +++ b/src/Orchestration/NBB.ProcessManager.Runtime/Events/ProcessStarted.cs @@ -0,0 +1,17 @@ +using NBB.Core.Abstractions; +using System; + +namespace NBB.ProcessManager.Runtime.Events +{ + public class ProcessStarted: IEvent + { + public object CorrelationId { get; } + public Guid EventId { get; } + + public ProcessStarted(object correlationId, Guid? eventId = null) + { + CorrelationId = correlationId; + EventId = eventId ?? Guid.NewGuid(); + } + } +} \ No newline at end of file diff --git a/src/Orchestration/NBB.ProcessManager.Runtime/Events/ProcessTimeout.cs b/src/Orchestration/NBB.ProcessManager.Runtime/Events/ProcessTimeout.cs new file mode 100644 index 00000000..d7bd14e3 --- /dev/null +++ b/src/Orchestration/NBB.ProcessManager.Runtime/Events/ProcessTimeout.cs @@ -0,0 +1,15 @@ +using NBB.Core.Abstractions; +using System; + +namespace NBB.ProcessManager.Runtime.Events +{ + public class ProcessTimeout : IEvent + { + public Guid EventId { get; } + + public ProcessTimeout(Guid? eventId = null) + { + EventId = eventId ?? Guid.NewGuid(); + } + } +} \ No newline at end of file diff --git a/src/Orchestration/NBB.ProcessManager.Runtime/Instance.cs b/src/Orchestration/NBB.ProcessManager.Runtime/Instance.cs new file mode 100644 index 00000000..05e98146 --- /dev/null +++ b/src/Orchestration/NBB.ProcessManager.Runtime/Instance.cs @@ -0,0 +1,149 @@ +using NBB.Core.Abstractions; +using NBB.ProcessManager.Definition; +using NBB.ProcessManager.Runtime.Events; +using System; +using System.Collections.Generic; +using System.Linq; + +namespace NBB.ProcessManager.Runtime +{ + public class Instance + where TData : struct + { + private readonly IDefinition _definition; + public InstanceData InstanceData { get; } + public InstanceStates State { get; private set; } + + private readonly List _changes = new List(); + private readonly List _effects = new List(); + public int Version { get; internal set; } + + public Instance(IDefinition definition) + { + _definition = definition; + InstanceData = new InstanceData(); + } + + private void StartProcess(TEvent @event) + where TEvent : IEvent + { + var eventType = typeof(TEvent); + var starter = _definition.GetStarterPredicate()(@event, InstanceData); + if (!starter) + throw new Exception($"Definition {_definition.GetType()} does not accept eventType {eventType} as a starter event."); + + var idSelector = _definition.GetCorrelationFilter(); + if (idSelector == null) + throw new ArgumentNullException($"No correlation defined for eventType {eventType} in definition {_definition.GetType()}"); + + if (State != InstanceStates.NotStarted) + throw new Exception($"Cannot start an instance which is in state {State}"); + + var correlationId = idSelector(@event); + Emit(new ProcessStarted(correlationId)); + } + + public void ProcessEvent(TEvent @event) + where TEvent : IEvent + { + var eventType = typeof(TEvent); + var starter = _definition.GetStarterPredicate()(@event, InstanceData); + + if (State == InstanceStates.NotStarted && starter) + StartProcess(@event); + + if (State == InstanceStates.Completed || State == InstanceStates.Aborted) + throw new Exception($"Cannot accept a new event. Instance is {State}"); + + if (State != InstanceStates.Started) + return; + + var effectHandlers = _definition.GetEffectHandlers(eventType); + foreach (var (pred, handlers) in effectHandlers) + { + if (pred != null && !pred(@event, InstanceData)) + continue; + + foreach (var handler in handlers) + { + var effect = handler(@event, InstanceData); + _effects.Add(effect); + } + } + + Emit(new EventReceived(@event)); + + if (_definition.GetCompletionPredicates().Any(x => x(@event, InstanceData))) + Emit(new ProcessCompleted(@event)); + } + + private void Apply(EventReceived @event) + { + var stateHandlers = _definition.GetStateHandlers(@event.ReceivedEvent.GetType()); + foreach (var (pred, handlers) in stateHandlers) + { + if (pred != null && !pred((IEvent) @event.ReceivedEvent, InstanceData)) + continue; + + foreach (var handler in handlers) + InstanceData.Data = handler((IEvent) @event.ReceivedEvent, InstanceData); + } + } + + private void Apply(ProcessStarted @event) + { + State = InstanceStates.Started; + InstanceData.CorrelationId = @event.CorrelationId; + } + + private void Apply(ProcessCompleted @event) + { + State = InstanceStates.Completed; + } + + private void ApplyChanges(dynamic @event, bool isNew) + { + Apply(@event); + + if (isNew) + { + _changes.Add(@event); + } + else + Version++; + } + + private void Emit(IEvent @event) + { + ApplyChanges(@event, true); + } + + public string GetStreamFor(object identity) + { + return _definition.GetType().FullName + ":" + InstanceData.Data.GetType().FullName + ":" + identity; + } + + public string GetStream() + { + return GetStreamFor(InstanceData.CorrelationId); + } + + public void LoadFromHistory(IEnumerable events) + { + foreach (var @event in events) + { + ApplyChanges(@event, false); + } + } + + public IEnumerable GetUncommittedChanges() => _changes; + public IEnumerable GetUncommittedEffects() => _effects; + + public void MarkChangesAsCommitted() + { + Version += _changes.Count; + _changes.Clear(); + _effects.Clear(); + } + } +} \ No newline at end of file diff --git a/src/Orchestration/NBB.ProcessManager.Runtime/InstanceStates.cs b/src/Orchestration/NBB.ProcessManager.Runtime/InstanceStates.cs new file mode 100644 index 00000000..4b1a4e94 --- /dev/null +++ b/src/Orchestration/NBB.ProcessManager.Runtime/InstanceStates.cs @@ -0,0 +1,14 @@ +using System; +using System.Collections.Generic; +using System.Text; + +namespace NBB.ProcessManager.Runtime +{ + public enum InstanceStates + { + NotStarted = 0, + Started = 1, + Aborted = 2, + Completed = 3 + } +} diff --git a/src/Orchestration/NBB.ProcessManager.Runtime/NBB.ProcessManager.Runtime.csproj b/src/Orchestration/NBB.ProcessManager.Runtime/NBB.ProcessManager.Runtime.csproj new file mode 100644 index 00000000..cd4d8ea5 --- /dev/null +++ b/src/Orchestration/NBB.ProcessManager.Runtime/NBB.ProcessManager.Runtime.csproj @@ -0,0 +1,16 @@ + + + + netstandard2.0 + + + + + + + + + + + + diff --git a/src/Orchestration/NBB.ProcessManager.Runtime/Persistence/IInstanceDataRepository.cs b/src/Orchestration/NBB.ProcessManager.Runtime/Persistence/IInstanceDataRepository.cs new file mode 100644 index 00000000..178b6266 --- /dev/null +++ b/src/Orchestration/NBB.ProcessManager.Runtime/Persistence/IInstanceDataRepository.cs @@ -0,0 +1,15 @@ +using System.Threading; +using System.Threading.Tasks; +using NBB.ProcessManager.Definition; + +namespace NBB.ProcessManager.Runtime.Persistence +{ + public interface IInstanceDataRepository + { + Task Save(Instance instance, CancellationToken cancellationToken) + where TData : struct; + + Task> Get(IDefinition definition, object identity, CancellationToken cancellationToken) + where TData : struct; + } +} \ No newline at end of file diff --git a/src/Orchestration/NBB.ProcessManager.Runtime/Persistence/InstanceDataRepository.cs b/src/Orchestration/NBB.ProcessManager.Runtime/Persistence/InstanceDataRepository.cs new file mode 100644 index 00000000..7b06ece4 --- /dev/null +++ b/src/Orchestration/NBB.ProcessManager.Runtime/Persistence/InstanceDataRepository.cs @@ -0,0 +1,48 @@ +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using NBB.Core.Abstractions; +using NBB.EventStore.Abstractions; +using NBB.ProcessManager.Definition; +using NBB.ProcessManager.Runtime.EffectRunners; + +namespace NBB.ProcessManager.Runtime.Persistence +{ + public class InstanceDataRepository : IInstanceDataRepository + { + private readonly IEventStore _eventStore; + private readonly EffectRunnerFactory _effectRunnerFactory; + + public InstanceDataRepository(IEventStore eventStore, EffectRunnerFactory effectRunnerFactory) + { + _eventStore = eventStore; + _effectRunnerFactory = effectRunnerFactory; + } + + public async Task Save(Instance instance, CancellationToken cancellationToken) + where TData : struct + { + var events = instance.GetUncommittedChanges().Cast().ToList(); + var effects = instance.GetUncommittedEffects().ToList(); + var streamId = instance.GetStream(); + var aggregateLoadedAtVersion = instance.Version; + instance.MarkChangesAsCommitted(); + + await _eventStore.AppendEventsToStreamAsync(streamId, events, aggregateLoadedAtVersion, cancellationToken); + foreach (var effect in effects) + await _effectRunnerFactory(effect.GetType())(effect); + } + + public async Task> Get(IDefinition definition, object identity, CancellationToken cancellationToken) + where TData : struct + { + var instance = new Instance(definition); + var streamId = instance.GetStreamFor(identity); + var events = await _eventStore.GetEventsFromStreamAsync(streamId, instance.Version + 1, cancellationToken); + if (events.Any()) + instance.LoadFromHistory(events); + + return instance; + } + } +} \ No newline at end of file diff --git a/src/Orchestration/NBB.ProcessManager.Runtime/ProcessExecutionCoordinator.cs b/src/Orchestration/NBB.ProcessManager.Runtime/ProcessExecutionCoordinator.cs new file mode 100644 index 00000000..6fd13d7f --- /dev/null +++ b/src/Orchestration/NBB.ProcessManager.Runtime/ProcessExecutionCoordinator.cs @@ -0,0 +1,41 @@ +using NBB.Core.Abstractions; +using NBB.ProcessManager.Definition; +using NBB.ProcessManager.Runtime.Persistence; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; + +namespace NBB.ProcessManager.Runtime +{ + public class ProcessExecutionCoordinator + { + private readonly IInstanceDataRepository _dataRepository; + private readonly IEnumerable _definitions; + + public ProcessExecutionCoordinator(IInstanceDataRepository dataRepository, IEnumerable definitions) + { + _dataRepository = dataRepository; + _definitions = definitions; + } + + public async Task Invoke(TEvent @event, CancellationToken cancellationToken) + where TDefinition : IDefinition + where TData : struct + where TEvent : IEvent + { + var definition = _definitions.OfType().SingleOrDefault(); + if (definition == null) + throw new Exception($"No definition found for type {typeof(TDefinition)}"); + + var identitySelector = definition.GetCorrelationFilter(); + if (identitySelector == null) + throw new ArgumentNullException($"No correlation defined for eventType {typeof(TEvent)} in definition {typeof(TDefinition)}"); + + var instance = await _dataRepository.Get(definition, identitySelector(@event), cancellationToken); + instance.ProcessEvent(@event); + await _dataRepository.Save(instance, cancellationToken); + } + } +} \ No newline at end of file diff --git a/src/Orchestration/NBB.ProcessManager.Runtime/Timeouts/ITimeoutsRepository.cs b/src/Orchestration/NBB.ProcessManager.Runtime/Timeouts/ITimeoutsRepository.cs new file mode 100644 index 00000000..ec81bcb9 --- /dev/null +++ b/src/Orchestration/NBB.ProcessManager.Runtime/Timeouts/ITimeoutsRepository.cs @@ -0,0 +1,41 @@ +using System; +using System.Threading.Tasks; + +namespace NBB.ProcessManager.Runtime.Timeouts +{ + public interface ITimeoutsRepository + { + /// + /// Adds a new timeout. + /// + /// Timeout data. + Task Add(TimeoutRecord timeout); + + /// + /// Removes the timeout if it hasn't been previously removed. + /// + /// The timeout id to remove. + /// true when the timeout has successfully been removed by this method call, false otherwise. + Task TryRemove(Guid timeoutId); + + /// + /// Returns the timeout with the given id from the storage. The timeout will remain in the storage. + /// + /// The id of the timeout to fetch. + /// with the given id if present in the storage or null otherwise. + Task Peek(Guid timeoutId); + + /// + /// Removes the timeouts by instance id. + /// + /// The instance id of the timeouts to remove. + Task RemoveTimeoutBy(string instanceId); + + /// + /// Retrieves the next range of timeouts that are due. + /// + /// The time where to start retrieving the next slice, the slice should exclude this date. + /// Returns the next range of timeouts that are due. + Task GetNextBatch(DateTime startSlice); + } +} \ No newline at end of file diff --git a/src/Orchestration/NBB.ProcessManager.Runtime/Timeouts/InMemoryTimeoutRepository.cs b/src/Orchestration/NBB.ProcessManager.Runtime/Timeouts/InMemoryTimeoutRepository.cs new file mode 100644 index 00000000..50124ffc --- /dev/null +++ b/src/Orchestration/NBB.ProcessManager.Runtime/Timeouts/InMemoryTimeoutRepository.cs @@ -0,0 +1,136 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; + +namespace NBB.ProcessManager.Runtime.Timeouts +{ + public class InMemoryTimeoutRepository : ITimeoutsRepository, IDisposable + { + private readonly Func _currentTimeProvider; + private readonly ReaderWriterLockSlim _readerWriterLock = new ReaderWriterLockSlim(); + private readonly List _storage = new List(); + public static TimeSpan EmptyResultsNextTimeToRunQuerySpan = TimeSpan.FromMinutes(1); + + public InMemoryTimeoutRepository(Func currentTimeProvider) + { + _currentTimeProvider = currentTimeProvider; + } + + public void Dispose() + { + } + + public Task Add(TimeoutRecord timeout) + { + try + { + _readerWriterLock.EnterWriteLock(); + _storage.Add(timeout); + } + finally + { + _readerWriterLock.ExitWriteLock(); + } + + return Task.CompletedTask; + } + + public Task Peek(Guid timeoutId) + { + try + { + _readerWriterLock.EnterReadLock(); + return Task.FromResult(_storage.SingleOrDefault(t => t.Id == timeoutId)); + } + finally + { + _readerWriterLock.ExitReadLock(); + } + } + + public Task TryRemove(Guid timeoutId) + { + try + { + _readerWriterLock.EnterWriteLock(); + + for (var index = 0; index < _storage.Count; index++) + { + var data = _storage[index]; + if (data.Id == timeoutId) + { + _storage.RemoveAt(index); + return Task.FromResult(true); + } + } + + return Task.FromResult(false); + } + finally + { + _readerWriterLock.ExitWriteLock(); + } + } + + public Task RemoveTimeoutBy(string instanceId) + { + try + { + _readerWriterLock.EnterWriteLock(); + for (var index = 0; index < _storage.Count;) + { + var timeoutData = _storage[index]; + if (timeoutData.ProcessManagerInstanceId == instanceId) + { + _storage.RemoveAt(index); + continue; + } + index++; + } + } + finally + { + _readerWriterLock.ExitWriteLock(); + } + + return Task.CompletedTask; + } + + public Task GetNextBatch(DateTime startSlice) + { + var now = _currentTimeProvider(); + var nextTimeToRunQuery = DateTime.MaxValue; + var dueTimeouts = new List(); + + try + { + _readerWriterLock.EnterReadLock(); + + foreach (var data in _storage) + { + if (data.DueDate > now && data.DueDate < nextTimeToRunQuery) + { + nextTimeToRunQuery = data.DueDate; + } + if (data.DueDate > startSlice && data.DueDate <= now) + { + dueTimeouts.Add(data); + } + } + } + finally + { + _readerWriterLock.ExitReadLock(); + } + + if (nextTimeToRunQuery == DateTime.MaxValue) + { + nextTimeToRunQuery = now.Add(EmptyResultsNextTimeToRunQuerySpan); + } + + return Task.FromResult(new TimeoutBatch(dueTimeouts.ToArray(), nextTimeToRunQuery)); + } + } +} diff --git a/src/Orchestration/NBB.ProcessManager.Runtime/Timeouts/TimeoutBatch.cs b/src/Orchestration/NBB.ProcessManager.Runtime/Timeouts/TimeoutBatch.cs new file mode 100644 index 00000000..1a604e90 --- /dev/null +++ b/src/Orchestration/NBB.ProcessManager.Runtime/Timeouts/TimeoutBatch.cs @@ -0,0 +1,34 @@ +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading; + +namespace NBB.ProcessManager.Runtime.Timeouts +{ + public class TimeoutBatch + { + /// + /// Creates a new instance of the timeouts batch. + /// + /// timeouts that are due. + /// the next time to query for due timeouts again. + public TimeoutBatch(TimeoutRecord[] dueTimeouts, DateTime nextTimeToQuery) + { + DueTimeouts = dueTimeouts; + NextTimeToQuery = nextTimeToQuery; + } + + /// + /// timeouts that are due. + /// + public TimeoutRecord[] DueTimeouts { get; } + + /// + /// the next time to query for due timeouts again. + /// + public DateTime NextTimeToQuery { get; } + + + + } +} diff --git a/src/Orchestration/NBB.ProcessManager.Runtime/Timeouts/TimeoutOccured.cs b/src/Orchestration/NBB.ProcessManager.Runtime/Timeouts/TimeoutOccured.cs new file mode 100644 index 00000000..25e7a310 --- /dev/null +++ b/src/Orchestration/NBB.ProcessManager.Runtime/Timeouts/TimeoutOccured.cs @@ -0,0 +1,20 @@ +using NBB.Core.Abstractions; +using System; +using MediatR; + +namespace NBB.ProcessManager.Runtime.Timeouts +{ + public class TimeoutOccured : IEvent, INotification + { + public Guid EventId { get; } + public string ProcessManagerInstanceId { get; } + public object Message { get; } + + public TimeoutOccured(string processManagerInstanceId, object message, Guid? eventId = null) + { + ProcessManagerInstanceId = processManagerInstanceId; + Message = message; + EventId = eventId ?? Guid.NewGuid(); + } + } +} \ No newline at end of file diff --git a/src/Orchestration/NBB.ProcessManager.Runtime/Timeouts/TimeoutRecord.cs b/src/Orchestration/NBB.ProcessManager.Runtime/Timeouts/TimeoutRecord.cs new file mode 100644 index 00000000..9c271e63 --- /dev/null +++ b/src/Orchestration/NBB.ProcessManager.Runtime/Timeouts/TimeoutRecord.cs @@ -0,0 +1,41 @@ +using System; +using System.Collections.Generic; +using System.Text; + +namespace NBB.ProcessManager.Runtime.Timeouts +{ + public class TimeoutRecord + { + /// + /// Id of this timeout. + /// + public Guid Id { get; } + + public object Message { get; } + public Type MessageType { get; } + + /// + /// The ProcessManagerInstanceId who requested the timeout. + /// + public string ProcessManagerInstanceId { get; } + + /// + /// The time at which the timeout expires. + /// + public DateTime DueDate { get; } + + public TimeoutRecord(string processManagerInstanceId, DateTime dueDate, object message, Type messageType, Guid? id = null) + { + Id = id ?? Guid.NewGuid(); + ProcessManagerInstanceId = processManagerInstanceId; + DueDate = dueDate; + Message = message; + MessageType = messageType; + } + + public override string ToString() + { + return $"Timeout({Id}) - Expires:{DueDate}, InstanceId:{ProcessManagerInstanceId}"; + } + } +} \ No newline at end of file diff --git a/src/Orchestration/NBB.ProcessManager.Runtime/Timeouts/TimeoutsManager.cs b/src/Orchestration/NBB.ProcessManager.Runtime/Timeouts/TimeoutsManager.cs new file mode 100644 index 00000000..539519fb --- /dev/null +++ b/src/Orchestration/NBB.ProcessManager.Runtime/Timeouts/TimeoutsManager.cs @@ -0,0 +1,109 @@ +using MediatR; +using Microsoft.Extensions.Logging; +using System; +using System.Threading; +using System.Threading.Tasks; + +namespace NBB.ProcessManager.Runtime.Timeouts +{ + public class TimeoutsManager : IDisposable + { + private readonly ITimeoutsRepository _timeoutsRepository; + private readonly ILogger _logger; + private readonly IMediator _mediator; + static readonly TimeSpan MaxNextRetrievalDelay = TimeSpan.FromMinutes(1); + static readonly TimeSpan NextRetrievalPollSleep = TimeSpan.FromMilliseconds(1000); + readonly Func _currentTimeProvider; + readonly object _lockObject = new object(); + public DateTime NextRetrieval { get; private set; } + DateTime _startSlice; + + public TimeoutsManager(ITimeoutsRepository timeoutsRepository, ILogger logger, IMediator mediator, Func currentTimeProvider) + { + _timeoutsRepository = timeoutsRepository; + _logger = logger; + _mediator = mediator; + _currentTimeProvider = currentTimeProvider; + + var now = _currentTimeProvider(); + _startSlice = now.AddYears(-10); + NextRetrieval = now; + } + + public async Task Poll(CancellationToken cancellationToken) + { + while (!cancellationToken.IsCancellationRequested) + { + try + { + await SpinOnce(cancellationToken).ConfigureAwait(false); + await Task.Delay(NextRetrievalPollSleep, cancellationToken).ConfigureAwait(false); + } + catch (OperationCanceledException) + { + // ok, since the InnerPoll could observe the token + } + catch (Exception e) + { + _logger.LogError(e, "Failed to fetch timeouts from the timeout storage"); + } + } + } + + internal async Task SpinOnce(CancellationToken cancellationToken) + { + if (NextRetrieval > _currentTimeProvider() || cancellationToken.IsCancellationRequested) + { + return; + } + + _logger.LogDebug("Polling for timeouts at {0}.", _currentTimeProvider()); + var timeoutChunk = await _timeoutsRepository.GetNextBatch(_startSlice).ConfigureAwait(false); + + foreach (var timeoutData in timeoutChunk.DueTimeouts) + { + if (cancellationToken.IsCancellationRequested) + { + return; + } + + await _mediator.Publish(new TimeoutOccured(timeoutData.ProcessManagerInstanceId, timeoutData.Message), cancellationToken); + + if (_startSlice < timeoutData.DueDate) + { + _startSlice = timeoutData.DueDate; + } + } + + lock (_lockObject) + { + var nextTimeToQuery = timeoutChunk.NextTimeToQuery; + + // we cap the next retrieval to max 1 minute this will make sure that we trip the circuit breaker if we + // loose connectivity to our storage. This will also make sure that timeouts added (during migration) direct to storage + // will be picked up after at most 1 minute + var maxNextRetrieval = _currentTimeProvider() + MaxNextRetrievalDelay; + + NextRetrieval = nextTimeToQuery > maxNextRetrieval ? maxNextRetrieval : nextTimeToQuery; + + _logger.LogDebug("Polling next retrieval is at {0}.", NextRetrieval.ToLocalTime()); + } + } + + public void NewTimeoutRegistered(DateTime expiryTime) + { + lock (_lockObject) + { + if (NextRetrieval > expiryTime) + { + NextRetrieval = expiryTime; + } + } + } + + + public void Dispose() + { + } + } +} \ No newline at end of file diff --git a/src/Orchestration/NBB.ProcessManager.Runtime/Timeouts/TimeoutsService.cs b/src/Orchestration/NBB.ProcessManager.Runtime/Timeouts/TimeoutsService.cs new file mode 100644 index 00000000..6ad72159 --- /dev/null +++ b/src/Orchestration/NBB.ProcessManager.Runtime/Timeouts/TimeoutsService.cs @@ -0,0 +1,21 @@ +using Microsoft.Extensions.Hosting; +using System.Threading; +using System.Threading.Tasks; + +namespace NBB.ProcessManager.Runtime.Timeouts +{ + public class TimeoutsService : BackgroundService + { + private readonly TimeoutsManager _timeoutsManager; + + public TimeoutsService(TimeoutsManager timeoutsManager) + { + _timeoutsManager = timeoutsManager; + } + + protected override Task ExecuteAsync(CancellationToken stoppingToken) + { + return _timeoutsManager.Poll(stoppingToken); + } + } +} \ No newline at end of file diff --git a/test/UnitTests/Orchestration/NBB.ProcessManager.Tests/Commands/DoPayment.cs b/test/UnitTests/Orchestration/NBB.ProcessManager.Tests/Commands/DoPayment.cs new file mode 100644 index 00000000..7e214f65 --- /dev/null +++ b/test/UnitTests/Orchestration/NBB.ProcessManager.Tests/Commands/DoPayment.cs @@ -0,0 +1,6 @@ +namespace NBB.ProcessManager.Tests.Commands +{ + public class DoPayment + { + } +} diff --git a/test/UnitTests/Orchestration/NBB.ProcessManager.Tests/Commands/ShipOrder.cs b/test/UnitTests/Orchestration/NBB.ProcessManager.Tests/Commands/ShipOrder.cs new file mode 100644 index 00000000..7a427177 --- /dev/null +++ b/test/UnitTests/Orchestration/NBB.ProcessManager.Tests/Commands/ShipOrder.cs @@ -0,0 +1,19 @@ +using System; +using NBB.Core.Abstractions; + +namespace NBB.ProcessManager.Tests.Commands +{ + public class ShipOrder : ICommand + { + public Guid OrderId { get; } + public decimal Amount { get; } + public string ShippingAddress{ get; } + + public ShipOrder(Guid orderId, decimal amount, string shippingAddress) + { + OrderId = orderId; + Amount = amount; + ShippingAddress = shippingAddress; + } + } +} diff --git a/test/UnitTests/Orchestration/NBB.ProcessManager.Tests/Events/OrderCompleted.cs b/test/UnitTests/Orchestration/NBB.ProcessManager.Tests/Events/OrderCompleted.cs new file mode 100644 index 00000000..ec9134c3 --- /dev/null +++ b/test/UnitTests/Orchestration/NBB.ProcessManager.Tests/Events/OrderCompleted.cs @@ -0,0 +1,22 @@ +using System; +using NBB.Application.DataContracts; + +namespace NBB.ProcessManager.Tests.Events +{ + public class OrderCompleted : Event + { + public decimal Amount { get; } + public Guid OrderId { get; } + public int DocumentId { get; } + public int SiteId { get; } + + public OrderCompleted(decimal amount, Guid orderId, int documentId, int siteId, EventMetadata metadata = null) + : base(metadata) + { + Amount = amount; + OrderId = orderId; + DocumentId = documentId; + SiteId = siteId; + } + } +} diff --git a/test/UnitTests/Orchestration/NBB.ProcessManager.Tests/Events/OrderCreated.cs b/test/UnitTests/Orchestration/NBB.ProcessManager.Tests/Events/OrderCreated.cs new file mode 100644 index 00000000..9ee822bd --- /dev/null +++ b/test/UnitTests/Orchestration/NBB.ProcessManager.Tests/Events/OrderCreated.cs @@ -0,0 +1,21 @@ +using System; +using NBB.Application.DataContracts; + +namespace NBB.ProcessManager.Tests.Events +{ + public class OrderCreated : Event + { + public Guid OrderId { get; } + public decimal Amount { get; } + public int DocumentId { get; } + public int SiteId { get; } + + public OrderCreated(Guid orderId, decimal amount, int documentId, int siteId, EventMetadata metadata = null) : base(metadata) + { + OrderId = orderId; + Amount = amount; + DocumentId = documentId; + SiteId = siteId; + } + } +} \ No newline at end of file diff --git a/test/UnitTests/Orchestration/NBB.ProcessManager.Tests/Events/OrderPaymentCreated.cs b/test/UnitTests/Orchestration/NBB.ProcessManager.Tests/Events/OrderPaymentCreated.cs new file mode 100644 index 00000000..aa9d5286 --- /dev/null +++ b/test/UnitTests/Orchestration/NBB.ProcessManager.Tests/Events/OrderPaymentCreated.cs @@ -0,0 +1,23 @@ +using System; +using NBB.Application.DataContracts; + +namespace NBB.ProcessManager.Tests.Events +{ + public class OrderPaymentCreated : Event + { + + public decimal Amount { get; } + public Guid OrderId { get; } + public int DocumentId { get; } + public int SiteId { get; } + + public OrderPaymentCreated(decimal amount, Guid orderId, int documentId, int siteId, EventMetadata metadata = null) + : base(metadata) + { + Amount = amount; + OrderId = orderId; + DocumentId = documentId; + SiteId = siteId; + } + } +} diff --git a/test/UnitTests/Orchestration/NBB.ProcessManager.Tests/Events/OrderPaymentExpired.cs b/test/UnitTests/Orchestration/NBB.ProcessManager.Tests/Events/OrderPaymentExpired.cs new file mode 100644 index 00000000..1b661ca4 --- /dev/null +++ b/test/UnitTests/Orchestration/NBB.ProcessManager.Tests/Events/OrderPaymentExpired.cs @@ -0,0 +1,9 @@ +using System; +using NBB.Application.DataContracts; + +namespace NBB.ProcessManager.Tests.Events +{ + public class OrderPaymentExpired : Event + { + } +} diff --git a/test/UnitTests/Orchestration/NBB.ProcessManager.Tests/Events/OrderPaymentReceived.cs b/test/UnitTests/Orchestration/NBB.ProcessManager.Tests/Events/OrderPaymentReceived.cs new file mode 100644 index 00000000..43bf901d --- /dev/null +++ b/test/UnitTests/Orchestration/NBB.ProcessManager.Tests/Events/OrderPaymentReceived.cs @@ -0,0 +1,16 @@ +using System; +using NBB.Application.DataContracts; + +namespace NBB.ProcessManager.Tests.Events +{ + public class OrderPaymentReceived : Event + { + public Guid ContractOrderId { get; } + + public OrderPaymentReceived(Guid contractOrderId) + { + ContractOrderId = contractOrderId; + } + + } +} \ No newline at end of file diff --git a/test/UnitTests/Orchestration/NBB.ProcessManager.Tests/Events/OrderShipped.cs b/test/UnitTests/Orchestration/NBB.ProcessManager.Tests/Events/OrderShipped.cs new file mode 100644 index 00000000..8786833f --- /dev/null +++ b/test/UnitTests/Orchestration/NBB.ProcessManager.Tests/Events/OrderShipped.cs @@ -0,0 +1,17 @@ +using System; +using NBB.Application.DataContracts; + +namespace NBB.ProcessManager.Tests.Events +{ + public class OrderShipped : Event + { + public Guid OrderId { get; } + public DateTime ShippingDate { get; } + + public OrderShipped(Guid orderId, DateTime shippingDate, EventMetadata metadata = null) : base(metadata) + { + OrderId = orderId; + ShippingDate = shippingDate; + } + } +} \ No newline at end of file diff --git a/test/UnitTests/Orchestration/NBB.ProcessManager.Tests/InstanceFixture.cs b/test/UnitTests/Orchestration/NBB.ProcessManager.Tests/InstanceFixture.cs new file mode 100644 index 00000000..d6bebdb5 --- /dev/null +++ b/test/UnitTests/Orchestration/NBB.ProcessManager.Tests/InstanceFixture.cs @@ -0,0 +1,26 @@ +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging; +using NBB.EventStore.InMemory; +using NBB.EventStore.Internal; +using NBB.ProcessManager.Runtime.Persistence; +using System.Threading.Tasks; + +namespace NBB.ProcessManager.Tests +{ + public class InstanceDataRepositoryFixture + { + public InstanceDataRepository Repository { get; private set; } + + public InstanceDataRepositoryFixture() + { + var serviceProvider = new ServiceCollection() + .AddLogging(builder => builder.AddConsole()) + .BuildServiceProvider(); + + var logger = serviceProvider.GetRequiredService>(); + Repository = new InstanceDataRepository( + new EventStore.EventStore(new InMemoryRepository(), new NewtonsoftJsonEventStoreSerDes(), logger), + type => effect => Task.CompletedTask); + } + } +} \ No newline at end of file diff --git a/test/UnitTests/Orchestration/NBB.ProcessManager.Tests/NBB.ProcessManager.Tests.csproj b/test/UnitTests/Orchestration/NBB.ProcessManager.Tests/NBB.ProcessManager.Tests.csproj new file mode 100644 index 00000000..7538a4c2 --- /dev/null +++ b/test/UnitTests/Orchestration/NBB.ProcessManager.Tests/NBB.ProcessManager.Tests.csproj @@ -0,0 +1,22 @@ + + + + netcoreapp2.2 + false + + + + + + + + + + + + + + + + + diff --git a/test/UnitTests/Orchestration/NBB.ProcessManager.Tests/ProcessManagerInstanceUnitTests.cs b/test/UnitTests/Orchestration/NBB.ProcessManager.Tests/ProcessManagerInstanceUnitTests.cs new file mode 100644 index 00000000..247157a7 --- /dev/null +++ b/test/UnitTests/Orchestration/NBB.ProcessManager.Tests/ProcessManagerInstanceUnitTests.cs @@ -0,0 +1,335 @@ +using FluentAssertions; +using NBB.ProcessManager.Definition.Builder; +using NBB.ProcessManager.Definition.Effects; +using NBB.ProcessManager.Runtime; +using NBB.ProcessManager.Tests.Commands; +using NBB.ProcessManager.Tests.Events; +using System; +using System.Threading; +using System.Threading.Tasks; +using Xunit; + +namespace NBB.ProcessManager.Tests +{ + public class ProcessManagerInstanceUnitTests : IClassFixture + { + private readonly InstanceDataRepositoryFixture _fixture; + + public ProcessManagerInstanceUnitTests(InstanceDataRepositoryFixture fixture) + { + _fixture = fixture; + } + + [Fact] + public async Task TestRepo() + { + var @event = new OrderCreated(Guid.NewGuid(), 100, 0, 0); + var definition = new OrderProcessManager3(); + + var instance = new Instance(definition); + var identitySelector = definition.GetCorrelationFilter(); + if (identitySelector != null) + instance = await _fixture.Repository.Get(definition, identitySelector(@event), CancellationToken.None); + + instance.ProcessEvent(@event); + await _fixture.Repository.Save(instance, CancellationToken.None); + } + + [Fact] + public void Should_throw_for_missing_correlation() + { + var @event = new OrderCreated(Guid.NewGuid(), 100, 0, 0); + var definition = new OrderProcessManagerNoCorrelation(); + + var instance = new Instance(definition); + Action act = () => instance.ProcessEvent(@event); + act.Should().Throw(); + } + + class OrderProcessManagerNoCorrelation : AbstractDefinition + { + public OrderProcessManagerNoCorrelation() + { + StartWith() + .SetState((orderCreated, state) => + { + var newState = state.Data; + newState.Amount = 100; + return newState; + }) + .PublishEvent((orderCreated, state) => new OrderCompleted(100, orderCreated.OrderId, 0, 0)); + } + } + + + [Fact] + public void setStateHandler_on_first_event() + { + var @event = new OrderCreated(Guid.NewGuid(), 100, 0, 0); + var definition = new OrderProcessManager2(); + + var instance = new Instance(definition); + instance.ProcessEvent(@event); + instance.InstanceData.Data.Amount.Should().Be(200); + } + + class OrderProcessManager2 : AbstractDefinition + { + public OrderProcessManager2() + { + Event(configurator => configurator.CorrelateById(orderCreated => Guid.NewGuid())); + + StartWith() + .SetState((orderCreated, state) => + { + var newState = state.Data; + newState.Amount = 100; + return newState; + }) + .PublishEvent((orderCreated, state) => new OrderCompleted(100, orderCreated.OrderId, 0, 0)) + .SetState((orderCreated, state) => + { + var newState = state.Data; + newState.Amount = state.Data.Amount + 100; + return newState; + }); + } + } + + [Fact] + public void setStateHandler_with_two_events() + { + var orderId = Guid.NewGuid(); + var orderCreated = new OrderCreated(orderId, 100, 0, 0); + var definition = new OrderProcessManager3(); + + var instance = new Instance(definition); + instance.ProcessEvent(orderCreated); + instance.InstanceData.Data.Amount.Should().Be(200); + + var orderPaymentCreated = new OrderPaymentCreated(100, orderId, 0, 0); + instance.ProcessEvent(orderPaymentCreated); + instance.InstanceData.Data.Amount.Should().Be(200); + instance.InstanceData.Data.IsPaid.Should().BeTrue(); + } + + class OrderProcessManager3 : AbstractDefinition + { + public OrderProcessManager3() + { + Event(configurator => configurator.CorrelateById(orderCreated => orderCreated.OrderId)); + Event(configurator => configurator.CorrelateById(paymentReceived => paymentReceived.OrderId)); + + StartWith() + .SetState((orderCreated, state) => + { + var newState = state.Data; + newState.Amount = 100; + newState.OrderId = orderCreated.OrderId; + return newState; + }) + .PublishEvent((orderCreated, state) => new OrderCompleted(100, orderCreated.OrderId, 0, 0)) + .SetState((orderCreated, state) => + { + var newState = state.Data; + newState.Amount = state.Data.Amount + 100; + return newState; + }); + + When() + .SetState((@event, data) => + { + var newState = data.Data; + newState.IsPaid = true; + return newState; + }) + .Then((ev, state) => + { + var effect = new SendCommand(new DoPayment()); + return effect; + }); + } + } + + [Fact] + public void should_not_start_when_event_with_false_predicate() + { + var orderId = Guid.NewGuid(); + var orderCreated = new OrderCreated(orderId, 100, 0, 0); + var definition = new OrderProcessManager4(); + + var instance = new Instance(definition); + instance.ProcessEvent(orderCreated); + instance.State.Should().Be(InstanceStates.NotStarted); + } + + + class OrderProcessManager4 : AbstractDefinition + { + public OrderProcessManager4() + { + Event(configurator => configurator.CorrelateById(orderCreated => orderCreated.OrderId)); + StartWith((@event, data) => false); + } + } + + [Fact] + public void complete_with_false_predicate() + { + var orderId = Guid.NewGuid(); + var orderCreated = new OrderCreated(orderId, 100, 0, 0); + var orderPaymentCreated = new OrderPaymentCreated(100, orderId, 0, 0); + var definition = new OrderProcessManager5(); + + var instance = new Instance(definition); + instance.ProcessEvent(orderCreated); + instance.State.Should().Be(InstanceStates.Started); + instance.InstanceData.Data.IsPaid.Should().Be(true); + instance.ProcessEvent(orderPaymentCreated); + instance.State.Should().Be(InstanceStates.Started); + instance.InstanceData.Data.IsPaid.Should().Be(false); + instance.InstanceData.Data.Amount.Should().Be(110); + } + + + class OrderProcessManager5 : AbstractDefinition + { + public OrderProcessManager5() + { + Event(configurator => configurator.CorrelateById(orderCreated => orderCreated.OrderId)); + Event(configurator => configurator.CorrelateById(orderPaymentCreated => orderPaymentCreated.OrderId)); + + StartWith() + .SetState((@event, data) => + { + var newState = data.Data; + newState.IsPaid = true; + return newState; + }); + + When((@event, data) => true) + .SetState((@event, data) => + { + var newState = data.Data; + newState.Amount = 100; + return newState; + }); + + When((@event, data) => data.Data.Amount < 100) + .SetState((@event, data) => + { + var newState = data.Data; + newState.Amount += 20; + newState.IsPaid = false; + return newState; + }) + .Complete((@event, data) => data.Data.IsPaid); + + When((@event, data) => data.Data.Amount >= 100) + .SetState((@event, data) => + { + var newState = data.Data; + newState.Amount += 10; + newState.IsPaid = false; + return newState; + }) + .Complete((@event, data) => data.Data.IsPaid); + } + } + + + [Fact] + public void process_event_after_completion() + { + var orderId = Guid.NewGuid(); + var orderCreated = new OrderCreated(orderId, 100, 0, 0); + var orderCompleted = new OrderCompleted(100, orderId, 0, 0); + var orderPaymentCreated = new OrderPaymentCreated(100, orderId, 0, 0); + var definition = new OrderProcessManager6(); + + var instance = new Instance(definition); + instance.ProcessEvent(orderCreated); + instance.ProcessEvent(orderCompleted); + instance.State.Should().Be(InstanceStates.Completed); + + Action act = () => instance.ProcessEvent(orderPaymentCreated); + act.Should().Throw(); + } + + + class OrderProcessManager6 : AbstractDefinition + { + public OrderProcessManager6() + { + Event(configurator => configurator.CorrelateById(orderCreated => orderCreated.OrderId)); + Event(configurator => configurator.CorrelateById(orderCompleted => orderCompleted.OrderId)); + Event(configurator => configurator.CorrelateById(orderPaymentCreated => orderPaymentCreated.OrderId)); + + StartWith() + .SetState((@event, data) => + { + var newState = data.Data; + newState.OrderId = @event.OrderId; + return newState; + }); + + When() + .Complete(); + + When() + .SetState((@event, data) => + { + var newState = data.Data; + newState.IsPaid = true; + return newState; + }) + .Complete(); + + } + } + + [Fact] + public void when_predicate_false_should_not_complete() + { + var orderId = Guid.NewGuid(); + var orderCreated = new OrderCreated(orderId, 100, 0, 0); + var orderShipped = new OrderShipped(orderId, DateTime.Parse("2019-09-09")); + var definition = new OrderProcessManager7(); + + var instance = new Instance(definition); + instance.ProcessEvent(orderCreated); + instance.ProcessEvent(orderShipped); + instance.State.Should().Be(InstanceStates.Started); + } + + + class OrderProcessManager7 : AbstractDefinition + { + public OrderProcessManager7() + { + Event(configurator => configurator.CorrelateById(orderCreated => orderCreated.OrderId)); + Event(configurator => configurator.CorrelateById(orderCreated => orderCreated.OrderId)); + + StartWith() + .SendCommand((created, data) => new ShipOrder(created.OrderId, created.Amount, "bucuresti")); + + When((@event, data) => @event.ShippingDate < DateTime.Parse("2019-09-08")) + .Complete(); + + When() + .Complete((@event, data) => @event.ShippingDate < DateTime.Parse("2019-09-08")); + } + } + } + + public struct OrderProcessManagerData + { + public Guid OrderId { get; set; } + public int SiteId { get; set; } + public int DocumentId { get; set; } + public int UserId { get; set; } + public decimal Amount { get; set; } + public bool IsPaid { get; set; } + public bool IsShipped { get; set; } + } +} \ No newline at end of file