@ejlevin1
Last active July 14, 2025 17:12
Rx.NET Progress reporter

Based on my analysis, here's my approach to convert the Progress<> callbacks to use Rx.NET with throttling:

Analysis and Approach

Current State

  1. The export jobs use Progress<T> callbacks with async handlers in ExportLoginLinksJob
  2. Progress updates are reported through IJobExecutionContext.ReportProgressAsync()
  3. The system already has Rx.NET (System.Reactive) available through TheCommons.Eventing
  4. Progress callbacks fire frequently during export operations (every 10 records)

Issues with Current Implementation

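  1. Passing an async lambda to the Progress<T> constructor is problematic: the constructor takes an Action<T>, so the lambda compiles to async void and nothing awaits it
  2. No throttling means excessive progress updates for large exports
  3. Progress<T> posts each callback to the captured SynchronizationContext (or the thread pool), so a report can land after the job has completed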
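
To make the first and third issues concrete, here is a minimal sketch that uses only the base class library. The class name ProgressPitfallDemo and the 50 ms delay (a stand-in for an awaited ReportProgressAsync call) are illustrative assumptions, not code from the export jobs.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

public static class ProgressPitfallDemo
{
    // Returns how many reports had been handled when the "job" finished,
    // and how many had been handled half a second later.
    public static async Task<(int AtJobEnd, int Later)> RunAsync()
    {
        var handled = new ConcurrentQueue<int>();

        // The async lambda is converted to Action<int>, i.e. an async void method:
        // Progress<T> cannot await it and cannot observe its exceptions.
        IProgress<int> progress = new Progress<int>(async percentage =>
        {
            await Task.Delay(50); // stand-in for an awaited ReportProgressAsync call
            handled.Enqueue(percentage);
        });

        // The "job": Report only posts the callback and returns immediately.
        for (var step = 1; step <= 5; step++)
        {
            progress.Report(step * 20);
        }

        var atJobEnd = handled.Count;
        await Task.Delay(500); // give the posted handlers time to finish
        return (atJobEnd, handled.Count);
    }
}
```

In a console or background-job host, the first number is typically lower than the second: the loop has finished, but the handlers are still running.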

Proposed Solution

  1. Create an Rx.NET-based progress reporter that:
     • Converts Progress<T> events to an Observable stream
     • Limits updates to at most one every 15 seconds
     • Ensures a final progress update before job completion
     • Handles cancellation properly
  2. Implementation approach:
     • Create an IThrottledProgressReporter interface
     • Implement RxThrottledProgressReporter using Subject<int>
     • Use Sample() with a 15-second window for periodic updates, not Throttle(): in Rx.NET, Throttle() is a debounce and emits nothing while values keep arriving faster than the window
     • Track the latest value and send it explicitly on completion
  3. Key design decisions:
     • Use Subject<int> as the event source
     • Apply Sample(TimeSpan.FromSeconds(15)) for regular updates
     • Flush the last value in CompleteAsync() to ensure the final update
     • Dispose pattern to tear down the observable stream
     • Integration with existing IJobExecutionContext
  4. Benefits:
     • Reduces database/network pressure from excessive progress updates
     • Guarantees final progress report before job completes
     • Clean separation of concerns
     • Testable with time-based operators
     • Reusable across all export job types
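
The choice between Sample() and Throttle() can be checked with a small virtual-time sketch. It assumes the System.Reactive and Microsoft.Reactive.Testing packages are referenced; the class name SampleVsThrottleDemo and the one-value-per-second feed are illustrative only.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using Microsoft.Reactive.Testing;

public static class SampleVsThrottleDemo
{
    public static (List<int> Sampled, List<int> Throttled) Run()
    {
        var scheduler = new TestScheduler();
        var window = TimeSpan.FromSeconds(15);
        var sampled = new List<int>();
        var throttled = new List<int>();
        using var source = new Subject<int>();

        // Sample: emit the latest value at every 15-second tick.
        using var sampleSubscription = source
            .Sample(window, scheduler)
            .Subscribe(sampled.Add);

        // Throttle: emit a value only after 15 seconds without a newer one.
        using var throttleSubscription = source
            .Throttle(window, scheduler)
            .Subscribe(throttled.Add);

        // Feed one value per virtual second for 40 seconds.
        for (var second = 1; second <= 40; second++)
        {
            scheduler.AdvanceBy(TimeSpan.FromSeconds(1).Ticks);
            source.OnNext(second);
        }

        return (sampled, throttled);
    }
}
```

With a steady feed like this, Sample() yields one value per tick while Throttle() yields nothing during the run, which is why Sample() fits progress reporting better.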

Would you like me to proceed with implementing this solution?

can you give me code examples of how you'd do that

⏺ Here are code examples showing how I would implement the Rx.NET-based throttled progress reporting:

  1. Interface Definition

// File: src/shared/TheCommons.BackgroundJobs/Abstractions/IThrottledProgressReporter.cs
using System.Reactive;

namespace TheCommons.BackgroundJobs.Abstractions;

/// <summary>
/// Provides throttled progress reporting for background jobs
/// </summary>
public interface IThrottledProgressReporter : IDisposable
{
  /// <summary>
  /// Reports progress that will be throttled
  /// </summary>
  void Report(int percentage);

  /// <summary>
  /// Completes the progress reporting and ensures final value is sent
  /// </summary>
  Task CompleteAsync();

  /// <summary>
  /// Creates a Progress<T> adapter for compatibility with existing code
  /// </summary>
  IProgress<int> AsProgress();

}

  2. Implementation

// File: src/shared/TheCommons.BackgroundJobs/Services/RxThrottledProgressReporter.cs
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Reactive.Threading.Tasks;
using TheCommons.BackgroundJobs.Abstractions;

namespace TheCommons.BackgroundJobs.Services;

/// <summary>
/// Implements throttled progress reporting using Rx.NET
/// </summary>
public class RxThrottledProgressReporter : IThrottledProgressReporter
{
  private readonly Subject<int> _progressSubject;
  private readonly IJobExecutionContext _context;
  private readonly string _messageTemplate;
  private readonly TimeSpan _throttleInterval;
  private readonly IDisposable _subscription;
  private readonly TaskCompletionSource<bool> _completionSource;
  private int _lastReportedValue = -1;
  private int _latestValue = -1;
  private readonly SemaphoreSlim _completionLock = new(1, 1);
  private bool _isCompleted;

  public RxThrottledProgressReporter(
      IJobExecutionContext context,
      string messageTemplate = "Progress: {0}%",
      TimeSpan? throttleInterval = null)
  {
      _context = context ?? throw new ArgumentNullException(nameof(context));
      _messageTemplate = messageTemplate;
      _throttleInterval = throttleInterval ?? TimeSpan.FromSeconds(15);
      _progressSubject = new Subject<int>();
      _completionSource = new TaskCompletionSource<bool>();

      // Create the throttled observable pipeline
      _subscription = CreateThrottledPipeline();
  }

  private IDisposable CreateThrottledPipeline()
  {
      // Create a pipeline that:
      // 1. Tracks the latest raw value for the final flush in CompleteAsync
      // 2. Filters out duplicate consecutive values
      // 3. Samples at regular intervals (at most one value per interval)
      // 4. Runs each report to completion before starting the next
      return _progressSubject
          .Do(value => _latestValue = value) // Track latest value
          .DistinctUntilChanged()
          .Sample(_throttleInterval)
          .Where(value => value != _lastReportedValue) // Avoid duplicate reports
          // An async lambda passed to Subscribe would be async void. FromAsync + Concat
          // keeps each Task observable, so reports cannot overlap or fail unobserved.
          .Select(percentage => Observable.FromAsync(() => ReportSafelyAsync(percentage)))
          .Concat()
          .Subscribe(
              onNext: _ => { },
              onError: error => _completionSource.TrySetException(error),
              onCompleted: () => _completionSource.TrySetResult(true));
  }

  private async Task ReportSafelyAsync(int percentage)
  {
      try
      {
          _lastReportedValue = percentage;
          var message = string.Format(_messageTemplate, percentage);
          await _context.ReportProgressAsync(percentage, message);
      }
      catch (Exception ex)
      {
          // Log error but don't fail the job due to progress reporting issues
          await _context.LogAsync(
              $"Failed to report progress: {ex.Message}",
              TheCommons.Data.Models.JobLogLevel.Warning);
      }
  }

  public void Report(int percentage)
  {
      if (_isCompleted)
          return;

      // Clamp percentage to valid range
      percentage = Math.Max(0, Math.Min(100, percentage));
      _progressSubject.OnNext(percentage);
  }

  public async Task CompleteAsync()
  {
      await _completionLock.WaitAsync();
      try
      {
          if (_isCompleted)
              return;

          _isCompleted = true;

          // Stop the sampling pipeline before the final report. Sample only
          // propagates completion on its next tick, so awaiting the pipeline
          // here could stall for up to one full interval, and that tick could
          // re-emit the value reported below.
          _subscription.Dispose();
          _progressSubject.OnCompleted();
          _completionSource.TrySetResult(true);

          // Ensure we report the final value if it hasn't been reported
          if (_latestValue >= 0 && _latestValue != _lastReportedValue)
          {
              _lastReportedValue = _latestValue;
              var message = string.Format(_messageTemplate, _latestValue);
              await _context.ReportProgressAsync(_latestValue, message);
          }
      }
      finally
      {
          _completionLock.Release();
      }
  }

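  public IProgress<int> AsProgress()
  {
      // Progress<T> posts callbacks to the captured SynchronizationContext or the
      // thread pool, so reports could arrive out of order or after CompleteAsync.
      // A synchronous adapter forwards each value on the caller's thread instead.
      return new SynchronousProgress(Report);
  }

  private sealed class SynchronousProgress : IProgress<int>
  {
      private readonly Action<int> _handler;

      public SynchronousProgress(Action<int> handler) => _handler = handler;

      public void Report(int value) => _handler(value);
  }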

  public void Dispose()
  {
      _completionLock?.Dispose();
      _subscription?.Dispose();
      _progressSubject?.Dispose();
  }

}

  3. Updated ExportLoginLinksJob Usage

// In ExportLoginLinksJob.cs - Updated GenerateXlsxExportAsync method
private async Task<byte[]> GenerateXlsxExportAsync(
  List loginLinks,
  XlsxExportOptions? options,
  IJobExecutionContext context,
  IServiceScope scope,
  CancellationToken cancellationToken)
{
  await LogAndReportProgressAsync(context, 50, "Generating XLSX export", cancellationToken: cancellationToken);

  options ??= new XlsxExportOptions();

  var mobileAppHelper = scope.ServiceProvider.GetRequiredService<IMobileAppHelper>();
  var xlsxService = new UserLoginLinksXlsxExportService(mobileAppHelper);

  // Extract unique users and group login links
  var users = loginLinks.Select(ll => ll.User).Distinct().ToList();
  var loginLinksByUserId = loginLinks.GroupBy(ll => ll.UserId)
      .ToDictionary(g => g.Key, g => g.ToList());

  var exportData = new UserLoginLinksExportData
  {
      Users = users,
      LoginLinksByUserId = loginLinksByUserId,
      GeneratedAt = DateTime.UtcNow,
      TotalUsers = users.Count,
      TotalLoginLinks = loginLinks.Count
  };

  // Create throttled progress reporter; `using var` disposes it when the method
  // exits (including when the export throws), so no explicit Dispose call is needed
  using var progressReporter = new RxThrottledProgressReporter(
      context,
      "Generating XLSX export ({0}%)",
      TimeSpan.FromSeconds(15));

  // Generate the XLSX with progress reporting
  var result = await xlsxService.GenerateUserLoginLinksXlsxAsync(
      exportData,
      options,
      progressReporter.AsProgress(),
      cancellationToken);

  // Ensure final progress is reported
  await progressReporter.CompleteAsync();

  return result;

}

  4. Alternative: Factory Pattern for Reusability

// File: src/shared/TheCommons.BackgroundJobs/Services/IProgressReporterFactory.cs
namespace TheCommons.BackgroundJobs.Services;

public interface IProgressReporterFactory
{
  IThrottledProgressReporter CreateThrottledReporter(
      IJobExecutionContext context,
      string messageTemplate,
      TimeSpan? throttleInterval = null);
}

// File: src/shared/TheCommons.BackgroundJobs/Services/ProgressReporterFactory.cs
public class ProgressReporterFactory : IProgressReporterFactory
{
  private readonly ILogger<RxThrottledProgressReporter> _logger;

  public ProgressReporterFactory(ILogger<RxThrottledProgressReporter> logger)
  {
      _logger = logger;
  }

  public IThrottledProgressReporter CreateThrottledReporter(
      IJobExecutionContext context,
      string messageTemplate,
      TimeSpan? throttleInterval = null)
  {
      return new RxThrottledProgressReporter(context, messageTemplate, throttleInterval);
  }

}

  5. Enhanced Version with Smarter Throttling

// Enhanced version that ensures important milestones are reported
public class SmartThrottledProgressReporter : IThrottledProgressReporter
{
  private readonly Subject<int> _progressSubject;
  private readonly IJobExecutionContext _context;
  private readonly string _messageTemplate;
  private readonly TimeSpan _throttleInterval;
  private readonly HashSet<int> _importantMilestones;
  private readonly IDisposable _subscription;
  private int _lastReportedValue = -1;
  private int _latestValue = -1;

  public SmartThrottledProgressReporter(
      IJobExecutionContext context,
      string messageTemplate = "Progress: {0}%",
      TimeSpan? throttleInterval = null,
      params int[] importantMilestones)
  {
      _context = context;
      _messageTemplate = messageTemplate;
      _throttleInterval = throttleInterval ?? TimeSpan.FromSeconds(15);
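      // With `params`, omitting the argument yields an empty array rather than null,
      // so the defaults must also apply to the empty case
      _importantMilestones = new HashSet<int>(
          importantMilestones is { Length: > 0 } ? importantMilestones : new[] { 0, 25, 50, 75, 100 });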
      _progressSubject = new Subject<int>();

      _subscription = CreateSmartPipeline();
  }

  private IDisposable CreateSmartPipeline()
  {
      // Split into two streams:
      // 1. Important milestones (reported immediately)
      // 2. Regular updates (throttled)

      // Track the latest raw value before any filtering, so a final flush
      // sees it even if Sample has not emitted it yet
      var source = _progressSubject.Do(value => _latestValue = value);

      var milestoneStream = source
          .Where(p => _importantMilestones.Contains(p));

      var throttledStream = source
          .Where(p => !_importantMilestones.Contains(p))
          .Sample(_throttleInterval);

      // Merge both streams and run the reports one at a time
      return Observable.Merge(milestoneStream, throttledStream)
          .DistinctUntilChanged()
          .Where(p => p != _lastReportedValue)
          .Select(p => Observable.FromAsync(() => ReportSafelyAsync(p)))
          .Concat()
          .Subscribe();
  }

  private async Task ReportSafelyAsync(int percentage)
  {
      try
      {
          _lastReportedValue = percentage;
          var message = string.Format(_messageTemplate, percentage);
          await _context.ReportProgressAsync(percentage, message);
      }
      catch (Exception ex)
      {
          await _context.LogAsync(
              $"Failed to report progress: {ex.Message}",
              TheCommons.Data.Models.JobLogLevel.Warning);
      }
  }

  // ... rest of implementation similar to RxThrottledProgressReporter

}
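
The milestone-plus-sampling split can be tried in isolation with virtual time. This is only a sketch: it assumes the System.Reactive and Microsoft.Reactive.Testing packages, and MilestoneMergeDemo, the milestone set, and the one-percent-per-second feed are made up for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using Microsoft.Reactive.Testing;

public static class MilestoneMergeDemo
{
    public static List<int> Run()
    {
        var scheduler = new TestScheduler();
        var milestones = new HashSet<int> { 25, 50, 75, 100 };
        var reported = new List<int>();
        using var source = new Subject<int>();

        // Milestones bypass the sampler and are forwarded immediately.
        var immediate = source.Where(p => milestones.Contains(p));

        // Everything else is reduced to one value per 15-second tick.
        var sampled = source
            .Where(p => !milestones.Contains(p))
            .Sample(TimeSpan.FromSeconds(15), scheduler);

        using var subscription = immediate
            .Merge(sampled)
            .DistinctUntilChanged()
            .Subscribe(reported.Add);

        // Simulate a job that advances one percent per virtual second.
        for (var percent = 1; percent <= 100; percent++)
        {
            scheduler.AdvanceBy(TimeSpan.FromSeconds(1).Ticks);
            source.OnNext(percent);
        }

        return reported;
    }
}
```

Out of 100 inputs only a handful reach the subscriber: every milestone, plus one sampled value per tick in between.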

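  6. Unit Test Example

[Fact]
public async Task ThrottledProgressReporter_Should_Throttle_Updates()
{
  // Arrange
  var context = new TestJobExecutionContext();
  var testScheduler = new TestScheduler();
  // Assumes the reporter is extended with an optional IScheduler parameter that it
  // passes to Sample(); without it, advancing the TestScheduler has no effect
  var progressReporter = new RxThrottledProgressReporter(
      context,
      "Test progress: {0}%",
      TimeSpan.FromSeconds(15),
      testScheduler);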

  // Act
  progressReporter.Report(10);
  testScheduler.AdvanceBy(TimeSpan.FromSeconds(5).Ticks);
  progressReporter.Report(20);
  testScheduler.AdvanceBy(TimeSpan.FromSeconds(5).Ticks);
  progressReporter.Report(30);
  testScheduler.AdvanceBy(TimeSpan.FromSeconds(5).Ticks); // Total: 15 seconds
  progressReporter.Report(40);
  testScheduler.AdvanceBy(TimeSpan.FromSeconds(1).Ticks);

  await progressReporter.CompleteAsync();

  // Assert
  context.ProgressReports.Count.Should().Be(2); // Initial sample at 15s + final value
  context.ProgressReports[0].Percentage.Should().Be(30); // Sampled at 15s
  context.ProgressReports[1].Percentage.Should().Be(40); // Final value

}
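
For the TestScheduler to drive the test, the scheduler has to reach Sample(). One possible shape for that is sketched below with a trimmed-down, hypothetical SampledReporter that forwards values to a plain callback instead of IJobExecutionContext. The optional IScheduler parameter is an assumption about how the real reporter could be extended, not part of the code above.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using Microsoft.Reactive.Testing;

// Hypothetical, simplified reporter used only to show scheduler injection.
public sealed class SampledReporter : IDisposable
{
    private readonly Subject<int> _subject = new();
    private readonly IDisposable _subscription;
    private readonly Action<int> _sink;
    private int _latest = -1;
    private int _lastSent = -1;

    public SampledReporter(Action<int> sink, TimeSpan interval, IScheduler? scheduler = null)
    {
        _sink = sink;
        _subscription = _subject
            .Sample(interval, scheduler ?? DefaultScheduler.Instance)
            .Where(value => value != _lastSent)
            .Subscribe(Send);
    }

    public void Report(int percentage)
    {
        _latest = percentage;
        _subject.OnNext(percentage);
    }

    // Stops sampling, then flushes the latest value if it was never sent.
    public void Complete()
    {
        _subscription.Dispose();
        if (_latest >= 0 && _latest != _lastSent) Send(_latest);
    }

    public void Dispose()
    {
        _subscription.Dispose();
        _subject.Dispose();
    }

    private void Send(int value)
    {
        _lastSent = value;
        _sink(value);
    }
}

public static class SampledReporterDemo
{
    public static List<int> Run()
    {
        var scheduler = new TestScheduler();
        var sent = new List<int>();
        using var reporter = new SampledReporter(sent.Add, TimeSpan.FromSeconds(15), scheduler);

        reporter.Report(10);
        scheduler.AdvanceBy(TimeSpan.FromSeconds(5).Ticks);
        reporter.Report(20);
        scheduler.AdvanceBy(TimeSpan.FromSeconds(5).Ticks);
        reporter.Report(30);
        scheduler.AdvanceBy(TimeSpan.FromSeconds(5).Ticks); // 15 s: the sampler ticks
        reporter.Report(40);
        scheduler.AdvanceBy(TimeSpan.FromSeconds(1).Ticks);
        reporter.Complete(); // flushes the value that was never sampled

        return sent;
    }
}
```

Production code would omit the scheduler argument and fall back to the default scheduler, while tests pass a TestScheduler and advance time explicitly.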

This solution provides:

  • Clean separation of concerns
  • Testable with TestScheduler, provided the reporter accepts an injected IScheduler
  • Guaranteed final progress update
  • Configurable throttling interval
  • Support for important milestones
  • Backward compatibility via AsProgress()
  • Proper resource cleanup