From 91f3a38c9660c90ddfd3bbef41fa38ce2c2dd9bf Mon Sep 17 00:00:00 2001 From: KirillKurdyukov Date: Wed, 14 Aug 2024 13:09:23 +0300 Subject: [PATCH 01/29] init commit --- slo/src/AdoNet/SloContext.cs | 2 +- slo/src/Internal/Cli/Cli.cs | 2 +- slo/src/Internal/Internal.csproj | 1 + slo/src/Internal/SloContext.cs | 17 +++++++++++++---- slo/src/TableService/SloContext.cs | 2 +- 5 files changed, 17 insertions(+), 7 deletions(-) diff --git a/slo/src/AdoNet/SloContext.cs b/slo/src/AdoNet/SloContext.cs index 8ccd99a5..1041ba3f 100644 --- a/slo/src/AdoNet/SloContext.cs +++ b/slo/src/AdoNet/SloContext.cs @@ -53,7 +53,7 @@ protected override Task CleanUp(string dropTableSql, int operationTimeout) throw new NotImplementedException(); } - public override Task CreateClient(Config config) + protected override Task CreateClient(Config config) { var splitEndpoint = config.Endpoint.Split("://"); var useTls = splitEndpoint[0] switch diff --git a/slo/src/Internal/Cli/Cli.cs b/slo/src/Internal/Cli/Cli.cs index b3f1bb83..9f3beeb9 100644 --- a/slo/src/Internal/Cli/Cli.cs +++ b/slo/src/Internal/Cli/Cli.cs @@ -132,7 +132,7 @@ public static async Task Run(SloContext sloContext, string[] args) wh CleanupCommand.SetHandler(async cleanUpConfig => { await CliCommands.CleanUp(cleanUpConfig); }, new CleanUpConfigBinder(EndpointArgument, DbArgument, TableOption, WriteTimeoutOption)); - RunCommand.SetHandler(async runConfig => { await CliCommands.Run(runConfig); }, + RunCommand.SetHandler(async runConfig => { await sloContext.Run(runConfig); }, new RunConfigBinder(EndpointArgument, DbArgument, TableOption, PromPgwOption, ReportPeriodOption, ReadRpsOption, ReadTimeoutOption, WriteRpsOption, WriteTimeoutOption, TimeOption, ShutdownTimeOption)); diff --git a/slo/src/Internal/Internal.csproj b/slo/src/Internal/Internal.csproj index a2519504..fec702c1 100644 --- a/slo/src/Internal/Internal.csproj +++ b/slo/src/Internal/Internal.csproj @@ -14,6 +14,7 @@ + diff --git a/slo/src/Internal/SloContext.cs b/slo/src/Internal/SloContext.cs index 4cab1a1c..30b89b2f 100644 --- a/slo/src/Internal/SloContext.cs +++ b/slo/src/Internal/SloContext.cs @@ -1,5 +1,6 @@ using Internal.Cli; using Microsoft.Extensions.Logging; +using Polly; using Prometheus; using Ydb.Sdk.Value; @@ -83,9 +84,16 @@ PRIMARY KEY (hash, id) protected abstract Task Create(T client, string createTableSql, int operationTimeout); - // public async Task Run(RunConfig runConfig) - // { - // } + public async Task Run(RunConfig runConfig) + { + var writeLimiter = Policy.RateLimit(runConfig.WriteRps, TimeSpan.FromSeconds(1)); + var readLimiter = Policy.RateLimit(runConfig.ReadRps, TimeSpan.FromSeconds(1)); + + Task.Run(async () => + { + + }) + } // return attempt count protected abstract Task Upsert(T client, string upsertSql, Dictionary parameters, @@ -126,7 +134,8 @@ private Task Upsert(T client, Config config) }, config.WriteTimeout); } - public abstract Task CreateClient(Config config); + protected abstract Task CreateClient(Config config); + // private Task Select(RunConfig config) // { // return Select( diff --git a/slo/src/TableService/SloContext.cs b/slo/src/TableService/SloContext.cs index e08c8f79..c6b32776 100644 --- a/slo/src/TableService/SloContext.cs +++ b/slo/src/TableService/SloContext.cs @@ -59,7 +59,7 @@ protected override Task CleanUp(string dropTableSql, int operationTimeout) throw new NotImplementedException(); } - public override async Task CreateClient(Config config) + protected override async Task CreateClient(Config config) { return new TableClient(await Driver.CreateInitialized(new DriverConfig(config.Endpoint, config.Db))); } From fd865813b340fdd96a6c14f6a231a82269be9e62 Mon Sep 17 00:00:00 2001 From: KirillKurdyukov Date: Wed, 14 Aug 2024 16:56:51 +0300 Subject: [PATCH 02/29] feat: slo context run --- slo/src/AdoNet/SloContext.cs | 62 ++++++++--- slo/src/Internal/Jobs/Job.cs | 6 - slo/src/Internal/SloContext.cs | 167 +++++++++++++++++++++------- slo/src/TableService/SloContext.cs | 20 ++-- src/Ydb.Sdk/src/Ado/YdbException.cs | 5 +- 5 files changed, 190 insertions(+), 70 deletions(-) diff --git a/slo/src/AdoNet/SloContext.cs b/slo/src/AdoNet/SloContext.cs index 1041ba3f..2566edc4 100644 --- a/slo/src/AdoNet/SloContext.cs +++ b/slo/src/AdoNet/SloContext.cs @@ -1,7 +1,9 @@ +using System.Diagnostics; using Internal; using Internal.Cli; using Polly; using Prometheus; +using Ydb.Sdk; using Ydb.Sdk.Ado; using Ydb.Sdk.Value; @@ -9,9 +11,17 @@ namespace AdoNet; public class SloContext : SloContext { - private readonly AsyncPolicy _policy = Policy.Handle() + private readonly AsyncPolicy _policy = Policy.Handle(exception => exception.IsTransient) .WaitAndRetryAsync(10, attempt => TimeSpan.FromSeconds(attempt), - (_, _, retryCount, context) => { context["RetryCount"] = retryCount; }); + (e, _, retryCount, context) => + { + context["RetryCount"] = retryCount; + var errorsGauge = (Gauge)context["errorsGauge"]; + + errorsGauge?.WithLabels(((YdbException)e).Code.ToString(), "retried").Inc(); + }); + + protected override string JobName => "workload-ado-net"; protected override async Task Create(YdbDataSource client, string createTableSql, int operationTimeout) { @@ -22,10 +32,16 @@ protected override async Task Create(YdbDataSource client, string createTableSql .ExecuteNonQueryAsync(); } - protected override async Task Upsert(YdbDataSource dataSource, string upsertSql, + protected override async Task<(int, StatusCode)> Upsert(YdbDataSource dataSource, string upsertSql, Dictionary parameters, int writeTimeout, Gauge? errorsGauge = null) { - var policyResult = await _policy.ExecuteAndCaptureAsync(async () => + var context = new Context(); + if (errorsGauge != null) + { + context["errorsGauge"] = errorsGauge; + } + + var policyResult = await _policy.ExecuteAndCaptureAsync(async _ => { await using var ydbConnection = await dataSource.OpenConnectionAsync(); @@ -38,19 +54,39 @@ protected override async Task Upsert(YdbDataSource dataSource, string upser } await ydbCommand.ExecuteNonQueryAsync(); - }); + }, context); - return (int)policyResult.Context["RetryCount"]; - } - protected override Task Select(string selectSql, Dictionary parameters, int readTimeout) - { - throw new NotImplementedException(); + return (policyResult.Context.TryGetValue("RetryCount", out var countAttempts) ? (int)countAttempts : 1, + ((YdbException)policyResult.FinalException)?.Code ?? StatusCode.Success); } - protected override Task CleanUp(string dropTableSql, int operationTimeout) + protected override async Task<(int, StatusCode, object?)> Select(YdbDataSource dataSource, string selectSql, + Dictionary parameters, int readTimeout, Gauge? errorsGauge = null) { - throw new NotImplementedException(); + var context = new Context(); + if (errorsGauge != null) + { + context["errorsGauge"] = errorsGauge; + } + + var policyResult = await _policy.ExecuteAndCaptureAsync(async _ => + { + await using var ydbConnection = await dataSource.OpenConnectionAsync(); + + var ydbCommand = new YdbCommand(ydbConnection) + { CommandText = selectSql, CommandTimeout = readTimeout }; + + foreach (var (key, value) in parameters) + { + ydbCommand.Parameters.AddWithValue(key, value); + } + + return await ydbCommand.ExecuteScalarAsync(); + }, context); + + return (policyResult.Context.TryGetValue("RetryCount", out var countAttempts) ? (int)countAttempts : 1, + ((YdbException)policyResult.FinalException)?.Code ?? StatusCode.Success, policyResult.Result); } protected override Task CreateClient(Config config) @@ -67,6 +103,6 @@ protected override Task CreateClient(Config config) var port = splitEndpoint[1].Split(":")[1]; return Task.FromResult(new YdbDataSource(new YdbConnectionStringBuilder - { UseTls = useTls, Host = host, Port = int.Parse(port), Database = config.Db })); + { UseTls = useTls, Host = host, Port = int.Parse(port), Database = config.Db, LoggerFactory = Factory })); } } \ No newline at end of file diff --git a/slo/src/Internal/Jobs/Job.cs b/slo/src/Internal/Jobs/Job.cs index 3671aac5..da7e37b4 100644 --- a/slo/src/Internal/Jobs/Job.cs +++ b/slo/src/Internal/Jobs/Job.cs @@ -55,12 +55,6 @@ protected Job(Client client, RateLimitedCaller rateLimitedCaller, string jobName new HistogramConfiguration { Buckets = Histogram.LinearBuckets(1, 1, 10) }); ErrorsGauge = metricFactory.CreateGauge("errors", "amount of errors", new[] { "class", "in" }); - - foreach (var statusCode in Enum.GetValues()) - { - ErrorsGauge.WithLabels(Utils.GetResonseStatusName(statusCode), "retried").IncTo(0); - ErrorsGauge.WithLabels(Utils.GetResonseStatusName(statusCode), "finally").IncTo(0); - } } public async void Start() diff --git a/slo/src/Internal/SloContext.cs b/slo/src/Internal/SloContext.cs index 30b89b2f..ccab1f15 100644 --- a/slo/src/Internal/SloContext.cs +++ b/slo/src/Internal/SloContext.cs @@ -1,19 +1,29 @@ +using System.Diagnostics; using Internal.Cli; using Microsoft.Extensions.Logging; using Polly; +using Polly.RateLimit; using Prometheus; +using Ydb.Sdk; using Ydb.Sdk.Value; namespace Internal; public abstract class SloContext where T : IDisposable { - private readonly ILogger _logger = LoggerFactory - .Create(builder => builder.AddConsole().SetMinimumLevel(LogLevel.Information)) - .CreateLogger>(); + protected readonly ILoggerFactory Factory; + protected readonly ILogger Logger; private volatile int _maxId; + protected SloContext() + { + Factory = LoggerFactory.Create(builder => builder.AddConsole().SetMinimumLevel(LogLevel.Information)); + Logger = Factory.CreateLogger>(); + } + + protected abstract string JobName { get; } + public async Task Create(CreateConfig config) { const int maxCreateAttempts = 10; @@ -21,7 +31,7 @@ public async Task Create(CreateConfig config) using var client = await CreateClient(config); for (var attempt = 0; attempt < maxCreateAttempts; attempt++) { - _logger.LogInformation("Creating table {TableName}..", config.TableName); + Logger.LogInformation("Creating table {TableName}..", config.TableName); try { var createTableSql = $""" @@ -41,17 +51,17 @@ PRIMARY KEY (hash, id) AUTO_PARTITIONING_MAX_PARTITIONS_COUNT = {config.MaxPartitionsCount} ); """; - _logger.LogInformation("YQL script: {sql}", createTableSql); + Logger.LogInformation("YQL script: {sql}", createTableSql); await Create(client, createTableSql, config.WriteTimeout); - _logger.LogInformation("Created table {TableName}!", config.TableName); + Logger.LogInformation("Created table {TableName}!", config.TableName); break; } catch (Exception e) { - _logger.LogError(e, "Fail created table"); + Logger.LogError(e, "Fail created table"); if (attempt == maxCreateAttempts - 1) { @@ -74,11 +84,11 @@ PRIMARY KEY (hash, id) } catch (Exception e) { - _logger.LogError(e, "Init failed when all tasks, continue.."); + Logger.LogError(e, "Init failed when all tasks, continue.."); } finally { - _logger.LogInformation("Created task is finished"); + Logger.LogInformation("Created task is finished"); } } @@ -86,29 +96,107 @@ PRIMARY KEY (hash, id) public async Task Run(RunConfig runConfig) { + var promPgwEndpoint = $"{runConfig.PromPgw}/metrics"; + var client = await CreateClient(runConfig); + using var prometheus = new MetricPusher(promPgwEndpoint, JobName, intervalMilliseconds: runConfig.ReportPeriod); + prometheus.Start(); + + var (_, _, maxId) = await Select(client, $"SELECT COUNT(*) FROM `{runConfig.TableName};`", + new Dictionary(), runConfig.ReadTimeout); + _maxId = (int)maxId!; + + var metricFactory = Metrics.WithLabels(new Dictionary + { { "jobName", JobName }, { "sdk", "dotnet" }, { "sdkVersion", Environment.Version.ToString() } }); + + var okGauge = metricFactory.CreateGauge("oks", "Count of OK"); + var notOkGauge = metricFactory.CreateGauge("not_oks", "Count of not OK"); + var latencySummary = metricFactory.CreateSummary("latency", "Latencies (OK)", new[] { "status" }, + new SummaryConfiguration + { + MaxAge = TimeSpan.FromSeconds(15), Objectives = new QuantileEpsilonPair[] + { new(0.5, 0.05), new(0.99, 0.005), new(0.999, 0.0005) } + }); + + var attemptsHistogram = metricFactory.CreateHistogram("attempts", "summary of amount for request", + new[] { "status" }, + new HistogramConfiguration { Buckets = Histogram.LinearBuckets(1, 1, 10) }); + + var errorsGauge = metricFactory.CreateGauge("errors", "amount of errors", new[] { "class", "in" }); + var writeLimiter = Policy.RateLimit(runConfig.WriteRps, TimeSpan.FromSeconds(1)); var readLimiter = Policy.RateLimit(runConfig.ReadRps, TimeSpan.FromSeconds(1)); - - Task.Run(async () => + + var cancellationTokenSource = new CancellationTokenSource(); + cancellationTokenSource.CancelAfter(TimeSpan.FromSeconds(runConfig.ShutdownTime)); + + var writeTask = ShootingTask(writeLimiter, "write", Upsert); + var readTask = ShootingTask(readLimiter, "read", Select); + + Logger.LogInformation("Started write / read shooting.."); + + await Task.WhenAll(readTask, writeTask); + + Logger.LogInformation("Run task is finished"); + return; + + Task ShootingTask(RateLimitPolicy rateLimitPolicy, string shootingName, + Func> action) { - - }) - } + return Task.Run(async () => + { + var tasks = new List(); - // return attempt count - protected abstract Task Upsert(T client, string upsertSql, Dictionary parameters, - int writeTimeout, Gauge? errorsGauge = null); + while (!cancellationTokenSource.Token.IsCancellationRequested) + { + try + { + tasks.Add(rateLimitPolicy.Execute(async () => + { + var sw = Stopwatch.StartNew(); + var (attempts, statusCode) = await action(client, runConfig, errorsGauge); + string label; + + if (statusCode != StatusCode.Success) + { + Logger.LogError("Failed {ShootingName} operation code: {StatusCode}", shootingName, + statusCode); + notOkGauge.Inc(); + label = "err"; + } + else + { + okGauge.Inc(); + label = "ok"; + } + + attemptsHistogram.WithLabels(label).Observe(attempts); + latencySummary.WithLabels(label).Observe(sw.ElapsedMilliseconds); + })); + } + catch (RateLimitRejectedException) + { + Logger.LogInformation("Waiting {ShootingName} tasks", shootingName); + + await Task.Delay(990, cancellationTokenSource.Token); + } + } - protected abstract Task Select(string selectSql, Dictionary parameters, int readTimeout); + await Task.WhenAll(tasks); - // public async Task CleanUp(CleanUpConfig config) - // { - // await CleanUp($"DROP TABLE ${config.TableName}", config.WriteTimeout); - // } + Logger.LogInformation("{ShootingName} shooting is stopped", shootingName); + }, cancellationTokenSource.Token); + } + } + + // return attempt count & StatusCode operation + protected abstract Task<(int, StatusCode)> Upsert(T client, string upsertSql, + Dictionary parameters, + int writeTimeout, Gauge? errorsGauge = null); - protected abstract Task CleanUp(string dropTableSql, int operationTimeout); + protected abstract Task<(int, StatusCode, object?)> Select(T client, string selectSql, + Dictionary parameters, int readTimeout, Gauge? errorsGauge = null); - private Task Upsert(T client, Config config) + private Task<(int, StatusCode)> Upsert(T client, Config config, Gauge? errorsGauge = null) { const int minSizeStr = 20; const int maxSizeStr = 40; @@ -131,21 +219,24 @@ private Task Upsert(T client, Config config) }, { "$payload_double", YdbValue.MakeDouble(Random.Shared.NextDouble()) }, { "$payload_timestamp", YdbValue.MakeTimestamp(DateTime.Now) } - }, config.WriteTimeout); + }, config.WriteTimeout, errorsGauge); } protected abstract Task CreateClient(Config config); - -// private Task Select(RunConfig config) -// { -// return Select( -// $""" -// SELECT id, payload_str, payload_double, payload_timestamp, payload_hash -// FROM `{config.TableName}` WHERE id = $id AND hash = Digest::NumericHash($id) -// """, -// new Dictionary -// { -// { "$id", YdbValue.MakeUint64((ulong)Random.Shared.Next(_maxId)) } -// }, config.ReadTimeout); -// } + + private async Task<(int, StatusCode)> Select(T client, RunConfig config, Gauge? errorsGauge = null) + { + var (attempts, code, _) = await Select(client, + $""" + DECLARE $id AS Uint64; + SELECT id, payload_str, payload_double, payload_timestamp, payload_hash + FROM `{config.TableName}` WHERE id = $id AND hash = Digest::NumericHash($id) + """, + new Dictionary + { + { "$id", YdbValue.MakeUint64((ulong)Random.Shared.Next(_maxId)) } + }, config.ReadTimeout, errorsGauge); + + return (attempts, code); + } } \ No newline at end of file diff --git a/slo/src/TableService/SloContext.cs b/slo/src/TableService/SloContext.cs index c6b32776..52c2d9a0 100644 --- a/slo/src/TableService/SloContext.cs +++ b/slo/src/TableService/SloContext.cs @@ -1,5 +1,6 @@ using Internal; using Internal.Cli; +using Microsoft.Extensions.Logging; using Prometheus; using Ydb.Sdk; using Ydb.Sdk.Services.Table; @@ -9,6 +10,8 @@ namespace TableService; public class SloContext : SloContext { + protected override string JobName => "workload-table-service"; + protected override async Task Create(TableClient client, string createTableSql, int operationTimeout) { var response = await client.SessionExec( @@ -18,7 +21,7 @@ protected override async Task Create(TableClient client, string createTableSql, response.Status.EnsureSuccess(); } - protected override async Task Upsert(TableClient tableClient, string upsertSql, + protected override async Task<(int, StatusCode)> Upsert(TableClient tableClient, string upsertSql, Dictionary parameters, int writeTimeout, Gauge? errorsGauge = null) { var txControl = TxControl.BeginSerializableRW().Commit(); @@ -39,28 +42,21 @@ protected override async Task Upsert(TableClient tableClient, string upsert } errorsGauge?.WithLabels(Utils.GetResonseStatusName(response.Status.StatusCode), "retried").Inc(); - Console.WriteLine(response.Status); return response; }); - response.Status.EnsureSuccess(); - - return attempts; - } - - protected override Task Select(string selectSql, Dictionary parameters, int readTimeout) - { - throw new NotImplementedException(); + return (attempts, response.Status.StatusCode); } - protected override Task CleanUp(string dropTableSql, int operationTimeout) + protected override Task<(int, StatusCode, object)> Select(TableClient client, string selectSql, + Dictionary parameters, int readTimeout, Gauge? errorsGauge = null) { throw new NotImplementedException(); } protected override async Task CreateClient(Config config) { - return new TableClient(await Driver.CreateInitialized(new DriverConfig(config.Endpoint, config.Db))); + return new TableClient(await Driver.CreateInitialized(new DriverConfig(config.Endpoint, config.Db), Factory)); } } \ No newline at end of file diff --git a/src/Ydb.Sdk/src/Ado/YdbException.cs b/src/Ydb.Sdk/src/Ado/YdbException.cs index 4d3c98a6..8426b203 100644 --- a/src/Ydb.Sdk/src/Ado/YdbException.cs +++ b/src/Ydb.Sdk/src/Ado/YdbException.cs @@ -14,8 +14,9 @@ public YdbException(string message, Exception e) : base(message, e) public YdbException(Status status) : base(status.ToString()) { + Code = status.StatusCode; var policy = RetrySettings.DefaultInstance.GetRetryRule(status.StatusCode).Policy; - + IsTransient = policy == RetryPolicy.Unconditional; IsTransientWhenIdempotent = policy != RetryPolicy.None; // TODO: Add SQLSTATE message with order with https://en.wikipedia.org/wiki/SQLSTATE @@ -24,6 +25,8 @@ public YdbException(Status status) : base(status.ToString()) public override bool IsTransient { get; } public bool IsTransientWhenIdempotent { get; } + + public StatusCode Code { get; } } public class YdbOperationInProgressException : DbException From 9ba3faccb283e163f983ab05592d1d0072746f53 Mon Sep 17 00:00:00 2001 From: KirillKurdyukov Date: Wed, 14 Aug 2024 17:19:56 +0300 Subject: [PATCH 03/29] Uint64 -> Int32 --- slo/src/Internal/SloContext.cs | 46 ++++++++++++++++++------------ slo/src/TableService/SloContext.cs | 30 +++++++++++++++---- 2 files changed, 53 insertions(+), 23 deletions(-) diff --git a/slo/src/Internal/SloContext.cs b/slo/src/Internal/SloContext.cs index ccab1f15..ef9696f2 100644 --- a/slo/src/Internal/SloContext.cs +++ b/slo/src/Internal/SloContext.cs @@ -12,14 +12,14 @@ namespace Internal; public abstract class SloContext where T : IDisposable { protected readonly ILoggerFactory Factory; - protected readonly ILogger Logger; + private readonly ILogger _logger; private volatile int _maxId; protected SloContext() { Factory = LoggerFactory.Create(builder => builder.AddConsole().SetMinimumLevel(LogLevel.Information)); - Logger = Factory.CreateLogger>(); + _logger = Factory.CreateLogger>(); } protected abstract string JobName { get; } @@ -31,13 +31,13 @@ public async Task Create(CreateConfig config) using var client = await CreateClient(config); for (var attempt = 0; attempt < maxCreateAttempts; attempt++) { - Logger.LogInformation("Creating table {TableName}..", config.TableName); + _logger.LogInformation("Creating table {TableName}..", config.TableName); try { var createTableSql = $""" CREATE TABLE `{config.TableName}` ( hash Uint64, - id Uint64, + id Int32, payload_str Text, payload_double Double, payload_timestamp Timestamp, @@ -51,17 +51,17 @@ PRIMARY KEY (hash, id) AUTO_PARTITIONING_MAX_PARTITIONS_COUNT = {config.MaxPartitionsCount} ); """; - Logger.LogInformation("YQL script: {sql}", createTableSql); + _logger.LogInformation("YQL script: {sql}", createTableSql); await Create(client, createTableSql, config.WriteTimeout); - Logger.LogInformation("Created table {TableName}!", config.TableName); + _logger.LogInformation("Created table {TableName}", config.TableName); break; } catch (Exception e) { - Logger.LogError(e, "Fail created table"); + _logger.LogError(e, "Fail created table"); if (attempt == maxCreateAttempts - 1) { @@ -84,11 +84,11 @@ PRIMARY KEY (hash, id) } catch (Exception e) { - Logger.LogError(e, "Init failed when all tasks, continue.."); + _logger.LogError(e, "Init failed when all tasks, continue.."); } finally { - Logger.LogInformation("Created task is finished"); + _logger.LogInformation("Created task is finished"); } } @@ -101,7 +101,7 @@ public async Task Run(RunConfig runConfig) using var prometheus = new MetricPusher(promPgwEndpoint, JobName, intervalMilliseconds: runConfig.ReportPeriod); prometheus.Start(); - var (_, _, maxId) = await Select(client, $"SELECT COUNT(*) FROM `{runConfig.TableName};`", + var (_, _, maxId) = await Select(client, $"SELECT MAX(id) as max_id FROM `{runConfig.TableName};`", new Dictionary(), runConfig.ReadTimeout); _maxId = (int)maxId!; @@ -132,11 +132,14 @@ public async Task Run(RunConfig runConfig) var writeTask = ShootingTask(writeLimiter, "write", Upsert); var readTask = ShootingTask(readLimiter, "read", Select); - Logger.LogInformation("Started write / read shooting.."); + _logger.LogInformation("Started write / read shooting.."); await Task.WhenAll(readTask, writeTask); - Logger.LogInformation("Run task is finished"); + await prometheus.StopAsync(); + await MetricReset(promPgwEndpoint); + + _logger.LogInformation("Run task is finished"); return; Task ShootingTask(RateLimitPolicy rateLimitPolicy, string shootingName, @@ -158,7 +161,7 @@ Task ShootingTask(RateLimitPolicy rateLimitPolicy, string shootingName, if (statusCode != StatusCode.Success) { - Logger.LogError("Failed {ShootingName} operation code: {StatusCode}", shootingName, + _logger.LogError("Failed {ShootingName} operation code: {StatusCode}", shootingName, statusCode); notOkGauge.Inc(); label = "err"; @@ -175,7 +178,7 @@ Task ShootingTask(RateLimitPolicy rateLimitPolicy, string shootingName, } catch (RateLimitRejectedException) { - Logger.LogInformation("Waiting {ShootingName} tasks", shootingName); + _logger.LogInformation("Waiting {ShootingName} tasks", shootingName); await Task.Delay(990, cancellationTokenSource.Token); } @@ -183,7 +186,7 @@ Task ShootingTask(RateLimitPolicy rateLimitPolicy, string shootingName, await Task.WhenAll(tasks); - Logger.LogInformation("{ShootingName} shooting is stopped", shootingName); + _logger.LogInformation("{ShootingName} shooting is stopped", shootingName); }, cancellationTokenSource.Token); } } @@ -203,7 +206,7 @@ Task ShootingTask(RateLimitPolicy rateLimitPolicy, string shootingName, return Upsert(client, $""" - DECLARE $id AS Uint64; + DECLARE $id AS Int32; DECLARE $payload_str AS Utf8; DECLARE $payload_double AS Double; DECLARE $payload_timestamp AS Timestamp; @@ -211,7 +214,7 @@ Task ShootingTask(RateLimitPolicy rateLimitPolicy, string shootingName, VALUES ($id, Digest::NumericHash($id), $payload_str, $payload_double, $payload_timestamp) """, new Dictionary { - { "$id", YdbValue.MakeUint64((ulong)Interlocked.Increment(ref _maxId)) }, + { "$id", YdbValue.MakeInt32(Interlocked.Increment(ref _maxId)) }, { "$payload_str", YdbValue.MakeUtf8(string.Join(string.Empty, Enumerable .Repeat(0, Random.Shared.Next(minSizeStr, maxSizeStr)) @@ -228,7 +231,7 @@ Task ShootingTask(RateLimitPolicy rateLimitPolicy, string shootingName, { var (attempts, code, _) = await Select(client, $""" - DECLARE $id AS Uint64; + DECLARE $id AS Int32; SELECT id, payload_str, payload_double, payload_timestamp, payload_hash FROM `{config.TableName}` WHERE id = $id AND hash = Digest::NumericHash($id) """, @@ -239,4 +242,11 @@ Task ShootingTask(RateLimitPolicy rateLimitPolicy, string shootingName, return (attempts, code); } + + private async Task MetricReset(string promPgwEndpoint) + { + var deleteUri = $"{promPgwEndpoint}/job/{JobName}"; + using var httpClient = new HttpClient(); + await httpClient.DeleteAsync(deleteUri); + } } \ No newline at end of file diff --git a/slo/src/TableService/SloContext.cs b/slo/src/TableService/SloContext.cs index 52c2d9a0..4d09a0db 100644 --- a/slo/src/TableService/SloContext.cs +++ b/slo/src/TableService/SloContext.cs @@ -10,6 +10,7 @@ namespace TableService; public class SloContext : SloContext { + private readonly TxControl _txControl = TxControl.BeginSerializableRW().Commit(); protected override string JobName => "workload-table-service"; protected override async Task Create(TableClient client, string createTableSql, int operationTimeout) @@ -24,8 +25,6 @@ protected override async Task Create(TableClient client, string createTableSql, protected override async Task<(int, StatusCode)> Upsert(TableClient tableClient, string upsertSql, Dictionary parameters, int writeTimeout, Gauge? errorsGauge = null) { - var txControl = TxControl.BeginSerializableRW().Commit(); - var querySettings = new ExecuteDataQuerySettings { OperationTimeout = TimeSpan.FromSeconds(writeTimeout) }; @@ -35,7 +34,7 @@ protected override async Task Create(TableClient client, string createTableSql, async session => { attempts++; - var response = await session.ExecuteDataQuery(upsertSql, txControl, parameters, querySettings); + var response = await session.ExecuteDataQuery(upsertSql, _txControl, parameters, querySettings); if (response.Status.IsSuccess) { return response; @@ -49,10 +48,31 @@ protected override async Task Create(TableClient client, string createTableSql, return (attempts, response.Status.StatusCode); } - protected override Task<(int, StatusCode, object)> Select(TableClient client, string selectSql, + protected override async Task<(int, StatusCode, object?)> Select(TableClient tableClient, string selectSql, Dictionary parameters, int readTimeout, Gauge? errorsGauge = null) { - throw new NotImplementedException(); + var querySettings = new ExecuteDataQuerySettings + { OperationTimeout = TimeSpan.FromSeconds(readTimeout) }; + + var attempts = 0; + + var response = (ExecuteDataQueryResponse)await tableClient.SessionExec( + async session => + { + attempts++; + var response = await session.ExecuteDataQuery(selectSql, _txControl, parameters, querySettings); + if (response.Status.IsSuccess) + { + return response; + } + + errorsGauge?.WithLabels(Utils.GetResonseStatusName(response.Status.StatusCode), "retried").Inc(); + + return response; + }); + + return (attempts, response.Status.StatusCode, + response.Status.IsSuccess ? response.Result.ResultSets[0].Rows[0][0].GetInt32() : null); } protected override async Task CreateClient(Config config) From d1f80aed3f365416cbbd9e7dd28bc480e1a03330 Mon Sep 17 00:00:00 2001 From: KirillKurdyukov Date: Wed, 14 Aug 2024 17:26:26 +0300 Subject: [PATCH 04/29] fix linter --- src/Ydb.Sdk/src/Ado/YdbException.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Ydb.Sdk/src/Ado/YdbException.cs b/src/Ydb.Sdk/src/Ado/YdbException.cs index 8426b203..704c5b1b 100644 --- a/src/Ydb.Sdk/src/Ado/YdbException.cs +++ b/src/Ydb.Sdk/src/Ado/YdbException.cs @@ -16,7 +16,7 @@ public YdbException(Status status) : base(status.ToString()) { Code = status.StatusCode; var policy = RetrySettings.DefaultInstance.GetRetryRule(status.StatusCode).Policy; - + IsTransient = policy == RetryPolicy.Unconditional; IsTransientWhenIdempotent = policy != RetryPolicy.None; // TODO: Add SQLSTATE message with order with https://en.wikipedia.org/wiki/SQLSTATE From f98330f05a5c877f717f6a1ce82edfa5e18cb8f6 Mon Sep 17 00:00:00 2001 From: KirillKurdyukov Date: Wed, 14 Aug 2024 17:27:41 +0300 Subject: [PATCH 05/29] fix linter --- slo/src/AdoNet/SloContext.cs | 5 ++--- slo/src/Internal/Jobs/Job.cs | 1 - slo/src/Internal/SloContext.cs | 4 ++-- slo/src/TableService/SloContext.cs | 1 - 4 files changed, 4 insertions(+), 7 deletions(-) diff --git a/slo/src/AdoNet/SloContext.cs b/slo/src/AdoNet/SloContext.cs index 2566edc4..5334d3d7 100644 --- a/slo/src/AdoNet/SloContext.cs +++ b/slo/src/AdoNet/SloContext.cs @@ -1,4 +1,3 @@ -using System.Diagnostics; using Internal; using Internal.Cli; using Polly; @@ -69,7 +68,7 @@ protected override async Task Create(YdbDataSource client, string createTableSql { context["errorsGauge"] = errorsGauge; } - + var policyResult = await _policy.ExecuteAndCaptureAsync(async _ => { await using var ydbConnection = await dataSource.OpenConnectionAsync(); @@ -84,7 +83,7 @@ protected override async Task Create(YdbDataSource client, string createTableSql return await ydbCommand.ExecuteScalarAsync(); }, context); - + return (policyResult.Context.TryGetValue("RetryCount", out var countAttempts) ? (int)countAttempts : 1, ((YdbException)policyResult.FinalException)?.Code ?? StatusCode.Success, policyResult.Result); } diff --git a/slo/src/Internal/Jobs/Job.cs b/slo/src/Internal/Jobs/Job.cs index da7e37b4..b4a023a1 100644 --- a/slo/src/Internal/Jobs/Job.cs +++ b/slo/src/Internal/Jobs/Job.cs @@ -1,6 +1,5 @@ using System.Diagnostics; using Prometheus; -using Ydb.Sdk; namespace Internal.Jobs; diff --git a/slo/src/Internal/SloContext.cs b/slo/src/Internal/SloContext.cs index ef9696f2..4c0fca01 100644 --- a/slo/src/Internal/SloContext.cs +++ b/slo/src/Internal/SloContext.cs @@ -138,7 +138,7 @@ public async Task Run(RunConfig runConfig) await prometheus.StopAsync(); await MetricReset(promPgwEndpoint); - + _logger.LogInformation("Run task is finished"); return; @@ -242,7 +242,7 @@ Task ShootingTask(RateLimitPolicy rateLimitPolicy, string shootingName, return (attempts, code); } - + private async Task MetricReset(string promPgwEndpoint) { var deleteUri = $"{promPgwEndpoint}/job/{JobName}"; diff --git a/slo/src/TableService/SloContext.cs b/slo/src/TableService/SloContext.cs index 4d09a0db..f9c0e5e1 100644 --- a/slo/src/TableService/SloContext.cs +++ b/slo/src/TableService/SloContext.cs @@ -1,6 +1,5 @@ using Internal; using Internal.Cli; -using Microsoft.Extensions.Logging; using Prometheus; using Ydb.Sdk; using Ydb.Sdk.Services.Table; From 6fb7aa1fff3ebb7152f4061d03440fb0a18ca057 Mon Sep 17 00:00:00 2001 From: KirillKurdyukov Date: Wed, 14 Aug 2024 19:07:19 +0300 Subject: [PATCH 06/29] fix query select max(id) --- slo/src/Internal/SloContext.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/slo/src/Internal/SloContext.cs b/slo/src/Internal/SloContext.cs index 4c0fca01..406e7ffc 100644 --- a/slo/src/Internal/SloContext.cs +++ b/slo/src/Internal/SloContext.cs @@ -101,7 +101,7 @@ public async Task Run(RunConfig runConfig) using var prometheus = new MetricPusher(promPgwEndpoint, JobName, intervalMilliseconds: runConfig.ReportPeriod); prometheus.Start(); - var (_, _, maxId) = await Select(client, $"SELECT MAX(id) as max_id FROM `{runConfig.TableName};`", + var (_, _, maxId) = await Select(client, $"SELECT MAX(id) as max_id FROM `{runConfig.TableName}`;", new Dictionary(), runConfig.ReadTimeout); _maxId = (int)maxId!; From 20d3ffbe0eab91103f784b5163c5a6a6f0192810 Mon Sep 17 00:00:00 2001 From: KirillKurdyukov Date: Wed, 14 Aug 2024 19:13:09 +0300 Subject: [PATCH 07/29] fix --- slo/src/TableService/SloContext.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/slo/src/TableService/SloContext.cs b/slo/src/TableService/SloContext.cs index f9c0e5e1..658c3934 100644 --- a/slo/src/TableService/SloContext.cs +++ b/slo/src/TableService/SloContext.cs @@ -71,7 +71,7 @@ protected override async Task Create(TableClient client, string createTableSql, }); return (attempts, response.Status.StatusCode, - response.Status.IsSuccess ? response.Result.ResultSets[0].Rows[0][0].GetInt32() : null); + response.Status.IsSuccess ? response.Result.ResultSets[0].Rows[0][0].GetOptionalInt32() : null); } protected override async Task CreateClient(Config config) From 9ac96c48654ebf7f78c46caa4a7cab5e15b8970d Mon Sep 17 00:00:00 2001 From: KirillKurdyukov Date: Wed, 14 Aug 2024 19:39:11 +0300 Subject: [PATCH 08/29] added debug info --- slo/src/Internal/SloContext.cs | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/slo/src/Internal/SloContext.cs b/slo/src/Internal/SloContext.cs index 406e7ffc..74c253a7 100644 --- a/slo/src/Internal/SloContext.cs +++ b/slo/src/Internal/SloContext.cs @@ -105,6 +105,8 @@ public async Task Run(RunConfig runConfig) new Dictionary(), runConfig.ReadTimeout); _maxId = (int)maxId!; + _logger.LogInformation("Init row count: {MaxId}", _maxId); + var metricFactory = Metrics.WithLabels(new Dictionary { { "jobName", JobName }, { "sdk", "dotnet" }, { "sdkVersion", Environment.Version.ToString() } }); @@ -149,12 +151,17 @@ Task ShootingTask(RateLimitPolicy rateLimitPolicy, string shootingName, { var tasks = new List(); + long activeTasks = 0; + while (!cancellationTokenSource.Token.IsCancellationRequested) { try { tasks.Add(rateLimitPolicy.Execute(async () => { + // ReSharper disable once AccessToModifiedClosure + Interlocked.Increment(ref activeTasks); + var sw = Stopwatch.StartNew(); var (attempts, statusCode) = await action(client, runConfig, errorsGauge); string label; @@ -172,13 +179,14 @@ Task ShootingTask(RateLimitPolicy rateLimitPolicy, string shootingName, label = "ok"; } + Interlocked.Decrement(ref activeTasks); attemptsHistogram.WithLabels(label).Observe(attempts); latencySummary.WithLabels(label).Observe(sw.ElapsedMilliseconds); })); } catch (RateLimitRejectedException) { - _logger.LogInformation("Waiting {ShootingName} tasks", shootingName); + _logger.LogInformation("Waiting {ShootingName} task, count active tasks: {}", shootingName, Interlocked.Read(ref activeTasks)); await Task.Delay(990, cancellationTokenSource.Token); } From fb664a17ef69ca71a4c16748ddc0162f160495cd Mon Sep 17 00:00:00 2001 From: KirillKurdyukov Date: Wed, 14 Aug 2024 20:21:08 +0300 Subject: [PATCH 09/29] fix ratelimiter --- slo/src/Internal/SloContext.cs | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/slo/src/Internal/SloContext.cs b/slo/src/Internal/SloContext.cs index 74c253a7..1e12d0f6 100644 --- a/slo/src/Internal/SloContext.cs +++ b/slo/src/Internal/SloContext.cs @@ -125,8 +125,8 @@ public async Task Run(RunConfig runConfig) var errorsGauge = metricFactory.CreateGauge("errors", "amount of errors", new[] { "class", "in" }); - var writeLimiter = Policy.RateLimit(runConfig.WriteRps, TimeSpan.FromSeconds(1)); - var readLimiter = Policy.RateLimit(runConfig.ReadRps, TimeSpan.FromSeconds(1)); + var writeLimiter = Policy.RateLimit(runConfig.WriteRps, TimeSpan.FromSeconds(1), runConfig.WriteRps); + var readLimiter = Policy.RateLimit(runConfig.ReadRps, TimeSpan.FromSeconds(1), runConfig.ReadRps); var cancellationTokenSource = new CancellationTokenSource(); cancellationTokenSource.CancelAfter(TimeSpan.FromSeconds(runConfig.ShutdownTime)); @@ -152,7 +152,7 @@ Task ShootingTask(RateLimitPolicy rateLimitPolicy, string shootingName, var tasks = new List(); long activeTasks = 0; - + while (!cancellationTokenSource.Token.IsCancellationRequested) { try @@ -161,7 +161,7 @@ Task ShootingTask(RateLimitPolicy rateLimitPolicy, string shootingName, { // ReSharper disable once AccessToModifiedClosure Interlocked.Increment(ref activeTasks); - + var sw = Stopwatch.StartNew(); var (attempts, statusCode) = await action(client, runConfig, errorsGauge); string label; @@ -184,11 +184,12 @@ Task ShootingTask(RateLimitPolicy rateLimitPolicy, string shootingName, latencySummary.WithLabels(label).Observe(sw.ElapsedMilliseconds); })); } - catch (RateLimitRejectedException) + catch (RateLimitRejectedException e) { - _logger.LogInformation("Waiting {ShootingName} task, count active tasks: {}", shootingName, Interlocked.Read(ref activeTasks)); + _logger.LogInformation(e, "Waiting {ShootingName} task, count active tasks: {}", shootingName, + Interlocked.Read(ref activeTasks)); - await Task.Delay(990, cancellationTokenSource.Token); + await Task.Delay(e.RetryAfter, cancellationTokenSource.Token); } } From cc5a7894be8b1b18390f6d3b40cf205e9e456a23 Mon Sep 17 00:00:00 2001 From: KirillKurdyukov Date: Wed, 14 Aug 2024 20:39:17 +0300 Subject: [PATCH 10/29] fix kek --- slo/src/Internal/SloContext.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/slo/src/Internal/SloContext.cs b/slo/src/Internal/SloContext.cs index 1e12d0f6..9b25ba9f 100644 --- a/slo/src/Internal/SloContext.cs +++ b/slo/src/Internal/SloContext.cs @@ -189,7 +189,7 @@ Task ShootingTask(RateLimitPolicy rateLimitPolicy, string shootingName, _logger.LogInformation(e, "Waiting {ShootingName} task, count active tasks: {}", shootingName, Interlocked.Read(ref activeTasks)); - await Task.Delay(e.RetryAfter, cancellationTokenSource.Token); + await Task.Delay(990, cancellationTokenSource.Token); } } From 7b5cae2e99d9ea82d7a536862d4af4092c4416d5 Mon Sep 17 00:00:00 2001 From: KirillKurdyukov Date: Thu, 15 Aug 2024 11:01:12 +0300 Subject: [PATCH 11/29] new strategy --- slo/src/Internal/SloContext.cs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/slo/src/Internal/SloContext.cs b/slo/src/Internal/SloContext.cs index 9b25ba9f..5f7f6469 100644 --- a/slo/src/Internal/SloContext.cs +++ b/slo/src/Internal/SloContext.cs @@ -125,8 +125,8 @@ public async Task Run(RunConfig runConfig) var errorsGauge = metricFactory.CreateGauge("errors", "amount of errors", new[] { "class", "in" }); - var writeLimiter = Policy.RateLimit(runConfig.WriteRps, TimeSpan.FromSeconds(1), runConfig.WriteRps); - var readLimiter = Policy.RateLimit(runConfig.ReadRps, TimeSpan.FromSeconds(1), runConfig.ReadRps); + var writeLimiter = Policy.RateLimit(runConfig.WriteRps, TimeSpan.FromSeconds(1), 10); + var readLimiter = Policy.RateLimit(runConfig.ReadRps, TimeSpan.FromSeconds(1), 10); var cancellationTokenSource = new CancellationTokenSource(); cancellationTokenSource.CancelAfter(TimeSpan.FromSeconds(runConfig.ShutdownTime)); @@ -189,7 +189,7 @@ Task ShootingTask(RateLimitPolicy rateLimitPolicy, string shootingName, _logger.LogInformation(e, "Waiting {ShootingName} task, count active tasks: {}", shootingName, Interlocked.Read(ref activeTasks)); - await Task.Delay(990, cancellationTokenSource.Token); + await Task.Delay(e.RetryAfter, cancellationTokenSource.Token); } } From c0ca09e463c87ee19c331d18575b0827ba60d034 Mon Sep 17 00:00:00 2001 From: KirillKurdyukov Date: Thu, 15 Aug 2024 11:05:16 +0300 Subject: [PATCH 12/29] fix --- slo/src/Internal/SloContext.cs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/slo/src/Internal/SloContext.cs b/slo/src/Internal/SloContext.cs index 5f7f6469..10db55a7 100644 --- a/slo/src/Internal/SloContext.cs +++ b/slo/src/Internal/SloContext.cs @@ -124,6 +124,11 @@ public async Task Run(RunConfig runConfig) new HistogramConfiguration { Buckets = Histogram.LinearBuckets(1, 1, 10) }); var errorsGauge = metricFactory.CreateGauge("errors", "amount of errors", new[] { "class", "in" }); + foreach (var statusCode in Enum.GetValues()) + { + errorsGauge.WithLabels(statusCode.ToString(), "retried").IncTo(0); + errorsGauge.WithLabels(statusCode.ToString(), "finally").IncTo(0); + } var writeLimiter = Policy.RateLimit(runConfig.WriteRps, TimeSpan.FromSeconds(1), 10); var readLimiter = Policy.RateLimit(runConfig.ReadRps, TimeSpan.FromSeconds(1), 10); From 5354ec17f2dc967b46cfdfa4eedf8e60d0049a96 Mon Sep 17 00:00:00 2001 From: KirillKurdyukov Date: Thu, 15 Aug 2024 11:26:42 +0300 Subject: [PATCH 13/29] fixedWindow --- slo/src/Internal/Internal.csproj | 1 + slo/src/Internal/SloContext.cs | 72 +++++++++++++++----------------- 2 files changed, 34 insertions(+), 39 deletions(-) diff --git a/slo/src/Internal/Internal.csproj b/slo/src/Internal/Internal.csproj index fec702c1..831fad78 100644 --- a/slo/src/Internal/Internal.csproj +++ b/slo/src/Internal/Internal.csproj @@ -17,5 +17,6 @@ + diff --git a/slo/src/Internal/SloContext.cs b/slo/src/Internal/SloContext.cs index 10db55a7..7766eaa1 100644 --- a/slo/src/Internal/SloContext.cs +++ b/slo/src/Internal/SloContext.cs @@ -1,8 +1,7 @@ using System.Diagnostics; +using System.Threading.RateLimiting; using Internal.Cli; using Microsoft.Extensions.Logging; -using Polly; -using Polly.RateLimit; using Prometheus; using Ydb.Sdk; using Ydb.Sdk.Value; @@ -130,8 +129,10 @@ public async Task Run(RunConfig runConfig) errorsGauge.WithLabels(statusCode.ToString(), "finally").IncTo(0); } - var writeLimiter = Policy.RateLimit(runConfig.WriteRps, TimeSpan.FromSeconds(1), 10); - var readLimiter = Policy.RateLimit(runConfig.ReadRps, TimeSpan.FromSeconds(1), 10); + var writeLimiter = new FixedWindowRateLimiter(new FixedWindowRateLimiterOptions + { Window = TimeSpan.FromSeconds(1), PermitLimit = runConfig.WriteRps, QueueLimit = int.MaxValue }); + var readLimiter = new FixedWindowRateLimiter(new FixedWindowRateLimiterOptions + { Window = TimeSpan.FromSeconds(1), PermitLimit = runConfig.ReadRps, QueueLimit = int.MaxValue }); var cancellationTokenSource = new CancellationTokenSource(); cancellationTokenSource.CancelAfter(TimeSpan.FromSeconds(runConfig.ShutdownTime)); @@ -149,7 +150,7 @@ public async Task Run(RunConfig runConfig) _logger.LogInformation("Run task is finished"); return; - Task ShootingTask(RateLimitPolicy rateLimitPolicy, string shootingName, + Task ShootingTask(RateLimiter rateLimitPolicy, string shootingName, Func> action) { return Task.Run(async () => @@ -160,42 +161,35 @@ Task ShootingTask(RateLimitPolicy rateLimitPolicy, string shootingName, while (!cancellationTokenSource.Token.IsCancellationRequested) { - try - { - tasks.Add(rateLimitPolicy.Execute(async () => - { - // ReSharper disable once AccessToModifiedClosure - Interlocked.Increment(ref activeTasks); - - var sw = Stopwatch.StartNew(); - var (attempts, statusCode) = await action(client, runConfig, errorsGauge); - string label; - - if (statusCode != StatusCode.Success) - { - _logger.LogError("Failed {ShootingName} operation code: {StatusCode}", shootingName, - statusCode); - notOkGauge.Inc(); - label = "err"; - } - else - { - okGauge.Inc(); - label = "ok"; - } - - Interlocked.Decrement(ref activeTasks); - attemptsHistogram.WithLabels(label).Observe(attempts); - latencySummary.WithLabels(label).Observe(sw.ElapsedMilliseconds); - })); - } - catch (RateLimitRejectedException e) + using var lease = await rateLimitPolicy + .AcquireAsync(cancellationToken: cancellationTokenSource.Token); + + tasks.Add(Task.Run(async () => { - _logger.LogInformation(e, "Waiting {ShootingName} task, count active tasks: {}", shootingName, - Interlocked.Read(ref activeTasks)); + // ReSharper disable once AccessToModifiedClosure + Interlocked.Increment(ref activeTasks); - await Task.Delay(e.RetryAfter, cancellationTokenSource.Token); - } + var sw = Stopwatch.StartNew(); + var (attempts, statusCode) = await action(client, runConfig, errorsGauge); + string label; + + if (statusCode != StatusCode.Success) + { + _logger.LogError("Failed {ShootingName} operation code: {StatusCode}", shootingName, + statusCode); + notOkGauge.Inc(); + label = "err"; + } + else + { + okGauge.Inc(); + label = "ok"; + } + + Interlocked.Decrement(ref activeTasks); + attemptsHistogram.WithLabels(label).Observe(attempts); + latencySummary.WithLabels(label).Observe(sw.ElapsedMilliseconds); + }, cancellationTokenSource.Token)); } await Task.WhenAll(tasks); From 3a088747588f62e0af5d2313753ae324c67e0a9d Mon Sep 17 00:00:00 2001 From: KirillKurdyukov Date: Thu, 15 Aug 2024 11:42:08 +0300 Subject: [PATCH 14/29] fix Time --- slo/src/Internal/Internal.csproj | 1 - 1 file changed, 1 deletion(-) diff --git a/slo/src/Internal/Internal.csproj b/slo/src/Internal/Internal.csproj index 831fad78..b99bd6bb 100644 --- a/slo/src/Internal/Internal.csproj +++ b/slo/src/Internal/Internal.csproj @@ -14,7 +14,6 @@ - From da5880ca7382c7ea676f19a00a73d1a1efa59f90 Mon Sep 17 00:00:00 2001 From: KirillKurdyukov Date: Thu, 15 Aug 2024 11:53:41 +0300 Subject: [PATCH 15/29] fixes --- slo/src/Internal/SloContext.cs | 18 +++++++----------- 1 file changed, 7 insertions(+), 11 deletions(-) diff --git a/slo/src/Internal/SloContext.cs b/slo/src/Internal/SloContext.cs index 7766eaa1..cdc4005b 100644 --- a/slo/src/Internal/SloContext.cs +++ b/slo/src/Internal/SloContext.cs @@ -135,7 +135,7 @@ public async Task Run(RunConfig runConfig) { Window = TimeSpan.FromSeconds(1), PermitLimit = runConfig.ReadRps, QueueLimit = int.MaxValue }); var cancellationTokenSource = new CancellationTokenSource(); - cancellationTokenSource.CancelAfter(TimeSpan.FromSeconds(runConfig.ShutdownTime)); + cancellationTokenSource.CancelAfter(TimeSpan.FromSeconds(runConfig.Time)); var writeTask = ShootingTask(writeLimiter, "write", Upsert); var readTask = ShootingTask(readLimiter, "read", Select); @@ -153,22 +153,18 @@ public async Task Run(RunConfig runConfig) Task ShootingTask(RateLimiter rateLimitPolicy, string shootingName, Func> action) { + // ReSharper disable once MethodSupportsCancellation return Task.Run(async () => { var tasks = new List(); - long activeTasks = 0; - while (!cancellationTokenSource.Token.IsCancellationRequested) { using var lease = await rateLimitPolicy .AcquireAsync(cancellationToken: cancellationTokenSource.Token); - tasks.Add(Task.Run(async () => + _ = Task.Run(async () => { - // ReSharper disable once AccessToModifiedClosure - Interlocked.Increment(ref activeTasks); - var sw = Stopwatch.StartNew(); var (attempts, statusCode) = await action(client, runConfig, errorsGauge); string label; @@ -186,16 +182,16 @@ Task ShootingTask(RateLimiter rateLimitPolicy, string shootingName, label = "ok"; } - Interlocked.Decrement(ref activeTasks); attemptsHistogram.WithLabels(label).Observe(attempts); latencySummary.WithLabels(label).Observe(sw.ElapsedMilliseconds); - }, cancellationTokenSource.Token)); + }, cancellationTokenSource.Token); } - await Task.WhenAll(tasks); + // ReSharper disable once MethodSupportsCancellation + await Task.Delay(TimeSpan.FromSeconds(runConfig.ShutdownTime)); _logger.LogInformation("{ShootingName} shooting is stopped", shootingName); - }, cancellationTokenSource.Token); + }); } } From 9bb933d027f51e94ccc86f0eed13ed0196ccb516 Mon Sep 17 00:00:00 2001 From: KirillKurdyukov Date: Thu, 15 Aug 2024 12:18:59 +0300 Subject: [PATCH 16/29] fix --- slo/src/Internal/SloContext.cs | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/slo/src/Internal/SloContext.cs b/slo/src/Internal/SloContext.cs index cdc4005b..bdae7fd3 100644 --- a/slo/src/Internal/SloContext.cs +++ b/slo/src/Internal/SloContext.cs @@ -142,7 +142,14 @@ public async Task Run(RunConfig runConfig) _logger.LogInformation("Started write / read shooting.."); - await Task.WhenAll(readTask, writeTask); + try + { + await Task.WhenAll(readTask, writeTask); + } + catch (Exception e) + { + _logger.LogError(e, "Failed waiting read / write tasks"); + } await prometheus.StopAsync(); await MetricReset(promPgwEndpoint); @@ -156,13 +163,16 @@ Task ShootingTask(RateLimiter rateLimitPolicy, string shootingName, // ReSharper disable once MethodSupportsCancellation return Task.Run(async () => { - var tasks = new List(); - while (!cancellationTokenSource.Token.IsCancellationRequested) { using var lease = await rateLimitPolicy .AcquireAsync(cancellationToken: cancellationTokenSource.Token); + if (lease.IsAcquired) + { + continue; + } + _ = Task.Run(async () => { var sw = Stopwatch.StartNew(); @@ -171,8 +181,6 @@ Task ShootingTask(RateLimiter rateLimitPolicy, string shootingName, if (statusCode != StatusCode.Success) { - _logger.LogError("Failed {ShootingName} operation code: {StatusCode}", shootingName, - statusCode); notOkGauge.Inc(); label = "err"; } From 0e9a37756967276b9e893e95af392d47b802a8b2 Mon Sep 17 00:00:00 2001 From: KirillKurdyukov Date: Thu, 15 Aug 2024 12:43:06 +0300 Subject: [PATCH 17/29] delete legacy --- slo/playground/docker-compose.yml | 16 --- slo/src/AdoNet/Program.cs | 2 +- slo/src/AdoNet/SloContext.cs | 3 +- slo/src/Internal/{Cli => }/Cli.cs | 17 +--- slo/src/Internal/Cli/CliCommands.cs | 96 ------------------ slo/src/Internal/Client.cs | 103 -------------------- slo/src/Internal/{Cli => }/ConfigBinders.cs | 2 +- slo/src/Internal/{Cli => }/Configs.cs | 2 +- slo/src/Internal/DataGenerator.cs | 35 ------- slo/src/Internal/Executor.cs | 66 ------------- slo/src/Internal/Jobs/Job.cs | 93 ------------------ slo/src/Internal/Jobs/ReadJob.cs | 28 ------ slo/src/Internal/Jobs/WriteJob.cs | 23 ----- slo/src/Internal/Queries.cs | 58 ----------- slo/src/Internal/RateLimitedCaller.cs | 37 ------- slo/src/Internal/SloContext.cs | 57 ++++++----- slo/src/Internal/TokenBucket.cs | 57 ----------- slo/src/Internal/Utils.cs | 12 --- slo/src/TableService/Program.cs | 2 +- slo/src/TableService/SloContext.cs | 7 +- 20 files changed, 38 insertions(+), 678 deletions(-) rename slo/src/Internal/{Cli => }/Cli.cs (88%) delete mode 100644 slo/src/Internal/Cli/CliCommands.cs delete mode 100644 slo/src/Internal/Client.cs rename slo/src/Internal/{Cli => }/ConfigBinders.cs (99%) rename slo/src/Internal/{Cli => }/Configs.cs (97%) delete mode 100644 slo/src/Internal/DataGenerator.cs delete mode 100644 slo/src/Internal/Executor.cs delete mode 100644 slo/src/Internal/Jobs/Job.cs delete mode 100644 slo/src/Internal/Jobs/ReadJob.cs delete mode 100644 slo/src/Internal/Jobs/WriteJob.cs delete mode 100644 slo/src/Internal/Queries.cs delete mode 100644 slo/src/Internal/RateLimitedCaller.cs delete mode 100644 slo/src/Internal/TokenBucket.cs delete mode 100644 slo/src/Internal/Utils.cs diff --git a/slo/playground/docker-compose.yml b/slo/playground/docker-compose.yml index 0f8d2678..5dee06d8 100644 --- a/slo/playground/docker-compose.yml +++ b/slo/playground/docker-compose.yml @@ -106,19 +106,3 @@ services: depends_on: slo-create: condition: service_completed_successfully - - slo-cleanup: - build: - context: ../.. - dockerfile: slo/src/Dockerfile - command: - - 'cleanup' - - 'http://ydb:2136' - - '/local' - - '--table-name' - - 'slo-dotnet' - networks: - - monitor-net - depends_on: - slo-run: - condition: service_completed_successfully diff --git a/slo/src/AdoNet/Program.cs b/slo/src/AdoNet/Program.cs index 164e5bc5..ae7fab09 100644 --- a/slo/src/AdoNet/Program.cs +++ b/slo/src/AdoNet/Program.cs @@ -1,6 +1,6 @@ // See https://aka.ms/new-console-template for more information using AdoNet; -using Internal.Cli; +using Internal; await Cli.Run(new SloContext(), args); \ No newline at end of file diff --git a/slo/src/AdoNet/SloContext.cs b/slo/src/AdoNet/SloContext.cs index 5334d3d7..47cf7aa2 100644 --- a/slo/src/AdoNet/SloContext.cs +++ b/slo/src/AdoNet/SloContext.cs @@ -1,5 +1,4 @@ using Internal; -using Internal.Cli; using Polly; using Prometheus; using Ydb.Sdk; @@ -20,7 +19,7 @@ public class SloContext : SloContext errorsGauge?.WithLabels(((YdbException)e).Code.ToString(), "retried").Inc(); }); - protected override string JobName => "workload-ado-net"; + protected override string Job => "workload-ado-net"; protected override async Task Create(YdbDataSource client, string createTableSql, int operationTimeout) { diff --git a/slo/src/Internal/Cli/Cli.cs b/slo/src/Internal/Cli.cs similarity index 88% rename from slo/src/Internal/Cli/Cli.cs rename to slo/src/Internal/Cli.cs index 9f3beeb9..c8ff60c9 100644 --- a/slo/src/Internal/Cli/Cli.cs +++ b/slo/src/Internal/Cli.cs @@ -1,6 +1,6 @@ using System.CommandLine; -namespace Internal.Cli; +namespace Internal; public static class Cli { @@ -90,16 +90,6 @@ public static class Cli WriteTimeoutOption }; - private static readonly Command CleanupCommand = new( - "cleanup", - "drops table in database") - { - EndpointArgument, - DbArgument, - TableOption, - WriteTimeoutOption - }; - private static readonly Command RunCommand = new( "run", "runs workload (read and write to table with sets RPS)") @@ -120,7 +110,7 @@ public static class Cli private static readonly RootCommand RootCommand = new("SLO app") { - CreateCommand, CleanupCommand, RunCommand + CreateCommand, RunCommand }; public static async Task Run(SloContext sloContext, string[] args) where T : IDisposable @@ -129,9 +119,6 @@ public static async Task Run(SloContext sloContext, string[] args) wh new CreateConfigBinder(EndpointArgument, DbArgument, TableOption, MinPartitionsCountOption, MaxPartitionsCountOption, PartitionSizeOption, InitialDataCountOption, WriteTimeoutOption)); - CleanupCommand.SetHandler(async cleanUpConfig => { await CliCommands.CleanUp(cleanUpConfig); }, - new CleanUpConfigBinder(EndpointArgument, DbArgument, TableOption, WriteTimeoutOption)); - RunCommand.SetHandler(async runConfig => { await sloContext.Run(runConfig); }, new RunConfigBinder(EndpointArgument, DbArgument, TableOption, PromPgwOption, ReportPeriodOption, ReadRpsOption, ReadTimeoutOption, WriteRpsOption, WriteTimeoutOption, TimeOption, ShutdownTimeOption)); diff --git a/slo/src/Internal/Cli/CliCommands.cs b/slo/src/Internal/Cli/CliCommands.cs deleted file mode 100644 index cb320a58..00000000 --- a/slo/src/Internal/Cli/CliCommands.cs +++ /dev/null @@ -1,96 +0,0 @@ -using Internal.Jobs; -using Prometheus; - -namespace Internal.Cli; - -public static class CliCommands -{ - internal static async Task Create(CreateConfig config) - { - Console.WriteLine(config); - - await using var client = await Client.CreateAsync(config.Endpoint, config.Db, config.TableName); - - const int maxCreateAttempts = 10; - for (var i = 0; i < maxCreateAttempts; i++) - { - try - { - await client.Init( - config.PartitionSize, - config.MinPartitionsCount, - config.MaxPartitionsCount, - TimeSpan.FromMilliseconds(config.WriteTimeout)); - break; - } - catch (Exception e) - { - Console.WriteLine(e); - Thread.Sleep(millisecondsTimeout: 1000); - } - } - } - - internal static async Task CleanUp(CleanUpConfig config) - { - Console.WriteLine(config); - - await using var client = await Client.CreateAsync(config.Endpoint, config.Db, config.TableName); - - await client.CleanUp(TimeSpan.FromMilliseconds(config.WriteTimeout)); - } - - internal static async Task Run(RunConfig config) - { - var promPgwEndpoint = $"{config.PromPgw}/metrics"; - const string job = "workload-table-service"; - - await using var client = await Client.CreateAsync(config.Endpoint, config.Db, config.TableName); - - await client.Init(1, 6, 1000, TimeSpan.FromMilliseconds(config.WriteTimeout)); - - Console.WriteLine(config.PromPgw); - - await MetricReset(promPgwEndpoint, job); - using var prometheus = new MetricPusher(promPgwEndpoint, job, intervalMilliseconds: config.ReportPeriod); - - prometheus.Start(); - - var duration = TimeSpan.FromSeconds(config.Time); - - var readJob = new ReadJob( - client, - new RateLimitedCaller( - config.ReadRps, - duration - ), - TimeSpan.FromMilliseconds(config.ReadTimeout)); - - var writeJob = new WriteJob( - client, - new RateLimitedCaller( - config.WriteRps, - duration - ), - TimeSpan.FromMilliseconds(config.WriteTimeout)); - - var readThread = new Thread(readJob.Start); - var writeThread = new Thread(writeJob.Start); - - readThread.Start(); - writeThread.Start(); - await Task.Delay(duration + TimeSpan.FromSeconds(config.ShutdownTime)); - readThread.Join(); - writeThread.Join(); - - await prometheus.StopAsync(); - await MetricReset(promPgwEndpoint, job); - } - - private static async Task MetricReset(string promPgwEndpoint, string job) - { - var deleteUri = $"{promPgwEndpoint}/job/{job}"; - using var httpClient = new HttpClient(); - await httpClient.DeleteAsync(deleteUri); - } -} \ No newline at end of file diff --git a/slo/src/Internal/Client.cs b/slo/src/Internal/Client.cs deleted file mode 100644 index f21d4c07..00000000 --- a/slo/src/Internal/Client.cs +++ /dev/null @@ -1,103 +0,0 @@ -using Microsoft.Extensions.DependencyInjection; -using Microsoft.Extensions.Logging; -using Microsoft.Extensions.Logging.Abstractions; -using Ydb.Sdk; -using Ydb.Sdk.Services.Sessions; -using Ydb.Sdk.Services.Table; - -namespace Internal; - -public class Client : IAsyncDisposable -{ - public readonly Executor Executor; - public readonly string TableName; - - private readonly ServiceProvider _serviceProvider; - private readonly Driver _driver; - private readonly TableClient _tableClient; - - private readonly Semaphore _semaphore; - - private Client(string tableName, Executor executor, ServiceProvider serviceProvider, Driver driver, - TableClient tableClient, uint sessionPoolLimit) - { - TableName = tableName; - Executor = executor; - _serviceProvider = serviceProvider; - _driver = driver; - _tableClient = tableClient; - _semaphore = new Semaphore((int)sessionPoolLimit, (int)sessionPoolLimit); - } - - public async Task Init(int partitionSize, int minPartitionsCount, int maxPartitionsCount, - TimeSpan timeout) - { - await Executor.ExecuteSchemeQuery( - Queries.GetCreateQuery(TableName, partitionSize, minPartitionsCount, maxPartitionsCount), - timeout); - - await DataGenerator.LoadMaxId(TableName, Executor); - } - - public async Task CleanUp(TimeSpan timeout) - { - await Executor.ExecuteSchemeQuery(Queries.GetDropQuery(TableName), timeout); - } - - private static ServiceProvider GetServiceProvider() - { - return new ServiceCollection() - .AddLogging(configure => configure.AddConsole().SetMinimumLevel(LogLevel.Information)) - .BuildServiceProvider(); - } - - public static async Task CreateAsync(string endpoint, string db, string tableName, - uint sessionPoolLimit = 100) - { - var driverConfig = new DriverConfig( - endpoint, - db - ); - - var serviceProvider = GetServiceProvider(); - var loggerFactory = serviceProvider.GetService(); - - loggerFactory ??= NullLoggerFactory.Instance; - var driver = await Driver.CreateInitialized(driverConfig, loggerFactory); - - var tableClient = new TableClient(driver, new TableClientConfig(new SessionPoolConfig(sessionPoolLimit))); - - var executor = new Executor(tableClient); - - var table = new Client(tableName, executor, serviceProvider, driver, tableClient, sessionPoolLimit); - - return table; - } - - public Task CallFuncWithSessionPoolLimit(Func func) - { - _semaphore.WaitOne(); - - async Task FuncWithRelease() - { - try - { - await func(); - } - finally - { - _semaphore.Release(); - } - } - - _ = FuncWithRelease(); - return Task.CompletedTask; - } - - public async ValueTask DisposeAsync() - { - _tableClient.Dispose(); - await _driver.DisposeAsync(); - await _serviceProvider.DisposeAsync(); - } -} \ No newline at end of file diff --git a/slo/src/Internal/Cli/ConfigBinders.cs b/slo/src/Internal/ConfigBinders.cs similarity index 99% rename from slo/src/Internal/Cli/ConfigBinders.cs rename to slo/src/Internal/ConfigBinders.cs index 7662ba82..9a668e14 100644 --- a/slo/src/Internal/Cli/ConfigBinders.cs +++ b/slo/src/Internal/ConfigBinders.cs @@ -1,7 +1,7 @@ using System.CommandLine; using System.CommandLine.Binding; -namespace Internal.Cli; +namespace Internal; public class CreateConfigBinder( Argument endpointArgument, diff --git a/slo/src/Internal/Cli/Configs.cs b/slo/src/Internal/Configs.cs similarity index 97% rename from slo/src/Internal/Cli/Configs.cs rename to slo/src/Internal/Configs.cs index 119838bb..119361bd 100644 --- a/slo/src/Internal/Cli/Configs.cs +++ b/slo/src/Internal/Configs.cs @@ -1,4 +1,4 @@ -namespace Internal.Cli; +namespace Internal; public record CreateConfig( string Endpoint, diff --git a/slo/src/Internal/DataGenerator.cs b/slo/src/Internal/DataGenerator.cs deleted file mode 100644 index c6e84f8f..00000000 --- a/slo/src/Internal/DataGenerator.cs +++ /dev/null @@ -1,35 +0,0 @@ -using Ydb.Sdk.Value; - -namespace Internal; - -public static class DataGenerator -{ - private static readonly Random Random = new(); - - public static int MaxId { get; private set; } - - public static async Task LoadMaxId(string tableName, Executor executor) - { - var response = await executor.ExecuteDataQuery(Queries.GetLoadMaxIdQuery(tableName)); - var row = response.Result.ResultSets[0].Rows[0]; - var value = row[0]; - MaxId = (int?)value.GetOptionalUint64() ?? 0; - } - - public static Dictionary GetUpsertData() - { - MaxId++; - return new Dictionary - { - { "$id", YdbValue.MakeUint64((ulong)MaxId) }, - { - "$payload_str", - YdbValue.MakeUtf8(string.Join("_", Enumerable - .Repeat(0, Random.Next(20, 40)) - .Select(_ => (char)new Random().Next(127)))) - }, - { "$payload_double", YdbValue.MakeDouble(Random.NextDouble()) }, - { "$payload_timestamp", YdbValue.MakeTimestamp(DateTime.Now) } - }; - } -} \ No newline at end of file diff --git a/slo/src/Internal/Executor.cs b/slo/src/Internal/Executor.cs deleted file mode 100644 index 6566c464..00000000 --- a/slo/src/Internal/Executor.cs +++ /dev/null @@ -1,66 +0,0 @@ -using Prometheus; -using Ydb.Sdk.Services.Table; -using Ydb.Sdk.Value; - -namespace Internal; - -public class Executor(TableClient tableClient) -{ - public async Task ExecuteSchemeQuery(string query, TimeSpan? timeout = null) - { - var response = await tableClient.SessionExec( - async session => await session.ExecuteSchemeQuery(query, - new ExecuteSchemeQuerySettings { OperationTimeout = timeout, TransportTimeout = timeout * 1.1 })); - response.Status.EnsureSuccess(); - } - - public async Task ExecuteDataQuery( - string query, - Dictionary? parameters = null, - TimeSpan? timeout = null, - Histogram? attemptsHistogram = null, - Gauge? errorsGauge = null) - - { - var txControl = TxControl.BeginSerializableRW().Commit(); - - var querySettings = new ExecuteDataQuerySettings - { OperationTimeout = timeout, TransportTimeout = timeout * 1.1 }; - - var attempts = 0; - - var response = await tableClient.SessionExec( - async session => - { - attempts++; - var response = parameters == null - ? await session.ExecuteDataQuery( - query, - txControl, - querySettings) - : await session.ExecuteDataQuery( - query, - txControl, - parameters, - querySettings); - if (response.Status.IsSuccess) - { - return response; - } - - errorsGauge?.WithLabels(Utils.GetResonseStatusName(response.Status.StatusCode), "retried").Inc(); - Console.WriteLine(response.Status); - - return response; - }); - attemptsHistogram?.WithLabels(response.Status.IsSuccess ? "ok" : "err").Observe(attempts); - if (!response.Status.IsSuccess) - { - errorsGauge?.WithLabels(Utils.GetResonseStatusName(response.Status.StatusCode), "finally").Inc(); - } - - response.Status.EnsureSuccess(); - - return (ExecuteDataQueryResponse)response; - } -} \ No newline at end of file diff --git a/slo/src/Internal/Jobs/Job.cs b/slo/src/Internal/Jobs/Job.cs deleted file mode 100644 index b4a023a1..00000000 --- a/slo/src/Internal/Jobs/Job.cs +++ /dev/null @@ -1,93 +0,0 @@ -using System.Diagnostics; -using Prometheus; - -namespace Internal.Jobs; - -public abstract class Job -{ - private readonly Gauge _inFlightGauge; - - private readonly Gauge _okGauge; - private readonly Gauge _notOkGauge; - - private readonly Summary _latencySummary; - - private readonly RateLimitedCaller _rateLimitedCaller; - protected readonly TimeSpan Timeout; - - protected readonly Histogram AttemptsHistogram; - protected readonly Gauge ErrorsGauge; - protected readonly Random Random = new(); - - protected readonly Client Client; - - protected Job(Client client, RateLimitedCaller rateLimitedCaller, string jobName, TimeSpan timeout) - { - Client = client; - _rateLimitedCaller = rateLimitedCaller; - Timeout = timeout; - - var metricFactory = Metrics.WithLabels(new Dictionary - { - { "jobName", jobName }, - { "sdk", "dotnet" }, - { "sdkVersion", Environment.Version.ToString() } - }); - - _okGauge = metricFactory.CreateGauge("oks", "Count of OK"); - _notOkGauge = metricFactory.CreateGauge("not_oks", "Count of not OK"); - _inFlightGauge = metricFactory.CreateGauge("in_flight", "amount of requests in flight"); - _latencySummary = metricFactory.CreateSummary("latency", "Latencies (OK)", new[] { "status" }, - new SummaryConfiguration - { - MaxAge = TimeSpan.FromSeconds(15), - Objectives = new QuantileEpsilonPair[] - { - new(0.5, 0.05), - new(0.99, 0.005), - new(0.999, 0.0005) - } - }); - - AttemptsHistogram = metricFactory.CreateHistogram("attempts", "summary of amount for request", - new[] { "status" }, - new HistogramConfiguration { Buckets = Histogram.LinearBuckets(1, 1, 10) }); - - ErrorsGauge = metricFactory.CreateGauge("errors", "amount of errors", new[] { "class", "in" }); - } - - public async void Start() - { - await _rateLimitedCaller.StartCalling( - () => Client.CallFuncWithSessionPoolLimit( - async () => await DoJob()), - _inFlightGauge); - } - - private async Task DoJob() - { - _inFlightGauge.Inc(); - var sw = Stopwatch.StartNew(); - try - { - await PerformQuery(); - sw.Stop(); - - _latencySummary.WithLabels("ok").Observe(sw.ElapsedMilliseconds); - _okGauge.Inc(); - _inFlightGauge.Dec(); - } - catch (Exception e) - { - Console.WriteLine(e); - sw.Stop(); - - _latencySummary.WithLabels("err").Observe(sw.ElapsedMilliseconds); - _notOkGauge.Inc(); - _inFlightGauge.Dec(); - throw; - } - } - - protected abstract Task PerformQuery(); -} \ No newline at end of file diff --git a/slo/src/Internal/Jobs/ReadJob.cs b/slo/src/Internal/Jobs/ReadJob.cs deleted file mode 100644 index 4669af4d..00000000 --- a/slo/src/Internal/Jobs/ReadJob.cs +++ /dev/null @@ -1,28 +0,0 @@ -using Ydb.Sdk.Value; - -namespace Internal.Jobs; - -public class ReadJob : Job -{ - public ReadJob(Client client, RateLimitedCaller rateLimitedCaller, TimeSpan timeout) : - base(client, rateLimitedCaller, "read", timeout) - { - } - - - protected override async Task PerformQuery() - { - var parameters = new Dictionary - { - { "$id", YdbValue.MakeUint64((ulong)Random.Next(DataGenerator.MaxId)) } - }; - - await Client.Executor.ExecuteDataQuery( - Queries.GetReadQuery(Client.TableName), - parameters, - Timeout, - AttemptsHistogram, - ErrorsGauge - ); - } -} \ No newline at end of file diff --git a/slo/src/Internal/Jobs/WriteJob.cs b/slo/src/Internal/Jobs/WriteJob.cs deleted file mode 100644 index b8d93d05..00000000 --- a/slo/src/Internal/Jobs/WriteJob.cs +++ /dev/null @@ -1,23 +0,0 @@ -namespace Internal.Jobs; - -public class WriteJob : Job -{ - public WriteJob(Client client, RateLimitedCaller rateLimitedCaller, TimeSpan timeout) : - base(client, rateLimitedCaller, "write", timeout) - { - } - - - protected override async Task PerformQuery() - { - var parameters = DataGenerator.GetUpsertData(); - - await Client.Executor.ExecuteDataQuery( - Queries.GetWriteQuery(Client.TableName), - parameters, - Timeout, - AttemptsHistogram, - ErrorsGauge - ); - } -} \ No newline at end of file diff --git a/slo/src/Internal/Queries.cs b/slo/src/Internal/Queries.cs deleted file mode 100644 index b1713812..00000000 --- a/slo/src/Internal/Queries.cs +++ /dev/null @@ -1,58 +0,0 @@ -namespace Internal; - -public static class Queries -{ - public static string GetCreateQuery(string tableName, int partitionSize, int minPartitionsCount, - int maxPartitionsCount) - { - return $@" -CREATE TABLE `{tableName}` ( - `hash` UINT64, - `id` UINT64, - `payload_str` UTF8, - `payload_double` DOUBLE, - `payload_timestamp` TIMESTAMP, - `payload_hash` UINT64, - PRIMARY KEY (`hash`, `id`) -) -WITH ( - AUTO_PARTITIONING_BY_SIZE = ENABLED, - AUTO_PARTITIONING_PARTITION_SIZE_MB = {partitionSize}, - AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = {minPartitionsCount}, - AUTO_PARTITIONING_MAX_PARTITIONS_COUNT = {maxPartitionsCount} -); -"; - } - - public static string GetDropQuery(string tableName) - { - return $"DROP TABLE `{tableName}`"; - } - - public static string GetLoadMaxIdQuery(string tableName) - { - return $"SELECT MAX(id) as max_id FROM `{tableName}`"; - } - - public static string GetReadQuery(string tableName) - { - return $@" -DECLARE $id AS Uint64; -SELECT id, payload_str, payload_double, payload_timestamp, payload_hash -FROM `{tableName}` -WHERE id = $id AND hash = Digest::NumericHash($id) -"; - } - - public static string GetWriteQuery(string tableName) - { - return $@" -DECLARE $id AS Uint64; -DECLARE $payload_str AS Utf8; -DECLARE $payload_double AS Double; -DECLARE $payload_timestamp AS Timestamp; -INSERT INTO `{tableName}` (id, hash, payload_str, payload_double, payload_timestamp) -VALUES ($id, Digest::NumericHash($id), $payload_str, $payload_double, $payload_timestamp) -"; - } -} \ No newline at end of file diff --git a/slo/src/Internal/RateLimitedCaller.cs b/slo/src/Internal/RateLimitedCaller.cs deleted file mode 100644 index 8b28e61a..00000000 --- a/slo/src/Internal/RateLimitedCaller.cs +++ /dev/null @@ -1,37 +0,0 @@ -using Prometheus; - -namespace Internal; - -public class RateLimitedCaller(int rate, TimeSpan duration = default, int bunchCount = 10) -{ - private readonly TokenBucket _tokenBucket = new(rate / bunchCount, (int)(1000.0f / bunchCount)); - - public Task StartCalling(Func action, Gauge inFlightGauge) - { - var endTime = DateTime.Now + duration; - - while (duration == default || DateTime.Now < endTime) - { - while (inFlightGauge.Value > rate) - { - Thread.Sleep(1); - } - - while (true) - { - try - { - _tokenBucket.UseToken(); - _ = action(); - break; - } - catch (NoTokensAvailableException) - { - Thread.Sleep(1); - } - } - } - - return Task.CompletedTask; - } -} \ No newline at end of file diff --git a/slo/src/Internal/SloContext.cs b/slo/src/Internal/SloContext.cs index bdae7fd3..c6e26445 100644 --- a/slo/src/Internal/SloContext.cs +++ b/slo/src/Internal/SloContext.cs @@ -1,6 +1,5 @@ using System.Diagnostics; using System.Threading.RateLimiting; -using Internal.Cli; using Microsoft.Extensions.Logging; using Prometheus; using Ydb.Sdk; @@ -21,7 +20,7 @@ protected SloContext() _logger = Factory.CreateLogger>(); } - protected abstract string JobName { get; } + protected abstract string Job { get; } public async Task Create(CreateConfig config) { @@ -97,7 +96,7 @@ public async Task Run(RunConfig runConfig) { var promPgwEndpoint = $"{runConfig.PromPgw}/metrics"; var client = await CreateClient(runConfig); - using var prometheus = new MetricPusher(promPgwEndpoint, JobName, intervalMilliseconds: runConfig.ReportPeriod); + using var prometheus = new MetricPusher(promPgwEndpoint, Job, intervalMilliseconds: runConfig.ReportPeriod); prometheus.Start(); var (_, _, maxId) = await Select(client, $"SELECT MAX(id) as max_id FROM `{runConfig.TableName}`;", @@ -106,29 +105,6 @@ public async Task Run(RunConfig runConfig) _logger.LogInformation("Init row count: {MaxId}", _maxId); - var metricFactory = Metrics.WithLabels(new Dictionary - { { "jobName", JobName }, { "sdk", "dotnet" }, { "sdkVersion", Environment.Version.ToString() } }); - - var okGauge = metricFactory.CreateGauge("oks", "Count of OK"); - var notOkGauge = metricFactory.CreateGauge("not_oks", "Count of not OK"); - var latencySummary = metricFactory.CreateSummary("latency", "Latencies (OK)", new[] { "status" }, - new SummaryConfiguration - { - MaxAge = TimeSpan.FromSeconds(15), Objectives = new QuantileEpsilonPair[] - { new(0.5, 0.05), new(0.99, 0.005), new(0.999, 0.0005) } - }); - - var attemptsHistogram = metricFactory.CreateHistogram("attempts", "summary of amount for request", - new[] { "status" }, - new HistogramConfiguration { Buckets = Histogram.LinearBuckets(1, 1, 10) }); - - var errorsGauge = metricFactory.CreateGauge("errors", "amount of errors", new[] { "class", "in" }); - foreach (var statusCode in Enum.GetValues()) - { - errorsGauge.WithLabels(statusCode.ToString(), "retried").IncTo(0); - errorsGauge.WithLabels(statusCode.ToString(), "finally").IncTo(0); - } - var writeLimiter = new FixedWindowRateLimiter(new FixedWindowRateLimiterOptions { Window = TimeSpan.FromSeconds(1), PermitLimit = runConfig.WriteRps, QueueLimit = int.MaxValue }); var readLimiter = new FixedWindowRateLimiter(new FixedWindowRateLimiterOptions @@ -157,9 +133,32 @@ public async Task Run(RunConfig runConfig) _logger.LogInformation("Run task is finished"); return; - Task ShootingTask(RateLimiter rateLimitPolicy, string shootingName, + Task ShootingTask(RateLimiter rateLimitPolicy, string jobName, Func> action) { + var metricFactory = Metrics.WithLabels(new Dictionary + { { "jobName", jobName }, { "sdk", "dotnet" }, { "sdkVersion", Environment.Version.ToString() } }); + + var okGauge = metricFactory.CreateGauge("oks", "Count of OK"); + var notOkGauge = metricFactory.CreateGauge("not_oks", "Count of not OK"); + var latencySummary = metricFactory.CreateSummary("latency", "Latencies (OK)", new[] { "status" }, + new SummaryConfiguration + { + MaxAge = TimeSpan.FromSeconds(15), Objectives = new QuantileEpsilonPair[] + { new(0.5, 0.05), new(0.99, 0.005), new(0.999, 0.0005) } + }); + + var attemptsHistogram = metricFactory.CreateHistogram("attempts", "summary of amount for request", + new[] { "status" }, + new HistogramConfiguration { Buckets = Histogram.LinearBuckets(1, 1, 10) }); + + var errorsGauge = metricFactory.CreateGauge("errors", "amount of errors", new[] { "class", "in" }); + foreach (var statusCode in Enum.GetValues()) + { + errorsGauge.WithLabels(statusCode.ToString(), "retried").IncTo(0); + errorsGauge.WithLabels(statusCode.ToString(), "finally").IncTo(0); + } + // ReSharper disable once MethodSupportsCancellation return Task.Run(async () => { @@ -198,7 +197,7 @@ Task ShootingTask(RateLimiter rateLimitPolicy, string shootingName, // ReSharper disable once MethodSupportsCancellation await Task.Delay(TimeSpan.FromSeconds(runConfig.ShutdownTime)); - _logger.LogInformation("{ShootingName} shooting is stopped", shootingName); + _logger.LogInformation("{ShootingName} shooting is stopped", jobName); }); } } @@ -257,7 +256,7 @@ Task ShootingTask(RateLimiter rateLimitPolicy, string shootingName, private async Task MetricReset(string promPgwEndpoint) { - var deleteUri = $"{promPgwEndpoint}/job/{JobName}"; + var deleteUri = $"{promPgwEndpoint}/job/{Job}"; using var httpClient = new HttpClient(); await httpClient.DeleteAsync(deleteUri); } diff --git a/slo/src/Internal/TokenBucket.cs b/slo/src/Internal/TokenBucket.cs deleted file mode 100644 index 8e5ee1e1..00000000 --- a/slo/src/Internal/TokenBucket.cs +++ /dev/null @@ -1,57 +0,0 @@ -using System.Collections.Concurrent; -using System.Timers; -using Timer = System.Timers.Timer; - -namespace Internal; - -public record Token; - -[Serializable] -internal class NoTokensAvailableException : Exception -{ - public static NoTokensAvailableException Instance = new(); - - public NoTokensAvailableException() - { - } - - public NoTokensAvailableException(string? message) : base(message) - { - } - - public NoTokensAvailableException(string? message, Exception? innerException) : base(message, innerException) - { - } -} - -public class TokenBucket -{ - private readonly int _maxTokens; - private readonly BlockingCollection _tokens; - - public TokenBucket(int maxNumberOfTokens, int refillRateMilliseconds) - { - _maxTokens = maxNumberOfTokens; - var timer = new Timer(refillRateMilliseconds); - _tokens = new BlockingCollection(); - - for (var i = 0; i < maxNumberOfTokens; i++) _tokens.Add(new Token()); - - timer.AutoReset = true; - timer.Enabled = true; - timer.Elapsed += OnTimerElapsed; - } - - private void OnTimerElapsed(object? sender, ElapsedEventArgs e) - { - var token = new Token(); - var refill = _maxTokens - _tokens.Count; - for (var i = 0; i < refill; i++) - _tokens.Add(token); - } - - public void UseToken() - { - if (!_tokens.TryTake(out _)) throw NoTokensAvailableException.Instance; - } -} \ No newline at end of file diff --git a/slo/src/Internal/Utils.cs b/slo/src/Internal/Utils.cs deleted file mode 100644 index 66eb7585..00000000 --- a/slo/src/Internal/Utils.cs +++ /dev/null @@ -1,12 +0,0 @@ -using Ydb.Sdk; - -namespace Internal; - -public static class Utils -{ - public static string GetResonseStatusName(StatusCode statusCode) - { - var prefix = statusCode >= StatusCode.ClientTransportResourceExhausted ? "GRPC" : "YDB"; - return $"{prefix}_{statusCode}"; - } -} \ No newline at end of file diff --git a/slo/src/TableService/Program.cs b/slo/src/TableService/Program.cs index 46bd01ab..703a202d 100644 --- a/slo/src/TableService/Program.cs +++ b/slo/src/TableService/Program.cs @@ -1,6 +1,6 @@ // See https://aka.ms/new-console-template for more information -using Internal.Cli; +using Internal; using TableService; return await Cli.Run(new SloContext(), args); \ No newline at end of file diff --git a/slo/src/TableService/SloContext.cs b/slo/src/TableService/SloContext.cs index 658c3934..8561f30d 100644 --- a/slo/src/TableService/SloContext.cs +++ b/slo/src/TableService/SloContext.cs @@ -1,5 +1,4 @@ using Internal; -using Internal.Cli; using Prometheus; using Ydb.Sdk; using Ydb.Sdk.Services.Table; @@ -10,7 +9,7 @@ namespace TableService; public class SloContext : SloContext { private readonly TxControl _txControl = TxControl.BeginSerializableRW().Commit(); - protected override string JobName => "workload-table-service"; + protected override string Job => "workload-table-service"; protected override async Task Create(TableClient client, string createTableSql, int operationTimeout) { @@ -39,7 +38,7 @@ protected override async Task Create(TableClient client, string createTableSql, return response; } - errorsGauge?.WithLabels(Utils.GetResonseStatusName(response.Status.StatusCode), "retried").Inc(); + errorsGauge?.WithLabels(response.Status.StatusCode.ToString(), "retried").Inc(); return response; }); @@ -65,7 +64,7 @@ protected override async Task Create(TableClient client, string createTableSql, return response; } - errorsGauge?.WithLabels(Utils.GetResonseStatusName(response.Status.StatusCode), "retried").Inc(); + errorsGauge?.WithLabels(response.Status.StatusCode.ToString(), "retried").Inc(); return response; }); From 1e8aa80ff143f4825dd755b05d3e561dff254b49 Mon Sep 17 00:00:00 2001 From: KirillKurdyukov Date: Thu, 15 Aug 2024 12:55:58 +0300 Subject: [PATCH 18/29] last fixes --- slo/src/Internal/SloContext.cs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/slo/src/Internal/SloContext.cs b/slo/src/Internal/SloContext.cs index c6e26445..31caab9f 100644 --- a/slo/src/Internal/SloContext.cs +++ b/slo/src/Internal/SloContext.cs @@ -124,7 +124,7 @@ public async Task Run(RunConfig runConfig) } catch (Exception e) { - _logger.LogError(e, "Failed waiting read / write tasks"); + _logger.LogInformation(e, "Cancel shooting"); } await prometheus.StopAsync(); @@ -167,7 +167,7 @@ Task ShootingTask(RateLimiter rateLimitPolicy, string jobName, using var lease = await rateLimitPolicy .AcquireAsync(cancellationToken: cancellationTokenSource.Token); - if (lease.IsAcquired) + if (!lease.IsAcquired) { continue; } From be8e661617c8c367cacbda813b21f0303744fe33 Mon Sep 17 00:00:00 2001 From: KirillKurdyukov Date: Thu, 15 Aug 2024 13:12:03 +0300 Subject: [PATCH 19/29] add ADO.NET runner --- .github/workflows/slo.yml | 6 ++++++ slo/src/Internal/SloContext.cs | 8 ++++++-- 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/.github/workflows/slo.yml b/.github/workflows/slo.yml index 740d378e..a3b0c4c5 100644 --- a/.github/workflows/slo.yml +++ b/.github/workflows/slo.yml @@ -58,6 +58,12 @@ jobs: language0: '.NET SDK over table-service' workload_build_context0: ../.. workload_build_options0: -f Dockerfile --build-arg SRC_PATH=TableService + + language_id1: 'ado-net' + workload_path1: 'slo/src' + language1: 'ADO.NET over query-service' + workload_build_context1: ../.. + workload_build_options1: -f Dockerfile --build-arg SRC_PATH=AdoNet - uses: actions/upload-artifact@v3 if: always() && env.DOCKER_REPO != null diff --git a/slo/src/Internal/SloContext.cs b/slo/src/Internal/SloContext.cs index 31caab9f..20e8c29a 100644 --- a/slo/src/Internal/SloContext.cs +++ b/slo/src/Internal/SloContext.cs @@ -106,9 +106,13 @@ public async Task Run(RunConfig runConfig) _logger.LogInformation("Init row count: {MaxId}", _maxId); var writeLimiter = new FixedWindowRateLimiter(new FixedWindowRateLimiterOptions - { Window = TimeSpan.FromSeconds(1), PermitLimit = runConfig.WriteRps, QueueLimit = int.MaxValue }); + { + Window = TimeSpan.FromMilliseconds(100), PermitLimit = runConfig.WriteRps / 10, QueueLimit = int.MaxValue + }); var readLimiter = new FixedWindowRateLimiter(new FixedWindowRateLimiterOptions - { Window = TimeSpan.FromSeconds(1), PermitLimit = runConfig.ReadRps, QueueLimit = int.MaxValue }); + { + Window = TimeSpan.FromMilliseconds(100), PermitLimit = runConfig.ReadRps / 10, QueueLimit = int.MaxValue + }); var cancellationTokenSource = new CancellationTokenSource(); cancellationTokenSource.CancelAfter(TimeSpan.FromSeconds(runConfig.Time)); From 3798d85ab10c92ecb88714ae3f4018ffaf8834df Mon Sep 17 00:00:00 2001 From: KirillKurdyukov Date: Thu, 15 Aug 2024 13:33:54 +0300 Subject: [PATCH 20/29] rename AssemblyName --- slo/src/AdoNet/AdoNet.csproj | 1 + 1 file changed, 1 insertion(+) diff --git a/slo/src/AdoNet/AdoNet.csproj b/slo/src/AdoNet/AdoNet.csproj index 8755ebbb..8abfeaa7 100644 --- a/slo/src/AdoNet/AdoNet.csproj +++ b/slo/src/AdoNet/AdoNet.csproj @@ -5,6 +5,7 @@ net8.0 enable enable + slo From c92bd28f622a62b55f75a846f1045e452caa5bc8 Mon Sep 17 00:00:00 2001 From: KirillKurdyukov Date: Thu, 15 Aug 2024 14:29:45 +0300 Subject: [PATCH 21/29] fix statusName --- slo/src/AdoNet/SloContext.cs | 2 +- slo/src/Internal/SloContext.cs | 13 +++++++++++-- slo/src/TableService/SloContext.cs | 2 +- 3 files changed, 13 insertions(+), 4 deletions(-) diff --git a/slo/src/AdoNet/SloContext.cs b/slo/src/AdoNet/SloContext.cs index 47cf7aa2..a106290f 100644 --- a/slo/src/AdoNet/SloContext.cs +++ b/slo/src/AdoNet/SloContext.cs @@ -16,7 +16,7 @@ public class SloContext : SloContext context["RetryCount"] = retryCount; var errorsGauge = (Gauge)context["errorsGauge"]; - errorsGauge?.WithLabels(((YdbException)e).Code.ToString(), "retried").Inc(); + errorsGauge?.WithLabels(((YdbException)e).Code.StatusName(), "retried").Inc(); }); protected override string Job => "workload-ado-net"; diff --git a/slo/src/Internal/SloContext.cs b/slo/src/Internal/SloContext.cs index 20e8c29a..8c6d6662 100644 --- a/slo/src/Internal/SloContext.cs +++ b/slo/src/Internal/SloContext.cs @@ -159,8 +159,8 @@ Task ShootingTask(RateLimiter rateLimitPolicy, string jobName, var errorsGauge = metricFactory.CreateGauge("errors", "amount of errors", new[] { "class", "in" }); foreach (var statusCode in Enum.GetValues()) { - errorsGauge.WithLabels(statusCode.ToString(), "retried").IncTo(0); - errorsGauge.WithLabels(statusCode.ToString(), "finally").IncTo(0); + errorsGauge.WithLabels(statusCode.StatusName(), "retried").IncTo(0); + errorsGauge.WithLabels(statusCode.StatusName(), "finally").IncTo(0); } // ReSharper disable once MethodSupportsCancellation @@ -264,4 +264,13 @@ private async Task MetricReset(string promPgwEndpoint) using var httpClient = new HttpClient(); await httpClient.DeleteAsync(deleteUri); } +} + +public static class StatusCodeExtension +{ + public static string StatusName(this StatusCode statusCode) + { + var prefix = statusCode >= StatusCode.ClientTransportResourceExhausted ? "GRPC" : "YDB"; + return $"{prefix}_{statusCode}"; + } } \ No newline at end of file diff --git a/slo/src/TableService/SloContext.cs b/slo/src/TableService/SloContext.cs index 8561f30d..96e556ba 100644 --- a/slo/src/TableService/SloContext.cs +++ b/slo/src/TableService/SloContext.cs @@ -64,7 +64,7 @@ protected override async Task Create(TableClient client, string createTableSql, return response; } - errorsGauge?.WithLabels(response.Status.StatusCode.ToString(), "retried").Inc(); + errorsGauge?.WithLabels(response.Status.StatusCode.StatusName(), "retried").Inc(); return response; }); From 2fcc463a3ff33278f355d70f193e2978a7d42ac2 Mon Sep 17 00:00:00 2001 From: KirillKurdyukov Date: Thu, 15 Aug 2024 14:58:58 +0300 Subject: [PATCH 22/29] fix bug --- slo/src/AdoNet/SloContext.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/slo/src/AdoNet/SloContext.cs b/slo/src/AdoNet/SloContext.cs index a106290f..00632b2b 100644 --- a/slo/src/AdoNet/SloContext.cs +++ b/slo/src/AdoNet/SloContext.cs @@ -83,7 +83,7 @@ protected override async Task Create(YdbDataSource client, string createTableSql return await ydbCommand.ExecuteScalarAsync(); }, context); - return (policyResult.Context.TryGetValue("RetryCount", out var countAttempts) ? (int)countAttempts : 1, + return (policyResult.Context.TryGetValue("RetryCount", out var countAttempts) ? (int)countAttempts : 0, ((YdbException)policyResult.FinalException)?.Code ?? StatusCode.Success, policyResult.Result); } From d69679b1ae6f2c3c2b82e694c9abb6aae1d9e7e6 Mon Sep 17 00:00:00 2001 From: KirillKurdyukov Date: Thu, 15 Aug 2024 15:21:17 +0300 Subject: [PATCH 23/29] Stop Stopwatch --- slo/src/Internal/SloContext.cs | 1 + 1 file changed, 1 insertion(+) diff --git a/slo/src/Internal/SloContext.cs b/slo/src/Internal/SloContext.cs index 8c6d6662..4c14b357 100644 --- a/slo/src/Internal/SloContext.cs +++ b/slo/src/Internal/SloContext.cs @@ -180,6 +180,7 @@ Task ShootingTask(RateLimiter rateLimitPolicy, string jobName, { var sw = Stopwatch.StartNew(); var (attempts, statusCode) = await action(client, runConfig, errorsGauge); + sw.Stop(); string label; if (statusCode != StatusCode.Success) From dafadb04ed6675a6e191d1520b2434192c2d854b Mon Sep 17 00:00:00 2001 From: KirillKurdyukov Date: Thu, 15 Aug 2024 15:48:18 +0300 Subject: [PATCH 24/29] continue-on-error: true --- .github/workflows/slo.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/workflows/slo.yml b/.github/workflows/slo.yml index a3b0c4c5..1e9451c0 100644 --- a/.github/workflows/slo.yml +++ b/.github/workflows/slo.yml @@ -33,6 +33,7 @@ jobs: if: env.DOCKER_REPO != null env: DOCKER_REPO: ${{ secrets.SLO_DOCKER_REPO }} + continue-on-error: true with: GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} KUBECONFIG_B64: ${{ secrets.SLO_KUBE_CONFIG }} From fb40322899ea5deb595ed3b281bd69ee37d1a02a Mon Sep 17 00:00:00 2001 From: KirillKurdyukov Date: Thu, 15 Aug 2024 15:53:23 +0300 Subject: [PATCH 25/29] finally errors --- slo/src/Internal/SloContext.cs | 1 + 1 file changed, 1 insertion(+) diff --git a/slo/src/Internal/SloContext.cs b/slo/src/Internal/SloContext.cs index 4c14b357..18bc66e0 100644 --- a/slo/src/Internal/SloContext.cs +++ b/slo/src/Internal/SloContext.cs @@ -187,6 +187,7 @@ Task ShootingTask(RateLimiter rateLimitPolicy, string jobName, { notOkGauge.Inc(); label = "err"; + errorsGauge.WithLabels(statusCode.StatusName(), "finally").Inc(); } else { From c40ff8d8eb557bf6f60ad67491bb0324ede450a7 Mon Sep 17 00:00:00 2001 From: KirillKurdyukov Date: Thu, 15 Aug 2024 16:05:00 +0300 Subject: [PATCH 26/29] fix --- slo/src/AdoNet/SloContext.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/slo/src/AdoNet/SloContext.cs b/slo/src/AdoNet/SloContext.cs index 00632b2b..a106290f 100644 --- a/slo/src/AdoNet/SloContext.cs +++ b/slo/src/AdoNet/SloContext.cs @@ -83,7 +83,7 @@ protected override async Task Create(YdbDataSource client, string createTableSql return await ydbCommand.ExecuteScalarAsync(); }, context); - return (policyResult.Context.TryGetValue("RetryCount", out var countAttempts) ? (int)countAttempts : 0, + return (policyResult.Context.TryGetValue("RetryCount", out var countAttempts) ? (int)countAttempts : 1, ((YdbException)policyResult.FinalException)?.Code ?? StatusCode.Success, policyResult.Result); } From bedbca4acb9290e2e6b7c203838fca8f1cc02bde Mon Sep 17 00:00:00 2001 From: KirillKurdyukov Date: Thu, 15 Aug 2024 16:08:06 +0300 Subject: [PATCH 27/29] fix count attempts --- slo/src/AdoNet/SloContext.cs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/slo/src/AdoNet/SloContext.cs b/slo/src/AdoNet/SloContext.cs index a106290f..0e9f5407 100644 --- a/slo/src/AdoNet/SloContext.cs +++ b/slo/src/AdoNet/SloContext.cs @@ -10,10 +10,9 @@ namespace AdoNet; public class SloContext : SloContext { private readonly AsyncPolicy _policy = Policy.Handle(exception => exception.IsTransient) - .WaitAndRetryAsync(10, attempt => TimeSpan.FromSeconds(attempt), - (e, _, retryCount, context) => + .WaitAndRetryAsync(10, attempt => TimeSpan.FromMilliseconds(attempt * 10), + (e, _, _, context) => { - context["RetryCount"] = retryCount; var errorsGauge = (Gauge)context["errorsGauge"]; errorsGauge?.WithLabels(((YdbException)e).Code.StatusName(), "retried").Inc(); @@ -68,8 +67,10 @@ protected override async Task Create(YdbDataSource client, string createTableSql context["errorsGauge"] = errorsGauge; } + var attempts = 0; var policyResult = await _policy.ExecuteAndCaptureAsync(async _ => { + attempts++; await using var ydbConnection = await dataSource.OpenConnectionAsync(); var ydbCommand = new YdbCommand(ydbConnection) @@ -83,8 +84,7 @@ protected override async Task Create(YdbDataSource client, string createTableSql return await ydbCommand.ExecuteScalarAsync(); }, context); - return (policyResult.Context.TryGetValue("RetryCount", out var countAttempts) ? (int)countAttempts : 1, - ((YdbException)policyResult.FinalException)?.Code ?? StatusCode.Success, policyResult.Result); + return (attempts, ((YdbException)policyResult.FinalException)?.Code ?? StatusCode.Success, policyResult.Result); } protected override Task CreateClient(Config config) From b2af5e0e89c1aa598a39b32e4127a6c8ecb69cc5 Mon Sep 17 00:00:00 2001 From: KirillKurdyukov Date: Thu, 15 Aug 2024 16:18:10 +0300 Subject: [PATCH 28/29] fix Uint64 -> Int32 --- slo/src/AdoNet/SloContext.cs | 2 ++ slo/src/Internal/SloContext.cs | 37 ++++++++++++++---------------- slo/src/TableService/SloContext.cs | 4 ++++ 3 files changed, 23 insertions(+), 20 deletions(-) diff --git a/slo/src/AdoNet/SloContext.cs b/slo/src/AdoNet/SloContext.cs index 0e9f5407..5ead9eec 100644 --- a/slo/src/AdoNet/SloContext.cs +++ b/slo/src/AdoNet/SloContext.cs @@ -1,4 +1,5 @@ using Internal; +using Microsoft.Extensions.Logging; using Polly; using Prometheus; using Ydb.Sdk; @@ -15,6 +16,7 @@ public class SloContext : SloContext { var errorsGauge = (Gauge)context["errorsGauge"]; + Logger.LogWarning(e, "Failed read / write operation"); errorsGauge?.WithLabels(((YdbException)e).Code.StatusName(), "retried").Inc(); }); diff --git a/slo/src/Internal/SloContext.cs b/slo/src/Internal/SloContext.cs index 18bc66e0..f7407eff 100644 --- a/slo/src/Internal/SloContext.cs +++ b/slo/src/Internal/SloContext.cs @@ -9,16 +9,13 @@ namespace Internal; public abstract class SloContext where T : IDisposable { - protected readonly ILoggerFactory Factory; - private readonly ILogger _logger; + // ReSharper disable once StaticMemberInGenericType + protected static readonly ILoggerFactory Factory = + LoggerFactory.Create(builder => builder.AddConsole().SetMinimumLevel(LogLevel.Information)); - private volatile int _maxId; + protected static readonly ILogger Logger = Factory.CreateLogger>(); - protected SloContext() - { - Factory = LoggerFactory.Create(builder => builder.AddConsole().SetMinimumLevel(LogLevel.Information)); - _logger = Factory.CreateLogger>(); - } + private volatile int _maxId; protected abstract string Job { get; } @@ -29,7 +26,7 @@ public async Task Create(CreateConfig config) using var client = await CreateClient(config); for (var attempt = 0; attempt < maxCreateAttempts; attempt++) { - _logger.LogInformation("Creating table {TableName}..", config.TableName); + Logger.LogInformation("Creating table {TableName}..", config.TableName); try { var createTableSql = $""" @@ -49,17 +46,17 @@ PRIMARY KEY (hash, id) AUTO_PARTITIONING_MAX_PARTITIONS_COUNT = {config.MaxPartitionsCount} ); """; - _logger.LogInformation("YQL script: {sql}", createTableSql); + Logger.LogInformation("YQL script: {sql}", createTableSql); await Create(client, createTableSql, config.WriteTimeout); - _logger.LogInformation("Created table {TableName}", config.TableName); + Logger.LogInformation("Created table {TableName}", config.TableName); break; } catch (Exception e) { - _logger.LogError(e, "Fail created table"); + Logger.LogError(e, "Fail created table"); if (attempt == maxCreateAttempts - 1) { @@ -82,11 +79,11 @@ PRIMARY KEY (hash, id) } catch (Exception e) { - _logger.LogError(e, "Init failed when all tasks, continue.."); + Logger.LogError(e, "Init failed when all tasks, continue.."); } finally { - _logger.LogInformation("Created task is finished"); + Logger.LogInformation("Created task is finished"); } } @@ -103,7 +100,7 @@ public async Task Run(RunConfig runConfig) new Dictionary(), runConfig.ReadTimeout); _maxId = (int)maxId!; - _logger.LogInformation("Init row count: {MaxId}", _maxId); + Logger.LogInformation("Init row count: {MaxId}", _maxId); var writeLimiter = new FixedWindowRateLimiter(new FixedWindowRateLimiterOptions { @@ -120,7 +117,7 @@ public async Task Run(RunConfig runConfig) var writeTask = ShootingTask(writeLimiter, "write", Upsert); var readTask = ShootingTask(readLimiter, "read", Select); - _logger.LogInformation("Started write / read shooting.."); + Logger.LogInformation("Started write / read shooting.."); try { @@ -128,13 +125,13 @@ public async Task Run(RunConfig runConfig) } catch (Exception e) { - _logger.LogInformation(e, "Cancel shooting"); + Logger.LogInformation(e, "Cancel shooting"); } await prometheus.StopAsync(); await MetricReset(promPgwEndpoint); - _logger.LogInformation("Run task is finished"); + Logger.LogInformation("Run task is finished"); return; Task ShootingTask(RateLimiter rateLimitPolicy, string jobName, @@ -203,7 +200,7 @@ Task ShootingTask(RateLimiter rateLimitPolicy, string jobName, // ReSharper disable once MethodSupportsCancellation await Task.Delay(TimeSpan.FromSeconds(runConfig.ShutdownTime)); - _logger.LogInformation("{ShootingName} shooting is stopped", jobName); + Logger.LogInformation("{ShootingName} shooting is stopped", jobName); }); } } @@ -254,7 +251,7 @@ Task ShootingTask(RateLimiter rateLimitPolicy, string jobName, """, new Dictionary { - { "$id", YdbValue.MakeUint64((ulong)Random.Shared.Next(_maxId)) } + { "$id", YdbValue.MakeInt32(Random.Shared.Next(_maxId)) } }, config.ReadTimeout, errorsGauge); return (attempts, code); diff --git a/slo/src/TableService/SloContext.cs b/slo/src/TableService/SloContext.cs index 96e556ba..d426ace7 100644 --- a/slo/src/TableService/SloContext.cs +++ b/slo/src/TableService/SloContext.cs @@ -1,4 +1,5 @@ using Internal; +using Microsoft.Extensions.Logging; using Prometheus; using Ydb.Sdk; using Ydb.Sdk.Services.Table; @@ -38,6 +39,7 @@ protected override async Task Create(TableClient client, string createTableSql, return response; } + errorsGauge?.WithLabels(response.Status.StatusCode.ToString(), "retried").Inc(); return response; @@ -64,6 +66,8 @@ protected override async Task Create(TableClient client, string createTableSql, return response; } + Logger.LogWarning("{}", response.Status.ToString()); + errorsGauge?.WithLabels(response.Status.StatusCode.StatusName(), "retried").Inc(); return response; From e58df0389a6c364e3770acff7495a8fd6a0a06bf Mon Sep 17 00:00:00 2001 From: KirillKurdyukov Date: Thu, 15 Aug 2024 16:32:38 +0300 Subject: [PATCH 29/29] fix --- .github/workflows/slo.yml | 1 - src/Ydb.Sdk/src/Pool/EndpointPool.cs | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/.github/workflows/slo.yml b/.github/workflows/slo.yml index 1e9451c0..a3b0c4c5 100644 --- a/.github/workflows/slo.yml +++ b/.github/workflows/slo.yml @@ -33,7 +33,6 @@ jobs: if: env.DOCKER_REPO != null env: DOCKER_REPO: ${{ secrets.SLO_DOCKER_REPO }} - continue-on-error: true with: GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} KUBECONFIG_B64: ${{ secrets.SLO_KUBE_CONFIG }} diff --git a/src/Ydb.Sdk/src/Pool/EndpointPool.cs b/src/Ydb.Sdk/src/Pool/EndpointPool.cs index 9a656e10..93b64e16 100644 --- a/src/Ydb.Sdk/src/Pool/EndpointPool.cs +++ b/src/Ydb.Sdk/src/Pool/EndpointPool.cs @@ -129,7 +129,7 @@ public bool PessimizeEndpoint(string endpoint) _preferredEndpointCount = preferredEndpointCount; - _logger.LogTrace("Endpoint {Endpoint} was pessimized. New pessimization ratio: {} / {}", + _logger.LogWarning("Endpoint {Endpoint} was pessimized. New pessimization ratio: {} / {}", endpoint, pessimizedCount, _sortedByPriorityEndpoints.Length); return 100 * pessimizedCount > _sortedByPriorityEndpoints.Length * DiscoveryDegradationLimit;