Created
November 4, 2025 15:54
-
-
Save egil/debdd5cb36050bd8dd6b5bbd56cd6577 to your computer and use it in GitHub Desktop.
Delay stream ingestion start example for Orleans
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| using Microsoft.Extensions.Logging; | |
| using Orleans.Providers.Streams.Common; | |
| // ReSharper disable once CheckNamespace | |
| namespace Orleans.Providers.Streams; | |
/// <summary>
/// Silo lifecycle participant that waits for a fixed delay and then sends the
/// <see cref="PersistentStreamProviderCommand.StartAgents"/> command to a stream provider.
/// </summary>
/// <param name="name">Name of the stream provider; only used in the log messages.</param>
/// <param name="streamProvider">Controllable that receives the start command.</param>
/// <param name="delayedStart">Time to wait before the start command is sent.</param>
/// <param name="logger">Logger used by the generated <c>LoggerMessage</c> methods.</param>
internal sealed partial class DelayedStreamProviderStarter(
    string name,
    IControllable streamProvider,
    TimeSpan delayedStart,
    ILogger<DelayedStreamProviderStarter> logger) : ILifecycleParticipant<ISiloLifecycle>
{
    /// <summary>
    /// Subscribes the delayed start as the start callback of the
    /// <see cref="ServiceLifecycleStage.Active"/> stage.
    /// </summary>
    /// <param name="lifecycle">The silo lifecycle to subscribe to.</param>
    public void Participate(ISiloLifecycle lifecycle)
        => lifecycle.Subscribe<DelayedStreamProviderStarter>(ServiceLifecycleStage.Active, TriggerDelayedAgentsStart);

    /// <summary>
    /// Logs that the start is being postponed, waits for the configured delay and then starts the agents.
    /// </summary>
    /// <param name="cancellationToken">Token that aborts the wait.</param>
    /// <remarks>
    /// NOTE(review): the delay is awaited inside the lifecycle callback, so the stage presumably
    /// does not complete before the delay has elapsed - confirm that this is intended.
    /// </remarks>
    private async Task TriggerDelayedAgentsStart(CancellationToken cancellationToken)
    {
        LogStreamProviderStartDelayed(name, delayedStart);

        await Task.Delay(delayedStart, cancellationToken);
        await StartAgents();
    }

    /// <summary>
    /// Logs the start and sends the <see cref="PersistentStreamProviderCommand.StartAgents"/> command
    /// (without an argument) to the stream provider.
    /// </summary>
    private async Task StartAgents()
    {
        LogStartingStreamProvider(name, delayedStart);

        var startAgentsCommand = (int)PersistentStreamProviderCommand.StartAgents;
        await streamProvider.ExecuteCommand(startAgentsCommand, null);
    }

    [LoggerMessage(Level = LogLevel.Information, Message = "Start of stream provider '{StreamProvider}' delayed by {Delay}.")]
    private partial void LogStreamProviderStartDelayed(string streamProvider, TimeSpan delay);

    [LoggerMessage(Level = LogLevel.Information, Message = "Starting stream provider '{StreamProvider}' after a delay of {Delay}.")]
    private partial void LogStartingStreamProvider(string streamProvider, TimeSpan delay);
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| using Microsoft.Extensions.DependencyInjection; | |
| using Microsoft.Extensions.Logging; | |
| using Microsoft.Extensions.Logging.Abstractions; | |
| using Orleans.Configuration; | |
| using Orleans.Providers; | |
| using Orleans.Providers.Streams; | |
| using Orleans.Streams; | |
| // ReSharper disable once CheckNamespace | |
| namespace Microsoft.Extensions.Hosting; | |
/// <summary>
/// Extension methods for <see cref="ISiloPersistentStreamConfigurator"/> that postpone the start
/// of a persistent stream provider's agents.
/// </summary>
public static class SiloPersistentStreamConfiguratorExtensions
{
    /// <summary>
    /// Configures the stream provider to start with
    /// <see cref="StreamLifecycleOptions.RunState.AgentsStopped"/> and registers a silo lifecycle
    /// participant that sends the start command after <paramref name="delayedStart"/>.
    /// </summary>
    /// <param name="configurator">The stream configurator to extend.</param>
    /// <param name="delayedStart">Time to wait before the agents are started. Must not be negative.</param>
    /// <returns>The same <paramref name="configurator"/> instance, to allow call chaining.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="configurator"/> is <see langword="null"/>.</exception>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="delayedStart"/> is negative.</exception>
    public static ISiloPersistentStreamConfigurator WithDelayedStart(this ISiloPersistentStreamConfigurator configurator, TimeSpan delayedStart)
    {
        // Fail at configuration time instead of letting Task.Delay reject the value during silo startup.
        ArgumentNullException.ThrowIfNull(configurator);
        ArgumentOutOfRangeException.ThrowIfLessThan(delayedStart, TimeSpan.Zero);

        var providerName = configurator.Name;

        configurator.ConfigureLifecycle(optionsBuilder => optionsBuilder.Configure(lifecycleOptions =>
        {
            // Start with the agents stopped; the registered starter sends the start command later.
            lifecycleOptions.StartupState = StreamLifecycleOptions.RunState.AgentsStopped;
        }));

        configurator.ConfigureDelegate(services =>
            services.AddTransient<ILifecycleParticipant<ISiloLifecycle>>(serviceProvider =>
                new DelayedStreamProviderStarter(
                    providerName,
                    (IControllable)serviceProvider.GetRequiredKeyedService<IStreamProvider>(providerName),
                    delayedStart,
                    serviceProvider.GetService<ILogger<DelayedStreamProviderStarter>>() ?? NullLogger<DelayedStreamProviderStarter>.Instance)));

        return configurator;
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment