Skip to content

Commit

Permalink
Fixed exception handling in BulkCopy #321 (#322)
Browse files Browse the repository at this point in the history
  • Loading branch information
DarkWanderer authored Jul 1, 2023
1 parent efa43eb commit 4a78793
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 8 deletions.
1 change: 0 additions & 1 deletion .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,6 @@ jobs:
fail-fast: false
matrix:
tag:
- 'yandex/clickhouse-server:20.3'
- 'yandex/clickhouse-server:20.6'
- 'yandex/clickhouse-server:20.9'
- 'yandex/clickhouse-server:20.12'
Expand Down
2 changes: 1 addition & 1 deletion ClickHouse.Client.Tests/BulkCopyTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public async Task ShouldExecuteSingleValueInsertViaBulkCopy(string clickHouseTyp
public async Task ShouldExecuteMultipleBulkInsertions()
{
var sw = new Stopwatch();
var duration = new TimeSpan(0, 5, 0);
var duration = TimeSpan.FromMinutes(5);

var targetTable = "test." + SanitizeTableName($"bulk_load_test");

Expand Down
22 changes: 16 additions & 6 deletions ClickHouse.Client/Copy/ClickHouseBulkCopy.cs
Original file line number Diff line number Diff line change
Expand Up @@ -165,16 +165,21 @@ public async Task WriteToServerAsync(IEnumerable<object[]> rows, IReadOnlyCollec
var completedTaskIndex = Array.FindIndex(tasks, t => t.IsCompleted);
if (completedTaskIndex >= 0)
{
// propagate exception if one happens
// 'await' instead of 'Wait()' to avoid dealing with AggregateException
await tasks[completedTaskIndex].ConfigureAwait(false);
tasks[completedTaskIndex] = connection.PostStreamAsync(useInlineQuery ? null : query, stream, true, token)
.ContinueWith(t => { using (stream) Interlocked.Add(ref rowsWritten, counter); }, token);
async Task SendBatch()
{
using (stream)
{
await connection.PostStreamAsync(useInlineQuery ? null : query, stream, true, token).ConfigureAwait(false);
Interlocked.Add(ref rowsWritten, counter);
}
}
tasks[completedTaskIndex] = SendBatch();
break; // while (true); go to next batch
}
else
{
await Task.WhenAny(tasks).ConfigureAwait(false);
var completedTask = await Task.WhenAny(tasks).ConfigureAwait(false);
await completedTask.ConfigureAwait(false);
}
}
}
Expand All @@ -193,5 +198,10 @@ public void Dispose()
GC.SuppressFinalize(this);
}

// Utility method wrapping sending data to ClickHouse and counter increment as a single task
private async Task WriteStreamAndIncrement(Stream stream, string query, bool useInlineQuery, int counter, CancellationToken token)
{
}

private static string GetColumnsExpression(IReadOnlyCollection<string> columns) => columns == null || columns.Count == 0 ? "*" : string.Join(",", columns);
}

0 comments on commit 4a78793

Please sign in to comment.