Skip to content

Commit

Permalink
✨ Creates REstate.Concurrency.Primitives
Browse files Browse the repository at this point in the history
- Adds the new project
- Updates the Sempahore example to use the library instead of manually crafting a machine
  • Loading branch information
Ovan Crone authored and Ovan Crone committed Feb 2, 2018
1 parent 89f0a0e commit ef97767
Show file tree
Hide file tree
Showing 12 changed files with 255 additions and 57 deletions.
9 changes: 8 additions & 1 deletion REstate.sln
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@

Microsoft Visual Studio Solution File, Format Version 12.00
# Visual Studio 15
VisualStudioVersion = 15.0.27130.2020
VisualStudioVersion = 15.0.27130.2026
MinimumVisualStudioVersion = 10.0.40219.1
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "REstate", "src\REstate\REstate.csproj", "{8B02FF53-A9F3-4004-8CDC-32831070957D}"
EndProject
Expand Down Expand Up @@ -33,6 +33,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "REstate.Engine.Repositories
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "REstate.Engine.Repositories.EntityFrameworkCore.Tests", "test\REstate.Engine.Repositories.EntityFrameworkCore.Tests\REstate.Engine.Repositories.EntityFrameworkCore.Tests.csproj", "{5F424F7B-4840-414D-9FA3-7700D81AD898}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "REstate.Concurrency.Primitives", "src\REstate.Concurrency.Primitives\REstate.Concurrency.Primitives.csproj", "{D699BD4C-9005-4475-A958-3A0AC2C6F575}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -83,6 +85,10 @@ Global
{5F424F7B-4840-414D-9FA3-7700D81AD898}.Debug|Any CPU.Build.0 = Debug|Any CPU
{5F424F7B-4840-414D-9FA3-7700D81AD898}.Release|Any CPU.ActiveCfg = Release|Any CPU
{5F424F7B-4840-414D-9FA3-7700D81AD898}.Release|Any CPU.Build.0 = Release|Any CPU
{D699BD4C-9005-4475-A958-3A0AC2C6F575}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{D699BD4C-9005-4475-A958-3A0AC2C6F575}.Debug|Any CPU.Build.0 = Debug|Any CPU
{D699BD4C-9005-4475-A958-3A0AC2C6F575}.Release|Any CPU.ActiveCfg = Release|Any CPU
{D699BD4C-9005-4475-A958-3A0AC2C6F575}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand All @@ -100,6 +106,7 @@ Global
{F15E390F-EFCF-4848-96C8-597BD75B7F9C} = {20BCC0A3-8FEB-4EFF-A175-22D1E3E4E233}
{3E0263FE-540E-401E-818A-CB51350D30C2} = {90E8D167-048D-467B-81D5-1007247226A3}
{5F424F7B-4840-414D-9FA3-7700D81AD898} = {88B4B54C-3DA1-40D2-8DCB-0181DD035955}
{D699BD4C-9005-4475-A958-3A0AC2C6F575} = {90E8D167-048D-467B-81D5-1007247226A3}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {0C35B35D-40C1-4521-BF0A-C5A6D7BFF4CD}
Expand Down
67 changes: 11 additions & 56 deletions src/Examples/Semaphore/Program.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
using System;
using System.Threading.Tasks;
using REstate;
using REstate.Engine;
using Serilog;

