Skip to content

Commit

Permalink
feat: checkpoints can be created within a branch (#92)
Browse files Browse the repository at this point in the history
  • Loading branch information
Seddryck authored Sep 21, 2024
1 parent 71a9ad0 commit e6e9461
Show file tree
Hide file tree
Showing 6 changed files with 42 additions and 48 deletions.
10 changes: 8 additions & 2 deletions Streamistry.Core/Fluent/BasePipeBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,19 @@ public Pipeline Build()
return Instance!.Pipe.Pipeline!;
}

public BasePipeBuilder<TOutput> Checkpoint(out IChainablePort<TOutput> port)
{
port = BuildPipeElement();
return this;
}

public SinkBuilder<TOutput> Sink()
=> new(this);

public FilterBuilder<TOutput> Filter(Func<TOutput?, bool>? function)
=> new(this, function);
public MapperBuilder<TOutput, TNext> Map<TNext>(Func<TOutput?, TNext?>? function)
=> new(this, function);
public FilterBuilder<TOutput> Filter(Func<TOutput?, bool>? function)
=> new(this, function);
public PluckerBuilder<TOutput, TNext> Pluck<TNext>(Expression<Func<TOutput, TNext?>> expr)
=> new(this, expr);
public SplitterBuilder<TOutput, TNext> Split<TNext>(Func<TOutput?, TNext[]?>? function)
Expand Down
6 changes: 0 additions & 6 deletions Streamistry.Core/Fluent/CombinatorBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,4 @@ public abstract class BaseCombinatorBuilder<TOutput> : BasePipeBuilder<TOutput>
public BaseCombinatorBuilder(IBuilder<IChainablePort[]> upstream)
: base()
=> (Upstream) = (upstream);

public BasePipeBuilder<TOutput> Checkpoint(out IChainablePort<TOutput> port)
{
port = BuildPipeElement();
return this;
}
}
6 changes: 0 additions & 6 deletions Streamistry.Core/Fluent/PipeElementBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,4 @@ public abstract class PipeElementBuilder<TInput, TOutput> : BasePipeBuilder<TOut

public PipeElementBuilder(IPipeBuilder<TInput> upstream)
=> Upstream = upstream;

public PipeElementBuilder<TInput, TOutput> Checkpoint(out IChainablePort<TOutput> port)
{
port = BuildPipeElement();
return this;
}
}
6 changes: 3 additions & 3 deletions Streamistry.Core/Fluent/PipelineBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,16 @@
using System.Threading.Tasks;

namespace Streamistry.Fluent;
internal class PipelineBuilder<T> : IBuilder<Pipeline>
public class PipelineBuilder : IBuilder<Pipeline>
{
protected Pipeline? Instance { get; set; }

public SourceBuilder<T> Source(IEnumerable<T> enumeration)
public SourceBuilder<T> Source<T>(IEnumerable<T> enumeration)
=> new (this, enumeration);

public Pipeline BuildPipeElement()
=> Instance ??= OnBuildPipeElement();

public Pipeline OnBuildPipeElement()
=> new Pipeline();
=> new();
}
2 changes: 1 addition & 1 deletion Streamistry.Core/Fluent/SourceBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
using Streamistry.Pipes.Sources;

