sources/config_reader.c: new default client_max_routing (#715)
The default per-worker value of 16 should be increased to achieve
higher CPU utilization and a higher client connection rate.

After testing client connection parallelism, I learned that
~60-80 client connections per worker are needed to reach 100% CPU load
(with non-localhost connections).

So this patch increases the default from 16 to 64.

benchmarks/client_max_routing can be used for future investigation.

Signed-off-by: rkhapov <r.khapov@ya.ru>
Co-authored-by: rkhapov <r.khapov@ya.ru>
rkhapov and rkhapov authored Nov 15, 2024
1 parent b494f12 commit f081cf9
Showing 5 changed files with 358 additions and 2 deletions.
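
For scale (the worker count here is illustrative, not from the commit): with 4 workers and client_max_routing left unset, the effective routing limit rises from 4 × 16 = 64 to 4 × 64 = 256 concurrently routing clients.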
10 changes: 10 additions & 0 deletions benchmarks/client_max_routing/go.mod
@@ -0,0 +1,10 @@
module catchuploadtest

go 1.23.3

require (
    github.com/lib/pq v1.10.9
    golang.org/x/term v0.26.0
)

require golang.org/x/sys v0.27.0 // indirect
6 changes: 6 additions & 0 deletions benchmarks/client_max_routing/go.sum
@@ -0,0 +1,6 @@
github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw=
github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
golang.org/x/sys v0.27.0 h1:wBqf8DvsY9Y/2P8gAfPDEYNuS30J4lPHJxXSb/nJZ+s=
golang.org/x/sys v0.27.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/term v0.26.0 h1:WEQa6V3Gja/BhNxg540hBip/kkaYtRg3cxg4oXSw4AU=
golang.org/x/term v0.26.0/go.mod h1:Si5m1o57C5nBNQo5z1iq+XDijt21BDBDp2bK0QI8e3E=
340 changes: 340 additions & 0 deletions benchmarks/client_max_routing/main.go
@@ -0,0 +1,340 @@
package main

import (
    "context"
    "database/sql"
    "encoding/csv"
    "flag"
    "fmt"
    "os"
    "strconv"
    "strings"
    "sync"
    "sync/atomic"
    "time"

    _ "github.com/lib/pq"
    "golang.org/x/term"
)

type IntArrayFlags []int

func newIntArrayFlags(defaults []int) *IntArrayFlags {
    i := IntArrayFlags(defaults)
    return &i
}

func (i *IntArrayFlags) String() string {
    return fmt.Sprintf("%v", *i)
}

func (i *IntArrayFlags) Set(value string) error {
    *i = nil

    for _, n := range strings.FieldsFunc(value, func(r rune) bool { return r == ',' || r == ' ' }) {
        v, err := strconv.Atoi(n)
        if err != nil {
            return err
        }
        *i = append(*i, v)
    }

    return nil
}
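
// Sketch (not part of the committed file): both array flag types satisfy
// Go's flag.Value interface (String plus Set), which is what lets readConfig
// below register them via flag.Var. Parsing behavior, with hypothetical input:
//
//    var clients IntArrayFlags
//    _ = clients.Set("10, 20,30") // splits on commas and spaces -> [10 20 30]
//    _ = clients.String()         // "[10 20 30]"; Set discards any defaults first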

type DurationArrayFlags []time.Duration

func newDurationArrayFlags(defaults []time.Duration) *DurationArrayFlags {
    d := DurationArrayFlags(defaults)
    return &d
}

func (d *DurationArrayFlags) String() string {
    return fmt.Sprintf("%v", *d)
}

func (d *DurationArrayFlags) Set(value string) error {
    *d = nil

    for _, n := range strings.FieldsFunc(value, func(r rune) bool { return r == ',' || r == ' ' }) {
        v, err := time.ParseDuration(n)
        if err != nil {
            return err
        }
        *d = append(*d, v)
    }

    return nil
}

type Config struct {
    ConnectionString  string
    BenchmarkDuration time.Duration
    ClientParallels   []int
    ConnectTimeouts   []time.Duration
    SelectTimeouts    []time.Duration
    PauseDuration     time.Duration
    OutputFile        string
}

type Result struct {
    Connections    int64
    Selects        int64
    Errors         int64
    Clients        int
    TotalTime      time.Duration
    ConnectTimeout time.Duration
    SelectTimeout  time.Duration
}

// Shared counters, reset per measurement and updated atomically by the
// worker goroutines.
var (
    connectionsCounter int64 = 0
    selectCounter      int64 = 0
    errorsCounter      int64 = 0
)

func doConnect(cfg *Config, connectTimeout time.Duration) {
    db, err := sql.Open("postgres", cfg.ConnectionString)
    if err != nil {
        fmt.Printf("connect error: %v\n", err)
        atomic.AddInt64(&errorsCounter, 1)
        return
    }

    defer db.Close()

    ctx, cancel := context.WithTimeout(context.Background(), connectTimeout)
    defer cancel()

    // sql.Open only validates the DSN and does not dial; PingContext forces
    // a real connection attempt, which is what this benchmark counts.
    err = db.PingContext(ctx)
    if err != nil {
        fmt.Printf("ping error: %v\n", err)
        atomic.AddInt64(&errorsCounter, 1)
        return
    }

    atomic.AddInt64(&connectionsCounter, 1)
}

func doConnectInf(ctx context.Context, cfg *Config, connectTimeout time.Duration, wg *sync.WaitGroup, syncStart *sync.WaitGroup) {
    syncStart.Wait()

    for {
        select {
        case <-ctx.Done():
            wg.Done()
            return
        default:
            doConnect(cfg, connectTimeout)
        }
    }
}

func doSelect(db *sql.DB, timeout time.Duration) {
    ctx, cancel := context.WithTimeout(context.Background(), timeout)
    defer cancel()

    var val int
    err := db.QueryRowContext(ctx, "select 1 + 2").Scan(&val)
    if err != nil {
        fmt.Printf("select error: %v\n", err)
        atomic.AddInt64(&errorsCounter, 1)
        return
    }

    if val != 3 {
        panic("wrong value")
    }

    atomic.AddInt64(&selectCounter, 1)
}

func doSelectInf(ctx context.Context, cfg *Config, selectTimeout time.Duration, wg *sync.WaitGroup) {
    db, err := sql.Open("postgres", cfg.ConnectionString)
    noerror(err)

    defer db.Close()

    for {
        select {
        case <-ctx.Done():
            wg.Done()
            return
        default:
            doSelect(db, selectTimeout)
        }
    }
}

func doMeasure(cfg *Config, nparallel int, connectTimeout time.Duration, selectTimeout time.Duration) *Result {
    atomic.StoreInt64(&connectionsCounter, 0)
    atomic.StoreInt64(&selectCounter, 0)
    atomic.StoreInt64(&errorsCounter, 0)

    ctx, cancel := context.WithTimeout(context.Background(), cfg.BenchmarkDuration)
    defer cancel()

    wg := &sync.WaitGroup{}
    syncStart := &sync.WaitGroup{}

    syncStart.Add(1)

    // Select workers start immediately; connect workers block on syncStart
    // and are released together just before the timer starts.
    for i := 0; i < nparallel; i++ {
        wg.Add(1)
        go doSelectInf(ctx, cfg, selectTimeout, wg)
    }

    for i := 0; i < nparallel; i++ {
        wg.Add(1)
        go doConnectInf(ctx, cfg, connectTimeout, wg, syncStart)
    }

    startTime := time.Now()
    syncStart.Done()
    wg.Wait()
    totalTime := time.Since(startTime)

    return &Result{
        Connections:    atomic.LoadInt64(&connectionsCounter),
        Selects:        atomic.LoadInt64(&selectCounter),
        Errors:         atomic.LoadInt64(&errorsCounter),
        TotalTime:      totalTime,
        Clients:        nparallel,
        ConnectTimeout: connectTimeout,
        SelectTimeout:  selectTimeout,
    }
}

func printResultHeaderLine() {
    // The "select to" column was missing here even though printResultLine
    // emits seven values; the CSV header below already had all eight fields.
    fmt.Printf("clients\ttime\tconn to\tselect to\tc/s\ts/s\te/s\n")
}

func printResultLine(r *Result) {
    fmt.Printf("%d\t%v\t%v\t%v\t%f\t%f\t%f\n",
        r.Clients,
        r.TotalTime,
        r.ConnectTimeout,
        r.SelectTimeout,
        float64(r.Connections)/r.TotalTime.Seconds(),
        float64(r.Selects)/r.TotalTime.Seconds(),
        float64(r.Errors)/r.TotalTime.Seconds(),
    )
}

func printResultConsole(results []*Result) {
    printResultHeaderLine()

    for _, r := range results {
        printResultLine(r)
    }
}

func printResultCsv(filename string, results []*Result) {
    f, err := os.Create(filename)
    noerror(err)
    defer f.Close()

    w := csv.NewWriter(f)
    defer w.Flush()

    w.Write([]string{"#", "clients", "time", "conn to", "select to", "c/s", "s/s", "e/s"})
    for i, r := range results {
        err := w.Write([]string{
            strconv.Itoa(i + 1),
            strconv.Itoa(r.Clients),
            r.TotalTime.String(),
            r.ConnectTimeout.String(),
            r.SelectTimeout.String(),
            fmt.Sprintf("%f", float64(r.Connections)/r.TotalTime.Seconds()),
            fmt.Sprintf("%f", float64(r.Selects)/r.TotalTime.Seconds()),
            fmt.Sprintf("%f", float64(r.Errors)/r.TotalTime.Seconds()),
        })
        noerror(err)
    }

    fmt.Printf("Results saved to %s\n", filename)
}

func printResults(cfg *Config, results []*Result) {
    printResultConsole(results)
    printResultCsv(cfg.OutputFile, results)
}

func runBenchmarks(cfg *Config) []*Result {
    results := make([]*Result, 0, len(cfg.ClientParallels))

    for _, np := range cfg.ClientParallels {
        for _, ct := range cfg.ConnectTimeouts {
            for _, st := range cfg.SelectTimeouts {
                r := doMeasure(cfg, np, ct, st)
                printResultLine(r)
                results = append(results, r)

                time.Sleep(cfg.PauseDuration)
            }
        }
    }

    return results
}
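
// Note: runBenchmarks measures every combination of client count, connect
// timeout, and select timeout, sleeping pause-duration between runs; with the
// default flags in readConfig that is 10 x 1 x 1 = 10 measurements.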

func readConfig() *Config {
    host := flag.String("host", "", "odyssey host")
    port := flag.Int("port", 6432, "odyssey port")
    user := flag.String("user", "user1", "user to connect")
    dbName := flag.String("db", "db1", "db to connect")
    sslRoot := flag.String("sslroot", "./root.crt", "ssl root.crt file path")
    benchDuration := flag.Duration("bench-duration", time.Second*5, "one benchmark run duration")
    pauseDuration := flag.Duration("pause-duration", time.Second*3, "pause between runs")
    outputFile := flag.String("output-file", "./result.csv", "result filename")

    clients := newIntArrayFlags([]int{10, 20, 30, 40, 50, 60, 70, 80, 90, 100})
    flag.Var(clients, "clients", "numbers of parallel connecting clients")

    connectTimeouts := newDurationArrayFlags([]time.Duration{time.Second})
    flag.Var(connectTimeouts, "connect-timeouts", "connect timeouts")

    selectTimeouts := newDurationArrayFlags([]time.Duration{time.Second})
    flag.Var(selectTimeouts, "select-timeouts", "select timeouts")

    flag.Parse()

    if len(*host) == 0 {
        fmt.Printf("Error: the host parameter is empty\n")
        flag.Usage()
        return nil
    }

    fmt.Printf("%s's password (no echo):", *user)
    password, err := term.ReadPassword(0) // fd 0: stdin
    noerror(err)
    fmt.Println() // ReadPassword does not echo a trailing newline

    return &Config{
        ConnectionString:  fmt.Sprintf("host=%s port=%d user=%s password=%s dbname=%s sslmode=verify-full sslrootcert=%s", *host, *port, *user, string(password), *dbName, *sslRoot),
        BenchmarkDuration: *benchDuration,
        ClientParallels:   *clients,
        PauseDuration:     *pauseDuration,
        OutputFile:        *outputFile,
        ConnectTimeouts:   *connectTimeouts,
        SelectTimeouts:    *selectTimeouts,
    }
}

func main() {
    cfg := readConfig()
    if cfg == nil {
        return
    }

    results := runBenchmarks(cfg)

    printResults(cfg, results)
}

func noerror(err error) {
    if err != nil {
        panic(err)
    }
}
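
A typical invocation (values illustrative), run from benchmarks/client_max_routing: `go run . -host <odyssey-host> -clients 10,40,80,120 -bench-duration 10s`. The tool prompts for the user's password, requires a valid root certificate (default ./root.crt, since the DSN uses sslmode=verify-full), prints one result line per run, and saves a CSV to ./result.csv by default.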
2 changes: 1 addition & 1 deletion odyssey.conf
@@ -310,7 +310,7 @@ keepalive_usr_timeout 0
 # message is read and connection is assigned route to the database. Most of the
 # routing time is occupied with TLS handshake.
 #
-# Unset or zero 'client_max_routing' will set it's value equal to 16 * workers
+# Unset or zero 'client_max_routing' will set it's value equal to 64 * workers
 #
 # client_max_routing 32

2 changes: 1 addition & 1 deletion sources/config_reader.c
@@ -2825,7 +2825,7 @@ static int od_config_reader_parse(od_config_reader_t *reader,
         return NOT_OK_RESPONSE;
 success:
     if (!config->client_max_routing) {
-        config->client_max_routing = config->workers * 16;
+        config->client_max_routing = config->workers * 64;
     }

     od_config_setup_default_tcp_usr_timeout(config);