namespace Semaphore
Expand All @@ -12,15 +11,12 @@ private static async Task Main(string[] args)
{
Log.Logger =
new LoggerConfiguration()
.MinimumLevel.Verbose()
.WriteTo.Console()
.CreateLogger();
.MinimumLevel.Verbose()
.WriteTo.Console()
.CreateLogger();

var semaphoreSchematic = CreateSemaphoreSchematic();

var semaphore = await REstateHost.Agent
.GetStateEngine<int, int>()
.CreateMachineAsync(semaphoreSchematic);
var sempahore = await REstateHost.Agent
.CreateSemaphoreAsync(3);

var task1 = Task.Run(() => DoSomeProcessingAsync(60));
var task2 = Task.Run(() => DoSomeProcessingAsync(40));
Expand All @@ -30,7 +26,7 @@ private static async Task Main(string[] args)

await Task.WhenAll(task1, task2, task3, task4, task5);

Console.WriteLine("Done!");
Log.Logger.Information("Done!");
Console.ReadLine();

async Task DoSomeProcessingAsync(int workPeriodMs, int onFailedToGetSlotDelayMs = 5)
Expand All @@ -39,11 +35,10 @@ async Task DoSomeProcessingAsync(int workPeriodMs, int onFailedToGetSlotDelayMs
{
try
{
await semaphore.SendAsync(1);

await Task.Delay(workPeriodMs);

await semaphore.SendAsync(-1);
using (await sempahore.EnterAsync())
{
await Task.Delay(workPeriodMs);
}

break;
}
Expand All @@ -57,45 +52,5 @@ async Task DoSomeProcessingAsync(int workPeriodMs, int onFailedToGetSlotDelayMs
}
}
}

/// <summary>
/// Creates a Schematic that represents a semaphore with 3 slots
/// </summary>
/// <remarks>
/// The following is the Schematic in DOT Graph
/// <![CDATA[
/// digraph {
/// rankdir="LR"
/// "0" -> "1" [label= " 1 "];
/// "1" -> "0" [label=" -1 "];
/// "1" -> "2" [label= " 1 "];
/// "2" -> "1" [label=" -1 "];
/// "2" -> "3" [label= " 1 "];
/// "3" -> "2" [label=" -1 "];
/// }
/// ]]>
/// <image url="$(SolutionDir)\src\Examples\Semaphore\diagram_white.png" />
/// </remarks>
private static REstate.Schematics.Schematic<int, int> CreateSemaphoreSchematic() =>
REstateHost.Agent
.CreateSchematic<int, int>("3SlotSemaphore")
.WithStateConflictRetries()
.WithState(0, state => state
.AsInitialState()
.DescribedAs("No slots filled.")
.WithReentrance(-1))
.WithState(1, state => state
.DescribedAs("One slot filled.")
.WithTransitionFrom(0, 1)
.WithTransitionTo(0, -1))
.WithState(2, state => state
.DescribedAs("Two slots filled.")
.WithTransitionFrom(1, 1)
.WithTransitionTo(1, -1))
.WithState(3, state => state
.DescribedAs("Three slots filled.")
.WithTransitionFrom(2, 1)
.WithTransitionTo(2, -1))
.Build();
}
}
}
1 change: 1 addition & 0 deletions src/Examples/Semaphore/Semaphore.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\..\REstate.Concurrency.Primitives\REstate.Concurrency.Primitives.csproj" />
<ProjectReference Include="..\..\REstate\REstate.csproj" />
</ItemGroup>

Expand Down
137 changes: 137 additions & 0 deletions src/REstate.Concurrency.Primitives/ConcurrencyPrimitiveExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using REstate.Concurrency.Primitives;
using REstate.Schematics;

// ReSharper disable once CheckNamespace
namespace REstate
{
public static class ConcurrencyPrimitiveExtensions
{
/// <summary>
/// Creates a new mutex (lock) using Machine state as the synchronization root.
/// </summary>
/// <param name="restateAgent">The agent to use to build the mutex</param>
/// <param name="retryLimit">The number of attempts to try obtaining a lock; if null then no limit</param>
/// <param name="cancellationToken">The cancellation token for building the Machine</param>
/// <returns>A new mutex</returns>
public static Task<IREstateMutex> CreateLockAsync(
this IAgent restateAgent,
int? retryLimit = null,
CancellationToken cancellationToken = default)
{
return CreateMutexAsync(restateAgent, retryLimit, cancellationToken);
}

/// <summary>
/// Creates a new mutex (lock) using Machine state as the synchronization root.
/// </summary>
/// <param name="restateAgent">The agent to use to build the mutex</param>
/// <param name="retryLimit">The number of attempts to try obtaining a lock; if null then no limit</param>
/// <param name="cancellationToken">The cancellation token for building the Machine</param>
/// <returns>A new mutex</returns>
public static async Task<IREstateMutex> CreateMutexAsync(
this IAgent restateAgent,
int? retryLimit = null,
CancellationToken cancellationToken = default)
{
var mutexSchematic = CreateSemaphoreSchematic(restateAgent, 1, retryLimit);

var mutexMachine = await restateAgent.GetStateEngine<int, int>()
.CreateMachineAsync(mutexSchematic, null, cancellationToken);

return new REstateSemaphore(mutexMachine);
}

/// <summary>
/// Creates a new semaphoroe using Machine state as the synchronization root.
/// </summary>
/// <param name="restateAgent">The agent to use to build the semaphore</param>
/// <param name="slots">The number or available slots in the sempahore</param>
/// <param name="retryLimit">The number of attempts to try obtaining a slot; if null then no limit</param>
/// <param name="cancellationToken">The cancellation token for building the Machine</param>
/// <returns>A new semaphore</returns>
public static async Task<IREstateSemaphore> CreateSemaphoreAsync(
this IAgent restateAgent,
int slots,
int? retryLimit = null,
CancellationToken cancellationToken = default)
{
if (slots < 1) throw new ArgumentOutOfRangeException(nameof(slots), "Sempahores must have at least one slot.");

var sempahoreSchematic = CreateSemaphoreSchematic(restateAgent, slots, retryLimit);

var sempahoreMachine = await restateAgent.GetStateEngine<int, int>()
.CreateMachineAsync(sempahoreSchematic, null, cancellationToken);

return new REstateSemaphore(sempahoreMachine);
}

/// <summary>
/// Creates a Schematic that represents a semaphore with n slots
/// </summary>
/// <remarks>
/// The following is the Schematic in DOT Graph
/// <![CDATA[
/// digraph {
/// rankdir="LR"
/// "0" -> "1" [label= " 1 "];
/// "1" -> "0" [label=" -1 "];
/// "1" -> "2" [label= " 1 "];
/// "2" -> "1" [label=" -1 "];
/// "2" -> "3" [label= " 1 "];
/// "3" -> "2" [label=" -1 "];
/// }
/// ]]>
/// <image url="$(SolutionDir)\src\REstate.Concurrency.Primitives\diagram_white.png" />
/// </remarks>
private static Schematic<int, int> CreateSemaphoreSchematic(IAgent restateAgent, int slots, int? retryLimit)
{
if (slots < 1) throw new ArgumentOutOfRangeException(nameof(slots), "Sempahores must have at least one slot.");

var mutexBuilder = restateAgent
.CreateSchematic<int, int>("REstateMutex");

if (retryLimit == null)
{
mutexBuilder.WithStateConflictRetries();
}
else if (retryLimit.Value > 0)
{
mutexBuilder.WithStateConflictRetries(retryLimit.Value);
}
else
{
// No retries, so just don't set it.
}

mutexBuilder
.WithState(0, state => state
.AsInitialState()
.WithReentrance(-1))
.WithState(1, state => state
.WithTransitionFrom(0, 1)
.WithTransitionTo(0, -1));

if (slots == 1)
return mutexBuilder.Build();

var sempahoreBuilder = mutexBuilder;

foreach (var slot in Enumerable.Range(start: 2, count: slots - 1))
{
sempahoreBuilder.WithState(slot, state => state
.WithTransitionFrom(slot - 1, 1)
.WithTransitionTo(slot - 1, -1));
}

var semaphore = sempahoreBuilder.Build();

semaphore.SchematicName = $"REstateSemaphoreOf{slots}";

return semaphore;
}
}
}
11 changes: 11 additions & 0 deletions src/REstate.Concurrency.Primitives/IREstateMutex.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
using System;
using System.Threading;
using System.Threading.Tasks;

