Skip to content

Instantly share code, notes, and snippets.

@wullemsb
Created March 5, 2026 07:32
Show Gist options
  • Select an option

  • Save wullemsb/8909094d99e5548f7690ce9ff479f95d to your computer and use it in GitHub Desktop.

Select an option

Save wullemsb/8909094d99e5548f7690ce9ff479f95d to your computer and use it in GitHub Desktop.
async Task FetchDocuments(BatchBlock<Document> batchDocuments, string connectionString, string sql, ProgressTask status)
{
status.Description = "[cyan]Fetching documents from database...[/]";
using SqlConnection sqlConnection = new(connectionString);
await sqlConnection.OpenAsync();
// Use READ UNCOMMITTED to avoid holding shared locks while streaming rows.
// This prevents blocking the concurrent UPDATE statements in ProcessMigratedDocuments,
// which operate on the same table. Dirty reads are acceptable here because:
// - We only read rows where the key IS NULL (not yet migrated)
// - Processed rows get a non-NULL key, so they won't be re-read
using var transaction = sqlConnection.BeginTransaction(IsolationLevel.ReadUncommitted);
//Fetch the data in unbuffered mode to avoid out-of-memory (https://github.com/DapperLib/Dapper#buffered-vs-unbuffered-readers)
foreach (var document in sqlConnection.Query<Document>(sql, transaction: transaction, buffered: false))
{
//Buffer
try
{
while (!await batchDocuments.SendAsync(document, cts.Token))
{
status.Description = $"[yellow]Buffer full, waiting for document {document.Id}...[/]";
await Task.Delay(1000, cts.Token);
}
}
catch (OperationCanceledException)
{
AnsiConsole.MarkupLine("[red]Migration cancelled[/]");
throw;
}
}
batchDocuments.Complete();
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment