diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 04f07b2f..038180fa 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -143,7 +143,6 @@ jobs: fail-fast: false matrix: tag: - - 'yandex/clickhouse-server:20.3' - 'yandex/clickhouse-server:20.6' - 'yandex/clickhouse-server:20.9' - 'yandex/clickhouse-server:20.12' diff --git a/ClickHouse.Client.Tests/BulkCopyTests.cs b/ClickHouse.Client.Tests/BulkCopyTests.cs index 91d95df5..d4faf813 100644 --- a/ClickHouse.Client.Tests/BulkCopyTests.cs +++ b/ClickHouse.Client.Tests/BulkCopyTests.cs @@ -60,7 +60,7 @@ public async Task ShouldExecuteSingleValueInsertViaBulkCopy(string clickHouseTyp public async Task ShouldExecuteMultipleBulkInsertions() { var sw = new Stopwatch(); - var duration = new TimeSpan(0, 5, 0); + var duration = TimeSpan.FromMinutes(5); var targetTable = "test." + SanitizeTableName($"bulk_load_test"); diff --git a/ClickHouse.Client/Copy/ClickHouseBulkCopy.cs b/ClickHouse.Client/Copy/ClickHouseBulkCopy.cs index 7fd68293..e850158d 100644 --- a/ClickHouse.Client/Copy/ClickHouseBulkCopy.cs +++ b/ClickHouse.Client/Copy/ClickHouseBulkCopy.cs @@ -165,16 +165,21 @@ public async Task WriteToServerAsync(IEnumerable rows, IReadOnlyCollec var completedTaskIndex = Array.FindIndex(tasks, t => t.IsCompleted); if (completedTaskIndex >= 0) { - // propagate exception if one happens - // 'await' instead of 'Wait()' to avoid dealing with AggregateException - await tasks[completedTaskIndex].ConfigureAwait(false); - tasks[completedTaskIndex] = connection.PostStreamAsync(useInlineQuery ? null : query, stream, true, token) - .ContinueWith(t => { using (stream) Interlocked.Add(ref rowsWritten, counter); }, token); + async Task SendBatch() + { + using (stream) + { + await connection.PostStreamAsync(useInlineQuery ? null : query, stream, true, token).ConfigureAwait(false); + Interlocked.Add(ref rowsWritten, counter); + } + } + tasks[completedTaskIndex] = SendBatch(); break; // while (true); go to next batch } else { - await Task.WhenAny(tasks).ConfigureAwait(false); + var completedTask = await Task.WhenAny(tasks).ConfigureAwait(false); + await completedTask.ConfigureAwait(false); } } } @@ -193,5 +198,10 @@ public void Dispose() GC.SuppressFinalize(this); } + // Utility method wrapping sending data to ClickHouse and counter increment as a single task + private async Task WriteStreamAndIncrement(Stream stream, string query, bool useInlineQuery, int counter, CancellationToken token) + { + } + private static string GetColumnsExpression(IReadOnlyCollection columns) => columns == null || columns.Count == 0 ? "*" : string.Join(",", columns); }