namespace Streamistry.Fluent;
internal class SourceBuilder<TOutput> : BasePipeBuilder<TOutput>
public class SourceBuilder<TOutput> : BasePipeBuilder<TOutput>
{
protected IBuilder<Pipeline> Upstream { get; }

Expand Down
60 changes: 30 additions & 30 deletions Streamistry.Testing/Fluent/PipelineBuilderTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,15 @@ public class PipelineBuilderTests
[Test]
public void Build_EmptyPipeline_Pipeline()
{
var pipeline = new PipelineBuilder<int>().BuildPipeElement();
var pipeline = new PipelineBuilder().BuildPipeElement();
Assert.That(pipeline, Is.Not.Null);
Assert.That(pipeline, Is.TypeOf<Pipeline>());
}

[Test]
public void Build_SourceWithinPipeline_Pipeline()
{
var pipeline = new PipelineBuilder<int>()
var pipeline = new PipelineBuilder()
.Source([1, 2, 3])
.Build();

Expand All @@ -33,7 +33,7 @@ public void Build_SourceWithinPipeline_Pipeline()
[Test]
public void Build_SourceThenPipeWithinPipeline_Pipeline()
{
var pipeline = new PipelineBuilder<int>()
var pipeline = new PipelineBuilder()
.Source([1, 2, 3])
.Filter(x => x % 2 != 0)
.Build();
Expand All @@ -45,7 +45,7 @@ public void Build_SourceThenPipeWithinPipeline_Pipeline()
[Test]
public void Build_SourceThenSinkWithinPipeline_Pipeline()
{
var pipeline = new PipelineBuilder<int>()
var pipeline = new PipelineBuilder()
.Source([1, 2, 3])
.Sink().InMemory()
.Build();
Expand All @@ -57,7 +57,7 @@ public void Build_SourceThenSinkWithinPipeline_Pipeline()
[Test]
public void Build_FilterCheckpoint_Success()
{
var pipeline = new PipelineBuilder<int>()
var pipeline = new PipelineBuilder()
.Source([1, 2, 3])
.Filter(x => x % 2 != 0).Checkpoint(out var filter)
.Build();
Expand All @@ -72,7 +72,7 @@ public void Build_FilterCheckpoint_Success()
[Test]
public void Build_PluckerCheckpoint_Success()
{
var pipeline = new PipelineBuilder<int>()
var pipeline = new PipelineBuilder()
.Source([1, 2, 3])
.Map(x => new DateOnly(2024, x, 1))
.Pluck(x => x.Month).Checkpoint(out var plucker)
Expand All @@ -93,7 +93,7 @@ public void Build_PluckerCheckpoint_Success()
[Test]
public void Build_SplitterCheckpoint_Success()
{
var pipeline = new PipelineBuilder<string>()
var pipeline = new PipelineBuilder()
.Source(["1-2-3", "4-5"])
.Split(x => x?.Split('-') ?? []).Checkpoint(out var splitter)
.Build();
Expand All @@ -112,7 +112,7 @@ public void Build_SplitterCheckpoint_Success()
[Test]
public void Build_AggregateMaxCheckpoint_Success()
{
var pipeline = new PipelineBuilder<int>()
var pipeline = new PipelineBuilder()
.Source([1, 3, 2])
.Aggregate().AsMax().Checkpoint(out var aggr)
.Build();
Expand All @@ -127,7 +127,7 @@ public void Build_AggregateMaxCheckpoint_Success()
[Test]
public void Build_AggregateMinCheckpoint_Success()
{
var pipeline = new PipelineBuilder<int>()
var pipeline = new PipelineBuilder()
.Source([1, 3, 2])
.Aggregate().AsMin().Checkpoint(out var aggr)
.Build();
Expand All @@ -142,7 +142,7 @@ public void Build_AggregateMinCheckpoint_Success()
[Test]
public void Build_AggregateAverageCheckpoint_Success()
{
var pipeline = new PipelineBuilder<int>()
var pipeline = new PipelineBuilder()
.Source([1, 3, 2])
.Aggregate().AsAverage().Checkpoint(out var aggr)
.Build();
Expand All @@ -157,7 +157,7 @@ public void Build_AggregateAverageCheckpoint_Success()
[Test]
public void Build_AggregateMedianCheckpoint_Success()
{
var pipeline = new PipelineBuilder<int>()
var pipeline = new PipelineBuilder()
.Source([1, 3, 2])
.Aggregate().AsMedian().Checkpoint(out var aggr)
.Build();
Expand All @@ -172,7 +172,7 @@ public void Build_AggregateMedianCheckpoint_Success()
[Test]
public void Build_AggregateSumCheckpoint_Success()
{
var pipeline = new PipelineBuilder<int>()
var pipeline = new PipelineBuilder()
.Source([1, 3, 2])
.Aggregate().AsSum().Checkpoint(out var aggr)
.Build();
Expand All @@ -187,7 +187,7 @@ public void Build_AggregateSumCheckpoint_Success()
[Test]
public void Build_AggregateCountCheckpoint_Success()
{
var pipeline = new PipelineBuilder<int>()
var pipeline = new PipelineBuilder()
.Source([1, 3, 2])
.Aggregate().AsCount().Checkpoint(out var aggr)
.Build();
Expand All @@ -202,7 +202,7 @@ public void Build_AggregateCountCheckpoint_Success()
[Test]
public void Build_AggregateUniversalCheckpoint_Success()
{
var pipeline = new PipelineBuilder<string>()
var pipeline = new PipelineBuilder()
.Source(["f", "oo", "Bar"])
.Aggregate((x, y) => x + y).Checkpoint(out var aggr)
.Build();
Expand All @@ -217,7 +217,7 @@ public void Build_AggregateUniversalCheckpoint_Success()
[Test]
public void Build_AggregateUniversal2Checkpoint_Success()
{
var pipeline = new PipelineBuilder<string>()
var pipeline = new PipelineBuilder()
.Source(["f", "oo", "Bar"])
.Aggregate<int>((x, y) => x + (string.IsNullOrEmpty(y) ? 0 : y!.Length)).Checkpoint(out var aggr)
.Build();
Expand All @@ -233,7 +233,7 @@ public void Build_AggregateUniversal2Checkpoint_Success()
[Test]
public void Build_AggregateUniversal3Checkpoint_Success()
{
var pipeline = new PipelineBuilder<string>()
var pipeline = new PipelineBuilder()
.Source(["f", "oo", "Bar"])
.Aggregate<(int, int), bool>((x, y) => char.IsUpper(y![0]) ? (x.Item1++, x.Item2) : (x.Item1, x.Item2++))
.WithSelector(x => x.Item1 > x.Item2)
Expand All @@ -250,7 +250,7 @@ public void Build_AggregateUniversal3Checkpoint_Success()
[Test]
public void Build_AggregateUniversal4Checkpoint_Success()
{
var pipeline = new PipelineBuilder<string>()
var pipeline = new PipelineBuilder()
.Source(["f", "oo", "Bar"])
.Aggregate<(int Upper, int Lower), bool>((x, y) => char.IsUpper(y![0]) ? (x.Upper++, x.Lower) : (x.Upper, x.Lower++))
.WithSelector(x => x.Upper > x.Lower)
Expand All @@ -268,7 +268,7 @@ public void Build_AggregateUniversal4Checkpoint_Success()
[Test]
public void Build_ParserDateCheckpoint_Success()
{
var pipeline = new PipelineBuilder<string>()
var pipeline = new PipelineBuilder()
.Source(["2024-09-14", "2024-09-15", "2024-45-78"])
.Parse()
.AsDate()
Expand All @@ -286,7 +286,7 @@ public void Build_ParserDateCheckpoint_Success()
[Test]
public void Build_ParserDateTimeCheckpoint_Success()
{
var pipeline = new PipelineBuilder<string>()
var pipeline = new PipelineBuilder()
.Source(["2024-09-14 11:12:20", "2024-09-15 17:12:16", "2024-45-78"])
.Parse()
.AsDateTime()
Expand All @@ -306,7 +306,7 @@ public void Build_ParserDateTimeCheckpoint_Success()
[Test]
public void Build_ParserRomanFiguresCheckpoint_Success()
{
var pipeline = new PipelineBuilder<char>()
var pipeline = new PipelineBuilder()
.Source(['I', 'X', 'Z'])
.Parse((char x, out int y) =>
{
Expand All @@ -333,7 +333,7 @@ public void Build_ParserRomanFiguresCheckpoint_Success()
[Test]
public void Build_ComplexTryOnlyMainCheckpoint_Success()
{
var pipeline = new PipelineBuilder<string>()
var pipeline = new PipelineBuilder()
.Source(["2024-09-14", "2024-09-15", "2024-45-78"])
.Parse()
.AsDate()
Expand All @@ -355,7 +355,7 @@ public void Build_ComplexTryOnlyMainCheckpoint_Success()
[Test]
public void Build_CombineTwoUpstreamsCheckpoint_Success()
{
var pipeline = new PipelineBuilder<string>()
var pipeline = new PipelineBuilder()
.Source(["2024-09-14", "2024-09-15", "2024-45-78"])
.Parse()
.AsDate()
Expand All @@ -376,7 +376,7 @@ public void Build_CombineTwoUpstreamsCheckpoint_Success()
[Test]
public void Build_CombineThreeUpstreamsCheckpoint_Success()
{
var pipeline = new PipelineBuilder<string>()
var pipeline = new PipelineBuilder()
.Source(["2024-09-14", "2024-09-15", "2024-45-78"])
.Parse()
.AsDate()
Expand All @@ -398,7 +398,7 @@ public void Build_CombineThreeUpstreamsCheckpoint_Success()
[Test]
public void Build_InBranchCheckpoint_Success()
{
var pipeline = new PipelineBuilder<string>()
var pipeline = new PipelineBuilder()
.Source(["2024-09-14", "2024-09-15", "2024-45-78"])
.Parse()
.AsDate()
Expand All @@ -419,7 +419,7 @@ public void Build_InBranchCheckpoint_Success()
[Test]
public void Build_InBranchCheckpointForAllPorts_Success()
{
var pipeline = new PipelineBuilder<string>()
var pipeline = new PipelineBuilder()
.Source(["2024-09-14", "2024-09-15", "2024-45-78"])
.Parse()
.AsDate()
Expand All @@ -441,7 +441,7 @@ public void Build_InBranchCheckpointForAllPorts_Success()
[Test]
public void Build_InBranchCheckpointWithDiscardedPorts_Success()
{
var pipeline = new PipelineBuilder<string>()
var pipeline = new PipelineBuilder()
.Source(["2024-09-14", "2024-09-15", "2024-45-78"])
.Parse()
.AsDate()
Expand All @@ -462,7 +462,7 @@ public void Build_InBranchCheckpointWithDiscardedPorts_Success()
[Test]
public void Build_InBranchCheckpointForAllPortsAllAsserted_Success()
{
var pipeline = new PipelineBuilder<string>()
var pipeline = new PipelineBuilder()
.Source(["2024-09-14", "2024-09-15", "2024-45-78"])
.Parse()
.AsDate()
Expand All @@ -486,7 +486,7 @@ public void Build_InBranchCheckpointForAllPortsAllAsserted_Success()
[Test]
public void Build_InBranchCheckpointForAllPortsTypedAllAsserted_Success()
{
var pipeline = new PipelineBuilder<string>()
var pipeline = new PipelineBuilder()
.Source(["2024-09-14", "2024-09-15", "2024-45-78"])
.Parse()
.AsDate()
Expand All @@ -510,7 +510,7 @@ public void Build_InBranchCheckpointForAllPortsTypedAllAsserted_Success()
[Test]
public void Build_InBranchOfBranchCheckpointForAllPortsTypedAllAsserted_Success()
{
var pipeline = new PipelineBuilder<string>()
var pipeline = new PipelineBuilder()
.Source(["2024-09-14", "2024-09-15", "2024-45-78"])
.Parse()
.AsDate()
Expand All @@ -537,7 +537,7 @@ public void Build_InBranchOfBranchCheckpointForAllPortsTypedAllAsserted_Success(
[Test]
public void Build_CombineFiveUpstreamsCheckpoint_Success()
{
var pipeline = new PipelineBuilder<int>()
var pipeline = new PipelineBuilder()
.Source([1, 2, 3])
.Branch(
stream1 => stream1.Map(x => x += 1)
Expand Down

0 comments on commit e6e9461

Please sign in to comment.