Skip to content

Commit

Permalink
Add support for listening to global events (#1110)
Browse files Browse the repository at this point in the history
  • Loading branch information
adelinag08 authored Jul 8, 2024
1 parent 75849b9 commit a2589e1
Show file tree
Hide file tree
Showing 7 changed files with 174 additions and 0 deletions.
54 changes: 54 additions & 0 deletions Minio.Examples/Cases/ListenNotifications.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* MinIO .NET Library for Amazon S3 Compatible Cloud Storage, (C) 2024 MinIO, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

using Minio.DataModel.Args;
using Minio.DataModel.Notification;

namespace Minio.Examples.Cases;

internal static class ListenNotifications
{
// Listen for gloabal notifications (a Minio-only extension)
public static void Run(IMinioClient minio,
List<EventType> events = null)
{
IDisposable subscription = null;
try
{
Console.WriteLine("Running example for API: ListenNotifications");
Console.WriteLine();
events ??= new List<EventType> { EventType.BucketCreatedAll };
var args = new ListenBucketNotificationsArgs().WithEvents(events);
var observable = minio.ListenNotificationsAsync(events);

subscription = observable.Subscribe(
notification => Console.WriteLine($"Notification: {notification.Json}"),
ex => Console.WriteLine($"OnError: {ex}"),
() => Console.WriteLine("Stopped listening for bucket notifications\n"));

Console.WriteLine("Press any key to stop listening for notifications...");
Console.ReadLine();
}
catch (Exception e)
{
Console.WriteLine($"[Bucket] Exception: {e}");
}
finally
{
subscription?.Dispose();
}
}
}
3 changes: 3 additions & 0 deletions Minio.Examples/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,9 @@ public static async Task Main()
// Start listening for bucket notifications
ListenBucketNotifications.Run(minioClient, bucketName, new List<EventType> { EventType.ObjectCreatedAll });

// Start listening for global notifications
ListenNotifications.Run(minioClient, new List<EventType> { EventType.BucketCreatedAll });

// Put an object to the new bucket
await PutObject.Run(minioClient, bucketName, objectName, smallFileName, progress).ConfigureAwait(false);

Expand Down
95 changes: 95 additions & 0 deletions Minio.Functional.Tests/FunctionalTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,9 @@ public static class FunctionalTest
private const string listenBucketNotificationsSignature =
"IObservable<MinioNotificationRaw> ListenBucketNotificationsAsync(ListenBucketNotificationsArgs args, CancellationToken cancellationToken = default(CancellationToken))";

private const string listenNotificationsSignature =
"IObservable<MinioNotificationRaw> ListenNotificationsAsync(IList<EventType> events, CancellationToken cancellationToken = default(CancellationToken))";

private const string copyObjectSignature =
"Task<CopyObjectResult> CopyObjectAsync(CopyObjectArgs args, CancellationToken cancellationToken = default(CancellationToken))";

Expand Down Expand Up @@ -1034,6 +1037,19 @@ internal static async Task<ObjectStat> PutObject_Tester(IMinioClient minio,
return statObject;
}

internal static async Task<bool> CreateBucket_Tester(IMinioClient minio, string bucketName)
{
// Create a new bucket
await minio.MakeBucketAsync(new MakeBucketArgs().WithBucket(bucketName)).ConfigureAwait(false);

// Verify the bucket exists
var bucketExists = await minio.BucketExistsAsync(new BucketExistsArgs().WithBucket(bucketName))
.ConfigureAwait(false);
Assert.IsTrue(bucketExists, $"Bucket {bucketName} was not created successfully.");

return bucketExists;
}

internal static async Task StatObject_Test1(IMinioClient minio)
{
var startTime = DateTime.Now;
Expand Down Expand Up @@ -2621,6 +2637,84 @@ await ListObjects_Test(minio, bucketName, singleObjectName, 1, headers: extractH
}
}

#region Global Notifications

internal static async Task ListenNotificationsAsync_Test1(IMinioClient minio)
{
var startTime = DateTime.Now;
var bucketName = GetRandomName(15);
var args = new Dictionary<string, string>
(StringComparer.Ordinal) { { "bucketName", bucketName } };
try
{
var received = new List<MinioNotificationRaw>();

var eventsList = new List<EventType> { EventType.BucketCreatedAll };

var events = minio.ListenNotificationsAsync(eventsList);
var subscription = events.Subscribe(
received.Add,
ex => Console.WriteLine($"OnError: {ex}"),
() => Console.WriteLine("Stopped listening for bucket notifications\n"));

// Ensure the subscription is established
await Task.Delay(1000).ConfigureAwait(false);

// Trigger the event by creating a new bucket
var isBucketCreated1 = await CreateBucket_Tester(minio, bucketName).ConfigureAwait(false);

var eventDetected = false;
for (var attempt = 0; attempt < 20; attempt++)
{
if (received.Count > 0)
{
var notification = JsonSerializer.Deserialize<MinioNotification>(received[0].Json);

if (notification.Records is not null)
{
Assert.AreEqual(1, notification.Records.Count);
eventDetected = true;
break;
}
}

await Task.Delay(500).ConfigureAwait(false); // Delay between attempts
}

subscription.Dispose();
if (!eventDetected)
throw new UnexpectedMinioException("Failed to detect the expected bucket notification event.");

new MintLogger(nameof(ListenNotificationsAsync_Test1),
listenNotificationsSignature,
"Tests whether ListenNotifications passes",
TestStatus.PASS, DateTime.Now - startTime, args: args).Log();
}
catch (NotImplementedException ex)
{
new MintLogger(nameof(ListenNotificationsAsync_Test1),
listenNotificationsSignature,
"Tests whether ListenNotifications passes",
TestStatus.NA, DateTime.Now - startTime, ex.Message,
ex.ToString(), args: args).Log();
}
catch (Exception ex)
{
new MintLogger(nameof(ListenNotificationsAsync_Test1),
listenNotificationsSignature,
"Tests whether ListenNotifications passes",
TestStatus.FAIL, DateTime.Now - startTime, ex.Message,
ex.ToString(), args: args).Log();
throw;
}
finally
{
await TearDown(minio, bucketName).ConfigureAwait(false);
}
}

#endregion

#region Bucket Notifications

internal static async Task ListenBucketNotificationsAsync_Test1(IMinioClient minio)
Expand Down Expand Up @@ -2655,6 +2749,7 @@ internal static async Task ListenBucketNotificationsAsync_Test1(IMinioClient min
() => { }
);


_ = await PutObject_Tester(minio, bucketName, objectName, null, contentType,
0, null, rsg.GenerateStreamFromSeed(1 * KB)).ConfigureAwait(false);

Expand Down
3 changes: 3 additions & 0 deletions Minio.Functional.Tests/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,9 @@ public static async Task Main(string[] args)

ConcurrentBag<Task> functionalTestTasks = new();

// Global Notification
await FunctionalTest.ListenNotificationsAsync_Test1(minioClient).ConfigureAwait(false);

// Try catch as 'finally' section needs to run in the Functional Tests
// Bucket notification is a minio specific feature.
// If the following test is run against AWS, then the SDK throws
Expand Down
13 changes: 13 additions & 0 deletions Minio/ApiEndpoints/BucketOperations.cs
Original file line number Diff line number Diff line change
Expand Up @@ -759,6 +759,19 @@ await this.ExecuteTaskAsync(ResponseErrorHandlers, requestMessageBuilder,
.ConfigureAwait(false);
}

/// <summary>
/// Subscribes to global change notifications (a Minio-only extension)
/// </summary>
/// <param name="events">Events to listen for</param>
/// <param name="cancellationToken">Optional cancellation token to cancel the operation</param>
/// <returns>An observable of JSON-based notification events</returns>
public IObservable<MinioNotificationRaw> ListenNotificationsAsync(IList<EventType> events,
CancellationToken cancellationToken = default)
{
var args = new ListenBucketNotificationsArgs().WithEvents(events);
return ListenBucketNotificationsAsync(args, cancellationToken);
}

/// <summary>
/// Subscribes to bucket change notifications (a Minio-only extension)
/// </summary>
Expand Down
3 changes: 3 additions & 0 deletions Minio/ApiEndpoints/IBucketOperations.cs
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,9 @@ Task<ReplicationConfiguration> GetBucketReplicationAsync(GetBucketReplicationArg

Task<string> GetPolicyAsync(GetPolicyArgs args, CancellationToken cancellationToken = default);

IObservable<MinioNotificationRaw> ListenNotificationsAsync(IList<EventType> events,
CancellationToken cancellationToken = default);

IObservable<MinioNotificationRaw> ListenBucketNotificationsAsync(string bucketName, IList<EventType> events,
string prefix = "", string suffix = "", CancellationToken cancellationToken = default);

Expand Down
3 changes: 3 additions & 0 deletions Minio/DataModel/Notification/EventType.cs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ public sealed class EventType
public static readonly EventType ObjectRemovedDeleteMarkerCreated = new("s3:ObjectRemoved:DeleteMarkerCreated");
public static readonly EventType ReducedRedundancyLostObject = new("s3:ReducedRedundancyLostObject");

public static readonly EventType BucketCreatedAll = new("s3:BucketCreated:*");
public static readonly EventType BucketRemovedAll = new("s3:BucketRemoved:*");

private EventType()
{
Value = null;
Expand Down

0 comments on commit a2589e1

Please sign in to comment.