namespace REstate.Concurrency.Primitives
{
public interface IREstateMutex
{
Task<IDisposable> EnterAsync(CancellationToken cancellationToken = default);
}
}
7 changes: 7 additions & 0 deletions src/REstate.Concurrency.Primitives/IREstateSempahore.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
namespace REstate.Concurrency.Primitives
{
public interface IREstateSemaphore
: IREstateMutex
{
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFrameworks>netstandard1.3;net45</TargetFrameworks>
<Authors>Ovan Crone</Authors>
<Company>Psibernetic Solutions</Company>
<Description>Concurrency primitives such as Semaphores and mutexes build on REstate Machines.</Description>
<Copyright>Ovan Crone 2016</Copyright>
<PackageLicenseUrl>https://opensource.org/licenses/MIT</PackageLicenseUrl>
<Version>3.0.0</Version>
<AssemblyVersion>3.0.0.0</AssemblyVersion>
<FileVersion>3.0.0.0</FileVersion>
<LangVersion>latest</LangVersion>
<PackageProjectUrl>https://github.com/psibr/REstate.Engine</PackageProjectUrl>
<RepositoryUrl>https://github.com/psibr/REstate.Engine</RepositoryUrl>
<RepositoryType>git</RepositoryType>
<PackageTags>REstate</PackageTags>
</PropertyGroup>

<ItemGroup>
<ProjectReference Include="..\REstate\REstate.csproj" />
</ItemGroup>

</Project>
56 changes: 56 additions & 0 deletions src/REstate.Concurrency.Primitives/REstateSemaphore.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using REstate.Engine;

namespace REstate.Concurrency.Primitives
{
public class REstateSemaphore
: IREstateSemaphore
{
public REstateSemaphore(IStateMachine<int, int> semaphoreMachine)
{
SemaphoreMachine = semaphoreMachine;
}

private IStateMachine<int, int> SemaphoreMachine { get; }

public async Task<IDisposable> EnterAsync(CancellationToken cancellationToken = default)
{
var criticalSection = new CriticalSection(SemaphoreMachine, cancellationToken);

await criticalSection.EnterAsync();

return criticalSection;
}

private class CriticalSection
: IDisposable
{
public CriticalSection(
IStateMachine<int, int> sempahoreStateMachine,
CancellationToken cancellationToken = default)
{
SempahoreStateMachine = sempahoreStateMachine;
CancellationToken = cancellationToken;
}

public async Task EnterAsync()
{
await SempahoreStateMachine.SendAsync(1, CancellationToken);
}

private IStateMachine<int, int> SempahoreStateMachine { get; }
private CancellationToken CancellationToken { get; }

public void Dispose()
{
var mre = new ManualResetEventSlim();

SempahoreStateMachine.SendAsync(-1, CancellationToken).GetAwaiter().OnCompleted(() => mre.Set());

mre.Wait(CancellationToken.None);
}
}
}
}
File renamed without changes
File renamed without changes
File renamed without changes
File renamed without changes

0 comments on commit ef97767

Please sign in to comment.