Created
March 5, 2026 07:32
-
-
Save wullemsb/8909094d99e5548f7690ce9ff479f95d to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| async Task FetchDocuments(BatchBlock<Document> batchDocuments, string connectionString, string sql, ProgressTask status) | |
| { | |
| status.Description = "[cyan]Fetching documents from database...[/]"; | |
| using SqlConnection sqlConnection = new(connectionString); | |
| await sqlConnection.OpenAsync(); | |
| // Use READ UNCOMMITTED to avoid holding shared locks while streaming rows. | |
| // This prevents blocking the concurrent UPDATE statements in ProcessMigratedDocuments, | |
| // which operate on the same table. Dirty reads are acceptable here because: | |
| // - We only read rows where the key IS NULL (not yet migrated) | |
| // - Processed rows get a non-NULL key, so they won't be re-read | |
| using var transaction = sqlConnection.BeginTransaction(IsolationLevel.ReadUncommitted); | |
| //Fetch the data in unbuffered mode to avoid out-of-memory (https://github.com/DapperLib/Dapper#buffered-vs-unbuffered-readers) | |
| foreach (var document in sqlConnection.Query<Document>(sql, transaction: transaction, buffered: false)) | |
| { | |
| //Buffer | |
| try | |
| { | |
| while (!await batchDocuments.SendAsync(document, cts.Token)) | |
| { | |
| status.Description = $"[yellow]Buffer full, waiting for document {document.Id}...[/]"; | |
| await Task.Delay(1000, cts.Token); | |
| } | |
| } | |
| catch (OperationCanceledException) | |
| { | |
| AnsiConsole.MarkupLine("[red]Migration cancelled[/]"); | |
| throw; | |
| } | |
| } | |
| batchDocuments.Complete(); | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment