Skip to content

Instantly share code, notes, and snippets.

@egil
Created November 4, 2025 15:54
Show Gist options
  • Select an option

  • Save egil/debdd5cb36050bd8dd6b5bbd56cd6577 to your computer and use it in GitHub Desktop.

Select an option

Save egil/debdd5cb36050bd8dd6b5bbd56cd6577 to your computer and use it in GitHub Desktop.
Delay stream ingestion start example for Orleans
using Microsoft.Extensions.Logging;
using Orleans.Providers.Streams.Common;
// ReSharper disable once CheckNamespace
namespace Orleans.Providers.Streams;
/// <summary>
/// Silo lifecycle participant that waits for a fixed delay and then sends the
/// <see cref="PersistentStreamProviderCommand.StartAgents"/> command to a stream provider.
/// </summary>
/// <param name="name">Name of the stream provider; only used in the log messages.</param>
/// <param name="streamProvider">The controllable that receives the <c>StartAgents</c> command.</param>
/// <param name="delayedStart">How long to wait before the command is sent.</param>
/// <param name="logger">
/// Logger for this type. It is not referenced directly in the visible code; presumably it is
/// picked up by the <c>LoggerMessage</c> source generator - confirm against the generated part.
/// </param>
internal sealed partial class DelayedStreamProviderStarter(
    string name,
    IControllable streamProvider,
    TimeSpan delayedStart,
    ILogger<DelayedStreamProviderStarter> logger) : ILifecycleParticipant<ISiloLifecycle>
{
    [LoggerMessage(Level = LogLevel.Information, Message = "Start of stream provider '{StreamProvider}' delayed by {Delay}.")]
    private partial void LogStreamProviderStartDelayed(string streamProvider, TimeSpan delay);

    [LoggerMessage(Level = LogLevel.Information, Message = "Starting stream provider '{StreamProvider}' after a delay of {Delay}.")]
    private partial void LogStartingStreamProvider(string streamProvider, TimeSpan delay);

    /// <summary>
    /// Registers <see cref="TriggerDelayedAgentsStart"/> as the start callback for
    /// <see cref="ServiceLifecycleStage.Active"/>.
    /// </summary>
    /// <param name="lifecycle">The silo lifecycle to subscribe to.</param>
    public void Participate(ISiloLifecycle lifecycle)
        => lifecycle.Subscribe<DelayedStreamProviderStarter>(ServiceLifecycleStage.Active, TriggerDelayedAgentsStart);

    /// <summary>
    /// Logs that the start is delayed, waits for the configured delay, logs again and then
    /// issues the <c>StartAgents</c> command to the stream provider.
    /// </summary>
    /// <param name="cancellationToken">Cancels the wait; it is not passed on to the command.</param>
    /// <remarks>
    /// NOTE(review): the delay is awaited inside the lifecycle callback itself, so the callback
    /// does not complete until the delay has elapsed - confirm that holding the stage open for
    /// that long is intended.
    /// </remarks>
    private async Task TriggerDelayedAgentsStart(CancellationToken cancellationToken)
    {
        LogStreamProviderStartDelayed(name, delayedStart);
        await Task.Delay(delayedStart, cancellationToken);

        LogStartingStreamProvider(name, delayedStart);
        const int startAgentsCommand = (int)PersistentStreamProviderCommand.StartAgents;
        await streamProvider.ExecuteCommand(startAgentsCommand, null);
    }
}
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using Orleans.Configuration;
using Orleans.Providers;
using Orleans.Providers.Streams;
using Orleans.Streams;
// ReSharper disable once CheckNamespace
namespace Microsoft.Extensions.Hosting;
/// <summary>
/// Extension methods for <see cref="ISiloPersistentStreamConfigurator"/> that postpone the
/// start of a persistent stream provider's agents.
/// </summary>
public static class SiloPersistentStreamConfiguratorExtensions
{
    /// <summary>
    /// Configures the stream provider to start with its agents stopped and registers a silo
    /// lifecycle participant that issues the start command after <paramref name="delayedStart"/>.
    /// </summary>
    /// <param name="configurator">The persistent stream configurator to extend.</param>
    /// <param name="delayedStart">How long to wait before the agents are started. Must not be negative.</param>
    /// <returns>The same <paramref name="configurator"/> instance, to allow call chaining.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="configurator"/> is <see langword="null"/>.</exception>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="delayedStart"/> is negative.</exception>
    public static ISiloPersistentStreamConfigurator WithDelayedStart(this ISiloPersistentStreamConfigurator configurator, TimeSpan delayedStart)
    {
        ArgumentNullException.ThrowIfNull(configurator);

        // Fail at configuration time: a negative delay would otherwise only surface later,
        // inside Task.Delay, while the silo is already starting up.
        if (delayedStart < TimeSpan.Zero)
        {
            throw new ArgumentOutOfRangeException(nameof(delayedStart), delayedStart, "The delayed start must not be negative.");
        }

        // Capture just the name so the DI factory below does not hold on to the configurator.
        var providerName = configurator.Name;

        // Distinct parameter names: the original nested lambdas both used 'options'.
        configurator.ConfigureLifecycle(builder => builder.Configure(lifecycleOptions =>
        {
            lifecycleOptions.StartupState = StreamLifecycleOptions.RunState.AgentsStopped;
        }));

        configurator.ConfigureDelegate(services =>
            services.AddTransient<ILifecycleParticipant<ISiloLifecycle>>(serviceProvider =>
                new DelayedStreamProviderStarter(
                    providerName,
                    (IControllable)serviceProvider.GetRequiredKeyedService<IStreamProvider>(providerName),
                    delayedStart,
                    serviceProvider.GetService<ILogger<DelayedStreamProviderStarter>>() ?? NullLogger<DelayedStreamProviderStarter>.Instance)));

        return configurator;
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment