Based on my analysis, here's my approach to convert the Progress<> callbacks to use Rx.NET with throttling:
Analysis and Approach
Current State
- The export jobs use Progress callbacks with async handlers in ExportLoginLinksJob
- Progress updates are reported through IJobExecutionContext.ReportProgressAsync()
- The system already has Rx.NET (System.Reactive) available through TheCommons.Eventing
- Progress callbacks fire frequently during export operations (every 10 records)
Issues with Current Implementation
- Using async inside Progress constructor is problematic - Progress expects synchronous handlers
- No throttling means excessive progress updates for large exports
- Progress might report after job completion due to async timing
Proposed Solution
- Create an Rx.NET-based progress reporter that: - Converts Progress events to an Observable stream - Throttles updates to max once per 15 seconds - Ensures final progress update before job completion - Handles cancellation properly
- Implementation approach: - Create IThrottledProgressReporter interface - Implement RxThrottledProgressReporter using Subject - Use Throttle() operator with 15-second window - Use Sample() to ensure we get periodic updates - Buffer and ensure last value is sent on completion
- Key design decisions: - Use Subject as the event source - Apply Sample(TimeSpan.FromSeconds(15)) for regular updates - Use Concat() with LastAsync() to ensure final update - Dispose pattern to complete the observable stream - Integration with existing IJobExecutionContext
- Benefits: - Reduces database/network pressure from excessive progress updates - Guarantees final progress report before job completes - Clean separation of concerns - Testable with time-based operators - Reusable across all export job types
Would you like me to proceed with implementing this solution?
can you give me code examples of how you'd do that
⏺ Here are code examples showing how I would implement the Rx.NET-based throttled progress reporting:
- Interface Definition
// File: src/shared/TheCommons.BackgroundJobs/Abstractions/IThrottledProgressReporter.cs using System.Reactive;
namespace TheCommons.BackgroundJobs.Abstractions;
///
/// <summary>
/// Completes the progress reporting and ensures final value is sent
/// </summary>
Task CompleteAsync();
/// <summary>
/// Creates a Progress<T> adapter for compatibility with existing code
/// </summary>
IProgress<int> AsProgress();
}
- Implementation
// File: src/shared/TheCommons.BackgroundJobs/Services/RxThrottledProgressReporter.cs using System.Reactive.Linq; using System.Reactive.Subjects; using System.Reactive.Threading.Tasks; using TheCommons.BackgroundJobs.Abstractions;
namespace TheCommons.BackgroundJobs.Services;
///
public RxThrottledProgressReporter(
IJobExecutionContext context,
string messageTemplate = "Progress: {0}%",
TimeSpan? throttleInterval = null)
{
_context = context ?? throw new ArgumentNullException(nameof(context));
_messageTemplate = messageTemplate;
_throttleInterval = throttleInterval ?? TimeSpan.FromSeconds(15);
_progressSubject = new Subject<int>();
_completionSource = new TaskCompletionSource<bool>();
// Create the throttled observable pipeline
_subscription = CreateThrottledPipeline();
}
private IDisposable CreateThrottledPipeline()
{
// Create a pipeline that:
// 1. Filters out duplicate consecutive values
// 2. Samples at regular intervals (throttling)
// 3. Always includes the last value when completing
var throttledStream = _progressSubject
.Do(value => _latestValue = value) // Track latest value
.DistinctUntilChanged()
.Sample(_throttleInterval)
.Where(value => value != _lastReportedValue); // Avoid duplicate reports
// Subscribe to the throttled stream
return throttledStream.Subscribe(
onNext: async percentage =>
{
try
{
_lastReportedValue = percentage;
var message = string.Format(_messageTemplate, percentage);
await _context.ReportProgressAsync(percentage, message);
}
catch (Exception ex)
{
// Log error but don't fail the job due to progress reporting issues
await _context.LogAsync(
$"Failed to report progress: {ex.Message}",
TheCommons.Data.Models.JobLogLevel.Warning);
}
},
onError: error =>
{
_completionSource.TrySetException(error);
},
onCompleted: () =>
{
_completionSource.TrySetResult(true);
});
}
public void Report(int percentage)
{
if (_isCompleted)
return;
// Clamp percentage to valid range
percentage = Math.Max(0, Math.Min(100, percentage));
_progressSubject.OnNext(percentage);
}
public async Task CompleteAsync()
{
await _completionLock.WaitAsync();
try
{
if (_isCompleted)
return;
_isCompleted = true;
// Ensure we report the final value if it hasn't been reported
if (_latestValue >= 0 && _latestValue != _lastReportedValue)
{
var message = string.Format(_messageTemplate, _latestValue);
await _context.ReportProgressAsync(_latestValue, message);
}
// Complete the subject
_progressSubject.OnCompleted();
// Wait for the pipeline to complete
await _completionSource.Task;
}
finally
{
_completionLock.Release();
}
}
public IProgress<int> AsProgress()
{
return new Progress<int>(Report);
}
public void Dispose()
{
_completionLock?.Dispose();
_subscription?.Dispose();
_progressSubject?.Dispose();
}
}
- Updated ExportLoginLinksJob Usage
// In ExportLoginLinksJob.cs - Updated GenerateXlsxExportAsync method private async Task<byte[]> GenerateXlsxExportAsync( List loginLinks, XlsxExportOptions? options, IJobExecutionContext context, IServiceScope scope, CancellationToken cancellationToken) { await LogAndReportProgressAsync(context, 50, "Generating XLSX export", cancellationToken: cancellationToken);
options ??= new XlsxExportOptions();
var mobileAppHelper = scope.ServiceProvider.GetRequiredService<IMobileAppHelper>();
var xlsxService = new UserLoginLinksXlsxExportService(mobileAppHelper);
// Extract unique users and group login links
var users = loginLinks.Select(ll => ll.User).Distinct().ToList();
var loginLinksByUserId = loginLinks.GroupBy(ll => ll.UserId)
.ToDictionary(g => g.Key, g => g.ToList());
var exportData = new UserLoginLinksExportData
{
Users = users,
LoginLinksByUserId = loginLinksByUserId,
GeneratedAt = DateTime.UtcNow,
TotalUsers = users.Count,
TotalLoginLinks = loginLinks.Count
};
// Create throttled progress reporter
using var progressReporter = new RxThrottledProgressReporter(
context,
"Generating XLSX export ({0}%)",
TimeSpan.FromSeconds(15));
try
{
// Generate the XLSX with progress reporting
var result = await xlsxService.GenerateUserLoginLinksXlsxAsync(
exportData,
options,
progressReporter.AsProgress(),
cancellationToken);
// Ensure final progress is reported
await progressReporter.CompleteAsync();
return result;
}
finally
{
progressReporter.Dispose();
}
}
- Alternative: Factory Pattern for Reusability
// File: src/shared/TheCommons.BackgroundJobs/Services/IProgressReporterFactory.cs namespace TheCommons.BackgroundJobs.Services;
public interface IProgressReporterFactory { IThrottledProgressReporter CreateThrottledReporter( IJobExecutionContext context, string messageTemplate, TimeSpan? throttleInterval = null); }
// File: src/shared/TheCommons.BackgroundJobs/Services/ProgressReporterFactory.cs public class ProgressReporterFactory : IProgressReporterFactory { private readonly ILogger _logger;
public ProgressReporterFactory(ILogger<RxThrottledProgressReporter> logger)
{
_logger = logger;
}
public IThrottledProgressReporter CreateThrottledReporter(
IJobExecutionContext context,
string messageTemplate,
TimeSpan? throttleInterval = null)
{
return new RxThrottledProgressReporter(context, messageTemplate, throttleInterval);
}
}
- Enhanced Version with Smarter Throttling
// Enhanced version that ensures important milestones are reported public class SmartThrottledProgressReporter : IThrottledProgressReporter { private readonly Subject _progressSubject; private readonly IJobExecutionContext _context; private readonly string _messageTemplate; private readonly TimeSpan _throttleInterval; private readonly HashSet _importantMilestones; private readonly IDisposable _subscription; private int _lastReportedValue = -1; private int _latestValue = -1;
public SmartThrottledProgressReporter(
IJobExecutionContext context,
string messageTemplate = "Progress: {0}%",
TimeSpan? throttleInterval = null,
params int[] importantMilestones)
{
_context = context;
_messageTemplate = messageTemplate;
_throttleInterval = throttleInterval ?? TimeSpan.FromSeconds(15);
_importantMilestones = new HashSet<int>(importantMilestones ?? new[] { 0, 25, 50, 75, 100 });
_progressSubject = new Subject<int>();
_subscription = CreateSmartPipeline();
}
private IDisposable CreateSmartPipeline()
{
// Split into two streams:
// 1. Important milestones (reported immediately)
// 2. Regular updates (throttled)
var milestoneStream = _progressSubject
.Where(p => _importantMilestones.Contains(p))
.Where(p => p != _lastReportedValue);
var throttledStream = _progressSubject
.Where(p => !_importantMilestones.Contains(p))
.Sample(_throttleInterval)
.Where(p => p != _lastReportedValue);
// Merge both streams
var combinedStream = Observable.Merge(milestoneStream, throttledStream)
.Do(value => _latestValue = value)
.DistinctUntilChanged();
return combinedStream.Subscribe(async percentage =>
{
try
{
_lastReportedValue = percentage;
var message = string.Format(_messageTemplate, percentage);
await _context.ReportProgressAsync(percentage, message);
}
catch (Exception ex)
{
await _context.LogAsync(
$"Failed to report progress: {ex.Message}",
TheCommons.Data.Models.JobLogLevel.Warning);
}
});
}
// ... rest of implementation similar to RxThrottledProgressReporter
}
- Unit Test Example
[Fact] public async Task ThrottledProgressReporter_Should_Throttle_Updates() { // Arrange var context = new TestJobExecutionContext(); var testScheduler = new TestScheduler(); var progressReporter = new RxThrottledProgressReporter( context, "Test progress: {0}%", TimeSpan.FromSeconds(15));
// Act
progressReporter.Report(10);
testScheduler.AdvanceBy(TimeSpan.FromSeconds(5).Ticks);
progressReporter.Report(20);
testScheduler.AdvanceBy(TimeSpan.FromSeconds(5).Ticks);
progressReporter.Report(30);
testScheduler.AdvanceBy(TimeSpan.FromSeconds(5).Ticks); // Total: 15 seconds
progressReporter.Report(40);
testScheduler.AdvanceBy(TimeSpan.FromSeconds(1).Ticks);
await progressReporter.CompleteAsync();
// Assert
context.ProgressReports.Count.Should().Be(2); // Initial sample at 15s + final value
context.ProgressReports[0].Percentage.Should().Be(30); // Sampled at 15s
context.ProgressReports[1].Percentage.Should().Be(40); // Final value
}
This solution provides:
- Clean separation of concerns
- Testable with TestScheduler
- Guaranteed final progress update
- Configurable throttling interval
- Support for important milestones
- Backward compatibility via AsProgress()
- Proper resource cleanup