Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

UF-2022 Add back fill acceptance tests #28

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,81 @@ public async Task Events_Are_Ordered_By_Insert_Order_And_Ignoring_Timestamp()
Assert.Empty(thirdPageResult);
}

[Fact]
public async Task Back_Fill_OutBox()
{
// Existing events - before back fill and changefeed is started
var timestamp = DateTimeOffset.Now.AddDays(-1);
var oldEventSourceEntry1 = new EventSourceDto { AggregateId = Guid.NewGuid(), Sequence = 0, Data = "0", Timestamp = timestamp };
var oldEventSourceEntry2 = new EventSourceDto { AggregateId = Guid.NewGuid(), Sequence = 0, Data = "1", Timestamp = timestamp };
await using var oldEventsTransaction = _connection.BeginTransaction();
await InsertIntoEventSource(oldEventSourceEntry1, _connection, oldEventsTransaction);
await InsertIntoEventSource(oldEventSourceEntry2, _connection, oldEventsTransaction);
await oldEventsTransaction.CommitAsync();

// Starting to back fill outbox
await using var backFillTransaction = _connection.BeginTransaction();
await InsertIntoOutbox(oldEventSourceEntry1, _connection, backFillTransaction);
await InsertIntoOutbox(oldEventSourceEntry2, _connection, backFillTransaction);
await backFillTransaction.CommitAsync();

// Starting changefeed - Simulate live events at the same time as back filling is running
timestamp = DateTimeOffset.Now;
var liveEvent1 = new EventSourceDto { AggregateId = Guid.NewGuid(), Sequence = 0, Data = "2", Timestamp = timestamp };
var liveEvent2 = new EventSourceDto { AggregateId = Guid.NewGuid(), Sequence = 0, Data = "3", Timestamp = timestamp };
await InsertIntoDatabaseInTransaction(liveEvent1, _connection);
await InsertIntoDatabaseInTransaction(liveEvent2, _connection);

// Continue to back fill outbox after changefeed is started
await using var continueBackFillTransaction = _connection.BeginTransaction();
await InsertIntoOutbox(liveEvent1, _connection, continueBackFillTransaction);
await InsertIntoOutbox(liveEvent2, _connection, continueBackFillTransaction);
await continueBackFillTransaction.CommitAsync();

// Running changefeed - Back filling is stopped
timestamp = DateTimeOffset.Now;
var liveEvent3 = new EventSourceDto { AggregateId = Guid.NewGuid(), Sequence = 0, Data = "4", Timestamp = timestamp };
var liveEvent4 = new EventSourceDto { AggregateId = Guid.NewGuid(), Sequence = 0, Data = "5", Timestamp = timestamp };
await InsertIntoDatabaseInTransaction(liveEvent3, _connection);
await InsertIntoDatabaseInTransaction(liveEvent4, _connection);

// First read determines the order, so wait until back filling is done and stopped
// There should be a small overlap on back filling and live events processing to ensure all
// events are added to the outbox
var result = await ReadFeed(_startCursor, 100, _connection);

var feedResult = result.ToList();
Assert.Equal(6, feedResult.Count);
Assert.Equal("0", feedResult[0].Data);
Assert.Equal("1", feedResult[1].Data);
Assert.Equal("2", feedResult[2].Data);
Assert.Equal("3", feedResult[3].Data);
Assert.Equal("4", feedResult[4].Data);
Assert.Equal("5", feedResult[5].Data);
}

[Fact]
public async Task Ignore_Inserting_Into_Outbox_If_It_Already_Exists()
{
var eventSourceDto = new EventSourceDto { AggregateId = Guid.NewGuid(), Sequence = 0, Data = "0", Timestamp = DateTimeOffset.Now };
await InsertIntoDatabaseInTransaction(eventSourceDto, _connection);

await using var transaction = _connection.BeginTransaction();
await InsertIntoOutbox(eventSourceDto, _connection, transaction);
await transaction.CommitAsync();

var result = await ReadFeed(_startCursor, 10, _connection);

Assert.NotNull(result);
var feedResult = result.ToList();
Assert.Single(feedResult);
Assert.Equal(eventSourceDto.AggregateId, feedResult.Single().AggregateId);
Assert.Equal(eventSourceDto.Sequence, feedResult.Single().Sequence);
Assert.Equal(eventSourceDto.Data, feedResult.Single().Data);
Assert.Equal(eventSourceDto.Timestamp, feedResult.Single().Timestamp);
Assert.NotEqual(_startCursor, feedResult.Single().Ulid);
}

private static async Task InsertIntoDatabaseInTransaction(
EventSourceDto eventSourceDto,
SqlConnection connection)
Expand All @@ -115,8 +190,16 @@ private static async Task InsertIntoOutbox(
{
const string insertIntoOutboxStatement =
"""
INSERT INTO [changefeed].[outbox:dbo.EventSource] (shard_id, time_hint, AggregateId, Sequence)
VALUES (0, @TimeHint, @AggregateId, @Sequence);
IF NOT EXISTS (
SELECT 1
FROM [changefeed].[outbox:dbo.EventSource]
WHERE
AggregateId = @AggregateId
AND Sequence = @Sequence)
BEGIN
INSERT INTO [changefeed].[outbox:dbo.EventSource] (shard_id, time_hint, AggregateId, Sequence)
VALUES (0, @TimeHint, @AggregateId, @Sequence);
END;
""";

var results = await connection.ExecuteAsync(
Expand Down
Loading