Skip to content

Commit

Permalink
process manager - add effect extensions (#36)
Browse files Browse the repository at this point in the history
process manager - add effect extensions
  • Loading branch information
lghinet authored Oct 30, 2019
1 parent 70340a7 commit 3d16534
Show file tree
Hide file tree
Showing 29 changed files with 334 additions and 392 deletions.
23 changes: 16 additions & 7 deletions samples/Orchestration/ProcessManagerSample/OrderProcessManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@
using ProcessManagerSample.Events;
using ProcessManagerSample.Queries;
using System;
using System.Linq;
using System.Net.Http;
using System.Threading.Tasks;
using MediatR;

namespace ProcessManagerSample
{
Expand Down Expand Up @@ -35,8 +39,16 @@ public OrderProcessManager(IMapper mapper)
})
.Then((orderCreated, data) =>
{
return new SendQueryEffect<Partner>(new GetPartnerQuery())
.ContinueWith(partner => new PublishMessageEffect(new DoPayment()));
var q1 = Query(new GetClientQuery());
var q2 = Effect.Parallel(Query(new GetPartnerQuery()), Query(new GetPartnerQuery()));
var queries =
from x in q1
from y in q2
select x.ClientCode + string.Join("; ", y.Select(z => z.PartnerName));
return queries.ContinueWith(partners => PublishMessage(new DoPayment()))
.ContinueWith(partner => PublishMessage(new DoPayment()));
})
.RequestTimeout(TimeSpan.FromSeconds(10), (created, data) => new OrderPaymentExpired(Guid.Empty, 0, 0));

Expand All @@ -49,10 +61,9 @@ public OrderProcessManager(IMapper mapper)
.Complete();
}

private static IEffect OrderCreatedHandler(OrderCreated orderCreated, InstanceData<OrderProcessManagerData> state)
private static IEffect<Unit> OrderCreatedHandler(OrderCreated orderCreated, InstanceData<OrderProcessManagerData> state)
{
var effect = new PublishMessageEffect(new DoPayment());
return effect;
return PublishMessage(new DoPayment());
}

private static DoPayment OrderPaymentCreatedHandler(OrderPaymentCreated orderPaymentReceived, InstanceData<OrderProcessManagerData> state)
Expand All @@ -65,6 +76,4 @@ private static DoPayment OrderShippedHandler(OrderShipped orderShipped, Instance
return new DoPayment();
}
}


}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
using System.Threading;
using System.Threading.Tasks;
using MediatR;
using NBB.Application.DataContracts;

namespace ProcessManagerSample.Queries
{
public class GetClientQuery : Query<Client>
{

}

public class GetClientQueryHandler : IRequestHandler<GetClientQuery, Client>
{
public async Task<Client> Handle(GetClientQuery request, CancellationToken cancellationToken)
{
return new Client("ion","vasile");
}
}

public class Client
{
public Client(string clientName, string clientCode)
{
ClientName = clientName;
ClientCode = clientCode;
}

public string ClientName { get; set; }
public string ClientCode { get; set; }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,18 @@ public class GetPartnerQueryHandler : IRequestHandler<GetPartnerQuery, Partner>
{
public async Task<Partner> Handle(GetPartnerQuery request, CancellationToken cancellationToken)
{
return new Partner();
return new Partner("ion","vasile");
}
}

public class Partner
{
public Partner(string partnerName, string partnerCode)
{
PartnerName = partnerName;
PartnerCode = partnerCode;
}

public string PartnerName { get; set; }
public string PartnerCode { get; set; }
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.Http;
using MediatR;
using NBB.ProcessManager.Definition.Effects;

Expand Down Expand Up @@ -59,7 +60,7 @@ EffectFunc<TEvent, TData> IDefinition<TData>.GetEffectFunc<TEvent>()
.Where(x => x.EventType == typeof(TEvent))
.Select(x => x.EffectFunc)
.DefaultIfEmpty()
.Aggregate(EffectHelpers.Sequential);
.Aggregate(EffectFuncs.Sequential);

return (@event, data) => func?.Invoke((IEvent) @event, data) ?? NoEffect.Instance;
}
Expand Down Expand Up @@ -104,5 +105,18 @@ EventPredicate<TEvent, TData> IDefinition<TData>.GetCompletionPredicate<TEvent>(

return (@event, data) => func?.Invoke((IEvent) @event, data) ?? true;
}


public static IEffect<TResult> Http<TResult>(HttpRequestMessage message) => Effect.Http<TResult>(message);

public static IEffect<TResult> Query<TResult>(IRequest<TResult> query) => Effect.Query(query);

public static IEffect<Unit> PublishMessage(object message) => Effect.PublishMessage(message);

public static IEffect<Unit> RequestTimeout(string instanceId, TimeSpan timeSpan, object message, Type messageType)
=> Effect.RequestTimeout(instanceId, timeSpan, message, messageType);

public static IEffect<Unit> CancelTimeout(object instanceId) => Effect.CancelTimeout(instanceId);

}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
using NBB.Core.Abstractions;
using MediatR;
using NBB.Core.Abstractions;
using NBB.ProcessManager.Definition.Effects;
using System;
using MediatR;

namespace NBB.ProcessManager.Definition.Builder
{
Expand Down Expand Up @@ -41,7 +41,7 @@ IEffect<Unit> newFunc(IEvent @event, InstanceData<TData> data)
return func((TEvent) @event, data);
}

EffectFunc = EffectHelpers.Sequential(EffectFunc, newFunc);
EffectFunc = EffectFuncs.Sequential(EffectFunc, newFunc);
}

