Last active
September 25, 2025 13:09
-
-
Save egil/4c7c170ca164515cdd5d5771529445d8 to your computer and use it in GitHub Desktop.
How to create stable tests that wait until a stream message has been handled by the grain under test before proceeding.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| /// <summary> | |
| /// Example test that sends a message to a stream and waits until the grain under test reflects it. | |
| /// </summary> | |
| public class ExampleTest(SiloFixture fixture) : IClassFixture<SiloFixture> | |
| { | |
| // SiloFixture.GetStream takes a stream namespace AND a key, so both must be supplied. | |
| // NOTE(review): placeholder value - replace with the namespace IMyGrain actually subscribes to | |
| // (the fixture configures ImplicitOnly pub-sub, so presumably the grain's implicit subscription namespace; confirm). | |
| private const string StreamNamespace = "MyStreamNamespace"; | |
| private const string GrainKey = "grainId1"; | |
| [Fact] | |
| public async Task Sending_data_to_stream() | |
| { | |
| var message = "foo bar baz"; | |
| var stream = fixture.GetStream<string>(StreamNamespace, GrainKey); | |
| await stream.OnNextAsync(message); | |
| // WaitForAssertion re-runs the assertionAction for completed grain calls | |
| // to the grain with the matching grainId, until the assertion passes | |
| // or the wait times out. | |
| await fixture.WaitForAssertion<IMyGrain>( | |
| grainId: GrainKey, | |
| assertionAction: async myGrain => Assert.Equal(message, await myGrain.GetLastMessageAsync())); | |
| } | |
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| using R3; | |
| using System.Collections.Concurrent; | |
| using System.Diagnostics; | |
| namespace Egil.Orleans.Testing; | |
| public sealed class GrainCallObserver : IIncomingGrainCallFilter, IDisposable | |
| { | |
| private readonly Func<string, bool> interfaceFilter; | |
| private readonly ConcurrentDictionary<GrainId, ReplaySubject<IIncomingGrainCallContext?>> subjects = []; | |
| private bool disposed; | |
| /// <summary> | |
| /// Creates an observer that only publishes grain calls whose interface name is accepted by <paramref name="interfaceFilter"/>. | |
| /// </summary> | |
| /// <param name="interfaceFilter">Predicate that receives the interface name of an incoming grain call and returns true when the call should be observed.</param> | |
| /// <exception cref="ArgumentNullException"><paramref name="interfaceFilter"/> is null.</exception> | |
| public GrainCallObserver(Func<string, bool> interfaceFilter) | |
| { | |
| // Fail here rather than with a NullReferenceException inside the first intercepted grain call. | |
| ArgumentNullException.ThrowIfNull(interfaceFilter); | |
| this.interfaceFilter = interfaceFilter; | |
| } | |
| /// <summary> | |
| /// Marks the observer as disposed and disposes every subject created so far. | |
| /// </summary> | |
| public void Dispose() | |
| { | |
| // The call filter checks this flag before publishing to a subject. | |
| disposed = true; | |
| var feeds = subjects.Values; | |
| foreach (var feed in feeds) | |
| { | |
| feed.Dispose(); | |
| } | |
| } | |
| /// <summary> | |
| /// Returns the observable feed of observed grain calls for <paramref name="grainId"/>, creating the underlying subject on first use. | |
| /// </summary> | |
| public Observable<IIncomingGrainCallContext?> GetLiveCallFeed(GrainId grainId) | |
| { | |
| return GetSubject(grainId); | |
| } | |
| /// <summary> | |
| /// Waits until <paramref name="assertionAction"/> completes without throwing. The action is attempted once when the wait starts and again for each call published on the grain's call feed. | |
| /// </summary> | |
| /// <param name="grain">Grain whose call feed drives the retries; it is also passed to <paramref name="assertionAction"/>.</param> | |
| /// <param name="assertionAction">Assertion to run; any exception it throws counts as a failed attempt.</param> | |
| /// <param name="timeout">Maximum time to wait, 5 seconds when null. No timeout is applied while a debugger is attached.</param> | |
| /// <returns>A task that completes when an attempt passes. Otherwise it fails with the exception of the most recent failed attempt, or with the timeout/cancellation of the wait itself when no attempt failed.</returns> | |
| [StackTraceHidden] | |
| public Task WaitForAssertion<TGrain>(TGrain grain, Func<TGrain, Task> assertionAction, TimeSpan? timeout = null) where TGrain : IGrain | |
| { | |
| var assertionException = default(Exception); | |
| var task = GetLiveCallFeed(grain.GetGrainId()) | |
| // Placeholder item so the assertion is attempted right away (as WaitForCheck does), not only after a grain call has been observed. | |
| .Prepend(default(IIncomingGrainCallContext)!) | |
| .WhereAwait(async (_, ct) => | |
| { | |
| try | |
| { | |
| await assertionAction(grain); | |
| assertionException = null; | |
| return true; | |
| } | |
| catch (Exception exception) | |
| { | |
| // Remember the latest failure so it can be reported if the wait never succeeds. | |
| assertionException = exception; | |
| return false; | |
| } | |
| }) | |
| .FirstAsync(TestContext.Current.CancellationToken); | |
| if (!Debugger.IsAttached) | |
| { | |
| task = task.WaitAsync(timeout ?? TimeSpan.FromSeconds(5)); | |
| } | |
| return task.ContinueWith( | |
| completed => | |
| { | |
| // The wait failed after at least one failed attempt: surface that assertion failure, it says more than a bare timeout. | |
| if (!completed.IsCompletedSuccessfully && assertionException is not null) | |
| { | |
| return Task.FromException(assertionException); | |
| } | |
| // Otherwise propagate the outcome of the wait itself, so a timeout or cancellation is never reported as success. | |
| return completed; | |
| }, | |
| CancellationToken.None, | |
| TaskContinuationOptions.ExecuteSynchronously, | |
| TaskScheduler.Default).Unwrap(); | |
| } | |
| /// <summary> | |
| /// Waits until <paramref name="assertionActionWithReturnValue"/> completes without throwing and returns its result. The action is attempted once when the wait starts and again for each call published on the grain's call feed. | |
| /// </summary> | |
| /// <param name="grain">Grain whose call feed drives the retries; it is also passed to the action.</param> | |
| /// <param name="assertionActionWithReturnValue">Assertion to run; any exception it throws counts as a failed attempt.</param> | |
| /// <param name="timeout">Maximum time to wait, 5 seconds when null. No timeout is applied while a debugger is attached.</param> | |
| /// <returns>A task producing the value of the first passing attempt. Otherwise it fails with the exception of the most recent failed attempt, or with the timeout/cancellation of the wait itself when no attempt failed.</returns> | |
| [StackTraceHidden] | |
| public Task<TOutput> WaitForAssertion<TGrain, TOutput>(TGrain grain, Func<TGrain, Task<TOutput>> assertionActionWithReturnValue, TimeSpan? timeout = null) where TGrain : IGrain | |
| { | |
| var assertionException = default(Exception); | |
| var task = GetLiveCallFeed(grain.GetGrainId()) | |
| // Placeholder item so the assertion is attempted right away (as WaitForCheck does), not only after a grain call has been observed. | |
| .Prepend(default(IIncomingGrainCallContext)!) | |
| .SelectAwait(async (_, ct) => | |
| { | |
| try | |
| { | |
| var result = await assertionActionWithReturnValue(grain); | |
| assertionException = null; | |
| return (Passed: true, Output: result); | |
| } | |
| catch (Exception exception) | |
| { | |
| // Remember the latest failure so it can be reported if the wait never succeeds. | |
| assertionException = exception; | |
| return (Passed: false, Output: default(TOutput)!); | |
| } | |
| }) | |
| .Where(attempt => attempt.Passed) | |
| .Select(attempt => attempt.Output) | |
| .FirstAsync(TestContext.Current.CancellationToken); | |
| if (!Debugger.IsAttached) | |
| { | |
| task = task.WaitAsync(timeout ?? TimeSpan.FromSeconds(5)); | |
| } | |
| return task | |
| .ContinueWith( | |
| completed => | |
| { | |
| // Only replace the outcome when the wait itself did not succeed; a passing attempt always wins. | |
| if (!completed.IsCompletedSuccessfully && assertionException is not null) | |
| { | |
| return Task.FromException<TOutput>(assertionException); | |
| } | |
| return completed; | |
| }, | |
| CancellationToken.None, | |
| TaskContinuationOptions.ExecuteSynchronously, | |
| TaskScheduler.Default) | |
| .Unwrap(); | |
| } | |
| /// <summary> | |
| /// Waits until <paramref name="checkAction"/> returns true for <paramref name="grain"/>. | |
| /// </summary> | |
| /// <param name="grain">Grain whose call feed is observed; it is also passed to <paramref name="checkAction"/>.</param> | |
| /// <param name="checkAction">Check to evaluate for each item on the feed.</param> | |
| /// <param name="timeout">Maximum time to wait, 5 seconds when null. No timeout is applied while a debugger is attached.</param> | |
| [StackTraceHidden] | |
| public Task WaitForCheck<TGrain>(TGrain grain, Func<TGrain, Task<bool>> checkAction, TimeSpan? timeout = null) where TGrain : IGrain | |
| { | |
| var feed = GetLiveCallFeed(grain.GetGrainId()); | |
| // A placeholder item is put ahead of the feed so the check does not have to wait for an observed call. | |
| var firstMatch = feed | |
| .Prepend(default(IIncomingGrainCallContext)!) | |
| .WhereAwait(async (_, ct) => await checkAction(grain)) | |
| .FirstAsync(TestContext.Current.CancellationToken); | |
| if (Debugger.IsAttached) | |
| { | |
| return firstMatch; | |
| } | |
| return firstMatch.WaitAsync(timeout ?? TimeSpan.FromSeconds(5)); | |
| } | |
| /// <summary> | |
| /// Gets the subject registered for <paramref name="grainId"/>, adding a new one (created with bufferSize: 1) when none exists yet. | |
| /// </summary> | |
| private ReplaySubject<IIncomingGrainCallContext?> GetSubject(GrainId grainId) | |
| { | |
| return subjects.GetOrAdd( | |
| grainId, | |
| static _ => new ReplaySubject<IIncomingGrainCallContext?>(bufferSize: 1)); | |
| } | |
| /// <summary> | |
| /// Invokes the grain call and, for interfaces accepted by the filter, publishes the call context to the target grain's subject once the call has completed. | |
| /// </summary> | |
| /// <param name="context">The incoming grain call.</param> | |
| /// <returns>The task of the underlying grain call.</returns> | |
| Task IIncomingGrainCallFilter.Invoke(IIncomingGrainCallContext context) | |
| { | |
| var shouldObserve = interfaceFilter.Invoke(context.InterfaceName); | |
| if (shouldObserve) | |
| { | |
| var observedCall = context | |
| .Invoke() | |
| .ContinueWith(completedCall => | |
| { | |
| // Publish regardless of how the call ended, unless the observer has been disposed. | |
| if (!disposed) | |
| { | |
| GetSubject(context.TargetId).OnNext(context); | |
| } | |
| // Hand back the original task so its result or failure flows to the caller unchanged. | |
| return completedCall; | |
| }, | |
| TaskContinuationOptions.ExecuteSynchronously); | |
| return observedCall.Unwrap(); | |
| } | |
| // Calls rejected by the filter pass straight through. | |
| return context.Invoke(); | |
| } | |
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| using System.Diagnostics; | |
| using Egil.Orleans.Testing; | |
| using Microsoft.Extensions.DependencyInjection; | |
| using Orleans.Streams; | |
| using Orleans.TestingHost; | |
| using R3; | |
| using TimeProviderExtensions; | |
| public sealed class SiloFixture : IAsyncLifetime, IGrainFactory | |
| { | |
| private const string TestStreamProviderName = "TestStreamProvider"; | |
| private IStreamProvider? streamProvider; | |
| private InProcessTestCluster? cluster; | |
| private IGrainFactory? grainFactory; | |
| public GrainCallObserver CallObserver { get; } | |
| public ManualTimeProvider TimeProvider { get; } | |
| /// <summary> | |
| /// Creates the fixture with a manual time provider and a grain call observer. The cluster itself is built and deployed in InitializeAsync. | |
| /// </summary> | |
| public SiloFixture() | |
| { | |
| TimeProvider = new ManualTimeProvider(DateTimeOffset.UtcNow); | |
| // Only observe calls whose interface name does not start with "Orleans." or "System.". | |
| // Can also be changed to a positive filter, e.g. only observe calls where the name matches the | |
| // solution's namespace. | |
| // Ordinal comparison: these are identifiers, so matching must not depend on the current culture. | |
| CallObserver = new GrainCallObserver( | |
| static interfaceName => | |
| !interfaceName.StartsWith("Orleans.", StringComparison.Ordinal) | |
| && !interfaceName.StartsWith("System.", StringComparison.Ordinal)); | |
| } | |
| /// <summary> | |
| /// Builds and deploys a single-silo in-process test cluster, then keeps its client as grain factory and as source of the test stream provider. | |
| /// </summary> | |
| public async ValueTask InitializeAsync() | |
| { | |
| var clusterBuilder = new InProcessTestClusterBuilder(initialSilosCount: 1); | |
| clusterBuilder.ConfigureSilo((_, silo) => | |
| { | |
| // Register additional services here. To reach a service from tests, register a concrete | |
| // instance and expose it on the SiloFixture (e.g. as a property), the way TimeProvider is handled. | |
| silo.Services.AddKeyedSingleton<TimeProvider>("DomainTimeProvider", TimeProvider); | |
| // Hook the grain call observer into the silo's incoming call pipeline. | |
| silo.Services.AddSingleton<IIncomingGrainCallFilter>(CallObserver); | |
| // Default grain storage, kept in memory. | |
| silo.AddMemoryGrainStorageAsDefault(); | |
| // Named grain storage, kept in memory. | |
| silo.AddMemoryGrainStorage("Custom"); | |
| silo.AddMemoryStreams( | |
| TestStreamProviderName, | |
| streams => | |
| { | |
| streams.ConfigureStreamPubSub(StreamPubSubType.ImplicitOnly); | |
| }); | |
| }); | |
| clusterBuilder.ConfigureClient(client => | |
| { | |
| client.AddMemoryStreams(TestStreamProviderName); | |
| }); | |
| var deployed = clusterBuilder.Build(); | |
| cluster = deployed; | |
| await deployed.DeployAsync(); | |
| var clusterClient = deployed.Client; | |
| grainFactory = clusterClient; | |
| streamProvider = clusterClient.GetStreamProvider(TestStreamProviderName); | |
| } | |
| /// <summary> | |
| /// Stops and disposes the test cluster (when one was created) and then disposes the grain call observer. | |
| /// </summary> | |
| public async ValueTask DisposeAsync() | |
| { | |
| try | |
| { | |
| if (cluster is not null) | |
| { | |
| await cluster.StopAllSilosAsync(); | |
| // Stopping the silos does not release the cluster itself; dispose it as well. | |
| await cluster.DisposeAsync(); | |
| } | |
| } | |
| finally | |
| { | |
| // Release the observer's subjects even when shutting the cluster down fails. | |
| CallObserver.Dispose(); | |
| } | |
| } | |
| /// <summary> | |
| /// Resolves the grain with string key <paramref name="grainId"/> and waits, via the fixture's call observer, until <paramref name="assertionAction"/> passes for it. | |
| /// </summary> | |
| /// <param name="grainId">String key of the grain to observe.</param> | |
| /// <param name="assertionAction">Assertion to run against the grain.</param> | |
| /// <param name="timeout">Optional maximum wait, forwarded to the call observer.</param> | |
| [StackTraceHidden] | |
| public Task WaitForAssertion<TGrain>(string grainId, Func<TGrain, Task> assertionAction, TimeSpan? timeout = null) where TGrain : IGrainWithStringKey | |
| { | |
| // WaitForAssertion is an instance method, so it is called on the fixture's CallObserver instance rather than on the GrainCallObserver type. | |
| return CallObserver.WaitForAssertion(GetGrain<TGrain>(grainId), assertionAction, timeout); | |
| } | |
| /// <summary> | |
| /// Gets the stream identified by <paramref name="namespace"/> and <paramref name="key"/> from the test stream provider. | |
| /// </summary> | |
| /// <exception cref="InvalidOperationException">No stream is available, e.g. because the stream provider has not been set yet.</exception> | |
| public IAsyncStream<T> GetStream<T>(string @namespace, string key) | |
| { | |
| var stream = streamProvider?.GetStream<T>(StreamId.Create(@namespace, key)); | |
| if (stream is null) | |
| { | |
| throw new InvalidOperationException("Cluster not ready"); | |
| } | |
| return stream; | |
| } | |
| // The IGrainFactory members below all forward to the cluster client's grain factory. | |
| // This helper returns that factory, or throws while it has not been assigned yet. | |
| private IGrainFactory ReadyGrainFactory | |
| => grainFactory ?? throw new InvalidOperationException("Cluster not ready"); | |
| public TGrainInterface GetGrain<TGrainInterface>(Guid primaryKey, string? grainClassNamePrefix = null) where TGrainInterface : IGrainWithGuidKey | |
| => ReadyGrainFactory.GetGrain<TGrainInterface>(primaryKey, grainClassNamePrefix); | |
| public TGrainInterface GetGrain<TGrainInterface>(long primaryKey, string? grainClassNamePrefix = null) where TGrainInterface : IGrainWithIntegerKey | |
| => ReadyGrainFactory.GetGrain<TGrainInterface>(primaryKey, grainClassNamePrefix); | |
| public TGrainInterface GetGrain<TGrainInterface>(string primaryKey, string? grainClassNamePrefix = null) where TGrainInterface : IGrainWithStringKey | |
| => ReadyGrainFactory.GetGrain<TGrainInterface>(primaryKey, grainClassNamePrefix); | |
| public TGrainInterface GetGrain<TGrainInterface>(Guid primaryKey, string keyExtension, string? grainClassNamePrefix = null) where TGrainInterface : IGrainWithGuidCompoundKey | |
| => ReadyGrainFactory.GetGrain<TGrainInterface>(primaryKey, keyExtension, grainClassNamePrefix); | |
| public TGrainInterface GetGrain<TGrainInterface>(long primaryKey, string keyExtension, string? grainClassNamePrefix = null) where TGrainInterface : IGrainWithIntegerCompoundKey | |
| => ReadyGrainFactory.GetGrain<TGrainInterface>(primaryKey, keyExtension, grainClassNamePrefix); | |
| public TGrainObserverInterface CreateObjectReference<TGrainObserverInterface>(IGrainObserver obj) where TGrainObserverInterface : IGrainObserver | |
| => ReadyGrainFactory.CreateObjectReference<TGrainObserverInterface>(obj); | |
| public void DeleteObjectReference<TGrainObserverInterface>(IGrainObserver obj) where TGrainObserverInterface : IGrainObserver | |
| => ReadyGrainFactory.DeleteObjectReference<TGrainObserverInterface>(obj); | |
| public IGrain GetGrain(Type grainInterfaceType, Guid grainPrimaryKey) | |
| => ReadyGrainFactory.GetGrain(grainInterfaceType, grainPrimaryKey); | |
| public IGrain GetGrain(Type grainInterfaceType, long grainPrimaryKey) | |
| => ReadyGrainFactory.GetGrain(grainInterfaceType, grainPrimaryKey); | |
| public IGrain GetGrain(Type grainInterfaceType, string grainPrimaryKey) | |
| => ReadyGrainFactory.GetGrain(grainInterfaceType, grainPrimaryKey); | |
| public IGrain GetGrain(Type grainInterfaceType, Guid grainPrimaryKey, string keyExtension) | |
| => ReadyGrainFactory.GetGrain(grainInterfaceType, grainPrimaryKey, keyExtension); | |
| public IGrain GetGrain(Type grainInterfaceType, long grainPrimaryKey, string keyExtension) | |
| => ReadyGrainFactory.GetGrain(grainInterfaceType, grainPrimaryKey, keyExtension); | |
| public TGrainInterface GetGrain<TGrainInterface>(GrainId grainId) where TGrainInterface : IAddressable | |
| => ReadyGrainFactory.GetGrain<TGrainInterface>(grainId); | |
| public IAddressable GetGrain(GrainId grainId) | |
| => ReadyGrainFactory.GetGrain(grainId); | |
| public IAddressable GetGrain(GrainId grainId, GrainInterfaceType interfaceType) | |
| => ReadyGrainFactory.GetGrain(grainId, interfaceType); | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment