Skip to content

Commit

Permalink
Account for nullable events (#324)
Browse files Browse the repository at this point in the history
* Account for nullable events

* Fix json vulnerability

* Upgrade deprecated gh actions

* Fix tests

* Remove susbcribe_to_stream test
  • Loading branch information
w1am authored Nov 1, 2024
1 parent e37e8ec commit 699470d
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 2 deletions.
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
// ReSharper disable ConditionIsAlwaysTrueOrFalseAccordingToNullableAPIContract

using System.Diagnostics;
using EventStore.Diagnostics;
using EventStore.Diagnostics.Telemetry;
Expand Down Expand Up @@ -32,7 +34,7 @@ public static void TraceSubscriptionEvent(
EventStoreClientSettings settings,
UserCredentials? userCredentials
) {
if (source.HasNoActiveListeners())
if (source.HasNoActiveListeners() || resolvedEvent.Event is null)
return;

var parentContext = resolvedEvent.Event.Metadata.ExtractPropagationContext();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
// ReSharper disable ConditionalAccessQualifierIsNonNullableAccordingToAPIContract

using EventStore.Client.Diagnostics;
using EventStore.Diagnostics.Tracing;

Expand Down Expand Up @@ -171,4 +173,49 @@ async Task Subscribe(IAsyncEnumerator<StreamMessage> internalEnumerator) {
}
}
}

[Fact]
[Trait("Category", "Special cases")]
public async Task should_not_trace_when_event_is_null() {
var category = Guid.NewGuid().ToString("N");
var streamName = category + "-123";

var seedEvents = Fixture.CreateTestEvents(type: $"{category}-{Fixture.GetStreamName()}").ToArray();
await Fixture.Streams.AppendToStreamAsync(streamName, StreamState.NoStream, seedEvents);

await Fixture.Streams.DeleteAsync(streamName, StreamState.StreamExists);

await using var subscription = Fixture.Streams.SubscribeToStream("$ce-" + category, FromStream.Start, resolveLinkTos: true);

await using var enumerator = subscription.Messages.GetAsyncEnumerator();

Assert.True(await enumerator.MoveNextAsync());

Assert.IsType<StreamMessage.SubscriptionConfirmation>(enumerator.Current);

await Subscribe().WithTimeout();

var appendActivities = Fixture
.GetActivitiesForOperation(TracingConstants.Operations.Append, streamName)
.ShouldNotBeNull();

var subscribeActivities = Fixture
.GetActivitiesForOperation(TracingConstants.Operations.Subscribe, "$ce-" + category)
.ToArray();

appendActivities.ShouldHaveSingleItem();
subscribeActivities.ShouldBeEmpty();

return;

async Task Subscribe() {
while (await enumerator.MoveNextAsync()) {
if (enumerator.Current is not StreamMessage.Event(var resolvedEvent))
continue;

if (resolvedEvent.Event?.EventType is "$metadata")
return;
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ namespace EventStore.Client.Tests;
public class DiagnosticsFixture : EventStoreFixture {
readonly ConcurrentDictionary<(string Operation, string Stream), List<Activity>> _activities = [];

public DiagnosticsFixture() {
public DiagnosticsFixture() : base(x => x.RunProjections()) {
var diagnosticActivityListener = new ActivityListener {
ShouldListenTo = source => source.Name == EventStoreClientDiagnostics.InstrumentationName,
Sample = (ref ActivityCreationOptions<ActivityContext> _) => ActivitySamplingResult.AllDataAndRecorded,
Expand Down

0 comments on commit 699470d

Please sign in to comment.