public void AddSetStateHandler(SetStateFunc<TEvent, TData> func)
Expand Down Expand Up @@ -92,28 +92,6 @@ public void UseForCompletion(EventPredicate<TEvent, TData> predicate = null)
}
}

public static class EffectHelpers
{
public static EffectFunc<TEvent, TData> Aggregate<TEvent, TData>(EffectFunc<TEvent, TData> func1, EffectFunc<TEvent, TData> func2,
Func<IEffect<Unit>, IEffect<Unit>, IEffect<Unit>> accumulator)
where TData : struct
{
return (@event, data) =>
{
var ef1 = func1(@event, data);
var ef2 = func2(@event, data);
return accumulator(ef1, ef2);
};
}

public static EffectFunc<TEvent, TData> Sequential<TEvent, TData>(EffectFunc<TEvent, TData> func1, EffectFunc<TEvent, TData> func2)
where TData : struct
{
return Aggregate(func1, func2, (effect1, effect2) => new SequentialEffect(effect1, effect2));
}
}

public interface IEventActivitySet<TData>
where TData : struct
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public EventActivitySetBuilder<TEvent, TData> SendCommand<T>(Func<TEvent, Instan
Then((whenEvent, state) =>
{
var command = handler(whenEvent, state);
return new PublishMessageEffect(command);
return Effect.PublishMessage(command);
}, predicate);
return this;
}
Expand All @@ -52,7 +52,8 @@ public EventActivitySetBuilder<TEvent, TData> RequestTimeout<T>(TimeSpan timeSpa
EventPredicate<TEvent, TData> predicate = null)
where T : IEvent
{
Then((whenEvent, state) => new RequestTimeoutEffect(state.InstanceId.ToString(), timeSpan, messageFactory(whenEvent, state), typeof(T)), predicate);
Then((whenEvent, state) =>
Effect.RequestTimeout(state.InstanceId.ToString(), timeSpan, messageFactory(whenEvent, state), typeof(T)), predicate);
return this;
}

Expand All @@ -62,15 +63,15 @@ public EventActivitySetBuilder<TEvent, TData> PublishEvent<T>(Func<TEvent, Insta
Then((whenEvent, state) =>
{
var @event = handler(whenEvent, state);
return new PublishMessageEffect(@event);
return Effect.PublishMessage(@event);
}, predicate);
return this;
}

public void Complete(EventPredicate<TEvent, TData> predicate = null)
{
_eventActivitySet.UseForCompletion(predicate);
Then((whenEvent, state) => new CancelTimeoutsEffect(state.InstanceId), predicate);
Then((whenEvent, state) => Effect.CancelTimeout(state.InstanceId), predicate);
}
}
}

This file was deleted.

This file was deleted.

18 changes: 18 additions & 0 deletions src/Orchestration/NBB.ProcessManager.Definition/Effects/Effect.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
using MediatR;
using System;
using System.Linq;
using System.Net.Http;
using System.Threading.Tasks;

namespace NBB.ProcessManager.Definition.Effects
{
public class Effect<TResult> : IEffect<TResult>
{
public Func<IEffectRunner, Task<TResult>> Computation { get; }

public Effect(Func<IEffectRunner, Task<TResult>> computation)
{
Computation = computation;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
using System;
using System.Collections.Generic;
using System.Text;

namespace NBB.ProcessManager.Definition.Effects
{
public static class EffectExtensions
{
public static IEffect<TEffectResult2> ContinueWith<TEffectResult1, TEffectResult2>(this IEffect<TEffectResult1> effect,
Func<TEffectResult1, IEffect<TEffectResult2>> continuation)
{
return new Effect<TEffectResult2>(async runner =>
{
var r1 = await effect.Computation(runner);
return await continuation(r1).Computation(runner);
});
}

public static IEffect<TResult> Select<T, TResult>(this IEffect<T> effect, Func<T, TResult> selector)
{
return new Effect<TResult>(async runner =>
{
var r1 = await effect.Computation(runner);
return selector(r1);
});
}

public static IEffect<TResult> SelectMany<TSource1, TSource2, TResult>(this IEffect<TSource1> sourceEffect,
Func<TSource1, IEffect<TSource2>> monadicFn,
Func<TSource1, TSource2, TResult> resultSelector)
{
return sourceEffect.ContinueWith(source => monadicFn(source).Select(x => resultSelector(source, x)));
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
using MediatR;
using System;

namespace NBB.ProcessManager.Definition.Effects
{

public static class EffectFuncs
{
public static EffectFunc<TEvent, TData> Aggregate<TEvent, TData>(EffectFunc<TEvent, TData> func1, EffectFunc<TEvent, TData> func2,
Func<IEffect<Unit>, IEffect<Unit>, IEffect<Unit>> accumulator)
where TData : struct
{
return (@event, data) =>
{
var ef1 = func1(@event, data);
var ef2 = func2(@event, data);
return accumulator(ef1, ef2);
};
}

public static EffectFunc<TEvent, TData> Sequential<TEvent, TData>(EffectFunc<TEvent, TData> func1, EffectFunc<TEvent, TData> func2)
where TData : struct
{
return Aggregate(func1, func2, (effect1, effect2) => Effect.Sequential(effect1, effect2));
}
}
}
Loading

0 comments on commit 3d16534

Please sign in to comment.